Skip to content

Commit 664a778

Browse files
Add cache for parallel HTTP requests (sigp#4879)
1 parent 8db17da commit 664a778

File tree

21 files changed

+467
-261
lines changed

21 files changed

+467
-261
lines changed

Cargo.lock

Lines changed: 11 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ members = [
3737
"common/malloc_utils",
3838
"common/oneshot_broadcast",
3939
"common/pretty_reqwest_error",
40+
"common/promise_cache",
4041
"common/sensitive_url",
4142
"common/slot_clock",
4243
"common/system_health",

beacon_node/beacon_chain/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ lighthouse_metrics = { workspace = true }
4343
logging = { workspace = true }
4444
lru = { workspace = true }
4545
merkle_proof = { workspace = true }
46-
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
4746
operation_pool = { workspace = true }
4847
parking_lot = { workspace = true }
48+
promise_cache = { path = "../../common/promise_cache" }
4949
proto_array = { workspace = true }
5050
rand = { workspace = true }
5151
rayon = { workspace = true }

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use crate::observed_blob_sidecars::ObservedBlobSidecars;
5353
use crate::observed_block_producers::ObservedBlockProducers;
5454
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
5555
use crate::observed_slashable::ObservedSlashable;
56+
use crate::parallel_state_cache::ParallelStateCache;
5657
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
5758
use crate::persisted_fork_choice::PersistedForkChoice;
5859
use crate::pre_finalization_cache::PreFinalizationBlockCache;
@@ -460,6 +461,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
460461
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
461462
/// A cache used to track pre-finalization block roots for quick rejection.
462463
pub pre_finalization_block_cache: PreFinalizationBlockCache,
464+
/// A cache used to de-duplicate HTTP state requests.
465+
///
466+
/// The cache is keyed by `state_root`.
467+
pub parallel_state_cache: Arc<RwLock<ParallelStateCache<T::EthSpec>>>,
463468
/// Sender given to tasks, so that if they encounter a state in which execution cannot
464469
/// continue they can request that everything shuts down.
465470
pub shutdown_sender: Sender<ShutdownReason>,
@@ -3868,7 +3873,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
38683873
self.shuffling_cache
38693874
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
38703875
.ok_or(Error::AttestationCacheLockTimeout)?
3871-
.insert_committee_cache(shuffling_id, committee_cache);
3876+
.insert_value(shuffling_id, committee_cache);
38723877
}
38733878
}
38743879
Ok(())
@@ -6041,7 +6046,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
60416046
// access.
60426047
drop(shuffling_cache);
60436048

6044-
let committee_cache = cache_item.wait()?;
6049+
let committee_cache = cache_item.wait().map_err(Error::ShufflingCacheError)?;
60456050
map_fn(&committee_cache, shuffling_id.shuffling_decision_block)
60466051
} else {
60476052
// Create an entry in the cache that "promises" this value will eventually be computed.
@@ -6050,7 +6055,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
60506055
//
60516056
// Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same
60526057
// promise from being created twice.
6053-
let sender = shuffling_cache.create_promise(shuffling_id.clone())?;
6058+
let sender = shuffling_cache
6059+
.create_promise(shuffling_id.clone())
6060+
.map_err(Error::ShufflingCacheError)?;
60546061

60556062
// Drop the shuffling cache to avoid holding the lock for any longer than
60566063
// required.
@@ -6144,7 +6151,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
61446151
self.shuffling_cache
61456152
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
61466153
.ok_or(Error::AttestationCacheLockTimeout)?
6147-
.insert_committee_cache(shuffling_id, &committee_cache);
6154+
.insert_value(shuffling_id, &committee_cache);
61486155

61496156
metrics::stop_timer(committee_building_timer);
61506157

@@ -6446,6 +6453,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
64466453
self.data_availability_checker.data_availability_boundary()
64476454
}
64486455

6456+
pub fn logger(&self) -> &Logger {
6457+
&self.log
6458+
}
6459+
64496460
/// Gets the `LightClientBootstrap` object for a requested block root.
64506461
///
64516462
/// Returns `None` when the state or block is not found in the database.

beacon_node/beacon_chain/src/builder.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use futures::channel::mpsc::Sender;
2323
use kzg::{Kzg, TrustedSetup};
2424
use operation_pool::{OperationPool, PersistedOperationPool};
2525
use parking_lot::{Mutex, RwLock};
26+
use promise_cache::PromiseCache;
2627
use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold};
2728
use slasher::Slasher;
2829
use slog::{crit, debug, error, info, o, Logger};
@@ -851,6 +852,7 @@ where
851852
let genesis_time = head_snapshot.beacon_state.genesis_time();
852853
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
853854
let shuffling_cache_size = self.chain_config.shuffling_cache_size;
855+
let parallel_state_cache_size = self.chain_config.parallel_state_cache_size;
854856

