Skip to content

Commit eb4a6d5

Browse files
authored
fix: introduce backpressure from consensus to the networking layer by using bounded channels (dfinity#2340)
1 parent 4161756 commit eb4a6d5

File tree

4 files changed

+34
-42
lines changed

4 files changed

+34
-42
lines changed

rs/p2p/artifact_manager/src/lib.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,7 @@ pub fn create_ingress_handlers<
279279
metrics_registry: MetricsRegistry,
280280
) -> Box<dyn JoinGuard> {
281281
let client = IngressProcessor::new(ingress_pool.clone(), ingress_handler);
282-
let inbound_rx_stream =
283-
tokio_stream::wrappers::UnboundedReceiverStream::new(channel.inbound_rx);
282+
let inbound_rx_stream = tokio_stream::wrappers::ReceiverStream::new(channel.inbound_rx);
284283
let user_ingress_rx_stream =
285284
tokio_stream::wrappers::UnboundedReceiverStream::new(user_ingress_rx);
286285
run_artifact_processor(
@@ -307,8 +306,7 @@ pub fn create_artifact_handler<
307306
) -> Box<dyn JoinGuard> {
308307
let inital_artifacts: Vec<_> = pool.read().unwrap().get_all_for_broadcast().collect();
309308
let client = Processor::new(pool, change_set_producer);
310-
let inbound_rx_stream =
311-
tokio_stream::wrappers::UnboundedReceiverStream::new(channel.inbound_rx);
309+
let inbound_rx_stream = tokio_stream::wrappers::ReceiverStream::new(channel.inbound_rx);
312310
run_artifact_processor(
313311
time_source.clone(),
314312
metrics_registry,
@@ -446,7 +444,7 @@ mod tests {
446444
use ic_types::artifact::UnvalidatedArtifactMutation;
447445
use std::{convert::Infallible, sync::Arc};
448446
use tokio::sync::mpsc::channel;
449-
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
447+
use tokio_stream::wrappers::ReceiverStream;
450448

451449
use crate::{run_artifact_processor, ArtifactProcessor};
452450

@@ -534,11 +532,10 @@ mod tests {
534532

535533
let time_source = Arc::new(SysTimeSource::new());
536534
let (send_tx, mut send_rx) = tokio::sync::mpsc::channel(100);
537-
#[allow(clippy::disallowed_methods)]
538-
let (_, inbound_rx) = tokio::sync::mpsc::unbounded_channel();
535+
let (_, inbound_rx) = tokio::sync::mpsc::channel(100);
539536
run_artifact_processor::<
540537
DummyArtifact,
541-
UnboundedReceiverStream<UnvalidatedArtifactMutation<DummyArtifact>>,
538+
ReceiverStream<UnvalidatedArtifactMutation<DummyArtifact>>,
542539
>(
543540
time_source,
544541
MetricsRegistry::default(),

rs/p2p/consensus_manager/src/lib.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use phantom_newtype::AmountOf;
1616
use tokio::{
1717
runtime::Handle,
1818
sync::{
19-
mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender},
19+
mpsc::{Receiver, Sender},
2020
watch,
2121
},
2222
};
@@ -29,10 +29,12 @@ type StartConsensusManagerFn =
2929
Box<dyn FnOnce(Arc<dyn Transport>, watch::Receiver<SubnetTopology>) -> Vec<Shutdown>>;
3030

3131
/// Same order of magnitude as the number of active artifacts.
32-
const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000;
32+
/// Please note that we put fairly big number mainly for perfomance reasons so either side of a channel doesn't await.
33+
/// The replica code should be designed in such a way that if we put a channel of size 1, the protocol should still work.
34+
const MAX_IO_CHANNEL_SIZE: usize = 100_000;
3335

3436
pub type AbortableBroadcastSender<T> = Sender<ArtifactTransmit<T>>;
35-
pub type AbortableBroadcastReceiver<T> = UnboundedReceiver<UnvalidatedArtifactMutation<T>>;
37+
pub type AbortableBroadcastReceiver<T> = Receiver<UnvalidatedArtifactMutation<T>>;
3638

3739
pub struct AbortableBroadcastChannel<T: IdentifiableArtifact> {
3840
pub outbound_tx: AbortableBroadcastSender<T>,
@@ -69,15 +71,8 @@ impl AbortableBroadcastChannelBuilder {
6971
(assembler, assembler_router): (F, Router),
7072
slot_limit: usize,
7173
) -> AbortableBroadcastChannel<Artifact> {
72-
let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_OUTBOUND_CHANNEL_SIZE);
73-
// Making this channel bounded can be problematic since we don't have true multiplexing
74-
// of P2P messages.
75-
// Possible scenario is - adverts+chunks arrive on the same channel, slow consensus
76-
// will result on slow consuption of chunks. Slow consumption of chunks will in turn
77-
// result in slower consumptions of adverts. Ideally adverts are consumed at rate
78-
// independent of consensus.
79-
#[allow(clippy::disallowed_methods)]
80-
let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel();
74+
let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_IO_CHANNEL_SIZE);
75+
let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(MAX_IO_CHANNEL_SIZE);
8176

8277
assert!(uri_prefix::<WireArtifact>()
8378
.chars()
@@ -143,7 +138,7 @@ fn start_consensus_manager<Artifact, WireArtifact, Assembler>(
143138
outbound_transmits: Receiver<ArtifactTransmit<Artifact>>,
144139
// Slot updates received from peers
145140
slot_updates_rx: Receiver<(SlotUpdate<WireArtifact>, NodeId, ConnId)>,
146-
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
141+
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
147142
assembler: Assembler,
148143
transport: Arc<dyn Transport>,
149144
topology_watcher: watch::Receiver<SubnetTopology>,

rs/p2p/consensus_manager/src/receiver.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#![allow(clippy::disallowed_methods)]
2-
31
use std::collections::{hash_map::Entry, HashMap, HashSet};
42

53
use crate::{
@@ -28,7 +26,7 @@ use tokio::{
2826
runtime::Handle,
2927
select,
3028
sync::{
31-
mpsc::{Receiver, Sender, UnboundedSender},
29+
mpsc::{Receiver, Sender},
3230
watch,
3331
},
3432
task::JoinSet,
@@ -181,7 +179,8 @@ pub(crate) struct ConsensusManagerReceiver<
181179

182180
// Receive side:
183181
slot_updates_rx: Receiver<(SlotUpdate<WireArtifact>, NodeId, ConnId)>,
184-
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
182+
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
183+
185184
artifact_assembler: Assembler,
186185

187186
slot_table: HashMap<NodeId, HashMap<SlotNumber, SlotEntry<WireArtifact::Id>>>,
@@ -207,7 +206,7 @@ where
207206
rt_handle: Handle,
208207
slot_updates_rx: Receiver<(SlotUpdate<WireArtifact>, NodeId, ConnId)>,
209208
artifact_assembler: Assembler,
210-
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
209+
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
211210
topology_watcher: watch::Receiver<SubnetTopology>,
212211
slot_limit: usize,
213212
) -> Shutdown {
@@ -449,7 +448,7 @@ where
449448
// Only first peer for specific artifact ID is considered for push
450449
artifact: Option<(WireArtifact, NodeId)>,
451450
mut peer_rx: watch::Receiver<PeerCounter>,
452-
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
451+
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
453452
artifact_assembler: Assembler,
454453
metrics: ConsensusManagerMetrics,
455454
cancellation_token: CancellationToken,
@@ -480,17 +479,22 @@ where
480479
match assemble_result {
481480
AssembleResult::Done { message, peer_id } => {
482481
let id = message.id();
483-
// Send artifact to pool
484-
if sender.send(UnvalidatedArtifactMutation::Insert((message, peer_id))).is_err() {
482+
// Sends artifact to the pool. In theory this channel can get full if there is a bug in consensus and each round takes very long time.
483+
// However, the duration of this await is not IO-bound so for the time being it is fine that sending over the channel is not done as
484+
// part of a select.
485+
if sender.send(UnvalidatedArtifactMutation::Insert((message, peer_id))).await.is_err() {
486+
485487
error!(log, "The receiving side of the channel, owned by the consensus thread, was closed. This should be infallible situation since a cancellation token should be received. If this happens then most likely there is very subnet synchonization bug.");
486488
}
487489

488490
// wait for deletion from peers
489491
// TODO: NET-1774
490492
let _ = peer_rx.wait_for(|p| p.is_empty()).await;
491493

492-
// Purge from the unvalidated pool
493-
if sender.send(UnvalidatedArtifactMutation::Remove(id)).is_err() {
494+
// Purge artifact from the unvalidated pool. In theory this channel can get full if there is a bug in consensus and each round takes very long time.
495+
// However, the duration of this await is not IO-bound so for the time being it is fine that sending over the channel is not done as
496+
// part of a select.
497+
if sender.send(UnvalidatedArtifactMutation::Remove(id)).await.is_err() {
494498
error!(log, "The receiving side of the channel, owned by the consensus thread, was closed. This should be infallible situation since a cancellation token should be received. If this happens then most likely there is very subnet synchonization bug.");
495499
}
496500
metrics
@@ -596,7 +600,7 @@ mod tests {
596600
use ic_test_utilities_logger::with_test_replica_logger;
597601
use ic_types::{artifact::IdentifiableArtifact, RegistryVersion};
598602
use ic_types_test_utils::ids::{NODE_1, NODE_2};
599-
use tokio::{sync::mpsc::UnboundedReceiver, time::timeout};
603+
use tokio::time::timeout;
600604
use tower::util::ServiceExt;
601605

602606
use super::*;
@@ -606,7 +610,7 @@ mod tests {
606610
struct ReceiverManagerBuilder {
607611
// Slot updates received from peers
608612
slot_updates_rx: Receiver<(SlotUpdate<U64Artifact>, NodeId, ConnId)>,
609-
sender: UnboundedSender<UnvalidatedArtifactMutation<U64Artifact>>,
613+
sender: Sender<UnvalidatedArtifactMutation<U64Artifact>>,
610614
artifact_assembler: MockArtifactAssembler,
611615
topology_watcher: watch::Receiver<SubnetTopology>,
612616
slot_limit: usize,
@@ -618,7 +622,7 @@ mod tests {
618622
ConsensusManagerReceiver<U64Artifact, U64Artifact, MockArtifactAssembler>;
619623

620624
struct Channels {
621-
unvalidated_artifact_receiver: UnboundedReceiver<UnvalidatedArtifactMutation<U64Artifact>>,
625+
unvalidated_artifact_receiver: Receiver<UnvalidatedArtifactMutation<U64Artifact>>,
622626
}
623627

624628
impl ReceiverManagerBuilder {
@@ -634,8 +638,7 @@ mod tests {
634638

635639
fn new() -> Self {
636640
let (_, slot_updates_rx) = tokio::sync::mpsc::channel(100);
637-
#[allow(clippy::disallowed_methods)]
638-
let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::unbounded_channel();
641+
let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::channel(1000);
639642
let (_, topology_watcher) = watch::channel(SubnetTopology::default());
640643
let artifact_assembler =
641644
Self::make_mock_artifact_assembler_with_clone(MockArtifactAssembler::default);

rs/p2p/test_utils/src/turmoil.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use tokio::{
3636
select,
3737
sync::{mpsc, oneshot, watch, Notify},
3838
};
39-
use tokio_stream::wrappers::UnboundedReceiverStream;
39+
use tokio_stream::wrappers::ReceiverStream;
4040
use turmoil::Sim;
4141

4242
pub struct CustomUdp {
@@ -443,16 +443,13 @@ pub fn waiter_fut(
443443
#[allow(clippy::type_complexity)]
444444
pub fn start_test_processor(
445445
outbound_tx: mpsc::Sender<ArtifactTransmit<U64Artifact>>,
446-
inbound_rx: mpsc::UnboundedReceiver<UnvalidatedArtifactMutation<U64Artifact>>,
446+
inbound_rx: mpsc::Receiver<UnvalidatedArtifactMutation<U64Artifact>>,
447447
pool: Arc<RwLock<TestConsensus<U64Artifact>>>,
448448
change_set_producer: TestConsensus<U64Artifact>,
449449
) -> Box<dyn JoinGuard> {
450450
let time_source = Arc::new(SysTimeSource::new());
451451
let client = ic_artifact_manager::Processor::new(pool, change_set_producer);
452-
run_artifact_processor::<
453-
U64Artifact,
454-
UnboundedReceiverStream<UnvalidatedArtifactMutation<U64Artifact>>,
455-
>(
452+
run_artifact_processor::<U64Artifact, ReceiverStream<UnvalidatedArtifactMutation<U64Artifact>>>(
456453
time_source,
457454
MetricsRegistry::default(),
458455
Box::new(client),

0 commit comments

Comments
 (0)