Skip to content

Commit 2211504

Browse files
committed
Prioritise important parts of block processing (sigp#3696)
## Issue Addressed Closes sigp#2327 ## Proposed Changes This is an extension of some ideas I implemented while working on `tree-states`: - Cache the indexed attestations from blocks in the `ConsensusContext`. Previously we were re-computing them 3-4 times over. - Clean up `import_block` by splitting each part into `import_block_XXX`. - Move some stuff off hot paths, specifically: - Relocate non-essential tasks that were running between receiving the payload verification status and priming the early attester cache. These tasks are moved after the cache priming: - Attestation observation - Validator monitor updates - Slasher updates - Updating the shuffling cache - Fork choice attestation observation now happens at the end of block verification in parallel with payload verification (this seems to save 5-10ms). - Payload verification now happens _before_ advancing the pre-state and writing it to disk! States were previously being written eagerly and adding ~20-30ms in front of verifying the execution payload. State catchup also sometimes takes ~500ms if we get a cache miss and need to rebuild the tree hash cache. The remaining task that's taking substantial time (~20ms) is importing the block to fork choice. I _think_ this is because of pull-tips, and we should be able to optimise it out with a clever total active balance cache in the state (which would be computed in parallel with payload verification). I've decided to leave that for future work though. For now it can be observed via the new `beacon_block_processing_post_exec_pre_attestable_seconds` metric. Co-authored-by: Michael Sproul <[email protected]>
1 parent b4f4c0d commit 2211504

File tree

14 files changed

+783
-496
lines changed

14 files changed

+783
-496
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 449 additions & 283 deletions
Large diffs are not rendered by default.

beacon_node/beacon_chain/src/block_verification.rs

Lines changed: 141 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -52,22 +52,22 @@ use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOC
5252
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
5353
use crate::{
5454
beacon_chain::{
55-
BeaconForkChoice, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
56-
VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
55+
BeaconForkChoice, ForkChoiceError, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT,
56+
MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
5757
},
5858
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
5959
};
6060
use derivative::Derivative;
6161
use eth2::types::EventKind;
6262
use execution_layer::PayloadStatus;
63-
use fork_choice::PayloadVerificationStatus;
63+
use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
6464
use parking_lot::RwLockReadGuard;
6565
use proto_array::Block as ProtoBlock;
6666
use safe_arith::ArithError;
6767
use slog::{debug, error, warn, Logger};
6868
use slot_clock::SlotClock;
6969
use ssz::Encode;
70-
use state_processing::per_block_processing::is_merge_transition_block;
70+
use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block};
7171
use state_processing::{
7272
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
7373
per_block_processing, per_slot_processing,
@@ -550,8 +550,22 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
550550
let pubkey_cache = get_validator_pubkey_cache(chain)?;
551551
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
552552

553+
let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len());
554+
553555
for (block_root, block) in &chain_segment {
554-
signature_verifier.include_all_signatures(block, Some(*block_root), None)?;
556+
let mut consensus_context =
557+
ConsensusContext::new(block.slot()).set_current_block_root(*block_root);
558+
559+
signature_verifier.include_all_signatures(block, &mut consensus_context)?;
560+
561+
// Save the block and its consensus context. The context will have had its proposer index
562+
// and attesting indices filled in, which can be used to accelerate later block processing.
563+
signature_verified_blocks.push(SignatureVerifiedBlock {
564+
block: block.clone(),
565+
block_root: *block_root,
566+
parent: None,
567+
consensus_context,
568+
});
555569
}
556570

557571
if signature_verifier.verify().is_err() {
@@ -560,22 +574,6 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
560574

561575
drop(pubkey_cache);
562576

563-
let mut signature_verified_blocks = chain_segment
564-
.into_iter()
565-
.map(|(block_root, block)| {
566-
// Proposer index has already been verified above during signature verification.
567-
let consensus_context = ConsensusContext::new(block.slot())
568-
.set_current_block_root(block_root)
569-
.set_proposer_index(block.message().proposer_index());
570-
SignatureVerifiedBlock {
571-
block,
572-
block_root,
573-
parent: None,
574-
consensus_context,
575-
}
576-
})
577-
.collect::<Vec<_>>();
578-
579577
if let Some(signature_verified_block) = signature_verified_blocks.first_mut() {
580578
signature_verified_block.parent = Some(parent);
581579
}
@@ -625,6 +623,7 @@ pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
625623
pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
626624
pub parent_eth1_finalization_data: Eth1FinalizationData,
627625
pub confirmed_state_roots: Vec<Hash256>,
626+
pub consensus_context: ConsensusContext<T::EthSpec>,
628627
pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>,
629628
}
630629