855857
// Calculate the weak subjectivity point in which to backfill blocks to.
856858
let genesis_backfill_slot = if self.chain_config.genesis_backfill {
@@ -933,6 +935,11 @@ where
933935
beacon_proposer_cache,
934936
block_times_cache: <_>::default(),
935937
pre_finalization_block_cache: <_>::default(),
938+
parallel_state_cache: Arc::new(RwLock::new(PromiseCache::new(
939+
parallel_state_cache_size,
940+
Default::default(),
941+
log.clone(),
942+
))),
936943
validator_pubkey_cache,
937944
attester_cache: <_>::default(),
938945
early_attester_cache: <_>::default(),

beacon_node/beacon_chain/src/canonical_head.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -820,9 +820,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
820820
Ok(head_shuffling_ids) => {
821821
self.shuffling_cache
822822
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
823-
.map(|mut shuffling_cache| {
824-
shuffling_cache.update_head_shuffling_ids(head_shuffling_ids)
825-
})
823+
.map(|mut shuffling_cache| shuffling_cache.update_protector(head_shuffling_ids))
826824
.unwrap_or_else(|| {
827825
error!(
828826
self.log,

beacon_node/beacon_chain/src/chain_config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ pub const DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR: u32 = 3;
1515
/// Fraction of a slot lookahead for fork choice in the state advance timer (500ms on mainnet).
1616
pub const FORK_CHOICE_LOOKAHEAD_FACTOR: u32 = 24;
1717

18+
/// Cache only a small number of states in the parallel cache by default.
19+
pub const DEFAULT_PARALLEL_STATE_CACHE_SIZE: usize = 2;
20+
1821
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
1922
pub struct ChainConfig {
2023
/// Maximum number of slots to skip when importing an attestation.
@@ -83,6 +86,8 @@ pub struct ChainConfig {
8386
pub progressive_balances_mode: ProgressiveBalancesMode,
8487
/// Number of epochs between each migration of data from the hot database to the freezer.
8588
pub epochs_per_migration: u64,
89+
/// Size of the promise cache for de-duplicating parallel state requests.
90+
pub parallel_state_cache_size: usize,
8691
}
8792

8893
impl Default for ChainConfig {
@@ -114,6 +119,7 @@ impl Default for ChainConfig {
114119
always_prepare_payload: false,
115120
progressive_balances_mode: ProgressiveBalancesMode::Fast,
116121
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
122+
parallel_state_cache_size: DEFAULT_PARALLEL_STATE_CACHE_SIZE,
117123
}
118124
}
119125
}

beacon_node/beacon_chain/src/errors.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,7 @@ pub enum BeaconChainError {
211211
},
212212
AttestationHeadNotInForkChoice(Hash256),
213213
MissingPersistedForkChoice,
214-
CommitteePromiseFailed(oneshot_broadcast::Error),
215-
MaxCommitteePromises(usize),
214+
ShufflingCacheError(promise_cache::PromiseCacheError),
216215
BlsToExecutionPriorToCapella,
217216
BlsToExecutionConflictsWithPool,
218217
InconsistentFork(InconsistentFork),

beacon_node/beacon_chain/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub mod observed_block_producers;
4242
pub mod observed_operations;
4343
mod observed_slashable;
4444
pub mod otb_verification_service;
45+
mod parallel_state_cache;
4546
mod persisted_beacon_chain;
4647
mod persisted_fork_choice;
4748
mod pre_finalization_cache;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use promise_cache::{PromiseCache, Protect};
2+
use types::{BeaconState, Hash256};
3+
4+
#[derive(Debug, Default)]
5+
pub struct ParallelStateProtector;
6+
7+
impl Protect<Hash256> for ParallelStateProtector {
8+
type SortKey = usize;
9+
10+
/// Evict in arbitrary (hashmap) order by using the same key for every value.
11+
fn sort_key(&self, _: &Hash256) -> Self::SortKey {
12+
0
13+
}
14+
15+
/// We don't care too much about preventing evictions of particular states here. All the states
16+
/// in this cache should be different from the head state.
17+
fn protect_from_eviction(&self, _: &Hash256) -> bool {
18+
false
19+
}
20+
}
21+
22+
pub type ParallelStateCache<E> = PromiseCache<Hash256, BeaconState<E>, ParallelStateProtector>;

0 commit comments

Comments
 (0)