Skip to content

Commit 55642db

Browse files
committed
fix(stage_p2): use stage logger for GenerateLogDirs to ensure consistent logging
feat: add testProduce2 to validate produce request error handling fix: simplify deferred client close by removing wrapper function
1 parent ab20d04 commit 55642db

File tree

1 file changed

+63
-0
lines changed

1 file changed

+63
-0
lines changed

internal/stage_p2.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package internal
2+
3+
import (
4+
"github.com/codecrafters-io/kafka-tester/internal/assertions"
5+
"github.com/codecrafters-io/kafka-tester/internal/kafka_executable"
6+
"github.com/codecrafters-io/kafka-tester/protocol/builder"
7+
"github.com/codecrafters-io/kafka-tester/protocol/common"
8+
"github.com/codecrafters-io/kafka-tester/protocol/kafka_client"
9+
"github.com/codecrafters-io/kafka-tester/protocol/serializer"
10+
"github.com/codecrafters-io/tester-utils/test_case_harness"
11+
)
12+
13+
//lint:ignore U1000, ignore for this PR
14+
func testProduce2(stageHarness *test_case_harness.TestCaseHarness) error {
15+
b := kafka_executable.NewKafkaExecutable(stageHarness)
16+
stageLogger := stageHarness.Logger
17+
err := serializer.GenerateLogDirs(stageLogger, []string{})
18+
if err != nil {
19+
return err
20+
}
21+
22+
if err := b.Run(); err != nil {
23+
return err
24+
}
25+
26+
correlationId := getRandomCorrelationId()
27+
28+
client := kafka_client.NewClient("localhost:9092")
29+
if err := client.ConnectWithRetries(b, stageLogger); err != nil {
30+
return err
31+
}
32+
defer client.Close()
33+
34+
recordBatch := builder.NewRecordBatchBuilder().
35+
AddStringRecord(common.MESSAGE1).
36+
Build()
37+
38+
request := builder.NewProduceRequestBuilder().
39+
AddRecordBatch(common.TOPIC_UNKOWN_NAME, 0, recordBatch).
40+
WithCorrelationId(correlationId).
41+
Build()
42+
43+
rawResponse, err := client.SendAndReceive(request, stageLogger)
44+
if err != nil {
45+
return err
46+
}
47+
48+
actualResponse := builder.NewEmptyProduceResponse()
49+
if err := actualResponse.Decode(rawResponse.Payload, stageLogger); err != nil {
50+
return err
51+
}
52+
53+
expectedResponse := builder.NewProduceResponseBuilder().
54+
AddErrorPartitionResponse(common.TOPIC_UNKOWN_NAME, 0, 3).
55+
WithCorrelationId(correlationId).
56+
Build()
57+
58+
if err = assertions.NewProduceResponseAssertion(actualResponse, expectedResponse, stageLogger).Run(stageLogger); err != nil {
59+
return err
60+
}
61+
62+
return nil
63+
}

0 commit comments

Comments
 (0)