Skip to content

Commit 8e1594d

Browse files
7suyash7 and klkvr authored
perf(pipeline): speed up fork unwinding with exponential backoff (paradigmxyz#16622)
Signed-off-by: 7suyash7 <[email protected]> Co-authored-by: Arsenii Kulikov <[email protected]>
1 parent c01c737 commit 8e1594d

File tree

5 files changed

+129
-98
lines changed

5 files changed

+129
-98
lines changed

crates/ethereum/node/tests/e2e/p2p.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,9 @@ async fn test_reorg_through_backfill() -> eyre::Result<()> {
185185
let head = first_provider.get_block_by_number(20.into()).await?.unwrap();
186186
second_node.sync_to(head.header.hash).await?;
187187

188-
// Produce an unfinalized fork chain with 5 blocks
188+
// Produce an unfinalized fork chain with 30 blocks
189189
second_node.payload.timestamp = head.header.timestamp;
190-
advance_with_random_transactions(&mut second_node, 10, &mut rng, false).await?;
190+
advance_with_random_transactions(&mut second_node, 30, &mut rng, false).await?;
191191

192192
// Now reorg second node to the finalized canonical head
193193
let head = first_provider.get_block_by_number(100.into()).await?.unwrap();

crates/stages/api/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ reth-testing-utils.workspace = true
5151
test-utils = [
5252
"reth-consensus/test-utils",
5353
"reth-network-p2p/test-utils",
54-
"reth-primitives-traits/test-utils",
5554
"reth-provider/test-utils",
5655
"reth-stages-types/test-utils",
56+
"reth-primitives-traits/test-utils",
5757
]

crates/stages/api/src/pipeline/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ impl<Provider> PipelineBuilder<Provider> {
9090
progress: Default::default(),
9191
metrics_tx,
9292
fail_on_unwind,
93+
last_detached_head_unwind_target: None,
94+
detached_head_attempts: 0,
9395
}
9496
}
9597
}

crates/stages/api/src/pipeline/mod.rs

Lines changed: 123 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ pub use event::*;
77
use futures_util::Future;
88
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
99
use reth_provider::{
10-
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
11-
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
12-
StageCheckpointWriter,
10+
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
11+
ChainStateBlockReader, ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory,
12+
StageCheckpointReader, StageCheckpointWriter,
1313
};
1414
use reth_prune::PrunerBuilder;
1515
use reth_static_file::StaticFileProducer;
@@ -83,6 +83,12 @@ pub struct Pipeline<N: ProviderNodeTypes> {
8383
/// Whether an unwind should fail the syncing process. Should only be set when downloading
8484
/// blocks from trusted sources and expecting them to be valid.
8585
fail_on_unwind: bool,
86+
/// Block that was chosen as a target of the last unwind triggered by
87+
/// [`StageError::DetachedHead`] error.
88+
last_detached_head_unwind_target: Option<B256>,
89+
/// Number of consecutive unwind attempts due to [`StageError::DetachedHead`] for the current
90+
/// fork.
91+
detached_head_attempts: u64,
8692
}
8793

8894
impl<N: ProviderNodeTypes> Pipeline<N> {
@@ -110,6 +116,14 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
110116
pub fn events(&self) -> EventStream<PipelineEvent> {
111117
self.event_sender.new_listener()
112118
}
119+
120+
/// Get a mutable reference to a stage by index.
121+
pub fn stage(
122+
&mut self,
123+
idx: usize,
124+
) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
125+
&mut self.stages[idx]
126+
}
113127
}
114128

115129
impl<N: ProviderNodeTypes> Pipeline<N> {
@@ -383,8 +397,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
383397
) -> Result<ControlFlow, PipelineError> {
384398
let total_stages = self.stages.len();
385399

386-
let stage = &mut self.stages[stage_index];
387-
let stage_id = stage.id();
400+
let stage_id = self.stage(stage_index).id();
388401
let mut made_progress = false;
389402
let target = self.max_block.or(previous_stage);
390403

@@ -422,10 +435,9 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
422435
target,
423436
});
424437

