Skip to content

Commit 64364d1

Browse files
committed
gossipsub: implement IDONTWANT messages
1 parent 7846a80 commit 64364d1

File tree

6 files changed

+140
-1
lines changed

6 files changed

+140
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/gossipsub/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ tracing = "0.1.37"
3838
void = "1.0.2"
3939

4040
prometheus-client = "0.22.0"
41+
lru.workspace = true
4142

4243
[dev-dependencies]
4344
quickcheck = { workspace = true }

beacon_node/gossipsub/src/behaviour.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ use std::{
2525
collections::{BTreeSet, HashMap},
2626
fmt,
2727
net::IpAddr,
28+
num::NonZeroUsize,
2829
task::{Context, Poll},
2930
time::Duration,
3031
};
3132

3233
use futures::StreamExt;
3334
use futures_ticker::Ticker;
35+
use lru::LruCache;
3436
use prometheus_client::registry::Registry;
3537
use rand::{seq::SliceRandom, thread_rng};
3638

@@ -45,6 +47,8 @@ use libp2p::swarm::{
4547
THandlerOutEvent, ToSwarm,
4648
};
4749

50+
use crate::types::IDontWant;
51+
4852
use super::gossip_promises::GossipPromises;
4953
use super::handler::{Handler, HandlerEvent, HandlerIn};
5054
use super::mcache::MessageCache;
@@ -74,6 +78,9 @@ use std::{cmp::Ordering::Equal, fmt::Debug};
7478
#[cfg(test)]
7579
mod tests;
7680

81+
/// IDONTWANT Cache capacity.
82+
const IDONTWANT_CAP: usize = 100;
83+
7784
/// Determines if published messages should be signed or not.
7885
///
7986
/// Without signing, a number of privacy preserving modes can be selected.
@@ -1796,6 +1803,9 @@ where
17961803
// Calculate the message id on the transformed data.
17971804
let msg_id = self.config.message_id(&message);
17981805

1806+
// Broadcast IDONTWANT messages.
1807+
self.send_idontwant(&raw_message, &msg_id, propagation_source);
1808+
17991809
// Check the validity of the message
18001810
// Peers get penalized if this message is invalid. We don't add it to the duplicate cache
18011811
// and instead continually penalize peers that repeatedly send this message.
@@ -2656,6 +2666,54 @@ where
26562666
}
26572667
}
26582668

