Skip to content

Commit 168d9f0

Browse files
committed
HDiff in the hot DB - squashed
1 parent 29ad4fc commit 168d9f0

File tree

16 files changed

+918
-413
lines changed

16 files changed

+918
-413
lines changed

account_manager/src/validator/slashing_protection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub fn cli_run<E: EthSpec>(
9090
let slashing_protection_database =
9191
SlashingDatabase::open_or_create(&slashing_protection_db_path).map_err(|e| {
9292
format!(
93-
"Unable to open database at {}: {:?}",
93+
"Unable to open slashing protection database at {}: {:?}",
9494
slashing_protection_db_path.display(),
9595
e
9696
)
@@ -198,7 +198,7 @@ pub fn cli_run<E: EthSpec>(
198198
let slashing_protection_database = SlashingDatabase::open(&slashing_protection_db_path)
199199
.map_err(|e| {
200200
format!(
201-
"Unable to open database at {}: {:?}",
201+
"Unable to open slashing protection database at {}: {:?}",
202202
slashing_protection_db_path.display(),
203203
e
204204
)

beacon_node/beacon_chain/src/block_verification.rs

Lines changed: 41 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ use std::fmt::Debug;
9191
use std::fs;
9292
use std::io::Write;
9393
use std::sync::Arc;
94-
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
94+
use store::{Error as DBError, KeyValueStore, StoreOp};
9595
use strum::AsRefStr;
9696
use task_executor::JoinHandle;
9797
use types::{
@@ -1455,52 +1455,49 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
14551455

14561456
let distance = block.slot().as_u64().saturating_sub(state.slot().as_u64());
14571457
for _ in 0..distance {
1458-
let state_root = if parent.beacon_block.slot() == state.slot() {
1459-
// If it happens that `pre_state` has *not* already been advanced forward a single
1460-
// slot, then there is no need to compute the state root for this
1461-
// `per_slot_processing` call since that state root is already stored in the parent
1462-
// block.
1463-
parent.beacon_block.state_root()
1464-
} else {
1465-
// This is a new state we've reached, so stage it for storage in the DB.
1466-
// Computing the state root here is time-equivalent to computing it during slot
1467-
// processing, but we get early access to it.
1468-
let state_root = state.update_tree_hash_cache()?;
1469-
1470-
// Store the state immediately, marking it as temporary, and staging the deletion
1471-
// of its temporary status as part of the larger atomic operation.
1472-
let txn_lock = chain.store.hot_db.begin_rw_transaction();
1473-
let state_already_exists =
1474-
chain.store.load_hot_state_summary(&state_root)?.is_some();
1475-
1476-
let state_batch = if state_already_exists {
1477-
// If the state exists, it could be temporary or permanent, but in neither case
1478-
// should we rewrite it or store a new temporary flag for it. We *will* stage
1479-
// the temporary flag for deletion because it's OK to double-delete the flag,
1480-
// and we don't mind if another thread gets there first.
1481-
vec![]
1458+
let state_root =
1459+
if parent.beacon_block.slot() == state.slot() {
1460+
// If it happens that `pre_state` has *not* already been advanced forward a single
1461+
// slot, then there is no need to compute the state root for this
1462+
// `per_slot_processing` call since that state root is already stored in the parent
1463+
// block.
1464+
parent.beacon_block.state_root()
14821465
} else {
1483-
vec![
1484-
if state.slot() % T::EthSpec::slots_per_epoch() == 0 {
1485-
StoreOp::PutState(state_root, &state)
1486-
} else {
1487-
StoreOp::PutStateSummary(
1488-
state_root,
1489-
HotStateSummary::new(&state_root, &state)?,
1490-
)
1491-
},
1492-
StoreOp::PutStateTemporaryFlag(state_root),
1493-
]
1494-
};
1495-
chain
1496-
.store
1497-
.do_atomically_with_block_and_blobs_cache(state_batch)?;
1498-
drop(txn_lock);
1466+
// This is a new state we've reached, so stage it for storage in the DB.
1467+
// Computing the state root here is time-equivalent to computing it during slot
1468+
// processing, but we get early access to it.
1469+
let state_root = state.update_tree_hash_cache()?;
1470+
1471+
// Store the state immediately, marking it as temporary, and staging the deletion
1472+
// of its temporary status as part of the larger atomic operation.
1473+
// TODO(hdiff): Is it necessary to do this read tx now? Also why is it necessary to
1474+
// check that the summary exists at all? Are double writes common? Can this txn
1475+
// lock deadlock with the `do_atomically` call?
1476+
let txn_lock = chain.store.hot_db.begin_rw_transaction();
1477+
let state_already_exists =
1478+
chain.store.load_hot_state_summary(&state_root)?.is_some();
1479+
1480+
if state_already_exists {
1481+
// If the state exists, it could be temporary or permanent, but in neither case
1482+
// should we rewrite it or store a new temporary flag for it. We *will* stage
1483+
// the temporary flag for deletion because it's OK to double-delete the flag,
1484+
// and we don't mind if another thread gets there first.
1485+
} else {
1486+
let mut ops = vec![];
1487+
// Recycle store codepath to create a state summary and store the state / diff
1488+
chain.store.store_hot_state(&state_root, &state, &mut ops)?;
1489+
// Additionally write a temporary flag as part of the atomic write
1490+
ops.extend(chain.store.convert_to_kv_batch(vec![
1491+
StoreOp::PutStateTemporaryFlag(state_root),
1492+
])?);
1493+
chain.store.hot_db.do_atomically(ops)?;
1494+
}
1495+
drop(txn_lock);
14991496

1500-
confirmed_state_roots.push(state_root);
1497+
confirmed_state_roots.push(state_root);
15011498

1502-
state_root
1503-
};
1499+
state_root
1500+
};
15041501

15051502
if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? {
15061503
// Expose Prometheus metrics.

beacon_node/beacon_chain/src/builder.rs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ use std::time::Duration;
3838
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
3939
use task_executor::{ShutdownReason, TaskExecutor};
4040
use types::{
41-
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, Epoch, EthSpec,
42-
FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot,
41+
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Epoch, EthSpec, FixedBytesExtended,
42+
Hash256, Signature, SignedBeaconBlock, Slot,
4343
};
4444

4545
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
@@ -398,7 +398,11 @@ where
398398
let retain_historic_states = self.chain_config.reconstruct_historic_states;
399399
self.pending_io_batch.push(
400400
store
401-
.init_anchor_info(genesis.beacon_block.message(), retain_historic_states)
401+
.init_anchor_info(
402+
genesis.beacon_block.message(),
403+
Slot::new(0),
404+
retain_historic_states,
405+
)
402406
.map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
403407
);
404408
self.pending_io_batch.push(
@@ -518,6 +522,14 @@ where
518522
}
519523
}
520524

525+
debug!(
526+
log,
527+
"Storing split from weak subjectivity state";
528+
"slot" => weak_subj_slot,
529+
"state_root" => ?weak_subj_state_root,
530+
"block_root" => ?weak_subj_block_root,
531+
);
532+
521533
// Set the store's split point *before* storing genesis so that genesis is stored
522534
// immediately in the freezer DB.
523535
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
@@ -539,6 +551,19 @@ where
539551
.do_atomically(block_root_batch)
540552
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;
541553

554+
// Write the anchor to memory before calling `put_state` otherwise hot hdiff can't store
555+
// states that do not align with the start_slot grid
556+
let retain_historic_states = self.chain_config.reconstruct_historic_states;
557+
self.pending_io_batch.push(
558+
store
559+
.init_anchor_info(
560+
weak_subj_block.message(),
561+
weak_subj_slot,
562+
retain_historic_states,
563+
)
564+
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
565+
);
566+
542567
// Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten
543568
// about on a crash restart.
544569
store
@@ -548,6 +573,8 @@ where
548573
weak_subj_state.clone(),
549574
)
550575
.map_err(|e| format!("Failed to set checkpoint state as finalized state: {:?}", e))?;
576+
// Note: post hot hdiff must update the anchor info before attempting to put_state otherwise
577+
// the write will fail if the weak_subj_slot is not aligned with the snapshot moduli.
551578
store
552579
.put_state(&weak_subj_state_root, &weak_subj_state)
553580
.map_err(|e| format!("Failed to store weak subjectivity state: {e:?}"))?;
@@ -563,13 +590,7 @@ where
563590
// Stage the database's metadata fields for atomic storage when `build` is called.
564591
// This prevents the database from restarting in an inconsistent state if the anchor
565592
// info or split point is written before the `PersistedBeaconChain`.
566-
let retain_historic_states = self.chain_config.reconstruct_historic_states;
567593
self.pending_io_batch.push(store.store_split_in_batch());
568-
self.pending_io_batch.push(
569-
store
570-
.init_anchor_info(weak_subj_block.message(), retain_historic_states)
571-
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
572-
);
573594
self.pending_io_batch.push(
574595
store
575596
.init_blob_info(weak_subj_block.slot())
@@ -581,13 +602,6 @@ where
581602
.map_err(|e| format!("Failed to initialize data column info: {:?}", e))?,
582603
);
583604

584-
// Store pruning checkpoint to prevent attempting to prune before the anchor state.
585-
self.pending_io_batch
586-
.push(store.pruning_checkpoint_store_op(Checkpoint {
587-
root: weak_subj_block_root,
588-
epoch: weak_subj_state.slot().epoch(E::slots_per_epoch()),
589-
}));
590-
591605
let snapshot = BeaconSnapshot {
592606
beacon_block_root: weak_subj_block_root,
593607
beacon_block: Arc::new(weak_subj_block),

beacon_node/beacon_chain/src/migrate.rs

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use crate::errors::BeaconChainError;
22
use crate::summaries_dag::{
3-
BlockSummariesDAG, DAGBlockSummary, DAGStateSummaryV22, Error as SummariesDagError,
3+
BlockSummariesDAG, DAGBlockSummary, DAGStateSummary, Error as SummariesDagError,
44
StateSummariesDAG,
55
};
66
use parking_lot::Mutex;
77
use slog::{debug, error, info, warn, Logger};
8-
use std::collections::{HashMap, HashSet};
8+
use std::collections::HashSet;
99
use std::mem;
1010
use std::sync::{mpsc, Arc};
1111
use std::thread;
@@ -462,7 +462,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
462462
new_finalized_checkpoint: Checkpoint,
463463
log: &Logger,
464464
) -> Result<PruningOutcome, BeaconChainError> {
465-
let split_state_root = store.get_split_info().state_root;
466465
let new_finalized_slot = new_finalized_checkpoint
467466
.epoch
468467
.start_slot(E::slots_per_epoch());
@@ -494,7 +493,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
494493
.load_hot_state_summaries()?
495494
.into_iter()
496495
.map(|(state_root, summary)| (state_root, summary.into()))
497-
.collect::<Vec<(Hash256, DAGStateSummaryV22)>>();
496+
.collect::<Vec<(Hash256, DAGStateSummary)>>();
498497

499498
// De-duplicate block roots to reduce block reads below
500499
let summary_block_roots = HashSet::<Hash256>::from_iter(
@@ -528,18 +527,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
528527
})
529528
.collect::<Result<Vec<_>, BeaconChainError>>()?;
530529

531-
let parent_block_roots = blocks
532-
.iter()
533-
.map(|(block_root, block)| (*block_root, block.parent_root))
534-
.collect::<HashMap<Hash256, Hash256>>();
535-
536530
(
537-
StateSummariesDAG::new_from_v22(
538-
state_summaries,
539-
parent_block_roots,
540-
split_state_root,
541-
)
542-
.map_err(PruningError::SummariesDagError)?,
531+
StateSummariesDAG::new(state_summaries),
543532
BlockSummariesDAG::new(&blocks),
544533
)
545534
};
@@ -585,10 +574,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
585574
.min()
586575
.ok_or(PruningError::EmptyFinalizedBlocks)?;
587576

577+
// Compute the set of finalized state roots that we must keep to make the dynamic HDiff system
578+
// work.
579+
let required_finalized_diff_state_slots = store
580+
.hierarchy_hot
581+
.closest_layer_points(new_finalized_slot, store.hot_hdiff_start_slot());
582+
588583
// We don't know which blocks are shared among abandoned chains, so we buffer and delete
589584
// everything in one fell swoop.
590585
let mut blocks_to_prune: HashSet<Hash256> = HashSet::new();
591586
let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new();
587+
let mut kept_summaries_for_hdiff = vec![];
592588

593589
for (slot, summaries) in state_summaries_dag.summaries_by_slot_ascending() {
594590
for (state_root, summary) in summaries {
@@ -597,6 +593,30 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
597593
// Keep this state is the post state of a viable head, or a state advance from a
598594
// viable head.
599595
false
596+
} else if required_finalized_diff_state_slots.contains(&slot) {
597+
// Keep this state and diff as it's necessary for the finalized portion of the
598+
// HDiff links. `required_finalized_diff_state_slots` tracks the set of slots on
599+
// each diff layer, and by checking `newly_finalized_state_roots` which only
600+
// keep those on the finalized canonical chain. Checking the state root ensures
601+
// we avoid lingering forks.
602+
603+
// In the diagram below, `o` are diffs by slot that we must keep. In the prior
604+
// finalized section there's only one chain so we preserve them unconditionally.
605+
// For the newly finalized chain, we check which states are canonical and only keep
606+
// those. For slots below `min_finalized_state_slot` we don't have canonical
607+
// information so we assume they are part of the finalized pruned chain.
608+
//
609+
// /-----o----
610+
// o-------o------/-------o----
611+
if slot < newly_finalized_states_min_slot
612+
|| newly_finalized_state_roots.contains(&state_root)
613+
{
614+
// Track kept summaries to debug hdiff inconsistencies with "Extra pruning information"
615+
kept_summaries_for_hdiff.push((state_root, slot));
616+
false
617+
} else {
618+
true
619+
}
600620
} else {
601621
// Everything else, prune
602622
true
@@ -650,6 +670,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
650670
"newly_finalized_blocks_min_slot" => newly_finalized_blocks_min_slot,
651671
"newly_finalized_state_roots" => newly_finalized_state_roots.len(),
652672
"newly_finalized_states_min_slot" => newly_finalized_states_min_slot,
673+
"required_finalized_diff_state_slots" => ?required_finalized_diff_state_slots,
674+
"kept_summaries_for_hdiff" => ?kept_summaries_for_hdiff,
653675
"state_summaries_count" => state_summaries_dag.summaries_count(),
654676
"finalized_and_descendant_block_roots" => finalized_and_descendant_block_roots.len(),
655677
"blocks_to_prune_count" => blocks_to_prune.len(),

beacon_node/beacon_chain/src/schema_change.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
mod migration_schema_v20;
33
mod migration_schema_v21;
44
mod migration_schema_v22;
5+
mod migration_schema_v23;
56

67
use crate::beacon_chain::BeaconChainTypes;
78
use slog::Logger;
@@ -59,6 +60,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
5960
// bumped inside the upgrade_to_v22 fn
6061
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)
6162
}
63+
(SchemaVersion(22), SchemaVersion(23)) => {
64+
let ops = migration_schema_v23::upgrade_to_v23::<T>(db.clone(), log)?;
65+
db.store_schema_version_atomically(to, ops)
66+
}
67+
(SchemaVersion(23), SchemaVersion(22)) => {
68+
let ops = migration_schema_v23::downgrade_to_v22::<T>(db.clone(), log)?;
69+
db.store_schema_version_atomically(to, ops)
70+
}
6271
// Anything else is an error.
6372
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
6473
target_version: to,

0 commit comments

Comments
 (0)