Skip to content

Commit e7b1d5e

Browse files
dapplionAgeManning
authored andcommitted
Implement PeerDAS RPC handlers (sigp#6237)
* Implement PeerDAS RPC handlers * use terminate_response_stream * Merge branch 'unstable' of https://github.com/sigp/lighthouse into peerdas-network-rpc-handler * cargo fmt
1 parent 6f75369 commit e7b1d5e

File tree

6 files changed

+313
-11
lines changed

6 files changed

+313
-11
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,6 +1155,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
11551155
.map_or_else(|| self.get_blobs(block_root), Ok)
11561156
}
11571157

1158+
pub fn get_data_column_checking_all_caches(
1159+
&self,
1160+
block_root: Hash256,
1161+
index: ColumnIndex,
1162+
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
1163+
if let Some(column) = self
1164+
.data_availability_checker
1165+
.get_data_column(&DataColumnIdentifier { block_root, index })?
1166+
{
1167+
return Ok(Some(column));
1168+
}
1169+
1170+
if let Some(columns) = self.early_attester_cache.get_data_columns(block_root) {
1171+
return Ok(columns.iter().find(|c| c.index == index).cloned());
1172+
}
1173+
1174+
self.get_data_column(&block_root, &index)
1175+
}
1176+
11581177
/// Returns the block at the given root, if any.
11591178
///
11601179
/// ## Errors
@@ -1230,6 +1249,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
12301249
}
12311250
}
12321251

1252+
/// Returns the data columns at the given root, if any.
1253+
///
1254+
/// ## Errors
1255+
/// May return a database error.
1256+
pub fn get_data_column(
1257+
&self,
1258+
block_root: &Hash256,
1259+
column_index: &ColumnIndex,
1260+
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
1261+
Ok(self.store.get_data_column(block_root, column_index)?)
1262+
}
1263+
12331264
pub fn get_blinded_block(
12341265
&self,
12351266
block_root: &Hash256,

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use std::time::Duration;
1515
use task_executor::TaskExecutor;
1616
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
1717
use types::{
18-
BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock,
19-
Slot,
18+
BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
19+
Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
2020
};
2121

2222
mod error;
@@ -173,6 +173,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
173173
self.availability_cache.peek_blob(blob_id)
174174
}
175175

176+
/// Get a data column from the availability cache.
177+
pub fn get_data_column(
178+
&self,
179+
data_column_id: &DataColumnIdentifier,
180+
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
181+
self.availability_cache.peek_data_column(data_column_id)
182+
}
183+
176184
/// Put a list of blobs received via RPC into the availability cache. This performs KZG
177185
/// verification on the blobs in the list.
178186
pub fn put_rpc_blobs(

beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use ssz_types::{FixedVector, VariableList};
1313
use std::num::NonZeroUsize;
1414
use std::sync::Arc;
1515
use types::blob_sidecar::BlobIdentifier;
16-
use types::{BlobSidecar, ChainSpec, ColumnIndex, Epoch, EthSpec, Hash256, SignedBeaconBlock};
16+
use types::{
17+
BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec,
18+
Hash256, SignedBeaconBlock,
19+
};
1720

1821
/// This represents the components of a partially available block
1922
///
@@ -389,6 +392,22 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
389392
}
390393
}
391394

395+
/// Fetch a data column from the cache without affecting the LRU ordering
396+
pub fn peek_data_column(
397+
&self,
398+
data_column_id: &DataColumnIdentifier,
399+
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
400+
if let Some(pending_components) = self.critical.read().peek(&data_column_id.block_root) {
401+
Ok(pending_components
402+
.verified_data_columns
403+
.iter()
404+
.find(|data_column| data_column.as_data_column().index == data_column_id.index)
405+
.map(|data_column| data_column.clone_arc()))
406+
} else {
407+
Ok(None)
408+
}
409+
}
410+
392411
pub fn peek_pending_components<R, F: FnOnce(Option<&PendingComponents<T::EthSpec>>) -> R>(
393412
&self,
394413
block_root: &Hash256,

beacon_node/beacon_chain/src/data_column_verification.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@ impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
254254
pub fn as_data_column(&self) -> &DataColumnSidecar<E> {
255255
&self.data
256256
}
257+
pub fn clone_arc(&self) -> Arc<DataColumnSidecar<E>> {
258+
self.data.clone()
259+
}
257260
}
258261

259262
/// Complete kzg verification for a `DataColumnSidecar`.

beacon_node/lighthouse_network/src/rpc/methods.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use serde::Serialize;
66
use ssz::Encode;
77
use ssz_derive::{Decode, Encode};
88
use ssz_types::{typenum::U256, VariableList};
9+
use std::collections::BTreeMap;
910
use std::fmt::Display;
1011
use std::marker::PhantomData;
1112
use std::ops::Deref;
@@ -426,6 +427,17 @@ impl DataColumnsByRootRequest {
426427
pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self {
427428
Self::new(vec![DataColumnIdentifier { block_root, index }], spec)
428429
}
430+
431+
pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec<ColumnIndex>)> {
432+
let mut column_indexes_by_block = BTreeMap::<Hash256, Vec<ColumnIndex>>::new();
433+
for request_id in self.data_column_ids.as_slice() {
434+
column_indexes_by_block
435+
.entry(request_id.block_root)
436+
.or_default()
437+
.push(request_id.index);
438+
}
439+
column_indexes_by_block.into_iter().collect()
440+
}
429441
}
430442

431443
/* RPC Handling and Grouping */

0 commit comments

Comments
 (0)