Skip to content

Commit e6fb686

Browse files
committed
Send it boy
1 parent d58d8ef commit e6fb686

File tree

15 files changed

+976
-349
lines changed

15 files changed

+976
-349
lines changed

account_manager/src/validator/slashing_protection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub fn cli_run<E: EthSpec>(
9090
let slashing_protection_database =
9191
SlashingDatabase::open_or_create(&slashing_protection_db_path).map_err(|e| {
9292
format!(
93-
"Unable to open database at {}: {:?}",
93+
"Unable to open slashing protection database at {}: {:?}",
9494
slashing_protection_db_path.display(),
9595
e
9696
)
@@ -198,7 +198,7 @@ pub fn cli_run<E: EthSpec>(
198198
let slashing_protection_database = SlashingDatabase::open(&slashing_protection_db_path)
199199
.map_err(|e| {
200200
format!(
201-
"Unable to open database at {}: {:?}",
201+
"Unable to open slashing protection database at {}: {:?}",
202202
slashing_protection_db_path.display(),
203203
e
204204
)

beacon_node/beacon_chain/src/block_verification.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ use std::fmt::Debug;
9191
use std::fs;
9292
use std::io::Write;
9393
use std::sync::Arc;
94-
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
94+
use store::{Error as DBError, KeyValueStore};
9595
use strum::AsRefStr;
9696
use task_executor::JoinHandle;
9797
use types::{
@@ -1483,28 +1483,19 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
14831483
// processing, but we get early access to it.
14841484
let state_root = state.update_tree_hash_cache()?;
14851485

1486-
// Store the state immediately.
1487-
let txn_lock = chain.store.hot_db.begin_rw_transaction();
1486+
// Store the state immediately. States are ONLY deleted on finalization pruning, so
1487+
// we won't have race conditions where we should have writen a state and didn't.
14881488
let state_already_exists =
14891489
chain.store.load_hot_state_summary(&state_root)?.is_some();
14901490

1491-
let state_batch = if state_already_exists {
1491+
if state_already_exists {
14921492
// If the state exists, we do not need to re-write it.
1493-
vec![]
14941493
} else {
1495-
vec![if state.slot() % T::EthSpec::slots_per_epoch() == 0 {
1496-
StoreOp::PutState(state_root, &state)
1497-
} else {
1498-
StoreOp::PutStateSummary(
1499-
state_root,
1500-
HotStateSummary::new(&state_root, &state)?,
1501-
)
1502-
}]
1494+
// Recycle store codepath to create a state summary and store the state / diff
1495+
let mut ops = vec![];
1496+
chain.store.store_hot_state(&state_root, &state, &mut ops)?;
1497+
chain.store.hot_db.do_atomically(ops)?;
15031498
};
1504-
chain
1505-
.store
1506-
.do_atomically_with_block_and_blobs_cache(state_batch)?;
1507-
drop(txn_lock);
15081499

15091500
state_root
15101501
};

beacon_node/beacon_chain/src/builder.rs

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ use std::time::Duration;
3939
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
4040
use task_executor::{ShutdownReason, TaskExecutor};
4141
use types::{
42-
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, Epoch, EthSpec,
43-
FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot,
42+
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Epoch, EthSpec, FixedBytesExtended,
43+
Hash256, Signature, SignedBeaconBlock, Slot,
4444
};
4545

4646
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
@@ -387,21 +387,28 @@ where
387387
}
388388

389389
/// Starts a new chain from a genesis state.
390-
pub fn genesis_state(mut self, beacon_state: BeaconState<E>) -> Result<Self, String> {
390+
pub fn genesis_state(mut self, mut beacon_state: BeaconState<E>) -> Result<Self, String> {
391391
let store = self.store.clone().ok_or("genesis_state requires a store")?;
392392

393+
// Initialize anchor info before attempting to write the genesis state
394+
let retain_historic_states = self.chain_config.reconstruct_historic_states;
395+
let genesis_beacon_block = genesis_block(&mut beacon_state, &self.spec)?;
396+
self.pending_io_batch.push(
397+
store
398+
.init_anchor_info(
399+
genesis_beacon_block.message().parent_root(),
400+
Slot::new(0),
401+
retain_historic_states,
402+
)
403+
.map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
404+
);
405+
393406
let (genesis, updated_builder) = self.set_genesis_state(beacon_state)?;
394407
self = updated_builder;
395408

396409
// Stage the database's metadata fields for atomic storage when `build` is called.
397410
// Since v4.4.0 we will set the anchor with a dummy state upper limit in order to prevent
398411
// historic states from being retained (unless `--reconstruct-historic-states` is set).
399-
let retain_historic_states = self.chain_config.reconstruct_historic_states;
400-
self.pending_io_batch.push(
401-
store
402-
.init_anchor_info(genesis.beacon_block.message(), retain_historic_states)
403-
.map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
404-
);
405412
self.pending_io_batch.push(
406413
store
407414
.init_blob_info(genesis.beacon_block.slot())
@@ -519,6 +526,14 @@ where
519526
}
520527
}
521528

529+
debug!(
530+
log,
531+
"Storing split from weak subjectivity state";
532+
"slot" => weak_subj_slot,
533+
"state_root" => ?weak_subj_state_root,
534+
"block_root" => ?weak_subj_block_root,
535+
);
536+
522537
// Set the store's split point *before* storing genesis so that genesis is stored
523538
// immediately in the freezer DB.
524539
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
@@ -540,6 +555,19 @@ where
540555
.do_atomically(block_root_batch)
541556
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;
542557

558+
// Write the anchor to memory before calling `put_state` otherwise hot hdiff can't store
559+
// states that do not align with the start_slot grid
560+
let retain_historic_states = self.chain_config.reconstruct_historic_states;
561+
self.pending_io_batch.push(
562+
store
563+
.init_anchor_info(
564+
weak_subj_block.message().parent_root(),
565+
weak_subj_slot,
566+
retain_historic_states,
567+
)
568+
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
569+
);
570+
543571
// Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten
544572
// about on a crash restart.
545573
store
@@ -549,6 +577,8 @@ where
549577
weak_subj_state.clone(),
550578
)
551579
.map_err(|e| format!("Failed to set checkpoint state as finalized state: {:?}", e))?;
580+
// Note: post hot hdiff must update the anchor info before attempting to put_state otherwise
581+
// the write will fail if the weak_subj_slot is not aligned with the snapshot moduli.
552582
store
553583
.put_state(&weak_subj_state_root, &weak_subj_state)
554584
.map_err(|e| format!("Failed to store weak subjectivity state: {e:?}"))?;
@@ -585,13 +615,7 @@ where
585615
// Stage the database's metadata fields for atomic storage when `build` is called.
586616
// This prevents the database from restarting in an inconsistent state if the anchor
587617
// info or split point is written before the `PersistedBeaconChain`.
588-
let retain_historic_states = self.chain_config.reconstruct_historic_states;
589618
self.pending_io_batch.push(store.store_split_in_batch());
590-
self.pending_io_batch.push(
591-
store
592-
.init_anchor_info(weak_subj_block.message(), retain_historic_states)
593-
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
594-
);
595619
self.pending_io_batch.push(
596620
store
597621
.init_blob_info(weak_subj_block.slot())
@@ -603,13 +627,6 @@ where
603627
.map_err(|e| format!("Failed to initialize data column info: {:?}", e))?,
604628
);
605629

606-
// Store pruning checkpoint to prevent attempting to prune before the anchor state.
607-
self.pending_io_batch
608-
.push(store.pruning_checkpoint_store_op(Checkpoint {
609-
root: weak_subj_block_root,
610-
epoch: weak_subj_state.slot().epoch(E::slots_per_epoch()),
611-
}));
612-
613630
let snapshot = BeaconSnapshot {
614631
beacon_block_root: weak_subj_block_root,
615632
beacon_block: Arc::new(weak_subj_block),

beacon_node/beacon_chain/src/migrate.rs

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -586,11 +586,23 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
586586
let newly_finalized_blocks = state_summaries_dag
587587
.blocks_of_states(newly_finalized_state_roots.iter())
588588
.map_err(|e| PruningError::SummariesDagError("blocks of newly finalized", e))?;
589+
let newly_finalized_blocks_min_slot = *newly_finalized_blocks
590+
.iter()
591+
.map(|(_, slot)| slot)
592+
.min()
593+
.ok_or(PruningError::EmptyFinalizedBlocks)?;
594+
595+
// Compute the set of finalized state roots that we must keep to make the dynamic HDiff system
596+
// work.
597+
let required_finalized_diff_state_slots = store
598+
.hierarchy_hot
599+
.closest_layer_points(new_finalized_slot, store.hot_hdiff_start_slot()?);
589600

590601
// We don't know which blocks are shared among abandoned chains, so we buffer and delete
591602
// everything in one fell swoop.
592603
let mut blocks_to_prune: HashSet<Hash256> = HashSet::new();
593604
let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new();
605+
let mut kept_summaries_for_hdiff = vec![];
594606

595607
// Consider the following block tree where we finalize block `[0]` at the checkpoint `(f)`.
596608
// There's a block `[3]` that descendends from the finalized block but NOT from the
@@ -603,14 +615,38 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
603615
// \---[3]--|-----------------[4]
604616
// |
605617

606-
for (_, summaries) in state_summaries_dag.summaries_by_slot_ascending() {
607-
for (state_root, summary) in summaries {
618+
for (slot, summaries) in state_summaries_dag.summaries_by_slot_ascending() {
619+
for (state_root, _) in summaries {
608620
let should_prune = if finalized_and_descendant_state_roots_of_finalized_checkpoint
609621
.contains(&state_root)
610622
{
611623
// This state is a viable descendant of the finalized checkpoint, so does not
612624
// conflict with finality and can be built on or become a head
613625
false
626+
} else if required_finalized_diff_state_slots.contains(&slot) {
627+
// Keep this state and diff as it's necessary for the finalized portion of the
628+
// HDiff links. `required_finalized_diff_state_slots` tracks the set of slots on
629+
// each diff layer, and by checking `newly_finalized_state_roots` which only
630+
// keep those on the finalized canonical chain. Checking the state root ensures
631+
// we avoid lingering forks.
632+
633+
// In the diagram below, `o` are diffs by slot that we must keep. In the prior
634+
// finalized section there's only one chain so we preserve them unconditionally.
635+
// For the newly finalized chain, we check which of is canonical and only keep
636+
// those. Slots below `min_finalized_state_slot` we don't have canonical
637+
// information so we assume they are part of the finalized pruned chain.
638+
//
639+
// /-----o----
640+
// o-------o------/-------o----
641+
if slot < newly_finalized_states_min_slot
642+
|| newly_finalized_state_roots.contains(&state_root)
643+
{
644+
// Track kept summaries to debug hdiff inconsistencies with "Extra pruning information"
645+
kept_summaries_for_hdiff.push((state_root, slot));
646+
false
647+
} else {
648+
true
649+
}
614650
} else {
615651
// Everything else, prune
616652
true
@@ -619,7 +655,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
619655
if should_prune {
620656
// States are migrated into the cold DB in the migrate step. All hot states
621657
// prior to finalized can be pruned from the hot DB columns
622-
states_to_prune.insert((summary.slot, state_root));
658+
states_to_prune.insert((slot, state_root));
623659
}
624660
}
625661
}
@@ -639,7 +675,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
639675
} else if newly_finalized_blocks.contains(&(block_root, slot)) {
640676
// Keep recently finalized blocks
641677
false
642-
} else if slot < newly_finalized_states_min_slot {
678+
} else if slot < newly_finalized_blocks_min_slot {
643679
// Keep recently finalized blocks that we know are canonical. Blocks with slots <
644680
// that `newly_finalized_blocks_min_slot` we don't have canonical information so we
645681
// assume they are part of the finalized pruned chain
@@ -668,7 +704,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
668704
"new_finalized_checkpoint" => ?new_finalized_checkpoint,
669705
"newly_finalized_blocks" => newly_finalized_blocks.len(),
670706
"newly_finalized_state_roots" => newly_finalized_state_roots.len(),
707+
"newly_finalized_blocks_min_slot" => newly_finalized_blocks_min_slot,
671708
"newly_finalized_states_min_slot" => newly_finalized_states_min_slot,
709+
"required_finalized_diff_state_slots" => ?required_finalized_diff_state_slots,
710+
"kept_summaries_for_hdiff" => ?kept_summaries_for_hdiff,
672711
"state_summaries_count" => state_summaries_dag.summaries_count(),
673712
"state_summaries_dag_roots" => ?state_summaries_dag_roots,
674713
"finalized_and_descendant_state_roots_of_finalized_checkpoint" => finalized_and_descendant_state_roots_of_finalized_checkpoint.len(),

beacon_node/beacon_chain/src/schema_change.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod migration_schema_v20;
33
mod migration_schema_v21;
44
mod migration_schema_v22;
55
mod migration_schema_v23;
6+
mod migration_schema_v24;
67

78
use crate::beacon_chain::BeaconChainTypes;
89
use slog::Logger;
@@ -68,6 +69,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
6869
let ops = migration_schema_v23::downgrade_from_v23::<T>(db.clone(), log)?;
6970
db.store_schema_version_atomically(to, ops)
7071
}
72+
(SchemaVersion(23), SchemaVersion(24)) => {
73+
let ops = migration_schema_v24::upgrade_to_v24::<T>(db.clone(), log)?;
74+
db.store_schema_version_atomically(to, ops)
75+
}
76+
(SchemaVersion(24), SchemaVersion(23)) => {
77+
let ops = migration_schema_v24::downgrade_from_v24::<T>(db.clone(), log)?;
78+
db.store_schema_version_atomically(to, ops)
79+
}
7180
// Anything else is an error.
7281
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
7382
target_version: to,

0 commit comments

Comments
 (0)