Skip to content

Commit 647622c

Browse files
committed
Implement proposer boost re-orging
1 parent 10dac51 commit 647622c

File tree

11 files changed

+229
-7
lines changed

11 files changed

+229
-7
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::observed_operations::{ObservationOutcome, ObservedOperations};
3434
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
3535
use crate::persisted_fork_choice::PersistedForkChoice;
3636
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
37-
use crate::snapshot_cache::SnapshotCache;
37+
use crate::snapshot_cache::{BlockProductionPreState, SnapshotCache};
3838
use crate::sync_committee_verification::{
3939
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
4040
};
@@ -101,6 +101,9 @@ pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
101101
/// validator pubkey cache.
102102
pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
103103

104+
/// The latest delay from the start of the slot at which to attempt a 1-slot re-org.
105+
const MAX_RE_ORG_SLOT_DELAY: Duration = Duration::from_secs(2);
106+
104107
// These keys are all zero because they get stored in different columns, see `DBColumn` type.
105108
pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::zero();
106109
pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero();
@@ -2723,8 +2726,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
27232726
.head_info()
27242727
.map_err(BlockProductionError::UnableToGetHeadInfo)?;
27252728
let (state, state_root_opt) = if head_info.slot < slot {
2729+
// Attempt an aggressive re-org if configured and the conditions are right.
2730+
if let Some(re_org_state) = self.get_state_for_re_org(slot, &head_info)? {
2731+
info!(
2732+
self.log,
2733+
"Proposing block to re-org current head";
2734+
"slot" => slot,
2735+
"head" => %head_info.block_root,
2736+
);
2737+
(re_org_state.pre_state, re_org_state.state_root)
2738+
}
27262739
// Normal case: proposing a block atop the current head. Use the snapshot cache.
2727-
if let Some(pre_state) = self
2740+
else if let Some(pre_state) = self
27282741
.snapshot_cache
27292742
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
27302743
.and_then(|snapshot_cache| {
@@ -2769,6 +2782,76 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
27692782
)
27702783
}
27712784

2785+
fn get_state_for_re_org(
2786+
&self,
2787+
slot: Slot,
2788+
head_info: &HeadInfo,
2789+
) -> Result<Option<BlockProductionPreState<T::EthSpec>>, BlockProductionError> {
2790+
if let Some(re_org_threshold) = self.config.re_org_threshold {
2791+
let canonical_head = head_info.block_root;
2792+
let slot_delay = self
2793+
.slot_clock
2794+
.seconds_from_current_slot_start(self.spec.seconds_per_slot)
2795+
.ok_or(BlockProductionError::UnableToReadSlot)?;
2796+
2797+
// Check that we're producing a block one slot after the current head, and early enough
2798+
// in the slot to be able to propagate widely.
2799+
if head_info.slot + 1 == slot && slot_delay < MAX_RE_ORG_SLOT_DELAY {
2800+
// Is the current head weak and appropriate for re-orging?
2801+
let proposer_head = self.fork_choice.write().get_proposer_head(
2802+
slot,
2803+
canonical_head,
2804+
re_org_threshold,
2805+
)?;
2806+
if let Some(re_org_head) = proposer_head.re_org_head {
2807+
// Only attempt a re-org if we hit the snapshot cache.
2808+
if let Some(pre_state) = self
2809+
.snapshot_cache
2810+
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
2811+
.and_then(|snapshot_cache| {
2812+
snapshot_cache.get_state_for_block_production(re_org_head)
2813+
})
2814+
{
2815+
debug!(
2816+
self.log,
2817+
"Attempting re-org due to weak head";
2818+
"head" => ?canonical_head,
2819+
"re_org_head" => ?re_org_head,
2820+
"head_weight" => ?proposer_head.canonical_head_weight,
2821+
"re_org_weight" => ?proposer_head.re_org_weight_threshold,
2822+
);
2823+
return Ok(Some(pre_state));
2824+
} else {
2825+
debug!(
2826+
self.log,
2827+
"Not attempting re-org due to cache miss";
2828+
"head" => ?canonical_head,
2829+
"re_org_head" => ?re_org_head,
2830+
"head_weight" => ?proposer_head.canonical_head_weight,
2831+
"re_org_weight" => ?proposer_head.re_org_weight_threshold,
2832+
);
2833+
}
2834+
} else {
2835+
debug!(
2836+
self.log,
2837+
"Not attempting re-org due to strong head";
2838+
"head" => ?canonical_head,
2839+
"head_weight" => ?proposer_head.canonical_head_weight,
2840+
"re_org_weight" => ?proposer_head.re_org_weight_threshold,
2841+
);
2842+
}
2843+
} else {
2844+
debug!(
2845+
self.log,
2846+
"Not attempting re-org due to slot distance";
2847+
"head" => ?canonical_head,
2848+
);
2849+
}
2850+
}
2851+
2852+
Ok(None)
2853+
}
2854+
27722855
/// Produce a block for some `slot` upon the given `state`.
27732856
///
27742857
/// Typically the `self.produce_block()` function should be used, instead of calling this

beacon_node/beacon_chain/src/chain_config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use serde_derive::{Deserialize, Serialize};
22
use types::Checkpoint;
33

4+
pub const DEFAULT_RE_ORG_THRESHOLD: u64 = 10;
5+
46
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
57
pub struct ChainConfig {
68
/// Maximum number of slots to skip when importing a consensus message (e.g., block,
@@ -18,6 +20,8 @@ pub struct ChainConfig {
1820
pub enable_lock_timeouts: bool,
1921
/// The max size of a message that can be sent over the network.
2022
pub max_network_size: usize,
23+
/// Maximum percentage of weight at which to attempt re-orging the canonical head.
24+
pub re_org_threshold: Option<u64>,
2125
}
2226

2327
impl Default for ChainConfig {
@@ -28,6 +32,7 @@ impl Default for ChainConfig {
2832
reconstruct_historic_states: false,
2933
enable_lock_timeouts: true,
3034
max_network_size: 10 * 1_048_576, // 10M
35+
re_org_threshold: None,
3136
}
3237
}
3338
}

beacon_node/beacon_chain/src/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ pub enum BlockProductionError {
169169
UnableToProduceAtSlot(Slot),
170170
SlotProcessingError(SlotProcessingError),
171171
BlockProcessingError(BlockProcessingError),
172+
ForkChoiceError(ForkChoiceError),
172173
Eth1ChainError(Eth1ChainError),
173174
BeaconStateError(BeaconStateError),
174175
StateAdvanceError(StateAdvanceError),
@@ -194,3 +195,4 @@ easy_from_to!(BeaconStateError, BlockProductionError);
194195
easy_from_to!(SlotProcessingError, BlockProductionError);
195196
easy_from_to!(Eth1ChainError, BlockProductionError);
196197
easy_from_to!(StateAdvanceError, BlockProductionError);
198+
easy_from_to!(ForkChoiceError, BlockProductionError);

beacon_node/src/cli.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,4 +625,18 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
625625
experimental as it may obscure performance issues.")
626626
.takes_value(false)
627627
)
628+
.arg(
629+
Arg::with_name("enable-proposer-re-orgs")
630+
.long("enable-proposer-re-orgs")
631+
.help("Attempt to re-org out weak/late blocks from other proposers \
632+
(dangerous, experimental)")
633+
.takes_value(true)
634+
)
635+
.arg(
636+
Arg::with_name("proposer-re-org-fraction")
637+
.long("proposer-re-org-fraction")
638+
.help("Percentage of vote weight below which to attempt a proposer re-org")
639+
.requires("enable-proposer-re-orgs")
640+
.takes_value(true)
641+
)
628642
}

beacon_node/src/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use beacon_chain::chain_config::DEFAULT_RE_ORG_THRESHOLD;
12
use clap::ArgMatches;
23
use clap_utils::flags::DISABLE_MALLOC_TUNING_FLAG;
34
use client::{ClientConfig, ClientGenesis};
@@ -562,6 +563,17 @@ pub fn get_config<E: EthSpec>(
562563
client_config.chain.enable_lock_timeouts = false;
563564
}
564565

566+
if let Some(enable_re_orgs) = clap_utils::parse_optional(cli_args, "enable-proposer-re-orgs")? {
567+
if enable_re_orgs {
568+
client_config.chain.re_org_threshold = Some(
569+
clap_utils::parse_optional(cli_args, "proposer-re-org-fraction")?
570+
.unwrap_or(DEFAULT_RE_ORG_THRESHOLD),
571+
);
572+
} else {
573+
client_config.chain.re_org_threshold = None;
574+
}
575+
}
576+
565577
Ok(client_config)
566578
}
567579

