-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Add support to use pooled byte buffer for thrift serde #25803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
return connectorCodecManager.getConnectorSplitCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null); | ||
Optional<ConnectorCodec<ConnectorSplit>> codec = connectorCodecManager.getConnectorSplitCodec(connectorId); | ||
if (!codec.isPresent()) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it throw instead?
ByteBufferPool byteBufferPool = byteBufferPoolManager.getPool(); | ||
List<ByteBuffer> byteBuffers = reader.readBinaryToBuffers(byteBufferPool); | ||
if (byteBuffers.isEmpty()) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We generally try to avoid returning nulls. The semantic of null is not always clear. For example here it is unclear whether it is a failure, or the object is legitimately null.
Is it appropriate to throw in here?
} | ||
finally { | ||
for (ByteBuffer byteBuffer : byteBuffers) { | ||
byteBufferPoolManager.getPool().release(byteBuffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the release be done in the same code that does acquire? (codec.serialize
?)
public static <T> void serializeConcreteValue(T value, ThriftCodec<T> codec, ByteBufferPool pool, Consumer<List<ByteBuffer>> consumer) | ||
throws Exception | ||
{ | ||
List<ByteBuffer> byteBuffers = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Prefer ImmutableList. Consider creating it in ByteBufferOutputTransport
(e.g.: when calling getBytes()
)
|
||
public class ByteBufferPoolManager | ||
{ | ||
private final ConcurrentHashMap<Thread, ByteBufferPool> threadPools = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered a single ByteBufferPool
for all threads?
Having it done this way is dangerous. You never know what thread this code will be run on. Different frameworks manage threads differently.
In a worst case consider getPool
called by a framework that runs a certain task every time on a new thread?
} | ||
|
||
@Min(1024) | ||
public int getByteBufferPoolBufferSize() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call these methods as getThriftByteBufferPool...
Otherwise it is not very clear what byte buffer pool those are used for
Description
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.