Skip to content

Commit 01b91cb

Browse files
committed
Avoid double-emitting SSE blob events. Add code comments.
1 parent 0ecb203 commit 01b91cb

File tree

3 files changed

+39
-25
lines changed

3 files changed

+39
-25
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3047,13 +3047,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30473047
return Err(BlockError::BlobNotRequired(blob.slot()));
30483048
}
30493049

3050-
if let Some(event_handler) = self.event_handler.as_ref() {
3051-
if event_handler.has_blob_sidecar_subscribers() {
3052-
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar(
3053-
blob.as_blob(),
3054-
)));
3055-
}
3056-
}
3050+
self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob()));
30573051

30583052
let r = self.check_gossip_blob_availability_and_import(blob).await;
30593053
self.remove_notified(&block_root, r)
@@ -3116,15 +3110,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31163110
return Err(BlockError::BlockIsAlreadyKnown(block_root));
31173111
}
31183112

3119-
if let Some(event_handler) = self.event_handler.as_ref() {
3120-
if event_handler.has_blob_sidecar_subscribers() {
3121-
for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
3122-
event_handler.register(EventKind::BlobSidecar(
3123-
SseBlobSidecar::from_blob_sidecar(blob),
3124-
));
3125-
}
3126-
}
3127-
}
3113+
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
31283114

31293115
let r = self
31303116
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
@@ -3149,20 +3135,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31493135
return Err(BlockError::BlockIsAlreadyKnown(block_root));
31503136
}
31513137

3138+
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
3139+
3140+
let r = self
3141+
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
3142+
.await;
3143+
self.remove_notified(&block_root, r)
3144+
}
3145+
3146+
fn emit_sse_blob_sidecar_events<'a, I>(self: &Arc<Self>, block_root: &Hash256, blobs_iter: I)
3147+
where
3148+
I: Iterator<Item = &'a BlobSidecar<T::EthSpec>>,
3149+
{
31523150
if let Some(event_handler) = self.event_handler.as_ref() {
31533151
if event_handler.has_blob_sidecar_subscribers() {
3154-
for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
3152+
let imported_blobs = self
3153+
.data_availability_checker
3154+
.imported_blob_indexes(block_root)
3155+
.unwrap_or_default();
3156+
let new_blobs = blobs_iter.filter(|b| !imported_blobs.contains(&b.index));
3157+
3158+
for blob in new_blobs {
31553159
event_handler.register(EventKind::BlobSidecar(
31563160
SseBlobSidecar::from_blob_sidecar(blob),
31573161
));
31583162
}
31593163
}
31603164
}
3161-
3162-
let r = self
3163-
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
3164-
.await;
3165-
self.remove_notified(&block_root, r)
31663165
}
31673166

31683167
/// Cache the columns in the processing cache, process it, then evict it from the cache if it was
@@ -3891,6 +3890,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
38913890
// See https://github.com/sigp/lighthouse/issues/2028
38923891
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
38933892
let custody_columns_count = self.data_availability_checker.get_custody_columns_count();
3893+
// if the block was made available via custody columns received from gossip / rpc, use them
3894+
// since we already have them.
3895+
// Otherwise, it means blobs were likely available via fetching from EL, in this case we
3896+
// wait for the data columns to be computed (blocking).
38943897
let data_columns = data_columns
38953898
.filter(|a| a.len() >= custody_columns_count)
38963899
.or_else(|| data_column_recv?.blocking_recv());

beacon_node/beacon_chain/src/fetch_blobs.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
//! This module implements an optimisation to fetch blobs via JSON-RPC from the EL.
2+
//! If a blob has already been seen in the public mempool, then it is often unnecessary to wait for
3+
//! it to arrive on P2P gossip. This PR uses a new JSON-RPC method (`engine_getBlobsV1`) which
4+
//! allows the CL to load the blobs quickly from the EL's blob pool.
5+
//!
6+
//! Once the node fetches the blobs from EL, it then publishes the remaining blobs that hasn't seen
7+
//! on P2P gossip to the network. From PeerDAS onwards, together with the increase in blob count,
8+
//! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity
9+
//! supernodes.
110
use crate::observed_data_sidecars::ObservableDataSidecar;
211
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError};
312
use itertools::Either;
@@ -15,6 +24,8 @@ pub enum BlobsOrDataColumns<E: EthSpec> {
1524
DataColumns(DataColumnSidecarVec<E>),
1625
}
1726

27+
/// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or
28+
/// data columns (PeerDAS onwards) to the network, using the supplied `publish_fn`.
1829
pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
1930
chain: Arc<BeaconChain<T>>,
2031
block_root: Hash256,
@@ -81,7 +92,7 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
8192
.enumerate()
8293
.filter_map(|(i, opt_blob)| Some((i, opt_blob?)))
8394
{
84-
match BlobSidecar::new_efficiently(
95+
match BlobSidecar::new_with_existing_proof(
8596
i,
8697
blob_and_proof.blob,
8798
&block,
@@ -202,7 +213,7 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
202213
block.slot(),
203214
block_root,
204215
fixed_blob_sidecar_list.clone(),
205-
Some(data_columns_receiver),
216+
peer_das_enabled.then_some(data_columns_receiver),
206217
)
207218
.await
208219
.map(|_| debug!(chain.log, "Blobs from EL - processed"))

consensus/types/src/blob_sidecar.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl<E: EthSpec> BlobSidecar<E> {
149149
})
150150
}
151151

152-
pub fn new_efficiently(
152+
pub fn new_with_existing_proof(
153153
index: usize,
154154
blob: Blob<E>,
155155
signed_block: &SignedBeaconBlock<E>,

0 commit comments

Comments
 (0)