Skip to content

Commit 0d694c8

Browse files
authored
core,netty: quick patch for setListener regression
resolves grpc/grpc#8715 now that setListener is called prior to `JumpToApplicationThreadServerStreamListener` being completely ready to use. We should not call `AbstractStream2#onStreamAllocated()` inside `setListener()` anymore, but call it after `ServerImpl#streamCreated()` is completed.
1 parent 0e27eef commit 0d694c8

File tree

5 files changed

+21
-7
lines changed

5 files changed

+21
-7
lines changed

core/src/main/java/io/grpc/internal/AbstractServerStream.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected interface Sink {
8484
* Tears down the stream, typically in the event of a timeout. This method may be called
8585
* multiple times and from any thread.
8686
*
87-
* <p>This is a clone of {@link ServerStream#cancel()}.
87+
* <p>This is a clone of {@link ServerStream#cancel(Status)}.
8888
*/
8989
void cancel(Status status);
9090
}
@@ -189,11 +189,13 @@ protected TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) {
189189
* thread.
190190
*/
191191
public final void setListener(ServerStreamListener listener) {
192+
Preconditions.checkState(this.listener == null, "setListener should be called only once");
192193
this.listener = Preconditions.checkNotNull(listener, "listener");
194+
}
193195

194-
// Now that the stream has actually been initialized, call the listener's onReady callback if
195-
// appropriate.
196-
onStreamAllocated();
196+
@Override
197+
public final void onStreamAllocated() {
198+
super.onStreamAllocated();
197199
}
198200

199201
@Override

core/src/main/java/io/grpc/internal/AbstractStream2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ private boolean isReady() {
237237
* StreamListener#onReady()} handler if appropriate. This must be called from the transport
238238
* thread, since the listener may be called back directly.
239239
*/
240-
protected final void onStreamAllocated() {
240+
protected void onStreamAllocated() {
241241
checkState(listener() != null);
242242
synchronized (onReadyLock) {
243243
checkState(!allocated, "Already allocated");

core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,17 +107,27 @@ public void completeWithoutClose() {
107107

108108
@Test
109109
public void setListener_setOnlyOnce() {
110-
111110
stream.transportState().setListener(new ServerStreamListenerBase());
112111
thrown.expect(IllegalStateException.class);
113112

114113
stream.transportState().setListener(new ServerStreamListenerBase());
115114
}
116115

117116
@Test
118-
public void setListener_readyCalled() {
117+
public void listenerReady_onlyOnce() {
118+
stream.transportState().setListener(new ServerStreamListenerBase());
119+
stream.transportState().onStreamAllocated();
120+
thrown.expect(IllegalStateException.class);
121+
122+
stream.transportState().onStreamAllocated();
123+
}
124+
125+
126+
@Test
127+
public void listenerReady_readyCalled() {
119128
ServerStreamListener streamListener = mock(ServerStreamListener.class);
120129
stream.transportState().setListener(streamListener);
130+
stream.transportState().onStreamAllocated();
121131

122132
verify(streamListener).onReady();
123133
}

netty/src/main/java/io/grpc/netty/NettyServerHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
200200
NettyServerStream stream = new NettyServerStream(ctx.channel(), state, attributes,
201201
statsTraceCtx);
202202
transportListener.streamCreated(stream, method, metadata);
203+
state.onStreamAllocated();
203204
http2Stream.setProperty(streamKey, state);
204205

205206
} catch (Http2Exception e) {

netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
298298
NettyServerStream stream = new NettyServerStream(channel, state, Attributes.EMPTY,
299299
statsTraceCtx);
300300
stream.transportState().setListener(serverListener);
301+
state.onStreamAllocated();
301302
verify(serverListener, atLeastOnce()).onReady();
302303
verifyNoMoreInteractions(serverListener);
303304
return stream;

0 commit comments

Comments
 (0)