Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/block_creation_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ pub fn start_loop(config: BlockCreationLoopConfig) {
);
}

// TODO(ksn): we should periodically be checking for new leader windows to get the skip
// timer from a ParentReady.
//
// Wait for the voting loop to notify us
let LeaderWindowInfo {
start_slot,
Expand Down
4 changes: 2 additions & 2 deletions votor/src/consensus_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
},
event::VotorEvent,
vote_to_certificate_ids, Certificate, Stake, VoteType,
MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_LITE, MAX_ENTRIES_PER_PUBKEY_FOR_OTHER_TYPES,
MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_FALLBACK, MAX_ENTRIES_PER_PUBKEY_FOR_OTHER_TYPES,
},
crossbeam_channel::Sender,
log::{error, trace},
Expand Down Expand Up @@ -166,7 +166,7 @@ impl ConsensusPool {
fn new_vote_pool(vote_type: VoteType) -> VotePoolType {
match vote_type {
VoteType::NotarizeFallback => VotePoolType::DuplicateBlockVotePool(
DuplicateBlockVotePool::new(MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_LITE),
DuplicateBlockVotePool::new(MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_FALLBACK),
),
VoteType::Notarize => VotePoolType::DuplicateBlockVotePool(
DuplicateBlockVotePool::new(MAX_ENTRIES_PER_PUBKEY_FOR_OTHER_TYPES),
Expand Down
4 changes: 2 additions & 2 deletions votor/src/consensus_pool/parent_ready_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! a block with parent `b` in slot `s` will have their block finalized.

use {
crate::{event::VotorEvent, MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_LITE},
crate::{event::VotorEvent, MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_FALLBACK},
solana_clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
solana_pubkey::Pubkey,
solana_votor_messages::consensus_message::Block,
Expand Down Expand Up @@ -103,7 +103,7 @@ impl ParentReadyTracker {
self.my_pubkey
);
status.notar_fallbacks.push(block);
assert!(status.notar_fallbacks.len() <= MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_LITE);
assert!(status.notar_fallbacks.len() <= MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_FALLBACK);

// Add this block as valid parent to skip connected future blocks
for s in slot.saturating_add(1).. {
Expand Down
6 changes: 3 additions & 3 deletions votor/src/consensus_pool_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl ConsensusPoolService {
.leader_schedule_cache
.slot_leader_at(*highest_parent_ready, Some(&root_bank))
else {
error!("Unable to compute the leader at slot {highest_parent_ready}. Something is wrong, exiting");
error!("Unable to compute the leader at slot {highest_parent_ready}. Something is wrong; exiting");
ctx.exit.store(true, Ordering::Relaxed);
return;
};
Expand All @@ -374,7 +374,7 @@ impl ConsensusPoolService {

if (start_slot..=end_slot).any(|s| ctx.blockstore.has_existing_shreds_for_slot(s)) {
warn!(
"{my_pubkey}: We have already produced shreds in the window {start_slot}-{end_slot}, \
"{my_pubkey}: We have already produced shreds in the window {start_slot}-{end_slot}; \
skipping production of our leader window"
);
return;
Expand All @@ -386,7 +386,7 @@ impl ConsensusPoolService {
{
BlockProductionParent::MissedWindow => {
warn!(
"{my_pubkey}: Leader slot {start_slot} has already been certified, \
"{my_pubkey}: Leader slot {start_slot} has already been certified; \
skipping production of {start_slot}-{end_slot}"
);
ConsensusPoolServiceStats::incr_u16(&mut stats.parent_ready_missed_window);
Expand Down
98 changes: 79 additions & 19 deletions votor/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use {
crate::{
commitment::{alpenglow_update_commitment_cache, AlpenglowCommitmentType},
event::{CompletedBlock, VotorEvent, VotorEventReceiver},
event::{CompletedBlock, LeaderWindowInfo, VotorEvent, VotorEventReceiver},
event_handler::stats::EventHandlerStats,
root_utils::{self, RootContext},
timer_manager::TimerManager,
Expand Down Expand Up @@ -32,7 +32,7 @@ use {
Arc, Condvar, Mutex,
},
thread::{self, Builder, JoinHandle},
time::Duration,
time::{Duration, Instant},
},
thiserror::Error,
};
Expand Down Expand Up @@ -171,6 +171,7 @@ impl EventHandler {
&mut vctx,
&rctx,
&mut local_context,
exit.clone(),
)?;
event_processing_time.stop();
local_context
Expand Down Expand Up @@ -231,6 +232,7 @@ impl EventHandler {
vctx: &mut VotingContext,
rctx: &RootContext,
local_context: &mut LocalContext,
exit: Arc<AtomicBool>,
) -> Result<Vec<BLSOp>, EventLoopError> {
let mut votes = vec![];
let LocalContext {
Expand Down Expand Up @@ -261,6 +263,21 @@ impl EventHandler {
.or_default()
.push((block, parent_block));
}

// Fast leader handover
let next_slot = slot.saturating_add(1);

if let Some(window_info) = Self::is_fast_leader_handover_ready(
rctx,
vctx,
my_pubkey,
next_slot,
parent_block,
exit.clone(),
) {
Self::produce_window(ctx, my_pubkey, window_info, stats);
}

Self::check_rootable_blocks(
my_pubkey,
ctx,
Expand Down Expand Up @@ -365,22 +382,7 @@ impl EventHandler {

// It is time to produce our leader window
VotorEvent::ProduceWindow(window_info) => {
info!("{my_pubkey}: ProduceWindow {window_info:?}");
let mut l_window_info = ctx.leader_window_notifier.window_info.lock().unwrap();
if let Some(old_window_info) = l_window_info.as_ref() {
stats.leader_window_replaced = stats.leader_window_replaced.saturating_add(1);
error!(
"{my_pubkey}: Attempting to start leader window for {}-{}, \
however there is already a pending window to produce {}-{}. \
Our production is lagging, discarding in favor of the newer window",
window_info.start_slot,
window_info.end_slot,
old_window_info.start_slot,
old_window_info.end_slot,
);
}
*l_window_info = Some(window_info);
ctx.leader_window_notifier.window_notification.notify_one();
Self::produce_window(ctx, my_pubkey, window_info, stats);
}

// We have finalized this block consider it for rooting
Expand Down Expand Up @@ -425,7 +427,7 @@ impl EventHandler {
if let Err(e) = Self::handle_set_identity(my_pubkey, ctx, vctx) {
error!(
"Unable to load new vote history when attempting to change identity from {} \
to {} in voting loop, Exiting: {}",
to {} in voting loop; Exiting: {}",
vctx.vote_history.node_pubkey,
ctx.cluster_info.id(),
e
Expand Down Expand Up @@ -596,6 +598,64 @@ impl EventHandler {
Ok(true)
}

fn is_fast_leader_handover_ready(
rctx: &RootContext,
vctx: &mut VotingContext,
my_pubkey: &Pubkey,
slot: u64,
parent_block: Block,
exit: Arc<AtomicBool>,
) -> Option<LeaderWindowInfo> {
// Fast leader handover only applies to the first slot of a leader's window
if slot != first_of_consecutive_leader_slots(slot) {
return None;
}

let root_bank = vctx.root_bank.load();
let Some(leader_pubkey) = rctx
.leader_schedule_cache
.slot_leader_at(slot, Some(&root_bank))
else {
error!("Unable to compute the leader at slot {slot}. Something is wrong; exiting.");
exit.store(false, Ordering::Relaxed);
return None;
};

let window_info = LeaderWindowInfo {
start_slot: slot,
end_slot: last_of_consecutive_leader_slots(slot),
parent_block,
// TODO(ksn): the skip timer shouldn't start until ParentReady has been observed.
skip_timer: Instant::now(),
};

(&leader_pubkey == my_pubkey).then(|| window_info)
}

fn produce_window(
ctx: &SharedContext,
my_pubkey: &Pubkey,
window_info: LeaderWindowInfo,
stats: &mut EventHandlerStats,
) {
info!("{my_pubkey}: ProduceWindow {window_info:?}");
let mut l_window_info = ctx.leader_window_notifier.window_info.lock().unwrap();
if let Some(old_window_info) = l_window_info.as_ref() {
stats.leader_window_replaced = stats.leader_window_replaced.saturating_add(1);
error!(
"{my_pubkey}: Attempting to start leader window for {}-{}, \
however there is already a pending window to produce {}-{}. \
Our production is lagging, discarding in favor of the newer window",
window_info.start_slot,
window_info.end_slot,
old_window_info.start_slot,
old_window_info.end_slot,
);
}
*l_window_info = Some(window_info);
ctx.leader_window_notifier.window_notification.notify_one();
}

/// Checks the pending blocks that have completed replay to see if they
/// are eligble to be voted on now
fn check_pending_blocks(
Expand Down
2 changes: 1 addition & 1 deletion votor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub fn vote_to_certificate_ids(vote: &Vote) -> Vec<Certificate> {
}

pub const MAX_ENTRIES_PER_PUBKEY_FOR_OTHER_TYPES: usize = 1;
pub const MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_LITE: usize = 3;
pub const MAX_ENTRIES_PER_PUBKEY_FOR_NOTARIZE_FALLBACK: usize = 3;

pub const SAFE_TO_NOTAR_MIN_NOTARIZE_ONLY: f64 = 0.4;
pub const SAFE_TO_NOTAR_MIN_NOTARIZE_FOR_NOTARIZE_OR_SKIP: f64 = 0.2;
Expand Down