Skip to content

Commit fa79684

Browse files
committed
feat: implement testProduce4new function for Kafka message production
- Added a new file stage_p4_new.go containing the testProduce4new function. - This function tests the production of messages to a Kafka topic, including validation of responses and log assertions. - It utilizes various builders and assertions to ensure the correctness of the produced messages and their handling by the Kafka client.
1 parent ba10cee commit fa79684

File tree

1 file changed

+110
-0
lines changed

1 file changed

+110
-0
lines changed

internal/stage_p4_new.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package internal
2+
3+
import (
4+
"github.com/codecrafters-io/kafka-tester/internal/assertions"
5+
"github.com/codecrafters-io/kafka-tester/internal/kafka_executable"
6+
kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api"
7+
"github.com/codecrafters-io/kafka-tester/protocol/builder"
8+
"github.com/codecrafters-io/kafka-tester/protocol/common"
9+
"github.com/codecrafters-io/kafka-tester/protocol/kafka_client"
10+
"github.com/codecrafters-io/kafka-tester/protocol/serializer"
11+
"github.com/codecrafters-io/tester-utils/random"
12+
"github.com/codecrafters-io/tester-utils/test_case_harness"
13+
)
14+
15+
func testProduce4new(stageHarness *test_case_harness.TestCaseHarness) error {
16+
b := kafka_executable.NewKafkaExecutable(stageHarness)
17+
stageLogger := stageHarness.Logger
18+
err := serializer.GenerateLogDirs(stageLogger, []string{common.TOPIC4_NAME})
19+
if err != nil {
20+
return err
21+
}
22+
23+
if err := b.Run(); err != nil {
24+
return err
25+
}
26+
27+
correlationId := getRandomCorrelationId()
28+
29+
client := kafka_client.NewClient("localhost:9092")
30+
if err := client.ConnectWithRetries(b, stageLogger); err != nil {
31+
return err
32+
}
33+
defer client.Close()
34+
35+
topic := common.TOPIC4_NAME
36+
partition := int32(random.RandomInt(0, 3))
37+
38+
recordBatch := builder.NewRecordBatchBuilder().
39+
AddStringRecord(common.MESSAGE1).
40+
Build()
41+
42+
request := builder.NewProduceRequestBuilder().
43+
AddRecordBatch(topic, partition, recordBatch).
44+
WithCorrelationId(correlationId).
45+
Build()
46+
47+
rawResponse, err := client.SendAndReceive(request, stageLogger)
48+
if err != nil {
49+
return err
50+
}
51+
52+
actualResponse := builder.NewEmptyProduceResponse()
53+
if err := actualResponse.Decode(rawResponse.Payload, stageLogger); err != nil {
54+
return err
55+
}
56+
57+
expectedResponse := builder.NewProduceResponseBuilder().
58+
AddSuccessPartitionResponse(topic, partition).
59+
WithCorrelationId(correlationId).
60+
Build()
61+
62+
if err = assertions.NewProduceResponseAssertion(actualResponse, expectedResponse).Run(stageLogger); err != nil {
63+
return err
64+
}
65+
66+
topicPartitionLogAssertion := assertions.NewTopicPartitionLogAssertion(topic, partition, kafkaapi.RecordBatches{recordBatch})
67+
if err = topicPartitionLogAssertion.Run(stageLogger); err != nil {
68+
return err
69+
}
70+
71+
////////////
72+
73+
recordBatch2 := builder.NewRecordBatchBuilder().
74+
AddStringRecord(common.MESSAGE2).
75+
Build()
76+
recordBatch2.BaseSequence = 1
77+
78+
request2 := builder.NewProduceRequestBuilder().
79+
AddRecordBatch(topic, partition, recordBatch2).
80+
WithCorrelationId(correlationId).
81+
Build()
82+
83+
rawResponse, err = client.SendAndReceive(request2, stageLogger)
84+
if err != nil {
85+
return err
86+
}
87+
88+
actualResponse = builder.NewEmptyProduceResponse()
89+
if err := actualResponse.Decode(rawResponse.Payload, stageLogger); err != nil {
90+
return err
91+
}
92+
93+
expectedResponse = builder.NewProduceResponseBuilder().
94+
AddSuccessPartitionResponse(topic, partition).
95+
WithCorrelationId(correlationId).
96+
Build()
97+
expectedResponse.Body.TopicResponses[0].PartitionResponses[0].BaseOffset = 1
98+
99+
if err = assertions.NewProduceResponseAssertion(actualResponse, expectedResponse).Run(stageLogger); err != nil {
100+
return err
101+
}
102+
103+
recordBatch2.BaseOffset = 1
104+
topicPartitionLogAssertion = assertions.NewTopicPartitionLogAssertion(topic, partition, kafkaapi.RecordBatches{recordBatch, recordBatch2})
105+
if err = topicPartitionLogAssertion.Run(stageLogger); err != nil {
106+
return err
107+
}
108+
109+
return nil
110+
}

0 commit comments

Comments
 (0)