Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 64 additions & 29 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ use {
timing::AtomicInterval,
transaction::Transaction,
},
solana_vote::{
vote_parser::{self, ParsedVote},
vote_transaction::VoteTransaction,
},
solana_vote::vote_parser::{self, ParsedVote, ParsedVoteTransaction},
std::{
cmp::max,
collections::HashMap,
Expand Down Expand Up @@ -287,6 +284,10 @@ impl ClusterInfoVoteListener {
votes.len(),
);
let root_bank = root_bank_cache.root_bank();
let first_alpenglow_slot = root_bank
.feature_set
.activated_slot(&solana_feature_set::secp256k1_program_enabled::id())
.unwrap_or(Slot::MAX);
let epoch_schedule = root_bank.epoch_schedule();
votes
.into_iter()
Expand All @@ -297,8 +298,12 @@ impl ClusterInfoVoteListener {
!packet_batch[0].meta().discard()
})
.filter_map(|(tx, packet_batch)| {
let (vote_account_key, vote, ..) = vote_parser::parse_vote_transaction(&tx)?;
let (vote_account_key, vote, ..) = vote_parser::parse_vote_transaction(&tx)
.or_else(|| vote_parser::parse_alpenglow_vote_transaction(&tx))?;
let slot = vote.last_voted_slot()?;
if (slot >= first_alpenglow_slot) ^ vote.is_alpenglow_vote() {
return None;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this equivalent to
if (slot >= first_alpenglow_slot) ^ vote.is_alpenglow_vote() {
return None;
}
?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it's equivalent, changed

let epoch = epoch_schedule.get_epoch(slot);
let authorized_voter = root_bank
.epoch_stakes(epoch)?
Expand Down Expand Up @@ -394,6 +399,7 @@ impl ClusterInfoVoteListener {
subscriptions: &RpcSubscriptions,
gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
verified_vote_sender: &VerifiedVoteSender,
// TODO: send replayed Alpenglow transactions as well
replay_votes_receiver: &ReplayVoteReceiver,
bank_notification_sender: &Option<BankNotificationSender>,
duplicate_confirmed_slot_sender: &Option<DuplicateConfirmedSlotsSender>,
Expand Down Expand Up @@ -442,7 +448,7 @@ impl ClusterInfoVoteListener {

#[allow(clippy::too_many_arguments)]
fn track_new_votes_and_notify_confirmations(
vote: VoteTransaction,
vote: ParsedVoteTransaction,
vote_pubkey: &Pubkey,
vote_transaction_signature: Signature,
vote_tracker: &VoteTracker,
Expand All @@ -459,13 +465,17 @@ impl ClusterInfoVoteListener {
bank_hash_cache: &mut BankHashCache,
dumped_slot_subscription: &Mutex<bool>,
) {
if vote.is_empty() {
if vote.slots().is_empty() {
return;
}

// Hold lock for whole function to ensure hash consistency with bank_forks
let mut slots_dumped = dumped_slot_subscription.lock().unwrap();
let (last_vote_slot, last_vote_hash) = vote.last_voted_slot_hash().unwrap();
let Some((last_vote_slot, last_vote_hash)) = vote.last_voted_slot_hash() else {
// TODO: handle sending skip votes to replay because skp votes don't have a hash and will
// return
return;
};

let latest_vote_slot = latest_vote_slot_per_validator
.entry(*vote_pubkey)
Expand Down Expand Up @@ -535,20 +545,29 @@ impl ClusterInfoVoteListener {
let _ = gossip_verified_vote_hash_sender.send((*vote_pubkey, slot, hash));
}

if reached_threshold_results[0] {
if let Some(sender) = duplicate_confirmed_slot_sender {
let _ = sender.send(vec![(slot, hash)]);
let first_alpenglow_slot = root_bank
.feature_set
.activated_slot(&solana_feature_set::secp256k1_program_enabled::id())
.unwrap_or(Slot::MAX);

// Optimistic confirmation and duplicate confirmation are no longer relevant after
// alpenglow
if slot < first_alpenglow_slot {
if reached_threshold_results[0] {
if let Some(sender) = duplicate_confirmed_slot_sender {
let _ = sender.send(vec![(slot, hash)]);
}
}
}
if reached_threshold_results[1] {
new_optimistic_confirmed_slots.push((slot, hash));
// Notify subscribers about new optimistic confirmation
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::OptimisticallyConfirmed(slot))
.unwrap_or_else(|err| {
warn!("bank_notification_sender failed: {:?}", err)
});
if reached_threshold_results[1] {
new_optimistic_confirmed_slots.push((slot, hash));
// Notify subscribers about new optimistic confirmation
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::OptimisticallyConfirmed(slot))
.unwrap_or_else(|err| {
warn!("bank_notification_sender failed: {:?}", err)
});
}
}
}

Expand Down Expand Up @@ -588,7 +607,10 @@ impl ClusterInfoVoteListener {
*latest_vote_slot = max(*latest_vote_slot, last_vote_slot);

if is_new_vote {
subscriptions.notify_vote(*vote_pubkey, vote, vote_transaction_signature);
// TODO: Make sure to notify on notarization votes as well
if let Some(tower_vote) = vote.try_into_tower_transaction() {
subscriptions.notify_vote(*vote_pubkey, tower_vote, vote_transaction_signature);
}
let _ = verified_vote_sender.send((*vote_pubkey, vote_slots));
}
}
Expand Down Expand Up @@ -616,7 +638,10 @@ impl ClusterInfoVoteListener {
let mut gossip_vote_txn_processing_time = Measure::start("gossip_vote_processing_time");
let votes = gossip_vote_txs
.iter()
.filter_map(vote_parser::parse_vote_transaction)
.filter_map(|tx| {
vote_parser::parse_vote_transaction(tx)
.or_else(|| vote_parser::parse_alpenglow_vote_transaction(tx))
})
.zip(repeat(/*is_gossip:*/ true))
.chain(replayed_votes.into_iter().zip(repeat(/*is_gossip:*/ false)));
for ((vote_pubkey, vote, _switch_proof, signature), is_gossip) in votes {
Expand All @@ -637,7 +662,7 @@ impl ClusterInfoVoteListener {
latest_vote_slot_per_validator,
bank_hash_cache,
dumped_slot_subscription,
);
)
}
gossip_vote_txn_processing_time.stop();
let gossip_vote_txn_processing_time_us = gossip_vote_txn_processing_time.as_us();
Expand Down Expand Up @@ -749,7 +774,7 @@ mod tests {
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
},
solana_vote::vote_transaction,
solana_vote::vote_transaction::{self, VoteTransaction},
solana_vote_program::vote_state::{TowerSync, Vote, MAX_LOCKOUT_HISTORY},
std::{
collections::BTreeSet,
Expand Down Expand Up @@ -962,7 +987,7 @@ mod tests {
replay_votes_sender
.send((
vote_keypair.pubkey(),
VoteTransaction::from(replay_vote.clone()),
ParsedVoteTransaction::Tower(VoteTransaction::from(replay_vote.clone())),
switch_proof_hash,
Signature::default(),
))
Expand Down Expand Up @@ -1285,7 +1310,10 @@ mod tests {
replay_votes_sender
.send((
vote_keypair.pubkey(),
VoteTransaction::from(Vote::new(vec![vote_slot], Hash::default())),
ParsedVoteTransaction::Tower(VoteTransaction::from(Vote::new(
vec![vote_slot],
Hash::default(),
))),
switch_proof_hash,
Signature::default(),
))
Expand Down Expand Up @@ -1334,6 +1362,7 @@ mod tests {
run_test_process_votes3(Some(Hash::default()));
}

// TODO: Add Alpenglow equivalent tests
#[test]
fn test_vote_tracker_references() {
// Create some voters at genesis
Expand Down Expand Up @@ -1388,7 +1417,10 @@ mod tests {
// Add gossip vote for same slot, should not affect outcome
vec![(
validator0_keypairs.vote_keypair.pubkey(),
VoteTransaction::from(Vote::new(vec![voted_slot], Hash::default())),
ParsedVoteTransaction::Tower(VoteTransaction::from(Vote::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add TODO for alpenglow test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah going to need to go through and write some detailed tests, might hand off to @ksn6

vec![voted_slot],
Hash::default(),
))),
None,
Signature::default(),
)],
Expand Down Expand Up @@ -1437,7 +1469,10 @@ mod tests {
vote_txs,
vec![(
validator_keypairs[1].vote_keypair.pubkey(),
VoteTransaction::from(Vote::new(vec![first_slot_in_new_epoch], Hash::default())),
ParsedVoteTransaction::Tower(VoteTransaction::from(Vote::new(
vec![first_slot_in_new_epoch],
Hash::default(),
))),
None,
Signature::default(),
)],
Expand Down
2 changes: 1 addition & 1 deletion core/src/voting_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl VotingService {
tx,
last_voted_slot,
} => {
cluster_info.refresh_vote(tx, last_voted_slot);
cluster_info.refresh_vote(tx, last_voted_slot, false);
}
}
}
Expand Down
31 changes: 20 additions & 11 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,11 +784,15 @@ impl ClusterInfo {
Ok(())
}

pub fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) {
pub fn push_vote_at_index(&self, vote: Transaction, vote_index: u8, is_alpenglow: bool) {
assert!(vote_index < MAX_VOTES);
let self_pubkey = self.id();
let now = timestamp();
let vote = Vote::new(self_pubkey, vote, now).unwrap();
let vote = if is_alpenglow {
Vote::new_alpenglow(self_pubkey, vote, now).unwrap()
} else {
Vote::new(self_pubkey, vote, now).unwrap()
};
let vote = CrdsData::Vote(vote_index, vote);
let vote = CrdsValue::new(vote, &self.keypair());
let mut gossip_crds = self.gossip.crds.write().unwrap();
Expand Down Expand Up @@ -873,7 +877,7 @@ impl ClusterInfo {

pub fn push_alpenglow_vote(&self, vote: Transaction) {
let vote_index = self.find_alpenglow_vote_index_to_evict();
self.push_vote_at_index(vote, vote_index);
self.push_vote_at_index(vote, vote_index, true);
}

pub fn push_vote(&self, tower: &[Slot], vote: Transaction) {
Expand All @@ -894,10 +898,15 @@ impl ClusterInfo {
);
};
debug_assert!(vote_index < MAX_VOTES);
self.push_vote_at_index(vote, vote_index);
self.push_vote_at_index(vote, vote_index, false);
}

pub fn refresh_vote(&self, refresh_vote: Transaction, refresh_vote_slot: Slot) {
pub fn refresh_vote(
&self,
refresh_vote: Transaction,
refresh_vote_slot: Slot,
is_alpenglow: bool,
) {
let vote_index = {
let self_pubkey = self.id();
let gossip_crds =
Expand All @@ -923,7 +932,7 @@ impl ClusterInfo {
// We don't write to an arbitrary index, because it may replace one of this validator's
// existing votes on the network.
if let Some(vote_index) = vote_index {
self.push_vote_at_index(refresh_vote, vote_index);
self.push_vote_at_index(refresh_vote, vote_index, is_alpenglow);
} else {
// If you don't see a vote with the same slot yet, this means you probably
// restarted, and need to repush and evict the oldest vote
Expand All @@ -935,7 +944,7 @@ impl ClusterInfo {
return;
};
debug_assert!(vote_index < MAX_VOTES);
self.push_vote_at_index(refresh_vote, vote_index);
self.push_vote_at_index(refresh_vote, vote_index, is_alpenglow);
}
}

Expand Down Expand Up @@ -3534,7 +3543,7 @@ mod tests {
&[refresh_ix], // instructions
None, // payer
);
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot, false);
let current_votes = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(initial_votes, current_votes);
assert!(!current_votes.contains(&refresh_tx));
Expand All @@ -3551,7 +3560,7 @@ mod tests {
&[refresh_ix], // instructions
None, // payer
);
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot, false);

// This should evict the latest vote since it's for a slot less than refresh_slot
let votes = cluster_info.get_votes(&mut Cursor::default());
Expand Down Expand Up @@ -3600,7 +3609,7 @@ mod tests {

// Trying to refresh vote when it doesn't yet exist in gossip
// should add the vote without eviction if there is room in the gossip table.
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot, false);

// Should be two votes in gossip
cursor = Cursor::default();
Expand All @@ -3625,7 +3634,7 @@ mod tests {
&[&new_signer],
latest_refreshed_recent_blockhash,
);
cluster_info.refresh_vote(latest_refresh_tx.clone(), refresh_slot);
cluster_info.refresh_vote(latest_refresh_tx.clone(), refresh_slot, false);
// Sleep to avoid votes with same timestamp causing later vote to not override prior vote
std::thread::sleep(Duration::from_millis(1));
}
Expand Down
17 changes: 15 additions & 2 deletions gossip/src/crds_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,15 @@ impl Vote {
})
}

pub fn new_alpenglow(from: Pubkey, transaction: Transaction, wallclock: u64) -> Option<Self> {
vote_parser::parse_alpenglow_vote_transaction(&transaction).map(|(_, vote, ..)| Self {
from,
transaction,
wallclock,
slot: vote.last_voted_slot(),
})
}

/// New random Vote for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
Self {
Expand Down Expand Up @@ -398,8 +407,12 @@ impl<'de> Deserialize<'de> for Vote {
vote.transaction
.sanitize()
.map_err(serde::de::Error::custom)?;
Self::new(vote.from, vote.transaction, vote.wallclock)
.ok_or_else(|| serde::de::Error::custom("invalid vote tx"))
let vote = if vote_parser::is_alpenglow_vote_transaction(&vote.transaction) {
Self::new_alpenglow(vote.from, vote.transaction, vote.wallclock)
} else {
Self::new(vote.from, vote.transaction, vote.wallclock)
};
vote.ok_or_else(|| serde::de::Error::custom("invalid vote tx"))
}
}

Expand Down
6 changes: 5 additions & 1 deletion local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2815,6 +2815,8 @@ fn test_oc_bad_signatures() {
|(_label, leader_vote_tx)| {
let vote = vote_parser::parse_vote_transaction(&leader_vote_tx)
.map(|(_, vote, ..)| vote)
.unwrap()
.try_into_tower_transaction()
.unwrap();
// Filter out empty votes
if !vote.is_empty() {
Expand Down Expand Up @@ -4010,6 +4012,8 @@ fn run_duplicate_shreds_broadcast_leader(vote_on_duplicate: bool) {
if label.pubkey() == bad_leader_id {
let vote = vote_parser::parse_vote_transaction(&leader_vote_tx)
.map(|(_, vote, ..)| vote)
.unwrap()
.try_into_tower_transaction()
.unwrap();
// Filter out empty votes
if !vote.is_empty() {
Expand Down Expand Up @@ -4073,7 +4077,7 @@ fn run_duplicate_shreds_broadcast_leader(vote_on_duplicate: bool) {
);
gossip_vote_index += 1;
gossip_vote_index %= MAX_VOTES;
cluster_info.push_vote_at_index(vote_tx, gossip_vote_index);
cluster_info.push_vote_at_index(vote_tx, gossip_vote_index, false);
}
}
},
Expand Down
Loading