@@ -951,13 +950,14 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
951950

952951
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
953952

954-
signature_verifier.include_all_signatures(&block, Some(block_root), None)?;
953+
let mut consensus_context =
954+
ConsensusContext::new(block.slot()).set_current_block_root(block_root);
955+
956+
signature_verifier.include_all_signatures(&block, &mut consensus_context)?;
955957

956958
if signature_verifier.verify().is_ok() {
957959
Ok(Self {
958-
consensus_context: ConsensusContext::new(block.slot())
959-
.set_current_block_root(block_root)
960-
.set_proposer_index(block.message().proposer_index()),
960+
consensus_context,
961961
block,
962962
block_root,
963963
parent: Some(parent),
@@ -1002,16 +1002,16 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
10021002

10031003
// Gossip verification has already checked the proposer index. Use it to check the RANDAO
10041004
// signature.
1005-
let verified_proposer_index = Some(block.message().proposer_index());
1005+
let mut consensus_context = from.consensus_context;
10061006
signature_verifier
1007-
.include_all_signatures_except_proposal(&block, verified_proposer_index)?;
1007+
.include_all_signatures_except_proposal(&block, &mut consensus_context)?;
10081008

10091009
if signature_verifier.verify().is_ok() {
10101010
Ok(Self {
10111011
block,
10121012
block_root: from.block_root,
10131013
parent: Some(parent),
1014-
consensus_context: from.consensus_context,
1014+
consensus_context,
10151015
})
10161016
} else {
10171017
Err(BlockError::InvalidSignature)
@@ -1138,6 +1138,79 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
11381138

11391139
check_block_relevancy(&block, block_root, chain)?;
11401140

1141+
// Define a future that will verify the execution payload with an execution engine.
1142+
//
1143+
// We do this as early as possible so that later parts of this function can run in parallel
1144+
// with the payload verification.
1145+
let payload_notifier = PayloadNotifier::new(
1146+
chain.clone(),
1147+
block.clone(),
1148+
&parent.pre_state,
1149+
notify_execution_layer,
1150+
)?;
1151+
let is_valid_merge_transition_block =
1152+
is_merge_transition_block(&parent.pre_state, block.message().body());
1153+
let payload_verification_future = async move {
1154+
let chain = payload_notifier.chain.clone();
1155+
let block = payload_notifier.block.clone();
1156+
1157+
// If this block triggers the merge, check to ensure that it references valid execution
1158+
// blocks.
1159+
//
1160+
// The specification defines this check inside `on_block` in the fork-choice specification,
1161+
// however we perform the check here for two reasons:
1162+
//
1163+
// - There's no point in importing a block that will fail fork choice, so it's best to fail
1164+
// early.
1165+
// - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no
1166+
// calls to remote servers.
1167+
if is_valid_merge_transition_block {
1168+
validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?;
1169+
};
1170+
1171+
// The specification declares that this should be run *inside* `per_block_processing`,
1172+
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
1173+
// servers).
1174+
let payload_verification_status = payload_notifier.notify_new_payload().await?;
1175+
1176+
// If the payload did not validate or invalidate the block, check to see if this block is
1177+
// valid for optimistic import.
1178+
if payload_verification_status.is_optimistic() {
1179+
let block_hash_opt = block
1180+
.message()
1181+
.body()
1182+
.execution_payload()
1183+
.map(|full_payload| full_payload.execution_payload.block_hash);
1184+
1185+
// Ensure the block is a candidate for optimistic import.
1186+
if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await?
1187+
{
1188+
warn!(
1189+
chain.log,
1190+
"Rejecting optimistic block";
1191+
"block_hash" => ?block_hash_opt,
1192+
"msg" => "the execution engine is not synced"
1193+
);
1194+
return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into());
1195+
}
1196+
}
1197+
1198+
Ok(PayloadVerificationOutcome {
1199+
payload_verification_status,
1200+
is_valid_merge_transition_block,
1201+
})
1202+
};
1203+
// Spawn the payload verification future as a new task, but don't wait for it to complete.
1204+
// The `payload_verification_future` will be awaited later to ensure verification completed
1205+
// successfully.
1206+
let payload_verification_handle = chain
1207+
.task_executor
1208+
.spawn_handle(
1209+
payload_verification_future,
1210+
"execution_payload_verification",
1211+
)
1212+
.ok_or(BeaconChainError::RuntimeShutdown)?;
1213+
11411214
/*
11421215
* Advance the given `parent.beacon_state` to the slot of the given `block`.
11431216
*/
@@ -1242,80 +1315,11 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
12421315
summaries.push(summary);
12431316
}
12441317
}
1318+
metrics::stop_timer(catchup_timer);
12451319

12461320
let block_slot = block.slot();
12471321
let state_current_epoch = state.current_epoch();
12481322

1249-
// Define a future that will verify the execution payload with an execution engine (but
1250-
// don't execute it yet).
1251-
let payload_notifier =
1252-
PayloadNotifier::new(chain.clone(), block.clone(), &state, notify_execution_layer)?;
1253-
let is_valid_merge_transition_block =
1254-
is_merge_transition_block(&state, block.message().body());
1255-
let payload_verification_future = async move {
1256-
let chain = payload_notifier.chain.clone();
1257-
let block = payload_notifier.block.clone();
1258-
1259-
// If this block triggers the merge, check to ensure that it references valid execution
1260-
// blocks.
1261-
//
1262-
// The specification defines this check inside `on_block` in the fork-choice specification,
1263-
// however we perform the check here for two reasons:
1264-
//
1265-
// - There's no point in importing a block that will fail fork choice, so it's best to fail
1266-
// early.
1267-
// - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no
1268-
// calls to remote servers.
1269-
if is_valid_merge_transition_block {
1270-
validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?;
1271-
};
1272-
1273-
// The specification declares that this should be run *inside* `per_block_processing`,
1274-
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
1275-
// servers).
1276-
//
1277-
// It is important that this function is called *after* `per_slot_processing`, since the
1278-
// `randao` may change.
1279-
let payload_verification_status = payload_notifier.notify_new_payload().await?;
1280-
1281-
// If the payload did not validate or invalidate the block, check to see if this block is
1282-
// valid for optimistic import.
1283-
if payload_verification_status.is_optimistic() {
1284-
let block_hash_opt = block
1285-
.message()
1286-
.body()
1287-
.execution_payload()
1288-
.map(|full_payload| full_payload.execution_payload.block_hash);
1289-
1290-
// Ensure the block is a candidate for optimistic import.
1291-
if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await?
1292-
{
1293-
warn!(
1294-
chain.log,
1295-
"Rejecting optimistic block";
1296-
"block_hash" => ?block_hash_opt,
1297-
"msg" => "the execution engine is not synced"
1298-
);
1299-
return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into());
1300-
}
1301-
}
1302-
1303-
Ok(PayloadVerificationOutcome {
1304-
payload_verification_status,
1305-
is_valid_merge_transition_block,
1306-
})
1307-
};
1308-
// Spawn the payload verification future as a new task, but don't wait for it to complete.
1309-
// The `payload_verification_future` will be awaited later to ensure verification completed
1310-
// successfully.
1311-
let payload_verification_handle = chain
1312-
.task_executor
1313-
.spawn_handle(
1314-
payload_verification_future,
1315-
"execution_payload_verification",
1316-
)
1317-
.ok_or(BeaconChainError::RuntimeShutdown)?;
1318-
13191323
// If the block is sufficiently recent, notify the validator monitor.
13201324
if let Some(slot) = chain.slot_clock.now() {
13211325
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
@@ -1342,8 +1346,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
13421346
}
13431347
}
13441348

