@@ -102,8 +102,8 @@ where
102102
103103 fresh. fill ( cx, & self . socket , & publisher) ;
104104
105- for socket in fresh. drain ( ) {
106- workers. push ( socket, now, & self . subscriber , & publisher) ;
105+ for ( socket, remote_address ) in fresh. drain ( ) {
106+ workers. push ( socket, remote_address , now, & self . subscriber , & publisher) ;
107107 }
108108
109109 let res = workers. poll ( cx, & mut context, now, & publisher) ;
@@ -154,7 +154,7 @@ fn publisher<'a, Sub: Subscriber, C: Clock>(
154154///
155155/// This should produce overall better latencies in the case of overloaded queues.
156156struct FreshQueue {
157- queue : VecDeque < TcpStream > ,
157+ queue : VecDeque < ( TcpStream , SocketAddress ) > ,
158158}
159159
160160impl FreshQueue {
@@ -186,32 +186,31 @@ impl FreshQueue {
186186
187187 while let Poll :: Ready ( res) = listener. poll_accept ( cx) {
188188 match res {
189- Ok ( ( socket, remote_addr ) ) => {
189+ Ok ( ( socket, remote_address ) ) => {
190190 if self . queue . len ( ) == self . queue . capacity ( ) {
191- if let Some ( remote_addr ) = self
191+ if let Some ( remote_address ) = self
192192 . queue
193193 . pop_back ( )
194- . and_then ( |socket| socket . peer_addr ( ) . ok ( ) )
194+ . map ( | ( _socket , remote_address ) | remote_address )
195195 {
196- let remote_address: SocketAddress = remote_addr. into ( ) ;
197- let remote_address = & remote_address;
198196 publisher. on_acceptor_tcp_stream_dropped (
199- event:: builder:: AcceptorTcpStreamDropped { remote_address, reason : event:: builder:: AcceptorTcpStreamDropReason :: FreshQueueAtCapacity } ,
197+ event:: builder:: AcceptorTcpStreamDropped { remote_address : & remote_address , reason : event:: builder:: AcceptorTcpStreamDropReason :: FreshQueueAtCapacity } ,
200198 ) ;
201199 dropped += 1 ;
202200 }
203201 }
204202
205- let remote_address: SocketAddress = remote_addr. into ( ) ;
206- let remote_address = & remote_address;
203+ let remote_address: SocketAddress = remote_address. into ( ) ;
207204 publisher. on_acceptor_tcp_fresh_enqueued (
208- event:: builder:: AcceptorTcpFreshEnqueued { remote_address } ,
205+ event:: builder:: AcceptorTcpFreshEnqueued {
206+ remote_address : & remote_address,
207+ } ,
209208 ) ;
210209 enqueued += 1 ;
211210
212211 // most recent streams go to the front of the line, since they're the most
213212 // likely to be successfully processed
214- self . queue . push_front ( socket) ;
213+ self . queue . push_front ( ( socket, remote_address ) ) ;
215214 }
216215 Err ( error) => {
217216 // TODO submit to a separate error channel that the application can subscribe
@@ -239,7 +238,7 @@ impl FreshQueue {
239238 )
240239 }
241240
242- fn drain ( & mut self ) -> impl Iterator < Item = TcpStream > + ' _ {
241+ fn drain ( & mut self ) -> impl Iterator < Item = ( TcpStream , SocketAddress ) > + ' _ {
243242 self . queue . drain ( ..)
244243 }
245244}
@@ -292,6 +291,7 @@ where
292291 pub fn push < Pub > (
293292 & mut self ,
294293 stream : TcpStream ,
294+ remote_address : SocketAddress ,
295295 now : Timestamp ,
296296 subscriber : & Sub ,
297297 publisher : & Pub ,
@@ -304,20 +304,14 @@ where
304304 //
305305 // TODO: we need to investigate how this interacts with SYN cookies/retries and fast
306306 // failure modes in kernel space.
307- if let Ok ( remote_addr) = stream. peer_addr ( ) {
308- let remote_address: SocketAddress = remote_addr. into ( ) ;
309- let remote_address = & remote_address;
310- publisher. on_acceptor_tcp_stream_dropped (
311- event:: builder:: AcceptorTcpStreamDropped {
312- remote_address,
313- reason : event:: builder:: AcceptorTcpStreamDropReason :: SlotsAtCapacity ,
314- } ,
315- ) ;
316- }
307+ publisher. on_acceptor_tcp_stream_dropped ( event:: builder:: AcceptorTcpStreamDropped {
308+ remote_address : & remote_address,
309+ reason : event:: builder:: AcceptorTcpStreamDropReason :: SlotsAtCapacity ,
310+ } ) ;
317311 drop ( stream) ;
318312 return ;
319313 } ;
320- self . workers [ idx] . push ( stream, now, subscriber, publisher) ;
314+ self . workers [ idx] . push ( stream, remote_address , now, subscriber, publisher) ;
321315 self . working . push_back ( idx) ;
322316 }
323317
@@ -473,7 +467,7 @@ where
473467 Sub : event:: Subscriber + Clone ,
474468{
475469 queue_time : Timestamp ,
476- stream : Option < TcpStream > ,
470+ stream : Option < ( TcpStream , SocketAddress ) > ,
477471 subscriber_ctx : Option < Sub :: ConnectionContext > ,
478472 state : WorkerState ,
479473}
@@ -495,6 +489,7 @@ where
495489 pub fn push < Pub > (
496490 & mut self ,
497491 stream : TcpStream ,
492+ remote_address : SocketAddress ,
498493 now : Timestamp ,
499494 subscriber : & Sub ,
500495 publisher : & Pub ,
@@ -514,20 +509,18 @@ where
514509
515510 let prev_queue_time = core:: mem:: replace ( & mut self . queue_time , now) ;
516511 let prev_state = core:: mem:: replace ( & mut self . state , WorkerState :: Init ) ;
517- let prev_stream = core:: mem:: replace ( & mut self . stream , Some ( stream) ) ;
512+ let prev_stream = core:: mem:: replace ( & mut self . stream , Some ( ( stream, remote_address ) ) ) ;
518513 let prev_ctx = core:: mem:: replace ( & mut self . subscriber_ctx , Some ( subscriber_ctx) ) ;
519514
520- if let Some ( remote_addr) = prev_stream. and_then ( |socket| socket. peer_addr ( ) . ok ( ) ) {
521- let remote_address: SocketAddress = remote_addr. into ( ) ;
522- let remote_address = & remote_address;
515+ if let Some ( remote_address) = prev_stream. map ( |( _socket, remote_address) | remote_address) {
523516 let sojourn_time = now. saturating_duration_since ( prev_queue_time) ;
524517 let buffer_len = match prev_state {
525518 WorkerState :: Init => 0 ,
526519 WorkerState :: Buffering { buffer, .. } => buffer. payload_len ( ) ,
527520 WorkerState :: Erroring { .. } => 0 ,
528521 } ;
529522 publisher. on_acceptor_tcp_stream_replaced ( event:: builder:: AcceptorTcpStreamReplaced {
530- remote_address,
523+ remote_address : & remote_address ,
531524 sojourn_time,
532525 buffer_len,
533526 } ) ;
@@ -615,7 +608,7 @@ impl WorkerState {
615608 & mut self ,
616609 cx : & mut Context ,
617610 context : & mut WorkerContext < Sub > ,
618- stream : & mut Option < TcpStream > ,
611+ stream : & mut Option < ( TcpStream , SocketAddress ) > ,
619612 subscriber_ctx : & mut Option < Sub :: ConnectionContext > ,
620613 queue_time : Timestamp ,
621614 now : Timestamp ,
@@ -639,8 +632,8 @@ impl WorkerState {
639632 } => ( buffer, * blocked_count) ,
640633 // we encountered an error so try and send it back
641634 WorkerState :: Erroring { offset, buffer, .. } => {
642- let stream = Pin :: new ( stream. as_mut ( ) . unwrap ( ) ) ;
643- let len = ready ! ( stream. poll_write( cx, & buffer[ * offset..] ) ) ?;
635+ let ( stream, _remote_address ) = stream. as_mut ( ) . unwrap ( ) ;
636+ let len = ready ! ( Pin :: new ( stream) . poll_write( cx, & buffer[ * offset..] ) ) ?;
644637
645638 * offset += len;
646639
@@ -660,13 +653,17 @@ impl WorkerState {
660653 } ;
661654
662655 // try to read an initial packet from the socket
663- let res = Self :: poll_initial_packet (
664- cx,
665- stream. as_mut ( ) . unwrap ( ) ,
666- recv_buffer,
667- sojourn_time,
668- publisher,
669- ) ;
656+ let res = {
657+ let ( stream, remote_address) = stream. as_mut ( ) . unwrap ( ) ;
658+ Self :: poll_initial_packet (
659+ cx,
660+ stream,
661+ remote_address,
662+ recv_buffer,
663+ sojourn_time,
664+ publisher,
665+ )
666+ } ;
670667
671668 let Poll :: Ready ( res) = res else {
672669 // if we got `Pending` but we don't own the recv buffer then we need to copy it
@@ -689,11 +686,12 @@ impl WorkerState {
689686 let initial_packet = res?;
690687
691688 let subscriber_ctx = subscriber_ctx. take ( ) . unwrap ( ) ;
689+ let ( socket, remote_address) = stream. take ( ) . unwrap ( ) ;
692690
693691 let stream_builder = match endpoint:: accept_stream (
694692 now,
695693 & context. env ,
696- env:: TcpReregistered ( stream . take ( ) . unwrap ( ) ) ,
694+ env:: TcpReregistered ( socket , remote_address ) ,
697695 & initial_packet,
698696 None ,
699697 Some ( recv_buffer) ,
@@ -704,11 +702,11 @@ impl WorkerState {
704702 ) {
705703 Ok ( stream) => stream,
706704 Err ( error) => {
707- if let Some ( env:: TcpReregistered ( socket) ) = error. peer {
705+ if let Some ( env:: TcpReregistered ( socket, remote_address ) ) = error. peer {
708706 if !error. secret_control . is_empty ( ) {
709707 // if we need to send an error then update the state and loop back
710708 // around
711- * stream = Some ( socket) ;
709+ * stream = Some ( ( socket, remote_address ) ) ;
712710 * self = WorkerState :: Erroring {
713711 offset : 0 ,
714712 buffer : error. secret_control ,
@@ -768,6 +766,7 @@ impl WorkerState {
768766 fn poll_initial_packet < Pub > (
769767 cx : & mut Context ,
770768 stream : & mut TcpStream ,
769+ remote_address : & SocketAddress ,
771770 recv_buffer : & mut msg:: recv:: Message ,
772771 sojourn_time : Duration ,
773772 publisher : & Pub ,
@@ -777,15 +776,9 @@ impl WorkerState {
777776 {
778777 loop {
779778 if recv_buffer. payload_len ( ) > 10_000 {
780- let remote_address = stream
781- . peer_addr ( )
782- . ok ( )
783- . map ( SocketAddress :: from)
784- . unwrap_or_default ( ) ;
785-
786779 publisher. on_acceptor_tcp_packet_dropped (
787780 event:: builder:: AcceptorTcpPacketDropped {
788- remote_address : & remote_address ,
781+ remote_address,
789782 reason : DecoderError :: UnexpectedBytes ( recv_buffer. payload_len ( ) )
790783 . into_event ( ) ,
791784 sojourn_time,
@@ -798,15 +791,9 @@ impl WorkerState {
798791
799792 match server:: InitialPacket :: peek ( recv_buffer, 16 ) {
800793 Ok ( packet) => {
801- let remote_address = stream
802- . peer_addr ( )
803- . ok ( )
804- . map ( SocketAddress :: from)
805- . unwrap_or_default ( ) ;
806-
807794 publisher. on_acceptor_tcp_packet_received (
808795 event:: builder:: AcceptorTcpPacketReceived {
809- remote_address : & remote_address ,
796+ remote_address,
810797 credential_id : & * packet. credentials . id ,
811798 stream_id : packet. stream_id . into_varint ( ) . as_u64 ( ) ,
812799 payload_len : packet. payload_len ,
@@ -823,15 +810,9 @@ impl WorkerState {
823810 continue ;
824811 }
825812
826- let remote_address = stream
827- . peer_addr ( )
828- . ok ( )
829- . map ( SocketAddress :: from)
830- . unwrap_or_default ( ) ;
831-
832813 publisher. on_acceptor_tcp_packet_dropped (
833814 event:: builder:: AcceptorTcpPacketDropped {
834- remote_address : & remote_address ,
815+ remote_address,
835816 reason : err. into_event ( ) ,
836817 sojourn_time,
837818 } ,
0 commit comments