Skip to content

Commit 427e30b

Browse files
committed
fix(mqtt): use topicPrefix
Signed-off-by: hmoazzem <[email protected]>
1 parent dde00be commit 427e30b

File tree

1 file changed

+37
-35
lines changed

1 file changed

+37
-35
lines changed

pkg/pipeline/peer/mqtt/peer.go

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (p *PeerMQTT) Connect(config json.RawMessage, args ...any) error {
4343
opts.Servers = append(opts.Servers, u)
4444
}
4545

46-
p.Config.TopicPrefix = cmp.Or(p.Config.TopicPrefix, "pgo")
46+
p.Config.TopicPrefix = cmp.Or(cfg.TopicPrefix, "pgo")
4747

4848
mqttOpts := convertToPahoOptions(&opts)
4949

@@ -76,13 +76,6 @@ func (p *PeerMQTT) Sub(args ...any) (<-chan cdc.Event, error) {
7676
}
7777
}
7878

79-
enableResponse := true
80-
if len(args) > 1 {
81-
if enabled, ok := args[1].(bool); ok {
82-
enableResponse = enabled
83-
}
84-
}
85-
8679
filter := prefix + "/#"
8780
events := make(chan cdc.Event, 100)
8881

@@ -128,28 +121,28 @@ func (p *PeerMQTT) Sub(args ...any) (<-chan cdc.Event, error) {
128121
}
129122
}
130123

131-
var payloads []interface{}
124+
var payloads []any
132125
if len(msg.Payload()) > 0 {
133126
if isBatch {
134127
if err := json.Unmarshal(msg.Payload(), &payloads); err != nil {
135-
var singlePayload interface{}
128+
var singlePayload any
136129
if err := json.Unmarshal(msg.Payload(), &singlePayload); err != nil {
137130
p.logger.Warn("invalid payload",
138131
zap.Error(err),
139132
zap.String("topic", topic))
140133
return
141134
}
142-
payloads = []interface{}{singlePayload}
135+
payloads = []any{singlePayload}
143136
}
144137
} else {
145-
var singlePayload interface{}
138+
var singlePayload any
146139
if err := json.Unmarshal(msg.Payload(), &singlePayload); err != nil {
147140
p.logger.Warn("invalid payload",
148141
zap.Error(err),
149142
zap.String("topic", topic))
150143
return
151144
}
152-
payloads = []interface{}{singlePayload}
145+
payloads = []any{singlePayload}
153146
}
154147
}
155148

@@ -162,27 +155,6 @@ func (p *PeerMQTT) Sub(args ...any) (<-chan cdc.Event, error) {
162155
p.logger.Warn("event channel full, dropping message")
163156
}
164157
}
165-
166-
if enableResponse {
167-
responseTopic := "/response" + msg.Topic()
168-
response := map[string]interface{}{
169-
"success": true,
170-
"timestamp": time.Now().UnixMilli(),
171-
"count": len(payloads),
172-
}
173-
174-
responseData, err := json.Marshal(response)
175-
if err != nil {
176-
p.logger.Error("failed to marshal response", zap.Error(err))
177-
return
178-
}
179-
180-
if err := p.Client.Publish(responseTopic, 0, false, responseData); err != nil {
181-
p.logger.Error("failed to publish response",
182-
zap.Error(err),
183-
zap.String("topic", responseTopic))
184-
}
185-
}
186158
})
187159

188160
if err := token.Error(); err != nil {
@@ -203,7 +175,7 @@ func (p *PeerMQTT) Disconnect() error {
203175
}
204176

205177
// createEvent creates a new CDC event using the builder pattern
206-
func createEvent(schema, table, opCode string, payload interface{}) cdc.Event {
178+
func createEvent(schema, table, opCode string, payload any) cdc.Event {
207179
source := cdc.NewSourceBuilder("mqtt", "mqtt-source").
208180
WithDatabase("mqtt").
209181
WithSchema(schema).
@@ -228,3 +200,33 @@ func createEvent(schema, table, opCode string, payload interface{}) cdc.Event {
228200
func init() {
229201
pipeline.RegisterConnector(pipeline.ConnectorMQTT, &PeerMQTT{})
230202
}
203+
204+
// apparently response isn't useful
205+
/*
206+
enableResponse := true
207+
if len(args) > 1 {
208+
if enabled, ok := args[1].(bool); ok {
209+
enableResponse = enabled
210+
}
211+
}
212+
if enableResponse {
213+
responseTopic := "/response" + msg.Topic()
214+
response := map[string]any{
215+
"success": true,
216+
"timestamp": time.Now().UnixMilli(),
217+
"count": len(payloads),
218+
}
219+
220+
responseData, err := json.Marshal(response)
221+
if err != nil {
222+
p.logger.Error("failed to marshal response", zap.Error(err))
223+
return
224+
}
225+
226+
if err := p.Client.Publish(responseTopic, 0, false, responseData); err != nil {
227+
p.logger.Error("failed to publish response",
228+
zap.Error(err),
229+
zap.String("topic", responseTopic))
230+
}
231+
}
232+
*/

0 commit comments

Comments
 (0)