
jfmyers9 (Contributor) commented Jun 2, 2022

This changeset relaxes the parsing of record batches when the data returned does not align exactly with the format of a record batch. We will return all successfully parsed record batches, and discard record batches that are incomplete.

When specifying `max_bytes` on the fetch request, the fetch will return exactly `max_bytes` bytes. If this value is less than the data available to fetch, the response will potentially contain an incomplete record batch. In this case, we cowardly refuse to parse the record batch.

Previously we would return a match error when parsing the magic byte or deserializing the record itself. This prevented consumers from progressing once this situation occurred.
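
To make that concrete, here is a minimal sketch of the relaxed parsing loop. The module and the parse_one_batch/1 helper are hypothetical stand-ins, not the literal diff (the real change lives in lib/kayrock/record_batch.ex):

defmodule RecordBatchSketch do
  # Parse record batches until the remaining bytes are too short to
  # hold a complete batch, then discard the truncated tail.
  def deserialize_batches(data), do: do_deserialize(data, [])

  defp do_deserialize(<<>>, acc), do: Enum.reverse(acc)

  defp do_deserialize(data, acc) do
    case data do
      # A record batch is framed as an 8-byte base offset and a 4-byte
      # batch length, followed by batch_length bytes of payload.
      <<_base_offset::64-signed, batch_length::32-signed, payload::binary>>
      when byte_size(payload) >= batch_length ->
        total = batch_length + 12
        <<batch_bytes::binary-size(total), rest::binary>> = data
        do_deserialize(rest, [parse_one_batch(batch_bytes) | acc])

      # The header or payload was cut off (e.g. truncated at max_bytes):
      # cowardly refuse to parse it and keep only the complete batches.
      _incomplete ->
        Enum.reverse(acc)
    end
  end

  # Stand-in for the real record batch decoder.
  defp parse_one_batch(batch_bytes), do: batch_bytes
end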

Note: A more complete change might be to return the bytes that we were unable to parse, similar to what we do in lib/generated/fetch.ex, but I'm unsure how to make such a large change given the generated code. If this approach is preferred, any guidance on how to implement it would be helpful.
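
For illustration, that might look like a deserializer that returns the leftover bytes alongside the parsed batches (a hypothetical return shape, not the current Kayrock API):

# Hypothetical: hand the truncated tail back to the caller instead of
# discarding it, so it could be prepended to the next response's bytes.
{batches, unparsed_tail} = deserialize_batches_with_rest(data)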

Fixes #23

jfmyers9 force-pushed the jmyers/incomplete-record branch 2 times, most recently from 0fdf9c5 to cb1d481 on June 2, 2022 15:45
jfmyers9 force-pushed the jmyers/incomplete-record branch from cb1d481 to e8be52d on June 2, 2022 16:05
jfmyers9 (Contributor, Author) commented Jun 6, 2022

@dantswain Would love to get your thoughts here, or more details on #23. We are trying to use FetchRequest/Response V5 to take advantage of headers in KafkaEx, but we keep running into issues deserializing record batches. We have a temporary workaround of using `max_bytes: 1` on the fetch request, but that doesn't feel like a great long-term solution. Any help would be appreciated.
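
For reference, the workaround is just the fetch call with max_bytes: 1, mirroring the repro later in this thread (topic and worker names illustrative):

# Under fetch v3+, the broker returns the first record batch in full
# even when it exceeds max_bytes, so each request yields exactly one
# complete, parseable batch. `offset` is tracked by the consumer.
KafkaEx.fetch("jim", 0,
  offset: offset,
  worker_name: :jim,
  api_version: 5,
  max_bytes: 1,
  auto_commit: false
)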

dantswain (Collaborator) commented

Hi @jfmyers9, sorry for the delay. I've looked at this a couple of times, but it's going to take time when I can really focus on it. My biggest concern here is losing messages - what happens to the data we drop? Not saying anything bad specifically happens, just trying to make really sure I understand, and I haven't had time to sit down and think it through.

I'm also not sure why your build is failing :/ Something to do with snappy, but it's not clear from the error messages what's going on.

jfmyers9 (Contributor, Author) commented Jun 7, 2022

Yeah, I agree it's a bit odd. As I said, I would like to return the partial data in some fashion, but it's not clear to me how we'd change the API to achieve that.

If you are looking for a simple test case, here is a repro against my local Kafka running in Docker:

iex(1)> uris = [{"kafka", 9092}]
[{"kafka", 9092}]
iex(2)> KafkaEx.create_worker(:jim, [uris: uris, consumer_group: "test", kafka_version: "kayrock"])

10:54:04.390 [debug] Successfully connected to broker "kafka":9092
{:ok, #PID<0.199.0>}
iex(3)> KafkaEx.produce("jim", 0, "Message 1", worker_name: :jim)
:ok
iex(4)> KafkaEx.produce("jim", 0, "Message 2", worker_name: :jim)
:ok
iex(5)> KafkaEx.fetch("jim", 0, offset: 0, worker_name: :jim, api_version: 5, max_bytes: 1, auto_commit: false)
[
  %KafkaEx.Protocol.Fetch.Response{
    partitions: [
      %{
        error_code: :no_error,
        hw_mark_offset: 2,
        last_offset: 0,
        message_set: [
          %KafkaEx.Protocol.Fetch.Message{
            attributes: 0,
            crc: nil,
            headers: [],
            key: "",
            offset: 0,
            partition: 0,
            timestamp: -1,
            topic: "jim",
            value: "Message 1"
          }
        ],
        partition: 0
      }
    ],
    topic: "jim"
  }
]
iex(6)> KafkaEx.fetch("jim", 0, offset: 0, worker_name: :jim, api_version: 5, max_bytes: 1_000_000, auto_commit: false)
[
  %KafkaEx.Protocol.Fetch.Response{
    partitions: [
      %{
        error_code: :no_error,
        hw_mark_offset: 2,
        last_offset: 1,
        message_set: [
          %KafkaEx.Protocol.Fetch.Message{
            attributes: 0,
            crc: nil,
            headers: [],
            key: "",
            offset: 0,
            partition: 0,
            timestamp: -1,
            topic: "jim",
            value: "Message 1"
          },
          %KafkaEx.Protocol.Fetch.Message{
            attributes: 0,
            crc: nil,
            headers: [],
            key: "",
            offset: 1,
            partition: 0,
            timestamp: -1,
            topic: "jim",
            value: "Message 2"
          }
        ],
        partition: 0
      }
    ],
    topic: "jim"
  }
]
iex(7)> KafkaEx.fetch("jim", 0, offset: 0, worker_name: :jim, api_version: 5, max_bytes: 100, auto_commit: false)

10:55:20.391 [error] Failed to parse a response from the server: <<0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 106, 105, 109, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 65, 0, 0, 0, 0, 2, 232, 211, 211, 181, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 30, 0, 0, 0, 0, 18, 77, 101, 115, 115, 97, 103, 101, 32, 49, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 65, 0, 0, 0, 0, 2, 220, 52, 123, 44, 0, 0>> for request %Kayrock.Fetch.V5.Request{client_id: "kafka_ex", correlation_id: 9, isolation_level: 0, max_bytes: 100, max_wait_time: 10, min_bytes: 1, replica_id: -1, topics: [%{partitions: [%{fetch_offset: 0, log_start_offset: 0, max_bytes: 100, partition: 0}], topic: "jim"}]}

10:55:20.394 [debug] Shutting down worker :jim, reason: {%RuntimeError{message: "Parse error during %Kayrock.Fetch.V5.Request{client_id: \"kafka_ex\", correlation_id: 9, isolation_level: 0, max_bytes: 100, max_wait_time: 10, min_bytes: 1, replica_id: -1, topics: [%{partitions: [%{fetch_offset: 0, log_start_offset: 0, max_bytes: 100, partition: 0}], topic: \"jim\"}]} response deserializer. Couldn't parse: <<0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 106, 105, 109, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, ...>>"}, [{Kayrock.RecordBatch, :deserialize, 5, [file: 'lib/kayrock/record_batch.ex', line: 188]}, {Kayrock.Fetch.V5.Response, :deserialize_field, 4, [file: 'lib/generated/fetch.ex', line: 2583]}, {Kayrock.Fetch.V5.Response, :"-deserialize_field/4-fun-1-", 2, [file: 'lib/generated/fetch.ex', line: 2593]}, {Kayrock.Fetch.V5.Response, :deserialize_field, 4, [file: 'lib/generated/fetch.ex', line: 2592]}, {Kayrock.Fetch.V5.Response, :"-deserialize_field/4-fun-2-", 2, [file: 'lib/generated/fetch.ex', line: 2614]}, {Kayrock.Fetch.V5.Response, :deserialize_field, 4, [file: 'lib/generated/fetch.ex', line: 2613]}, {KafkaEx.New.Client, :deserialize, 2, [file: 'lib/kafka_ex/new/client.ex', line: 668]}, {KafkaEx.New.Client, :run_client_request, 3, [file: 'lib/kafka_ex/new/client.ex', line: 566]}]}

10:55:20.404 [error] GenServer :jim terminating
** (RuntimeError) Parse error during %Kayrock.Fetch.V5.Request{client_id: "kafka_ex", correlation_id: 9, isolation_level: 0, max_bytes: 100, max_wait_time: 10, min_bytes: 1, replica_id: -1, topics: [%{partitions: [%{fetch_offset: 0, log_start_offset: 0, max_bytes: 100, partition: 0}], topic: "jim"}]} response deserializer. Couldn't parse: <<0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 106, 105, 109, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, ...>>
    (kayrock) lib/kayrock/record_batch.ex:188: Kayrock.RecordBatch.deserialize/5
    (kayrock) lib/generated/fetch.ex:2583: Kayrock.Fetch.V5.Response.deserialize_field/4
    (kayrock) lib/generated/fetch.ex:2593: anonymous fn/2 in Kayrock.Fetch.V5.Response.deserialize_field/4
    (kayrock) lib/generated/fetch.ex:2592: Kayrock.Fetch.V5.Response.deserialize_field/4
    (kayrock) lib/generated/fetch.ex:2614: anonymous fn/2 in Kayrock.Fetch.V5.Response.deserialize_field/4
    (kayrock) lib/generated/fetch.ex:2613: Kayrock.Fetch.V5.Response.deserialize_field/4
    (kafka_ex) lib/kafka_ex/new/client.ex:668: KafkaEx.New.Client.deserialize/2
    (kafka_ex) lib/kafka_ex/new/client.ex:566: KafkaEx.New.Client.run_client_request/3
Last message (from #PID<0.196.0>): {:fetch, %KafkaEx.Protocol.Fetch.Request{api_version: 5, auto_commit: false, client_id: nil, correlation_id: nil, max_bytes: 100, min_bytes: 1, offset: 0, offset_commit_api_version: 0, partition: 0, topic: "jim", wait_time: 10}}
State: %KafkaEx.New.Client.State{allow_auto_topic_creation: true, api_versions: %{48 => {0, 1}, 11 => {0, 7}, 39 => {0, 2}, 34 => {0, 2}, 26 => {0, 3}, 15 => {0, 5}, 20 => {0, 6}, 50 => {0, 0}, 17 => {0, 1}, 25 => {0, 3}, 13 => {0, 4}, 0 => {0, 9}, 44 => {0, 1}, 8 => {0, 8}, 36 => {0, 2}, 7 => {0, 3}, 1 => {0, 12}, 32 => {0, 4}, 37 => {0, 3}, 35 => {0, 2}, 3 => {0, 11}, 45 => {0, 0}, 6 => {0, 7}, 2 => {0, 6}, 49 => {0, 1}, 41 => {0, 2}, 33 => {0, 2}, 42 => {0, 2}, 60 => {0, 0}, 43 => {0, 2}, 10 => {0, 3}, 9 => {0, 7}, 19 => {0, 7}, 56 => {0, 0}, 57 => {0, 0}, 51 => {0, 0}, 14 => {0, 5}, 5 => {0, 3}, 18 => {0, 3}, 61 => {0, 0}, 31 => {0, 2}, 22 => {0, 4}, 29 => {0, 2}, 21 => {0, 2}, 27 => {0, 1}, 24 => {0, 3}, 47 => {0, ...}, 40 => {...}, ...}, bootstrap_uris: [{"kafka", 9092}], cluster_metadata: %KafkaEx.New.ClusterMetadata{brokers: %{1013 => %KafkaEx.New.Broker{host: "kafka", node_id: 1013, port: 9092, rack: nil, socket: %KafkaEx.Socket{socket: #Port<0.9>, ssl: false}}}, consumer_group_coordinators: %{}, controller_id: 1013, topics: %{"jim" => %KafkaEx.New.Topic{is_internal: 0, name: "jim", partition_leaders: %{0 => 1013, 1 => 1013}, partitions: [%KafkaEx.New.Partition{isr: [1013], leader: 1013, partition_id: 0, replicas: [1013]}, %KafkaEx.New.Partition{isr: [1013], leader: 1013, partition_id: 1, replicas: [1013]}]}}}, consumer_group_for_auto_commit: "test", consumer_group_update_interval: 30000, correlation_id: 9, metadata_update_interval: 30000, ssl_options: [], use_ssl: false, worker_name: :jim}
Client #PID<0.196.0> is alive

    (stdlib) gen.erl:167: :gen.do_call/4
    (elixir) lib/gen_server.ex:986: GenServer.call/3
    (stdlib) erl_eval.erl:680: :erl_eval.do_apply/6
    (elixir) src/elixir.erl:258: :elixir.eval_forms/4
    (iex) lib/iex/evaluator.ex:257: IEx.Evaluator.handle_eval/5
    (iex) lib/iex/evaluator.ex:237: IEx.Evaluator.do_eval/3
    (iex) lib/iex/evaluator.ex:215: IEx.Evaluator.eval/3
    (iex) lib/iex/evaluator.ex:103: IEx.Evaluator.loop/1
** (exit) exited in: GenServer.call(:jim, {:fetch, %KafkaEx.Protocol.Fetch.Request{api_version: 5, auto_commit: false, client_id: nil, correlation_id: nil, max_bytes: 100, min_bytes: 1, offset: 0, offset_commit_api_version: 0, partition: 0, topic: "jim", wait_time: 10}}, 7000)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) Parse error during %Kayrock.Fetch.V5.Request{client_id: "kafka_ex", correlation_id: 9, isolation_level: 0, max_bytes: 100, max_wait_time: 10, min_bytes: 1, replica_id: -1, topics: [%{partitions: [%{fetch_offset: 0, log_start_offset: 0, max_bytes: 100, partition: 0}], topic: "jim"}]} response deserializer. Couldn't parse: <<0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 106, 105, 109, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, ...>>
            (kayrock) lib/kayrock/record_batch.ex:188: Kayrock.RecordBatch.deserialize/5
            (kayrock) lib/generated/fetch.ex:2583: Kayrock.Fetch.V5.Response.deserialize_field/4
            (kayrock) lib/generated/fetch.ex:2593: anonymous fn/2 in Kayrock.Fetch.V5.Response.deserialize_field/4
            (kayrock) lib/generated/fetch.ex:2592: Kayrock.Fetch.V5.Response.deserialize_field/4
            (kayrock) lib/generated/fetch.ex:2614: anonymous fn/2 in Kayrock.Fetch.V5.Response.deserialize_field/4
            (kayrock) lib/generated/fetch.ex:2613: Kayrock.Fetch.V5.Response.deserialize_field/4
            (kafka_ex) lib/kafka_ex/new/client.ex:668: KafkaEx.New.Client.deserialize/2
            (kafka_ex) lib/kafka_ex/new/client.ex:566: KafkaEx.New.Client.run_client_request/3
    (elixir) lib/gen_server.ex:989: GenServer.call/3

jfmyers9 (Contributor, Author) commented Jun 7, 2022

And here is the result with this changeset:

iex(6)> KafkaEx.fetch("jim", 0, offset: 0, worker_name: :jim, api_version: 5, max_bytes: 100, auto_commit: false)
[
  %KafkaEx.Protocol.Fetch.Response{
    partitions: [
      %{
        error_code: :no_error,
        hw_mark_offset: 2,
        last_offset: 0,
        message_set: [
          %KafkaEx.Protocol.Fetch.Message{
            attributes: 0,
            crc: nil,
            headers: [],
            key: "",
            offset: 0,
            partition: 0,
            timestamp: -1,
            topic: "jim",
            value: "Message 1"
          }
        ],
        partition: 0
      }
    ],
    topic: "jim"
  }
]

jfmyers9 (Contributor, Author) commented

Hey @dantswain, just wondering if you've had a chance to look at this? I'd love for us to be able to remove our `max_bytes: 1` hack at some point.

Argonus (Contributor) commented Nov 17, 2023

@dantswain have you had a chance to look at this? I'm hitting this issue on my side right now as well.
This solution should solve the problem without any issues, since we build the record batch only from the records that are safe to decode. Then, in the consumer group, we take the offset of the last decoded message and use it in the commit, so when the group fetches again it starts from the proper place.
Also, `last_offset: 0` is set properly:

%Message{offset: last_offset} = List.last(message_set)
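
A sketch of that flow, with a hypothetical commit_offset/4 standing in for the real consumer-group commit (Message is KafkaEx.Protocol.Fetch.Message):

# Commit based on the last message that actually decoded; the group's
# next fetch then resumes from the proper place even though the
# truncated trailing bytes were discarded.
%Message{offset: last_offset} = List.last(message_set)
:ok = commit_offset(consumer_group, topic, partition, last_offset)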

dantswain merged commit ff40891 into kafkaex:master on Nov 17, 2023
dantswain (Collaborator) commented

Thanks for looking at this, @Argonus! I trust your assessment and that makes me feel better about merging.

Do I need to publish a release to hex? I'm super out of touch on the state of all of this.

Argonus (Contributor) commented Nov 18, 2023

@dantswain yeah, that would be amazing.
Also, I've PM'ed you on the Elixir Slack. I have an idea to move this package to the kafka_ex organization to simplify fixes, updates, and so on; I'm working on moving kafka_ex to v1.0.0 with kayrock as the default.

dantswain (Collaborator) commented

@Argonus this should be done, I added you to the list of owners on hex.pm as well, so you are empowered to push releases :)

Thanks so much for taking this on. I didn't notice any of the messages in Slack - I almost never sign in to the elixirlang Slack anymore :/ You can always feel free to reach out via GitHub or via my email (dan dot t dot swain at gmail) - that's much more likely to get a response.
