Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
******************************************************************************/
package com.netflix.astyanax.recipes.storage;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.retry.RetryPolicy;
import com.netflix.astyanax.retry.RunOnce;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
Expand All @@ -27,16 +36,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.retry.RetryPolicy;
import com.netflix.astyanax.retry.RunOnce;

public class ObjectReader implements Callable<ObjectMetadata> {
private static final Logger LOG = LoggerFactory.getLogger(ObjectReader.class);

Expand All @@ -53,6 +52,7 @@ public class ObjectReader implements Callable<ObjectMetadata> {
private int batchSize = DEFAULT_BATCH_SIZE;
private RetryPolicy retryPolicy;
private ObjectReadCallback callback = new NoOpObjectReadCallback();
private ExecutorService executor = null;

public ObjectReader(ChunkedStorageProvider provider, String objectName, OutputStream os) {
this.provider = provider;
Expand Down Expand Up @@ -86,6 +86,18 @@ public ObjectReader withCallback(ObjectReadCallback callback) {
return this;
}

/**
* When providing an external executor service, users are expected to manage their executor service.
* This class does not shutdown executor.
*
* @param executorService
* @return ObjectReader
*/
public ObjectReader withExecutorService(ExecutorService executorService) {
this.executor = executorService;
return this;
}

@Override
public ObjectMetadata call() throws Exception {
LOG.info("Reading: " + objectName);
Expand Down Expand Up @@ -134,10 +146,14 @@ public ObjectMetadata call() throws Exception {
Collections.shuffle(idsToRead);
final AtomicReferenceArray<ByteBuffer> chunks = new AtomicReferenceArray<ByteBuffer>(
idsToRead.size());
ExecutorService executor = Executors.newFixedThreadPool(
concurrencyLevel,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ChunkReader-" + objectName + "-%d").build());
boolean isExecutorServiceProvided = true;
if (executor == null) {
executor = Executors.newFixedThreadPool(
concurrencyLevel,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ChunkReader-" + objectName + "-%d").build());
isExecutorServiceProvided = false;
}
try {
for (final int chunkId : idsToRead) {
executor.submit(new Runnable() {
Expand Down Expand Up @@ -165,9 +181,15 @@ public void run() {
}
}
finally {
executor.shutdown();
if (!executor.awaitTermination(maxWaitTimeInSeconds, TimeUnit.SECONDS)) {
throw new Exception("Took too long to fetch object: " + objectName);
if (!isExecutorServiceProvided) {
executor.shutdown();
try {
if (!executor.awaitTermination(maxWaitTimeInSeconds, TimeUnit.SECONDS))
throw new Exception("Took too long to fetch object: " + objectName);
} finally {
if (!executor.isTerminated())
executor.shutdownNow();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this call merits some diagnostics - logging what it returns and whether or not the tasks have actually terminated. Here is what shutdownNow() documentation says:

"This method does not wait for actively executing tasks to terminate. Use awaitTermination to do that.

There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate."

I know I was the one who suggested it, but it seems like that even with this change (which I believe should help a lot), there is still potential for getting into thread count explosion. I think it would be good to at least wait for a few seconds here and log whether or not threads have terminated.

(Completely agree with others on inappropriateness of shutting down a shared executor - it could be used in parallel by multiple ObjectReaders with clearly unexpected results.)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 Logging is a good idea.

The only reliable way to kill executor threads still not terminated after shutdownNow() is to kill the JVM itself; in such case we'd need to further investigate whether this recipe is the root cause. If the block is due to an optionally-specified callback, the caller would need to investigate their callback logic.

}
}
}

Expand Down