Skip to content

Commit 01556f6

Browse files
committed
Optimise payload attributes calculation and add SSE (#4027)
## Issue Addressed Closes #3896 Closes #3998 Closes #3700 ## Proposed Changes - Optimise the calculation of withdrawals for payload attributes by avoiding state clones, avoiding unnecessary state advances and reading from the snapshot cache if possible. - Use the execution layer's payload attributes cache to avoid re-calculating payload attributes. I actually implemented a new LRU cache just for withdrawals but it had the exact same key and most of the same data as the existing payload attributes cache, so I deleted it. - Add a new SSE event that fires when payloadAttributes are calculated. This is useful for block builders, a la ethereum/beacon-APIs#244. - Add a new CLI flag `--always-prepare-payload` which forces payload attributes to be sent with every fcU regardless of connected proposers. This is intended for use by builders/relays. For maximum effect, the flags I've been using to run Lighthouse in "payload builder mode" are: ``` --always-prepare-payload \ --prepare-payload-lookahead 12000 \ --suggested-fee-recipient 0x0000000000000000000000000000000000000000 ``` The fee recipient is required so Lighthouse has something to pack in the payload attributes (it can be ignored by the builder). The lookahead causes fcU to be sent at the start of every slot rather than at 8s. As usual, fcU will also be sent after each change of head block. I think this combination is sufficient for builders to build on all viable heads. Often there will be two fcU (and two payload attributes) sent for the same slot: one sent at the start of the slot with the head from `n - 1` as the parent, and one sent after the block arrives with `n` as the parent. Example usage of the new event stream: ```bash curl -N "http://localhost:5052/eth/v1/events?topics=payload_attributes" ``` ## Additional Info - [x] Tests added by updating the proposer re-org tests. This has the benefit of testing the proposer re-org code paths with withdrawals too, confirming that the new changes don't interact poorly. - [ ] Benchmarking with `blockdreamer` on devnet-7 showed promising results but I'm yet to do a comparison to `unstable`. Co-authored-by: Michael Sproul <[email protected]>
1 parent 6e15533 commit 01556f6

File tree

11 files changed

+539
-90
lines changed

11 files changed

+539
-90
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 140 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use crate::validator_monitor::{
5757
};
5858
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
5959
use crate::{metrics, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead};
60-
use eth2::types::{EventKind, SseBlock, SyncDuty};
60+
use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
6161
use execution_layer::{
6262
BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition,
6363
PayloadAttributes, PayloadStatus,
@@ -89,6 +89,7 @@ use state_processing::{
8989
state_advance::{complete_state_advance, partial_state_advance},
9090
BlockSignatureStrategy, ConsensusContext, SigVerifiedOp, VerifyBlockRoot, VerifyOperation,
9191
};
92+
use std::borrow::Cow;
9293
use std::cmp::Ordering;
9394
use std::collections::HashMap;
9495
use std::collections::HashSet;
@@ -3878,6 +3879,75 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
38783879
}))
38793880
}
38803881

