Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ObjectReader implements Callable<ObjectMetadata> {
private int batchSize = DEFAULT_BATCH_SIZE;
private RetryPolicy retryPolicy;
private ObjectReadCallback callback = new NoOpObjectReadCallback();
private ExecutorService executor = null;

public ObjectReader(ChunkedStorageProvider provider, String objectName, OutputStream os) {
this.provider = provider;
Expand Down Expand Up @@ -86,6 +87,11 @@ public ObjectReader withCallback(ObjectReadCallback callback) {
return this;
}

public ObjectReader withExecutorService(ExecutorService executerService) {
this.executor = executerService;
return this;
}

@Override
public ObjectMetadata call() throws Exception {
LOG.info("Reading: " + objectName);
Expand Down Expand Up @@ -134,10 +140,12 @@ public ObjectMetadata call() throws Exception {
Collections.shuffle(idsToRead);
final AtomicReferenceArray<ByteBuffer> chunks = new AtomicReferenceArray<ByteBuffer>(
idsToRead.size());
ExecutorService executor = Executors.newFixedThreadPool(
concurrencyLevel,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ChunkReader-" + objectName + "-%d").build());
if (executor == null) {
executor = Executors.newFixedThreadPool(
concurrencyLevel,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ChunkReader-" + objectName + "-%d").build());
}
try {
for (final int chunkId : idsToRead) {
executor.submit(new Runnable() {
Expand Down Expand Up @@ -166,9 +174,15 @@ public void run() {
}
finally {
executor.shutdown();
if (!executor.awaitTermination(maxWaitTimeInSeconds, TimeUnit.SECONDS)) {
throw new Exception("Took too long to fetch object: " + objectName);
while (!executor.isTerminated()) {
try {
if (!executor.awaitTermination(maxWaitTimeInSeconds, TimeUnit.SECONDS))
throw new Exception("Took too long to fetch object: " + objectName);
} catch (InterruptedException e) {

}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of the loop, you should probably just invoke executor.shutdownNow() if awaitTermination doesn't return or is itself interrupted, e.g. someone called shutdownNow() on ObjectReader's thread. Something like this:

                finally {
                    executor.shutdown();
                    try {
                        if (!executor.awaitTermination(maxWaitTimeInSeconds, TimeUnit.SECONDS)) {
                            throw new Exception("Took too long to fetch object: " + objectName);
                        }
                    } finally{
                        if(!executor.isTerminated()) {
                            executor.shutdownNow();
                        }
                    }
                }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, swallowing the InterruptedException here seems inappropriate. But I'm also wondering if it's appropriate for the ObjectReader to shutdown itself the executor when it has been provided through withExecutorService(). Shuting it down was making sense when the ObjectReader was creating it itself. But it feels to me that some developer would expect this method to be a way to provide a long running, shared and reused ExecutorService and therefore expect the ObjectReader class to not shut it down.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @mfiguiere that generally any logic which creates an executor should be responsible for its life-cycle.

@kineshsatiya, is the new feature to set an external executor absolutely required for the shutdown bug fix?

We need to restrict master to only critical bug fixes. The ability to pass an external executor seems more like a (likely) small performance optimization given that ObjectReader's use case is transferring large objects.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @jeremycnf and @mfiguiere for quick reviews.

The original intent for users to be able to pass an external executor to ObjectReader is performance related ( to avoid having zombie threads). Calling shutdown with external executor service would have defeated that purpose. I've made the changes that does not shutdown the external service.

}

}

if (exception.get() != null)
Expand Down