Skip to content

Commit 0f97600

Browse files
michaelsproulWoodpile37
authored andcommitted
Optimise payload attributes calculation and add SSE (sigp#4027)
Closes sigp#3896 Closes sigp#3998 Closes sigp#3700 - Optimise the calculation of withdrawals for payload attributes by avoiding state clones, avoiding unnecessary state advances and reading from the snapshot cache if possible. - Use the execution layer's payload attributes cache to avoid re-calculating payload attributes. I actually implemented a new LRU cache just for withdrawals but it had the exact same key and most of the same data as the existing payload attributes cache, so I deleted it. - Add a new SSE event that fires when payloadAttributes are calculated. This is useful for block builders, a la ethereum/beacon-APIs#244. - Add a new CLI flag `--always-prepare-payload` which forces payload attributes to be sent with every fcU regardless of connected proposers. This is intended for use by builders/relays. For maximum effect, the flags I've been using to run Lighthouse in "payload builder mode" are: ``` --always-prepare-payload \ --prepare-payload-lookahead 12000 \ --suggested-fee-recipient 0x0000000000000000000000000000000000000000 ``` The fee recipient is required so Lighthouse has something to pack in the payload attributes (it can be ignored by the builder). The lookahead causes fcU to be sent at the start of every slot rather than at 8s. As usual, fcU will also be sent after each change of head block. I think this combination is sufficient for builders to build on all viable heads. Often there will be two fcU (and two payload attributes) sent for the same slot: one sent at the start of the slot with the head from `n - 1` as the parent, and one sent after the block arrives with `n` as the parent. Example usage of the new event stream: ```bash curl -N "http://localhost:5052/eth/v1/events?topics=payload_attributes" ``` - [x] Tests added by updating the proposer re-org tests. This has the benefit of testing the proposer re-org code paths with withdrawals too, confirming that the new changes don't interact poorly. - [ ] Benchmarking with `blockdreamer` on devnet-7 showed promising results but I'm yet to do a comparison to `unstable`. Co-authored-by: Michael Sproul <[email protected]>
1 parent 56161a1 commit 0f97600

File tree

9 files changed

+371
-37
lines changed

9 files changed

+371
-37
lines changed

beacon_node/beacon_chain/src/chain_config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ pub struct ChainConfig {
6767
pub prepare_payload_lookahead: Duration,
6868
/// Use EL-free optimistic sync for the finalized part of the chain.
6969
pub optimistic_finalized_sync: bool,
70+
/// Whether to send payload attributes every slot, regardless of connected proposers.
71+
///
72+
/// This is useful for block builders and testing.
73+
pub always_prepare_payload: bool,
7074
}
7175

7276
impl Default for ChainConfig {
@@ -92,6 +96,7 @@ impl Default for ChainConfig {
9296
checkpoint_sync_url_timeout: 60,
9397
prepare_payload_lookahead: Duration::from_secs(4),
9498
optimistic_finalized_sync: true,
99+
always_prepare_payload: false,
95100
}
96101
}
97102
}

beacon_node/beacon_chain/src/events.rs

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub struct ServerSentEventHandler<T: EthSpec> {
1414
exit_tx: Sender<EventKind<T>>,
1515
chain_reorg_tx: Sender<EventKind<T>>,
1616
contribution_tx: Sender<EventKind<T>>,
17+
payload_attributes_tx: Sender<EventKind<T>>,
1718
late_head: Sender<EventKind<T>>,
1819
block_reward_tx: Sender<EventKind<T>>,
1920
log: Logger,
@@ -32,6 +33,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
3233
let (exit_tx, _) = broadcast::channel(capacity);
3334
let (chain_reorg_tx, _) = broadcast::channel(capacity);
3435
let (contribution_tx, _) = broadcast::channel(capacity);
36+
let (payload_attributes_tx, _) = broadcast::channel(capacity);
3537
let (late_head, _) = broadcast::channel(capacity);
3638
let (block_reward_tx, _) = broadcast::channel(capacity);
3739

@@ -43,35 +45,63 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
4345
exit_tx,
4446
chain_reorg_tx,
4547
contribution_tx,
48+
payload_attributes_tx,
4649
late_head,
4750
block_reward_tx,
4851
log,
4952
}
5053
}
5154

