Skip to content

Commit 8248347

Browse files
committed
feat: impl filter with transport
1 parent 05aa7b6 commit 8248347

File tree

5 files changed

+82
-38
lines changed

5 files changed

+82
-38
lines changed

network/src/network.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1115,7 +1115,7 @@ impl NetworkService {
11151115
let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
11161116
let count = max((config.max_outbound_peers >> 1) as usize, 1);
11171117
let mut addrs: Vec<_> = peer_store
1118-
.fetch_addrs_to_attempt(count, *target)
1118+
.fetch_addrs_to_attempt(count, *target, |_| true)
11191119
.into_iter()
11201120
.map(|paddr| paddr.addr)
11211121
.collect();

network/src/peer_store/peer_store_impl.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,15 @@ impl PeerStore {
157157
}
158158

159159
/// Get peers for outbound connection, this method randomly return recently connected peer addrs
160-
pub fn fetch_addrs_to_attempt(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
160+
pub fn fetch_addrs_to_attempt<F>(
161+
&mut self,
162+
count: usize,
163+
required_flags: Flags,
164+
filter: F,
165+
) -> Vec<AddrInfo>
166+
where
167+
F: Fn(&AddrInfo) -> bool,
168+
{
161169
// Get info:
162170
// 1. Not already connected
163171
// 2. Connected within 3 days
@@ -167,9 +175,10 @@ impl PeerStore {
167175
let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
168176

169177
let filter = |peer_addr: &AddrInfo| {
170-
extract_peer_id(&peer_addr.addr)
171-
.map(|peer_id| !peers.contains_key(&peer_id))
172-
.unwrap_or_default()
178+
filter(peer_addr)
179+
&& extract_peer_id(&peer_addr.addr)
180+
.map(|peer_id| !peers.contains_key(&peer_id))
181+
.unwrap_or_default()
173182
&& peer_addr
174183
.connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL))
175184
&& required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
@@ -181,7 +190,10 @@ impl PeerStore {
181190

182191
/// Get peers for feeler connection, this method randomly return peer addrs that we never
183192
/// connected to.
184-
pub fn fetch_addrs_to_feeler(&mut self, count: usize) -> Vec<AddrInfo> {
193+
pub fn fetch_addrs_to_feeler<F>(&mut self, count: usize, filter: F) -> Vec<AddrInfo>
194+
where
195+
F: Fn(&AddrInfo) -> bool,
196+
{
185197
// Get info:
186198
// 1. Not already connected
187199
// 2. Not already tried in a minute
@@ -192,9 +204,10 @@ impl PeerStore {
192204
let peers = &self.connected_peers;
193205

194206
let filter = |peer_addr: &AddrInfo| {
195-
extract_peer_id(&peer_addr.addr)
196-
.map(|peer_id| !peers.contains_key(&peer_id))
197-
.unwrap_or_default()
207+
filter(peer_addr)
208+
&& extract_peer_id(&peer_addr.addr)
209+
.map(|peer_id| !peers.contains_key(&peer_id))
210+
.unwrap_or_default()
198211
&& !peer_addr.tried_in_last_minute(now_ms)
199212
&& !peer_addr.connected(|t| t > addr_expired_ms)
200213
};

network/src/protocols/tests/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ fn test_discovery_behavior() {
488488
let mut locked = node1.network_state.peer_store.lock();
489489

490490
locked
491-
.fetch_addrs_to_feeler(6)
491+
.fetch_addrs_to_feeler(6, |_| true)
492492
.into_iter()
493493
.map(|peer| peer.addr)
494494
.flat_map(|addr| {

network/src/services/outbound_peer.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,23 @@ impl OutboundPeerService {
5555

5656
fn dial_feeler(&mut self) {
5757
let now_ms = unix_time_as_millis();
58+
let filter: Box<dyn Fn(&AddrInfo) -> bool> = match self.transport_type {
59+
TransportType::Tcp => Box::new(|_| true),
60+
TransportType::Ws => Box::new(|peer_addr: &AddrInfo| {
61+
peer_addr
62+
.addr
63+
.iter()
64+
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Tcp(_)))
65+
}),
66+
TransportType::Wss => Box::new(|peer_addr: &AddrInfo| {
67+
peer_addr
68+
.addr
69+
.iter()
70+
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_)))
71+
}),
72+
};
5873
let attempt_peers = self.network_state.with_peer_store_mut(|peer_store| {
59-
let paddrs = peer_store.fetch_addrs_to_feeler(FEELER_CONNECTION_COUNT);
74+
let paddrs = peer_store.fetch_addrs_to_feeler(FEELER_CONNECTION_COUNT, filter);
6075
for paddr in paddrs.iter() {
6176
// mark addr as tried
6277
if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&paddr.addr) {
@@ -97,8 +112,24 @@ impl OutboundPeerService {
97112

98113
let target = &self.network_state.required_flags;
99114

115+
let filter: Box<dyn Fn(&AddrInfo) -> bool> = match self.transport_type {
116+
TransportType::Tcp => Box::new(|_| true),
117+
TransportType::Ws => Box::new(|peer_addr: &AddrInfo| {
118+
peer_addr
119+
.addr
120+
.iter()
121+
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Tcp(_)))
122+
}),
123+
TransportType::Wss => Box::new(|peer_addr: &AddrInfo| {
124+
peer_addr
125+
.addr
126+
.iter()
127+
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_)))
128+
}),
129+
};
130+
100131
let f = |peer_store: &mut PeerStore, number: usize, now_ms: u64| -> Vec<AddrInfo> {
101-
let paddrs = peer_store.fetch_addrs_to_attempt(number, *target);
132+
let paddrs = peer_store.fetch_addrs_to_attempt(number, *target, filter);
102133
for paddr in paddrs.iter() {
103134
// mark addr as tried
104135
if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&paddr.addr) {

network/src/tests/peer_store.rs

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,17 @@ fn test_add_addr() {
3131
let mut peer_store: PeerStore = Default::default();
3232
assert_eq!(
3333
peer_store
34-
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
34+
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
3535
.len(),
3636
0
3737
);
3838
let addr = random_addr();
3939
peer_store.add_addr(addr, Flags::COMPATIBILITY).unwrap();
40-
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
40+
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);
4141
// we have not connected yet, so return 0
4242
assert_eq!(
4343
peer_store
44-
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
44+
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
4545
.len(),
4646
0
4747
);
@@ -141,14 +141,14 @@ fn test_attempt_ban() {
141141

142142
assert_eq!(
143143
peer_store
144-
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
144+
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
145145
.len(),
146146
1
147147
);
148148
peer_store.ban_addr(&addr, 10_000, "no reason".into());
149149
assert_eq!(
150150
peer_store
151-
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
151+
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
152152
.len(),
153153
0
154154
);
@@ -161,7 +161,7 @@ fn test_fetch_addrs_to_attempt() {
161161

162162
let mut peer_store: PeerStore = Default::default();
163163
assert!(peer_store
164-
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
164+
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
165165
.is_empty());
166166
let addr = random_addr();
167167
peer_store
@@ -176,13 +176,13 @@ fn test_fetch_addrs_to_attempt() {
176176

177177
assert_eq!(
178178
peer_store
179-
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
179+
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
180180
.len(),
181181
1
182182
);
183183
peer_store.add_connected_peer(addr, SessionType::Outbound);
184184
assert!(peer_store
185-
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
185+
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
186186
.is_empty());
187187
}
188188

@@ -199,18 +199,18 @@ fn test_fetch_addrs_to_attempt_or_feeler() {
199199

200200
assert_eq!(
201201
peer_store
202-
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
202+
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
203203
.len(),
204204
1
205205
);
206-
assert!(peer_store.fetch_addrs_to_feeler(2).is_empty());
206+
assert!(peer_store.fetch_addrs_to_feeler(2, |_| true).is_empty());
207207

208208
_faketime_guard.set_faketime(100_000 + ADDR_TRY_TIMEOUT_MS + 1);
209209

210210
assert!(peer_store
211-
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
211+
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
212212
.is_empty());
213-
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
213+
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);
214214
}
215215

216216
#[test]
@@ -229,14 +229,14 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() {
229229
paddr.mark_tried(now);
230230
}
231231
assert!(peer_store
232-
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
232+
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
233233
.is_empty());
234234
// after 60 seconds
235235
if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&addr) {
236236
paddr.mark_tried(now - 60_001);
237237
}
238238
assert!(peer_store
239-
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
239+
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
240240
.is_empty());
241241
peer_store
242242
.mut_addr_manager()
@@ -247,7 +247,7 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() {
247247

248248
assert_eq!(
249249
peer_store
250-
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
250+
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
251251
.len(),
252252
1
253253
);
@@ -256,7 +256,7 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() {
256256
}
257257
assert_eq!(
258258
peer_store
259-
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
259+
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
260260
.len(),
261261
1
262262
);
@@ -265,18 +265,18 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() {
265265
#[test]
266266
fn test_fetch_addrs_to_feeler() {
267267
let mut peer_store: PeerStore = Default::default();
268-
assert!(peer_store.fetch_addrs_to_feeler(1).is_empty());
268+
assert!(peer_store.fetch_addrs_to_feeler(1, |_| true).is_empty());
269269
let addr = random_addr();
270270

271271
// add an addr
272272
peer_store
273273
.add_addr(addr.clone(), Flags::COMPATIBILITY)
274274
.unwrap();
275-
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
275+
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);
276276

277277
// ignores connected peers' addrs
278278
peer_store.add_connected_peer(addr.clone(), SessionType::Outbound);
279-
assert!(peer_store.fetch_addrs_to_feeler(1).is_empty());
279+
assert!(peer_store.fetch_addrs_to_feeler(1, |_| true).is_empty());
280280

281281
// peer does not need feeler if it connected to us recently
282282
peer_store
@@ -285,7 +285,7 @@ fn test_fetch_addrs_to_feeler() {
285285
.unwrap()
286286
.last_connected_at_ms = ckb_systemtime::unix_time_as_millis();
287287
peer_store.remove_disconnected_peer(&addr);
288-
assert!(peer_store.fetch_addrs_to_feeler(1).is_empty());
288+
assert!(peer_store.fetch_addrs_to_feeler(1, |_| true).is_empty());
289289
}
290290

291291
#[test]
@@ -581,10 +581,10 @@ fn test_addr_unique() {
581581
.unwrap();
582582
peer_store.add_addr(addr_1, Flags::COMPATIBILITY).unwrap();
583583
assert_eq!(peer_store.addr_manager().addrs_iter().count(), 2);
584-
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 2);
584+
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 2);
585585

586586
peer_store.add_addr(addr, Flags::COMPATIBILITY).unwrap();
587-
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 2);
587+
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 2);
588588

589589
assert_eq!(peer_store.addr_manager().addrs_iter().count(), 2);
590590
}
@@ -597,8 +597,8 @@ fn test_only_tcp_store() {
597597
peer_store
598598
.add_addr(addr.clone(), Flags::COMPATIBILITY)
599599
.unwrap();
600-
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
601-
assert_eq!(peer_store.fetch_addrs_to_feeler(1)[0].addr, {
600+
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);
601+
assert_eq!(peer_store.fetch_addrs_to_feeler(1, |_| true)[0].addr, {
602602
addr.pop();
603603
addr
604604
});
@@ -618,6 +618,6 @@ fn test_support_dns_store() {
618618
peer_store
619619
.add_addr(addr.clone(), Flags::COMPATIBILITY)
620620
.unwrap();
621-
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
622-
assert_eq!(peer_store.fetch_addrs_to_feeler(1)[0].addr, addr);
621+
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);
622+
assert_eq!(peer_store.fetch_addrs_to_feeler(1, |_| true)[0].addr, addr);
623623
}

0 commit comments

Comments
 (0)