3882+
pub fn get_expected_withdrawals(
3883+
&self,
3884+
forkchoice_update_params: &ForkchoiceUpdateParameters,
3885+
proposal_slot: Slot,
3886+
) -> Result<Withdrawals<T::EthSpec>, Error> {
3887+
let cached_head = self.canonical_head.cached_head();
3888+
let head_state = &cached_head.snapshot.beacon_state;
3889+
3890+
let parent_block_root = forkchoice_update_params.head_root;
3891+
3892+
let (unadvanced_state, unadvanced_state_root) =
3893+
if cached_head.head_block_root() == parent_block_root {
3894+
(Cow::Borrowed(head_state), cached_head.head_state_root())
3895+
} else if let Some(snapshot) = self
3896+
.snapshot_cache
3897+
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
3898+
.ok_or(Error::SnapshotCacheLockTimeout)?
3899+
.get_cloned(parent_block_root, CloneConfig::none())
3900+
{
3901+
debug!(
3902+
self.log,
3903+
"Hit snapshot cache during withdrawals calculation";
3904+
"slot" => proposal_slot,
3905+
"parent_block_root" => ?parent_block_root,
3906+
);
3907+
let state_root = snapshot.beacon_state_root();
3908+
(Cow::Owned(snapshot.beacon_state), state_root)
3909+
} else {
3910+
info!(
3911+
self.log,
3912+
"Missed snapshot cache during withdrawals calculation";
3913+
"slot" => proposal_slot,
3914+
"parent_block_root" => ?parent_block_root
3915+
);
3916+
let block = self
3917+
.get_blinded_block(&parent_block_root)?
3918+
.ok_or(Error::MissingBeaconBlock(parent_block_root))?;
3919+
let state = self
3920+
.get_state(&block.state_root(), Some(block.slot()))?
3921+
.ok_or(Error::MissingBeaconState(block.state_root()))?;
3922+
(Cow::Owned(state), block.state_root())
3923+
};
3924+
3925+
// Parent state epoch is the same as the proposal, we don't need to advance because the
3926+
// list of expected withdrawals can only change after an epoch advance or a
3927+
// block application.
3928+
let proposal_epoch = proposal_slot.epoch(T::EthSpec::slots_per_epoch());
3929+
if head_state.current_epoch() == proposal_epoch {
3930+
return get_expected_withdrawals(&unadvanced_state, &self.spec)
3931+
.map_err(Error::PrepareProposerFailed);
3932+
}
3933+
3934+
// Advance the state using the partial method.
3935+
debug!(
3936+
self.log,
3937+
"Advancing state for withdrawals calculation";
3938+
"proposal_slot" => proposal_slot,
3939+
"parent_block_root" => ?parent_block_root,
3940+
);
3941+
let mut advanced_state = unadvanced_state.into_owned();
3942+
partial_state_advance(
3943+
&mut advanced_state,
3944+
Some(unadvanced_state_root),
3945+
proposal_epoch.start_slot(T::EthSpec::slots_per_epoch()),
3946+
&self.spec,
3947+
)?;
3948+
get_expected_withdrawals(&advanced_state, &self.spec).map_err(Error::PrepareProposerFailed)
3949+
}
3950+
38813951
/// Determine whether a fork choice update to the execution layer should be overridden.
38823952
///
38833953
/// This is *only* necessary when proposer re-orgs are enabled, because we have to prevent the
@@ -4664,7 +4734,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
46644734

