Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
Expand All @@ -56,9 +60,10 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100);
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
private static final int MAX_PER_REQUEST_CHANGES = 10000;
private static final int MAX_PER_REQUEST_CHANGES = 1000;
private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60);

private final SubscriberStub asyncStub;
private final SubscriberStub stub;
private final String subscription;
private final ScheduledExecutorService systemExecutor;
private final MessageDispatcher messageDispatcher;
Expand All @@ -75,15 +80,15 @@ public StreamingSubscriberConnection(
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Distribution ackLatencyDistribution,
SubscriberStub asyncStub,
SubscriberStub stub,
FlowController flowController,
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
ApiClock clock) {
this.subscription = subscription;
this.systemExecutor = systemExecutor;
this.asyncStub = asyncStub;
this.stub = stub;
this.messageDispatcher =
new MessageDispatcher(
receiver,
Expand Down Expand Up @@ -185,8 +190,7 @@ private void initialize() {
final ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> responseObserver =
new StreamingPullResponseObserver(errorFuture);
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
(ClientCallStreamObserver<StreamingPullRequest>)
(asyncStub.streamingPull(responseObserver));
(ClientCallStreamObserver<StreamingPullRequest>) (stub.streamingPull(responseObserver));
logger.log(
Level.FINER,
"Initializing stream to subscription {0}",subscription);
Expand Down Expand Up @@ -260,24 +264,52 @@ public void run() {
}

private boolean isAlive() {
return state() == State.RUNNING || state() == State.STARTING;
State state = state(); // Read the state only once.
return state == State.RUNNING || state == State.STARTING;
}

@Override
public void sendAckOperations(
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {
List<StreamingPullRequest> requests =
partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES);
lock.lock();
try {
for (StreamingPullRequest request : requests) {
requestObserver.onNext(request);
SubscriberStub timeoutStub =
stub.withDeadlineAfter(UNARY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
StreamObserver<Empty> loggingObserver = new StreamObserver<Empty>() {
@Override
public void onCompleted() {
// noop
}
} catch (Exception e) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send ack operations", e);
} finally {
lock.unlock();

@Override
public void onNext(Empty e) {
// noop
}

@Override
public void onError(Throwable t) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
}
};

for (PendingModifyAckDeadline modack : ackDeadlineExtensions) {
for (List<String> idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) {
timeoutStub.modifyAckDeadline(
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(subscription)
.addAllAckIds(idChunk)
.setAckDeadlineSeconds(modack.deadlineExtensionSeconds)
.build(),
loggingObserver);
}
}

for (List<String> idChunk : Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES)) {
timeoutStub.acknowledge(
AcknowledgeRequest.newBuilder()
.setSubscription(subscription)
.addAllAckIds(idChunk)
.build(),
loggingObserver);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void failed(Subscriber.State from, Throwable failure) {
}

private MessageAndConsumer pollQueue(BlockingQueue<Object> queue) throws InterruptedException {
Object obj = queue.poll(1, TimeUnit.MINUTES);
Object obj = queue.poll(10, TimeUnit.MINUTES);
if (obj == null) {
return null;
}
Expand Down