Skip to content

Commit 21c7b10

Browse files
authored
SMQ-2888 - Change internal message topic format (#2889)
Signed-off-by: Arvindh <[email protected]>
1 parent d64a74b commit 21c7b10

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+841
-542
lines changed

cmd/mqtt/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ func main() {
148148
}
149149
defer mpub.Close()
150150

151-
fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
152-
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
151+
fwd := mqtt.NewForwarder(brokers.SubjectAllMessages, logger)
152+
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllMessages)
153153
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
154154
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
155155
exitCode = 1

coap/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,5 @@ Setting `SMQ_CLIENTS_GRPC_CLIENT_CERT` and `SMQ_CLIENTS_GRPC_CLIENT_KEY` will en
7676

7777
## Usage
7878

79-
If CoAP adapter is running locally (on default 5683 port), a valid URL would be: `coap://localhost/channels/<channel_id>/messages?auth=<client_auth_key>`.
79+
If CoAP adapter is running locally (on default 5683 port), a valid URL would be: `coap://localhost/m/<domain_id>/c/<channel_id>/<subtopic>?auth=<client_auth_key>`.
8080
Since CoAP protocol does not support `Authorization` header (option) and options have limited size, in order to send CoAP messages, valid `auth` value (a valid Client key) must be present in `Uri-Query` option.

coap/adapter.go

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package coap
88

