Skip to content

Commit f28b738

Browse files
wen-codingcarllin
authored andcommitted
Ingest replayed Alpenglow votes into cert pool (anza-xyz#107)
Co-authored-by: carllin <[email protected]>
1 parent a6e8997 commit f28b738

File tree

16 files changed

+268
-41
lines changed

16 files changed

+268
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/banking_stage/committer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,14 @@ impl Committer {
107107
.collect();
108108

109109
let ((), find_and_send_votes_us) = measure_us!({
110+
// We are going to send Alpenglow votes received through TPU port directly to cert pool,
111+
// whether or not the transaction is committed. So we don't need to set alpenglow_vote_sender
112+
// here.
110113
bank_utils::find_and_send_votes(
111114
batch.sanitized_transactions(),
112115
&commit_results,
113116
Some(&self.replay_vote_sender),
117+
None,
114118
);
115119
self.collect_balances_and_send_status_batch(
116120
commit_results,

core/src/replay_stage.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ use {
7878
commitment::BlockCommitmentCache,
7979
installed_scheduler_pool::BankWithScheduler,
8080
prioritization_fee_cache::PrioritizationFeeCache,
81-
vote_sender_types::{AlpenglowVoteReceiver, ReplayVoteSender},
81+
vote_sender_types::{AlpenglowVoteReceiver, AlpenglowVoteSender, ReplayVoteSender},
8282
},
8383
solana_sdk::{
8484
clock::{BankId, Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
@@ -308,6 +308,7 @@ pub struct ReplaySenders {
308308
pub drop_bank_sender: Sender<Vec<BankWithScheduler>>,
309309
pub block_metadata_notifier: Option<BlockMetadataNotifierArc>,
310310
pub dumped_slots_sender: Sender<Vec<(u64, Hash)>>,
311+
pub alpenglow_vote_sender: AlpenglowVoteSender,
311312
}
312313

313314
pub struct ReplayReceivers {
@@ -605,6 +606,7 @@ impl ReplayStage {
605606
drop_bank_sender,
606607
block_metadata_notifier,
607608
dumped_slots_sender,
609+
alpenglow_vote_sender,
608610
} = senders;
609611

610612
let ReplayReceivers {
@@ -788,6 +790,7 @@ impl ReplayStage {
788790
&verify_recyclers,
789791
&mut heaviest_subtree_fork_choice,
790792
&replay_vote_sender,
793+
&alpenglow_vote_sender,
791794
&bank_notification_sender,
792795
&rpc_subscriptions,
793796
&slot_status_notifier,
@@ -2696,6 +2699,7 @@ impl ReplayStage {
26962699
transaction_status_sender: Option<&TransactionStatusSender>,
26972700
entry_notification_sender: Option<&EntryNotifierSender>,
26982701
replay_vote_sender: &ReplayVoteSender,
2702+
alpenglow_vote_sender: &AlpenglowVoteSender,
26992703
verify_recyclers: &VerifyRecyclers,
27002704
log_messages_bytes_limit: Option<usize>,
27012705
prioritization_fee_cache: &PrioritizationFeeCache,
@@ -2716,6 +2720,7 @@ impl ReplayStage {
27162720
transaction_status_sender,
27172721
entry_notification_sender,
27182722
Some(replay_vote_sender),
2723+
Some(alpenglow_vote_sender),
27192724
verify_recyclers,
27202725
false,
27212726
log_messages_bytes_limit,
@@ -3674,6 +3679,7 @@ impl ReplayStage {
36743679
entry_notification_sender: Option<&EntryNotifierSender>,
36753680
verify_recyclers: &VerifyRecyclers,
36763681
replay_vote_sender: &ReplayVoteSender,
3682+
alpenglow_vote_sender: &AlpenglowVoteSender,
36773683
replay_timing: &mut ReplayLoopTiming,
36783684
log_messages_bytes_limit: Option<usize>,
36793685
active_bank_slots: &[Slot],
@@ -3757,6 +3763,7 @@ impl ReplayStage {
37573763
transaction_status_sender,
37583764
entry_notification_sender,
37593765
&replay_vote_sender.clone(),
3766+
&alpenglow_vote_sender.clone(),
37603767
&verify_recyclers.clone(),
37613768
log_messages_bytes_limit,
37623769
prioritization_fee_cache,
@@ -3789,6 +3796,7 @@ impl ReplayStage {
37893796
entry_notification_sender: Option<&EntryNotifierSender>,
37903797
verify_recyclers: &VerifyRecyclers,
37913798
replay_vote_sender: &ReplayVoteSender,
3799+
alpenglow_vote_sender: &AlpenglowVoteSender,
37923800
replay_timing: &mut ReplayLoopTiming,
37933801
log_messages_bytes_limit: Option<usize>,
37943802
bank_slot: Slot,
@@ -3846,6 +3854,7 @@ impl ReplayStage {
38463854
transaction_status_sender,
38473855
entry_notification_sender,
38483856
&replay_vote_sender.clone(),
3857+
&alpenglow_vote_sender.clone(),
38493858
&verify_recyclers.clone(),
38503859
log_messages_bytes_limit,
38513860
prioritization_fee_cache,
@@ -4313,6 +4322,7 @@ impl ReplayStage {
43134322
verify_recyclers: &VerifyRecyclers,
43144323
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
43154324
replay_vote_sender: &ReplayVoteSender,
4325+
alpenglow_vote_sender: &AlpenglowVoteSender,
43164326
bank_notification_sender: &Option<BankNotificationSenderConfig>,
43174327
rpc_subscriptions: &Arc<RpcSubscriptions>,
43184328
slot_status_notifier: &Option<SlotStatusNotifier>,
@@ -4362,6 +4372,7 @@ impl ReplayStage {
43624372
entry_notification_sender,
43634373
verify_recyclers,
43644374
replay_vote_sender,
4375+
alpenglow_vote_sender,
43654376
replay_timing,
43664377
log_messages_bytes_limit,
43674378
&active_bank_slots,
@@ -4382,6 +4393,7 @@ impl ReplayStage {
43824393
entry_notification_sender,
43834394
verify_recyclers,
43844395
replay_vote_sender,
4396+
alpenglow_vote_sender,
43854397
replay_timing,
43864398
log_messages_bytes_limit,
43874399
*bank_slot,
@@ -6076,6 +6088,7 @@ pub(crate) mod tests {
60766088
{
60776089
let ledger_path = get_tmp_ledger_path!();
60786090
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
6091+
let (alpenglow_vote_sender, _alpenglow_vote_receiver) = unbounded();
60796092
let res = {
60806093
let ReplayBlockstoreComponents {
60816094
blockstore,
@@ -6120,6 +6133,7 @@ pub(crate) mod tests {
61206133
None,
61216134
None,
61226135
&replay_vote_sender,
6136+
&alpenglow_vote_sender,
61236137
&VerifyRecyclers::default(),
61246138
None,
61256139
&PrioritizationFeeCache::new(0u64),
@@ -10289,6 +10303,7 @@ pub(crate) mod tests {
1028910303
None,
1029010304
None,
1029110305
None,
10306+
None,
1029210307
&mut ExecuteTimings::default(),
1029310308
)
1029410309
.unwrap();

core/src/tvu.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use {
4545
bank_forks::BankForks,
4646
commitment::BlockCommitmentCache,
4747
prioritization_fee_cache::PrioritizationFeeCache,
48-
vote_sender_types::{AlpenglowVoteReceiver, ReplayVoteSender},
48+
vote_sender_types::{AlpenglowVoteReceiver, AlpenglowVoteSender, ReplayVoteSender},
4949
},
5050
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
5151
solana_turbine::retransmit_stage::RetransmitStage,
@@ -145,6 +145,7 @@ impl Tvu {
145145
completed_data_sets_sender: Option<CompletedDataSetsSender>,
146146
bank_notification_sender: Option<BankNotificationSenderConfig>,
147147
duplicate_confirmed_slots_receiver: DuplicateConfirmedSlotsReceiver,
148+
alpenglow_vote_sender: AlpenglowVoteSender,
148149
alpenglow_vote_receiver: AlpenglowVoteReceiver,
149150
tvu_config: TvuConfig,
150151
max_slots: &Arc<MaxSlots>,
@@ -301,6 +302,7 @@ impl Tvu {
301302
drop_bank_sender,
302303
block_metadata_notifier,
303304
dumped_slots_sender,
305+
alpenglow_vote_sender,
304306
};
305307

306308
let replay_receivers = ReplayReceivers {
@@ -514,6 +516,7 @@ pub mod tests {
514516
let (_gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
515517
let (_verified_vote_sender, verified_vote_receiver) = unbounded();
516518
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
519+
let (alpenglow_vote_sender, _alpenglow_vote_receiver) = unbounded();
517520
let (_, gossip_confirmed_slots_receiver) = unbounded();
518521
let (_, alpenglow_vote_receiver) = unbounded();
519522
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
@@ -580,6 +583,7 @@ pub mod tests {
580583
/*completed_data_sets_sender:*/ None,
581584
None,
582585
gossip_confirmed_slots_receiver,
586+
alpenglow_vote_sender,
583587
alpenglow_vote_receiver,
584588
TvuConfig::default(),
585589
&Arc::new(MaxSlots::default()),

core/src/validator.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,6 +946,7 @@ impl Validator {
946946
);
947947

948948
let (replay_vote_sender, replay_vote_receiver) = unbounded();
949+
let (alpenglow_vote_sender, alpenglow_vote_receiver) = unbounded();
949950

950951
// block min prioritization fee cache should be readable by RPC, and writable by validator
951952
// (by both replay stage and banking stage)
@@ -1010,6 +1011,7 @@ impl Validator {
10101011
config.runtime_config.log_messages_bytes_limit,
10111012
transaction_status_sender.clone(),
10121013
Some(replay_vote_sender.clone()),
1014+
Some(alpenglow_vote_sender.clone()),
10131015
prioritization_fee_cache.clone(),
10141016
);
10151017
bank_forks
@@ -1354,7 +1356,6 @@ impl Validator {
13541356
let (verified_vote_sender, verified_vote_receiver) = unbounded();
13551357
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
13561358
let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();
1357-
let (alpenglow_vote_sender, alpenglow_vote_receiver) = unbounded();
13581359
let entry_notification_sender = entry_notifier_service
13591360
.as_ref()
13601361
.map(|service| service.sender_cloned());
@@ -1509,6 +1510,7 @@ impl Validator {
15091510
completed_data_sets_sender,
15101511
bank_notification_sender.clone(),
15111512
duplicate_confirmed_slots_receiver,
1513+
alpenglow_vote_sender.clone(),
15121514
alpenglow_vote_receiver,
15131515
TvuConfig {
15141516
max_ledger_shreds: config.max_ledger_shreds,

core/tests/unified_scheduler.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ fn test_scheduler_waited_by_drop_bank_service() {
9292
None,
9393
None,
9494
None,
95+
None,
9596
ignored_prioritization_fee_cache,
9697
);
9798
let pool = pool_raw.clone();
@@ -229,7 +230,14 @@ fn test_scheduler_producing_blocks() {
229230
None,
230231
Some(leader_schedule_cache),
231232
);
232-
let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
233+
let pool = DefaultSchedulerPool::new(
234+
None,
235+
None,
236+
None,
237+
None,
238+
None,
239+
ignored_prioritization_fee_cache,
240+
);
233241
let channels = {
234242
let banking_tracer = BankingTracer::new_disabled();
235243
banking_tracer.create_channels(true)

ledger-tool/src/ledger_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ pub fn load_and_process_ledger(
378378
}
379379
BlockVerificationMethod::UnifiedScheduler => {
380380
let no_replay_vote_sender = None;
381+
let no_alpenglow_vote_sender = None;
381382
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
382383
bank_forks
383384
.write()
@@ -387,6 +388,7 @@ pub fn load_and_process_ledger(
387388
process_options.runtime_config.log_messages_bytes_limit,
388389
transaction_status_sender.clone(),
389390
no_replay_vote_sender,
391+
no_alpenglow_vote_sender,
390392
ignored_prioritization_fee_cache,
391393
));
392394
}

ledger/benches/blockstore_processor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ fn bench_execute_batch(
159159
&bank,
160160
None,
161161
None,
162+
None,
162163
&mut timing,
163164
None,
164165
&prioritization_fee_cache,

0 commit comments

Comments
 (0)