-
Notifications
You must be signed in to change notification settings - Fork 68
Queue as different kafka produce request if producerId or producerEpoch varies #1053
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| long producerId, | ||
| short producerEpoch, | ||
| int sequence) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move parameters producerId, producerEpoch and sequence between parameters timestamp and ackMode for consistent ordering with
struct KafkaProduceDataEx
{
int32 deferred = 0;
int64 timestamp = 0;
int64 producerId = -1;
int16 producerEpoch = -1;
int32 sequence = -1;
uint32 crc32c = 0;
KafkaAckMode ackMode = IN_SYNC_REPLICAS;
KafkaKey key;
KafkaHeader[] headers;
}
| if (latestRecordBatch != null && | ||
| latestRecordBatch.producerId() == producerId && | ||
| latestRecordBatch.producerEpoch() == producerEpoch && | ||
| latestRecordBatch.baseSequence() <= sequence) | ||
| { | ||
| oldRecordBatchLength = latestRecordBatch.length(); | ||
| producerId = latestRecordBatch.producerId(); | ||
| producerEpoch = latestRecordBatch.producerEpoch(); | ||
| sequence = latestRecordBatch.baseSequence(); | ||
| recordCount = latestRecordBatch.recordCount() + 1; | ||
| attributes = latestRecordBatch.attributes(); | ||
| encodeableRecordBatchTimestamp = latestRecordBatch.firstTimestamp(); | ||
| encodeableRecordBatchTimestampMax = latestRecordBatch.maxTimestamp(); | ||
| encodeSlotLimit = encodeableRecordBatchSlotOffset; | ||
| encodeableRecordBatchBytes -= latestRecordBatch.sizeof(); | ||
| } | ||
|
|
||
| sequence = producerId == RECORD_BATCH_PRODUCER_ID_NONE ? RECORD_BATCH_BASE_SEQUENCE_NONE : sequence; | ||
|
|
||
| final int oldEncodeSlotLimit = encodeSlotLimit; | ||
|
|
||
| final RecordBatchFW recordBatch = recordBatchRW.wrap(encodeSlotBuffer, encodeSlotLimit, maxLimit) | ||
| .baseOffset(0) | ||
| .length(oldRecordBatchLength) | ||
| .leaderEpoch(-1) | ||
| .magic(RECORD_BATCH_MAGIC) | ||
| .crc(0) | ||
| .attributes(attributes) | ||
| .lastOffsetDelta(recordCount - 1) | ||
| .firstTimestamp(encodeableRecordBatchTimestamp) | ||
| .maxTimestamp(encodeableRecordBatchTimestampMax) | ||
| .producerId(producerId) | ||
| .producerEpoch(producerEpoch) | ||
| .baseSequence(sequence) | ||
| .recordCount(recordCount) | ||
| .build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: the if condition above is checking latestRecordBatch.producerId() == producerId then assigning producerId = latestRecordBatch.producerId() (which is a no-op if they are already same).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest we adjust the logic to simplify.
First let's conditionally check if a new record batch is needed (if no current batch, producer differs, etc), and create an empty batch (header) if needed, with record count 0 and record batch length for 0 records.
Then, let's dynamically extend the current record batch to increase the record count by 1, the record batch length by new record bytes, plus the record bytes too. This logic should work whether the record batch was just created (empty, no records yet) or already exists with previously encoded records in the batch.
Modifying the record batch record count and length should be feasible with direct update (like crc) instead of rewriting the record batch (header) entirely.
No description provided.