1345-
metrics::stop_timer(catchup_timer);
1346-
13471349
/*
13481350
* Build the committee caches on the state.
13491351
*/
@@ -1433,13 +1435,52 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
14331435
});
14341436
}
14351437

1438+
/*
1439+
* Apply the block's attestations to fork choice.
1440+
*
1441+
* We're running in parallel with the payload verification at this point, so this is
1442+
* free real estate.
1443+
*/
1444+
let current_slot = chain.slot()?;
1445+
let mut fork_choice = chain.canonical_head.fork_choice_write_lock();
1446+
1447+
// Register each attester slashing in the block with fork choice.
1448+
for attester_slashing in block.message().body().attester_slashings() {
1449+
fork_choice.on_attester_slashing(attester_slashing);
1450+
}
1451+
1452+
// Register each attestation in the block with fork choice.
1453+
for (i, attestation) in block.message().body().attestations().iter().enumerate() {
1454+
let _fork_choice_attestation_timer =
1455+
metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);
1456+
1457+
let indexed_attestation = consensus_context
1458+
.get_indexed_attestation(&state, attestation)
1459+
.map_err(|e| BlockError::PerBlockProcessingError(e.into_with_index(i)))?;
1460+
1461+
match fork_choice.on_attestation(
1462+
current_slot,
1463+
indexed_attestation,
1464+
AttestationFromBlock::True,
1465+
&chain.spec,
1466+
) {
1467+
Ok(()) => Ok(()),
1468+
// Ignore invalid attestations whilst importing attestations from a block. The
1469+
// block might be very old and therefore the attestations useless to fork choice.
1470+
Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()),
1471+
Err(e) => Err(BlockError::BeaconChainError(e.into())),
1472+
}?;
1473+
}
1474+
drop(fork_choice);
1475+
14361476
Ok(Self {
14371477
block,
14381478
block_root,
14391479
state,
14401480
parent_block: parent.beacon_block,
14411481
parent_eth1_finalization_data,
14421482
confirmed_state_roots,
1483+
consensus_context,
14431484
payload_verification_handle,
14441485
})
14451486
}

