Skip to content

Commit 39c9ebf

Browse files
authored
examples: Add cancellation example
It uses the echo service for both unary and bidi RPCs, to show the various cancellation circumstances and APIs.
1 parent 6b7cb9e commit 39c9ebf

File tree

5 files changed

+492
-0
lines changed

5 files changed

+492
-0
lines changed

examples/BUILD.bazel

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,22 @@ java_grpc_library(
5151
deps = [":route_guide_java_proto"],
5252
)
5353

54+
proto_library(
55+
name = "echo_proto",
56+
srcs = ["src/main/proto/grpc/examples/echo/echo.proto"],
57+
)
58+
59+
java_proto_library(
60+
name = "echo_java_proto",
61+
deps = [":echo_proto"],
62+
)
63+
64+
java_grpc_library(
65+
name = "echo_java_grpc",
66+
srcs = [":echo_proto"],
67+
deps = [":echo_java_proto"],
68+
)
69+
5470
java_library(
5571
name = "examples",
5672
testonly = 1,
@@ -64,6 +80,8 @@ java_library(
6480
"@io_grpc_grpc_java//netty",
6581
],
6682
deps = [
83+
":echo_java_grpc",
84+
":echo_java_proto",
6785
":hello_streaming_java_grpc",
6886
":hello_streaming_java_proto",
6987
":helloworld_java_grpc",

examples/build.gradle

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,20 @@ task keepAliveClient(type: CreateStartScripts) {
202202
classpath = startScripts.classpath
203203
}
204204

205+
task cancellationClient(type: CreateStartScripts) {
206+
mainClass = 'io.grpc.examples.cancellation.CancellationClient'
207+
applicationName = 'cancellation-client'
208+
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
209+
classpath = startScripts.classpath
210+
}
211+
212+
task cancellationServer(type: CreateStartScripts) {
213+
mainClass = 'io.grpc.examples.cancellation.CancellationServer'
214+
applicationName = 'cancellation-server'
215+
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
216+
classpath = startScripts.classpath
217+
}
218+
205219
applicationDistribution.into('bin') {
206220
from(routeGuideServer)
207221
from(routeGuideClient)
@@ -223,5 +237,7 @@ applicationDistribution.into('bin') {
223237
from(deadlineClient)
224238
from(keepAliveServer)
225239
from(keepAliveClient)
240+
from(cancellationClient)
241+
from(cancellationServer)
226242
fileMode = 0755
227243
}
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright 2023 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.examples.cancellation;
18+
19+
import com.google.common.util.concurrent.FutureCallback;
20+
import com.google.common.util.concurrent.Futures;
21+
import com.google.common.util.concurrent.ListenableFuture;
22+
import com.google.common.util.concurrent.MoreExecutors;
23+
import io.grpc.Channel;
24+
import io.grpc.Context;
25+
import io.grpc.Context.CancellableContext;
26+
import io.grpc.Grpc;
27+
import io.grpc.InsecureChannelCredentials;
28+
import io.grpc.ManagedChannel;
29+
import io.grpc.Status;
30+
import io.grpc.StatusRuntimeException;
31+
import io.grpc.examples.echo.EchoGrpc;
32+
import io.grpc.examples.echo.EchoRequest;
33+
import io.grpc.examples.echo.EchoResponse;
34+
import io.grpc.stub.ClientCallStreamObserver;
35+
import io.grpc.stub.StreamObserver;
36+
import java.util.concurrent.TimeUnit;
37+
38+
/**
39+
* A client that cancels RPCs to an Echo server.
40+
*/
41+
public class CancellationClient {
42+
private final Channel channel;
43+
44+
public CancellationClient(Channel channel) {
45+
this.channel = channel;
46+
}
47+
48+
private void demonstrateCancellation() throws Exception {
49+
echoBlocking("I'M A BLOCKING CLIENT! HEAR ME ROAR!");
50+
51+
// io.grpc.Context can be used to cancel RPCs using any of the stubs. It is the only way to
52+
// cancel blocking stub RPCs. io.grpc.Context is a general-purpose alternative to thread
53+
// interruption and can be used outside of gRPC, like to coordinate within your application.
54+
//
55+
// CancellableContext must always be cancelled or closed at the end of its lifetime, otherwise
56+
// it could "leak" memory.
57+
try (CancellableContext context = Context.current().withCancellation()) {
58+
new Thread(() -> {
59+
try {
60+
Thread.sleep(500); // Do some work
61+
} catch (InterruptedException ex) {
62+
Thread.currentThread().interrupt();
63+
}
64+
// Cancellation reasons are never sent to the server. But they are echoed back to the
65+
// client as the RPC failure reason.
66+
context.cancel(new RuntimeException("Oops. Messed that up, let me try again"));
67+
}).start();
68+
69+
// context.run() attaches the context to this thread for gRPC to observe. It also restores
70+
// the previous context before returning.
71+
context.run(() -> echoBlocking("RAAWRR haha lol hehe AWWRR GRRR"));
72+
}
73+
74+
// Futures cancelled with interruption cancel the RPC.
75+
ListenableFuture<EchoResponse> future = echoFuture("Future clie*cough*nt was here!");
76+
Thread.sleep(500); // Do some work
77+
// We realize we really don't want to hear that echo.
78+
future.cancel(true);
79+
Thread.sleep(100); // Make logs more obvious. Cancel is async
80+
81+
ClientCallStreamObserver<EchoRequest> reqCallObserver = echoAsync("Testing, testing, 1, 2, 3");
82+
reqCallObserver.onCompleted();
83+
Thread.sleep(500); // Make logs more obvious. Wait for completion
84+
85+
// Async's onError() will cancel. But the method can't be called concurrently with other calls
86+
// on the StreamObserver. If you need thread-safety, use CancellableContext as above.
87+
StreamObserver<EchoRequest> reqObserver = echoAsync("... async client... is the... best...");
88+
try {
89+
Thread.sleep(500); // Do some work
90+
} catch (InterruptedException ex) {
91+
Thread.currentThread().interrupt();
92+
}
93+
// Since reqObserver.onCompleted() hasn't been called, we can use onError().
94+
reqObserver.onError(new RuntimeException("That was weak..."));
95+
Thread.sleep(100); // Make logs more obvious. Cancel is async
96+
97+
// Async's cancel() will cancel. Also may not be called concurrently with other calls on the
98+
// StreamObserver.
99+
reqCallObserver = echoAsync("Async client or bust!");
100+
reqCallObserver.onCompleted();
101+
try {
102+
Thread.sleep(250); // Do some work
103+
} catch (InterruptedException ex) {
104+
Thread.currentThread().interrupt();
105+
}
106+
// Since onCompleted() has been called, we can't use onError(). It is safe to use cancel()
107+
// regardless of onCompleted() being called.
108+
reqCallObserver.cancel("That's enough. I'm bored", null);
109+
Thread.sleep(100); // Make logs more obvious. Cancel is async
110+
}
111+
112+
/** Say hello to server, just like in helloworld example. */
113+
public void echoBlocking(String text) {
114+
System.out.println("\nYelling: " + text);
115+
EchoRequest request = EchoRequest.newBuilder().setMessage(text).build();
116+
EchoResponse response;
117+
try {
118+
response = EchoGrpc.newBlockingStub(channel).unaryEcho(request);
119+
} catch (StatusRuntimeException e) {
120+
System.out.println("RPC failed: " + e.getStatus());
121+
return;
122+
}
123+
System.out.println("Echo: " + response.getMessage());
124+
}
125+
126+
/** Say hello to the server, but using future API. */
127+
public ListenableFuture<EchoResponse> echoFuture(String text) {
128+
System.out.println("\nYelling: " + text);
129+
EchoRequest request = EchoRequest.newBuilder().setMessage(text).build();
130+
ListenableFuture<EchoResponse> future = EchoGrpc.newFutureStub(channel).unaryEcho(request);
131+
Futures.addCallback(future, new FutureCallback<EchoResponse>() {
132+
@Override
133+
public void onSuccess(EchoResponse response) {
134+
System.out.println("Echo: " + response.getMessage());
135+
}
136+
137+
@Override
138+
public void onFailure(Throwable t) {
139+
System.out.println("RPC failed: " + Status.fromThrowable(t));
140+
}
141+
}, MoreExecutors.directExecutor());
142+
return future;
143+
}
144+
145+
/** Say hello to the server, but using async API and cancelling. */
146+
public ClientCallStreamObserver<EchoRequest> echoAsync(String text) {
147+
System.out.println("\nYelling: " + text);
148+
EchoRequest request = EchoRequest.newBuilder().setMessage(text).build();
149+
150+
// Client-streaming and bidirectional RPCs can cast the returned StreamObserver to
151+
// ClientCallStreamObserver.
152+
//
153+
// Unary and server-streaming stub methods don't return a StreamObserver. For such RPCs, you can
154+
// use ClientResponseObserver to get the ClientCallStreamObserver. For example:
155+
// EchoGrpc.newStub(channel).unaryEcho(new ClientResponseObserver<EchoResponse>() {...});
156+
// Since ClientCallStreamObserver.cancel() is not thread-safe, it isn't safe to call from
157+
// another thread until the RPC stub method (e.g., unaryEcho()) returns.
158+
ClientCallStreamObserver<EchoRequest> reqObserver = (ClientCallStreamObserver<EchoRequest>)
159+
EchoGrpc.newStub(channel).bidirectionalStreamingEcho(new StreamObserver<EchoResponse>() {
160+
@Override
161+
public void onNext(EchoResponse response) {
162+
System.out.println("Echo: " + response.getMessage());
163+
}
164+
165+
@Override
166+
public void onCompleted() {
167+
System.out.println("RPC completed");
168+
}
169+
170+
@Override
171+
public void onError(Throwable t) {
172+
System.out.println("RPC failed: " + Status.fromThrowable(t));
173+
}
174+
});
175+
176+
reqObserver.onNext(request);
177+
return reqObserver;
178+
}
179+
180+
/**
181+
* Cancel RPCs to a server. If provided, the first element of {@code args} is the target server.
182+
*/
183+
public static void main(String[] args) throws Exception {
184+
String target = "localhost:50051";
185+
if (args.length > 0) {
186+
if ("--help".equals(args[0])) {
187+
System.err.println("Usage: [target]");
188+
System.err.println("");
189+
System.err.println(" target The server to connect to. Defaults to " + target);
190+
System.exit(1);
191+
}
192+
target = args[0];
193+
}
194+
195+
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
196+
.build();
197+
try {
198+
CancellationClient client = new CancellationClient(channel);
199+
client.demonstrateCancellation();
200+
} finally {
201+
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
202+
}
203+
}
204+
}

0 commit comments

Comments
 (0)