Skip to content

Commit ee99241

Browse files
AshwinSekarbw-solana
authored andcommitted
event_handler: unify the completed_block_sender and event_sender (anza-xyz#301)
1 parent 9c1866f commit ee99241

16 files changed

+82
-81
lines changed

core/src/banking_simulation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use {
4545
},
4646
solana_streamer::socket::SocketAddrSpace,
4747
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
48-
solana_votor::event::CompletedBlockReceiver,
48+
solana_votor::event::VotorEventReceiver,
4949
std::{
5050
collections::BTreeMap,
5151
fmt::Display,
@@ -420,7 +420,7 @@ struct SimulatorLoop {
420420
leader_schedule_cache: Arc<LeaderScheduleCache>,
421421
retransmit_slots_sender: Sender<Slot>,
422422
retracer: Arc<BankingTracer>,
423-
_completed_block_receiver: CompletedBlockReceiver,
423+
_completed_block_receiver: VotorEventReceiver,
424424
}
425425

426426
impl SimulatorLoop {

core/src/replay_stage.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ use {
8383
solana_timings::ExecuteTimings,
8484
solana_vote::vote_transaction::VoteTransaction,
8585
solana_votor::{
86-
event::{CompletedBlock, CompletedBlockReceiver, CompletedBlockSender},
86+
event::{CompletedBlock, VotorEvent, VotorEventReceiver, VotorEventSender},
8787
root_utils,
8888
vote_history::VoteHistory,
8989
vote_history_storage::VoteHistoryStorage,
@@ -301,7 +301,7 @@ pub struct ReplaySenders {
301301
pub dumped_slots_sender: Sender<Vec<(u64, Hash)>>,
302302
pub alpenglow_vote_sender: AlpenglowVoteSender,
303303
pub certificate_sender: Sender<(CertificateId, CertificateMessage)>,
304-
pub completed_block_sender: CompletedBlockSender,
304+
pub votor_event_sender: VotorEventSender,
305305
}
306306

307307
pub struct ReplayReceivers {
@@ -312,7 +312,7 @@ pub struct ReplayReceivers {
312312
pub gossip_verified_vote_hash_receiver: Receiver<(Pubkey, u64, Hash)>,
313313
pub popular_pruned_forks_receiver: Receiver<Vec<u64>>,
314314
pub bls_verified_message_receiver: BLSVerifiedMessageReceiver,
315-
pub completed_block_receiver: CompletedBlockReceiver,
315+
pub votor_event_receiver: VotorEventReceiver,
316316
}
317317

318318
/// Timing information for the ReplayStage main processing loop
@@ -608,7 +608,7 @@ impl ReplayStage {
608608
dumped_slots_sender,
609609
alpenglow_vote_sender,
610610
certificate_sender,
611-
completed_block_sender,
611+
votor_event_sender,
612612
} = senders;
613613

614614
let ReplayReceivers {
@@ -619,7 +619,7 @@ impl ReplayStage {
619619
gossip_verified_vote_hash_receiver,
620620
popular_pruned_forks_receiver,
621621
bls_verified_message_receiver,
622-
completed_block_receiver,
622+
votor_event_receiver,
623623
} = receivers;
624624

625625
trace!("replay stage");
@@ -653,7 +653,8 @@ impl ReplayStage {
653653
bank_notification_sender: bank_notification_sender.clone(),
654654
leader_window_notifier,
655655
certificate_sender,
656-
completed_block_receiver: completed_block_receiver.clone(),
656+
event_sender: votor_event_sender.clone(),
657+
event_receiver: votor_event_receiver.clone(),
657658
bls_receiver: bls_verified_message_receiver,
658659
};
659660
let votor = Votor::new(votor_config);
@@ -854,7 +855,7 @@ impl ReplayStage {
854855
first_alpenglow_slot,
855856
(!is_alpenglow_migration_complete).then_some(&mut tbft_structs),
856857
&mut is_alpenglow_migration_complete,
857-
&completed_block_sender,
858+
&votor_event_sender,
858859
);
859860
let did_complete_bank = !new_frozen_slots.is_empty();
860861
if is_alpenglow_migration_complete {
@@ -879,7 +880,7 @@ impl ReplayStage {
879880
let start_leader_time = if !is_alpenglow_migration_complete {
880881
// TODO(ashwin): This will be moved to the event coordinator once we figure out
881882
// migration
882-
for _ in completed_block_receiver.try_iter() {}
883+
for _ in votor_event_receiver.try_iter() {}
883884

884885
// Process cluster-agreed versions of duplicate slots for which we potentially
885886
// have the wrong version. Our version was dead or pruned.
@@ -3382,7 +3383,7 @@ impl ReplayStage {
33823383
poh_recorder: &RwLock<PohRecorder>,
33833384
is_alpenglow_migration_complete: &mut bool,
33843385
mut tbft_structs: Option<&mut TowerBFTStructures>,
3385-
completed_block_sender: &CompletedBlockSender,
3386+
votor_event_sender: &VotorEventSender,
33863387
) -> Vec<Slot> {
33873388
// TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
33883389
let mut tx_count = 0;
@@ -3611,18 +3612,18 @@ impl ReplayStage {
36113612
}
36123613

36133614
// For leader banks:
3614-
// 1) Replay finishes before shredding, broadcast_stage will take care off
3615+
// 1) Replay finishes before shredding, broadcast_stage will take care of
36153616
// notifying votor
36163617
// 2) Shredding finishes before replay, we notify here
36173618
//
36183619
// For non leader banks (2) is always true, so notify here
36193620
if *is_alpenglow_migration_complete && bank.block_id().is_some() {
36203621
// Leader blocks will not have a block id, broadcast stage will
36213622
// take care of notifying the voting loop
3622-
let _ = completed_block_sender.send(CompletedBlock {
3623+
let _ = votor_event_sender.send(VotorEvent::Block(CompletedBlock {
36233624
slot: bank.slot(),
36243625
bank: bank.clone_without_scheduler(),
3625-
});
3626+
}));
36263627
}
36273628

36283629
if let Some(sender) = bank_notification_sender {
@@ -3721,7 +3722,7 @@ impl ReplayStage {
37213722
first_alpenglow_slot: Option<Slot>,
37223723
tbft_structs: Option<&mut TowerBFTStructures>,
37233724
is_alpenglow_migration_complete: &mut bool,
3724-
completed_block_sender: &CompletedBlockSender,
3725+
votor_event_sender: &VotorEventSender,
37253726
) -> Vec<Slot> /* completed slots */ {
37263727
let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
37273728
let num_active_banks = active_bank_slots.len();
@@ -3802,7 +3803,7 @@ impl ReplayStage {
38023803
poh_recorder,
38033804
is_alpenglow_migration_complete,
38043805
tbft_structs,
3805-
completed_block_sender,
3806+
votor_event_sender,
38063807
)
38073808
}
38083809

core/src/tpu.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use {
5454
streamer::StakedNodes,
5555
},
5656
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
57-
solana_votor::event::CompletedBlockSender,
57+
solana_votor::event::VotorEventSender,
5858
std::{
5959
collections::HashMap,
6060
net::{SocketAddr, UdpSocket},
@@ -125,7 +125,7 @@ impl Tpu {
125125
bls_verified_message_sender: BLSVerifiedMessageSender,
126126
connection_cache: &Arc<ConnectionCache>,
127127
turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
128-
completed_block_sender: CompletedBlockSender,
128+
votor_event_sender: VotorEventSender,
129129
keypair: &Keypair,
130130
log_messages_bytes_limit: Option<usize>,
131131
staked_nodes: &Arc<RwLock<StakedNodes>>,
@@ -342,7 +342,7 @@ impl Tpu {
342342
bank_forks,
343343
shred_version,
344344
turbine_quic_endpoint_sender,
345-
completed_block_sender,
345+
votor_event_sender,
346346
);
347347

348348
(

core/src/tvu.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use {
5050
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
5151
solana_turbine::retransmit_stage::RetransmitStage,
5252
solana_votor::{
53-
event::{CompletedBlockReceiver, CompletedBlockSender},
53+
event::{VotorEventReceiver, VotorEventSender},
5454
vote_history::VoteHistory,
5555
vote_history_storage::VoteHistoryStorage,
5656
votor::LeaderWindowNotifier,
@@ -177,8 +177,8 @@ impl Tvu {
177177
replay_highest_frozen: Arc<ReplayHighestFrozen>,
178178
leader_window_notifier: Arc<LeaderWindowNotifier>,
179179
voting_service_additional_listeners: Option<&Vec<SocketAddr>>,
180-
completed_block_sender: CompletedBlockSender,
181-
completed_block_receiver: CompletedBlockReceiver,
180+
votor_event_sender: VotorEventSender,
181+
votor_event_receiver: VotorEventReceiver,
182182
) -> Result<Self, String> {
183183
let in_wen_restart = wen_restart_repair_slots.is_some();
184184

@@ -323,7 +323,7 @@ impl Tvu {
323323
dumped_slots_sender,
324324
alpenglow_vote_sender,
325325
certificate_sender,
326-
completed_block_sender,
326+
votor_event_sender,
327327
};
328328

329329
let replay_receivers = ReplayReceivers {
@@ -334,7 +334,7 @@ impl Tvu {
334334
gossip_verified_vote_hash_receiver,
335335
popular_pruned_forks_receiver,
336336
bls_verified_message_receiver,
337-
completed_block_receiver,
337+
votor_event_receiver,
338338
};
339339

340340
let replay_stage_config = ReplayStageConfig {
@@ -568,7 +568,7 @@ pub mod tests {
568568
DEFAULT_TPU_CONNECTION_POOL_SIZE,
569569
)
570570
};
571-
let (completed_block_sender, completed_block_receiver) = unbounded();
571+
let (votor_event_sender, votor_event_receiver) = unbounded();
572572

573573
let tvu = Tvu::new(
574574
&vote_keypair.pubkey(),
@@ -638,8 +638,8 @@ pub mod tests {
638638
Arc::new(ReplayHighestFrozen::default()),
639639
Arc::new(LeaderWindowNotifier::default()),
640640
None,
641-
completed_block_sender,
642-
completed_block_receiver,
641+
votor_event_sender,
642+
votor_event_receiver,
643643
)
644644
.expect("assume success");
645645
if enable_wen_restart {

core/src/validator.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1557,7 +1557,7 @@ impl Validator {
15571557
Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
15581558
// This channel backing up indicates a serious problem in the voting loop
15591559
// Capping at 1000 for now, TODO: add metrics for channel len
1560-
let (completed_block_sender, completed_block_receiver) = bounded(1000);
1560+
let (votor_event_sender, votor_event_receiver) = bounded(1000);
15611561

15621562
// If RPC is supported and ConnectionCache is used, pass ConnectionCache for being warmup inside Tvu.
15631563
let connection_cache_for_warmup = (json_rpc_service.is_some()
@@ -1632,8 +1632,8 @@ impl Validator {
16321632
replay_highest_frozen,
16331633
leader_window_notifier,
16341634
config.voting_service_additional_listeners.as_ref(),
1635-
completed_block_sender.clone(),
1636-
completed_block_receiver,
1635+
votor_event_sender.clone(),
1636+
votor_event_receiver,
16371637
)
16381638
.map_err(ValidatorError::Other)?;
16391639

@@ -1694,7 +1694,7 @@ impl Validator {
16941694
bls_verified_message_sender,
16951695
&connection_cache,
16961696
turbine_quic_endpoint_sender,
1697-
completed_block_sender,
1697+
votor_event_sender,
16981698
&identity_keypair,
16991699
config.runtime_config.log_messages_bytes_limit,
17001700
&staked_nodes,

turbine/src/broadcast_stage.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use {
3434
sendmmsg::{batch_send, SendPktsError},
3535
socket::SocketAddrSpace,
3636
},
37-
solana_votor::event::CompletedBlockSender,
37+
solana_votor::event::VotorEventSender,
3838
static_assertions::const_assert_eq,
3939
std::{
4040
collections::{HashMap, HashSet},
@@ -122,7 +122,7 @@ impl BroadcastStageType {
122122
bank_forks: Arc<RwLock<BankForks>>,
123123
shred_version: u16,
124124
quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
125-
completed_block_sender: CompletedBlockSender,
125+
votor_event_sender: VotorEventSender,
126126
) -> BroadcastStage {
127127
match self {
128128
BroadcastStageType::Standard => BroadcastStage::new(
@@ -134,7 +134,7 @@ impl BroadcastStageType {
134134
blockstore,
135135
bank_forks,
136136
quic_endpoint_sender,
137-
completed_block_sender,
137+
votor_event_sender.clone(),
138138
StandardBroadcastRun::new(shred_version),
139139
),
140140

@@ -147,7 +147,7 @@ impl BroadcastStageType {
147147
blockstore,
148148
bank_forks,
149149
quic_endpoint_sender,
150-
completed_block_sender,
150+
votor_event_sender.clone(),
151151
FailEntryVerificationBroadcastRun::new(shred_version),
152152
),
153153

@@ -160,7 +160,7 @@ impl BroadcastStageType {
160160
blockstore,
161161
bank_forks,
162162
quic_endpoint_sender,
163-
completed_block_sender,
163+
votor_event_sender.clone(),
164164
BroadcastFakeShredsRun::new(0, shred_version),
165165
),
166166

@@ -173,7 +173,7 @@ impl BroadcastStageType {
173173
blockstore,
174174
bank_forks,
175175
quic_endpoint_sender,
176-
completed_block_sender,
176+
votor_event_sender.clone(),
177177
BroadcastDuplicatesRun::new(shred_version, config.clone()),
178178
),
179179
}
@@ -188,7 +188,7 @@ trait BroadcastRun {
188188
receiver: &Receiver<WorkingBankEntry>,
189189
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
190190
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
191-
completed_block_sender: &CompletedBlockSender,
191+
votor_event_sender: &VotorEventSender,
192192
) -> Result<()>;
193193
fn transmit(
194194
&mut self,
@@ -231,7 +231,7 @@ impl BroadcastStage {
231231
receiver: &Receiver<WorkingBankEntry>,
232232
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
233233
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
234-
completed_block_sender: &CompletedBlockSender,
234+
votor_event_sender: &VotorEventSender,
235235
mut broadcast_stage_run: impl BroadcastRun,
236236
) -> BroadcastStageReturnType {
237237
loop {
@@ -241,7 +241,7 @@ impl BroadcastStage {
241241
receiver,
242242
socket_sender,
243243
blockstore_sender,
244-
completed_block_sender,
244+
votor_event_sender,
245245
);
246246
let res = Self::handle_error(res, "run");
247247
if let Some(res) = res {
@@ -294,7 +294,7 @@ impl BroadcastStage {
294294
blockstore: Arc<Blockstore>,
295295
bank_forks: Arc<RwLock<BankForks>>,
296296
quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
297-
completed_block_sender: CompletedBlockSender,
297+
votor_event_sender: VotorEventSender,
298298
mut broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone,
299299
) -> Self {
300300
let (socket_sender, socket_receiver) = unbounded();
@@ -315,7 +315,7 @@ impl BroadcastStage {
315315
&receiver,
316316
&socket_sender_,
317317
&blockstore_sender,
318-
&completed_block_sender,
318+
&votor_event_sender,
319319
bs_run,
320320
)
321321
})
@@ -704,7 +704,7 @@ pub mod test {
704704
let bank_forks = BankForks::new_rw_arc(bank);
705705
let bank = bank_forks.read().unwrap().root_bank();
706706

707-
let (completed_block_sender, _) = unbounded();
707+
let (votor_event_sender, _) = unbounded();
708708

709709
// Start up the broadcast stage
710710
let broadcast_service = BroadcastStage::new(
@@ -716,7 +716,7 @@ pub mod test {
716716
blockstore.clone(),
717717
bank_forks,
718718
quic_endpoint_sender,
719-
completed_block_sender,
719+
votor_event_sender,
720720
StandardBroadcastRun::new(0),
721721
);
722722

turbine/src/broadcast_stage/broadcast_duplicates_run.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use {
1010
signature::{Keypair, Signature, Signer},
1111
system_transaction,
1212
},
13-
solana_votor::event::CompletedBlockSender,
13+
solana_votor::event::VotorEventSender,
1414
std::collections::HashSet,
1515
};
1616

@@ -83,7 +83,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
8383
receiver: &Receiver<WorkingBankEntry>,
8484
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
8585
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
86-
_completed_block_sender: &CompletedBlockSender,
86+
_votor_event_sender: &VotorEventSender,
8787
) -> Result<()> {
8888
// 1) Pull entries from banking stage
8989
let mut receive_results = broadcast_utils::recv_slot_entries(receiver)?;

0 commit comments

Comments
 (0)