Skip to content

Commit dc0f51a

Browse files
authored
PohRecorder: SharedWorkingBank (#7280)
1 parent b82d3bb commit dc0f51a

File tree

9 files changed

+76
-345
lines changed

9 files changed

+76
-345
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/banking_stage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ impl BankingStage {
549549
log_messages_bytes_limit,
550550
),
551551
finished_work_sender.clone(),
552-
poh_recorder.read().unwrap().new_leader_bank_notifier(),
552+
poh_recorder.read().unwrap().shared_working_bank(),
553553
);
554554

555555
worker_metrics.push(consume_worker.metrics_handle());

core/src/banking_stage/consume_worker.rs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use {
66
},
77
crossbeam_channel::{Receiver, RecvError, SendError, Sender},
88
solana_measure::measure_us,
9-
solana_poh::leader_bank_notifier::LeaderBankNotifier,
9+
solana_poh::poh_recorder::SharedWorkingBank,
1010
solana_runtime::bank::Bank,
1111
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
1212
solana_svm::transaction_error_metrics::TransactionErrorMetrics,
@@ -16,7 +16,7 @@ use {
1616
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
1717
Arc,
1818
},
19-
time::Duration,
19+
time::{Duration, Instant},
2020
},
2121
thiserror::Error,
2222
};
@@ -34,7 +34,7 @@ pub(crate) struct ConsumeWorker<Tx> {
3434
consumer: Consumer,
3535
consumed_sender: Sender<FinishedConsumeWork<Tx>>,
3636

37-
leader_bank_notifier: Arc<LeaderBankNotifier>,
37+
shared_working_bank: SharedWorkingBank,
3838
metrics: Arc<ConsumeWorkerMetrics>,
3939
}
4040

@@ -44,13 +44,13 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
4444
consume_receiver: Receiver<ConsumeWork<Tx>>,
4545
consumer: Consumer,
4646
consumed_sender: Sender<FinishedConsumeWork<Tx>>,
47-
leader_bank_notifier: Arc<LeaderBankNotifier>,
47+
shared_working_bank: SharedWorkingBank,
4848
) -> Self {
4949
Self {
5050
consume_receiver,
5151
consumer,
5252
consumed_sender,
53-
leader_bank_notifier,
53+
shared_working_bank,
5454
metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
5555
}
5656
}
@@ -67,7 +67,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
6767
}
6868

6969
fn consume_loop(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
70-
let (maybe_consume_bank, get_bank_us) = measure_us!(self.get_consume_bank());
70+
let (maybe_consume_bank, get_bank_us) = measure_us!(self.working_bank_with_timeout());
7171
let Some(mut bank) = maybe_consume_bank else {
7272
self.metrics
7373
.timing_metrics
@@ -82,10 +82,12 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
8282

8383
for work in try_drain_iter(work, &self.consume_receiver) {
8484
if bank.is_complete() || {
85-
// check if the bank got interrupted before completion
86-
self.get_consume_bank_id() != Some(bank.bank_id())
85+
// if working bank has changed, then try to get a new bank.
86+
self.working_bank()
87+
.map(|working_bank| Arc::ptr_eq(&working_bank, &bank))
88+
.unwrap_or(true)
8789
} {
88-
let (maybe_new_bank, get_bank_us) = measure_us!(self.get_consume_bank());
90+
let (maybe_new_bank, get_bank_us) = measure_us!(self.working_bank_with_timeout());
8991
if let Some(new_bank) = maybe_new_bank {
9092
self.metrics
9193
.timing_metrics
@@ -130,16 +132,23 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
130132
Ok(())
131133
}
132134

133-
/// Try to get a bank for consuming.
134-
fn get_consume_bank(&self) -> Option<Arc<Bank>> {
135-
self.leader_bank_notifier
136-
.get_or_wait_for_in_progress(Duration::from_millis(50))
137-
.upgrade()
135+
/// Get the current poh working bank with a timeout - if the Bank is
136+
/// not available within the timeout, return None.
137+
fn working_bank_with_timeout(&self) -> Option<Arc<Bank>> {
138+
const TIMEOUT: Duration = Duration::from_millis(50);
139+
let now = Instant::now();
140+
while now.elapsed() < TIMEOUT {
141+
if let Some(bank) = self.working_bank() {
142+
return Some(bank);
143+
}
144+
}
145+
146+
None
138147
}
139148

140-
/// Try to get the id for the bank that should be used for consuming
141-
fn get_consume_bank_id(&self) -> Option<u64> {
142-
self.leader_bank_notifier.get_current_bank_id()
149+
/// Get the current poh working bank without a timeout.
150+
fn working_bank(&self) -> Option<Arc<Bank>> {
151+
self.shared_working_bank.load()
143152
}
144153

145154
/// Retry current batch and all outstanding batches.
@@ -845,7 +854,7 @@ mod tests {
845854
consume_receiver,
846855
consumer,
847856
consumed_sender,
848-
poh_recorder.read().unwrap().new_leader_bank_notifier(),
857+
poh_recorder.read().unwrap().shared_working_bank(),
849858
);
850859

851860
(

poh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ name = "solana_poh"
2020
dev-context-only-utils = []
2121

2222
[dependencies]
23+
arc-swap = { workspace = true }
2324
core_affinity = { workspace = true }
2425
crossbeam-channel = { workspace = true }
2526
log = { workspace = true }

0 commit comments

Comments
 (0)