consensus/fork_choice/src/fork_choice.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::ForkChoiceStore;
2-
use proto_array::{Block as ProtoBlock, ExecutionStatus, ProtoArrayForkChoice};
2+
use proto_array::{Block as ProtoBlock, ExecutionStatus, ProposerHead, ProtoArrayForkChoice};
33
use ssz_derive::{Decode, Encode};
44
use std::cmp::Ordering;
55
use std::marker::PhantomData;
@@ -412,6 +412,25 @@ where
412412
.map_err(Into::into)
413413
}
414414

415+
pub fn get_proposer_head(
416+
&mut self,
417+
current_slot: Slot,
418+
canonical_head: Hash256,
419+
re_org_threshold: u64,
420+
) -> Result<ProposerHead, Error<T::Error>> {
421+
// Calling `update_time` is essential, as it needs to dequeue attestations from the previous
422+
// slot so we can see how many attesters voted for the canonical head.
423+
self.update_time(current_slot)?;
424+
425+
self.proto_array
426+
.get_proposer_head::<E>(
427+
self.fc_store.justified_balances(),
428+
canonical_head,
429+
re_org_threshold,
430+
)
431+
.map_err(Into::into)
432+
}
433+
415434
/// Returns `true` if the given `store` should be updated to set
416435
/// `state.current_justified_checkpoint` its `justified_checkpoint`.
417436
///

