Skip to content

Commit e58cdf3

Browse files
committed
Fixed calling onTimeout and onFilter
1 parent ef6d152 commit e58cdf3

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

publisher/publication.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,18 @@ func (p *Publication[T]) Subscribe(buffer int, opts ...SubscriberOption[T]) *Sub
7272
func (p *Publication[T]) Publish(message T) {
7373
for _, sub := range p.subscribers.Iterate() {
7474
if sub.filter == nil || sub.filter(message) {
75-
go func() {
75+
go func(sub *Subscriber[T]) {
7676
select {
7777
case sub.receiveCh <- message:
7878
// continue
7979
case <-time.After(sub.timeout):
80-
// continue
80+
if sub.onTimeout != nil {
81+
sub.onTimeout(message)
82+
}
8183
}
82-
}()
84+
}(sub)
85+
} else if sub.filter != nil && sub.onFiltered != nil {
86+
sub.onFiltered(message)
8387
}
8488
}
8589
}

0 commit comments

Comments
 (0)