Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func main() {
* **Publish()**
* **SubscribeAsync()**
* **SubscribeOnceAsync()**
* **SubscribeReplyAsync()**
* **Request()**
* **WaitAsync()**

#### New()
Expand Down Expand Up @@ -116,6 +118,28 @@ Transactional determines whether subsequent callbacks for a topic are run serial
#### SubscribeOnceAsync(topic string, args ...interface{})
SubscribeOnceAsync works like SubscribeOnce except the callback to executed asynchronously

#### SubscribeReplyAsync(topic string, fn interface{})
SubscribeReplyAsync works like SubscribeAsync except the callback is expected to return a value. The value is returned to the caller of Publish.

#### Request(topic string, handler interface{}, timeoutMs time.Duration, args ...interface{})
Request is a function that allows you to make a request to a topic and wait for a response. The response is returned to the caller of `Request` as an interface{}.

```go
bus := EventBus.New()

func slowCalculator(reply string, a, b int) {
time.Sleep(3 * time.Second)
bus.Publish(reply, a + b)
}

bus.SubscribeReplyAsync("main:slow_calculator", slowCalculator)

reply := bus.Request("main:slow_calculator", func(rslt int) {
fmt.Printf("Result: %d\n", rslt)
}, 20, 60)

```

#### WaitAsync()
WaitAsync waits for all async callbacks to complete.

Expand Down
159 changes: 119 additions & 40 deletions event_bus.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,43 @@
package EventBus

import (
"errors"
"fmt"
"github.com/google/uuid"
"reflect"
"sync"
"time"
)

//BusSubscriber defines subscription-related bus behavior
const (
ReplyTopicPrefix = "_INBOX:"
)

type Void struct{}

// BusSubscriber defines subscription-related bus behavior
type BusSubscriber interface {
Subscribe(topic string, fn interface{}) error
SubscribeAsync(topic string, fn interface{}, transactional bool) error
SubscribeOnce(topic string, fn interface{}) error
SubscribeOnceAsync(topic string, fn interface{}) error
SubscribeReplyAsync(topic string, fn interface{}) error
Unsubscribe(topic string, handler interface{}) error
}

//BusPublisher defines publishing-related bus behavior
// BusPublisher defines publishing-related bus behavior
type BusPublisher interface {
Publish(topic string, args ...interface{})
Request(topic string, handler interface{}, timeout time.Duration, args ...interface{}) error
}

//BusController defines bus control behavior (checking handler's presence, synchronization)
// BusController defines bus control behavior (checking handler's presence, synchronization)
type BusController interface {
HasCallback(topic string) bool
WaitAsync()
}

//Bus englobes global (subscribe, publish, control) bus behavior
// Bus englobes global (subscribe, publish, control) bus behavior
type Bus interface {
BusController
BusSubscriber
Expand All @@ -35,9 +46,9 @@ type Bus interface {

// EventBus - box for handlers and callbacks.
type EventBus struct {
handlers map[string][]*eventHandler
lock sync.Mutex // a lock for the map
wg sync.WaitGroup
mapHandlers sync.Map
lock sync.Mutex // a lock for the map
wg sync.WaitGroup
}

type eventHandler struct {
Expand All @@ -51,7 +62,7 @@ type eventHandler struct {
// New returns new EventBus with empty handlers.
func New() Bus {
b := &EventBus{
make(map[string][]*eventHandler),
sync.Map{},
sync.Mutex{},
sync.WaitGroup{},
}
Expand All @@ -60,12 +71,15 @@ func New() Bus {

// doSubscribe handles the subscription logic and is utilized by the public Subscribe functions
func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHandler) error {
bus.lock.Lock()
defer bus.lock.Unlock()
if !(reflect.TypeOf(fn).Kind() == reflect.Func) {
return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind())
}
bus.handlers[topic] = append(bus.handlers[topic], handler)
// rewrite in sync.Map
if _, ok := bus.mapHandlers.Load(topic); !ok {
bus.mapHandlers.Store(topic, []*eventHandler{})
}
handlers, _ := bus.mapHandlers.Load(topic)
bus.mapHandlers.Store(topic, append(handlers.([]*eventHandler), handler))
return nil
}

Expand Down Expand Up @@ -104,36 +118,106 @@ func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error {
})
}

// SubcribeReplyAsync subscribes to a topic with an asynchronous callback
func (bus *EventBus) SubscribeReplyAsync(topic string, fn interface{}) error {
fnValue := reflect.ValueOf(fn)
if fnValue.Kind() != reflect.Func {
return errors.New("fn must be a function")
}

fnType := fnValue.Type()
if fnType.NumIn() == 0 {
return errors.New("fn must have at least one input parameter")
}

if fnType.In(0).Kind() != reflect.String {
return errors.New("fn's first parameter (reply topic) must be a string")
}

return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, true, false, sync.Mutex{},
})
}

// HasCallback returns true if exists any callback subscribed to the topic.
func (bus *EventBus) HasCallback(topic string) bool {
bus.lock.Lock()
defer bus.lock.Unlock()
_, ok := bus.handlers[topic]
if ok {
return len(bus.handlers[topic]) > 0
defer func() {
bus.lock.Unlock()
}()
if handlers, ok := bus.mapHandlers.Load(topic); ok {
return len(handlers.([]*eventHandler)) > 0
}
return false
}

// Unsubscribe removes callback defined for a topic.
// Returns error if there are no callbacks subscribed to the topic.
func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error {
bus.lock.Lock()
defer bus.lock.Unlock()
if _, ok := bus.handlers[topic]; ok && len(bus.handlers[topic]) > 0 {
bus.removeHandler(topic, bus.findHandlerIdx(topic, reflect.ValueOf(handler)))
return nil
if iHandlers, ok := bus.mapHandlers.Load(topic); ok {
handlers := iHandlers.([]*eventHandler)
for i, h := range handlers {
if h.callBack.Type() == reflect.ValueOf(handler).Type() &&
h.callBack.Pointer() == reflect.ValueOf(handler).Pointer() {
handlers = append(handlers[:i], handlers[i+1:]...)
bus.mapHandlers.Store(topic, handlers)
return nil
}
}
}
return fmt.Errorf("topic %s doesn't exist", topic)
}

func (bus *EventBus) Request(topic string, handler interface{}, timeout time.Duration, args ...interface{}) error {
inboxStr := fmt.Sprintf("%v%v:%v", ReplyTopicPrefix, topic, uuid.NewString())
if !bus.HasCallback(topic) {
return fmt.Errorf("no responder on topic: %v", topic)
}
chResult := make(chan Void)

wrapperHandler := func(args ...interface{}) {
chResult <- Void{}
handlerValue := reflect.ValueOf(handler)
if handlerValue.Kind() != reflect.Func {
fmt.Printf("handler is not a function: %v\n", handler)
return
}
handlerArgs := make([]reflect.Value, len(args))
for i, arg := range args {
handlerArgs[i] = reflect.ValueOf(arg)
}
handlerValue.Call(handlerArgs)
}
err := bus.SubscribeOnce(inboxStr, wrapperHandler)
// fmt.Printf("subscribing: %v\n", inboxStr)
if err != nil {
fmt.Println("failed to subscribe to reply topic: %w", err)
}
newArgs := append([]interface{}{inboxStr}, args...)
bus.Publish(topic, newArgs...)

timer := time.NewTimer(timeout)
select {
case <-chResult:
return nil
case <-timer.C:
err = bus.Unsubscribe(inboxStr, wrapperHandler)
if err != nil {
err = fmt.Errorf("failed to unsubscribe: %v %w", inboxStr, err)
}
if err != nil {
err = fmt.Errorf("request timed out %w", err)
} else {
err = fmt.Errorf("request timed out")
}
return err
}
}

// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
defer bus.lock.Unlock()
if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
// so make a copy and iterate the copied slice.
if iHandlers, ok := bus.mapHandlers.Load(topic); ok {
handlers := iHandlers.([]*eventHandler)
copyHandlers := make([]*eventHandler, len(handlers))
copy(copyHandlers, handlers)
for i, handler := range copyHandlers {
Expand All @@ -145,9 +229,7 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) {
} else {
bus.wg.Add(1)
if handler.transactional {
bus.lock.Unlock()
handler.Lock()
bus.lock.Lock()
}
go bus.doPublishAsync(handler, topic, args...)
}
Expand All @@ -169,28 +251,25 @@ func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ..
}

func (bus *EventBus) removeHandler(topic string, idx int) {
if _, ok := bus.handlers[topic]; !ok {
return
}
l := len(bus.handlers[topic])

if !(0 <= idx && idx < l) {
return
if iHandlers, ok := bus.mapHandlers.Load(topic); ok {
handlers := iHandlers.([]*eventHandler)
if len(handlers) > idx && idx >= 0 {
bus.mapHandlers.Store(topic, append(handlers[:idx], handlers[idx+1:]...))
}
}

copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:])
bus.handlers[topic][l-1] = nil // or the zero value of T
bus.handlers[topic] = bus.handlers[topic][:l-1]
}

func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int {
if _, ok := bus.handlers[topic]; ok {
for idx, handler := range bus.handlers[topic] {
// rewrite in sync.Map
if iHandlers, ok := bus.mapHandlers.Load(topic); ok {
handlers := iHandlers.([]*eventHandler)
for idx, handler := range handlers {
if handler.callBack.Type() == callback.Type() &&
handler.callBack.Pointer() == callback.Pointer() {
return idx
}
}

}
return -1
}
Expand Down
Loading