Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 0 additions & 34 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
)

Expand All @@ -32,9 +30,6 @@ type Conn struct {
Meta Metadata
}

mu sync.RWMutex
topics map[string]struct{} // set of subscribed topics

closed int32
onCloseErrorCB func(c *Conn)
}
Expand Down Expand Up @@ -80,7 +75,6 @@ func Open(rw net.Conn, sec Security, sockType SocketType, sockID SocketIdentity,
sec: sec,
Server: server,
Meta: make(Metadata),
topics: make(map[string]struct{}),
onCloseErrorCB: onCloseErrorCB,
}
conn.Meta[sysSockType] = string(conn.typ)
Expand Down Expand Up @@ -439,34 +433,6 @@ func (c *Conn) read() Msg {
return msg
}

func (conn *Conn) subscribe(msg Msg) {
conn.mu.Lock()
v := msg.Frames[0]
k := string(v[1:])
switch v[0] {
case 0:
delete(conn.topics, k)
case 1:
conn.topics[k] = struct{}{}
}
conn.mu.Unlock()
}

func (conn *Conn) subscribed(topic string) bool {
conn.mu.RLock()
defer conn.mu.RUnlock()
for k := range conn.topics {
switch {
case k == "":
// subscribed to everything
return true
case strings.HasPrefix(topic, k):
return true
}
}
return false
}

