Skip to content

Commit e345273

Browse files
authored
services: don't update reflection index mid-stream
This fix addresses #2689
1 parent 437fafa commit e345273

File tree

2 files changed

+265
-208
lines changed

2 files changed

+265
-208
lines changed

services/src/main/java/io/grpc/protobuf/service/ProtoReflectionService.java

Lines changed: 114 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -79,49 +79,104 @@
7979
public final class ProtoReflectionService extends ServerReflectionGrpc.ServerReflectionImplBase
8080
implements InternalNotifyOnServerBuild {
8181

82-
private volatile ServerReflectionIndex serverReflectionIndex;
82+
private final Object lock = new Object();
83+
84+
@GuardedBy("lock")
85+
private ServerReflectionIndex serverReflectionIndex;
86+
87+
private Server server;
8388

8489
private ProtoReflectionService() {}
8590

86-
public static BindableService getInstance() {
91+
public static BindableService newInstance() {
8792
return new ProtoReflectionService();
8893
}
8994

9095
/**
91-
* Receives a reference to the server at build time.
96+
* Do not use this method.
97+
*
98+
* @deprecated use {@link ProtoReflectionService#newInstance()} instead.
9299
*/
100+
@Deprecated
101+
public static BindableService getInstance() {
102+
return newInstance();
103+
}
104+
105+
/** Receives a reference to the server at build time. */
93106
@Override
94107
public void notifyOnBuild(Server server) {
95-
checkState(serverReflectionIndex == null);
96-
serverReflectionIndex = new ServerReflectionIndex(checkNotNull(server, "server"));
108+
this.server = checkNotNull(server);
109+
}
110+
111+
/**
112+
* Checks for updates to the server's mutable services and updates the index if any changes are
113+
* detected. A change is any addition or removal in the set of file descriptors attached to the
114+
* mutable services or a change in the service names.
115+
*
116+
* @return The (potentially updated) index.
117+
*/
118+
private ServerReflectionIndex updateIndexIfNecessary() {
119+
synchronized (lock) {
120+
if (serverReflectionIndex == null) {
121+
serverReflectionIndex =
122+
new ServerReflectionIndex(server.getImmutableServices(), server.getMutableServices());
123+
return serverReflectionIndex;
124+
}
125+
126+
Set<FileDescriptor> serverFileDescriptors = new HashSet<FileDescriptor>();
127+
Set<String> serverServiceNames = new HashSet<String>();
128+
List<ServerServiceDefinition> serverMutableServices = server.getMutableServices();
129+
for (ServerServiceDefinition mutableService : serverMutableServices) {
130+
io.grpc.ServiceDescriptor serviceDescriptor = mutableService.getServiceDescriptor();
131+
if (serviceDescriptor.getSchemaDescriptor() instanceof ProtoFileDescriptorSupplier) {
132+
String serviceName = serviceDescriptor.getName();
133+
FileDescriptor fileDescriptor =
134+
((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor())
135+
.getFileDescriptor();
136+
serverFileDescriptors.add(fileDescriptor);
137+
serverServiceNames.add(serviceName);
138+
}
139+
}
140+
141+
// Replace the index if the underlying mutable services have changed. Check both the file
142+
// descriptors and the service names, because one file descriptor can define multiple
143+
// services.
144+
FileDescriptorIndex mutableServicesIndex = serverReflectionIndex.getMutableServicesIndex();
145+
if (!mutableServicesIndex.getServiceFileDescriptors().equals(serverFileDescriptors)
146+
|| !mutableServicesIndex.getServiceNames().equals(serverServiceNames)) {
147+
serverReflectionIndex =
148+
new ServerReflectionIndex(server.getImmutableServices(), serverMutableServices);
149+
}
150+
151+
return serverReflectionIndex;
152+
}
97153
}
98154

99155
@Override
100156
public StreamObserver<ServerReflectionRequest> serverReflectionInfo(
101157
final StreamObserver<ServerReflectionResponse> responseObserver) {
102-
103-
checkState(serverReflectionIndex != null);
104-
serverReflectionIndex.initializeImmutableServicesIndex();
105-
106158
final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver =
107159
(ServerCallStreamObserver<ServerReflectionResponse>) responseObserver;
108160
ProtoReflectionStreamObserver requestObserver =
109-
new ProtoReflectionStreamObserver(serverCallStreamObserver);
161+
new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver);
110162
serverCallStreamObserver.setOnReadyHandler(requestObserver);
111163
serverCallStreamObserver.disableAutoInboundFlowControl();
112164
serverCallStreamObserver.request(1);
113165
return requestObserver;
114166
}
115167

116-
private class ProtoReflectionStreamObserver implements Runnable,
117-
StreamObserver<ServerReflectionRequest> {
168+
private static class ProtoReflectionStreamObserver
169+
implements Runnable, StreamObserver<ServerReflectionRequest> {
170+
private final ServerReflectionIndex serverReflectionIndex;
118171
private final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver;
119172

120173
private boolean closeAfterSend = false;
121174
private ServerReflectionRequest request;
122175

123176
ProtoReflectionStreamObserver(
177+
ServerReflectionIndex serverReflectionIndex,
124178
ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver) {
179+
this.serverReflectionIndex = serverReflectionIndex;
125180
this.serverCallStreamObserver = checkNotNull(serverCallStreamObserver, "observer");
126181
}
127182

@@ -141,8 +196,6 @@ public void onNext(ServerReflectionRequest request) {
141196

142197
private void handleReflectionRequest() {
143198
if (serverCallStreamObserver.isReady()) {
144-
serverReflectionIndex.updateMutableIndexIfNecessary();
145-
146199
switch (request.getMessageRequestCase()) {
147200
case FILE_BY_FILENAME:
148201
getFileByName(request);
@@ -250,8 +303,7 @@ private void listServices(ServerReflectionRequest request) {
250303
.build());
251304
}
252305

253-
private void sendErrorResponse(
254-
ServerReflectionRequest request, Status status, String message) {
306+
private void sendErrorResponse(ServerReflectionRequest request, Status status, String message) {
255307
ServerReflectionResponse response =
256308
ServerReflectionResponse.newBuilder()
257309
.setValidHost(request.getHost())
@@ -299,81 +351,35 @@ private ServerReflectionResponse createServerReflectionResponse(
299351
* in the immutable service index are the mutable services checked.
300352
*/
301353
private static final class ServerReflectionIndex {
302-
private FileDescriptorIndex immutableServicesIndex;
303-
private final Object lock = new Object();
304-
/**
305-
* Tracks mutable services. Accesses must be synchronized.
306-
*/
307-
@GuardedBy("lock") private FileDescriptorIndex mutableServicesIndex
308-
= new FileDescriptorIndex(Collections.<ServerServiceDefinition>emptyList());
354+
private final FileDescriptorIndex immutableServicesIndex;
355+
private final FileDescriptorIndex mutableServicesIndex;
309356

310-
private final Server server;
311-
312-
public ServerReflectionIndex(Server server) {
313-
this.server = server;
357+
public ServerReflectionIndex(
358+
List<ServerServiceDefinition> immutableServices,
359+
List<ServerServiceDefinition> mutableServices) {
360+
immutableServicesIndex = new FileDescriptorIndex(immutableServices);
361+
mutableServicesIndex = new FileDescriptorIndex(mutableServices);
314362
}
315363

316-
/**
317-
* When first called, initializes the immutable services index. Subsequent calls have no effect.
318-
*
319-
* <p>This must be called by the reflection service before returning a new
320-
* {@link ProtoReflectionStreamObserver}.
321-
*/
322-
private synchronized void initializeImmutableServicesIndex() {
323-
if (immutableServicesIndex == null) {
324-
immutableServicesIndex = new FileDescriptorIndex(server.getImmutableServices());
325-
}
326-
}
327-
328-
/**
329-
* Checks for updates to the server's mutable services and updates the index if any changes
330-
* are detected. A change is any addition or removal in the set of file descriptors attached to
331-
* the mutable services or a change in the service names.
332-
*/
333-
private void updateMutableIndexIfNecessary() {
334-
Set<FileDescriptor> currentFileDescriptors = new HashSet<FileDescriptor>();
335-
Set<String> currentServiceNames = new HashSet<String>();
336-
synchronized (lock) {
337-
List<ServerServiceDefinition> currentMutableServices = server.getMutableServices();
338-
for (ServerServiceDefinition mutableService : currentMutableServices) {
339-
io.grpc.ServiceDescriptor serviceDescriptor = mutableService.getServiceDescriptor();
340-
if (serviceDescriptor.getSchemaDescriptor() instanceof ProtoFileDescriptorSupplier) {
341-
String serviceName = serviceDescriptor.getName();
342-
FileDescriptor fileDescriptor =
343-
((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor())
344-
.getFileDescriptor();
345-
currentFileDescriptors.add(fileDescriptor);
346-
checkState(!currentServiceNames.contains(serviceName),
347-
"Service already defined: %s", serviceName);
348-
currentServiceNames.add(serviceName);
349-
}
350-
}
351-
352-
// Replace the mutable index if the underlying services have changed. Check both the file
353-
// descriptors and the service names, because one file descriptor can define multiple
354-
// services.
355-
if (!mutableServicesIndex.getServiceFileDescriptors().equals(currentFileDescriptors)
356-
|| !mutableServicesIndex.getServiceNames().equals(currentServiceNames)) {
357-
mutableServicesIndex = new FileDescriptorIndex(currentMutableServices);
358-
}
359-
}
364+
private FileDescriptorIndex getMutableServicesIndex() {
365+
return mutableServicesIndex;
360366
}
361367

362368
private Set<String> getServiceNames() {
363-
Set<String> serviceNames = new HashSet<String>(immutableServicesIndex.getServiceNames());
364-
synchronized (lock) {
365-
serviceNames.addAll(mutableServicesIndex.getServiceNames());
366-
}
369+
Set<String> immutableServiceNames = immutableServicesIndex.getServiceNames();
370+
Set<String> mutableServiceNames = mutableServicesIndex.getServiceNames();
371+
Set<String> serviceNames =
372+
new HashSet<String>(immutableServiceNames.size() + mutableServiceNames.size());
373+
serviceNames.addAll(immutableServiceNames);
374+
serviceNames.addAll(mutableServiceNames);
367375
return serviceNames;
368376
}
369377

370378
@Nullable
371379
private FileDescriptor getFileDescriptorByName(String name) {
372380
FileDescriptor fd = immutableServicesIndex.getFileDescriptorByName(name);
373381
if (fd == null) {
374-
synchronized (lock) {
375-
fd = mutableServicesIndex.getFileDescriptorByName(name);
376-
}
382+
fd = mutableServicesIndex.getFileDescriptorByName(name);
377383
}
378384
return fd;
379385
}
@@ -382,21 +388,17 @@ private FileDescriptor getFileDescriptorByName(String name) {
382388
private FileDescriptor getFileDescriptorBySymbol(String symbol) {
383389
FileDescriptor fd = immutableServicesIndex.getFileDescriptorBySymbol(symbol);
384390
if (fd == null) {
385-
synchronized (lock) {
386-
fd = mutableServicesIndex.getFileDescriptorBySymbol(symbol);
387-
}
391+
fd = mutableServicesIndex.getFileDescriptorBySymbol(symbol);
388392
}
389393
return fd;
390394
}
391395

392396
@Nullable
393397
private FileDescriptor getFileDescriptorByExtensionAndNumber(String type, int extension) {
394-
FileDescriptor fd
395-
= immutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension);
398+
FileDescriptor fd =
399+
immutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension);
396400
if (fd == null) {
397-
synchronized (lock) {
398-
fd = mutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension);
399-
}
401+
fd = mutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension);
400402
}
401403
return fd;
402404
}
@@ -405,28 +407,26 @@ private FileDescriptor getFileDescriptorByExtensionAndNumber(String type, int ex
405407
private Set<Integer> getExtensionNumbersOfType(String type) {
406408
Set<Integer> extensionNumbers = immutableServicesIndex.getExtensionNumbersOfType(type);
407409
if (extensionNumbers == null) {
408-
synchronized (lock) {
409-
extensionNumbers = mutableServicesIndex.getExtensionNumbersOfType(type);
410-
}
410+
extensionNumbers = mutableServicesIndex.getExtensionNumbersOfType(type);
411411
}
412412
return extensionNumbers;
413413
}
414414
}
415415

416416
/**
417-
* Provides a set of methods for answering reflection queries for the file descriptors
418-
* underlying a set of services. Used by {@link ServerReflectionIndex} to separately index
419-
* immutable and mutable services.
417+
* Provides a set of methods for answering reflection queries for the file descriptors underlying
418+
* a set of services. Used by {@link ServerReflectionIndex} to separately index immutable and
419+
* mutable services.
420420
*/
421421
private static final class FileDescriptorIndex {
422422
private final Set<String> serviceNames = new HashSet<String>();
423423
private final Set<FileDescriptor> serviceFileDescriptors = new HashSet<FileDescriptor>();
424-
private final Map<String, FileDescriptor> fileDescriptorsByName
425-
= new HashMap<String, FileDescriptor>();
426-
private final Map<String, FileDescriptor> fileDescriptorsBySymbol
427-
= new HashMap<String, FileDescriptor>();
428-
private final Map<String, Map<Integer, FileDescriptor>> fileDescriptorsByExtensionAndNumber
429-
= new HashMap<String, Map<Integer, FileDescriptor>>();
424+
private final Map<String, FileDescriptor> fileDescriptorsByName =
425+
new HashMap<String, FileDescriptor>();
426+
private final Map<String, FileDescriptor> fileDescriptorsBySymbol =
427+
new HashMap<String, FileDescriptor>();
428+
private final Map<String, Map<Integer, FileDescriptor>> fileDescriptorsByExtensionAndNumber =
429+
new HashMap<String, Map<Integer, FileDescriptor>>();
430430

431431
FileDescriptorIndex(List<ServerServiceDefinition> services) {
432432
Queue<FileDescriptor> fileDescriptorsToProcess = new LinkedList<FileDescriptor>();
@@ -438,8 +438,8 @@ private static final class FileDescriptorIndex {
438438
((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor())
439439
.getFileDescriptor();
440440
String serviceName = serviceDescriptor.getName();
441-
checkState(!serviceNames.contains(serviceName),
442-
"Service already defined: %s", serviceName);
441+
checkState(
442+
!serviceNames.contains(serviceName), "Service already defined: %s", serviceName);
443443
serviceFileDescriptors.add(fileDescriptor);
444444
serviceNames.add(serviceName);
445445
if (!seenFiles.contains(fileDescriptor.getName())) {
@@ -501,8 +501,7 @@ private Set<Integer> getExtensionNumbersOfType(String type) {
501501

502502
private void processFileDescriptor(FileDescriptor fd) {
503503
String fdName = fd.getName();
504-
checkState(!fileDescriptorsByName.containsKey(fdName),
505-
"File name already used: %s", fdName);
504+
checkState(!fileDescriptorsByName.containsKey(fdName), "File name already used: %s", fdName);
506505
fileDescriptorsByName.put(fdName, fd);
507506
for (ServiceDescriptor service : fd.getServices()) {
508507
processService(service, fd);
@@ -517,21 +516,25 @@ private void processFileDescriptor(FileDescriptor fd) {
517516

518517
private void processService(ServiceDescriptor service, FileDescriptor fd) {
519518
String serviceName = service.getFullName();
520-
checkState(!fileDescriptorsBySymbol.containsKey(serviceName),
521-
"Service already defined: %s", serviceName);
519+
checkState(
520+
!fileDescriptorsBySymbol.containsKey(serviceName),
521+
"Service already defined: %s",
522+
serviceName);
522523
fileDescriptorsBySymbol.put(serviceName, fd);
523524
for (MethodDescriptor method : service.getMethods()) {
524525
String methodName = method.getFullName();
525-
checkState(!fileDescriptorsBySymbol.containsKey(methodName),
526-
"Method already defined: %s", methodName);
526+
checkState(
527+
!fileDescriptorsBySymbol.containsKey(methodName),
528+
"Method already defined: %s",
529+
methodName);
527530
fileDescriptorsBySymbol.put(methodName, fd);
528531
}
529532
}
530533

531534
private void processType(Descriptor type, FileDescriptor fd) {
532535
String typeName = type.getFullName();
533-
checkState(!fileDescriptorsBySymbol.containsKey(typeName),
534-
"Type already defined: %s", typeName);
536+
checkState(
537+
!fileDescriptorsBySymbol.containsKey(typeName), "Type already defined: %s", typeName);
535538
fileDescriptorsBySymbol.put(typeName, fd);
536539
for (FieldDescriptor extension : type.getExtensions()) {
537540
processExtension(extension, fd);
@@ -550,7 +553,9 @@ private void processExtension(FieldDescriptor extension, FileDescriptor fd) {
550553
}
551554
checkState(
552555
!fileDescriptorsByExtensionAndNumber.get(extensionName).containsKey(extensionNumber),
553-
"Extension name and number already defined: %s, %s", extensionName, extensionNumber);
556+
"Extension name and number already defined: %s, %s",
557+
extensionName,
558+
extensionNumber);
554559
fileDescriptorsByExtensionAndNumber.get(extensionName).put(extensionNumber, fd);
555560
}
556561
}

0 commit comments

Comments
 (0)