Skip to content

Commit d7da4fa

Browse files
committed
feat: use mgate to proxy coap with dtls
Signed-off-by: Felix Gateru <[email protected]>
1 parent d31c661 commit d7da4fa

File tree

11 files changed

+298
-124
lines changed

11 files changed

+298
-124
lines changed

cmd/coap/main.go

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,19 @@ import (
88
"context"
99
"fmt"
1010
"log"
11+
"log/slog"
1112
"net/url"
1213
"os"
1314

1415
chclient "github.com/absmach/callhome/pkg/client"
16+
"github.com/absmach/mgate"
17+
mgatecoap "github.com/absmach/mgate/pkg/coap"
18+
"github.com/absmach/mgate/pkg/session"
19+
mgtls "github.com/absmach/mgate/pkg/tls"
1520
"github.com/absmach/supermq"
1621
"github.com/absmach/supermq/coap"
1722
httpapi "github.com/absmach/supermq/coap/api"
18-
"github.com/absmach/supermq/coap/tracing"
23+
"github.com/absmach/supermq/coap/middleware"
1924
smqlog "github.com/absmach/supermq/logger"
2025
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
2126
"github.com/absmach/supermq/pkg/grpcclient"
@@ -30,19 +35,24 @@ import (
3035
httpserver "github.com/absmach/supermq/pkg/server/http"
3136
"github.com/absmach/supermq/pkg/uuid"
3237
"github.com/caarlos0/env/v11"
38+
"github.com/pion/dtls/v3"
3339
"golang.org/x/sync/errgroup"
3440
)
3541

3642
const (
37-
svcName = "coap_adapter"
38-
envPrefix = "SMQ_COAP_ADAPTER_"
39-
envPrefixHTTP = "SMQ_COAP_ADAPTER_HTTP_"
40-
envPrefixCache = "SMQ_COAP_CACHE_"
41-
envPrefixClients = "SMQ_CLIENTS_GRPC_"
42-
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
43-
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
44-
defSvcHTTPPort = "5683"
45-
defSvcCoAPPort = "5683"
43+
svcName = "coap_adapter"
44+
envPrefix = "SMQ_COAP_ADAPTER_"
45+
envPrefixHTTP = "SMQ_COAP_ADAPTER_HTTP_"
46+
envPrefixCache = "SMQ_COAP_CACHE_"
47+
envPrefixClients = "SMQ_CLIENTS_GRPC_"
48+
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
49+
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
50+
defSvcHTTPPort = "5683"
51+
defSvcCoAPPort = "5683"
52+
targetProtocol = "coap"
53+
targetCoapHost = "localhost"
54+
targetCoapPort = "5683"
55+
targetCoapDtlsPort = "5684"
4656
)
4757

4858
type config struct {
@@ -94,6 +104,13 @@ func main() {
94104
return
95105
}
96106

107+
dtlsCfg, err := mgtls.NewConfig(env.Options{Prefix: envPrefix})
108+
if err != nil {
109+
logger.Error(fmt.Sprintf("failed to load %s DTLS configuration : %s", svcName, err))
110+
exitCode = 1
111+
return
112+
}
113+
97114
cacheConfig := messaging.CacheConfig{}
98115
if err := env.ParseWithOptions(&cacheConfig, env.Options{Prefix: envPrefixCache}); err != nil {
99116
logger.Error(fmt.Sprintf("failed to load cache configuration : %s", err))
@@ -181,12 +198,12 @@ func main() {
181198

182199
svc := coap.New(clientsClient, channelsClient, nps)
183200

184-
svc = tracing.New(tracer, svc)
201+
svc = middleware.TracingMiddleware(tracer, svc)
185202

186-
svc = httpapi.LoggingMiddleware(svc, logger)
203+
svc = middleware.LoggingMiddleware(svc, logger)
187204

188205
counter, latency := prometheus.MakeMetrics(svcName, "api")
189-
svc = httpapi.MetricsMiddleware(svc, counter, latency)
206+
svc = middleware.MetricsMiddleware(svc, counter, latency)
190207

191208
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(cfg.InstanceID), logger)
192209

@@ -207,7 +224,8 @@ func main() {
207224
return hs.Start()
208225
})
209226
g.Go(func() error {
210-
return cs.Start()
227+
handler := coap.NewHandler(nps, logger, clientsClient, channelsClient, parser)
228+
return proxyCoAP(ctx, coapServerConfig, dtlsCfg, handler, logger)
211229
})
212230
g.Go(func() error {
213231
return server.StopSignalHandler(ctx, cancel, logger, svcName, hs, cs)
@@ -217,3 +235,43 @@ func main() {
217235
logger.Error(fmt.Sprintf("CoAP adapter service terminated: %s", err))
218236
}
219237
}
238+
239+
func proxyCoAP(ctx context.Context, cfg server.Config, dtlsCfg mgtls.Config, handler session.Handler, logger *slog.Logger) error {
240+
var err error
241+
config := mgate.Config{
242+
Host: cfg.Host,
243+
Port: cfg.Port,
244+
TargetProtocol: targetProtocol,
245+
TargetHost: targetCoapHost,
246+
TargetPort: targetCoapPort,
247+
}
248+
249+
mg := mgatecoap.NewProxy(config, handler, logger)
250+
251+
errCh := make(chan error)
252+
253+
logger.Info(fmt.Sprintf("Starting COAP without DTLS proxy on port %s", cfg.Port))
254+
go func() {
255+
errCh <- mg.Listen(ctx)
256+
}()
257+
config.DTLSConfig, err = mgtls.LoadTLSConfig(&dtlsCfg, &dtls.Config{})
258+
if err != nil {
259+
return err
260+
}
261+
262+
if config.DTLSConfig != nil {
263+
config.Port = targetCoapDtlsPort
264+
mgDtls := mgatecoap.NewProxy(config, handler, logger)
265+
logger.Info(fmt.Sprintf("Starting COAP with DTLS proxy on port %s", cfg.Port))
266+
go func() {
267+
errCh <- mgDtls.Listen(ctx)
268+
}()
269+
}
270+
select {
271+
case <-ctx.Done():
272+
logger.Info(fmt.Sprintf("proxy COAP shutdown at %s:%s", config.Host, config.Port))
273+
return nil
274+
case err := <-errCh:
275+
return err
276+
}
277+
}

coap/adapter.go

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,11 @@ var errFailedToDisconnectClient = errors.New("failed to disconnect client")
2323

2424
// Service specifies CoAP service API.
2525
type Service interface {
26-
// Publish publishes message to specified channel.
27-
// Key is used to authorize publisher.
28-
Publish(ctx context.Context, key string, msg *messaging.Message) error
29-
3026
// Subscribes to channel with specified id, domainID, subtopic and adds subscription to
3127
// service map of subscriptions under given ID.
3228
Subscribe(ctx context.Context, key, domainID, chanID, subtopic string, c Client) error
3329

34-
// Unsubscribe method is used to stop observing resource.
30+
// Unsubscribe methdod is used to stop observing resource.
3531
Unsubscribe(ctx context.Context, key, domainID, chanID, subptopic, token string) error
3632

3733
// DisconnectHandler method is used to disconnected the client
@@ -58,36 +54,6 @@ func New(clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.Cha
5854
return as
5955
}
6056

61-
func (svc *adapterService) Publish(ctx context.Context, key string, msg *messaging.Message) error {
62-
authnRes, err := svc.clients.Authenticate(ctx, &grpcClientsV1.AuthnReq{
63-
Token: authn.AuthPack(authn.DomainAuth, msg.GetDomain(), key),
64-
})
65-
if err != nil {
66-
return errors.Wrap(svcerr.ErrAuthentication, err)
67-
}
68-
if !authnRes.Authenticated {
69-
return svcerr.ErrAuthentication
70-
}
71-
72-
authzRes, err := svc.channels.Authorize(ctx, &grpcChannelsV1.AuthzReq{
73-
DomainId: msg.GetDomain(),
74-
ClientId: authnRes.GetId(),
75-
ClientType: policies.ClientType,
76-
Type: uint32(connections.Publish),
77-
ChannelId: msg.GetChannel(),
78-
})
79-
if err != nil {
80-
return errors.Wrap(svcerr.ErrAuthorization, err)
81-
}
82-
if !authzRes.Authorized {
83-
return svcerr.ErrAuthorization
84-
}
85-
86-
msg.Publisher = authnRes.GetId()
87-
88-
return svc.pubsub.Publish(ctx, messaging.EncodeMessageTopic(msg), msg)
89-
}
90-
9157
func (svc *adapterService) Subscribe(ctx context.Context, key, domainID, chanID, subtopic string, c Client) error {
9258
authnRes, err := svc.clients.Authenticate(ctx, &grpcClientsV1.AuthnReq{
9359
Token: authn.AuthPack(authn.DomainAuth, domainID, key),
@@ -100,19 +66,6 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, domainID, chanID,
10066
}
10167

10268
clientID := authnRes.GetId()
103-
authzRes, err := svc.channels.Authorize(ctx, &grpcChannelsV1.AuthzReq{
104-
DomainId: domainID,
105-
ClientId: clientID,
106-
ClientType: policies.ClientType,
107-
Type: uint32(connections.Subscribe),
108-
ChannelId: chanID,
109-
})
110-
if err != nil {
111-
return errors.Wrap(svcerr.ErrAuthorization, err)
112-
}
113-
if !authzRes.Authorized {
114-
return svcerr.ErrAuthorization
115-
}
11669

11770
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
11871
authzc := newAuthzClient(clientID, domainID, chanID, subtopic, svc.channels, c)

coap/api/transport.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (h *CoAPHandler) ServeCOAP(w mux.ResponseWriter, m *mux.Message) {
104104
err = h.handleGet(m, w, msg, key)
105105
case codes.POST:
106106
resp.SetCode(codes.Created)
107-
err = h.service.Publish(m.Context(), key, msg)
107+
err = nil
108108
default:
109109
err = errMethodNotAllowed
110110
}

0 commit comments

Comments
 (0)