5255
pub fn register(&self, kind: EventKind<T>) {
53-
let result = match kind {
54-
EventKind::Attestation(attestation) => self
56+
let log_count = |name, count| {
57+
trace!(
58+
self.log,
59+
"Registering server-sent event";
60+
"kind" => name,
61+
"receiver_count" => count
62+
);
63+
};
64+
let result = match &kind {
65+
EventKind::Attestation(_) => self
5566
.attestation_tx
56-
.send(EventKind::Attestation(attestation))
57-
.map(|count| trace!(self.log, "Registering server-sent attestation event"; "receiver_count" => count)),
58-
EventKind::Block(block) => self.block_tx.send(EventKind::Block(block))
59-
.map(|count| trace!(self.log, "Registering server-sent block event"; "receiver_count" => count)),
60-
EventKind::FinalizedCheckpoint(checkpoint) => self.finalized_tx
61-
.send(EventKind::FinalizedCheckpoint(checkpoint))
62-
.map(|count| trace!(self.log, "Registering server-sent finalized checkpoint event"; "receiver_count" => count)),
63-
EventKind::Head(head) => self.head_tx.send(EventKind::Head(head))
64-
.map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)),
65-
EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit))
66-
.map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)),
67-
EventKind::ChainReorg(reorg) => self.chain_reorg_tx.send(EventKind::ChainReorg(reorg))
68-
.map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)),
69-
EventKind::ContributionAndProof(contribution_and_proof) => self.contribution_tx.send(EventKind::ContributionAndProof(contribution_and_proof))
70-
.map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)),
71-
EventKind::LateHead(late_head) => self.late_head.send(EventKind::LateHead(late_head))
72-
.map(|count| trace!(self.log, "Registering server-sent late head event"; "receiver_count" => count)),
73-
EventKind::BlockReward(block_reward) => self.block_reward_tx.send(EventKind::BlockReward(block_reward))
74-
.map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)),
67+
.send(kind)
68+
.map(|count| log_count(count, "attestation")),
69+
EventKind::Block(_) => self
70+
.block_tx
71+
.send(kind)
72+
.map(|count| log_count(count, "block")),
73+
EventKind::FinalizedCheckpoint(_) => self
74+
.finalized_tx
75+
.send(kind)
76+
.map(|count| log_count(count, "finalized checkpoint")),
77+
EventKind::Head(_) => self
78+
.head_tx
79+
.send(kind)
80+
.map(|count| log_count(count, "head")),
81+
EventKind::VoluntaryExit(_) => self
82+
.exit_tx
83+
.send(kind)
84+
.map(|count| log_count(count, "exit")),
85+
EventKind::ChainReorg(_) => self
86+
.chain_reorg_tx
87+
.send(kind)
88+
.map(|count| log_count(count, "chain reorg")),
89+
EventKind::ContributionAndProof(_) => self
90+
.contribution_tx
91+
.send(kind)
92+
.map(|count| log_count(count, "contribution and proof")),
93+
EventKind::PayloadAttributes(_) => self
94+
.payload_attributes_tx
95+
.send(kind)
96+
.map(|count| log_count(count, "payload attributes")),
97+
EventKind::LateHead(_) => self
98+
.late_head
99+
.send(kind)
100+
.map(|count| log_count(count, "late head")),
101+
EventKind::BlockReward(_) => self
102+
.block_reward_tx
103+
.send(kind)
104+
.map(|count| log_count(count, "block reward")),
75105
};
76106
if let Err(SendError(event)) = result {
77107
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
@@ -106,6 +136,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
106136
self.contribution_tx.subscribe()
107137
}
108138

139+
pub fn subscribe_payload_attributes(&self) -> Receiver<EventKind<T>> {
140+
self.payload_attributes_tx.subscribe()
141+
}
142+
109143
pub fn subscribe_late_head(&self) -> Receiver<EventKind<T>> {
110144
self.late_head.subscribe()
111145
}
@@ -142,6 +176,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
142176
self.contribution_tx.receiver_count() > 0
143177
}
144178

