@@ -21,14 +21,15 @@ import (
21
21
"github.com/gerfey/messenger/transport"
22
22
"github.com/gerfey/messenger/transport/amqp"
23
23
"github.com/gerfey/messenger/transport/inmemory"
24
+ "github.com/gerfey/messenger/transport/sync"
24
25
)
25
26
26
27
type Builder struct {
27
28
cfg * config.MessengerConfig
28
29
resolver api.TypeResolver
29
30
transportFactory * transport.FactoryChain
30
31
handlersLocator api.HandlerLocator
31
- transportLocator api.TransportLocator
32
+ senderLocator api.SenderLocator
32
33
middlewareLocator api.MiddlewareLocator
33
34
busLocator api.BusLocator
34
35
eventDispatcher api.EventDispatcher
@@ -38,19 +39,22 @@ type Builder struct {
38
39
func NewBuilder (cfg * config.MessengerConfig , logger * slog.Logger ) api.Builder {
39
40
resolver := NewResolver ()
40
41
42
+ busLocator := bus .NewLocator ()
43
+
41
44
tf := transport .NewFactoryChain (
42
- amqp .NewTransportFactory (resolver , logger ),
43
- inmemory .NewTransportFactory (resolver , logger ),
45
+ amqp .NewTransportFactory (logger , resolver ),
46
+ inmemory .NewTransportFactory (logger , resolver ),
47
+ sync .NewTransportFactory (logger , busLocator ),
44
48
)
45
49
46
50
return & Builder {
47
51
cfg : cfg ,
48
52
resolver : resolver ,
49
53
transportFactory : tf ,
50
54
handlersLocator : handler .NewHandlerLocator (),
51
- transportLocator : transport .NewLocator (),
55
+ senderLocator : transport .NewSenderLocator (),
52
56
middlewareLocator : middleware .NewMiddlewareLocator (),
53
- busLocator : bus . NewLocator () ,
57
+ busLocator : busLocator ,
54
58
eventDispatcher : event .NewEventDispatcher (logger ),
55
59
logger : logger ,
56
60
}
@@ -91,25 +95,16 @@ func (b *Builder) RegisterListener(event any, listener any) {
91
95
}
92
96
93
97
func (b * Builder ) Build () (api.Messenger , error ) {
94
- router := routing .NewRouter ()
95
- for msgTypeStr , transportName := range b .cfg .Routing {
96
- t , err := b .handlersLocator .ResolveMessageType (msgTypeStr )
97
- if err != nil {
98
- return nil , fmt .Errorf ("failed to resolve message type '%s' in routing configuration: %w" , msgTypeStr , err )
99
- }
100
- router .RouteTypeTo (t , transportName )
101
- }
102
-
103
98
b .registerStamps ()
104
99
105
- if err := b .setupBuses (router ); err != nil {
100
+ if err := b .setupBuses (); err != nil {
106
101
return nil , err
107
102
}
108
103
109
- return b .createMessenger (router )
104
+ return b .createMessenger ()
110
105
}
111
106
112
- func (b * Builder ) setupBuses (router api. Router ) error {
107
+ func (b * Builder ) setupBuses () error {
113
108
for name , cfg := range b .cfg .Buses {
114
109
var chain []api.Middleware
115
110
@@ -124,9 +119,9 @@ func (b *Builder) setupBuses(router api.Router) error {
124
119
chain = append (chain , implementation .NewAddBusNameMiddleware (name ))
125
120
chain = append (
126
121
chain ,
127
- implementation .NewSendMessageMiddleware (router , b . transportLocator , b .eventDispatcher , b .logger ),
122
+ implementation .NewSendMessageMiddleware (b . logger , b .senderLocator , b .eventDispatcher ),
128
123
)
129
- chain = append (chain , implementation .NewHandleMessageMiddleware (b .handlersLocator , b .logger ))
124
+ chain = append (chain , implementation .NewHandleMessageMiddleware (b .logger , b .handlersLocator ))
130
125
131
126
createNewBus := bus .NewBus (chain ... )
132
127
@@ -139,7 +134,48 @@ func (b *Builder) setupBuses(router api.Router) error {
139
134
return nil
140
135
}
141
136
142
- func (b * Builder ) createMessenger (router api.Router ) (api.Messenger , error ) {
137
+ func (b * Builder ) createMessenger () (api.Messenger , error ) {
138
+ router , err := b .setupRouting ()
139
+ if err != nil {
140
+ return nil , err
141
+ }
142
+
143
+ busMap := b .createBusMap ()
144
+ handlerManager := b .createHandlerManager (busMap )
145
+ manager := transport .NewManager (b .logger , handlerManager , b .eventDispatcher )
146
+
147
+ createdTransports , transportNames , err := b .createTransports (manager )
148
+ if err != nil {
149
+ return nil , err
150
+ }
151
+
152
+ b .setupFallbackTransports (transportNames )
153
+ b .setupRetryListeners (createdTransports )
154
+
155
+ defaultBus , ok := b .busLocator .Get (b .cfg .DefaultBus )
156
+ if ! ok {
157
+ return nil , fmt .Errorf ("default_bus %q not found" , defaultBus )
158
+ }
159
+
160
+ return messenger .NewMessenger (b .cfg .DefaultBus , manager , b .busLocator , router ), nil
161
+ }
162
+
163
+ func (b * Builder ) setupRouting () (api.Router , error ) {
164
+ router := routing .NewRouter ()
165
+
166
+ for msgTypeStr , transportName := range b .cfg .Routing {
167
+ t , err := b .handlersLocator .ResolveMessageType (msgTypeStr )
168
+ if err != nil {
169
+ return nil , fmt .Errorf ("failed to resolve message type '%s' in routing configuration: %w" , msgTypeStr , err )
170
+ }
171
+ router .RouteTypeTo (t , transportName )
172
+ b .senderLocator .RegisterMessageType (t , []string {transportName })
173
+ }
174
+
175
+ return router , nil
176
+ }
177
+
178
+ func (b * Builder ) createBusMap () map [reflect.Type ]string {
143
179
busMap := make (map [reflect.Type ]string )
144
180
for _ , h := range b .handlersLocator .GetAll () {
145
181
busName := h .BusName
@@ -149,7 +185,11 @@ func (b *Builder) createMessenger(router api.Router) (api.Messenger, error) {
149
185
busMap [h .InputType ] = busName
150
186
}
151
187
152
- handlerManager := func (ctx context.Context , env api.Envelope ) error {
188
+ return busMap
189
+ }
190
+
191
+ func (b * Builder ) createHandlerManager (busMap map [reflect.Type ]string ) func (context.Context , api.Envelope ) error {
192
+ return func (ctx context.Context , env api.Envelope ) error {
153
193
msgType := reflect .TypeOf (env .Message ())
154
194
busName , ok := busMap [msgType ]
155
195
if ! ok {
@@ -165,27 +205,47 @@ func (b *Builder) createMessenger(router api.Router) (api.Messenger, error) {
165
205
166
206
return err
167
207
}
208
+ }
168
209
169
- manager := transport .NewManager (handlerManager , b .eventDispatcher , b .logger )
210
+ func (b * Builder ) createTransports (manager * transport.Manager ) (map [string ]api.Transport , []string , error ) {
211
+ var transportNames []string
212
+ createdTransports := make (map [string ]api.Transport )
213
+
214
+ b .createdSyncTransport (createdTransports )
170
215
171
216
for name , tCfg := range b .cfg .Transports {
172
217
tr , err := b .transportFactory .CreateTransport (name , tCfg )
173
218
if err != nil {
174
- return nil , fmt .Errorf ("failed to create transport '%s': %w" , name , err )
219
+ return nil , nil , fmt .Errorf ("failed to create transport '%s': %w" , name , err )
175
220
}
176
221
222
+ createdTransports [name ] = tr
223
+ }
224
+
225
+ for nameTransport , tr := range createdTransports {
177
226
manager .AddTransport (tr )
227
+ transportNames = append (transportNames , nameTransport )
178
228
179
- errTransportLocator := b .transportLocator .Register (name , tr )
229
+ errTransportLocator := b .senderLocator .Register (nameTransport , tr )
180
230
if errTransportLocator != nil {
181
- return nil , fmt .Errorf ("failed to register transport '%s': %w" , name , errTransportLocator )
231
+ return nil , nil , fmt .Errorf ("failed to register transport '%s': %w" , nameTransport , errTransportLocator )
182
232
}
183
233
}
184
234
235
+ return createdTransports , transportNames , nil
236
+ }
237
+
238
+ func (b * Builder ) setupFallbackTransports (transportNames []string ) {
239
+ if len (b .cfg .Routing ) == 0 && len (transportNames ) > 0 {
240
+ b .senderLocator .SetFallback (transportNames )
241
+ }
242
+ }
243
+
244
+ func (b * Builder ) setupRetryListeners (createdTransports map [string ]api.Transport ) {
185
245
for name , tCfg := range b .cfg .Transports {
186
- tr := b . transportLocator . GetTransport ( name )
246
+ t := createdTransports [ name ]
187
247
188
- if retryable , ok := tr .(api.RetryableTransport ); ok && tCfg .RetryStrategy != nil {
248
+ if retryable , ok := t .(api.RetryableTransport ); ok && tCfg .RetryStrategy != nil {
189
249
strategy := retry .NewMultiplierRetryStrategy (
190
250
tCfg .RetryStrategy .MaxRetries ,
191
251
tCfg .RetryStrategy .Delay ,
@@ -195,23 +255,29 @@ func (b *Builder) createMessenger(router api.Router) (api.Messenger, error) {
195
255
196
256
var failureTransport api.Transport
197
257
if b .cfg .FailureTransport != "" {
198
- failureTransport = b .transportLocator .GetTransport (b .cfg .FailureTransport )
258
+ if ft , exists := createdTransports [b .cfg .FailureTransport ]; exists {
259
+ failureTransport = ft
260
+ }
199
261
}
200
262
201
- lst := listener .NewSendFailedMessageForRetryListener (retryable , failureTransport , strategy , b . logger )
263
+ lst := listener .NewSendFailedMessageForRetryListener (b . logger , retryable , failureTransport , strategy )
202
264
b .eventDispatcher .AddListener (event.SendFailedMessageEvent {}, lst )
203
265
}
204
266
}
205
-
206
- defaultBus , ok := b .busLocator .Get (b .cfg .DefaultBus )
207
- if ! ok {
208
- return nil , fmt .Errorf ("default_bus %q not found" , defaultBus )
209
- }
210
-
211
- return messenger .NewMessenger (b .cfg .DefaultBus , manager , b .busLocator , router ), nil
212
267
}
213
268
214
269
func (b * Builder ) registerStamps () {
215
270
b .resolver .RegisterStamp (stamps.BusNameStamp {})
216
271
b .resolver .RegisterStamp (stamps.RedeliveryStamp {})
217
272
}
273
+
274
+ func (b * Builder ) createdSyncTransport (createdTransports map [string ]api.Transport ) {
275
+ cfg := config.TransportConfig {
276
+ DSN : "sync://" ,
277
+ Options : config.OptionsConfig {},
278
+ }
279
+
280
+ if syncTransport , err := b .transportFactory .CreateTransport ("sync" , cfg ); err == nil {
281
+ createdTransports ["sync" ] = syncTransport
282
+ }
283
+ }
0 commit comments