consensus/proto_array/src/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub enum Error {
1414
InvalidNodeDelta(usize),
1515
DeltaOverflow(usize),
1616
ProposerBoostOverflow(usize),
17+
UniqueWeightOverflow(Hash256),
1718
IndexOverflow(&'static str),
1819
InvalidDeltaLen {
1920
deltas: usize,

consensus/proto_array/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ mod proto_array;
44
mod proto_array_fork_choice;
55
mod ssz_container;
66

7-
pub use crate::proto_array_fork_choice::{Block, ExecutionStatus, ProtoArrayForkChoice};
7+
pub use crate::proto_array_fork_choice::{
8+
Block, ExecutionStatus, ProposerHead, ProtoArrayForkChoice,
9+
};
810
pub use error::Error;
911

1012
pub mod core {

consensus/proto_array/src/proto_array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ impl ProtoArray {
579579
/// Returns `None` if there is an overflow or underflow when calculating the score.
580580
///
581581
/// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#get_latest_attesting_balance
582-
fn calculate_proposer_boost<E: EthSpec>(
582+
pub fn calculate_proposer_boost<E: EthSpec>(
583583
validator_balances: &[u64],
584584
proposer_score_boost: u64,
585585
) -> Option<u64> {

consensus/proto_array/src/proto_array_fork_choice.rs

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::error::Error;
2-
use crate::proto_array::{ProposerBoost, ProtoArray};
2+
use crate::proto_array::{calculate_proposer_boost, ProposerBoost, ProtoArray};
33
use crate::ssz_container::SszContainer;
44
use serde_derive::{Deserialize, Serialize};
55
use ssz::{Decode, Encode};
@@ -92,11 +92,26 @@ where
9292
&mut self.0[i]
9393
}
9494

95+
pub fn iter(&self) -> impl Iterator<Item = &T> {
96+
self.0.iter()
97+
}
98+
9599
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
96100
self.0.iter_mut()
97101
}
98102
}
99103

104+
/// Information about the proposer head used for opportunistic re-orgs.
105+
#[derive(Default, Clone)]
106+
pub struct ProposerHead {
107+
/// If set, the head block that the proposer should build upon.
108+
pub re_org_head: Option<Hash256>,
109+
/// The weight difference between the canonical head and its parent.
110+
pub canonical_head_weight: Option<u64>,
111+
/// The computed fraction of the active committee balance below which we can re-org.
112+
pub re_org_weight_threshold: Option<u64>,
113+
}
114+
100115
#[derive(PartialEq)]
101116
pub struct ProtoArrayForkChoice {
102117
pub(crate) proto_array: ProtoArray,
@@ -214,6 +229,73 @@ impl ProtoArrayForkChoice {
214229
.map_err(|e| format!("find_head failed: {:?}", e))
215230
}
216231

232+
pub fn get_proposer_head<E: EthSpec>(
233+
&self,
234+
justified_state_balances: &[u64],
235+
canonical_head: Hash256,
236+
re_org_vote_fraction: u64,
237+
) -> Result<ProposerHead, String> {
238+
let nodes = self
239+
.proto_array
240+
.iter_nodes(&canonical_head)
241+
.take(2)
242+
.collect::<Vec<_>>();
243+
if nodes.len() != 2 {
244+
return Ok(ProposerHead::default());
245+
}
246+
let head_node = nodes[0];
247+
let parent_node = nodes[1];
248+
249+
// Re-org conditions.
250+
let is_single_slot_re_org = parent_node.slot + 1 == head_node.slot;
251+
let re_org_weight_threshold =
252+
calculate_proposer_boost::<E>(justified_state_balances, re_org_vote_fraction)
253+
.ok_or_else(|| {
254+
"overflow calculating committee weight for proposer boost".to_string()
255+
})?;
256+
let canonical_head_weight = self
257+
.get_block_unique_weight(canonical_head, justified_state_balances)
258+
.map_err(|e| format!("overflow calculating head weight: {:?}", e))?;
259+
let is_weak_head = canonical_head_weight < re_org_weight_threshold;
260+
261+
let re_org_head = (is_single_slot_re_org && is_weak_head).then(|| parent_node.root);
262+
263+
Ok(ProposerHead {
264+
re_org_head,
265+
canonical_head_weight: Some(canonical_head_weight),
266+
re_org_weight_threshold: Some(re_org_weight_threshold),
267+
})
268+
}
269+
270+
/// Compute the sum of attester balances of attestations to a specific block root.
271+
///
272+
/// This weight is the weight unique to the block, *not* including the weight of its ancestors.
273+
///
274+
/// Any `proposer_boost` in effect is ignored: only attestations are counted.
275+
fn get_block_unique_weight(
276+
&self,
277+
block_root: Hash256,
278+
justified_balances: &[u64],
279+
) -> Result<u64, Error> {
280+
let mut unique_weight = 0u64;
281+
for (validator_index, vote) in self.votes.iter().enumerate() {
282+
// Check the `next_root` as we care about the most recent attestations, including ones
283+
// from the previous slot that have just been dequeued but haven't run fully through
284+
// fork choice yet.
285+
if vote.next_root == block_root {
286+
let validator_balance = justified_balances
287+
.get(validator_index)
288+
.copied()
289+
.unwrap_or(0);
290+
291+
unique_weight = unique_weight
292+
.checked_add(validator_balance)
293+
.ok_or(Error::UniqueWeightOverflow(block_root))?;
294+
}
295+
}
296+
Ok(unique_weight)
297+
}
298+
217299
pub fn maybe_prune(&mut self, finalized_root: Hash256) -> Result<(), String> {
218300
self.proto_array
219301
.maybe_prune(finalized_root)

0 commit comments

Comments
 (0)