Skip to content

Commit c2894d1

Browse files
committed
Merge branch 'refs/heads/apache-3.2' into 3.2.16-release
2 parents 4a74897 + 08c74b7 commit c2894d1

File tree

8 files changed

+32
-0
lines changed

8 files changed

+32
-0
lines changed

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,14 @@ public void onComplete(
136136
}
137137
}
138138

139+
@Override
140+
public void onClose() {
141+
if (done) {
142+
return;
143+
}
144+
onCancelByRemote(TriRpcStatus.CANCELLED);
145+
}
146+
139147
@Override
140148
public void onStart() {
141149
listener.onStart(TripleClientCall.this);

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ default void onComplete(
5656
boolean isReturnTriException) {
5757
onComplete(status, attachments);
5858
}
59+
60+
void onClose();
5961
}
6062

6163
/**

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,5 +472,10 @@ public void cancelByRemote(long errorCode) {
472472
finishProcess(transportError, null, false);
473473
});
474474
}
475+
476+
@Override
477+
public void onClose() {
478+
executor.execute(listener::onClose);
479+
}
475480
}
476481
}

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,14 @@ public void cancelByRemote(long errorCode) {
483483
executor.execute(() -> listener.onCancelByRemote(
484484
TriRpcStatus.CANCELLED.withDescription("Canceled by client ,errorCode=" + errorCode)));
485485
}
486+
487+
@Override
488+
public void onClose() {
489+
if (listener == null) {
490+
return;
491+
}
492+
executor.execute(() -> listener.onCancelByRemote(TriRpcStatus.CANCELLED));
493+
}
486494
}
487495

488496
private static class ServerDecoderListener implements TriDecoder.Listener {

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,6 @@ public interface H2TransportListener {
4242
void onData(ByteBuf data, boolean endStream);
4343

4444
void cancelByRemote(long errorCode);
45+
46+
void onClose();
4547
}

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ private void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame resetFrame)
8080

8181
@Override
8282
public void channelInactive(ChannelHandlerContext ctx) {
83+
transportListener.onClose();
8384
ctx.close();
8485
}
8586

dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ public void onComplete(TriRpcStatus status, Map<String, Object> attachments) {
3636
this.status = status;
3737
}
3838

39+
@Override
40+
public void onClose() {}
41+
3942
@Override
4043
public void onMessage(byte[] message, boolean isNeedReturnException) {
4144
this.message = message;

dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ public void onData(ByteBuf data, boolean endStream) {}
4040

4141
@Override
4242
public void cancelByRemote(long errorCode) {}
43+
44+
@Override
45+
public void onClose() {}
4346
};
4447
DefaultHttp2Headers headers = new DefaultHttp2Headers();
4548
headers.scheme(HTTPS.name()).path("/foo.bar").method(HttpMethod.POST.asciiName());

0 commit comments

Comments
 (0)