Merged

Changes from all commits (44 commits)
901f0b2
separating publish streams based on qos
bmaidics Jan 10, 2024
79ddafd
Fix test
bmaidics Jan 11, 2024
1265671
Checkpoint
bmaidics Jan 9, 2024
7847580
Checkpoint
bmaidics Jan 9, 2024
7895cd5
check
bmaidics Jan 9, 2024
f831ff8
start of idempotent work
bmaidics Jan 13, 2024
412019a
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into featu…
bmaidics Jan 15, 2024
167a33a
Optimize memory allocation for mqtt-kafka offset tracking (#694)
bmaidics Jan 15, 2024
cf08e0f
checkpoint
bmaidics Jan 15, 2024
80c0e09
checkpoint
bmaidics Jan 18, 2024
62db2cb
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into qos2_…
bmaidics Jan 18, 2024
49a7b35
Checkpoint with retained offsetCommit stream
bmaidics Jan 18, 2024
92b262a
checkpoint
bmaidics Jan 19, 2024
29d24f7
checkpoint
bmaidics Jan 22, 2024
f9fe624
mqtt-kafka checkpoint
bmaidics Jan 23, 2024
5195271
checkpoint
bmaidics Jan 24, 2024
99f1c1f
checkpoint
bmaidics Jan 24, 2024
8db664d
Fixes
bmaidics Jan 24, 2024
7038c04
Fix flaky test
bmaidics Jan 24, 2024
e00dd8a
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into qos2_…
bmaidics Jan 25, 2024
e3d92ee
fixes
bmaidics Jan 25, 2024
31e26ed
fix dump
attilakreiner Jan 25, 2024
f4b27c1
Fix init produce id request
akrambek Jan 25, 2024
9c5fd24
fix
bmaidics Jan 26, 2024
35c6976
Fix bug
bmaidics Jan 26, 2024
a3c7e09
Fix
bmaidics Jan 29, 2024
5cf9068
Don't flush early if the sequence number is not set
akrambek Jan 29, 2024
bc38ac7
Draft
bmaidics Jan 31, 2024
724bd70
checkpoint
bmaidics Feb 1, 2024
7df5123
Fixes
bmaidics Feb 1, 2024
4204bc1
Adrress review comments
bmaidics Feb 1, 2024
2a0204f
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into qos2_…
bmaidics Feb 1, 2024
ebd9cee
Merge fixes
bmaidics Feb 1, 2024
8988e6b
Include producerId and producerEpoch into cache entry
akrambek Feb 2, 2024
739f79d
fix dump
attilakreiner Feb 2, 2024
878ec78
Fix qos2 large message
bmaidics Feb 3, 2024
0563e13
Fix typo
akrambek Feb 5, 2024
7f2ca2a
Merge branch 'qos2_idempontent' of github.com:bmaidics/zilla into qos…
akrambek Feb 5, 2024
7d9afee
reviews
bmaidics Feb 5, 2024
c274636
more feedback
bmaidics Feb 5, 2024
215791e
checkpoint
bmaidics Feb 6, 2024
d3635b3
Refactor
bmaidics Feb 6, 2024
007e8d1
Adjust code coverage ratio
jfallows Feb 6, 2024
ccae27c
Ignore IT that fails only on GitHub Actions, see issue #786
jfallows Feb 6, 2024
@@ -433,7 +433,8 @@ local fields = {
     mqtt_ext_topic_length = ProtoField.int16("zilla.mqtt_ext.topic_length", "Length", base.DEC),
     mqtt_ext_topic = ProtoField.string("zilla.mqtt_ext.topic", "Topic", base.NONE),
     mqtt_ext_expiry = ProtoField.int32("zilla.mqtt_ext.expiry", "Expiry", base.DEC),
-    mqtt_ext_qos_max = ProtoField.uint16("zilla.mqtt_ext.qos_max", "QoS Maximum", base.DEC),
+    mqtt_ext_subscribe_qos_max = ProtoField.uint16("zilla.mqtt_ext.subscribe_qos_max", "Subscribe QoS Maximum", base.DEC),
+    mqtt_ext_publish_qos_max = ProtoField.uint16("zilla.mqtt_ext.publish_qos_max", "Publish QoS Maximum", base.DEC),
     mqtt_ext_packet_size_max = ProtoField.uint32("zilla.mqtt_ext.packet_size_max", "Packet Size Maximum", base.DEC),
     -- capabilities
     mqtt_ext_capabilities = ProtoField.uint8("zilla.mqtt_ext.capabilities", "Capabilities", base.HEX),
@@ -1613,8 +1614,12 @@ function handle_mqtt_extension(buffer, offset, ext_subtree, frame_type_id)
             elseif kind == "SESSION" then
                 handle_mqtt_data_session_extension(buffer, offset + kind_length, ext_subtree)
             end
-        elseif frame_type_id == FLUSH_ID and kind == "SUBSCRIBE" then
-            handle_mqtt_flush_subscribe_extension(buffer, offset + kind_length, ext_subtree)
+        elseif frame_type_id == FLUSH_ID then
+            if kind == "SUBSCRIBE" then
+                handle_mqtt_flush_subscribe_extension(buffer, offset + kind_length, ext_subtree)
+            elseif kind == "SESSION" then
+                handle_mqtt_flush_session_extension(buffer, offset + kind_length, ext_subtree)
+            end
         end
     elseif frame_type_id == RESET_ID then
         handle_mqtt_reset_extension(buffer, offset, ext_subtree)
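
Since a FLUSH frame's MQTT extension can now carry either a SUBSCRIBE or a SESSION kind, the dispatch becomes a two-level branch. For illustration only, the same mapping could be written table-driven; this is a sketch, not part of the patch, and it assumes both handler functions defined later in the file:

```lua
-- Sketch: table-driven equivalent of the FLUSH dispatch above.
-- Built at dispatch time, so both handler globals are already defined.
local flush_handlers = {
    SUBSCRIBE = handle_mqtt_flush_subscribe_extension,
    SESSION   = handle_mqtt_flush_session_extension,
}
local handler = flush_handlers[kind]
if handler then
    handler(buffer, offset + kind_length, ext_subtree)
end
```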
@@ -1720,13 +1725,18 @@ function handle_mqtt_begin_session_extension(buffer, offset, ext_subtree)
     local expiry_length = 4
     local slice_expiry = buffer(expiry_offset, expiry_length)
     ext_subtree:add_le(fields.mqtt_ext_expiry, slice_expiry)
-    -- qos_max
-    local qos_max_offset = expiry_offset + expiry_length
-    local qos_max_length = 2
-    local slice_qos_max = buffer(qos_max_offset, qos_max_length)
-    ext_subtree:add_le(fields.mqtt_ext_qos_max, slice_qos_max)
+    -- subscribe_qos_max
+    local subscribe_qos_max_offset = expiry_offset + expiry_length
+    local subscribe_qos_max_length = 2
+    local slice_subscribe_qos_max = buffer(subscribe_qos_max_offset, subscribe_qos_max_length)
+    ext_subtree:add_le(fields.mqtt_ext_subscribe_qos_max, slice_subscribe_qos_max)
+    -- publish_qos_max
+    local publish_qos_max_offset = subscribe_qos_max_offset + subscribe_qos_max_length
+    local publish_qos_max_length = 2
+    local slice_publish_qos_max = buffer(publish_qos_max_offset, publish_qos_max_length)
+    ext_subtree:add_le(fields.mqtt_ext_publish_qos_max, slice_publish_qos_max)
     -- packet_size_max
-    local packet_size_max_offset = qos_max_offset + qos_max_length
+    local packet_size_max_offset = publish_qos_max_offset + publish_qos_max_length
     local packet_size_max_length = 4
     local slice_packet_size_max = buffer(packet_size_max_offset, packet_size_max_length)
     ext_subtree:add_le(fields.mqtt_ext_packet_size_max, slice_packet_size_max)
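
For reference, the chained offsets above imply the following fixed-size field sequence in the session BEGIN extension, starting at expiry (a sketch inferred from the dissector; any fields before expiry are unchanged and omitted here):

```lua
-- Sketch: field order and byte lengths as read by the dissector above.
local session_begin_fields = {
    { name = "expiry",            length = 4 },  -- int32, little-endian
    { name = "subscribe_qos_max", length = 2 },  -- uint16, replaces the old qos_max
    { name = "publish_qos_max",   length = 2 },  -- uint16, newly added
    { name = "packet_size_max",   length = 4 },  -- uint32
}

-- Walking the table reproduces the offset arithmetic in the handler.
local offset = 0  -- relative to the expiry field
for _, f in ipairs(session_begin_fields) do
    print(string.format("%-17s at +%d (%d bytes)", f.name, offset, f.length))
    offset = offset + f.length
end
```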
@@ -1765,8 +1775,13 @@ function handle_mqtt_data_publish_extension(buffer, offset, ext_subtree)
     local flags_label = string.format("Flags: 0x%02x", slice_flags:le_uint())
     local flags_subtree = ext_subtree:add(zilla_protocol, slice_flags, flags_label)
     flags_subtree:add_le(fields.mqtt_ext_publish_flags_retain, slice_flags)
+    -- packet_id
+    local packet_id_offset = flags_offset + flags_length
+    local packet_id_length = 2
+    local slice_packet_id = buffer(packet_id_offset, packet_id_length)
+    ext_subtree:add_le(fields.mqtt_ext_packet_id, slice_packet_id)
     -- expiry_interval
-    local expiry_interval_offset = flags_offset + flags_length
+    local expiry_interval_offset = packet_id_offset + packet_id_length
     local expiry_interval_length = 4
     local slice_expiry_interval = buffer(expiry_interval_offset, expiry_interval_length)
     ext_subtree:add_le(fields.mqtt_ext_expiry_interval, slice_expiry_interval)
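
The new packet_id (2 bytes) sits between the publish flags and expiry_interval, shifting every later offset. Each field here follows the same five-line slice-then-add pattern; a small helper like the hypothetical sketch below (not in the patch) would make such insertions one-liners:

```lua
-- Hypothetical helper: dissect one fixed-size little-endian field and
-- return the offset just past it.
local function add_le_field(buffer, offset, length, subtree, field)
    subtree:add_le(field, buffer(offset, length))
    return offset + length
end

-- Usage sketch for the fields above:
-- local next_offset = add_le_field(buffer, packet_id_offset, 2, ext_subtree, fields.mqtt_ext_packet_id)
-- next_offset = add_le_field(buffer, next_offset, 4, ext_subtree, fields.mqtt_ext_expiry_interval)
```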
@@ -1945,6 +1960,14 @@ function handle_mqtt_flush_subscribe_extension(buffer, offset, ext_subtree)
     dissect_and_add_mqtt_topic_filters(buffer, topic_filters_offset, ext_subtree)
 end
 
+function handle_mqtt_flush_session_extension(buffer, offset, ext_subtree)
+    -- packet_id
+    local packet_id_offset = offset
+    local packet_id_length = 2
+    local slice_packet_id = buffer(packet_id_offset, packet_id_length)
+    ext_subtree:add_le(fields.mqtt_ext_packet_id, slice_packet_id)
+end
+
 function handle_mqtt_reset_extension(buffer, offset, ext_subtree)
     -- server_ref
     local server_ref_offset = offset
@@ -1272,6 +1272,7 @@ public void generateStreamsBuffer() throws Exception
     .publish()
         .qos("EXACTLY_ONCE")
         .flags("RETAIN")
+        .packetId(0x42)
         .expiryInterval(77)
         .contentType("Content Type")
         .format("BINARY")
@@ -1493,7 +1494,8 @@ public void generateStreamsBuffer() throws Exception
     .session()
         .flags("CLEAN_START")
         .expiry(42)
-        .qosMax(2)
+        .subscribeQosMax(2)
+        .publishQosMax(1)
         .packetSizeMax(42_000)
         .capabilities("RETAIN")
         .clientId("client-id")
@@ -1518,7 +1520,8 @@ public void generateStreamsBuffer() throws Exception
     .session()
         .flags("CLEAN_START", "WILL")
         .expiry(42)
-        .qosMax(2)
+        .subscribeQosMax(1)
+        .publishQosMax(2)
         .packetSizeMax(42_000)
         .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS")
        .clientId("client-id")
@@ -1585,6 +1588,27 @@ public void generateStreamsBuffer() throws Exception
         .build();
     streams[0].write(DataFW.TYPE_ID, data23.buffer(), 0, data23.sizeof());
 
+    DirectBuffer mqttSessionFlushEx1 = new UnsafeBuffer(MqttFunctions.flushEx()
+        .typeId(MQTT_TYPE_ID)
+        .session()
+            .packetId(0x2142)
+            .build()
+        .build());
+    FlushFW flush5 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
+        .originId(0x0000000900000022L) // north_mqtt_server
+        .routedId(0x0000000900000023L) // north_mqtt_kafka_mapping
+        .streamId(0x0000000000000025L) // INI
+        .sequence(401)
+        .acknowledge(402)
+        .maximum(7777)
+        .timestamp(0x0000000000000143L)
+        .traceId(0x0000000000000025L)
+        .budgetId(0x0000000000000000L)
+        .reserved(0x00000000)
+        .extension(mqttSessionFlushEx1, 0, mqttSessionFlushEx1.capacity())
+        .build();
+    streams[0].write(FlushFW.TYPE_ID, flush5.buffer(), 0, flush5.sizeof());
+
     // kafka extension
     // - CONSUMER
     DirectBuffer kafkaConsumerBeginEx1 = new UnsafeBuffer(KafkaFunctions.beginEx()
@@ -1680,7 +1704,7 @@ public void generateStreamsBuffer() throws Exception
         .correlationId(77)
         .build()
         .build());
-    FlushFW flush5 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
+    FlushFW flush6 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
         .originId(0x000000090000000fL) // north_kafka_cache_client
         .routedId(0x0000000900000010L) // south_kafka_cache_server
         .streamId(0x0000000000000027L) // INI
@@ -1693,7 +1717,7 @@ public void generateStreamsBuffer() throws Exception
         .reserved(0x00000000)
         .extension(kafkaConsumerFlushEx1, 0, kafkaConsumerFlushEx1.capacity())
         .build();
-    streams[0].write(FlushFW.TYPE_ID, flush5.buffer(), 0, flush5.sizeof());
+    streams[0].write(FlushFW.TYPE_ID, flush6.buffer(), 0, flush6.sizeof());
 
     DirectBuffer kafkaResetEx1 = new UnsafeBuffer(KafkaFunctions.resetEx()
         .typeId(KAFKA_TYPE_ID)
@@ -1773,7 +1797,7 @@ public void generateStreamsBuffer() throws Exception
         .memberId("member-id")
         .build()
         .build());
-    FlushFW flush6 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
+    FlushFW flush7 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
         .originId(0x000000090000000fL) // north_kafka_cache_client
         .routedId(0x0000000900000010L) // south_kafka_cache_server
         .streamId(0x0000000000000029L) // INI
@@ -1786,7 +1810,7 @@ public void generateStreamsBuffer() throws Exception
         .reserved(0x00000000)
         .extension(kafkaGroupFlushEx1, 0, kafkaGroupFlushEx1.capacity())
         .build();
-    streams[0].write(FlushFW.TYPE_ID, flush6.buffer(), 0, flush6.sizeof());
+    streams[0].write(FlushFW.TYPE_ID, flush7.buffer(), 0, flush7.sizeof());
 
     DirectBuffer kafkaGroupFlushEx2 = new UnsafeBuffer(KafkaFunctions.flushEx()
         .typeId(KAFKA_TYPE_ID)
@@ -1799,7 +1823,7 @@ public void generateStreamsBuffer() throws Exception
         .members("member-3")
         .build()
         .build());
-    FlushFW flush7 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
+    FlushFW flush8 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
         .originId(0x000000090000000fL) // north_kafka_cache_client
         .routedId(0x0000000900000010L) // south_kafka_cache_server
         .streamId(0x0000000000000028L) // REP
@@ -1812,7 +1836,7 @@ public void generateStreamsBuffer() throws Exception
         .reserved(0x00000000)
         .extension(kafkaGroupFlushEx2, 0, kafkaGroupFlushEx2.capacity())
         .build();
-    streams[0].write(FlushFW.TYPE_ID, flush7.buffer(), 0, flush7.sizeof());
+    streams[0].write(FlushFW.TYPE_ID, flush8.buffer(), 0, flush8.sizeof());
 
     // - BOOTSTRAP
     DirectBuffer kafkaBootstrapBeginEx1 = new UnsafeBuffer(KafkaFunctions.beginEx()
@@ -2062,7 +2086,7 @@ public void generateStreamsBuffer() throws Exception
         .correlationId(77)
         .build()
         .build());
-    FlushFW flush8 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
+    FlushFW flush9 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
         .originId(0x000000090000000fL) // north_kafka_cache_client
         .routedId(0x0000000900000010L) // south_kafka_cache_server
         .streamId(0x0000000000000033L) // INI
@@ -2075,7 +2099,7 @@ public void generateStreamsBuffer() throws Exception
         .reserved(0x00000000)
         .extension(kafkaMergedConsumerFlushEx, 0, kafkaMergedConsumerFlushEx.capacity())
         .build();
-    streams[0].write(FlushFW.TYPE_ID, flush8.buffer(), 0, flush8.sizeof());
+    streams[0].write(FlushFW.TYPE_ID, flush9.buffer(), 0, flush9.sizeof());
 
     DirectBuffer kafkaMergedFetchFlushEx = new UnsafeBuffer(KafkaFunctions.flushEx()
         .typeId(KAFKA_TYPE_ID)
@@ -2092,7 +2116,7 @@ public void generateStreamsBuffer() throws Exception
         .key("key")
         .build()
         .build());
-    FlushFW flush9 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
+    FlushFW flush10 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
         .originId(0x000000090000000fL) // north_kafka_cache_client
         .routedId(0x0000000900000010L) // south_kafka_cache_server
         .streamId(0x0000000000000033L) // INI
@@ -2105,7 +2129,7 @@ public void generateStreamsBuffer() throws Exception
         .reserved(0x00000000)
         .extension(kafkaMergedFetchFlushEx, 0, kafkaMergedFetchFlushEx.capacity())
         .build();
-    streams[0].write(FlushFW.TYPE_ID, flush9.buffer(), 0, flush9.sizeof());
+    streams[0].write(FlushFW.TYPE_ID, flush10.buffer(), 0, flush10.sizeof());
 
     // - INIT_PRODUCER_ID
     DirectBuffer kafkaInitProducerIdBeginEx1 = new UnsafeBuffer(KafkaFunctions.beginEx()
@@ -2537,7 +2561,7 @@ public void generateStreamsBuffer() throws Exception
         .build()
         .build()
         .build());
-    FlushFW flush10 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
+    FlushFW flush11 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
         .originId(0x000000090000000fL) // north_kafka_cache_client
         .routedId(0x0000000900000010L) // south_kafka_cache_server
         .streamId(0x000000000000003dL) // INI
@@ -2550,7 +2574,7 @@ public void generateStreamsBuffer() throws Exception
         .reserved(0x00000000)
         .extension(kafkaFetchFlushEx, 0, kafkaFetchFlushEx.capacity())
         .build();
-    streams[0].write(FlushFW.TYPE_ID, flush10.buffer(), 0, flush10.sizeof());
+    streams[0].write(FlushFW.TYPE_ID, flush11.buffer(), 0, flush11.sizeof());
 
     // - PRODUCE
     DirectBuffer kafkaProduceBeginEx1 = new UnsafeBuffer(KafkaFunctions.beginEx()
@@ -2635,7 +2659,7 @@ public void generateStreamsBuffer() throws Exception
         .key("key")
         .build()
         .build());
-    FlushFW flush11 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
+    FlushFW flush12 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
         .originId(0x000000090000000fL) // north_kafka_cache_client
         .routedId(0x0000000900000010L) // south_kafka_cache_server
         .streamId(0x000000000000003fL) // INI
@@ -2648,7 +2672,7 @@ public void generateStreamsBuffer() throws Exception
         .reserved(0x00000000)
         .extension(kafkaProduceFlushEx, 0, kafkaProduceFlushEx.capacity())
         .build();
-    streams[0].write(FlushFW.TYPE_ID, flush11.buffer(), 0, flush11.sizeof());
+    streams[0].write(FlushFW.TYPE_ID, flush12.buffer(), 0, flush12.sizeof());
 
     // amqp extension
     DirectBuffer amqpBeginEx1 = new UnsafeBuffer(AmqpFunctions.beginEx()
@@ -2814,7 +2838,7 @@ public void generateStreamsBuffer() throws Exception
         AMQP_TYPE_ID, 0, 0, 0, // int32 typeId
         3 // uint8 AmqpCapabilities
     });
-    FlushFW flush12 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
+    FlushFW flush13 = flushRW.wrap(frameBuffer, 0, frameBuffer.capacity())
         .originId(0x0000000900000025L) // north_amqp_server
         .routedId(0x0000000900000026L) // north_fan_server
         .streamId(0x0000000000000041L) // INI
@@ -2827,7 +2851,7 @@ public void generateStreamsBuffer() throws Exception
         .reserved(0x00000000)
         .extension(amqpFlushEx, 0, amqpFlushEx.capacity())
         .build();
-    streams[0].write(FlushFW.TYPE_ID, flush12.buffer(), 0, flush12.sizeof());
+    streams[0].write(FlushFW.TYPE_ID, flush13.buffer(), 0, flush13.sizeof());
 
     DirectBuffer amqpAbortEx = new UnsafeBuffer(AmqpFunctions.abortEx()
         .typeId(AMQP_TYPE_ID)