179+
pub fn has_payload_attributes_subscribers(&self) -> bool {
180+
self.payload_attributes_tx.receiver_count() > 0
181+
}
182+
145183
pub fn has_late_head_subscribers(&self) -> bool {
146184
self.late_head.receiver_count() > 0
147185
}

beacon_node/beacon_chain/src/test_utils.rs

Lines changed: 116 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ pub enum AttestationStrategy {
107107
SomeValidators(Vec<usize>),
108108
}
109109

110+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111+
pub enum SyncCommitteeStrategy {
112+
/// All sync committee validators sign.
113+
AllValidators,
114+
/// No validators sign.
115+
NoValidators,
116+
}
117+
110118
/// Indicates whether the `BeaconChainHarness` should use the `state.current_sync_committee` or
111119
/// `state.next_sync_committee` when creating sync messages or contributions.
112120
#[derive(Clone, Debug)]
@@ -1763,15 +1771,64 @@ where
17631771
self.process_attestations(attestations);
17641772
}
17651773

1774+
pub fn sync_committee_sign_block(
1775+
&self,
1776+
state: &BeaconState<E>,
1777+
block_hash: Hash256,
1778+
slot: Slot,
1779+
relative_sync_committee: RelativeSyncCommittee,
1780+
) {
1781+
let sync_contributions =
1782+
self.make_sync_contributions(state, block_hash, slot, relative_sync_committee);
1783+
self.process_sync_contributions(sync_contributions).unwrap()
1784+
}
1785+
17661786
pub async fn add_attested_block_at_slot(
17671787
&self,
17681788
slot: Slot,
17691789
state: BeaconState<E>,
17701790
state_root: Hash256,
17711791
validators: &[usize],
1792+
) -> Result<(SignedBeaconBlockHash, BeaconState<E>), BlockError<E>> {
1793+
self.add_attested_block_at_slot_with_sync(
1794+
slot,
1795+
state,
1796+
state_root,
1797+
validators,
1798+
SyncCommitteeStrategy::NoValidators,
1799+
)
1800+
.await
1801+
}
1802+
1803+
pub async fn add_attested_block_at_slot_with_sync(
1804+
&self,
1805+
slot: Slot,
1806+
state: BeaconState<E>,
1807+
state_root: Hash256,
1808+
validators: &[usize],
1809+
sync_committee_strategy: SyncCommitteeStrategy,
17721810
) -> Result<(SignedBeaconBlockHash, BeaconState<E>), BlockError<E>> {
17731811
let (block_hash, block, state) = self.add_block_at_slot(slot, state).await?;
17741812
self.attest_block(&state, state_root, block_hash, &block, validators);
1813+
1814+
if sync_committee_strategy == SyncCommitteeStrategy::AllValidators
1815+
&& state.current_sync_committee().is_ok()
1816+
{
1817+
self.sync_committee_sign_block(
1818+
&state,
1819+
block_hash.into(),
1820+
slot,
1821+
if (slot + 1).epoch(E::slots_per_epoch())
1822+
% self.spec.epochs_per_sync_committee_period
1823+
== 0
1824+
{
1825+
RelativeSyncCommittee::Next
1826+
} else {
1827+
RelativeSyncCommittee::Current
1828+
},
1829+
);
1830+
}
1831+
17751832
Ok((block_hash, state))
17761833
}
17771834

