@@ -8,11 +8,11 @@ use pin_utils::unsafe_pinned;
88/// Stream for the [`flatten_stream`](super::FutureExt::flatten_stream) method.
99#[ must_use = "streams do nothing unless polled" ]
1010pub struct FlattenStream < Fut : Future > {
11- state : State < Fut >
11+ state : State < Fut , Fut :: Output > ,
1212}
1313
1414impl < Fut : Future > FlattenStream < Fut > {
15- unsafe_pinned ! ( state: State <Fut >) ;
15+ unsafe_pinned ! ( state: State <Fut , Fut :: Output >) ;
1616
1717 pub ( super ) fn new ( future : Fut ) -> FlattenStream < Fut > {
1818 FlattenStream {
@@ -33,11 +33,25 @@ impl<Fut> fmt::Debug for FlattenStream<Fut>
3333}
3434
3535#[ derive( Debug ) ]
36- enum State < Fut : Future > {
36+ enum State < Fut , St > {
3737 // future is not yet called or called and not ready
3838 Future ( Fut ) ,
3939 // future resolved to Stream
40- Stream ( Fut :: Output ) ,
40+ Stream ( St ) ,
41+ }
42+
43+ impl < Fut , St > State < Fut , St > {
44+ fn get_pin_mut < ' a > ( self : Pin < & ' a mut Self > ) -> State < Pin < & ' a mut Fut > , Pin < & ' a mut St > > {
45+ // safety: data is never moved via the resulting &mut reference
46+ match unsafe { Pin :: get_unchecked_mut ( self ) } {
47+ // safety: the future we're re-pinning here will never be moved;
48+ // it will just be polled, then dropped in place
49+ State :: Future ( f) => State :: Future ( unsafe { Pin :: new_unchecked ( f) } ) ,
50+ // safety: the stream we're repinning here will never be moved;
51+ // it will just be polled, then dropped in place
52+ State :: Stream ( s) => State :: Stream ( unsafe { Pin :: new_unchecked ( s) } ) ,
53+ }
54+ }
4155}
4256
4357impl < Fut > FusedStream for FlattenStream < Fut >
@@ -60,23 +74,15 @@ impl<Fut> Stream for FlattenStream<Fut>
6074
6175 fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
6276 loop {
63- // safety: data is never moved via the resulting &mut reference
64- match & mut unsafe { Pin :: get_unchecked_mut ( self . as_mut ( ) ) } . state {
77+ match self . as_mut ( ) . state ( ) . get_pin_mut ( ) {
6578 State :: Future ( f) => {
66- // safety: the future we're re-pinning here will never be moved;
67- // it will just be polled, then dropped in place
68- let stream = ready ! ( unsafe { Pin :: new_unchecked( f) } . poll( cx) ) ;
69-
79+ let stream = ready ! ( f. poll( cx) ) ;
7080 // Future resolved to stream.
7181 // We do not return, but poll that
7282 // stream in the next loop iteration.
7383 self . as_mut ( ) . state ( ) . set ( State :: Stream ( stream) ) ;
7484 }
75- State :: Stream ( s) => {
76- // safety: the stream we're repinning here will never be moved;
77- // it will just be polled, then dropped in place
78- return unsafe { Pin :: new_unchecked ( s) } . poll_next ( cx) ;
79- }
85+ State :: Stream ( s) => return s. poll_next ( cx) ,
8086 }
8187 }
8288 }
0 commit comments