-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Description
Hi edenhill,
I write c code which called by lua to produce messages to kafka server . The lua code is called in nginx server.
I create one kafka handle and add a list of brokers once in nginx initialisation. Then when receiving requests to nginx server, call the lua code to publish messages.
But I got the nginx worker processes exit with signal 11 once requests sent out.
I also wonder if it's right to hold one kafka handle for dealing with all topic write effectively.
As I don't know how to attach the code,i past the major code snippt below:
//the code try to create one kafka handle and add brokers
int mercury_add_kafkabroker(mercury_t *me, const char *kafka_brokers)
{
if (!me || !kafka_brokers) return -1;
init_error();
char errstr[100];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set_error_cb(conf, err_cb);
rd_kafka_conf_set_dr_cb(conf, msg_delivered);
char err[100];
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
me->kafka_handler = rk;
if (rk == NULL) {
set_error("rd_kafka_new fail");
return -1;
}
/* Add brokers */
if (rd_kafka_brokers_add(me->kafka_handler, kafka_brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
return -1;
}
return 0;
}
//the code is called looply by receiving request to nginx server
int mercury_sendmsg_kafka(mercury_t *me, const char * topic, const char *msg)
{
if (!me || !msg) return -1;
init_error();
size_t msg_len = strlen(msg);
char *opbuf = malloc(msg_len + 1);
strcpy(opbuf, msg);
if (NULL == me->kafka_topic_handler) {
rd_kafka_topic_conf_t * topic_conf = rd_kafka_topic_conf_new();
rd_kafka_topic_t * rkt = rd_kafka_topic_new(me->kafka_handler, topic, topic_conf);
me->kafka_topic_handler = rkt;
}
while(rd_kafka_produce(me->kafka_topic_handler, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE, opbuf, msg_len, NULL, 0, NULL) == -1) {
fprintf(stderr, "produce error; resend!");
/* Poll to handle delivery reports */
rd_kafka_poll(me->kafka_handler, 10);
}
rd_kafka_poll(me->kafka_handler, 0);
if (NULL != me->kafka_handler) {
rd_kafka_destroy(me->kafka_handler);
}
return 0;
}
Thanks
Aaron