@@ -1781,10 +1838,35 @@ where
17811838
state_root: Hash256,
17821839
slots: &[Slot],
17831840
validators: &[usize],
1841+
) -> AddBlocksResult<E> {
1842+
self.add_attested_blocks_at_slots_with_sync(
1843+
state,
1844+
state_root,
1845+
slots,
1846+
validators,
1847+
SyncCommitteeStrategy::NoValidators,
1848+
)
1849+
.await
1850+
}
1851+
1852+
pub async fn add_attested_blocks_at_slots_with_sync(
1853+
&self,
1854+
state: BeaconState<E>,
1855+
state_root: Hash256,
1856+
slots: &[Slot],
1857+
validators: &[usize],
1858+
sync_committee_strategy: SyncCommitteeStrategy,
17841859
) -> AddBlocksResult<E> {
17851860
assert!(!slots.is_empty());
1786-
self.add_attested_blocks_at_slots_given_lbh(state, state_root, slots, validators, None)
1787-
.await
1861+
self.add_attested_blocks_at_slots_given_lbh(
1862+
state,
1863+
state_root,
1864+
slots,
1865+
validators,
1866+
None,
1867+
sync_committee_strategy,
1868+
)
1869+
.await
17881870
}
17891871

17901872
async fn add_attested_blocks_at_slots_given_lbh(
@@ -1794,6 +1876,7 @@ where
17941876
slots: &[Slot],
17951877
validators: &[usize],
17961878
mut latest_block_hash: Option<SignedBeaconBlockHash>,
1879+
sync_committee_strategy: SyncCommitteeStrategy,
17971880
) -> AddBlocksResult<E> {
17981881
assert!(
17991882
slots.windows(2).all(|w| w[0] <= w[1]),
@@ -1803,7 +1886,13 @@ where
18031886
let mut state_hash_from_slot: HashMap<Slot, BeaconStateHash> = HashMap::new();
18041887
for slot in slots {
18051888
let (block_hash, new_state) = self
1806-
.add_attested_block_at_slot(*slot, state, state_root, validators)
1889+
.add_attested_block_at_slot_with_sync(
1890+
*slot,
1891+
state,
1892+
state_root,
1893+
validators,
1894+
sync_committee_strategy,
1895+
)
18071896
.await
18081897
.unwrap();
18091898
state = new_state;
@@ -1885,6 +1974,7 @@ where
18851974
&epoch_slots,
18861975
&validators,
18871976
Some(head_block),
1977+
SyncCommitteeStrategy::NoValidators, // for backwards compat
18881978
)
18891979
.await;
18901980

@@ -2002,6 +2092,22 @@ where
20022092
num_blocks: usize,
20032093
block_strategy: BlockStrategy,
20042094
attestation_strategy: AttestationStrategy,
2095+
) -> Hash256 {
2096+
self.extend_chain_with_sync(
2097+
num_blocks,
2098+
block_strategy,
2099+
attestation_strategy,
2100+
SyncCommitteeStrategy::NoValidators,
2101+
)
2102+
.await
2103+
}
2104+
2105+
pub async fn extend_chain_with_sync(
2106+
&self,
2107+
num_blocks: usize,
2108+
block_strategy: BlockStrategy,
2109+
attestation_strategy: AttestationStrategy,
2110+
sync_committee_strategy: SyncCommitteeStrategy,
20052111
) -> Hash256 {
20062112
let (mut state, slots) = match block_strategy {
20072113
BlockStrategy::OnCanonicalHead => {
@@ -2033,7 +2139,13 @@ where
20332139
};
20342140
let state_root = state.update_tree_hash_cache().unwrap();
20352141
let (_, _, last_produced_block_hash, _) = self
2036-
.add_attested_blocks_at_slots(state, state_root, &slots, &validators)
2142+
.add_attested_blocks_at_slots_with_sync(
2143+
state,
2144+
state_root,
2145+
&slots,
2146+
&validators,
2147+
sync_committee_strategy,
2148+
)
20372149
.await;
20382150
last_produced_block_hash.into()
20392151
}

beacon_node/http_api/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3501,6 +3501,9 @@ pub fn serve<T: BeaconChainTypes>(
35013501
api_types::EventTopic::ContributionAndProof => {
35023502
event_handler.subscribe_contributions()
35033503
}
3504+
api_types::EventTopic::PayloadAttributes => {
3505+
event_handler.subscribe_payload_attributes()
3506+
}
35043507
api_types::EventTopic::LateHead => {
35053508
event_handler.subscribe_late_head()
35063509
}

0 commit comments

Comments
 (0)