Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 286 additions & 0 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use {
alpenglow_vote::{
bls_message::{BLSMessage, VoteMessage, BLS_KEYPAIR_DERIVE_SEED},
certificate::CertificateType,
vote::Vote,
},
assert_matches::assert_matches,
Expand Down Expand Up @@ -6261,6 +6262,26 @@ fn broadcast_vote(
}
}

/// Maps a vote to a `(slot, discriminant)` pair, where the discriminant is a
/// small integer identifying the vote type (0 = notarization, 1 = finalize,
/// 2 = skip, 3 = notarize-fallback, 4 = skip-fallback).
///
/// Panics if the vote matches none of the known vote types.
fn _vote_to_tuple(vote: &Vote) -> (u64, u8) {
    // Probe the vote-type predicates in a fixed order; the first hit wins.
    let candidates: [(bool, u8); 5] = [
        (vote.is_notarization(), 0),
        (vote.is_finalize(), 1),
        (vote.is_skip(), 2),
        (vote.is_notarize_fallback(), 3),
        (vote.is_skip_fallback(), 4),
    ];

    let discriminant = candidates
        .iter()
        .find(|(matched, _)| *matched)
        .map(|(_, disc)| *disc)
        .unwrap_or_else(|| panic!("Invalid vote type: {:?}", vote));

    (vote.slot(), discriminant)
}

