Hi, I'd like to use librdkafka to consume from some high-volume streams, since I'd rather write C than Java (and oh god, the Java build chain needed for production...), but the bundled example program can't seem to connect to Kafka 0.7.0:
damocles@kafka-yo-kafka:~/librdkafka/examples$ ./rdkafka_example -C -t rt-san_francisco -b localhost:9093
%7|1398125017.755|CONNECTED|localhost:9093#consumer-0| connected to localhost:9093
% Error: Broker: Offset out of range (0)
% Error: Broker: Offset out of range (0)
% Error: Broker: Offset out of range (0)
^Cdamocles@kafka-yo-kafka:~/librdkafka/examples$ cd ../../kafka-0.7.0-incubating-src/
damocles@kafka-yo-kafka:~/kafka-0.7.0-incubating-src$ ./bin/kafka-console-consumer.sh --topic rt-san_francisco --zookeeper localhost:2182
[2014-04-22 00:04:07,133] INFO Connecting to zookeeper instance at localhost:2182 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,153] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2014-04-22 00:04:07,163] INFO Client environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.version=1.6.0_30 (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.vendor=Sun Microsystems Inc. (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.home=/usr/lib/jvm/java-6-openjdk-amd64/jre (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.class.path=:./bin/../project/boot/scala-2.8.0/lib/scala-compiler.jar:./bin/../project/boot/scala-2.8.0/lib/scala-library.jar:./bin/../core/target/scala_2.8.0/kafka-0.7.0.jar:./bin/../core/lib/zkclient-20110412.jar:./bin/../core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:./bin/../core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:./bin/../core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.3.jar (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.library.path=/usr/lib/jvm/java-6-openjdk-amd64/jre/lib/amd64/server:/usr/lib/jvm/java-6-openjdk-amd64/jre/lib/amd64:/usr/lib/jvm/java-6-openjdk-amd64/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib/jni:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,165] INFO Client environment:os.version=3.2.0-54-virtual (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,165] INFO Client environment:user.name=damocles (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,165] INFO Client environment:user.home=/home/damocles (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,165] INFO Client environment:user.dir=/home/damocles/kafka-0.7.0-incubating-src (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,166] INFO Initiating client connection, connectString=localhost:2182 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2a74c252 (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,197] INFO Opening socket connection to server localhost/127.0.0.1:2182 (org.apache.zookeeper.ClientCnxn)
[2014-04-22 00:04:07,207] INFO Socket connection established to localhost/127.0.0.1:2182, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-04-22 00:04:07,215] INFO Session establishment complete on server localhost/127.0.0.1:2182, sessionid = 0x1451fcfd9314693, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-04-22 00:04:07,218] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-04-22 00:04:07,221] INFO starting auto committer every 10000 ms (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,313] INFO begin registering consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 in ZK (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,356] INFO end registering consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 in ZK (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,403] INFO begin rebalancing consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 try #0 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,651] INFO Committing all offsets (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,654] INFO Releasing partition ownership (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,657] INFO Consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 rebalancing the following partitions: List(0-0) for topic rt-san_francisco with consumers: List(console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684-0) (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,660] INFO console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684-0 attempting to claim partition 0-0 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,704] INFO Consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 selected partitions : rt-san_francisco:0-0: fetched offset = 1402874789: consumed offset = 1402874789 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,710] INFO end rebalancing consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 try #0 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,711] INFO FetchRunnable-0 start fetching topic: rt-san_francisco part: 0 offset: 1402874789 from localhost:9093 (kafka.consumer.FetcherRunnable)
{"host":"kafka-yo-kafka","level":"warn","msg":"Request Timed Out {}","ts":1398125047.859,"isodate":"2014-04-22T00:04:07.859Z","peerId":2,"workerId":2}
{"host":"kafka-yo-kafka","level":"info","msg":"NCarClient consecutiveErrorCount: 110151 {}","ts":1398125047.859,"isodate":"2014-04-22T00:04:07.859Z","peerId":2,"workerId":2}
{"host":"kafka-yo-kafka","level":"info","msg":"NCarClient.makeNonOperational: false {}","ts":1398125047.859,"isodate":"2014-04-22T00:04:07.859Z","peerId":2,"workerId":2}
{"host":"kafka-yo-kafka","level":"warn","msg":"Request Timed Out {}","ts":1398125047.914,"isodate":"2014-04-22T00:04:07.914Z","peerId":1,"workerId":1}
{"host":"kafka-yo-kafka","level":"info","msg":"NCarClient consecutiveErrorCount: 110216 {}","ts":1398125047.915,"isodate":"2014-04-22T00:04:07.915Z","peerId":1,"workerId":1}
{"host":"kafka-yo-kafka","level":"info","msg":"NCarClient.makeNonOperational: false {}","ts":1398125047.915,"isodate":"2014-04-22T00:04:07.915Z","peerId":1,"workerId":1}
{"host":"kafka-yo-kafka","level":"info","msg":"Received response for request, url: /cities/san_francisco, statusCode: 200, x-powered-by: Express, cache-control: s-maxage=0, content-type: application/json; charset=utf-8, content-length: 801372, etag: \"1409535808\", date: Tue, 22 Apr 2014 00:04:08 GMT, connection: keep-alive {}","ts":1398125048.255,"isodate":"2014-04-22T00:04:08.255Z","peerId":1,"workerId":1}
The highlight reel:
- Zookeeper and Kafka are running on non-default ports (literally adding 1 to each port number)
- The official client correctly identifies the current offset, while rdkafka_example seems to think the offset is 0:
[2014-04-22 00:04:07,704] INFO Consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 selected partitions : rt-san_francisco:0-0: fetched offset = 1402874789: consumed offset = 1402874789 (kafka.consumer.ZookeeperConsumerConnector)
versus
% Error: Broker: Offset out of range (0)
- After the ZooKeeper handshake, the official client talks to Kafka on exactly the same host:port that I passed to rdkafka_example:
[2014-04-22 00:04:07,711] INFO FetchRunnable-0 start fetching topic: rt-san_francisco part: 0 offset: 1402874789 from localhost:9093 (kafka.consumer.FetcherRunnable)
I started to dive into the code, but I'm not entirely familiar with the codebase, so I'm not sure where this error originates.
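The mismatch in the highlight reel can be sketched as a simple range check. This is only an illustration of one possible reading of the two logs, not a diagnosis: it assumes the broker no longer retains offset 0 for this partition, so a fetch at 0 is rejected while a fetch at 1402874789 is accepted. The type and function names below are hypothetical and are not part of librdkafka or Kafka.

```c
#include <stdint.h>

/* Hypothetical view of the offsets a broker still retains for one
 * partition. Illustrative only; not a librdkafka or Kafka type. */
typedef struct {
    int64_t earliest; /* lowest offset still available on the broker */
    int64_t latest;   /* highest offset a consumer may fetch from */
} offset_range_t;

typedef enum { OFFSET_OK, OFFSET_OUT_OF_RANGE } offset_check_t;

/* A fetch request is acceptable only if the requested offset lies
 * inside the retained range (inclusive on both ends in this sketch). */
static offset_check_t check_offset(const offset_range_t *r, int64_t requested)
{
    if (requested < r->earliest || requested > r->latest)
        return OFFSET_OUT_OF_RANGE;
    return OFFSET_OK;
}

/* Human-readable form of the check result. */
static const char *offset_check_str(offset_check_t c)
{
    return c == OFFSET_OK ? "ok" : "offset out of range";
}
```

With a made-up earliest offset of 1400000000 and the 1402874789 from the official client's log taken as the latest (both assumptions), `check_offset(&r, 0)` yields `OFFSET_OUT_OF_RANGE`, while `check_offset(&r, 1402874789)` yields `OFFSET_OK`.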