425-
if let Err(err) = stage.execute_ready(exec_input).await {
438+
if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
426439
self.event_sender.notify(PipelineEvent::Error { stage_id });
427-
428-
match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? {
440+
match self.on_stage_error(stage_id, prev_checkpoint, err)? {
429441
Some(ctrl) => return Ok(ctrl),
430442
None => continue,
431443
};
@@ -443,7 +455,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
443455
target,
444456
});
445457

446-
match stage.execute(&provider_rw, exec_input) {
458+
match self.stage(stage_index).execute(&provider_rw, exec_input) {
447459
Ok(out @ ExecOutput { checkpoint, done }) => {
448460
made_progress |=
449461
checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
@@ -468,7 +480,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
468480

469481
UnifiedStorageWriter::commit(provider_rw)?;
470482

471-
stage.post_execute_commit()?;
483+
self.stage(stage_index).post_execute_commit()?;
472484

473485
if done {
474486
let block_number = checkpoint.block_number;
@@ -483,101 +495,118 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
483495
drop(provider_rw);
484496
self.event_sender.notify(PipelineEvent::Error { stage_id });
485497

486-
if let Some(ctrl) =
487-
on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)?
488-
{
498+
if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
489499
return Ok(ctrl)
490500
}
491501
}
492502
}
493503
}
494504
}
495-
}
496505