beacon_node/beacon_chain/src/execution_payload.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
6969
// are cheap and doing them here ensures we protect the execution engine from junk.
7070
partially_verify_execution_payload(
7171
state,
72+
block.slot(),
7273
block.message().execution_payload()?,
7374
&chain.spec,
7475
)
@@ -373,7 +374,8 @@ pub fn get_execution_payload<
373374
let spec = &chain.spec;
374375
let current_epoch = state.current_epoch();
375376
let is_merge_transition_complete = is_merge_transition_complete(state);
376-
let timestamp = compute_timestamp_at_slot(state, spec).map_err(BeaconStateError::from)?;
377+
let timestamp =
378+
compute_timestamp_at_slot(state, state.slot(), spec).map_err(BeaconStateError::from)?;
377379
let random = *state.get_randao_mix(current_epoch)?;
378380
let latest_execution_payload_header_block_hash =
379381
state.latest_execution_payload_header()?.block_hash;

beacon_node/beacon_chain/src/metrics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ lazy_static! {
6464
"beacon_block_processing_state_root_seconds",
6565
"Time spent calculating the state root when processing a block."
6666
);
67+
pub static ref BLOCK_PROCESSING_POST_EXEC_PROCESSING: Result<Histogram> = try_create_histogram_with_buckets(
68+
"beacon_block_processing_post_exec_pre_attestable_seconds",
69+
"Time between finishing execution processing and the block becoming attestable",
70+
linear_buckets(5e-3, 5e-3, 10)
71+
);
6772
pub static ref BLOCK_PROCESSING_DB_WRITE: Result<Histogram> = try_create_histogram(
6873
"beacon_block_processing_db_write_seconds",
6974
"Time spent writing a newly processed block and state to DB"

beacon_node/beacon_chain/src/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ where
586586

587587
pub fn get_timestamp_at_slot(&self) -> u64 {
588588
let state = self.get_current_state();
589-
compute_timestamp_at_slot(&state, &self.spec).unwrap()
589+
compute_timestamp_at_slot(&state, state.slot(), &self.spec).unwrap()
590590
}
591591

592592
pub fn get_current_state_and_root(&self) -> (BeaconState<E>, Hash256) {

0 commit comments

Comments
 (0)