99
import (
1010
"context"
11-
"fmt"
1211

1312
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
1413
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
@@ -21,8 +20,6 @@ import (
2120

2221
var errFailedToDisconnectClient = errors.New("failed to disconnect client")
2322

24-
const chansPrefix = "channels"
25-
2623
// Service specifies CoAP service API.
2724
type Service interface {
2825
// Publish publishes message to specified channel.
@@ -37,7 +34,7 @@ type Service interface {
3734
Unsubscribe(ctx context.Context, key, domainID, chanID, subptopic, token string) error
3835

3936
// DisconnectHandler method is used to disconnected the client
40-
DisconnectHandler(ctx context.Context, chanID, subptopic, token string) error
37+
DisconnectHandler(ctx context.Context, domainID, chanID, subptopic, token string) error
4138
}
4239

4340
var _ Service = (*adapterService)(nil)
@@ -87,7 +84,7 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg *messagi
8784

8885
msg.Publisher = authnRes.GetId()
8986

90-
return svc.pubsub.Publish(ctx, msg.GetChannel(), msg)
87+
return svc.pubsub.Publish(ctx, messaging.EncodeMessageTopic(msg), msg)
9188
}
9289

9390
func (svc *adapterService) Subscribe(ctx context.Context, key, domainID, chanID, subtopic string, c Client) error {
@@ -116,11 +113,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, domainID, chanID,
116113
return svcerr.ErrAuthorization
117114
}
118115

119-
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
120-
if subtopic != "" {
121-
subject = fmt.Sprintf("%s.%s", subject, subtopic)
122-
}
123-
116+
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
124117
authzc := newAuthzClient(clientID, domainID, chanID, subtopic, svc.channels, c)
125118
subCfg := messaging.SubscriberConfig{
126119
ID: c.Token(),
@@ -156,19 +149,13 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, domainID, chanI
156149
return svcerr.ErrAuthorization
157150
}
158151

159-
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
160-
if subtopic != "" {
161-
subject = fmt.Sprintf("%s.%s", subject, subtopic)
162-
}
152+
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
163153

164154
return svc.pubsub.Unsubscribe(ctx, token, subject)
165155
}
166156

167-
func (svc *adapterService) DisconnectHandler(ctx context.Context, chanID, subtopic, token string) error {
168-
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
169-
if subtopic != "" {
170-
subject = fmt.Sprintf("%s.%s", subject, subtopic)
171-
}
157+
func (svc *adapterService) DisconnectHandler(ctx context.Context, domainID, chanID, subtopic, token string) error {
158+
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
172159

173160
return svc.pubsub.Unsubscribe(ctx, token, subject)
174161
}

coap/api/logging.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,11 @@ func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, domainID, cha
9797

9898
// DisconnectHandler logs the disconnect handler. It logs the channel ID, subtopic (if any) and the time it took to complete the request.
9999
// If the request fails, it logs the error.
100-
func (lm *loggingMiddleware) DisconnectHandler(ctx context.Context, chanID, subtopic, token string) (err error) {
100+
func (lm *loggingMiddleware) DisconnectHandler(ctx context.Context, domainID, chanID, subtopic, token string) (err error) {
101101
defer func(begin time.Time) {
102102
args := []any{
103103
slog.String("duration", time.Since(begin).String()),
104+
slog.String("domain_id", domainID),
104105
slog.String("channel_id", chanID),
105106
slog.String("token", token),
106107
}
@@ -115,5 +116,5 @@ func (lm *loggingMiddleware) DisconnectHandler(ctx context.Context, chanID, subt
115116
lm.logger.Info("Unsubscribe completed successfully", args...)
116117
}(time.Now())
117118

118-
return lm.svc.DisconnectHandler(ctx, chanID, subtopic, token)
119+
return lm.svc.DisconnectHandler(ctx, domainID, chanID, subtopic, token)
119120
}

coap/api/metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ func (mm *metricsMiddleware) Unsubscribe(ctx context.Context, key, domainID, cha
6262
}
6363

6464
// DisconnectHandler instruments DisconnectHandler method with metrics.
65-
func (mm *metricsMiddleware) DisconnectHandler(ctx context.Context, chanID, subtopic, token string) error {
65+
func (mm *metricsMiddleware) DisconnectHandler(ctx context.Context, domainID, chanID, subtopic, token string) error {
6666
defer func(begin time.Time) {
6767
mm.counter.With("method", "disconnect_handler").Add(1)
6868
mm.latency.With("method", "disconnect_handler").Observe(time.Since(begin).Seconds())
6969
}(time.Now())
7070

71-
return mm.svc.DisconnectHandler(ctx, chanID, subtopic, token)
71+
return mm.svc.DisconnectHandler(ctx, domainID, chanID, subtopic, token)
7272
}

coap/api/transport.go

Lines changed: 14 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"io"
1010
"log/slog"
1111
"net/http"
12-
"net/url"
13-
"regexp"
1412
"strings"
1513
"time"
1614

@@ -33,18 +31,9 @@ const (
3331
startObserve = 0 // observe option value that indicates start of observation
3432
)
3533

36-
var channelPartRegExp = regexp.MustCompile(`^/m/([\w\-]+)/c/([\w\-]+)(/[^?]*)?(\?.*)?$`)
37-
38-
const (
39-
numGroups = 4 // entire expression+ domain group + channel group + subtopic group
40-
domainGroup = 1 // domain group is first in channel regexp
41-
channelGroup = 3 // channel group is third in channel regexp
42-
)
43-
4434
var (
45-
errMalformedSubtopic = errors.New("malformed subtopic")
46-
errBadOptions = errors.New("bad options")
47-
errMethodNotAllowed = errors.New("method not allowed")
35+
errBadOptions = errors.New("bad options")
36+
errMethodNotAllowed = errors.New("method not allowed")
4837
)
4938

5039
var (
@@ -133,7 +122,7 @@ func handleGet(m *mux.Message, w mux.ResponseWriter, msg *messaging.Message, key
133122
if obs == startObserve {
134123
c := coap.NewClient(w.Conn(), m.Token(), logger)
135124
w.Conn().AddOnClose(func() {
136-
_ = service.DisconnectHandler(context.Background(), msg.GetChannel(), msg.GetSubtopic(), c.Token())
125+
_ = service.DisconnectHandler(context.Background(), msg.GetDomain(), msg.GetChannel(), msg.GetSubtopic(), c.Token())
137126
})
138127
return service.Subscribe(w.Conn().Context(), key, msg.GetDomain(), msg.GetChannel(), msg.GetSubtopic(), c)
139128
}
@@ -148,20 +137,23 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
148137
if err != nil {
149138
return &messaging.Message{}, err
150139
}
151-
channelParts := channelPartRegExp.FindStringSubmatch(path)
152-
if len(channelParts) < numGroups {
153-
return &messaging.Message{}, errMalformedSubtopic
154-
}
155140

156-
st, err := parseSubtopic(channelParts[channelGroup])
141+
var domainID, channelID, subTopic string
142+
switch msg.Code() {
143+
case codes.GET:
144+
domainID, channelID, subTopic, err = messaging.ParseSubscribeTopic(path)
145+
case codes.POST:
146+
domainID, channelID, subTopic, err = messaging.ParsePublishTopic(path)
147+
}
157148
if err != nil {
158149
return &messaging.Message{}, err
159150
}
151+
160152
ret := &messaging.Message{
161153
Protocol: protocol,
162-
Domain: channelParts[domainGroup],
163-
Channel: channelParts[2],
164-
Subtopic: st,
154+
Domain: domainID,
155+
Channel: channelID,
156+
Subtopic: subTopic,
165157
Payload: []byte{},
166158
Created: time.Now().UnixNano(),
167159
}
@@ -187,32 +179,3 @@ func parseKey(msg *mux.Message) (string, error) {
187179
}
188180
return vars[1], nil
189181
}
190-
191-
func parseSubtopic(subtopic string) (string, error) {
192-
if subtopic == "" {
193-
return subtopic, nil
194-
}
195-
196-
subtopic, err := url.QueryUnescape(subtopic)
197-
if err != nil {
198-
return "", errMalformedSubtopic
199-
}
200-
subtopic = strings.ReplaceAll(subtopic, "/", ".")
201-
202-
elems := strings.Split(subtopic, ".")
203-
filteredElems := []string{}
204-
for _, elem := range elems {
205-
if elem == "" {
206-
continue
207-
}
208-
209-
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
210-
return "", errMalformedSubtopic
211-
}
212-
213-
filteredElems = append(filteredElems, elem)
214-
}
215-
216-
subtopic = strings.Join(filteredElems, ".")
217-
return subtopic, nil
218-
}

coap/tracing/adapter.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,12 @@ func (tm *tracingServiceMiddleware) Unsubscribe(ctx context.Context, key, domain
6666
}
6767

6868
// DisconnectHandler traces a CoAP disconnect operation.
69-
func (tm *tracingServiceMiddleware) DisconnectHandler(ctx context.Context, chanID, subptopic, token string) error {
69+
func (tm *tracingServiceMiddleware) DisconnectHandler(ctx context.Context, domainID, chanID, subptopic, token string) error {
7070
ctx, span := tm.tracer.Start(ctx, disconnectHandlerOp, trace.WithAttributes(
71+
attribute.String("domain_id", domainID),
7172
attribute.String("channel_id", chanID),
7273
attribute.String("subtopic", subptopic),
7374
))
7475
defer span.End()
75-
return tm.svc.DisconnectHandler(ctx, chanID, subptopic, token)
76+
return tm.svc.DisconnectHandler(ctx, domainID, chanID, subptopic, token)
7677
}

consumers/messages.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ type config struct {
123123
func loadConfig(configPath string) (config, error) {
124124
cfg := config{
125125
SubscriberCfg: subscriberConfig{
126-
Subjects: []string{brokers.SubjectAllChannels},
126+
Subjects: []string{brokers.SubjectAllMessages},
127127
},
128128
TransformerCfg: transformerConfig{
129129
Format: defFormat,

http/api/endpoint_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
smqauthn "github.com/absmach/supermq/pkg/authn"
2929
authnMocks "github.com/absmach/supermq/pkg/authn/mocks"
3030
"github.com/absmach/supermq/pkg/connections"
31+
"github.com/absmach/supermq/pkg/messaging"
3132
pubsub "github.com/absmach/supermq/pkg/messaging/mocks"
3233
"github.com/absmach/supermq/pkg/policies"
3334
"github.com/stretchr/testify/assert"
@@ -257,7 +258,7 @@ func TestPublish(t *testing.T) {
257258
ClientType: policies.ClientType,
258259
Type: uint32(connections.Publish),
259260
}).Return(tc.authzRes, tc.authzErr)
260-
svcCall := pub.On("Publish", mock.Anything, tc.chanID, mock.Anything).Return(nil)
261+
svcCall := pub.On("Publish", mock.Anything, messaging.EncodeTopicSuffix(tc.domainID, tc.chanID, ""), mock.Anything).Return(nil)
261262
req := testRequest{
262263
client: ts.Client(),
263264
method: http.MethodPost,

http/handler.go

Lines changed: 6 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88
"fmt"
99
"log/slog"
1010
"net/http"
11-
"net/url"
12-
"regexp"
1311
"strings"
1412
"time"
1513

@@ -49,14 +47,10 @@ var (
4947
errClientNotInitialized = errors.New("client is not initialized")
5048
errFailedPublish = errors.New("failed to publish")
5149
errFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
52-
errMalformedSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed subtopic"))
5350
errMalformedTopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed topic"))
5451
errMissingTopicPub = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to publish due to missing topic"))
55-
errFailedParseSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to parse subtopic"))
5652
)
5753

58-
var channelRegExp = regexp.MustCompile(`^\/?m\/([\w\-]+)\/c\/([\w\-]+)(\/[^?]*)?(\?.*)?$`)
59-
6054
// Event implements events.Event interface.
6155
type handler struct {
6256
publisher messaging.Publisher
@@ -125,6 +119,11 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
125119
return errors.Wrap(errFailedPublish, errClientNotInitialized)
126120
}
127121

122+
domainID, chanID, subtopic, err := messaging.ParsePublishTopic(*topic)
123+
if err != nil {
124+
return errors.Wrap(errMalformedTopic, err)
125+
}
126+
128127
var clientID, clientType string
129128
switch {
130129
case strings.HasPrefix(string(s.Password), "Client"):
@@ -153,11 +152,6 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
153152
return mgate.NewHTTPProxyError(http.StatusUnauthorized, svcerr.ErrAuthentication)
154153
}
155154

156-
domainID, chanID, subtopic, err := parseTopic(*topic)
157-
if err != nil {
158-
return mgate.NewHTTPProxyError(http.StatusBadRequest, err)
159-
}
160-
161155
msg := messaging.Message{
162156
Protocol: protocol,
163157
Domain: domainID,
@@ -186,7 +180,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
186180
msg.Publisher = clientID
187181
}
188182

189-
if err := h.publisher.Publish(ctx, msg.Channel, &msg); err != nil {
183+
if err := h.publisher.Publish(ctx, messaging.EncodeMessageTopic(&msg), &msg); err != nil {
190184
return errors.Wrap(errFailedPublishToMsgBroker, err)
191185
}
192186

@@ -209,52 +203,3 @@ func (h *handler) Unsubscribe(ctx context.Context, topics *[]string) error {
209203
func (h *handler) Disconnect(ctx context.Context) error {
210204
return nil
211205
}
212-
213-
func parseTopic(topic string) (string, string, string, error) {
214-
// Topics are in the format:
215-
// m/<domain_id>/c/<channel_id>/<subtopic>/.../ct/<content_type>
216-
channelParts := channelRegExp.FindStringSubmatch(topic)
217-
if len(channelParts) < 3 {
218-
return "", "", "", errors.Wrap(errFailedPublish, errMalformedTopic)
219-
}
220-
221-
domainID := channelParts[1]
222-
chanID := channelParts[2]
223-
subtopic := channelParts[3]
224-
225-
subtopic, err := parseSubtopic(subtopic)
226-
if err != nil {
227-
return "", "", "", errors.Wrap(errFailedParseSubtopic, err)
228-
}
229-
230-
return domainID, chanID, subtopic, nil
231-
}
232-
233-
func parseSubtopic(subtopic string) (string, error) {
234-
if subtopic == "" {
235-
return subtopic, nil
236-
}
237-
238-
subtopic, err := url.QueryUnescape(subtopic)
239-
if err != nil {
240-
return "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedSubtopic)
241-
}
242-
subtopic = strings.ReplaceAll(subtopic, "/", ".")
243-
244-
elems := strings.Split(subtopic, ".")
245-
filteredElems := []string{}
246-
for _, elem := range elems {
247-
if elem == "" {
248-
continue
249-
}
250-
251-
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
252-
return "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedSubtopic)
253-
}
254-
255-
filteredElems = append(filteredElems, elem)
256-
}
257-
258-
subtopic = strings.Join(filteredElems, ".")
259-
return subtopic, nil
260-
}

0 commit comments

Comments
 (0)