Skip to content

Commit ba7f739

Browse files
committed
refactor: moving DSN parsing from factory to transport for kafka
1 parent 02944bf commit ba7f739

File tree

3 files changed

+10
-10
lines changed

3 files changed

+10
-10
lines changed

transport/kafka/config.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,4 @@ type OptionsConfig struct {
1616
Offset string `yaml:"offset" default:"latest"`
1717
Group string `yaml:"group" default:"group"`
1818
Topic string `yaml:"topic" default:"topic"`
19-
Brokers []string `yaml:"brokers"`
2019
}

transport/kafka/factory.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package kafka
33
import (
44
"fmt"
55
"log/slog"
6-
"net/url"
76
"strings"
87

98
"github.com/creasty/defaults"
@@ -38,13 +37,6 @@ func (t *TransportFactory) Create(name string, dsn string, options []byte) (api.
3837
return nil, fmt.Errorf("kafka: unmarshal options: %w", err)
3938
}
4039

41-
u, err := url.Parse(dsn)
42-
if err != nil {
43-
return nil, fmt.Errorf("kafka: failed to parse dsn: %w", err)
44-
}
45-
46-
optsConfig.Brokers = strings.Split(u.Host, ",")
47-
4840
tCfg := TransportConfig{
4941
Name: name,
5042
DSN: dsn,

transport/kafka/transport.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"fmt"
66
"log/slog"
7+
"net/url"
8+
"strings"
79

810
"github.com/gerfey/messenger/api"
911
"github.com/gerfey/messenger/serializer"
@@ -19,7 +21,14 @@ type Transport struct {
1921
}
2022

2123
func NewTransport(cfg TransportConfig, resolver api.TypeResolver, logger *slog.Logger) (api.Transport, error) {
22-
conn, err := NewConnection(cfg.Options.Brokers)
24+
u, err := url.Parse(cfg.DSN)
25+
if err != nil {
26+
return nil, fmt.Errorf("kafka: failed to parse dsn: %w", err)
27+
}
28+
29+
brokers := strings.Split(u.Host, ",")
30+
31+
conn, err := NewConnection(brokers)
2332
if err != nil {
2433
logger.Error("failed to connect to Kafka brokers", "error", err)
2534

0 commit comments

Comments
 (0)