Skip to content

Commit 02944bf

Browse files
committed
feat: added connection class to manage connections to Kafka
1 parent 5d0b0f8 commit 02944bf

File tree

6 files changed

+98
-18
lines changed

6 files changed

+98
-18
lines changed

examples/kafka_transport/kafka_transport.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@ package main
33
import (
44
"context"
55
"log/slog"
6+
"time"
67

78
"github.com/gerfey/messenger/builder"
89
"github.com/gerfey/messenger/config"
910
"github.com/gerfey/messenger/examples/kafka_transport/handler"
1011
"github.com/gerfey/messenger/examples/kafka_transport/message"
1112
)
1213

14+
const (
15+
waitDurationSeconds = 20
16+
)
17+
1318
func main() {
1419
ctx := context.Background()
1520

@@ -57,5 +62,5 @@ func main() {
5762
return
5863
}
5964

60-
<-ctx.Done()
65+
time.Sleep(waitDurationSeconds * time.Second)
6166
}

transport/kafka/connection.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/segmentio/kafka-go"
9+
)
10+
11+
const (
12+
connectionTimeout = 5 * time.Second
13+
)
14+
15+
type Connection struct {
16+
brokers []string
17+
dialer *kafka.Dialer
18+
}
19+
20+
func NewConnection(brokers []string) (*Connection, error) {
21+
conn := &Connection{
22+
brokers: brokers,
23+
dialer: &kafka.Dialer{
24+
Timeout: connectionTimeout,
25+
DualStack: true,
26+
},
27+
}
28+
29+
if err := conn.Check(connectionTimeout); err != nil {
30+
return nil, err
31+
}
32+
33+
return conn, nil
34+
}
35+
36+
func (c *Connection) Check(timeout time.Duration) error {
37+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
38+
defer cancel()
39+
40+
for _, broker := range c.brokers {
41+
conn, connErr := c.dialer.DialContext(ctx, "tcp", broker)
42+
if connErr != nil {
43+
return fmt.Errorf("failed to connect to Kafka broker at '%s': %w", broker, connErr)
44+
}
45+
46+
if closeErr := conn.Close(); closeErr != nil {
47+
return fmt.Errorf("failed to close connection to Kafka broker at '%s': %w", broker, closeErr)
48+
}
49+
}
50+
51+
return nil
52+
}
53+
54+
func (c *Connection) CreateReader(config kafka.ReaderConfig) *kafka.Reader {
55+
config.Brokers = c.brokers
56+
config.Dialer = c.dialer
57+
58+
return kafka.NewReader(config)
59+
}
60+
61+
func (c *Connection) CreateWriter(topic string) *kafka.Writer {
62+
return &kafka.Writer{
63+
Addr: kafka.TCP(c.brokers...),
64+
Topic: topic,
65+
RequiredAcks: kafka.RequireAll,
66+
Balancer: &kafka.LeastBytes{},
67+
Async: false,
68+
}
69+
}

transport/kafka/consumer.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,19 @@ const (
2323
type Consumer struct {
2424
cfg TransportConfig
2525
serializer api.Serializer
26+
conn *Connection
2627
}
2728

28-
func NewConsumer(cfg TransportConfig, ser api.Serializer) *Consumer {
29+
func NewConsumer(cfg TransportConfig, ser api.Serializer, conn *Connection) *Consumer {
2930
return &Consumer{
3031
cfg: cfg,
3132
serializer: ser,
33+
conn: conn,
3234
}
3335
}
3436

3537
func (c *Consumer) Consume(ctx context.Context, handler func(context.Context, api.Envelope) error) error {
36-
r := kafka.NewReader(kafka.ReaderConfig{
37-
Brokers: c.cfg.Options.Brokers,
38+
readerConfig := kafka.ReaderConfig{
3839
GroupID: c.cfg.Options.Group,
3940
Topic: c.cfg.Options.Topic,
4041
StartOffset: c.startOffset(c.cfg.Options.Offset),
@@ -47,7 +48,9 @@ func (c *Consumer) Consume(ctx context.Context, handler func(context.Context, ap
4748
RebalanceTimeout: rebalanceTimeout,
4849
HeartbeatInterval: heartbeatInterval,
4950
MaxWait: time.Second,
50-
})
51+
}
52+
53+
r := c.conn.CreateReader(readerConfig)
5154
defer r.Close()
5255

5356
jobs := make(chan job)

transport/kafka/factory_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func TestTransportFactory_Create(t *testing.T) {
9797

9898
transport, err := factory.Create(name, dsn, optionsBytes)
9999

100-
require.NoError(t, err)
101-
assert.NotNil(t, transport)
102-
assert.IsType(t, &kafka.Transport{}, transport)
100+
require.Error(t, err)
101+
assert.Contains(t, err.Error(), "kafka")
102+
assert.Nil(t, transport)
103103
}

transport/kafka/producer.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,11 @@ type Producer struct {
1616
serializer api.Serializer
1717
}
1818

19-
func NewProducer(cfg TransportConfig, ser api.Serializer) (*Producer, error) {
19+
func NewProducer(cfg TransportConfig, ser api.Serializer, conn *Connection) (*Producer, error) {
2020
return &Producer{
2121
cfg: cfg,
2222
serializer: ser,
23-
writer: &kafka.Writer{
24-
Addr: kafka.TCP(cfg.Options.Brokers...),
25-
Topic: cfg.Options.Topic,
26-
RequiredAcks: kafka.RequireAll,
27-
Balancer: &kafka.LeastBytes{},
28-
Async: false,
29-
},
23+
writer: conn.CreateWriter(cfg.Options.Topic),
3024
}, nil
3125
}
3226

transport/kafka/transport.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,33 @@ type Transport struct {
1515
consumer *Consumer
1616
serializer api.Serializer
1717
logger *slog.Logger
18+
conn *Connection
1819
}
1920

2021
func NewTransport(cfg TransportConfig, resolver api.TypeResolver, logger *slog.Logger) (api.Transport, error) {
22+
conn, err := NewConnection(cfg.Options.Brokers)
23+
if err != nil {
24+
logger.Error("failed to connect to Kafka brokers", "error", err)
25+
26+
return nil, fmt.Errorf("kafka: %w", err)
27+
}
28+
2129
ser := serializer.NewSerializer(resolver)
2230

23-
producer, err := NewProducer(cfg, ser)
31+
producer, err := NewProducer(cfg, ser, conn)
2432
if err != nil {
2533
return nil, fmt.Errorf("failed to create kafka producer: %w", err)
2634
}
2735

28-
consumer := NewConsumer(cfg, ser)
36+
consumer := NewConsumer(cfg, ser, conn)
2937

3038
return &Transport{
3139
cfg: cfg,
3240
producer: producer,
3341
consumer: consumer,
3442
serializer: ser,
3543
logger: logger,
44+
conn: conn,
3645
}, nil
3746
}
3847

0 commit comments

Comments
 (0)