Description
Describe the bug
I'm unable to publish messages to an MQTT Kafka broker when using TLS and SASL for the Kafka client connection. The connection to the Kafka cluster seems to work, because messages are produced to the sessions topic and Zilla can read messages from the topics.
Tests I did:
- Full docker setup using the docker-compose.yaml file from your doc: OK
- Existing Kafka cluster without TLS and no authentication + the Docker MQTT Kafka broker: OK
- Existing Kafka cluster with TLS and SASL/SCRAM-SHA-512 + the Docker MQTT Kafka broker: partially KO
In the last setup:
- mosquitto_pub fails. Command and output:
docker run -it --rm eclipse-mosquitto mosquitto_pub --url mqtt://host.docker.internal:7183/zilla --message 'Hello, world' --debug --id "client"
Client test sending CONNECT
Client test received CONNACK (142)
Connection error: Connection Refused: unknown reason.
Error: A network protocol error occurred when communicating with the broker.
- BUT I can see that messages are produced in the mqtt-sessions topic
- AND Zilla can read messages from the mqtt-messages topic when I produce some messages manually
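To inspect the CONNACK without going through mosquitto_pub, the CONNECT/CONNACK exchange can be reproduced by hand. The following is a minimal sketch, assuming MQTT 3.1.1 framing (the mosquitto_pub default when no protocol version is given); `build_connect`, `parse_connack` and `describe` are hypothetical helper names, not part of Zilla or Mosquitto. Code 142 (0x8E) is not one of the 3.1.1 CONNACK return codes. In the MQTT 5 reason-code table 0x8E is "Session taken over", which is defined for DISCONNECT; whether Zilla intends that meaning here is an assumption.

```python
import struct

# MQTT 3.1.1 CONNACK return codes; anything else is reported as unknown.
CONNACK_V311 = {
    0: "Connection accepted",
    1: "Unacceptable protocol version",
    2: "Identifier rejected",
    3: "Server unavailable",
    4: "Bad user name or password",
    5: "Not authorized",
}


def encode_remaining_length(n: int) -> bytes:
    """Encode the MQTT variable-length 'remaining length' field."""
    out = bytearray()
    while True:
        digit = n % 128
        n //= 128
        if n > 0:
            digit |= 0x80  # continuation bit: more length bytes follow
        out.append(digit)
        if n == 0:
            return bytes(out)


def build_connect(client_id: str, keep_alive: int = 60) -> bytes:
    """Build an MQTT 3.1.1 CONNECT packet with clean session and no credentials."""
    cid = client_id.encode("utf-8")
    variable_header = (
        struct.pack("!H", 4) + b"MQTT"    # protocol name, length-prefixed
        + bytes([4])                      # protocol level 4 = MQTT 3.1.1
        + bytes([0x02])                   # connect flags: clean session only
        + struct.pack("!H", keep_alive)   # keep-alive in seconds
    )
    payload = struct.pack("!H", len(cid)) + cid
    body = variable_header + payload
    return bytes([0x10]) + encode_remaining_length(len(body)) + body


def parse_connack(packet: bytes):
    """Return (session_present, code) from the first four bytes of a CONNACK."""
    if len(packet) < 4 or packet[0] != 0x20:
        raise ValueError("not a CONNACK packet")
    return bool(packet[2] & 0x01), packet[3]


def describe(code: int) -> str:
    """Name a 3.1.1 CONNACK return code, or flag it as unknown."""
    return CONNACK_V311.get(code, f"unknown (0x{code:02X})")
```

To use it against the broker, the bytes from `build_connect("client")` would be written to a TCP socket connected to the Zilla MQTT port (7183 in this setup) and the reply passed to `parse_connack`. For the code seen above, `describe(142)` reports it as unknown under 3.1.1, which is consistent with the "unknown reason" that mosquitto_pub prints.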
Here is an extract of the logs showing the successful authentication and the production of session messages:
[0x01010000000000c0] SASL HANDSHAKE scram-sha-512
[0x01010000000000c0] SASL AUTHENTICATE admin
[1738250115049] [22] [72339069014638783] kafka client [mqtt-sessions[0] 0 + 65024 => 65024
[1738250115049] [22] [72339069014638783] kafka cache server fan [0 mqtt-sessions] 0 + 65024 => 65024
[1738250115050] [22] [72339069014638781] kafka cache server [[MQTT-intro.south_kafka_cache_server] mqtt-sessions[0]] 0 + 0 => 0
[1738250115050] [22] [72339069014638777] kafka cache client [[MQTT-intro.south_kafka_cache_server] mqtt-sessions[0]] 0 + 65536 => 65536
zilla:MQTT-intro-north_mqtt_kafka_mapping-test-session GroupId connect
zilla:MQTT-intro-north_mqtt_kafka_mapping-test-session GroupId connect
[1738250115051] [22] [72339069014638777] kafka cache client [[MQTT-intro.south_kafka_cache_server] mqtt-sessions[0]] 65536 - 0 => 65536
[1738250115051] [22] [72339069014638783] kafka cache server fan [0 mqtt-sessions] 65024 - 512 => 64512
[1738250115052] [22] [72339069014638783] kafka client [mqtt-sessions[0] 65024 - 512 => 64512
[client] mqtt-sessions[0] PRODUCE
[1738250115056] [22] [72339069014638785] kafka client [mqtt-sessions[0] flushableRequestBytes 0
[client] 102 DESCRIBE
[client] [0x010100000000004a] mqtt-sessions[0] FETCH RecordSet 134
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record Set Bytes 134
[client] [0x010100000000004a] mqtt-sessions[0] FETCH RecordBatch 152 0 122
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record Set Bytes 73
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record length 71
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record 152
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record Set Bytes 0
....
[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordSet 80
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 80
[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordBatch 5 0 68
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 19
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record length 18
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record 5
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 0
[0x0101000000000042] mqtt-messages[0] FETCH 6
[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordSet 157
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 157
[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordBatch 6 0 67
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 96
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record length 17
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record 6
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 78
[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordBatch 7 0 66
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 17
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record length 16
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record 7
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 0
[0x0101000000000042] mqtt-messages[0] FETCH 8
[client] [0x0101000000000052] mqtt-retained[0] FETCH RecordSet 0
[client] [0x0101000000000052] mqtt-retained[0] FETCH Record Set Bytes 0
[client] [0x0101000000000052] mqtt-retained[0] FETCH Record Set Bytes 0
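When sifting through longer debug output, it can help to pull out the timestamped lines of the form `before +/- delta => after` and check them mechanically. The sketch below is only an illustration: `parse_line` is a hypothetical helper, the first bracketed field is assumed to be a Unix timestamp in milliseconds, and the second and third bracketed fields are kept as opaque values because their meaning is not stated in the logs.

```python
import re
from datetime import datetime, timedelta, timezone

# Matches lines such as:
# [1738250115052] [22] [72339069014638783] kafka client [mqtt-sessions[0] 65024 - 512 => 64512
LINE = re.compile(
    r"^\[(?P<ts>\d+)\] \[(?P<field2>\d+)\] \[(?P<field3>\d+)\] "
    r"(?P<label>.+?) (?P<before>\d+) (?P<op>[+-]) (?P<delta>\d+) => (?P<after>\d+)$"
)

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_line(line: str):
    """Parse one timestamped arithmetic line; return None for other log lines."""
    m = LINE.match(line.strip())
    if m is None:
        return None
    before, delta, after = int(m["before"]), int(m["delta"]), int(m["after"])
    expected = before + delta if m["op"] == "+" else before - delta
    return {
        # Assumption: the first field is epoch milliseconds.
        "time": EPOCH + timedelta(milliseconds=int(m["ts"])),
        "field2": int(m["field2"]),  # meaning not stated in the logs
        "field3": int(m["field3"]),  # meaning not stated in the logs
        "label": m["label"],
        "before": before,
        "op": m["op"],
        "delta": delta,
        "after": after,
        "consistent": expected == after,  # does the printed arithmetic add up?
    }


if __name__ == "__main__":
    sample = (
        "[1738250115052] [22] [72339069014638783] "
        "kafka client [mqtt-sessions[0] 65024 - 512 => 64512"
    )
    record = parse_line(sample)
    print(record["time"].isoformat(), record["label"], record["consistent"])
```

Lines that do not follow this shape, such as the `FETCH` and `SASL` entries, return `None` and can be filtered out before grouping the remaining records by label.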
To Reproduce
Here is the zilla.yaml file I used:
name: MQTT-intro
vaults:
  client_vault:
    type: filesystem
    options:
      trust:
        store: /tmp/truststore.p12
        type: pkcs12
bindings:
  # Proxy service entrypoint
  north_tcp_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port: 7183
    exit: north_mqtt_server
  # MQTT Broker With an exit to Kafka
  north_mqtt_server:
    type: mqtt
    kind: server
    exit: north_mqtt_kafka_mapping
  # Proxy MQTT messages to Kafka
  north_mqtt_kafka_mapping:
    type: mqtt-kafka
    kind: proxy
    options:
      topics:
        sessions: mqtt-sessions
        messages: mqtt-messages
        retained: mqtt-retained
    exit: north_kafka_cache_client
  # Kafka sync layer
  north_kafka_cache_client:
    type: kafka
    kind: cache_client
    exit: south_kafka_cache_server
  south_kafka_cache_server:
    type: kafka
    kind: cache_server
    options:
      bootstrap:
        - mqtt-messages
        - mqtt-sessions
        - mqtt-retained
    exit: south_kafka_client
  # Connect to Kafka
  south_kafka_client:
    type: kafka
    kind: client
    options:
      servers:
        - my_kakfa_broker1:9092
        - my_kakfa_broker2:9092
        - my_kakfa_broker3:9092
      sasl:
        mechanism: scram-sha-512
        username: "user"
        password: "password"
    exit: south_tls_client
  south_tls_client:
    type: tls
    kind: client
    vault: client_vault
    options:
      trust:
        - cert_alias
    exit: south_tcp_client
  south_tcp_client:
    type: tcp
    kind: client
telemetry:
  exporters:
    stdout_logs_exporter:
      type: stdout
Expected behavior
As with the docker-compose setup, I expect to be able to publish a message with mosquitto_pub and see it in the mqtt-messages topic.
Zilla Environment:
Zilla 0.9.122
Start command: start -v -e
Kafka Environment:
- Provider: Confluent (Community) on-premise
- Version: 7.6 (Kafka 3.6)
- Config: 3 topics (mqtt-messages: 1 partition, delete; mqtt-retained: 1 partition, compacted; mqtt-sessions: 1 partition, compacted)
Client Environment:
Mosquitto 2.0.20
Additional context
We were looking for a way to get more logs to make debugging easier. I found the -Dzilla.binding.kafka.debug=true argument, which is helpful, but is there anything else we could use?