2669+
/// Helper function which sends an IDONTWANT message to mesh\[topic\] peers.
2670+
fn send_idontwant(
2671+
&mut self,
2672+
message: &RawMessage,
2673+
msg_id: &MessageId,
2674+
propagation_source: &PeerId,
2675+
) {
2676+
let Some(mesh_peers) = self.mesh.get(&message.topic) else {
2677+
return;
2678+
};
2679+
2680+
let recipient_peers = mesh_peers.iter().filter(|peer_id| {
2681+
*peer_id != propagation_source && Some(*peer_id) != message.source.as_ref()
2682+
});
2683+
2684+
for peer_id in recipient_peers {
2685+
let Some(peer) = self.connected_peers.get_mut(peer_id) else {
2686+
tracing::error!(peer = %peer_id,
2687+
"Could not IDONTWANT, peer doesn't exist in connected peer list");
2688+
continue;
2689+
};
2690+
2691+
// Only gossipsub 1.2 peers support IDONTWANT.
2692+
if peer.kind == PeerKind::Gossipsubv1_2 {
2693+
continue;
2694+
}
2695+
2696+
if peer
2697+
.sender
2698+
.idontwant(IDontWant {
2699+
message_ids: vec![msg_id.clone()],
2700+
})
2701+
.is_err()
2702+
{
2703+
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IDONTWANT");
2704+
2705+
if let Some((peer_score, ..)) = &mut self.peer_score {
2706+
peer_score.failed_message_slow_peer(peer_id);
2707+
}
2708+
// Increment failed message count
2709+
self.failed_messages
2710+
.entry(*peer_id)
2711+
.or_default()
2712+
.non_priority += 1;
2713+
}
2714+
}
2715+
}
2716+
26592717
/// Helper function which forwards a message to mesh\[topic\] peers.
26602718
///
26612719
/// Returns true if at least one peer was messaged.
@@ -2709,6 +2767,11 @@ where
27092767
if !recipient_peers.is_empty() {
27102768
for peer_id in recipient_peers.iter() {
27112769
if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2770+
if peer.dont_send.get(msg_id).is_some() {
2771+
tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message");
2772+
continue;
2773+
}
2774+
27122775
tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
27132776
if peer
27142777
.sender
@@ -3058,6 +3121,7 @@ where
30583121
connections: vec![],
30593122
sender: RpcSender::new(self.config.connection_handler_queue_len()),
30603123
topics: Default::default(),
3124+
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
30613125
});
30623126
// Add the new connection
30633127
connected_peer.connections.push(connection_id);
@@ -3088,6 +3152,7 @@ where
30883152
connections: vec![],
30893153
sender: RpcSender::new(self.config.connection_handler_queue_len()),
30903154
topics: Default::default(),
3155+
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
30913156
});
30923157
// Add the new connection
30933158
connected_peer.connections.push(connection_id);
@@ -3246,6 +3311,17 @@ where
32463311
peers,
32473312
backoff,
32483313
}) => prune_msgs.push((topic_hash, peers, backoff)),
3314+
ControlAction::IDontWant(IDontWant { message_ids }) => {
3315+
let Some(peer) = self.connected_peers.get_mut(&propagation_source)
3316+
else {
3317+
tracing::error!(peer = %propagation_source,
3318+
"Could not handle IDONTWANT, peer doesn't exist in connected peer list");
3319+
continue;
3320+
};
3321+
for message_id in message_ids {
3322+
peer.dont_send.push(message_id, ());
3323+
}
3324+
}
32493325
}
32503326
}
32513327
if !ihave_msgs.is_empty() {

beacon_node/gossipsub/src/behaviour/tests.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ where
240240
kind: kind.clone().unwrap_or(PeerKind::Floodsub),
241241
connections: vec![connection_id],
242242
topics: Default::default(),
243+
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
243244
sender,
244245
},
245246
);
@@ -625,6 +626,7 @@ fn test_join() {
625626
kind: PeerKind::Floodsub,
626627
connections: vec![connection_id],
627628
topics: Default::default(),
629+
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
628630
sender,
629631
},
630632
);
@@ -1020,6 +1022,7 @@ fn test_get_random_peers() {
10201022
connections: vec![ConnectionId::new_unchecked(0)],
10211023
topics: topics.clone(),
10221024
sender: RpcSender::new(gs.config.connection_handler_queue_len()),
1025+
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
10231026
},
10241027
);
10251028
}

beacon_node/gossipsub/src/protocol.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ use void::Void;
4141

4242
pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
4343

44+
pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId {
45+
protocol: StreamProtocol::new("/meshsub/1.2.0"),
46+
kind: PeerKind::Gossipsubv1_1,
47+
};
4448
pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
4549
protocol: StreamProtocol::new("/meshsub/1.1.0"),
4650
kind: PeerKind::Gossipsubv1_1,
@@ -70,7 +74,11 @@ impl Default for ProtocolConfig {
7074
Self {
7175
max_transmit_size: 65536,
7276
validation_mode: ValidationMode::Strict,
73-
protocol_ids: vec![GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL],
77+
protocol_ids: vec![
78+
GOSSIPSUB_1_2_0_PROTOCOL,
79+
GOSSIPSUB_1_1_0_PROTOCOL,
80+
GOSSIPSUB_1_0_0_PROTOCOL,
81+
],
7482
}
7583
}
7684
}

beacon_node/gossipsub/src/types.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use futures_timer::Delay;
2828
use instant::Duration;
2929
use libp2p::identity::PeerId;
3030
use libp2p::swarm::ConnectionId;
31+
use lru::LruCache;
3132
use prometheus_client::encoding::EncodeLabelValue;
3233
use quick_protobuf::MessageWrite;
3334
use std::collections::BTreeSet;
@@ -121,11 +122,15 @@ pub(crate) struct PeerConnections {
121122
pub(crate) sender: RpcSender,
122123
/// Subscribed topics.
123124
pub(crate) topics: BTreeSet<TopicHash>,
125+
/// Don't send messages.
126+
pub(crate) dont_send: LruCache<MessageId, ()>,
124127
}
125128

