Skip to content

Commit 345d702

Browse files
committed
Rename ExecutorProvider to ExecutorFactory, refactor shutodown and serialization
1 parent 76d11c0 commit 345d702

File tree

3 files changed

+90
-34
lines changed

3 files changed

+90
-34
lines changed

gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616

1717
package com.google.cloud;
1818

19+
import static com.google.common.base.MoreObjects.firstNonNull;
20+
1921
import com.google.cloud.spi.ServiceRpcFactory;
22+
import com.google.common.annotations.VisibleForTesting;
2023
import com.google.common.base.Preconditions;
2124

2225
import io.grpc.internal.SharedResourceHolder;
2326
import io.grpc.internal.SharedResourceHolder.Resource;
2427

28+
import java.io.IOException;
29+
import java.io.ObjectInputStream;
2530
import java.util.Objects;
2631
import java.util.concurrent.ScheduledExecutorService;
2732
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -40,10 +45,13 @@ public abstract class GrpcServiceOptions<ServiceT extends Service<OptionsT>, Ser
4045
extends ServiceOptions<ServiceT, ServiceRpcT, OptionsT> {
4146

4247
private static final long serialVersionUID = 6415982522610509549L;
48+
private final String executorFactoryClassName;
4349
private final int initialTimeout;
4450
private final double timeoutMultiplier;
4551
private final int maxTimeout;
4652

53+
private transient ExecutorFactory executorFactory;
54+
4755
/**
4856
* Shared thread pool executor.
4957
*/
@@ -65,45 +73,41 @@ public void close(ScheduledExecutorService instance) {
6573
};
6674

6775
/**
68-
* An interface that provides access to a scheduled executor service.
76+
* An interface for {@link ScheduledExecutorService} factories. Implementations of this interface
77+
* can be used to provide an user-defined scheduled executor to execute requests. Any
78+
* implementation of this interface must override the {@code get()} method to return the desired
79+
* executor. The {@code release(executor)} method should be overriden to free resources used by
80+
* the executor (if needed) according to application's logic.
81+
*
82+
* <p>Implementation must provide a public no-arg constructor. Loading of a factory implementation
83+
* is done via {@link java.util.ServiceLoader}.
6984
*/
70-
public interface ExecutorProvider {
85+
public interface ExecutorFactory {
7186

7287
/**
73-
* Returns the scheduled executor service.
88+
* Gets a scheduled executor service instance.
7489
*/
7590
ScheduledExecutorService get();
7691

7792
/**
78-
* Shuts down the scheduled executor service if it is no longer used.
93+
* Releases resources used by the executor and possibly shuts it down.
7994
*/
80-
void shutdown();
95+
void release(ScheduledExecutorService executor);
8196
}
8297

83-
/**
84-
* An interface that provides access to a scheduled executor service.
85-
*/
86-
private static class DefaultExecutorProvider implements ExecutorProvider {
87-
88-
private ScheduledExecutorService service;
89-
private boolean shutdown = false;
98+
@VisibleForTesting
99+
static class DefaultExecutorFactory implements ExecutorFactory {
90100

91-
private DefaultExecutorProvider() {}
101+
private static final DefaultExecutorFactory INSTANCE = new DefaultExecutorFactory();
92102

93103
@Override
94-
public synchronized ScheduledExecutorService get() {
95-
if (service == null && !shutdown) {
96-
service = SharedResourceHolder.get(EXECUTOR);
97-
}
98-
return service;
104+
public ScheduledExecutorService get() {
105+
return SharedResourceHolder.get(EXECUTOR);
99106
}
100107

101108
@Override
102-
public synchronized void shutdown() {
103-
if (service != null && !shutdown) {
104-
shutdown = true;
105-
service = SharedResourceHolder.release(EXECUTOR, service);
106-
}
109+
public synchronized void release(ScheduledExecutorService executor) {
110+
SharedResourceHolder.release(EXECUTOR, executor);
107111
}
108112
}
109113

@@ -120,6 +124,7 @@ protected abstract static class Builder<ServiceT extends Service<OptionsT>, Serv
120124
B extends Builder<ServiceT, ServiceRpcT, OptionsT, B>>
121125
extends ServiceOptions.Builder<ServiceT, ServiceRpcT, OptionsT, B> {
122126

127+
private ExecutorFactory executorFactory;
123128
private int initialTimeout = 20_000;
124129
private double timeoutMultiplier = 1.5;
125130
private int maxTimeout = 100_000;
@@ -128,6 +133,7 @@ protected Builder() {}
128133

129134
protected Builder(GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT> options) {
130135
super(options);
136+
executorFactory = options.executorFactory;
131137
initialTimeout = options.initialTimeout;
132138
timeoutMultiplier = options.timeoutMultiplier;
133139
maxTimeout = options.maxTimeout;
@@ -136,6 +142,17 @@ protected Builder(GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT> options) {
136142
@Override
137143
protected abstract GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT> build();
138144

145+
/**
146+
* Sets the scheduled executor factory. This method can be used to provide an user-defined
147+
* scheduled executor to execute requests.
148+
*
149+
* @return the builder
150+
*/
151+
public B executorFactory(ExecutorFactory executorFactory) {
152+
this.executorFactory = executorFactory;
153+
return self();
154+
}
155+
139156
/**
140157
* Sets the timeout for the initial RPC, in milliseconds. Subsequent calls will use this value
141158
* adjusted according to {@link #timeoutMultiplier(double)}. Default value is 20000.
@@ -180,6 +197,9 @@ protected GrpcServiceOptions(
180197
Class<? extends ServiceRpcFactory<ServiceRpcT, OptionsT>> rpcFactoryClass, Builder<ServiceT,
181198
ServiceRpcT, OptionsT, ?> builder) {
182199
super(serviceFactoryClass, rpcFactoryClass, builder);
200+
executorFactory = firstNonNull(builder.executorFactory,
201+
getFromServiceLoader(ExecutorFactory.class, DefaultExecutorFactory.INSTANCE));
202+
executorFactoryClassName = executorFactory.getClass().getName();
183203
initialTimeout = builder.initialTimeout;
184204
timeoutMultiplier = builder.timeoutMultiplier;
185205
maxTimeout = builder.maxTimeout <= initialTimeout ? initialTimeout : builder.maxTimeout;
@@ -188,8 +208,8 @@ protected GrpcServiceOptions(
188208
/**
189209
* Returns a scheduled executor service provider.
190210
*/
191-
protected ExecutorProvider executorProvider() {
192-
return new DefaultExecutorProvider();
211+
protected ExecutorFactory executorFactory() {
212+
return executorFactory;
193213
}
194214

195215
/**
@@ -217,13 +237,20 @@ public int maxTimeout() {
217237

218238
@Override
219239
protected int baseHashCode() {
220-
return Objects.hash(super.baseHashCode(), initialTimeout, timeoutMultiplier, maxTimeout);
240+
return Objects.hash(super.baseHashCode(), executorFactoryClassName, initialTimeout,
241+
timeoutMultiplier, maxTimeout);
221242
}
222243

223244
protected boolean baseEquals(GrpcServiceOptions<?, ?, ?> other) {
224245
return super.baseEquals(other)
246+
&& Objects.equals(executorFactoryClassName, other.executorFactoryClassName)
225247
&& Objects.equals(initialTimeout, other.initialTimeout)
226248
&& Objects.equals(timeoutMultiplier, other.timeoutMultiplier)
227249
&& Objects.equals(maxTimeout, other.maxTimeout);
228250
}
251+
252+
private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
253+
input.defaultReadObject();
254+
executorFactory = newInstance(executorFactoryClassName);
255+
}
229256
}

gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,30 @@
1818

1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertNotEquals;
21+
import static org.junit.Assert.assertSame;
22+
import static org.junit.Assert.assertTrue;
2123
import static org.junit.Assert.fail;
2224

25+
import com.google.cloud.GrpcServiceOptions.DefaultExecutorFactory;
26+
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
2327
import com.google.cloud.spi.ServiceRpcFactory;
2428

29+
import org.easymock.EasyMock;
2530
import org.junit.Test;
2631

2732
import java.util.Set;
33+
import java.util.concurrent.ScheduledExecutorService;
2834

2935
public class GrpcServiceOptionsTest {
3036

37+
private static final ExecutorFactory MOCK_EXECUTOR_FACTORY =
38+
EasyMock.createMock(ExecutorFactory.class);
3139
private static final TestGrpcServiceOptions OPTIONS = TestGrpcServiceOptions.builder()
3240
.projectId("project-id")
3341
.initialTimeout(1234)
3442
.timeoutMultiplier(1.6)
3543
.maxTimeout(5678)
44+
.executorFactory(MOCK_EXECUTOR_FACTORY)
3645
.build();
3746
private static final TestGrpcServiceOptions DEFAULT_OPTIONS =
3847
TestGrpcServiceOptions.builder().projectId("project-id").build();
@@ -138,9 +147,11 @@ public void testBuilder() {
138147
assertEquals(1234, OPTIONS.initialTimeout());
139148
assertEquals(1.6, OPTIONS.timeoutMultiplier(), 0.0);
140149
assertEquals(5678, OPTIONS.maxTimeout());
150+
assertSame(MOCK_EXECUTOR_FACTORY, OPTIONS.executorFactory());
141151
assertEquals(20000, DEFAULT_OPTIONS.initialTimeout());
142152
assertEquals(1.5, DEFAULT_OPTIONS.timeoutMultiplier(), 0.0);
143153
assertEquals(100000, DEFAULT_OPTIONS.maxTimeout());
154+
assertTrue(DEFAULT_OPTIONS.executorFactory() instanceof DefaultExecutorFactory);
144155
}
145156

146157
@Test
@@ -182,11 +193,26 @@ public void testBuilderInvalidMaxTimeout() {
182193
public void testBaseEquals() {
183194
assertEquals(OPTIONS, OPTIONS_COPY);
184195
assertNotEquals(DEFAULT_OPTIONS, OPTIONS);
196+
TestGrpcServiceOptions options = OPTIONS.toBuilder()
197+
.executorFactory(new DefaultExecutorFactory())
198+
.build();
199+
assertNotEquals(OPTIONS, options);
185200
}
186201

187202
@Test
188203
public void testBaseHashCode() {
189204
assertEquals(OPTIONS.hashCode(), OPTIONS_COPY.hashCode());
190205
assertNotEquals(DEFAULT_OPTIONS.hashCode(), OPTIONS.hashCode());
206+
TestGrpcServiceOptions options = OPTIONS.toBuilder()
207+
.executorFactory(new DefaultExecutorFactory())
208+
.build();
209+
assertNotEquals(OPTIONS.hashCode(), options.hashCode());
210+
}
211+
212+
@Test
213+
public void testDefaultExecutorFactory() {
214+
ExecutorFactory executorFactory = new DefaultExecutorFactory();
215+
ScheduledExecutorService executorService = executorFactory.get();
216+
assertSame(executorService, executorFactory.get());
191217
}
192218
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.google.api.gax.grpc.ApiException;
2222
import com.google.auth.oauth2.GoogleCredentials;
2323
import com.google.cloud.AuthCredentials;
24-
import com.google.cloud.GrpcServiceOptions.ExecutorProvider;
24+
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
2525
import com.google.cloud.RetryParams;
2626
import com.google.cloud.pubsub.PubSubException;
2727
import com.google.cloud.pubsub.PubSubOptions;
@@ -64,12 +64,14 @@
6464
import java.io.IOException;
6565
import java.util.Set;
6666
import java.util.concurrent.Future;
67+
import java.util.concurrent.ScheduledExecutorService;
6768

6869
public class DefaultPubSubRpc implements PubSubRpc {
6970

7071
private final PublisherApi publisherApi;
7172
private final SubscriberApi subscriberApi;
72-
private final ExecutorProvider executorProvider;
73+
private final ScheduledExecutorService executor;
74+
private final ExecutorFactory executorFactory;
7375

7476
private static final class InternalPubSubOptions extends PubSubOptions {
7577

@@ -80,18 +82,19 @@ private InternalPubSubOptions(PubSubOptions options) {
8082
}
8183

8284
@Override
83-
protected ExecutorProvider executorProvider() {
84-
return super.executorProvider();
85+
protected ExecutorFactory executorFactory() {
86+
return super.executorFactory();
8587
}
8688
}
8789

8890
public DefaultPubSubRpc(PubSubOptions options) throws IOException {
89-
executorProvider = new InternalPubSubOptions(options).executorProvider();
91+
executorFactory = new InternalPubSubOptions(options).executorFactory();
92+
executor = executorFactory.get();
9093
try {
9194
PublisherSettings.Builder pubBuilder =
92-
PublisherSettings.defaultBuilder().provideExecutorWith(executorProvider.get(), false);
95+
PublisherSettings.defaultBuilder().provideExecutorWith(executor, false);
9396
SubscriberSettings.Builder subBuilder =
94-
SubscriberSettings.defaultBuilder().provideExecutorWith(executorProvider.get(), false);
97+
SubscriberSettings.defaultBuilder().provideExecutorWith(executor, false);
9598
// todo(mziccard): PublisherSettings should support null/absent credentials for testing
9699
if (options.host().contains("localhost")
97100
|| options.authCredentials().equals(AuthCredentials.noAuth())) {
@@ -233,6 +236,6 @@ public Future<Empty> modify(ModifyPushConfigRequest request) {
233236
public void close() throws Exception {
234237
subscriberApi.close();
235238
publisherApi.close();
236-
executorProvider.shutdown();
239+
executorFactory.release(executor);
237240
}
238241
}

0 commit comments

Comments
 (0)