Skip to content

Commit 2f0710b

Browse files
akhi3030bw-solana
authored andcommitted
feat: Introduce a EpochStakesService (anza-xyz#306)
1 parent 783c6a3 commit 2f0710b

File tree

7 files changed

+118
-57
lines changed

7 files changed

+118
-57
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/sigverifier/bls_sigverifier.rs

Lines changed: 17 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -11,31 +11,18 @@ use {
1111
alpenglow_vote::bls_message::BLSMessage,
1212
crossbeam_channel::{Sender, TrySendError},
1313
solana_pubkey::Pubkey,
14-
solana_runtime::{bank_forks::BankForks, epoch_stakes::EpochStakes},
15-
solana_sdk::{
16-
clock::{Epoch, Slot},
17-
epoch_schedule::EpochSchedule,
18-
},
14+
solana_runtime::epoch_stakes_service::EpochStakesService,
15+
solana_sdk::clock::Slot,
1916
solana_streamer::packet::PacketBatch,
2017
stats::{BLSSigVerifierStats, StatsUpdater},
21-
std::{
22-
collections::HashMap,
23-
sync::{Arc, RwLock},
24-
time::{Duration, Instant},
25-
},
18+
std::{collections::HashMap, sync::Arc},
2619
};
2720

28-
const EPOCH_STAKES_QUERY_INTERVAL: Duration = Duration::from_secs(60);
29-
3021
pub struct BLSSigVerifier {
31-
bank_forks: Arc<RwLock<BankForks>>,
3222
verified_votes_sender: VerifiedVoteSender,
3323
message_sender: Sender<BLSMessage>,
24+
epoch_stakes_service: Arc<EpochStakesService>,
3425
stats: BLSSigVerifierStats,
35-
root_epoch: Epoch,
36-
epoch_schedule: EpochSchedule,
37-
epoch_stakes_map: Arc<HashMap<Epoch, EpochStakes>>,
38-
epoch_stakes_queried: Instant,
3926
}
4027

4128
impl SigVerifier for BLSSigVerifier {
@@ -72,10 +59,9 @@ impl SigVerifier for BLSSigVerifier {
7259
certificate_message.certificate.slot
7360
}
7461
};
75-
let epoch = self.epoch_schedule.get_epoch(slot);
76-
let rank_to_pubkey_map = if let Some(epoch_stakes) = self.epoch_stakes_map.get(&epoch) {
77-
epoch_stakes.bls_pubkey_to_rank_map()
78-
} else {
62+
63+
let Some(rank_to_pubkey_map) = self.epoch_stakes_service.get_key_to_rank_map(slot)
64+
else {
7965
stats_updater.received_no_epoch_stakes += 1;
8066
continue;
8167
};
@@ -110,46 +96,21 @@ impl SigVerifier for BLSSigVerifier {
11096
self.send_verified_votes(verified_votes);
11197
self.stats.update(stats_updater);
11298
self.stats.maybe_report_stats();
113-
self.update_epoch_stakes_map();
11499
Ok(())
115100
}
116101
}
117102

118103
impl BLSSigVerifier {
119104
pub fn new(
120-
bank_forks: Arc<RwLock<BankForks>>,
105+
epoch_stakes_service: Arc<EpochStakesService>,
121106
verified_votes_sender: VerifiedVoteSender,
122107
message_sender: Sender<BLSMessage>,
123108
) -> Self {
124-
let mut verifier = Self {
125-
bank_forks,
109+
Self {
110+
epoch_stakes_service,
126111
verified_votes_sender,
127112
message_sender,
128113
stats: BLSSigVerifierStats::new(),
129-
epoch_schedule: EpochSchedule::default(),
130-
epoch_stakes_map: Arc::new(HashMap::new()),
131-
root_epoch: Epoch::default(),
132-
epoch_stakes_queried: Instant::now() - EPOCH_STAKES_QUERY_INTERVAL,
133-
};
134-
verifier.update_epoch_stakes_map();
135-
verifier
136-
}
137-
138-
// TODO(wen): We should maybe create a epoch stakes service so all these objects
139-
// only needing epoch stakes don't need to worry about bank_forks and banks.
140-
fn update_epoch_stakes_map(&mut self) {
141-
if self.epoch_stakes_queried.elapsed() < EPOCH_STAKES_QUERY_INTERVAL {
142-
return;
143-
}
144-
self.epoch_stakes_queried = Instant::now();
145-
let root_bank = self.bank_forks.read().unwrap().root_bank();
146-
if self.epoch_stakes_map.is_empty() {
147-
self.epoch_schedule = root_bank.epoch_schedule().clone();
148-
}
149-
let epoch = root_bank.epoch();
150-
if self.epoch_stakes_map.is_empty() || epoch > self.root_epoch {
151-
self.epoch_stakes_map = Arc::new(root_bank.epoch_stakes_map().clone());
152-
self.root_epoch = epoch;
153114
}
154115
}
155116

@@ -181,20 +142,19 @@ mod tests {
181142
vote::Vote,
182143
},
183144
bitvec::prelude::*,
184-
crossbeam_channel::Receiver,
145+
crossbeam_channel::{unbounded, Receiver},
185146
solana_bls_signatures::Signature,
186147
solana_perf::packet::Packet,
187148
solana_runtime::{
188149
bank::Bank,
189-
bank_forks::BankForks,
190150
genesis_utils::{
191151
create_genesis_config_with_alpenglow_vote_accounts_no_program,
192152
ValidatorVoteKeypairs,
193153
},
194154
},
195155
solana_sdk::{hash::Hash, signer::Signer},
196156
stats::STATS_INTERVAL_DURATION,
197-
std::time::Duration,
157+
std::time::{Duration, Instant},
198158
};
199159

200160
fn create_keypairs_and_bls_sig_verifier(
@@ -213,11 +173,13 @@ mod tests {
213173
&validator_keypairs,
214174
stakes_vec,
215175
);
216-
let bank0 = Bank::new_for_tests(&genesis.genesis_config);
217-
let bank_forks = BankForks::new_rw_arc(bank0);
176+
let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
177+
let epoch = bank.epoch();
178+
let (_tx, rx) = unbounded();
179+
let epoch_stakes_service = Arc::new(EpochStakesService::new(bank, epoch, rx));
218180
(
219181
validator_keypairs,
220-
BLSSigVerifier::new(bank_forks, verified_vote_sender, message_sender),
182+
BLSSigVerifier::new(epoch_stakes_service, verified_vote_sender, message_sender),
221183
)
222184
}
223185

core/src/tpu.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use {
4242
},
4343
solana_runtime::{
4444
bank_forks::BankForks,
45+
epoch_stakes_service::EpochStakesService,
4546
prioritization_fee_cache::PrioritizationFeeCache,
4647
root_bank_cache::RootBankCache,
4748
vote_sender_types::{
@@ -263,8 +264,13 @@ impl Tpu {
263264
};
264265

265266
let alpenglow_sigverify_stage = {
267+
let (tx, rx) = unbounded();
268+
bank_forks.write().unwrap().register_new_bank_subscriber(tx);
269+
let bank = bank_forks.read().unwrap().root_bank();
270+
let epoch = bank.epoch();
271+
let epoch_stakes_service = Arc::new(EpochStakesService::new(bank, epoch, rx));
266272
let verifier = BLSSigVerifier::new(
267-
bank_forks.clone(),
273+
epoch_stakes_service,
268274
verified_vote_sender.clone(),
269275
bls_verified_message_sender,
270276
);

runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ num-derive = { workspace = true }
3838
num-traits = { workspace = true }
3939
num_cpus = { workspace = true }
4040
num_enum = { workspace = true }
41+
parking_lot = { workspace = true }
4142
percentage = { workspace = true }
4243
qualifier_attr = { workspace = true }
4344
rand = { workspace = true }

runtime/src/bank_forks.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use {
1010
},
1111
snapshot_config::SnapshotConfig,
1212
},
13-
crossbeam_channel::SendError,
13+
crossbeam_channel::{SendError, Sender},
1414
log::*,
1515
solana_measure::measure::Measure,
1616
solana_program_runtime::loaded_programs::{BlockRelation, ForkGraph},
@@ -86,6 +86,8 @@ pub struct BankForks {
8686
scheduler_pool: Option<InstalledSchedulerPoolArc>,
8787

8888
dumped_slot_subscribers: Vec<DumpedSlotSubscription>,
89+
/// Tracks subscribers interested in hearing about new `Bank`s.
90+
new_bank_subscribers: Vec<Sender<Arc<Bank>>>,
8991
}
9092

9193
impl Index<u64> for BankForks {
@@ -137,6 +139,7 @@ impl BankForks {
137139
highest_slot_at_startup: 0,
138140
scheduler_pool: None,
139141
dumped_slot_subscribers: vec![],
142+
new_bank_subscribers: vec![],
140143
}));
141144

142145
root_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
@@ -329,6 +332,24 @@ impl BankForks {
329332
self.dumped_slot_subscribers.push(notifier);
330333
}
331334

335+
/// Register a new subscriber interested in hearing about new `Bank`s.
336+
pub fn register_new_bank_subscriber(&mut self, tx: Sender<Arc<Bank>>) {
337+
self.new_bank_subscribers.push(tx);
338+
}
339+
340+
/// Call to notify subscribers of new `Bank`s.
341+
fn notify_new_bank_subscribers(&mut self, root_bank: &Arc<Bank>) {
342+
let mut channels_to_drop = vec![];
343+
for (ind, tx) in self.new_bank_subscribers.iter().enumerate() {
344+
if let Err(SendError(_)) = tx.send(root_bank.clone()) {
345+
channels_to_drop.push(ind);
346+
}
347+
}
348+
for ind in channels_to_drop {
349+
self.new_bank_subscribers.remove(ind);
350+
}
351+
}
352+
332353
/// Clears associated banks from BankForks and notifies subscribers that a dump has occured.
333354
pub fn dump_slots<'a, I>(&mut self, slots: I) -> (Vec<(Slot, BankId)>, Vec<BankWithScheduler>)
334355
where
@@ -444,6 +465,7 @@ impl BankForks {
444465
.unwrap()
445466
.node_id_to_vote_accounts()
446467
);
468+
self.notify_new_bank_subscribers(root_bank);
447469
}
448470
let root_tx_count = root_bank
449471
.parents()
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use {
2+
crate::{
3+
bank::Bank,
4+
epoch_stakes::{BLSPubkeyToRankMap, EpochStakes},
5+
},
6+
crossbeam_channel::Receiver,
7+
log::warn,
8+
parking_lot::RwLock as PlRwLock,
9+
solana_sdk::{
10+
clock::{Epoch, Slot},
11+
epoch_schedule::EpochSchedule,
12+
},
13+
std::{collections::HashMap, sync::Arc, thread},
14+
};
15+
16+
struct State {
17+
stakes: HashMap<Epoch, EpochStakes>,
18+
epoch_schedule: EpochSchedule,
19+
}
20+
21+
impl State {
22+
fn new(bank: Arc<Bank>) -> Self {
23+
Self {
24+
stakes: bank.epoch_stakes_map().clone(),
25+
epoch_schedule: bank.epoch_schedule().clone(),
26+
}
27+
}
28+
}
29+
30+
/// A service that regularly updates the epoch stakes state from `Bank`s
31+
/// and exposes various methods to access the state.
32+
pub struct EpochStakesService {
33+
state: Arc<PlRwLock<State>>,
34+
}
35+
36+
impl EpochStakesService {
37+
pub fn new(bank: Arc<Bank>, epoch: Epoch, new_bank_receiver: Receiver<Arc<Bank>>) -> Self {
38+
let mut prev_epoch = epoch;
39+
let state = Arc::new(PlRwLock::new(State::new(bank)));
40+
{
41+
let state = state.clone();
42+
thread::spawn(move || loop {
43+
let bank = match new_bank_receiver.recv() {
44+
Ok(b) => b,
45+
Err(e) => {
46+
warn!("recv() returned {e:?}. Exiting.");
47+
break;
48+
}
49+
};
50+
let new_epoch = bank.epoch();
51+
if new_epoch > prev_epoch {
52+
prev_epoch = new_epoch;
53+
*state.write() = State::new(bank)
54+
}
55+
});
56+
}
57+
Self { state }
58+
}
59+
60+
pub fn get_key_to_rank_map(&self, slot: Slot) -> Option<Arc<BLSPubkeyToRankMap>> {
61+
let guard = self.state.read();
62+
let epoch = guard.epoch_schedule.get_epoch(slot);
63+
guard
64+
.stakes
65+
.get(&epoch)
66+
.map(|stake| Arc::clone(stake.bls_pubkey_to_rank_map()))
67+
}
68+
}

runtime/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod bank_hash_cache;
1313
pub mod bank_utils;
1414
pub mod commitment;
1515
pub mod epoch_stakes;
16+
pub mod epoch_stakes_service;
1617
pub mod genesis_utils;
1718
pub mod inflation_rewards;
1819
pub mod installed_scheduler_pool;

0 commit comments

Comments
 (0)