497-
fn on_stage_error<N: ProviderNodeTypes>(
498-
factory: &ProviderFactory<N>,
499-
stage_id: StageId,
500-
prev_checkpoint: Option<StageCheckpoint>,
501-
err: StageError,
502-
) -> Result<Option<ControlFlow>, PipelineError> {
503-
if let StageError::DetachedHead { local_head, header, error } = err {
504-
warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
505-
506-
// We unwind because of a detached head.
507-
let unwind_to =
508-
local_head.block.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1);
509-
Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
510-
} else if let StageError::Block { block, error } = err {
511-
match error {
512-
BlockErrorKind::Validation(validation_error) => {
513-
error!(
514-
target: "sync::pipeline",
515-
stage = %stage_id,
516-
bad_block = %block.block.number,
517-
"Stage encountered a validation error: {validation_error}"
518-
);
519-
520-
// FIXME: When handling errors, we do not commit the database transaction. This
521-
// leads to the Merkle stage not clearing its checkpoint, and restarting from an
522-
// invalid place.
523-
let provider_rw = factory.database_provider_rw()?;
524-
provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
525-
provider_rw.save_stage_checkpoint(
526-
StageId::MerkleExecute,
527-
prev_checkpoint.unwrap_or_default(),
528-
)?;
529-
530-
UnifiedStorageWriter::commit(provider_rw)?;
531-
532-
// We unwind because of a validation error. If the unwind itself
533-
// fails, we bail entirely,
534-
// otherwise we restart the execution loop from the
535-
// beginning.
536-
Ok(Some(ControlFlow::Unwind {
537-
target: prev_checkpoint.unwrap_or_default().block_number,
538-
bad_block: block,
539-
}))
506+
fn on_stage_error(
507+
&mut self,
508+
stage_id: StageId,
509+
prev_checkpoint: Option<StageCheckpoint>,
510+
err: StageError,
511+
) -> Result<Option<ControlFlow>, PipelineError> {
512+
if let StageError::DetachedHead { local_head, header, error } = err {
513+
warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
514+
515+
if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
516+
if local_head.block.hash == last_detached_head_unwind_target &&
517+
header.block.number == local_head.block.number + 1
518+
{
519+
self.detached_head_attempts += 1;
520+
} else {
521+
self.detached_head_attempts = 1;
522+
}
523+
} else {
524+
self.detached_head_attempts = 1;
540525
}
541-
BlockErrorKind::Execution(execution_error) => {
542-
error!(
543-
target: "sync::pipeline",
544-
stage = %stage_id,
545-
bad_block = %block.block.number,
546-
"Stage encountered an execution error: {execution_error}"
547-
);
548526

549-
// We unwind because of an execution error. If the unwind itself
550-
// fails, we bail entirely,
551-
// otherwise we restart
552-
// the execution loop from the beginning.
553-
Ok(Some(ControlFlow::Unwind {
554-
target: prev_checkpoint.unwrap_or_default().block_number,
555-
bad_block: block,
556-
}))
527+
// We unwind because of a detached head.
528+
let unwind_to = local_head
529+
.block
530+
.number
531+
.saturating_sub(
532+
BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
533+
)
534+
.max(1);
535+
536+
self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
537+
Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
538+
} else if let StageError::Block { block, error } = err {
539+
match error {
540+
BlockErrorKind::Validation(validation_error) => {
541+
error!(
542+
target: "sync::pipeline",
543+
stage = %stage_id,
544+
bad_block = %block.block.number,
545+
"Stage encountered a validation error: {validation_error}"
546+
);
547+
548+
// FIXME: When handling errors, we do not commit the database transaction. This
549+
// leads to the Merkle stage not clearing its checkpoint, and restarting from an
550+
// invalid place.
551+
let provider_rw = self.provider_factory.database_provider_rw()?;
552+
provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
553+
provider_rw.save_stage_checkpoint(
554+
StageId::MerkleExecute,
555+
prev_checkpoint.unwrap_or_default(),
556+
)?;
557+
558+
UnifiedStorageWriter::commit(provider_rw)?;
559+
560+
// We unwind because of a validation error. If the unwind itself
561+
// fails, we bail entirely,
562+
// otherwise we restart the execution loop from the
563+
// beginning.
564+
Ok(Some(ControlFlow::Unwind {
565+
target: prev_checkpoint.unwrap_or_default().block_number,
566+
bad_block: block,
567+
}))
568+
}
569+
BlockErrorKind::Execution(execution_error) => {
570+
error!(
571+
target: "sync::pipeline",
572+
stage = %stage_id,
573+
bad_block = %block.block.number,
574+
"Stage encountered an execution error: {execution_error}"
575+
);
576+
577+
// We unwind because of an execution error. If the unwind itself
578+
// fails, we bail entirely,
579+
// otherwise we restart
580+
// the execution loop from the beginning.
581+
Ok(Some(ControlFlow::Unwind {
582+
target: prev_checkpoint.unwrap_or_default().block_number,
583+
bad_block: block,
584+
}))
585+
}
557586
}
558-
}
559-
} else if let StageError::MissingStaticFileData { block, segment } = err {
560-
error!(
561-
target: "sync::pipeline",
562-
stage = %stage_id,
563-
bad_block = %block.block.number,
564-
segment = %segment,
565-
"Stage is missing static file data."
566-
);
587+
} else if let StageError::MissingStaticFileData { block, segment } = err {
588+
error!(
589+
target: "sync::pipeline",
590+
stage = %stage_id,
591+
bad_block = %block.block.number,
592+
segment = %segment,
593+
"Stage is missing static file data."
594+
);
567595

568-
Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
569-
} else if err.is_fatal() {
570-
error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
571-
Err(err.into())
572-
} else {
573-
// On other errors we assume they are recoverable if we discard the
574-
// transaction and run the stage again.
575-
warn!(
576-
target: "sync::pipeline",
577-
stage = %stage_id,
578-
"Stage encountered a non-fatal error: {err}. Retrying..."
579-
);
580-
Ok(None)
596+
Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
597+
} else if err.is_fatal() {
598+
error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
599+
Err(err.into())
600+
} else {
601+
// On other errors we assume they are recoverable if we discard the
602+
// transaction and run the stage again.
603+
warn!(
604+
target: "sync::pipeline",
605+
stage = %stage_id,
606+
"Stage encountered a non-fatal error: {err}. Retrying..."
607+
);
608+
Ok(None)
609+
}
581610
}
582611
}
583612

crates/stages/api/src/stage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,4 +271,4 @@ pub trait StageExt<Provider>: Stage<Provider> {
271271
}
272272
}
273273

274-
impl<Provider, S: Stage<Provider>> StageExt<Provider> for S {}
274+
impl<Provider, S: Stage<Provider> + ?Sized> StageExt<Provider> for S {}

0 commit comments

Comments (0)