Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
df6efd2
Upgrade vcpkg (#4531)
emasab Dec 8, 2023
960e056
Added Fenced member error codes
pranavrth Dec 6, 2023
a772bc4
In between changes
pranavrth Dec 11, 2023
a9b886e
Merged base branch
pranavrth Dec 12, 2023
22cdb64
Update semaphore.yml (#4536)
cchristous Dec 13, 2023
dee9ead
Added type of errors
pranavrth Dec 13, 2023
42cc318
[KIP-848] topic_id in topic_partition_internal, some new error codes …
emasab Dec 11, 2023
7632311
Added new Error code UNKNOWN_TOPIC_ID (#4525)
pranavrth Dec 14, 2023
e03d3bb
Make install dependent on libs to avoid errors (#4562)
emasab Dec 14, 2023
954391e
Working partial metadata code. Not fully tested.
pranavrth Jan 8, 2024
a91b47d
Added Error codes
pranavrth Jan 22, 2024
2dff2eb
Bump openssl version partially to 3.0.12 (#4586)
milindl Jan 23, 2024
02ea549
Changes related to testing
pranavrth Jan 25, 2024
22d524a
Merge branch 'dev_kip848_ConsumerGroupHeartbeatAPI_assign_revoke_leav…
pranavrth Jan 29, 2024
25130f4
fixed next_target_assignment not getting reset to NULL
pranavrth Jan 31, 2024
8d2ecbb
Removed comment and printf
pranavrth Jan 31, 2024
0b6a7a5
More refinements
pranavrth Jan 31, 2024
25a3ef6
Added debug logs for the partitions not present in the metadata
pranavrth Jan 31, 2024
c3244d4
Fixed member stuck if fenced during rebalancing
pranavrth Feb 1, 2024
aad2a7a
Fixed segfault with current and target assignment while resetting con…
pranavrth Feb 1, 2024
6d88efd
Uniform test code across scripts and KRaft mode (#4524)
emasab Feb 9, 2024
bba472f
Fixed SEGFAULT due to deleted topic in metadata
pranavrth Feb 12, 2024
589fd2d
Added UNKNOWN_TOPIC_ID error from master
pranavrth Feb 12, 2024
a6d85bd
Uuid support in mock cluster (#4591)
emasab Feb 13, 2024
f7a82ff
[KIP-848] Added ConsumerGroupHeartbeat API request
pranavrth Aug 26, 2023
45459a5
[KIP-848] Assign, revoke, leave group flows
emasab Feb 6, 2024
f0fc683
Updated OffsetCommit Request and response to v9. Fixed metadata being…
pranavrth Feb 27, 2024
8b705fe
Fixed leave not being called if the consumer without any assignment l…
pranavrth Mar 1, 2024
e3fe1e3
Style fixes
pranavrth Mar 1, 2024
f6199cf
Merge branch 'dev_kip848' into dev_kip848_ConsumerGroupHeartbeatAPI_e…
pranavrth Mar 4, 2024
c5fc02d
Removed few printfs
pranavrth Mar 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: v1.0
name: 'librdkafka build and release artifact pipeline'
agent:
machine:
type: s1-prod-macos-arm64
type: s1-prod-macos-13-5-arm64
execution_time_limit:
hours: 3
global_job_config:
Expand All @@ -17,7 +17,7 @@ blocks:
task:
agent:
machine:
type: s1-prod-macos-arm64
type: s1-prod-macos-13-5-arm64
env_vars:
- name: ARTIFACT_KEY
value: p-librdkafka__plat-osx__arch-arm64__lnk-all
Expand All @@ -43,7 +43,7 @@ blocks:
task:
agent:
machine:
type: s1-prod-macos
type: s1-prod-macos-13-5-amd64
env_vars:
- name: ARTIFACT_KEY
value: p-librdkafka__plat-osx__arch-x64__lnk-all
Expand Down Expand Up @@ -120,7 +120,7 @@ blocks:
- sudo dpkg -i rapidjson-dev.deb
- python3 -m pip install -U pip
- python3 -m pip -V
- python3 -m pip install -r tests/requirements.txt
- (cd tests && python3 -m pip install -r requirements.txt)
- ./configure --install-deps
# split these up
- ./packaging/tools/rdutcoverage.sh
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# librdkafka v2.3.1

librdkafka v2.3.1 is a feature release:

* Upgrade OpenSSL to v3.0.12 (while building from source) with various security fixes,
check the [release notes](https://www.openssl.org/news/cl30.txt).


# librdkafka v2.3.0

librdkafka v2.3.0 is a feature release:
Expand Down
76 changes: 38 additions & 38 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1972,44 +1972,44 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
release of librdkafka.


| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------| ----------- | ----------------------- |
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |
| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------|-----------|----------------|
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 9 | 7 |
| 9 | OffsetFetch | 9 | 9 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |


# Recommendations for language binding developers
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ file-check: CONFIGURATION.md LICENSES.txt examples
check: file-check
@(for d in $(LIBSUBDIRS); do $(MAKE) -C $$d $@ || exit $?; done)

install-subdirs:
install-subdirs: libs
@(for d in $(LIBSUBDIRS); do $(MAKE) -C $$d install || exit $?; done)

install: install-subdirs doc-install
Expand Down
33 changes: 21 additions & 12 deletions examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ int main(int argc, char **argv) {
return 1;
}

if (rd_kafka_conf_set(conf, "partition.assignment.strategy",
"cooperative-sticky", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

/* Set the consumer group id.
* All consumers sharing the same group id will join the same
* group, and the subscribed topic' partitions will be assigned
Expand All @@ -188,12 +196,13 @@ int main(int argc, char **argv) {
return 1;
}

if (rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
// if (rd_kafka_conf_set(conf, "debug", "all", errstr,
// sizeof(errstr)) !=
// RD_KAFKA_CONF_OK) {
// fprintf(stderr, "%s\n", errstr);
// rd_kafka_conf_destroy(conf);
// return 1;
// }

if (rd_kafka_conf_set(conf, "session.timeout.ms", "10000", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
Expand All @@ -213,12 +222,12 @@ int main(int argc, char **argv) {
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);


// if (rd_kafka_conf_set(conf, "debug", "all", errstr,
// sizeof(errstr)) != RD_KAFKA_CONF_OK) {
// fprintf(stderr, "%s\n", errstr);
// rd_kafka_conf_destroy(conf);
// return 1;
// }
if (rd_kafka_conf_set(conf, "max.poll.interval.ms", "90000", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

/* If there is no previously committed offset for a partition
* the auto.offset.reset strategy will be used to decide where
Expand Down
2 changes: 1 addition & 1 deletion mklove/modules/configure.base
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ function mkl_dep_install_source {
# Build and install
mkl_dbg "Building $name from source in $sdir (func $func)"

$func $name "$ddir" >$ilog 2>&1
libdir="/usr/lib" $func $name "$ddir" >$ilog 2>&1
retcode=$?

mkl_popd # $sdir
Expand Down
4 changes: 2 additions & 2 deletions mklove/modules/configure.libssl
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ function manual_checks {
function libcrypto_install_source {
local name=$1
local destdir=$2
local ver=3.0.11
local checksum="b3425d3bb4a2218d0697eb41f7fc0cdede016ed19ca49d168b78e8d947887f55"
local ver=3.0.12
local checksum="f93c9e8edde5e9166119de31755fc87b4aa34863662f67ddfcba14d0b6b69b61"
local url=https://www.openssl.org/source/openssl-${ver}.tar.gz

local conf_args="--prefix=/usr --openssldir=/usr/lib/ssl no-shared no-zlib"
Expand Down
73 changes: 73 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,15 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
_ERR_DESC(RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE,
"Broker: Request principal deserialization failed during "
"forwarding"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"),
_ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH,
"Broker: The member epoch is fenced by the group coordinator"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID,
"Broker: The instance ID is still used by another member in the "
"consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR,
"Broker: The assignor or its version range is not supported by "
"the consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH,
"Broker: The member epoch is stale"),
_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
Expand Down Expand Up @@ -5098,6 +5105,38 @@ rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid) {
return copy_uuid;
}

/**
* Returns a new non cryptographically secure UUIDv4 (random).
*
* @return A UUIDv4.
*
* @remark Must be freed after use using rd_kafka_Uuid_destroy().
*/
rd_kafka_Uuid_t rd_kafka_Uuid_random() {
int i;
unsigned char rand_values_bytes[16] = {0};
uint64_t *rand_values_uint64 = (uint64_t *)rand_values_bytes;
unsigned char *rand_values_app;
rd_kafka_Uuid_t ret = RD_KAFKA_UUID_ZERO;
for (i = 0; i < 16; i += 2) {
uint16_t rand_uint16 = (uint16_t)rd_jitter(0, INT16_MAX - 1);
/* No need to convert endianess here because it's still only
* a random value. */
rand_values_app = (unsigned char *)&rand_uint16;
rand_values_bytes[i] |= rand_values_app[0];
rand_values_bytes[i + 1] |= rand_values_app[1];
}

rand_values_bytes[6] &= 0x0f; /* clear version */
rand_values_bytes[6] |= 0x40; /* version 4 */
rand_values_bytes[8] &= 0x3f; /* clear variant */
rand_values_bytes[8] |= 0x80; /* IETF variant */

ret.most_significant_bits = be64toh(rand_values_uint64[0]);
ret.least_significant_bits = be64toh(rand_values_uint64[1]);
return ret;
}

/**
* @brief Destroy the provided uuid.
*
Expand All @@ -5107,6 +5146,40 @@ void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid) {
rd_free(uuid);
}

/**
* @brief Computes canonical encoding for the given uuid string.
* Mainly useful for testing.
*
* @param uuid UUID for which canonical encoding is required.
*
* @return canonical encoded string for the given UUID.
*
* @remark Must be freed after use.
*/
const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid) {
int i, j;
unsigned char bytes[16];
char *ret = rd_calloc(37, sizeof(*ret));

for (i = 0; i < 8; i++) {
#if __BYTE_ORDER == __LITTLE_ENDIAN
j = 7 - i;
#elif __BYTE_ORDER == __BIG_ENDIAN
j = i;
#endif
bytes[i] = (uuid->most_significant_bits >> (8 * j)) & 0xFF;
bytes[8 + i] = (uuid->least_significant_bits >> (8 * j)) & 0xFF;
}

rd_snprintf(ret, 37,
"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%"
"02x%02x%02x",
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5],
bytes[6], bytes[7], bytes[8], bytes[9], bytes[10],
bytes[11], bytes[12], bytes[13], bytes[14], bytes[15]);
return ret;
}

const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) {
if (*uuid->base64str)
return uuid->base64str;
Expand Down
10 changes: 10 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,19 @@ typedef enum {
RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED = 96,
/** Request principal deserialization failed during forwarding */
RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97,
/** Unknown Topic Id */
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID = 100,
/** The member epoch is fenced by the group coordinator. The member must
* abandon all its partitions and rejoin. */
RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110,
/** The instance ID is still used by another member in the
* consumer group. That member must leave first.
*/
RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID = 111,
/** The assignor or its version range is not supported by
* the consumer group.
*/
RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112,
/** The member epoch is stale.
* The member must retry after receiving its updated member epoch
* via the ConsumerGroupHeartbeat API. */
Expand Down
Loading