Question of RdKafka: crash as I call rd_kafka_subscribe #710

@Benjaad

Description

The stack trace looks like this:
#0 0x000000384d4093a0 in pthread_mutex_lock () from /lib64/libpthread.so.0
#1 0x00002abb6b775389 in mtx_lock (mtx=) at tinycthread.c:135
#2 0x00002abb6b76a11f in rd_kafka_cgrp_handle_SyncGroup (rkcg=0x19e0770, err=RD_KAFKA_RESP_ERR_NO_ERROR, member_state=) at rdkafka_cgrp.c:1984
#3 0x00002abb6b75a189 in rd_kafka_handle_SyncGroup (rk=, rkb=0x19e1580, err=, rkbuf=, request=0x2abbd0000c00, opaque=0x19e0770) at rdkafka_request.c:966
#4 0x00002abb6b754f7c in rd_kafka_buf_callback (rk=0x19e0290, rkb=0x19e1580, err=RD_KAFKA_RESP_ERR_NO_ERROR, response=0x2abc20000da0, request=0x2abbd0000c00) at rdkafka_buf.c:515
#5 0x00002abb6b76adcb in rd_kafka_cgrp_op_serve (rkcg=0x19e0770) at rdkafka_cgrp.c:1664
#6 rd_kafka_cgrp_serve (rkcg=0x19e0770) at rdkafka_cgrp.c:1746
#7 0x00002abb6b73d124 in rd_kafka_thread_main (arg=0x19e0290) at rdkafka.c:974
#8 0x00002abb6b77517f in _thrd_wrapper_function (aArg=) at tinycthread.c:611
#9 0x000000384d4079d1 in start_thread () from /lib64/libpthread.so.0
#10 0x000000384cce88fd in clone () from /lib64/libc.so.6

How to reproduce

Join a new Kafka cluster, then subscribe to a topic that has not been created yet. The client then crashes.

Temporary solution

I debugged it and found the cause: the broker returns a SyncGroup response without MemberState, so when MemberState is read, rd_kafka_buf_read_i16 fails and tries to log the parse failure. But rkbuf->rkbuf_rkb is NULL, so the logging call crashes.

code:


    rkbuf = rd_kafka_buf_new_shadow(member_state->data,
                                    RD_KAFKAP_BYTES_LEN(member_state));

    rd_kafka_buf_read_i16(rkbuf, &Version);

Before the modification:


    #define rd_kafka_buf_parse_fail(rkbuf,...) do { \
            if (log_decode_errors) {                                \
                    rd_rkb_log(rkbuf->rkbuf_rkb, LOG_WARNING, "PROTOERR", \
                               "Protocol parse failure at %s:%i",   \
                               __FUNCTION__, __LINE__);             \
                    rd_rkb_log(rkbuf->rkbuf_rkb, LOG_WARNING,   \
                               "PROTOERR", __VA_ARGS__);        \
            }                                                       \
            goto err;                                               \
    } while (0)

After the modification:


    #define rd_kafka_buf_parse_fail(rkbuf,...) do { \
            if (log_decode_errors && rkbuf->rkbuf_rkb) {                                \
                    rd_rkb_log(rkbuf->rkbuf_rkb, LOG_WARNING, "PROTOERR", \
                               "Protocol parse failure at %s:%i",   \
                               __FUNCTION__, __LINE__);             \
                    rd_rkb_log(rkbuf->rkbuf_rkb, LOG_WARNING,   \
                               "PROTOERR", __VA_ARGS__);        \
            }                                                       \
            goto err;                                               \
    } while (0)
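
To show the idea outside of librdkafka, here is a minimal standalone sketch of the same guard. Every name in it (`demo_broker`, `demo_buf`, `demo_log`, `demo_buf_parse_fail`, `demo_read_i16`) is a hypothetical stand-in I made up for illustration, not librdkafka API. It assumes, as described above, that a shadow buffer carries no broker handle, and it only models the short-read case, not the real protocol parser.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-ins for the broker handle and the parse buffer. */
struct demo_broker { const char *name; };

struct demo_buf {
    const unsigned char *data;   /* payload being parsed */
    size_t len;                  /* payload length in bytes */
    size_t of;                   /* current read offset (of <= len) */
    struct demo_broker *rkb;     /* NULL for a shadow buffer */
};

static int demo_log_count = 0;   /* number of log lines emitted, demo only */

/* The logger dereferences the broker handle, so it must not get NULL. */
static void demo_log(struct demo_broker *rkb, const char *msg) {
    fprintf(stderr, "%s: PROTOERR: %s\n", rkb->name, msg);
    demo_log_count++;
}

/* Guarded parse-fail: log only when a broker is attached, then bail out. */
#define demo_buf_parse_fail(buf, msg) do {              \
        if ((buf)->rkb)                                 \
                demo_log((buf)->rkb, (msg));            \
        goto err;                                       \
} while (0)

/* Read a big-endian int16. Returns 0 on success, -1 on buffer underflow. */
static int demo_read_i16(struct demo_buf *buf, int16_t *out) {
    assert(buf != NULL && out != NULL);

    if (buf->len - buf->of < sizeof(*out))
        demo_buf_parse_fail(buf, "buffer underflow reading int16");

    *out = (int16_t)(uint16_t)((buf->data[buf->of] << 8) |
                               buf->data[buf->of + 1]);
    buf->of += sizeof(*out);
    return 0;

err:
    return -1;
}
```

With an empty payload and `rkb == NULL`, `demo_read_i16` returns -1 without touching the NULL handle. The trade-off is that the parse failure is then silent for shadow buffers, so the caller still has to check the return value and handle the error.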

Is this solution suitable?
