@@ -51,7 +51,7 @@ func NewBus() event.Bus {
51
51
}
52
52
}
53
53
54
- func (b * basicBus ) withNode (typ reflect.Type , cb func (* node ), async func (* node )) error {
54
+ func (b * basicBus ) withNode (typ reflect.Type , cb func (* node ), async func (* node )) {
55
55
b .lk .Lock ()
56
56
57
57
n , ok := b .nodes [typ ]
@@ -65,12 +65,14 @@ func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node))
65
65
66
66
cb (n )
67
67
68
- go func () {
69
- defer n .lk .Unlock ()
70
- async (n )
71
- }()
72
-
73
- return nil
68
+ if async == nil {
69
+ n .lk .Unlock ()
70
+ } else {
71
+ go func () {
72
+ defer n .lk .Unlock ()
73
+ async (n )
74
+ }()
75
+ }
74
76
}
75
77
76
78
func (b * basicBus ) tryDropNode (typ reflect.Type ) {
@@ -168,7 +170,7 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
168
170
for i , etyp := range types {
169
171
typ := reflect .TypeOf (etyp )
170
172
171
- err = b .withNode (typ .Elem (), func (n * node ) {
173
+ b .withNode (typ .Elem (), func (n * node ) {
172
174
n .sinks = append (n .sinks , out .ch )
173
175
out .nodes [i ] = n
174
176
}, func (n * node ) {
@@ -210,11 +212,11 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
210
212
}
211
213
typ = typ .Elem ()
212
214
213
- err = b .withNode (typ , func (n * node ) {
215
+ b .withNode (typ , func (n * node ) {
214
216
atomic .AddInt32 (& n .nEmitters , 1 )
215
217
n .keepLast = n .keepLast || settings .makeStateful
216
218
e = & emitter {n : n , typ : typ , dropper : b .tryDropNode }
217
- }, func ( _ * node ) {} )
219
+ }, nil )
218
220
return
219
221
}
220
222
0 commit comments