func (conn *Conn) SetClosed() {
if wasClosed := atomic.CompareAndSwapInt32(&conn.closed, 0, 1); wasClosed {
conn.notifyOnCloseError()
Expand Down
17 changes: 13 additions & 4 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ const (
//
// Before creating a Proxy, users must set any socket options,
// and Listen or Dial both frontend and backend sockets.
func NewProxy(ctx context.Context, front, back, capture Socket) *Proxy {
func NewProxy(ctx context.Context, front, back, capture Socket) (*Proxy, error) {
grp, ctx := errgroup.WithContext(ctx)
proxy := Proxy{
ctx: ctx,
grp: grp,
cmds: make(chan proxyCmd),
}
proxy.init(front, back, capture)
return &proxy
if err := proxy.init(front, back, capture); err != nil {
return nil, err
}
return &proxy, nil
}

func (p *Proxy) Pause() { p.cmds <- proxyPause }
Expand All @@ -61,7 +63,7 @@ func (p *Proxy) Run() error {
return p.grp.Wait()
}

func (p *Proxy) init(front, back, capture Socket) {
func (p *Proxy) init(front, back, capture Socket) error {
canRecv := func(sck Socket) bool {
switch sck.Type() {
case Push:
Expand Down Expand Up @@ -102,6 +104,12 @@ func (p *Proxy) init(front, back, capture Socket) {
}
)

// Subscribe all. If we don't do this, the sub socket will drop all messages.
// It has no effect for other socket types.
if err := front.SetOption(OptionSubscribe, ""); err != nil {
return err
}

// workers makes sure all goroutines are launched and scheduled.
var workers sync.WaitGroup
workers.Add(len(pipes) + 1)
Expand Down Expand Up @@ -160,4 +168,5 @@ func (p *Proxy) init(front, back, capture Socket) {

// wait for all worker routines to be scheduled.
workers.Wait()
return nil
}
15 changes: 12 additions & 3 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,15 @@ func TestProxy(t *testing.T) {
})

grp.Go(func() error {
var err error
wg1.Wait() // sockets ready
proxy = zmq4.NewProxy(ctx, front, back, capt)
proxy, err = zmq4.NewProxy(ctx, front, back, capt)
if err != nil {
return err
}
t.Logf("proxy ready")
wg2.Done()
err := proxy.Run()
err = proxy.Run()
t.Logf("proxy done: err=%+v", err)
return err
})
Expand Down Expand Up @@ -314,7 +318,12 @@ func TestProxyStop(t *testing.T) {

var errc = make(chan error)
go func() {
errc <- zmq4.NewProxy(ctx, front, back, nil).Run()
proxy, err := zmq4.NewProxy(ctx, front, back, nil)
if err != nil {
errc <- err
} else {
errc <- proxy.Run()
}
}()

go func() {
Expand Down
34 changes: 4 additions & 30 deletions pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ import (
"sync"
)

// Topics is an interface that wraps the basic Topics method.
type Topics interface {
// Topics returns the sorted list of topics a socket is subscribed to.
Topics() []string
}

// NewPub returns a new PUB ZeroMQ socket.
// The returned socket value is initially unbound.
func NewPub(ctx context.Context, opts ...Option) Socket {
Expand Down Expand Up @@ -109,9 +103,9 @@ func (pub *pubSocket) SetOption(name string, value interface{}) error {
return nil
}

// Topics returns the sorted list of topics a socket is subscribed to.
func (pub *pubSocket) Topics() []string {
return pub.sck.topics()
// pub sockets don't have any subscribed topics, so return an empty list.
return []string {}
}

// pubQReader is a queued-message reader.
Expand Down Expand Up @@ -194,28 +188,11 @@ func (q *pubQReader) listen(ctx context.Context, r *Conn) {
if msg.err != nil {
return
}
switch {
case q.topic(msg):
r.subscribe(msg)
default:
q.c <- msg
}
q.c <- msg
}
}
}

func (q *pubQReader) topic(msg Msg) bool {
if len(msg.Frames) != 1 {
return false
}
frame := msg.Frames[0]
if len(frame) == 0 {
return false
}
topic := frame[0]
return topic == 0 || topic == 1
}

type pubMWriter struct {
ctx context.Context
mu sync.Mutex
Expand Down Expand Up @@ -309,15 +286,12 @@ func (w *pubMWriter) write(ctx context.Context, msg Msg) error {
}

func (w *pubMWriter) sendMsg(msg Msg) {
topic := string(msg.Frames[0])
w.mu.Lock()
defer w.mu.Unlock()
// TODO(inphi): distribute messages across subscribers at once
for i := range w.ws {
ww := w.ws[i]
if ww.subscribed(topic) {
_ = ww.SendMsg(msg)
}
_ = ww.SendMsg(msg)
}
}

Expand Down
23 changes: 0 additions & 23 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"log"
"net"
"os"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -90,28 +89,6 @@ func newSocket(ctx context.Context, sockType SocketType, opts ...Option) *socket
return sck
}

func (sck *socket) topics() []string {
var (
keys = make(map[string]struct{})
topics []string
)
sck.mu.RLock()
for _, con := range sck.conns {
con.mu.RLock()
for topic := range con.topics {
if _, dup := keys[topic]; dup {
continue
}
keys[topic] = struct{}{}
topics = append(topics, topic)
}
con.mu.RUnlock()
}
sck.mu.RUnlock()
sort.Strings(topics)
return topics
}

// Close closes the open Socket
func (sck *socket) Close() error {
sck.cancel()
Expand Down
42 changes: 36 additions & 6 deletions sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ import (
"context"
"net"
"sort"
"strings"
"sync"
)

// Topics is an interface that wraps the basic Topics method.
type Topics interface {
// Topics returns the sorted list of topics a socket is subscribed to.
Topics() []string
}

// NewSub returns a new SUB ZeroMQ socket.
// The returned socket value is initially unbound.
func NewSub(ctx context.Context, opts ...Option) Socket {
Expand Down Expand Up @@ -48,7 +55,17 @@ func (sub *subSocket) SendMulti(msg Msg) error {

// Recv receives a complete message.
func (sub *subSocket) Recv() (Msg, error) {
return sub.sck.Recv()
// If we're not subscribed to this message, we keep looping until we get one we are subscribed to, or an empty message or an error.
for {
msg, err := sub.sck.Recv()
if err != nil || len(msg.Frames) == 0 || string(msg.Frames[0]) == "" {
return msg, err
}
t := string(msg.Frames[0])
if sub.subscribed(t) {
return msg, err
}
}
}

// Listen connects a local endpoint to the Socket.
Expand All @@ -58,11 +75,7 @@ func (sub *subSocket) Listen(ep string) error {

// Dial connects a remote endpoint to the Socket.
func (sub *subSocket) Dial(ep string) error {
err := sub.sck.Dial(ep)
if err != nil {
return err
}
return nil
return sub.sck.Dial(ep)
}

// Type returns the type of this Socket (PUB, SUB, ...)
Expand Down Expand Up @@ -138,6 +151,23 @@ func (sub *subSocket) subscribe(topic string, v int) {
sub.mu.Unlock()
}

func (sub *subSocket) subscribed(topic string) bool {
sub.mu.RLock()
defer sub.mu.RUnlock()
if _, ok := sub.topics[""]; ok {
return true
}
if _, ok := sub.topics[topic]; ok {
return true
}
for k := range sub.topics {
if strings.HasPrefix(topic, k) {
return true
}
}
return false
}

var (
_ Socket = (*subSocket)(nil)
_ Topics = (*subSocket)(nil)
Expand Down
3 changes: 2 additions & 1 deletion xpub.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func (xpub *xpubSocket) SetOption(name string, value interface{}) error {
}

func (xpub *xpubSocket) Topics() []string {
return xpub.sck.topics()
// xpub sockets don't have any subscriptions, so return an empty list.
return []string {}
}

var (
Expand Down
Loading