Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2012 ${project.organization.name}
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.drift.buffer;

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ByteBufferPool
{
private static final int DEFAULT_BUFFER_SIZE = 4096; // 4KB
private static final int DEFAULT_BUFFER_COUNT = 1024; // 4MB max pool size
private final ArrayBlockingQueue<ByteBuffer> pool;
private final int bufferSize;

private static final AtomicInteger idGenerator = new AtomicInteger();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unused


public ByteBufferPool()
{
this(DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_COUNT);
}

public ByteBufferPool(int bufferSize, int maxCount)
{
this.bufferSize = bufferSize;
pool = new ArrayBlockingQueue<>(maxCount);

// Pre-populate the pool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is probably okay to populate lazily

for (int i = 0; i < maxCount; i++) {
pool.offer(ByteBuffer.allocate(bufferSize));
}
}

public ByteBuffer acquire()
{
ByteBuffer buffer = pool.poll();
if (buffer == null) {
buffer = ByteBuffer.allocate(bufferSize);
}
buffer.clear();
return buffer;
}

public void release(ByteBuffer byteBuffer)
{
byteBuffer.clear();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe check if the buffer is of expected size

boolean ignored = pool.offer(byteBuffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
package com.facebook.drift.protocol;

import com.facebook.drift.TException;
import com.facebook.drift.buffer.ByteBufferPool;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static com.facebook.drift.protocol.TProtocolUtil.readAllInBatches;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -36,7 +40,6 @@ public class TBinaryProtocol
implements TProtocol
{
private static final TStruct ANONYMOUS_STRUCT = new TStruct("");

protected static final int VERSION_MASK = 0xffff0000;
protected static final int VERSION_1 = 0x80010000;

Expand Down Expand Up @@ -210,6 +213,21 @@ public void writeBinary(ByteBuffer value)
transport.write(value.array(), value.position() + value.arrayOffset(), length);
}

@Override
public void writeBinaryFromBuffers(List<ByteBuffer> byteBuffers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe simply writeBinary

throws TException
{
int size = 0;
for (ByteBuffer byteBuffer : byteBuffers) {
size += byteBuffer.remaining();
}

writeI32(size);
for (ByteBuffer byteBuffer : byteBuffers) {
transport.write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining());
}
}

/**
* Reading methods.
*/
Expand Down Expand Up @@ -404,6 +422,34 @@ public int readBinary(byte[] buf, int offset)
return readAllInBatches(transport, buf, offset, size);
}

@Override
public List<ByteBuffer> readBinaryToBuffers(ByteBufferPool pool)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe simply readBinary

throws TException
{
int size = checkSize(readI32());
if (size == 0) {
return Collections.emptyList();
}

List<ByteBuffer> byteBuffers = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: prefer ImutableList (e.g.: ImmutableList.builder())


int remaining = size;

while (remaining > 0) {
ByteBuffer byteBuffer = pool.acquire();
byteBuffers.add(byteBuffer);
int bytesToRead = Math.min(remaining, byteBuffer.remaining());

transport.read(byteBuffer.array(), byteBuffer.arrayOffset(), bytesToRead);
byteBuffer.position(bytesToRead);
byteBuffer.limit(bytesToRead);
byteBuffer.flip();

remaining -= bytesToRead;
}
return byteBuffers;
}

private static int checkSize(int length)
throws TProtocolException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package com.facebook.drift.protocol;

import com.facebook.drift.TException;
import com.facebook.drift.buffer.ByteBufferPool;

import java.nio.ByteBuffer;
import java.util.List;

public interface TProtocolReader
{
Expand Down Expand Up @@ -86,4 +88,10 @@ ByteBuffer readBinary()

int readBinary(byte[] buf, int offset)
throws TException;

default List<ByteBuffer> readBinaryToBuffers(ByteBufferPool pool)
throws TException
{
throw new UnsupportedOperationException("readBinaryToBuffers is not supported");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend making this method mandatory and implementing it for Binary and FBBinary protocol as well. The implementation should be pratcially idential except how the length is read.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.drift.TException;

import java.nio.ByteBuffer;
import java.util.List;

public interface TProtocolWriter
{
Expand Down Expand Up @@ -86,4 +87,10 @@ void writeString(String value)

void writeBinary(ByteBuffer value)
throws TException;

default void writeBinaryFromBuffers(List<ByteBuffer> byteBuffers)
throws TException
{
throw new UnsupportedOperationException("writeBinaryFromBuffers is not supported");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (C) 2012 ${project.organization.name}
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.drift.protocol.bytebuffer;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;

public class ByteBufferInputStream
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please cover it with a unit test well. Or consider ByteSource from Guava (e.g.: ByteSource#concat)

extends InputStream
{
private final List<ByteBuffer> buffers;
private int currentBufferIndex;
private ByteBuffer currentBuffer;

public ByteBufferInputStream(List<ByteBuffer> byteBuffers)
{
this.buffers = byteBuffers;
this.currentBufferIndex = 0;
this.currentBuffer = buffers.isEmpty() ? null : buffers.get(currentBufferIndex);
}

@Override
public int read()
throws IOException
{
if (currentBuffer == null) {
return -1;
}
if (!currentBuffer.hasRemaining()) {
advanceBuffer();
if (currentBuffer == null) {
return -1;
}
}

return currentBuffer.get() & 0xFF;
}

@Override
public int read(byte[] b, int off, int len)
throws IOException
{
if (currentBuffer == null) {
return -1;
}

int totalBytesRead = 0;
int bytesRemaining = len;
int destOffset = off;

while (bytesRemaining > 0) {
if (!currentBuffer.hasRemaining()) {
advanceBuffer();
if (currentBuffer == null) {
return totalBytesRead > 0 ? totalBytesRead : -2;
}
}

int bytesToRead = Math.min(bytesRemaining, currentBuffer.remaining());

currentBuffer.get(b, destOffset, bytesToRead);

totalBytesRead += bytesToRead;
bytesRemaining -= bytesToRead;
destOffset += bytesToRead;
}
return totalBytesRead > 0 ? totalBytesRead : -3;
}

@Override
public int available()
throws IOException
{
if (currentBuffer == null) {
return 0;
}
int available = currentBuffer.remaining();
for (int i = currentBufferIndex + 1; i < buffers.size(); i++) {
available += buffers.get(i).remaining();
}
return available;
}

private void advanceBuffer()
{
currentBufferIndex++;
if (currentBufferIndex >= buffers.size()) {
currentBuffer = null;
}
else {
currentBuffer = buffers.get(currentBufferIndex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2012 ${project.organization.name}
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.drift.protocol.bytebuffer;

import com.facebook.drift.protocol.TTransport;
import com.facebook.drift.protocol.TTransportException;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import static java.lang.String.format;

public class ByteBufferInputTransport
implements TTransport
{
private final ByteBufferInputStream inputStream;

public ByteBufferInputTransport(List<ByteBuffer> byteBuffers)
{
inputStream = new ByteBufferInputStream(byteBuffers);
}

@Override
public void read(byte[] buf, int off, int len)
throws TTransportException
{
try {
int bytesRead = inputStream.read(buf, off, len);

if (bytesRead < 0) {
if (len > 0) {
throw new TTransportException("End of stream reached when trying to read " + len + " bytes, but gets " + bytesRead + " bytes");
}
return;
}
if (bytesRead < len) {
throw new TTransportException(format("Not enough bytes to read, read %s bytes but need %s bytes", bytesRead, len));
}
}
catch (IOException e) {
throw new TTransportException("Failed to read", e);
}
}

@Override
public void write(byte[] buf, int off, int len)
throws TTransportException
{
throw new TTransportException("This is a read-only transport");
}
}
Loading