Skip to content

Commit 9797e2b

Browse files
carllinbw-solana
authored andcommitted
Update commitment cache in alpenglow (anza-xyz#94)
1 parent 170b2d7 commit 9797e2b

File tree

3 files changed

+138
-35
lines changed

3 files changed

+138
-35
lines changed

core/src/commitment_service.rs

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,21 @@ use {
2121
},
2222
};
2323

24-
pub struct CommitmentAggregationData {
24+
pub enum AlpenglowCommitmentType {
25+
Notarize,
26+
Root,
27+
}
28+
pub enum CommitmentAggregationData {
29+
AlpenglowCommitmentAggregationData(AlpenglowCommitmentAggregationData),
30+
TowerCommitmentAggregationData(TowerCommitmentAggregationData),
31+
}
32+
33+
pub struct AlpenglowCommitmentAggregationData {
34+
pub commitment_type: AlpenglowCommitmentType,
35+
pub slot: Slot,
36+
}
37+
38+
pub struct TowerCommitmentAggregationData {
2539
bank: Arc<Bank>,
2640
root: Slot,
2741
total_stake: Stake,
@@ -30,7 +44,7 @@ pub struct CommitmentAggregationData {
3044
node_vote_state: (Pubkey, TowerVoteState),
3145
}
3246

33-
impl CommitmentAggregationData {
47+
impl TowerCommitmentAggregationData {
3448
pub fn new(
3549
bank: Arc<Bank>,
3650
root: Slot,
@@ -107,15 +121,28 @@ impl AggregateCommitmentService {
107121
let aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;
108122
let aggregation_data = receiver.try_iter().last().unwrap_or(aggregation_data);
109123

110-
let ancestors = aggregation_data.bank.status_cache_ancestors();
111-
if ancestors.is_empty() {
112-
continue;
113-
}
114-
115124
let mut aggregate_commitment_time = Measure::start("aggregate-commitment-ms");
116-
let update_commitment_slots =
117-
Self::update_commitment_cache(block_commitment_cache, aggregation_data, ancestors);
125+
let update_commitment_slots = {
126+
match aggregation_data {
127+
CommitmentAggregationData::TowerCommitmentAggregationData(data) => {
128+
let ancestors = data.bank.status_cache_ancestors();
129+
if ancestors.is_empty() {
130+
continue;
131+
}
132+
133+
Self::update_commitment_cache(block_commitment_cache, data, ancestors)
134+
}
135+
CommitmentAggregationData::AlpenglowCommitmentAggregationData(data) => {
136+
Self::alpenglow_update_commitment_cache(
137+
block_commitment_cache,
138+
data.commitment_type,
139+
data.slot,
140+
)
141+
}
142+
}
143+
};
118144
aggregate_commitment_time.stop();
145+
119146
datapoint_info!(
120147
"block-commitment-cache",
121148
(
@@ -134,17 +161,36 @@ impl AggregateCommitmentService {
134161
i64
135162
),
136163
);
137-
138164
// Triggers rpc_subscription notifications as soon as new commitment data is available,
139165
// sending just the commitment cache slot information that the notifications thread
140166
// needs
141167
subscriptions.notify_subscribers(update_commitment_slots);
142168
}
143169
}
144170

171+
fn alpenglow_update_commitment_cache(
172+
block_commitment_cache: &RwLock<BlockCommitmentCache>,
173+
update_type: AlpenglowCommitmentType,
174+
slot: Slot,
175+
) -> CommitmentSlots {
176+
let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
177+
178+
match update_type {
179+
AlpenglowCommitmentType::Notarize => {
180+
w_block_commitment_cache.set_slot(slot);
181+
}
182+
AlpenglowCommitmentType::Root => {
183+
w_block_commitment_cache.set_highest_confirmed_slot(slot);
184+
w_block_commitment_cache.set_root(slot);
185+
w_block_commitment_cache.set_highest_super_majority_root(slot);
186+
}
187+
}
188+
w_block_commitment_cache.commitment_slots()
189+
}
190+
145191
fn update_commitment_cache(
146192
block_commitment_cache: &RwLock<BlockCommitmentCache>,
147-
aggregation_data: CommitmentAggregationData,
193+
aggregation_data: TowerCommitmentAggregationData,
148194
ancestors: Vec<u64>,
149195
) -> CommitmentSlots {
150196
let (block_commitment, rooted_stake) = Self::aggregate_commitment(
@@ -615,7 +661,7 @@ mod tests {
615661
let ancestors = working_bank.status_cache_ancestors();
616662
let _ = AggregateCommitmentService::update_commitment_cache(
617663
&block_commitment_cache,
618-
CommitmentAggregationData {
664+
TowerCommitmentAggregationData {
619665
bank: working_bank,
620666
root: 0,
621667
total_stake: 100,
@@ -654,7 +700,7 @@ mod tests {
654700
let ancestors = working_bank.status_cache_ancestors();
655701
let _ = AggregateCommitmentService::update_commitment_cache(
656702
&block_commitment_cache,
657-
CommitmentAggregationData {
703+
TowerCommitmentAggregationData {
658704
bank: working_bank,
659705
root: 1,
660706
total_stake: 100,
@@ -703,7 +749,7 @@ mod tests {
703749
let ancestors = working_bank.status_cache_ancestors();
704750
let _ = AggregateCommitmentService::update_commitment_cache(
705751
&block_commitment_cache,
706-
CommitmentAggregationData {
752+
TowerCommitmentAggregationData {
707753
bank: working_bank,
708754
root: 0,
709755
total_stake: 100,

core/src/replay_stage.rs

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ use {
1515
DuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VoteTracker,
1616
},
1717
cluster_slots_service::{cluster_slots::ClusterSlots, ClusterSlotsUpdateSender},
18-
commitment_service::{AggregateCommitmentService, CommitmentAggregationData},
18+
commitment_service::{
19+
AggregateCommitmentService, AlpenglowCommitmentAggregationData,
20+
AlpenglowCommitmentType, CommitmentAggregationData, TowerCommitmentAggregationData,
21+
},
1922
consensus::{
2023
fork_choice::{select_vote_and_reset_forks, ForkChoice, SelectVoteAndResetForkResult},
2124
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
@@ -70,7 +73,7 @@ use {
7073
accounts_background_service::AbsRequestSender,
7174
bank::{bank_hash_details, Bank, NewBankOptions},
7275
bank_forks::{BankForks, SetRootError, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
73-
commitment::{BlockCommitmentCache, CommitmentSlots},
76+
commitment::BlockCommitmentCache,
7477
installed_scheduler_pool::BankWithScheduler,
7578
prioritization_fee_cache::PrioritizationFeeCache,
7679
vote_sender_types::ReplayVoteSender,
@@ -1289,19 +1292,14 @@ impl ReplayStage {
12891292
&drop_bank_sender,
12901293
&mut cert_pool,
12911294
&mut vote_history,
1295+
&lockouts_sender,
12921296
) {
12931297
error!(
12941298
"Unable to set alpenglow root {}, error {e}",
12951299
new_root_bank.slot()
12961300
);
12971301
return;
12981302
}
1299-
rpc_subscriptions.notify_subscribers(CommitmentSlots {
1300-
slot: new_root_slot,
1301-
root: new_root_slot,
1302-
highest_confirmed_slot: new_root_slot,
1303-
highest_super_majority_root: new_root_slot,
1304-
});
13051303
}
13061304
}
13071305

@@ -2750,7 +2748,6 @@ impl ReplayStage {
27502748
vote_account_pubkey: &Pubkey,
27512749
identity_keypair: &Keypair,
27522750
authorized_voter_keypairs: &[Arc<Keypair>],
2753-
_lockouts_sender: &Sender<CommitmentAggregationData>,
27542751
vote_signatures: &mut Vec<Signature>,
27552752
has_new_vote_been_rooted: &mut bool,
27562753
replay_timing: &mut ReplayLoopTiming,
@@ -3393,7 +3390,6 @@ impl ReplayStage {
33933390
vote_account_pubkey,
33943391
identity_keypair,
33953392
&authorized_voter_keypairs.read().unwrap(),
3396-
lockouts_sender,
33973393
voted_signatures,
33983394
has_new_vote_been_rooted,
33993395
replay_timing,
@@ -3409,6 +3405,11 @@ impl ReplayStage {
34093405
None,
34103406
),
34113407
);
3408+
Self::alpenglow_update_commitment_cache(
3409+
AlpenglowCommitmentType::Notarize,
3410+
highest_frozen_bank.slot(),
3411+
lockouts_sender,
3412+
);
34123413
}
34133414

34143415
// Try to finalize the highest notarized block
@@ -3431,7 +3432,6 @@ impl ReplayStage {
34313432
vote_account_pubkey,
34323433
identity_keypair,
34333434
&authorized_voter_keypairs.read().unwrap(),
3434-
lockouts_sender,
34353435
voted_signatures,
34363436
has_new_vote_been_rooted,
34373437
replay_timing,
@@ -3504,19 +3504,35 @@ impl ReplayStage {
35043504
}
35053505
}
35063506

3507+
fn alpenglow_update_commitment_cache(
3508+
commitment_type: AlpenglowCommitmentType,
3509+
slot: Slot,
3510+
lockouts_sender: &Sender<CommitmentAggregationData>,
3511+
) {
3512+
if let Err(e) = lockouts_sender.send(
3513+
CommitmentAggregationData::AlpenglowCommitmentAggregationData(
3514+
AlpenglowCommitmentAggregationData {
3515+
commitment_type,
3516+
slot,
3517+
},
3518+
),
3519+
) {
3520+
trace!("lockouts_sender failed: {:?}", e);
3521+
}
3522+
}
3523+
35073524
fn update_commitment_cache(
35083525
bank: Arc<Bank>,
35093526
root: Slot,
35103527
total_stake: Stake,
35113528
node_vote_state: (Pubkey, TowerVoteState),
35123529
lockouts_sender: &Sender<CommitmentAggregationData>,
35133530
) {
3514-
if let Err(e) = lockouts_sender.send(CommitmentAggregationData::new(
3515-
bank,
3516-
root,
3517-
total_stake,
3518-
node_vote_state,
3519-
)) {
3531+
if let Err(e) =
3532+
lockouts_sender.send(CommitmentAggregationData::TowerCommitmentAggregationData(
3533+
TowerCommitmentAggregationData::new(bank, root, total_stake, node_vote_state),
3534+
))
3535+
{
35203536
trace!("lockouts_sender failed: {:?}", e);
35213537
}
35223538
}
@@ -4740,6 +4756,7 @@ impl ReplayStage {
47404756
drop_bank_sender: &Sender<Vec<BankWithScheduler>>,
47414757
cert_pool: &mut CertificatePool,
47424758
vote_history: &mut VoteHistory,
4759+
lockouts_sender: &Sender<CommitmentAggregationData>,
47434760
) -> Result<(), SetRootError> {
47444761
vote_history.set_root(new_root);
47454762
cert_pool.purge(new_root);
@@ -4762,7 +4779,13 @@ impl ReplayStage {
47624779
voted_signatures,
47634780
epoch_slots_frozen_slots,
47644781
drop_bank_sender,
4765-
)
4782+
)?;
4783+
Self::alpenglow_update_commitment_cache(
4784+
AlpenglowCommitmentType::Root,
4785+
new_root,
4786+
lockouts_sender,
4787+
);
4788+
Ok(())
47664789
}
47674790

47684791
#[allow(clippy::too_many_arguments)]

runtime/src/commitment.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,22 @@ impl BlockCommitmentCache {
180180
}
181181
}
182182

183+
pub fn set_slot(&mut self, slot: Slot) {
184+
self.commitment_slots.slot = std::cmp::max(self.commitment_slots.slot, slot);
185+
}
186+
183187
pub fn set_highest_confirmed_slot(&mut self, slot: Slot) {
184-
self.commitment_slots.highest_confirmed_slot = slot;
188+
self.commitment_slots.highest_confirmed_slot =
189+
std::cmp::max(self.commitment_slots.highest_confirmed_slot, slot);
185190
}
186191

187-
pub fn set_highest_super_majority_root(&mut self, root: Slot) {
188-
self.commitment_slots.highest_super_majority_root = root;
192+
pub fn set_root(&mut self, slot: Slot) {
193+
self.commitment_slots.root = std::cmp::max(self.commitment_slots.root, slot);
194+
}
195+
196+
pub fn set_highest_super_majority_root(&mut self, slot: Slot) {
197+
self.commitment_slots.highest_super_majority_root =
198+
std::cmp::max(self.commitment_slots.highest_super_majority_root, slot);
189199
}
190200

191201
pub fn initialize_slots(&mut self, slot: Slot, root: Slot) {
@@ -337,4 +347,28 @@ mod tests {
337347

338348
assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 0);
339349
}
350+
351+
#[test]
352+
fn test_setters_getters() {
353+
let mut block_commitment_cache = BlockCommitmentCache::default();
354+
// Setting bigger slots should be ok
355+
block_commitment_cache.set_slot(1);
356+
assert_eq!(block_commitment_cache.slot(), 1);
357+
block_commitment_cache.set_highest_confirmed_slot(2);
358+
assert_eq!(block_commitment_cache.highest_confirmed_slot(), 2);
359+
block_commitment_cache.set_root(3);
360+
assert_eq!(block_commitment_cache.root(), 3);
361+
block_commitment_cache.set_highest_super_majority_root(4);
362+
assert_eq!(block_commitment_cache.highest_super_majority_root(), 4);
363+
364+
// Setting smaller slots shuold be ignored
365+
block_commitment_cache.set_slot(0);
366+
assert_eq!(block_commitment_cache.slot(), 1);
367+
block_commitment_cache.set_highest_confirmed_slot(1);
368+
assert_eq!(block_commitment_cache.highest_confirmed_slot(), 2);
369+
block_commitment_cache.set_root(2);
370+
assert_eq!(block_commitment_cache.root(), 3);
371+
block_commitment_cache.set_highest_super_majority_root(3);
372+
assert_eq!(block_commitment_cache.highest_super_majority_root(), 4);
373+
}
340374
}

0 commit comments

Comments
 (0)