@@ -531,21 +531,48 @@ impl Endpoint {
531
531
/// This updates the local state for the remote node. If the provided [`NodeAddr`]
532
532
/// contains a [`RelayUrl`] this will be used as the new relay server for this node. If
533
533
/// it contains any new IP endpoints they will also be stored and tried when next
534
- /// connecting to this node.
534
+ /// connecting to this node. Any address that matches this node's direct addresses will be
535
+ /// silently ignored.
536
+ ///
537
+ /// See also [`Endpoint::add_node_addr_with_source`].
535
538
///
536
539
/// # Errors
537
540
///
538
- /// Will return an error if we attempt to add our own [`PublicKey`] to the node map.
541
+ /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the
542
+ /// direct addresses are a subset of ours.
539
543
pub fn add_node_addr ( & self , node_addr : NodeAddr ) -> Result < ( ) > {
544
+ self . add_node_addr_inner ( node_addr, magicsock:: Source :: App )
545
+ }
546
+
547
+ /// Informs this [`Endpoint`] about addresses of the iroh-net node, noting the source.
548
+ ///
549
+ /// This updates the local state for the remote node. If the provided [`NodeAddr`] contains a
550
+ /// [`RelayUrl`] this will be used as the new relay server for this node. If it contains any
551
+ /// new IP endpoints they will also be stored and tried when next connecting to this node. Any
552
+ /// address that matches this node's direct addresses will be silently ignored. The *source* is
553
+ /// used for logging exclusively and will not be stored.
554
+ ///
555
+ /// # Errors
556
+ ///
557
+ /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the
558
+ /// direct addresses are a subset of ours.
559
+ pub fn add_node_addr_with_source (
560
+ & self ,
561
+ node_addr : NodeAddr ,
562
+ source : & ' static str ,
563
+ ) -> Result < ( ) > {
564
+ self . add_node_addr_inner ( node_addr, magicsock:: Source :: NamedApp { name : source } )
565
+ }
566
+
567
+ fn add_node_addr_inner ( & self , node_addr : NodeAddr , source : magicsock:: Source ) -> Result < ( ) > {
540
568
// Connecting to ourselves is not supported.
541
569
if node_addr. node_id == self . node_id ( ) {
542
570
bail ! (
543
571
"Adding our own address is not supported ({} is the node id of this node)" ,
544
572
node_addr. node_id. fmt_short( )
545
573
) ;
546
574
}
547
- self . msock . add_node_addr ( node_addr) ;
548
- Ok ( ( ) )
575
+ self . msock . add_node_addr ( node_addr, source)
549
576
}
550
577
551
578
// # Getter methods for properties of this Endpoint itself.
@@ -1386,8 +1413,9 @@ mod tests {
1386
1413
1387
1414
#[ tokio:: test]
1388
1415
async fn endpoint_conn_type_stream ( ) {
1416
+ const TIMEOUT : Duration = std:: time:: Duration :: from_secs ( 15 ) ;
1389
1417
let _logging_guard = iroh_test:: logging:: setup ( ) ;
1390
- let ( relay_map, relay_url , _relay_guard) = run_relay_server ( ) . await . unwrap ( ) ;
1418
+ let ( relay_map, _relay_url , _relay_guard) = run_relay_server ( ) . await . unwrap ( ) ;
1391
1419
let mut rng = rand_chacha:: ChaCha8Rng :: seed_from_u64 ( 42 ) ;
1392
1420
let ep1_secret_key = SecretKey :: generate_with_rng ( & mut rng) ;
1393
1421
let ep2_secret_key = SecretKey :: generate_with_rng ( & mut rng) ;
@@ -1408,31 +1436,25 @@ mod tests {
1408
1436
. await
1409
1437
. unwrap ( ) ;
1410
1438
1411
- async fn handle_direct_conn ( ep : Endpoint , node_id : PublicKey ) -> Result < ( ) > {
1412
- let node_addr = NodeAddr :: new ( node_id) ;
1413
- ep. add_node_addr ( node_addr) ?;
1414
- let stream = ep. conn_type_stream ( node_id) ?;
1415
- async fn get_direct_event (
1416
- src : & PublicKey ,
1417
- dst : & PublicKey ,
1418
- mut stream : ConnectionTypeStream ,
1419
- ) -> Result < ( ) > {
1420
- let src = src. fmt_short ( ) ;
1421
- let dst = dst. fmt_short ( ) ;
1422
- while let Some ( conn_type) = stream. next ( ) . await {
1423
- tracing:: info!( me = %src, dst = %dst, conn_type = ?conn_type) ;
1424
- if matches ! ( conn_type, ConnectionType :: Direct ( _) ) {
1425
- return Ok ( ( ) ) ;
1426
- }
1439
+ async fn handle_direct_conn ( ep : & Endpoint , node_id : PublicKey ) -> Result < ( ) > {
1440
+ let mut stream = ep. conn_type_stream ( node_id) ?;
1441
+ let src = ep. node_id ( ) . fmt_short ( ) ;
1442
+ let dst = node_id. fmt_short ( ) ;
1443
+ while let Some ( conn_type) = stream. next ( ) . await {
1444
+ tracing:: info!( me = %src, dst = %dst, conn_type = ?conn_type) ;
1445
+ if matches ! ( conn_type, ConnectionType :: Direct ( _) ) {
1446
+ return Ok ( ( ) ) ;
1427
1447
}
1428
- anyhow:: bail!( "conn_type stream ended before `ConnectionType::Direct`" ) ;
1429
1448
}
1430
- tokio:: time:: timeout (
1431
- Duration :: from_secs ( 15 ) ,
1432
- get_direct_event ( & ep. node_id ( ) , & node_id, stream) ,
1433
- )
1434
- . await ??;
1435
- Ok ( ( ) )
1449
+ anyhow:: bail!( "conn_type stream ended before `ConnectionType::Direct`" ) ;
1450
+ }
1451
+
1452
+ async fn accept ( ep : & Endpoint ) -> NodeId {
1453
+ let incoming = ep. accept ( ) . await . unwrap ( ) ;
1454
+ let conn = incoming. await . unwrap ( ) ;
1455
+ let node_id = get_remote_node_id ( & conn) . unwrap ( ) ;
1456
+ tracing:: info!( node_id=%node_id. fmt_short( ) , "accepted connection" ) ;
1457
+ node_id
1436
1458
}
1437
1459
1438
1460
let ep1_nodeid = ep1. node_id ( ) ;
@@ -1445,39 +1467,31 @@ mod tests {
1445
1467
) ;
1446
1468
tracing:: info!( "node id 2 {ep2_nodeid}" ) ;
1447
1469
1448
- let res_ep1 = tokio:: spawn ( handle_direct_conn ( ep1. clone ( ) , ep2_nodeid) ) ;
1470
+ let ep1_side = async move {
1471
+ accept ( & ep1) . await ;
1472
+ handle_direct_conn ( & ep1, ep2_nodeid) . await
1473
+ } ;
1474
+
1475
+ let ep2_side = async move {
1476
+ ep2. connect ( ep1_nodeaddr, TEST_ALPN ) . await . unwrap ( ) ;
1477
+ handle_direct_conn ( & ep2, ep1_nodeid) . await
1478
+ } ;
1479
+
1480
+ let res_ep1 = tokio:: spawn ( tokio:: time:: timeout ( TIMEOUT , ep1_side) ) ;
1449
1481
1450
1482
let ep1_abort_handle = res_ep1. abort_handle ( ) ;
1451
1483
let _ep1_guard = CallOnDrop :: new ( move || {
1452
1484
ep1_abort_handle. abort ( ) ;
1453
1485
} ) ;
1454
1486
1455
- let res_ep2 = tokio:: spawn ( handle_direct_conn ( ep2 . clone ( ) , ep1_nodeid ) ) ;
1487
+ let res_ep2 = tokio:: spawn ( tokio :: time :: timeout ( TIMEOUT , ep2_side ) ) ;
1456
1488
let ep2_abort_handle = res_ep2. abort_handle ( ) ;
1457
1489
let _ep2_guard = CallOnDrop :: new ( move || {
1458
1490
ep2_abort_handle. abort ( ) ;
1459
1491
} ) ;
1460
- async fn accept ( ep : Endpoint ) -> NodeId {
1461
- let incoming = ep. accept ( ) . await . unwrap ( ) ;
1462
- let conn = incoming. await . unwrap ( ) ;
1463
- get_remote_node_id ( & conn) . unwrap ( )
1464
- }
1465
-
1466
- // create a node addr with no direct connections
1467
- let ep1_nodeaddr = NodeAddr :: from_parts ( ep1_nodeid, Some ( relay_url) , vec ! [ ] ) ;
1468
-
1469
- let accept_res = tokio:: spawn ( accept ( ep1. clone ( ) ) ) ;
1470
- let accept_abort_handle = accept_res. abort_handle ( ) ;
1471
- let _accept_guard = CallOnDrop :: new ( move || {
1472
- accept_abort_handle. abort ( ) ;
1473
- } ) ;
1474
-
1475
- let _conn_2 = ep2. connect ( ep1_nodeaddr, TEST_ALPN ) . await . unwrap ( ) ;
1476
-
1477
- let got_id = accept_res. await . unwrap ( ) ;
1478
- assert_eq ! ( ep2_nodeid, got_id) ;
1479
1492
1480
- res_ep1. await . unwrap ( ) . unwrap ( ) ;
1481
- res_ep2. await . unwrap ( ) . unwrap ( ) ;
1493
+ let ( r1, r2) = tokio:: try_join!( res_ep1, res_ep2) . unwrap ( ) ;
1494
+ r1. expect ( "ep1 timeout" ) . unwrap ( ) ;
1495
+ r2. expect ( "ep2 timeout" ) . unwrap ( ) ;
1482
1496
}
1483
1497
}
0 commit comments