Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ CFLAGS+=-O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -fPIC -I.
CFLAGS+=-g

# Clang warnings to ignore
CFLAGS+=-Wno-gnu-designator
ifeq ($(CC),clang)
CFLAGS+=-Wno-gnu-designator
endif

# Enable iovecs in snappy
CFLAGS+=-DSG
Expand Down
12 changes: 5 additions & 7 deletions examples/rdkafka_performance.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ static void msg_delivered (rd_kafka_t *rk,
void *payload, size_t len,
int error_code,
void *opaque, void *msg_opaque) {
long int msgid = (long int)msg_opaque;
static rd_ts_t last;
rd_ts_t now = rd_clock();
static int msgs;
Expand All @@ -106,12 +105,12 @@ static void msg_delivered (rd_kafka_t *rk,
!(msgs_wait_cnt % (dispintvl / 1000)) ||
(now - last) >= dispintvl * 1000) {
if (error_code)
printf("Message %ld delivey failed: %s (%li remain)\n",
msgid, rd_kafka_err2str(error_code),
printf("Message delivey failed: %s (%li remain)\n",
rd_kafka_err2str(error_code),
msgs_wait_cnt);
else if (!quiet)
printf("Message %ld delivered: %li remain\n",
msgid, msgs_wait_cnt);
printf("Message delivered: %li remain\n",
msgs_wait_cnt);
if (!quiet && do_seq)
printf(" --> \"%.*s\"\n", (int)len, (char *)payload);
last = now;
Expand Down Expand Up @@ -560,8 +559,7 @@ int main (int argc, char **argv) {
while (run &&
rd_kafka_produce(rkt, partition,
sendflags, pbuf, msgsize,
key, keylen,
(void *)cnt.msgs) == -1) {
key, keylen, NULL) == -1) {
if (!quiet || errno != ENOBUFS)
printf("produce error: %s%s\n",
strerror(errno),
Expand Down
4 changes: 2 additions & 2 deletions rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ int rd_kafka_q_serve (rd_kafka_t *rk,
rd_kafka_dbg(rk, QUEUE, "QSERVE", "Serving %i ops", localq.rkq_qlen);

/* Call callback for each op */
TAILQ_FOREACH_SAFE(rko, tmp, &localq.rkq_q, rko_link) {
TAILQ_FOREACH_SAFE(rko, &localq.rkq_q, rko_link, tmp) {
callback(rko, opaque);
rd_kafka_op_destroy(rko);
}
Expand Down Expand Up @@ -515,7 +515,7 @@ void rd_kafka_destroy (rd_kafka_t *rk) {

/* Decommission all topics */
rd_kafka_lock(rk);
TAILQ_FOREACH_SAFE(rkt, rkt_tmp, &rk->rk_topics, rkt_link) {
TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
rd_kafka_unlock(rk);
rd_kafka_topic_partitions_remove(rkt);
rd_kafka_lock(rk);
Expand Down
23 changes: 13 additions & 10 deletions rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static void msghdr_print (rd_kafka_t *rk,
int i;

rd_kafka_dbg(rk, MSG, "MSG", "%s: iovlen %zd",
what, msg->msg_iovlen);
what, (size_t)msg->msg_iovlen);

for (i = 0 ; i < msg->msg_iovlen ; i++) {
rd_kafka_dbg(rk, MSG, what,
Expand Down Expand Up @@ -240,7 +240,7 @@ static void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,

rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq");

TAILQ_FOREACH_SAFE(rkbuf, tmp, &tmpq.rkbq_bufs, rkbuf_link)
TAILQ_FOREACH_SAFE(rkbuf, &tmpq.rkbq_bufs, rkbuf_link, tmp)
rkbuf->rkbuf_cb(rkb, err, NULL, rkbuf, rkbuf->rkbuf_opaque);
}

Expand All @@ -255,8 +255,8 @@ static void rd_kafka_broker_waitresp_timeout_scan (rd_kafka_broker_t *rkb,

assert(pthread_self() == rkb->rkb_thread);

TAILQ_FOREACH_SAFE(rkbuf, tmp,
&rkb->rkb_waitresps.rkbq_bufs, rkbuf_link) {
TAILQ_FOREACH_SAFE(rkbuf,
&rkb->rkb_waitresps.rkbq_bufs, rkbuf_link, tmp) {
if (likely(rkbuf->rkbuf_ts_timeout > now))
continue;

Expand Down Expand Up @@ -382,7 +382,7 @@ static ssize_t rd_kafka_broker_send (rd_kafka_broker_t *rkb,

rd_kafka_dbg(rkb->rkb_rk, BROKER, "BRKSEND",
"sendmsg FAILED for iovlen %zd (%i)",
msg->msg_iovlen,
(size_t)msg->msg_iovlen,
IOV_MAX);
rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__TRANSPORT,
"Send failed: %s", strerror(errno));
Expand Down Expand Up @@ -1093,10 +1093,13 @@ static void rd_kafka_msghdr_rebuild (struct msghdr *dst, size_t dst_len,
off_t vof = of - len;

if (0)
printf(" #%i/%zd and %zd: of %zd, len %zd, "
"vof %zd: iov %zd\n",
i, src->msg_iovlen, dst->msg_iovlen,
of, len, vof, src->msg_iov[i].iov_len);
printf(" #%i/%zd and %zd: of %jd, len %zd, "
"vof %jd: iov %zd\n",
i,
(size_t)src->msg_iovlen,
(size_t)dst->msg_iovlen,
(intmax_t)of, len, (intmax_t)vof,
src->msg_iov[i].iov_len);
if (vof < 0)
vof = 0;

Expand Down Expand Up @@ -1613,7 +1616,7 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
if (0)
rd_rkb_dbg(rkb, MSG, "PRODUCE",
"Serve %i/%i messages (%i iovecs) "
"for %.*s [%"PRId32"] (%zd bytes)",
"for %.*s [%"PRId32"] (%"PRIu64" bytes)",
msgcnt, rktp->rktp_msgq.rkmq_msg_cnt,
iovcnt,
RD_KAFKAP_STR_PR(rkt->rkt_topic),
Expand Down
2 changes: 1 addition & 1 deletion rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
int cnt = timedout->rkmq_msg_cnt;

/* Assume messages are added in time sequencial order */
TAILQ_FOREACH_SAFE(rkm, tmp, &rkmq->rkmq_msgs, rkm_link) {
TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
if (likely(rkm->rkm_ts_timeout > now))
break;

Expand Down
21 changes: 11 additions & 10 deletions rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,22 @@ typedef struct rd_kafkap_str_s {
#define RD_KAFKAP_STR_LEN_NULL -1
/* Returns the actual size of a kafka protocol string representation. */
#define RD_KAFKAP_STR_SIZE(kstr) (int16_t)(sizeof((kstr)->len) + \
(ntohs((kstr)->len) == \
((int16_t)ntohs((kstr)->len) == \
RD_KAFKAP_STR_LEN_NULL ? \
0 : ntohs((kstr)->len)))
/* Returns the length of the string of a kafka protocol string representation */
#define RD_KAFKAP_STR_LEN(kstr) (int)((ntohs((kstr)->len) == \
RD_KAFKAP_STR_LEN_NULL ? \
0 : ntohs((kstr)->len)))
0 : (int16_t)ntohs((kstr)->len)))


/* Macro suitable for "%.*s" printing. */
#define RD_KAFKAP_STR_PR(kstr) \
(ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL ? \
((int16_t)ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL ? \
0 : (int)ntohs((kstr)->len)), (kstr)->str

#define RD_KAFKAP_STR_IS_NULL(kstr) \
(ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL)
((int16_t)ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL)

static inline int rd_kafkap_str_cmp (const rd_kafkap_str_t *a,
const rd_kafkap_str_t *b) RD_UNUSED;
Expand Down Expand Up @@ -137,7 +137,7 @@ static inline rd_kafkap_str_t *rd_kafkap_str_new (const char *str) {
kstr->len = ntohs(len);
memcpy(kstr->str, str, len+1);
} else
kstr->len = ntohs(RD_KAFKAP_STR_LEN_NULL);
kstr->len = (int16_t)ntohs(RD_KAFKAP_STR_LEN_NULL);

return kstr;
}
Expand All @@ -163,16 +163,17 @@ typedef struct rd_kafkap_bytes_s {
#define RD_KAFKAP_BYTES_LEN_NULL -1
/* Returns the actual size of a kafka protocol bytes representation. */
#define RD_KAFKAP_BYTES_SIZE(kbytes) (int32_t)(sizeof((kbytes)->len) + \
(ntohl((kbytes)->len) == \
((int32_t)ntohl((kbytes)->len)==\
RD_KAFKAP_BYTES_LEN_NULL ? \
0 : ntohl((kbytes)->len)))
/* Returns the length of the string of a kafka protocol bytes representation */
#define RD_KAFKAP_BYTES_LEN(kbytes) (int32_t)((ntohl((kbytes)->len) == \
#define RD_KAFKAP_BYTES_LEN(kbytes) (int32_t)(((int32_t)ntohl((kbytes)->len) ==\
RD_KAFKAP_BYTES_LEN_NULL ? \
0 : ntohl((kbytes)->len)))
0 : \
(int32_t)ntohl((kbytes)->len)))

#define RD_KAFKAP_BYTES_IS_NULL(kbytes) \
(ntohs((kbytes)->len) == RD_KAFKAP_STR_LEN_NULL)
((int32_t)ntohl((kbytes)->len) == RD_KAFKAP_STR_LEN_NULL)


static inline int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a,
Expand Down Expand Up @@ -207,7 +208,7 @@ static inline rd_kafkap_bytes_t *rd_kafkap_bytes_new (const void *data,
kbytes->len = ntohl(datalen);
memcpy(kbytes->data, data, datalen);
} else
kbytes->len = ntohl(RD_KAFKAP_BYTES_LEN_NULL);
kbytes->len = (int32_t)ntohl(RD_KAFKAP_BYTES_LEN_NULL);

return kbytes;
}
Expand Down
2 changes: 1 addition & 1 deletion rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ void rd_kafka_topic_assign_uas (rd_kafka_t *rk, const char *topic) {
cnt = uas.rkmq_msg_cnt;
rd_kafka_toppar_unlock(rktp_ua);

TAILQ_FOREACH_SAFE(rkm, tmp, &uas.rkmq_msgs, rkm_link) {
TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
if (unlikely(rd_kafka_msg_partitioner(rkt, NULL, rkm) == -1)) {
/* Desired partition not available */
rd_kafka_msgq_enq(&failed, rkm);
Expand Down
9 changes: 1 addition & 8 deletions rdsysqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,14 @@
(*(((struct headname *)((elm)->field.tqe_prev))->tqh_last))
#endif

#ifdef TAILQ_FOREACH_SAFE
#ifdef __APPLE__
/* Apple's .._SAFE macro has the temporary variable at the end. */
#undef TAILQ_FOREACH_SAFE
#endif
#endif

#ifndef TAILQ_FOREACH_SAFE
/*
* TAILQ_FOREACH_SAFE() provides a traversal where the current iterated element
* may be freed or unlinked.
* It does not allow freeing or modifying any other element in the list,
* at least not the next element.
*/
#define TAILQ_FOREACH_SAFE(elm,tmpelm,head,field) \
#define TAILQ_FOREACH_SAFE(elm,head,field,tmpelm) \
for ((elm) = TAILQ_FIRST(head) ; \
(elm) && ((tmpelm) = TAILQ_NEXT((elm), field), 1) ; \
(elm) = (tmpelm))
Expand Down