Skip to content

Commit b6a1c86

Browse files
authored
Use spawn_async in ByRoot handling workers (#5557)
* Use spawn_async in ByRoot handling workers * box large variants
1 parent 116a55e commit b6a1c86

File tree

3 files changed

+179
-234
lines changed

3 files changed

+179
-234
lines changed

beacon_node/beacon_processor/src/lib.rs

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ pub enum BlockingOrAsync {
571571
/// queuing specifics.
572572
pub enum Work<E: EthSpec> {
573573
GossipAttestation {
574-
attestation: GossipAttestationPackage<E>,
574+
attestation: Box<GossipAttestationPackage<E>>,
575575
process_individual: Box<dyn FnOnce(GossipAttestationPackage<E>) + Send + Sync>,
576576
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
577577
},
@@ -583,7 +583,7 @@ pub enum Work<E: EthSpec> {
583583
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
584584
},
585585
GossipAggregate {
586-
aggregate: GossipAggregatePackage<E>,
586+
aggregate: Box<GossipAggregatePackage<E>>,
587587
process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
588588
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
589589
},
@@ -624,8 +624,8 @@ pub enum Work<E: EthSpec> {
624624
ChainSegment(AsyncFn),
625625
ChainSegmentBackfill(AsyncFn),
626626
Status(BlockingFn),
627-
BlocksByRangeRequest(BlockingFnWithManualSendOnIdle),
628-
BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
627+
BlocksByRangeRequest(AsyncFn),
628+
BlocksByRootsRequest(AsyncFn),
629629
BlobsByRangeRequest(BlockingFn),
630630
BlobsByRootsRequest(BlockingFn),
631631
GossipBlsToExecutionChange(BlockingFn),
@@ -1015,7 +1015,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
10151015
process_individual: _,
10161016
process_batch,
10171017
} => {
1018-
aggregates.push(aggregate);
1018+
aggregates.push(*aggregate);
10191019
if process_batch_opt.is_none() {
10201020
process_batch_opt = Some(process_batch);
10211021
}
@@ -1075,7 +1075,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
10751075
process_individual: _,
10761076
process_batch,
10771077
} => {
1078-
attestations.push(attestation);
1078+
attestations.push(*attestation);
10791079
if process_batch_opt.is_none() {
10801080
process_batch_opt = Some(process_batch);
10811081
}
@@ -1445,7 +1445,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
14451445
process_individual,
14461446
process_batch: _,
14471447
} => task_spawner.spawn_blocking(move || {
1448-
process_individual(attestation);
1448+
process_individual(*attestation);
14491449
}),
14501450
Work::GossipAttestationBatch {
14511451
attestations,
@@ -1458,7 +1458,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
14581458
process_individual,
14591459
process_batch: _,
14601460
} => task_spawner.spawn_blocking(move || {
1461-
process_individual(aggregate);
1461+
process_individual(*aggregate);
14621462
}),
14631463
Work::GossipAggregateBatch {
14641464
aggregates,
@@ -1493,7 +1493,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
14931493
task_spawner.spawn_blocking(process_fn)
14941494
}
14951495
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
1496-
task_spawner.spawn_blocking_with_manual_send_idle(work)
1496+
task_spawner.spawn_async(work)
14971497
}
14981498
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
14991499
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
@@ -1555,23 +1555,6 @@ impl TaskSpawner {
15551555
WORKER_TASK_NAME,
15561556
)
15571557
}
1558-
1559-
/// Spawn a blocking task, passing the `SendOnDrop` into the task.
1560-
///
1561-
/// ## Notes
1562-
///
1563-
/// Users must ensure the `SendOnDrop` is dropped at the appropriate time!
1564-
pub fn spawn_blocking_with_manual_send_idle<F>(self, task: F)
1565-
where
1566-
F: FnOnce(SendOnDrop) + Send + 'static,
1567-
{
1568-
self.executor.spawn_blocking(
1569-
|| {
1570-
task(self.send_idle_on_drop);
1571-
},
1572-
WORKER_TASK_NAME,
1573-
)
1574-
}
15751558
}
15761559

15771560
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged on

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
102102
self.try_send(BeaconWorkEvent {
103103
drop_during_sync: true,
104104
work: Work::GossipAttestation {
105-
attestation: GossipAttestationPackage {
105+
attestation: Box::new(GossipAttestationPackage {
106106
message_id,
107107
peer_id,
108108
attestation: Box::new(attestation),
109109
subnet_id,
110110
should_import,
111111
seen_timestamp,
112-
},
112+
}),
113113
process_individual: Box::new(process_individual),
114114
process_batch: Box::new(process_batch),
115115
},
@@ -148,13 +148,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
148148
self.try_send(BeaconWorkEvent {
149149
drop_during_sync: true,
150150
work: Work::GossipAggregate {
151-
aggregate: GossipAggregatePackage {
151+
aggregate: Box::new(GossipAggregatePackage {
152152
message_id,
153153
peer_id,
154154
aggregate: Box::new(aggregate),
155155
beacon_block_root,
156156
seen_timestamp,
157-
},
157+
}),
158158
process_individual: Box::new(process_individual),
159159
process_batch: Box::new(process_batch),
160160
},
@@ -508,20 +508,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
508508
request: BlocksByRangeRequest,
509509
) -> Result<(), Error<T::EthSpec>> {
510510
let processor = self.clone();
511-
let process_fn = move |send_idle_on_drop| {
511+
let process_fn = async move {
512512
let executor = processor.executor.clone();
513-
processor.handle_blocks_by_range_request(
514-
executor,
515-
send_idle_on_drop,
516-
peer_id,
517-
request_id,
518-
request,
519-
)
513+
processor
514+
.handle_blocks_by_range_request(executor, peer_id, request_id, request)
515+
.await;
520516
};
521517

522518
self.try_send(BeaconWorkEvent {
523519
drop_during_sync: false,
524-
work: Work::BlocksByRangeRequest(Box::new(process_fn)),
520+
work: Work::BlocksByRangeRequest(Box::pin(process_fn)),
525521
})
526522
}
527523

@@ -533,20 +529,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
533529
request: BlocksByRootRequest,
534530
) -> Result<(), Error<T::EthSpec>> {
535531
let processor = self.clone();
536-
let process_fn = move |send_idle_on_drop| {
532+
let process_fn = async move {
537533
let executor = processor.executor.clone();
538-
processor.handle_blocks_by_root_request(
539-
executor,
540-
send_idle_on_drop,
541-
peer_id,
542-
request_id,
543-
request,
544-
)
534+
processor
535+
.handle_blocks_by_root_request(executor, peer_id, request_id, request)
536+
.await;
545537
};
546538

547539
self.try_send(BeaconWorkEvent {
548540
drop_during_sync: false,
549-
work: Work::BlocksByRootsRequest(Box::new(process_fn)),
541+
work: Work::BlocksByRootsRequest(Box::pin(process_fn)),
550542
})
551543
}
552544

0 commit comments

Comments
 (0)