CreateTopics Extension #46
base: kafka-produce
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
- Include CreateTopics in APIVersions | ||
- CreateTopics with Invalid Topic Name (empty, ., .., length > 249, invalid characters) | ||
- CreateTopics with Invalid Partition Values (negative, zero) | ||
- CreateTopics with Invalid Replication Values (negative, zero, > broker count) | ||
- CreateTopics with Reserved Topic Name (__cluster_metadata) | ||
- CreateTopics with Existing Topic (read __cluster_metadata) | ||
- CreateTopics with Authorization Check (no CREATE permission on CLUSTER or TOPIC resource) | ||
- CreateTopics with Single Topic (single topic in single request) | ||
- CreateTopics with Multiple Topics (multiple topics in single request) | ||
- CreateTopics with Manual Assignments (manual assignment of partitions and replicas) | ||
- CreateTopics with Validation Only | ||
- CreateTopics with Invalid Request (Duplicate topic names in single request) | ||
|
||
|
||
1. Topic Name Validation - INVALID_TOPIC_EXCEPTION (17) | ||
- Empty topic name or "." or ".." | ||
- Length > 249 characters | ||
- Invalid characters (only ASCII alphanumerics, '.', '_', '-') | ||
- Internal topic collision detection | ||
|
||
|
||
|
||
Stage CTX: CreateTopics with Invalid Parameters | ||
- Handle negative/zero partitions, invalid replication factors | ||
- Duplicate topic names in single request | ||
- Error codes: INVALID_PARTITIONS (37), INVALID_REPLICATION_FACTOR (38) | ||
|
||
Stage CTX: Topic-specific configurations |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,6 +71,24 @@ extensions: | |
|
||
[fetch-api]: https://kafka.apache.org/protocol.html#The_Messages_Fetch | ||
|
||
- slug: "producing-messages" | ||
name: "Producing Messages" | ||
description_markdown: | | ||
In this challenge extension you'll add support for producing messages by implementing the [Produce][produce-api] API. | ||
|
||
Along the way you'll learn about how Kafka's Produce API works, how Kafka stores messages on disk and more. | ||
|
||
[produce-api]: https://kafka.apache.org/protocol.html#The_Messages_Produce | ||
|
||
- slug: "creating-topics" | ||
name: "Creating Topics" | ||
description_markdown: | | ||
In this challenge extension you'll add support for creating topics by implementing the [CreateTopics][create-topics-api] API. | ||
|
||
Along the way you'll learn about how Kafka's CreateTopics API works, topic validation, the `__cluster_metadata` topic and more. | ||
|
||
[create-topics-api]: https://kafka.apache.org/protocol.html#The_Messages_CreateTopics | ||
|
||
stages: | ||
- slug: "vi6" | ||
name: "Bind to a port" | ||
|
@@ -199,3 +217,91 @@ stages: | |
difficulty: hard | ||
marketing_md: |- | ||
In this stage, you'll implement the Fetch response for a topic with multiple messages, reading them from disk. | ||
|
||
# Producing Messages | ||
|
||
- slug: "um3" | ||
primary_extension_slug: "producing-messages" | ||
name: "Include Produce in APIVersions" | ||
difficulty: easy | ||
marketing_md: |- | ||
In this stage, you'll add the Produce API to the APIVersions response. | ||
|
||
- slug: "ck2" | ||
primary_extension_slug: "producing-messages" | ||
name: "Produce to an invalid topic or partition" | ||
difficulty: medium | ||
marketing_md: |- | ||
In this stage, you'll implement the Produce response for an invalid topic or partition. | ||
|
||
- slug: "ps7" | ||
primary_extension_slug: "producing-messages" | ||
name: "Produce a single record" | ||
difficulty: hard | ||
marketing_md: |- | ||
In this stage, you'll implement successfully producing a single record to disk. | ||
|
||
- slug: "sb8" | ||
primary_extension_slug: "producing-messages" | ||
name: "Produce multiple records" | ||
difficulty: hard | ||
marketing_md: |- | ||
In this stage, you'll implement producing multiple records in a single record batch. | ||
|
||
- slug: "mf2" | ||
primary_extension_slug: "producing-messages" | ||
name: "Produce to multiple partitions" | ||
difficulty: hard | ||
marketing_md: |- | ||
In this stage, you'll implement producing to multiple partitions of the same topic. | ||
|
||
- slug: "ar4" | ||
primary_extension_slug: "producing-messages" | ||
name: "Produce to multiple topics" | ||
difficulty: hard | ||
marketing_md: |- | ||
In this stage, you'll implement producing to multiple topics in a single request. | ||
|
||
# Creating Topics | ||
|
||
- slug: "yb1" | ||
primary_extension_slug: "creating-topics" | ||
name: "Include CreateTopics in APIVersions" | ||
difficulty: easy | ||
marketing_md: |- | ||
In this stage, you'll add the CreateTopics API to the APIVersions response. | ||
|
||
- slug: "ve7" | ||
primary_extension_slug: "creating-topics" | ||
name: "CreateTopics with Invalid Topic Name" | ||
difficulty: medium | ||
marketing_md: |- | ||
In this stage, you'll implement the CreateTopics response for invalid topic names. | ||
|
||
- slug: "us2" | ||
primary_extension_slug: "creating-topics" | ||
name: "CreateTopics with Existing Topic Name" | ||
difficulty: hard | ||
marketing_md: |- | ||
In this stage, you'll implement the CreateTopics response for existing and reserved topic names. | ||
|
||
- slug: "hh9" | ||
primary_extension_slug: "creating-topics" | ||
name: "CreateTopics with Validation Only" | ||
difficulty: hard | ||
marketing_md: |- | ||
In this stage, you'll implement the CreateTopics response for validation-only mode. | ||
|
||
- slug: "rk2" | ||
primary_extension_slug: "creating-topics" | ||
name: "CreateTopics with a single topic" | ||
difficulty: hard | ||
marketing_md: |- | ||
In this stage, you'll implement creating a single topic. | ||
|
||
- slug: "fl3" | ||
primary_extension_slug: "creating-topics" | ||
name: "CreateTopics with Multiple Topics" | ||
@ryan-gang you really need to pay more attention to detail re: consistency. Why is the above stage name "CreateTopics with a single topic", and this is "CreateTopics with Multiple Topics"? What's the reason for different capitalization? |
||
difficulty: hard | ||
marketing_md: |- | ||
In this stage, you'll implement creating multiple topics in a single request. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
Stage CT1: Include CreateTopics in APIVersions | ||
- Add API key 19 (CreateTopics) to APIVersions response | ||
- Foundation stage enabling API discovery | ||
|
||
Stage CT2: CreateTopics with Invalid Topic Name (Hard code error) | ||
- Handle invalid characters, reserved names, empty names | ||
- Error code: INVALID_TOPIC_EXCEPTION (17) | ||
|
||
Stage CT3: CreateTopics with Existing Topic Name (Read __cluster_metadata) | ||
- Handle attempts to create system topics (__cluster_metadata): INVALID_REQUEST (42) | ||
- Handle duplicate topic creation attempts: TOPIC_ALREADY_EXISTS (36) | ||
|
||
Stage CT4: CreateTopics with Validation Only | ||
- Success case without persisting any data | ||
|
||
Stage CT5: CreateTopics with Valid Parameters (Success Case) | ||
- Successfully create single topic with valid parameters | ||
- Core success functionality | ||
|
||
Stage CT6: Multiple topics in single CreateTopics request | ||
- Handle multiple topics in a single request |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
In this stage, you'll add an entry for the `CreateTopics` API to the APIVersions response. | ||
|
||
## The CreateTopics API | ||
|
||
The [CreateTopics API](https://kafka.apache.org/protocol#The_Messages_CreateTopics) (API key `19`) is used to create topics in a Kafka cluster. We'll use a single broker in our cluster for this extension. | ||
|
||
We've created an interactive protocol inspector for the request & response structures for `CreateTopics`: | ||
|
||
- 🔎 [CreateTopics Request (v6)](https://binspec.org/kafka-createtopics-request-v6) | ||
- 🔎 [CreateTopics Response (v6)](https://binspec.org/kafka-createtopics-response-v6) | ||
|
||
In this stage, you'll only need to add an entry for the `CreateTopics` API to the APIVersions response you implemented in earlier stages. This will let the client know that the broker supports the `CreateTopics` API. We'll get to responding to `CreateTopics` requests in later stages. | ||
|
||
## Tests | ||
|
||
The tester will execute your program like this: | ||
|
||
```bash | ||
./your_program.sh /tmp/server.properties | ||
``` | ||
|
||
It'll then connect to your server on port 9092 and send a valid `APIVersions` (v4) request. | ||
|
||
The tester will validate that: | ||
|
||
- The first 4 bytes of your response (the "message length") are valid. | ||
- The correlation ID in the response header matches the correlation ID in the request header. | ||
- The error code in the response body is `0` (No Error). | ||
- The response body contains at least one entry for the API key `19` (CreateTopics). | ||
- The `MaxVersion` for the CreateTopics API is at least 6. | ||
|
||
## Notes | ||
|
||
- You don't have to implement support for the `CreateTopics` request in this stage. We'll get to this in later stages. | ||
- You'll still need to include the entry for `APIVersions` in your response to pass previous stages. | ||
- The `MaxVersion` for the `CreateTopics` API is 6. |
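
As a rough illustration of the entry this stage asks for, here is a minimal Python sketch of how one `CreateTopics` entry in the APIVersions (v4) response could be serialized. It assumes the flexible-version layout for each entry: INT16 `api_key`, INT16 `min_version`, INT16 `max_version`, then one empty tagged-fields byte. The helper name `encode_api_version_entry` and the `min_version` of `0` are illustrative assumptions, not something the stage prescribes.

```python
import struct

CREATE_TOPICS_API_KEY = 19  # API key for CreateTopics, per the stage text


def encode_api_version_entry(api_key: int, min_version: int, max_version: int) -> bytes:
    """Encode one entry of the APIVersions response's api_keys array.

    Assumed layout (flexible versions): three big-endian INT16 values
    followed by a single 0x00 byte meaning "no tagged fields".
    """
    return struct.pack(">hhh", api_key, min_version, max_version) + b"\x00"


# min_version=0 is an assumption for illustration; max_version=6 comes from the stage notes.
entry = encode_api_version_entry(CREATE_TOPICS_API_KEY, 0, 6)
print(entry.hex())
```

The entry would be appended to the existing `api_keys` array, alongside the entries added in earlier stages, and the array length prefix adjusted accordingly.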
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
In this stage, you'll add support for handling `CreateTopics` requests with invalid topic names. | ||
|
||
## CreateTopics API response for invalid topics | ||
|
||
When a Kafka broker receives a CreateTopics request, it first needs to validate that the topic name follows Kafka's naming rules. If a topic name is invalid, it returns an appropriate error code and response without attempting to create the topic. | ||
|
||
Topic names must follow these rules: | ||
- Cannot be an empty string | ||
- Cannot be "." or ".." | ||
- Maximum length of 249 characters | ||
- Only ASCII alphanumerics, '.', '_', and '-' are allowed | ||
|
||
If the topic name is invalid, the broker returns an error code of `17` (INVALID_TOPIC_EXCEPTION). | ||
|
||
We've created an interactive protocol inspector for the request & response structures for `CreateTopics`: | ||
|
||
- 🔎 [CreateTopics Request (v6)](https://binspec.org/kafka-createtopics-request-v6) | ||
- 🔎 [CreateTopics Response (v6)](https://binspec.org/kafka-createtopics-response-v6) | ||
This must link to an invalid response, not the success response |
||
|
||
In this stage, you'll need to implement basic topic name validation without needing to check against existing topics or system topics. You can hard code the error response for invalid topic names in this stage. We'll get to checking against existing topics and system topics in later stages. | ||
|
||
## Tests | ||
|
||
The tester will execute your program like this: | ||
|
||
```bash | ||
./your_program.sh /tmp/server.properties | ||
``` | ||
|
||
It'll then connect to your server on port 9092 and send a `CreateTopics` (v6) request with an invalid topic name. | ||
|
||
The tester will validate that: | ||
|
||
- The first 4 bytes of your response (the "message length") are valid. | ||
- The correlation ID in the response header matches the correlation ID in the request header. | ||
- The error code in the topic response is `17` (INVALID_TOPIC_EXCEPTION). | ||
- The `throttle_time_ms` field in the response is `0`. | ||
- The `name` field in the topic response corresponds to the topic name in the request. | ||
- The `error_message` field contains `Topic name is invalid`. | ||
- The `num_partitions` and `replication_factor` fields are `-1`. | ||
|
||
## Notes | ||
|
||
- You'll need to parse the `CreateTopics` request in this stage to get the topic names. | ||
- The official docs for the `CreateTopics` request can be found [here](https://kafka.apache.org/protocol.html#The_Messages_CreateTopics). | ||
- Topic name validation logic is in the Kafka source code at `clients/src/main/java/org/apache/kafka/common/internals/Topic.java`. | ||
Use a GitHub permalink here |
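
The naming rules listed in this stage can be sketched as a small validation function. This is a minimal Python sketch that follows only the four rules stated above; the function name `topic_name_error_code` and the constants are hypothetical, and it is not a copy of the logic in Kafka's `Topic.java`.

```python
import re

NO_ERROR = 0
INVALID_TOPIC_EXCEPTION = 17  # error code named in the stage text
MAX_TOPIC_NAME_LENGTH = 249

# Only ASCII alphanumerics, '.', '_' and '-' are allowed.
_LEGAL_TOPIC_NAME = re.compile(r"[A-Za-z0-9._-]+")


def topic_name_error_code(name: str) -> int:
    """Return 17 if the name breaks any of the stated rules, else 0."""
    if name == "" or name in (".", ".."):
        return INVALID_TOPIC_EXCEPTION
    if len(name) > MAX_TOPIC_NAME_LENGTH:
        return INVALID_TOPIC_EXCEPTION
    if _LEGAL_TOPIC_NAME.fullmatch(name) is None:
        return INVALID_TOPIC_EXCEPTION
    return NO_ERROR


for candidate in ["orders-v1", "", "..", "has space"]:
    print(repr(candidate), topic_name_error_code(candidate))
```

`fullmatch` is used rather than `match` so that a single illegal character anywhere in the name, including a trailing newline, causes the name to be rejected.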
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
In this stage, you'll add support for handling `CreateTopics` requests with existing topic names. | ||
|
||
## CreateTopics API response for existing topics | ||
|
||
When a Kafka broker receives a CreateTopics request, it needs to validate that the topic doesn't already exist and is not reserved for system use. If a topic already exists or is reserved, it returns an appropriate error code and response without attempting to create the topic again. | ||
|
||
To check whether a topic exists, the broker reads the `__cluster_metadata` topic's log file, located at `/tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log`. Inside the log file, the broker looks for the topic's metadata, which is a `record` (inside a RecordBatch) with a payload of type `TOPIC_RECORD`. A `TOPIC_RECORD` carries the topic name and the topic ID; if the log contains one whose name matches the requested topic name, the topic already exists. | ||
This is another example of a case where we can't test if they're actually testing if a topic exists. A user could hardcode a response and pass this stage. Even if a user plans on implementing the whole thing, (a) we don't guide them through what they should do if the topic doesn't exist (b) we won't be able to test their implementation so they might "think" it's correct and discover it's not in the next stage |
||
|
||
If the topic already exists, the broker returns an error code of `36` (TOPIC_ALREADY_EXISTS), with the `error_message` field containing `Topic '<topic_name>' already exists.`. | ||
|
||
Reserved topic names include: | ||
- `__cluster_metadata` - KRaft metadata topic | ||
|
||
If the topic name is reserved, the broker returns an error code of `42` (INVALID_REQUEST), with the `error_message` field containing `Creation of internal topic __cluster_metadata is prohibited.`. | ||
Let's remove the "reserved topics" thing altogether - doesn't seem worth doing |
||
|
||
We've created an interactive protocol inspector for the request & response structures for `CreateTopics`: | ||
|
||
- 🔎 [CreateTopics Request (v6)](https://binspec.org/kafka-createtopics-request-v6) | ||
- 🔎 [CreateTopics Response (v6)](https://binspec.org/kafka-createtopics-response-v6) | ||
|
||
We've also created an interactive protocol inspector for the `__cluster_metadata` topic's log file: | ||
- 🔎 [__cluster_metadata topic's log file](https://binspec.org/kafka-cluster-metadata) | ||
|
||
This should help you understand the structure of the `TOPIC_RECORD` record inside the `__cluster_metadata` topic's log file. | ||
Let's try to link directly to the TOPIC_RECORD bytes here |
||
|
||
In this stage, you'll need to implement topic existence checking by reading the cluster metadata. | ||
|
||
## Tests | ||
|
||
The tester will execute your program like this: | ||
|
||
```bash | ||
./your_program.sh /tmp/server.properties | ||
``` | ||
|
||
It'll then connect to your server on port 9092 and send a `CreateTopics` (v6) request for an existing topic and a reserved topic name. | ||
|
||
The tester will validate that: | ||
|
||
- The first 4 bytes of your response (the "message length") are valid. | ||
- The correlation ID in the response header matches the correlation ID in the request header. | ||
- The error code in the topic response is `36` (TOPIC_ALREADY_EXISTS) or `42` (INVALID_REQUEST) depending on the request. | ||
- The `throttle_time_ms` field in the response is `0`. | ||
- The `name` field in the topic response corresponds to the topic name in the request. | ||
- The `error_message` field contains the expected error message. | ||
- The `num_partitions` and `replication_factor` fields are `-1`. | ||
|
||
## Notes | ||
|
||
- The official Kafka docs don't cover the structure of records inside the `__cluster_metadata` topic, but you can find the definitions in the Kafka source code [here](https://github.com/apache/kafka/tree/trunk/metadata/src/main/resources/common/metadata). |
(Remove unnecessary capitalization, and shorten)
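
The existing-topic and reserved-name checks described in this stage could look roughly like the Python sketch below. It assumes the topic names have already been extracted from the `TOPIC_RECORD` entries in the `__cluster_metadata` log (parsing the log is not shown), and the function name `check_topic_creation`, the `existing_topics` parameter, and the order of the two checks are illustrative assumptions. The error codes and messages are the ones quoted in the stage text.

```python
from typing import Optional, Set, Tuple

NO_ERROR = 0
TOPIC_ALREADY_EXISTS = 36
INVALID_REQUEST = 42

# Reserved names listed in the stage text.
RESERVED_TOPIC_NAMES = {"__cluster_metadata"}


def check_topic_creation(name: str, existing_topics: Set[str]) -> Tuple[int, Optional[str]]:
    """Return (error_code, error_message) for a requested topic name.

    `existing_topics` is assumed to hold the names found in TOPIC_RECORD
    entries of the cluster metadata log.
    """
    if name in RESERVED_TOPIC_NAMES:
        return INVALID_REQUEST, f"Creation of internal topic {name} is prohibited."
    if name in existing_topics:
        return TOPIC_ALREADY_EXISTS, f"Topic '{name}' already exists."
    return NO_ERROR, None


known = {"orders", "payments"}  # hypothetical names read from the log
print(check_topic_creation("orders", known))
print(check_topic_creation("__cluster_metadata", known))
print(check_topic_creation("shipments", known))
```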