126129
/// Describes the types of peers that can exist in the gossipsub context.
127130
#[derive(Debug, Clone, PartialEq, Hash, EncodeLabelValue, Eq)]
128131
pub enum PeerKind {
132+
/// A gossipsub 1.2 peer.
133+
Gossipsubv1_2,
129134
/// A gossipsub 1.1 peer.
130135
Gossipsubv1_1,
131136
/// A gossipsub 1.0 peer.
@@ -257,6 +262,8 @@ pub enum ControlAction {
257262
Graft(Graft),
258263
/// The node has been removed from the mesh - Prune control message.
259264
Prune(Prune),
265+
/// The node requests us to not forward message ids (peer_id + sequence_number) - IDontWant control message.
266+
IDontWant(IDontWant),
260267
}
261268

262269
/// Node broadcasts known messages per topic - IHave control message.
@@ -293,6 +300,13 @@ pub struct Prune {
293300
pub(crate) backoff: Option<u64>,
294301
}
295302

303+
/// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant control message.
304+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
305+
pub struct IDontWant {
306+
/// A list of known message ids (peer_id + sequence _number) as a string.
307+
pub(crate) message_ids: Vec<MessageId>,
308+
}
309+
296310
/// A Gossipsub RPC message sent.
297311
#[derive(Debug)]
298312
pub enum RpcOut {
@@ -314,6 +328,8 @@ pub enum RpcOut {
314328
IHave(IHave),
315329
/// Send a IWant control message.
316330
IWant(IWant),
331+
/// Send a IDontWant control message.
332+
IDontWant(IDontWant),
317333
}
318334

319335
impl RpcOut {
@@ -374,6 +390,7 @@ impl From<RpcOut> for proto::RPC {
374390
iwant: vec![],
375391
graft: vec![],
376392
prune: vec![],
393+
idontwant: vec![],
377394
}),
378395
},
379396
RpcOut::IWant(IWant { message_ids }) => proto::RPC {
@@ -386,6 +403,7 @@ impl From<RpcOut> for proto::RPC {
386403
}],
387404
graft: vec![],
388405
prune: vec![],
406+
idontwant: vec![],
389407
}),
390408
},
391409
RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
@@ -398,6 +416,7 @@ impl From<RpcOut> for proto::RPC {
398416
topic_id: Some(topic_hash.into_string()),
399417
}],
400418
prune: vec![],
419+
idontwant: vec![],
401420
}),
402421
},
403422
RpcOut::Prune(Prune {
@@ -424,9 +443,23 @@ impl From<RpcOut> for proto::RPC {
424443
.collect(),
425444
backoff,
426445
}],
446+
idontwant: vec![],
427447
}),
428448
}
429449
}
450+
RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
451+
publish: Vec::new(),
452+
subscriptions: Vec::new(),
453+
control: Some(proto::ControlMessage {
454+
ihave: vec![],
455+
iwant: vec![],
456+
graft: vec![],
457+
prune: vec![],
458+
idontwant: vec![proto::ControlIDontWant {
459+
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
460+
}],
461+
}),
462+
},
430463
}
431464
}
432465
}
@@ -485,6 +518,7 @@ impl From<Rpc> for proto::RPC {
485518
iwant: Vec::new(),
486519
graft: Vec::new(),
487520
prune: Vec::new(),
521+
idontwant: Vec::new(),
488522
};
489523

490524
let empty_control_msg = rpc.control_msgs.is_empty();
@@ -533,6 +567,12 @@ impl From<Rpc> for proto::RPC {
533567
};
534568
control.prune.push(rpc_prune);
535569
}
570+
ControlAction::IDontWant(IDontWant { message_ids }) => {
571+
let rpc_iwant = proto::ControlIDontWant {
572+
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
573+
};
574+
control.idontwant.push(rpc_iwant);
575+
}
536576
}
537577
}
538578

@@ -571,6 +611,7 @@ impl PeerKind {
571611
Self::Floodsub => "Floodsub",
572612
Self::Gossipsub => "Gossipsub v1.0",
573613
Self::Gossipsubv1_1 => "Gossipsub v1.1",
614+
Self::Gossipsubv1_2 => "Gossipsub v1.2",
574615
}
575616
}
576617
}
@@ -657,6 +698,15 @@ impl RpcSender {
657698
.map_err(|err| err.into_inner())
658699
}
659700

701+
/// Send a `RpcOut::IWant` message to the `RpcReceiver`
702+
/// this is low priority, if the queue is full an Err is returned.
703+
#[allow(clippy::result_large_err)]
704+
pub(crate) fn idontwant(&mut self, idontwant: IDontWant) -> Result<(), RpcOut> {
705+
self.non_priority_sender
706+
.try_send(RpcOut::IDontWant(idontwant))
707+
.map_err(|err| err.into_inner())
708+
}
709+
660710
/// Send a `RpcOut::Subscribe` message to the `RpcReceiver`
661711
/// this is high priority.
662712
pub(crate) fn subscribe(&mut self, topic: TopicHash) {

0 commit comments

Comments
 (0)