46654735
// Nothing to do if there are no proposers registered with the EL, exit early to avoid
46664736
// wasting cycles.
4667-
if !execution_layer.has_any_proposer_preparation_data().await {
4737+
if !self.config.always_prepare_payload
4738+
&& !execution_layer.has_any_proposer_preparation_data().await
4739+
{
46684740
return Ok(());
46694741
}
46704742

@@ -4721,71 +4793,84 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
47214793
// If the execution layer doesn't have any proposer data for this validator then we assume
47224794
// it's not connected to this BN and no action is required.
47234795
let proposer = pre_payload_attributes.proposer_index;
4724-
if !execution_layer
4725-
.has_proposer_preparation_data(proposer)
4726-
.await
4796+
if !self.config.always_prepare_payload
4797+
&& !execution_layer
4798+
.has_proposer_preparation_data(proposer)
4799+
.await
47274800
{
47284801
return Ok(());
47294802
}
47304803

4731-
let withdrawals = match self.spec.fork_name_at_slot::<T::EthSpec>(prepare_slot) {
4732-
ForkName::Base | ForkName::Altair | ForkName::Merge => None,
4733-
ForkName::Capella => {
4734-
// We must use the advanced state because balances can change at epoch boundaries
4735-
// and balances affect withdrawals.
4736-
// FIXME(mark)
4737-
// Might implement caching here in the future..
4738-
let prepare_state = self
4739-
.state_at_slot(prepare_slot, StateSkipConfig::WithoutStateRoots)
4740-
.map_err(|e| {
4741-
error!(self.log, "State advance for withdrawals failed"; "error" => ?e);
4742-
e
4743-
})?;
4744-
Some(get_expected_withdrawals(&prepare_state, &self.spec))
4745-
}
4746-
}
4747-
.transpose()
4748-
.map_err(|e| {
4749-
error!(self.log, "Error preparing beacon proposer"; "error" => ?e);
4750-
e
4751-
})
4752-
.map(|withdrawals_opt| withdrawals_opt.map(|w| w.into()))
4753-
.map_err(Error::PrepareProposerFailed)?;
4754-
4804+
// Fetch payoad attributes from the execution layer's cache, or compute them from scratch
4805+
// if no matching entry is found. This saves recomputing the withdrawals which can take
4806+
// considerable time to compute if a state load is required.
47554807
let head_root = forkchoice_update_params.head_root;
4756-
let payload_attributes = PayloadAttributes::new(
4757-
self.slot_clock
4758-
.start_of(prepare_slot)
4759-
.ok_or(Error::InvalidSlot(prepare_slot))?
4760-
.as_secs(),
4761-
pre_payload_attributes.prev_randao,
4762-
execution_layer.get_suggested_fee_recipient(proposer).await,
4763-
withdrawals,
4764-
);
4808+
let payload_attributes = if let Some(payload_attributes) = execution_layer
4809+
.payload_attributes(prepare_slot, head_root)
4810+
.await
4811+
{
4812+
payload_attributes
4813+
} else {
4814+
let withdrawals = match self.spec.fork_name_at_slot::<T::EthSpec>(prepare_slot) {
4815+
ForkName::Base | ForkName::Altair | ForkName::Merge => None,
4816+
ForkName::Capella => {
4817+
let chain = self.clone();
4818+
self.spawn_blocking_handle(
4819+
move || {
4820+
chain.get_expected_withdrawals(&forkchoice_update_params, prepare_slot)
4821+
},
4822+
"prepare_beacon_proposer_withdrawals",
4823+
)
4824+
.await?
4825+
.map(Some)?
4826+
}
4827+
};
47654828

4766-
debug!(
4767-
self.log,
4768-
"Preparing beacon proposer";
4769-
"payload_attributes" => ?payload_attributes,
4770-
"prepare_slot" => prepare_slot,
4771-
"validator" => proposer,
4772-
"parent_root" => ?head_root,
4773-
);
4829+
let payload_attributes = PayloadAttributes::new(
4830+
self.slot_clock
4831+
.start_of(prepare_slot)
4832+
.ok_or(Error::InvalidSlot(prepare_slot))?
4833+
.as_secs(),
4834+
pre_payload_attributes.prev_randao,
4835+
execution_layer.get_suggested_fee_recipient(proposer).await,
4836+
withdrawals.map(Into::into),
4837+
);
47744838

4775-
let already_known = execution_layer
4776-
.insert_proposer(prepare_slot, head_root, proposer, payload_attributes)
4777-
.await;
4839+
execution_layer
4840+
.insert_proposer(
4841+
prepare_slot,
4842+
head_root,
4843+
proposer,
4844+
payload_attributes.clone(),
4845+
)
4846+
.await;
47784847

4779-
// Only push a log to the user if this is the first time we've seen this proposer for this
4780-
// slot.
4781-
if !already_known {
4848+
// Only push a log to the user if this is the first time we've seen this proposer for
4849+
// this slot.
47824850
info!(
47834851
self.log,
47844852
"Prepared beacon proposer";
47854853
"prepare_slot" => prepare_slot,
47864854
"validator" => proposer,
47874855
"parent_root" => ?head_root,
47884856
);
4857+
payload_attributes
4858+
};
4859+
4860+
// Push a server-sent event (probably to a block builder or relay).
4861+
if let Some(event_handler) = &self.event_handler {
4862+
if event_handler.has_payload_attributes_subscribers() {
4863+
event_handler.register(EventKind::PayloadAttributes(ForkVersionedResponse {
4864+
data: SseExtendedPayloadAttributes {
4865+
proposal_slot: prepare_slot,
4866+
proposer_index: proposer,
4867+
parent_block_root: head_root,
4868+
parent_block_hash: forkchoice_update_params.head_hash.unwrap_or_default(),
4869+
payload_attributes: payload_attributes.into(),
4870+
},
4871+
version: Some(self.spec.fork_name_at_slot::<T::EthSpec>(prepare_slot)),
4872+
}));
4873+
}
47894874
}
47904875

47914876
let till_prepare_slot =
@@ -4808,7 +4893,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
48084893

48094894
// If we are close enough to the proposal slot, send an fcU, which will have payload
48104895
// attributes filled in by the execution layer cache we just primed.
4811-
if till_prepare_slot <= self.config.prepare_payload_lookahead {
4896+
if self.config.always_prepare_payload
4897+
|| till_prepare_slot <= self.config.prepare_payload_lookahead
4898+
{
48124899
debug!(
48134900
self.log,
48144901
"Sending forkchoiceUpdate for proposer prep";

beacon_node/beacon_chain/src/chain_config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ pub struct ChainConfig {
6767
pub prepare_payload_lookahead: Duration,
6868
/// Use EL-free optimistic sync for the finalized part of the chain.
6969
pub optimistic_finalized_sync: bool,
70+
/// Whether to send payload attributes every slot, regardless of connected proposers.
71+
///
72+
/// This is useful for block builders and testing.
73+
pub always_prepare_payload: bool,
7074
}
7175

7276
impl Default for ChainConfig {
@@ -93,6 +97,7 @@ impl Default for ChainConfig {
9397
prepare_payload_lookahead: Duration::from_secs(4),
9498
// This value isn't actually read except in tests.
9599
optimistic_finalized_sync: true,
100+
always_prepare_payload: false,
96101
}
97102
}
98103
}

beacon_node/beacon_chain/src/events.rs

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub struct ServerSentEventHandler<T: EthSpec> {
1414
exit_tx: Sender<EventKind<T>>,
1515
chain_reorg_tx: Sender<EventKind<T>>,
1616
contribution_tx: Sender<EventKind<T>>,
17+
payload_attributes_tx: Sender<EventKind<T>>,
1718
late_head: Sender<EventKind<T>>,
1819
block_reward_tx: Sender<EventKind<T>>,
1920
log: Logger,
@@ -32,6 +33,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
3233
let (exit_tx, _) = broadcast::channel(capacity);
3334
let (chain_reorg_tx, _) = broadcast::channel(capacity);
3435
let (contribution_tx, _) = broadcast::channel(capacity);
36+
let (payload_attributes_tx, _) = broadcast::channel(capacity);
3537
let (late_head, _) = broadcast::channel(capacity);
3638
let (block_reward_tx, _) = broadcast::channel(capacity);
3739

@@ -43,35 +45,63 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
4345
exit_tx,
4446
chain_reorg_tx,
4547
contribution_tx,
48+
payload_attributes_tx,
4649
late_head,
4750
block_reward_tx,
4851
log,
4952
}
5053
}
5154

5255
pub fn register(&self, kind: EventKind<T>) {
53-
let result = match kind {
54-
EventKind::Attestation(attestation) => self
56+
let log_count = |name, count| {
57+
trace!(
58+
self.log,
59+
"Registering server-sent event";
60+
"kind" => name,
61+
"receiver_count" => count
62+
);
63+
};
64+
let result = match &kind {
65+
EventKind::Attestation(_) => self
5566
.attestation_tx
56-
.send(EventKind::Attestation(attestation))
57-
.map(|count| trace!(self.log, "Registering server-sent attestation event"; "receiver_count" => count)),
58-
EventKind::Block(block) => self.block_tx.send(EventKind::Block(block))
59-
.map(|count| trace!(self.log, "Registering server-sent block event"; "receiver_count" => count)),
60-
EventKind::FinalizedCheckpoint(checkpoint) => self.finalized_tx
61-
.send(EventKind::FinalizedCheckpoint(checkpoint))
62-
.map(|count| trace!(self.log, "Registering server-sent finalized checkpoint event"; "receiver_count" => count)),
63-
EventKind::Head(head) => self.head_tx.send(EventKind::Head(head))
64-
.map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)),
65-
EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit))
66-
.map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)),
67-
EventKind::ChainReorg(reorg) => self.chain_reorg_tx.send(EventKind::ChainReorg(reorg))
68-
.map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)),
69-
EventKind::ContributionAndProof(contribution_and_proof) => self.contribution_tx.send(EventKind::ContributionAndProof(contribution_and_proof))
70-
.map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)),
71-
EventKind::LateHead(late_head) => self.late_head.send(EventKind::LateHead(late_head))
72-
.map(|count| trace!(self.log, "Registering server-sent late head event"; "receiver_count" => count)),
73-
EventKind::BlockReward(block_reward) => self.block_reward_tx.send(EventKind::BlockReward(block_reward))
74-
.map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)),
67+
.send(kind)
68+
.map(|count| log_count(count, "attestation")),
69+
EventKind::Block(_) => self
70+
.block_tx
71+
.send(kind)
72+
.map(|count| log_count(count, "block")),
73+
EventKind::FinalizedCheckpoint(_) => self
74+
.finalized_tx
75+
.send(kind)
76+
.map(|count| log_count(count, "finalized checkpoint")),
77+
EventKind::Head(_) => self
78+
.head_tx
79+
.send(kind)
80+
.map(|count| log_count(count, "head")),
81+
EventKind::VoluntaryExit(_) => self
82+
.exit_tx
83+
.send(kind)
84+
.map(|count| log_count(count, "exit")),
85+
EventKind::ChainReorg(_) => self
86+
.chain_reorg_tx
87+
.send(kind)
88+
.map(|count| log_count(count, "chain reorg")),
89+
EventKind::ContributionAndProof(_) => self
90+
.contribution_tx
91+
.send(kind)
92+
.map(|count| log_count(count, "contribution and proof")),
93+
EventKind::PayloadAttributes(_) => self
94+
.payload_attributes_tx
95+
.send(kind)
96+
.map(|count| log_count(count, "payload attributes")),
97+
EventKind::LateHead(_) => self
98+
.late_head
99+
.send(kind)
100+
.map(|count| log_count(count, "late head")),
101+
EventKind::BlockReward(_) => self
102+
.block_reward_tx
103+
.send(kind)
104+
.map(|count| log_count(count, "block reward")),
75105
};
76106
if let Err(SendError(event)) = result {
77107
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
@@ -106,6 +136,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
106136
self.contribution_tx.subscribe()
107137
}
108138

139+
pub fn subscribe_payload_attributes(&self) -> Receiver<EventKind<T>> {
140+
self.payload_attributes_tx.subscribe()
141+
}
142+
109143
pub fn subscribe_late_head(&self) -> Receiver<EventKind<T>> {
110144
self.late_head.subscribe()
111145
}
@@ -142,6 +176,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
142176
self.contribution_tx.receiver_count() > 0
143177
}
144178

179+
pub fn has_payload_attributes_subscribers(&self) -> bool {
180+
self.payload_attributes_tx.receiver_count() > 0
181+
}
182+
145183
pub fn has_late_head_subscribers(&self) -> bool {
146184
self.late_head.receiver_count() > 0
147185
}

0 commit comments

Comments
 (0)