Skip to content

Commit 6907d81

Browse files
authored
core: Make getTransport's fast path lock-free
This change exposed a pre-existing bug where shutdownNow wasn't called for decommissionedTransports. The bug is fixed and a test added in this commit. Fixes #2120
1 parent 3d4ae36 commit 6907d81

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@
6363
import java.net.URISyntaxException;
6464
import java.util.ArrayList;
6565
import java.util.Collection;
66-
import java.util.HashMap;
6766
import java.util.HashSet;
6867
import java.util.List;
69-
import java.util.Map;
68+
import java.util.concurrent.ConcurrentHashMap;
69+
import java.util.concurrent.ConcurrentMap;
7070
import java.util.concurrent.Executor;
7171
import java.util.concurrent.ExecutorService;
7272
import java.util.concurrent.ScheduledExecutorService;
@@ -139,14 +139,16 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
139139
private LoadBalancer<ClientTransport> loadBalancer;
140140

141141
/**
142-
* Maps EquivalentAddressGroups to transports for that server.
142+
* Maps EquivalentAddressGroups to transports for that server. "lock" must be held when mutating.
143143
*/
144-
@GuardedBy("lock")
145-
private final Map<EquivalentAddressGroup, TransportSet> transports =
146-
new HashMap<EquivalentAddressGroup, TransportSet>();
144+
// Even though we set a concurrency level of 1, this is better than Collections.synchronizedMap
145+
// because it doesn't need to acquire a lock for reads.
146+
private final ConcurrentMap<EquivalentAddressGroup, TransportSet> transports =
147+
new ConcurrentHashMap<EquivalentAddressGroup, TransportSet>(16, .75f, 1);
147148

148149
/**
149-
* TransportSets that are shutdown (but not yet terminated) due to channel idleness.
150+
* TransportSets that are shutdown (but not yet terminated) due to channel idleness or channel
151+
* shut down.
150152
*/
151153
@GuardedBy("lock")
152154
private final HashSet<TransportSet> decommissionedTransports = new HashSet<TransportSet>();
@@ -425,6 +427,8 @@ public ManagedChannelImpl shutdown() {
425427
maybeTerminateChannel();
426428
if (!terminated) {
427429
transportsCopy.addAll(transports.values());
430+
transports.clear();
431+
decommissionedTransports.addAll(transportsCopy);
428432
delayedTransportsCopy.addAll(delayedTransports);
429433
oobTransportsCopy.addAll(oobTransports);
430434
}
@@ -472,6 +476,7 @@ public ManagedChannelImpl shutdownNow() {
472476
List<OobTransportProviderImpl> oobTransportsCopy;
473477
synchronized (lock) {
474478
transportsCopy = new ArrayList<TransportSet>(transports.values());
479+
transportsCopy.addAll(decommissionedTransports);
475480
delayedTransportsCopy = new ArrayList<DelayedClientTransport>(delayedTransports);
476481
oobTransportsCopy = new ArrayList<OobTransportProviderImpl>(oobTransports);
477482
}
@@ -589,7 +594,10 @@ public void updateRetainedTransports(Collection<EquivalentAddressGroup> addrs) {
589594
@Override
590595
public ClientTransport getTransport(final EquivalentAddressGroup addressGroup) {
591596
checkNotNull(addressGroup, "addressGroup");
592-
TransportSet ts;
597+
TransportSet ts = transports.get(addressGroup);
598+
if (ts != null) {
599+
return ts.obtainActiveTransport();
600+
}
593601
synchronized (lock) {
594602
if (shutdown) {
595603
return SHUTDOWN_TRANSPORT;

core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import io.grpc.NameResolver;
6161
import io.grpc.ResolvedServerInfo;
6262
import io.grpc.ResolvedServerInfoGroup;
63+
import io.grpc.Status;
6364
import io.grpc.StringMarshaller;
6465
import io.grpc.TransportManager.InterimTransport;
6566
import io.grpc.TransportManager.OobTransportProvider;
@@ -303,6 +304,10 @@ public void idlenessDecommissionsTransports() throws Exception {
303304

304305
channel.shutdown();
305306
verify(t1.transport).shutdown();
307+
channel.shutdownNow();
308+
verify(t0.transport).shutdownNow(any(Status.class));
309+
verify(t1.transport).shutdownNow(any(Status.class));
310+
306311
t1.listener.transportTerminated();
307312
assertFalse(channel.isTerminated());
308313
t0.listener.transportTerminated();

0 commit comments

Comments
 (0)