/// This test validates the Alpenglow consensus protocol's ability to maintain liveness when a node
/// needs to issue a NotarizeFallback vote. The test sets up a two-node cluster with a specific
/// stake distribution to create a scenario where:
Expand Down Expand Up @@ -6562,6 +6583,7 @@ fn test_alpenglow_ensure_liveness_after_double_notar_fallback() {
let node_d_vote_keypair = node_d_info.info.voting_keypair.clone();

// Vote listener state
#[derive(Debug)]
struct VoteListenerState {
num_notar_fallback_votes: u32,
a_equivocates: bool,
Expand Down Expand Up @@ -6770,3 +6792,267 @@ fn test_alpenglow_ensure_liveness_after_double_notar_fallback() {

vote_listener_thread.join().unwrap();
}

/// Test to validate Alpenglow's ability to maintain liveness when nodes issue both NotarizeFallback
/// and SkipFallback votes in an intertwined manner.
///
/// This test simulates a consensus scenario with four nodes having specific stake distributions:
/// - Node A: 40% + epsilon stake
/// - Node B: 40% - epsilon stake
/// - Node C: 20% - epsilon stake
/// - Node D: epsilon stake (minimal, acts as perpetual leader)
///
/// The test proceeds through two main stages:
///
/// ## Stage 1: Stable Network Operation
/// All nodes are voting normally for leader D's proposals, with notarization votes going through
/// successfully and the network maintaining consensus.
///
/// ## Stage 2: Network Partition and Fallback Scenario
/// At slot 20, Node A's turbine is disabled, creating a network partition. This triggers the
/// following sequence:
/// 1. Node D (leader) proposes a block b1
/// 2. Nodes B, C, and D can communicate and vote to notarize b1
/// 3. Node A is partitioned and cannot receive b1, so it issues a skip vote
/// 4. The vote distribution creates a complex fallback scenario:
///    - Nodes B, C, D: Issue notarize votes initially, then skip fallback votes
///    - Node A: Issues skip vote initially, then notarize fallback vote
/// 5. This creates the specific vote pattern:
///    - B, C, D: notarize + skip_fallback
///    - A: skip + notarize_fallback
///
/// The test validates that:
/// - The network can handle intertwined fallback scenarios
/// - Consensus is maintained despite complex vote patterns
/// - The network continues to make progress after the partition is resolved
/// - At least 8 finalization certificates are observed post-experiment to ensure sustained
///   liveness
#[test]
#[serial]
fn test_alpenglow_ensure_liveness_after_intertwined_notar_and_skip_fallbacks() {
    solana_logger::setup_with_default(AG_DEBUG_LOG_FILTER);

    // Configure stake distribution for the four-node cluster
    const TOTAL_STAKE: u64 = 10 * DEFAULT_NODE_STAKE;
    const EPSILON: u64 = 1;
    const NUM_NODES: usize = 4;

    // Ensure that node stakes are in decreasing order, so node_index can directly be set as
    // vote_message.rank.
    let node_stakes = [
        TOTAL_STAKE * 4 / 10 + EPSILON, // Node A: 40% + epsilon
        TOTAL_STAKE * 4 / 10 - EPSILON, // Node B: 40% - epsilon
        TOTAL_STAKE * 2 / 10 - EPSILON, // Node C: 20% - epsilon
        EPSILON,                        // Node D: epsilon
    ];

    assert_eq!(NUM_NODES, node_stakes.len());

    // Verify stake distribution adds up correctly
    assert_eq!(TOTAL_STAKE, node_stakes.iter().sum::<u64>());

    // Control flag for the network partition: setting this to true cuts node A off
    // from turbine so it stops receiving leader blocks.
    let node_a_turbine_disabled = Arc::new(AtomicBool::new(false));

    // Create leader schedule with D as perpetual leader: all 4 schedule slots go to
    // validator index 3 (node D).
    let (leader_schedule, validator_keys) =
        create_custom_leader_schedule_with_random_keys(&[0, 0, 0, 4]);

    let leader_schedule = FixedSchedule {
        leader_schedule: Arc::new(leader_schedule),
    };

    // Set up vote monitoring
    let vote_listener_socket =
        solana_net_utils::bind_to_localhost().expect("Failed to bind vote listener socket");

    // Configure validators
    let mut validator_config = ValidatorConfig::default_for_test();
    validator_config.fixed_leader_schedule = Some(leader_schedule);
    validator_config.voting_service_additional_listeners =
        Some(vec![vote_listener_socket.local_addr().unwrap()]);

    let mut validator_configs = make_identical_validator_configs(&validator_config, NUM_NODES);
    // Node A (index 0) will have its turbine disabled during the experiment
    validator_configs[0].turbine_disabled = node_a_turbine_disabled.clone();

    assert_eq!(NUM_NODES, validator_keys.len());

    // Set up cluster configuration
    let mut cluster_config = ClusterConfig {
        mint_lamports: TOTAL_STAKE,
        node_stakes: node_stakes.to_vec(),
        validator_configs,
        validator_keys: Some(
            validator_keys
                .iter()
                .cloned()
                .zip(std::iter::repeat(true))
                .collect(),
        ),
        ..ClusterConfig::default()
    };

    // Initialize the cluster
    let cluster = LocalCluster::new_alpenglow(&mut cluster_config, SocketAddrSpace::Unspecified);
    assert_eq!(NUM_NODES, cluster.validators.len());

    /// Experiment stages, in the order they occur. Each stage carries a cumulative
    /// deadline measured from the moment the vote listener starts.
    #[derive(Debug, PartialEq, Eq)]
    enum Stage {
        Stability,
        ObserveSkipFallbacks,
        ObserveLiveness,
    }

    impl Stage {
        /// Cumulative deadline (elapsed time since listener start) by which this
        /// stage must have completed.
        fn timeout(&self) -> Duration {
            match self {
                Stage::Stability => Duration::from_secs(60),
                Stage::ObserveSkipFallbacks => Duration::from_secs(120),
                Stage::ObserveLiveness => Duration::from_secs(180),
            }
        }
    }

    /// Helper struct to manage experiment state and vote pattern tracking.
    #[derive(Debug)]
    struct ExperimentState {
        // Current experiment stage.
        stage: Stage,
        // slot -> per-node bitmaps of observed vote types; bit index is the vote
        // discriminant produced by `_vote_to_tuple`.
        vote_type_bitmap: HashMap<u64, [u8; 4]>,
        // Number of distinct slots whose vote pattern matched the expected
        // intertwined fallback pattern (recomputed on every check).
        matched_slot_count: usize,
        // Slots for which a finalization certificate was seen after the partition
        // was resolved.
        post_experiment_roots: HashSet<u64>,
    }

    impl ExperimentState {
        fn new() -> Self {
            Self {
                stage: Stage::Stability,
                vote_type_bitmap: HashMap::new(),
                matched_slot_count: 0,
                post_experiment_roots: HashSet::new(),
            }
        }

        /// Records the vote type observed from `node_index` for `slot` by setting the
        /// corresponding bit in that slot's per-node bitmap.
        fn record_vote_bitmap(&mut self, slot: u64, node_index: usize, vote: &Vote) {
            let (_, vote_type) = _vote_to_tuple(vote);
            let slot_pattern = self.vote_type_bitmap.entry(slot).or_insert([0u8; 4]);

            assert!(node_index < NUM_NODES, "Invalid node index: {}", node_index);
            slot_pattern[node_index] |= 1 << vote_type;
        }

        /// Returns true once at least 3 distinct slots have exhibited the expected
        /// intertwined fallback pattern.
        ///
        /// The count is recomputed from scratch on every call rather than accumulated
        /// across calls, so a slot is never counted more than once no matter how many
        /// times this check runs.
        fn matches_expected_pattern(&mut self) -> bool {
            // Expected patterns:
            // Nodes 1, 2, 3: notarize + skip_fallback = (1 << 0) | (1 << 4) = 17
            // Node 0: skip + notarize_fallback = (1 << 2) | (1 << 3) = 12
            const EXPECTED_PATTERN_MAJORITY: u8 = 17; // notarize + skip_fallback
            const EXPECTED_PATTERN_MINORITY: u8 = 12; // skip + notarize_fallback

            self.matched_slot_count = self
                .vote_type_bitmap
                .values()
                .filter(|pattern| {
                    pattern[0] == EXPECTED_PATTERN_MINORITY
                        && pattern[1..]
                            .iter()
                            .all(|&p| p == EXPECTED_PATTERN_MAJORITY)
                })
                .count();

            self.matched_slot_count >= 3
        }

        /// Records a finalized slot observed after the partition was resolved.
        fn record_certificate(&mut self, slot: u64) {
            self.post_experiment_roots.insert(slot);
        }

        /// True once enough post-experiment finalized slots have been observed to
        /// demonstrate sustained liveness.
        fn sufficient_roots_created(&self) -> bool {
            self.post_experiment_roots.len() >= 8
        }
    }

    // Start vote monitoring thread
    let vote_listener_thread = std::thread::spawn({
        let node_a_turbine_disabled = node_a_turbine_disabled.clone();

        move || {
            let mut buffer = [0u8; 65_535];
            let mut experiment_state = ExperimentState::new();

            let timer = std::time::Instant::now();

            loop {
                let bytes_received = vote_listener_socket
                    .recv(&mut buffer)
                    .expect("Failed to receive vote data");

                let bls_message = bincode::deserialize::<BLSMessage>(&buffer[..bytes_received])
                    .expect("Failed to deserialize BLS message");

                match bls_message {
                    BLSMessage::Vote(vote_message) => {
                        let vote = &vote_message.vote;
                        let node_index = vote_message.rank as usize;

                        // Enforce the CURRENT stage's cumulative deadline only. Checking
                        // every stage's timeout here would spuriously fail a healthy run as
                        // soon as the smallest (60s) deadline elapsed, even while a later
                        // stage was progressing normally.
                        let elapsed_time = timer.elapsed();
                        if elapsed_time > experiment_state.stage.timeout() {
                            panic!(
                                "Timeout during {:?}. node_a_turbine_disabled: {:#?}. Latest vote: {:#?}. Experiment state: {:#?}",
                                experiment_state.stage,
                                node_a_turbine_disabled.load(Ordering::Acquire),
                                vote,
                                experiment_state
                            );
                        }

                        // Stage 1: Wait for stability, then introduce partition at slot 20
                        if vote.slot() == 20 && !node_a_turbine_disabled.load(Ordering::Acquire) {
                            node_a_turbine_disabled.store(true, Ordering::Release);
                            experiment_state.stage = Stage::ObserveSkipFallbacks;
                        }

                        // Stage 2: Monitor for expected fallback vote patterns
                        if experiment_state.stage == Stage::ObserveSkipFallbacks {
                            experiment_state.record_vote_bitmap(vote.slot(), node_index, vote);

                            // Once enough slots show the expected pattern, heal the
                            // partition and move on to observing liveness.
                            if experiment_state.matches_expected_pattern() {
                                node_a_turbine_disabled.store(false, Ordering::Release);
                                experiment_state.stage = Stage::ObserveLiveness;
                            }
                        }
                    }
                    BLSMessage::Certificate(cert_message) => {
                        // Stage 3: Verify continued liveness after partition resolution by
                        // counting finalization certificates.
                        if experiment_state.stage == Stage::ObserveLiveness
                            && [CertificateType::Finalize, CertificateType::FinalizeFast]
                                .contains(&cert_message.certificate.certificate_type)
                        {
                            experiment_state.record_certificate(cert_message.certificate.slot);

                            if experiment_state.sufficient_roots_created() {
                                break;
                            }
                        }
                    }
                }
            }
        }
    });

    vote_listener_thread
        .join()
        .expect("Vote listener thread panicked");
}