Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ package main
import (
"context"
"log"
"time"

"github.com/BranchIntl/goworker2/engines"
)
Expand All @@ -42,7 +43,11 @@ func emailJob(queue string, args ...interface{}) error {
}

func main() {
engine := engines.NewResqueEngine(engines.DefaultResqueOptions())
options := engines.DefaultResqueOptions()
options.Queues = []string{"email", "default"}
options.PollInterval = 3 * time.Second

engine := engines.NewResqueEngine(options)
engine.Register("EmailJob", emailJob)

if err := engine.Run(context.Background()); err != nil {
Expand All @@ -68,7 +73,10 @@ func imageProcessor(queue string, args ...interface{}) error {
}

func main() {
engine := engines.NewSneakersEngine(engines.DefaultSneakersOptions())
options := engines.DefaultSneakersOptions()
options.Queues = []string{"images", "default"}

engine := engines.NewSneakersEngine(options)
engine.Register("ImageProcessor", imageProcessor)

if err := engine.Run(context.Background()); err != nil {
Expand Down Expand Up @@ -97,8 +105,14 @@ import (
)

func main() {
// Configure broker with queues
brokerOpts := redis.DefaultOptions()
brokerOpts.Queues = []string{"critical", "default"}
brokerOpts.PollInterval = 5 * time.Second

// Create components
broker := redis.NewBroker(redis.DefaultOptions(), resque.NewSerializer())
serializer := resque.NewSerializer()
broker := redis.NewBroker(brokerOpts, serializer)
stats := resque.NewStatistics(resque.DefaultOptions())
registry := registry.NewRegistry()

Expand All @@ -107,10 +121,9 @@ func main() {
broker,
stats,
registry,
resque.NewSerializer(),
core.WithConcurrency(10),
core.WithQueues([]string{"critical", "default"}),
core.WithPollInterval(5*time.Second),
core.WithShutdownTimeout(30*time.Second),
core.WithJobBufferSize(200),
)

// Register workers
Expand Down Expand Up @@ -144,18 +157,17 @@ goworker2 uses a modular architecture with dependency injection:
┌─────────────────┐
│ Engine │ ← Orchestrates components
├─────────────────┤
│ Broker │ ← Queue backend (Redis/RabbitMQ)
│ Broker │ ← Queue backend with job consumption
│ Statistics │ ← Metrics and monitoring
│ Registry │ ← Worker function registry
│ Serializer │ ← Job serialization format
│ WorkerPool │ ← Manages concurrent workers
│ Poller │ ← Polls queues for jobs
└─────────────────┘
```

### Components

- **Broker**: Handles queue operations (enqueue, dequeue, ack/nack)
- **Broker**: Handles queue operations and job consumption (enqueue, ack/nack, polling/pushing)
- **Statistics**: Records metrics and worker information
- **Registry**: Maps job classes to worker functions
- **Serializer**: Converts jobs to/from bytes
Expand All @@ -176,8 +188,6 @@ See [`engines/`](engines/) directory for detailed engine documentation.
engine := core.NewEngine(
broker, stats, registry, serializer,
core.WithConcurrency(25), // Number of workers
core.WithQueues([]string{"high", "low"}), // Queue names
core.WithPollInterval(5*time.Second), // Polling frequency
core.WithShutdownTimeout(30*time.Second), // Graceful shutdown timeout
core.WithJobBufferSize(100), // Job channel buffer
)
Expand All @@ -190,6 +200,8 @@ engine := core.NewEngine(
options := redis.DefaultOptions()
options.URI = "redis://localhost:6379/"
options.Namespace = "jobs:"
options.Queues = []string{"high", "low"} // Queues to consume from
options.PollInterval = 5 * time.Second // Polling frequency
options.MaxConnections = 10
```

Expand All @@ -198,6 +210,7 @@ options.MaxConnections = 10
options := rabbitmq.DefaultOptions()
options.URI = "amqp://guest:guest@localhost:5672/"
options.Exchange = "jobs"
options.Queues = []string{"high", "low"} // Queues to consume from
options.PrefetchCount = 1
```

Expand Down
104 changes: 37 additions & 67 deletions brokers/rabbitmq/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,49 @@ import (
"log/slog"
"time"

"github.com/BranchIntl/goworker2/core"
"github.com/BranchIntl/goworker2/errors"
"github.com/BranchIntl/goworker2/job"
amqp "github.com/rabbitmq/amqp091-go"
)

// RabbitMQBroker implements the Broker and Poller interfaces for RabbitMQ
// Serializer interface for serializing and deserializing jobs
type Serializer interface {
// Serialize converts a job to bytes
Serialize(j job.Job) ([]byte, error)
// Deserialize converts bytes to a job
Deserialize(data []byte, metadata job.Metadata) (job.Job, error)
// GetFormat returns the serialization format name
GetFormat() string
}

// QueueOptions for queue creation
type QueueOptions struct {
// MaxRetries before moving to dead letter queue
MaxRetries int
// MessageTTL is how long a message can remain in queue
MessageTTL time.Duration
// VisibilityTimeout for message processing
VisibilityTimeout time.Duration
// DeadLetterQueue name for failed messages
DeadLetterQueue string
}

// RabbitMQBroker implements the Broker interface for RabbitMQ
type RabbitMQBroker struct {
connection *amqp.Connection
channel *amqp.Channel
options Options
serializer core.Serializer
serializer Serializer
declaredQueues map[string]bool // Track declared queues
consumerQueues []string // Queues to consume from
consumerTags map[string]string // Track consumer tags
}

// NewBroker creates a new RabbitMQ broker
func NewBroker(options Options, serializer core.Serializer) *RabbitMQBroker {
return &RabbitMQBroker{
options: options,
serializer: serializer,
declaredQueues: make(map[string]bool),
consumerTags: make(map[string]string),
}
}

// NewBrokerWithQueues creates a new RabbitMQ broker with consumer queues
func NewBrokerWithQueues(options Options, serializer core.Serializer, queues []string) *RabbitMQBroker {
func NewBroker(options Options, serializer Serializer) *RabbitMQBroker {
return &RabbitMQBroker{
options: options,
serializer: serializer,
declaredQueues: make(map[string]bool),
consumerQueues: queues,
consumerTags: make(map[string]string),
}
}
Expand Down Expand Up @@ -103,16 +112,6 @@ func (r *RabbitMQBroker) Type() string {
return "rabbitmq"
}

// Capabilities returns RabbitMQ broker capabilities
func (r *RabbitMQBroker) Capabilities() core.BrokerCapabilities {
return core.BrokerCapabilities{
SupportsAck: true, // RabbitMQ supports ACK/NACK
SupportsDelay: true, // Can be implemented with delayed exchange plugin
SupportsPriority: true, // RabbitMQ supports message priority
SupportsDeadLetter: true, // RabbitMQ supports dead letter exchanges
}
}

// Enqueue adds a job to the queue
func (r *RabbitMQBroker) Enqueue(ctx context.Context, j job.Job) error {
channel, err := r.getChannel()
Expand Down Expand Up @@ -154,38 +153,6 @@ func (r *RabbitMQBroker) Enqueue(ctx context.Context, j job.Job) error {
return nil
}

// Dequeue retrieves a job from the queue
func (r *RabbitMQBroker) Dequeue(ctx context.Context, queue string) (job.Job, error) {
channel, err := r.getChannel()
if err != nil {
return nil, err
}

// Ensure queue exists
if err := r.ensureQueue(queue); err != nil {
return nil, errors.NewBrokerError("ensure_queue", queue, err)
}

// Get a single message
delivery, ok, err := channel.Get(queue, false) // Don't auto-ack
if err != nil {
return nil, errors.NewBrokerError("dequeue", queue, err)
}

if !ok {
return nil, nil // No message available
}

// Convert delivery to job using the shared method
job := r.convertDeliveryToJob(delivery, queue)
if job == nil {
return nil, errors.NewSerializationError(r.serializer.GetFormat(),
fmt.Errorf("failed to convert delivery to job"))
}

return job, nil
}

// Ack acknowledges job completion
func (r *RabbitMQBroker) Ack(ctx context.Context, j job.Job) error {
if rmqJob, ok := j.(*RMQJob); ok && rmqJob.deliveryTag > 0 {
Expand All @@ -203,7 +170,7 @@ func (r *RabbitMQBroker) Nack(ctx context.Context, j job.Job, requeue bool) erro
}

// CreateQueue creates a new queue
func (r *RabbitMQBroker) CreateQueue(ctx context.Context, name string, options core.QueueOptions) error {
func (r *RabbitMQBroker) CreateQueue(ctx context.Context, name string, options QueueOptions) error {
channel, err := r.getChannel()
if err != nil {
return err
Expand Down Expand Up @@ -275,6 +242,11 @@ func (r *RabbitMQBroker) QueueExists(ctx context.Context, name string) (bool, er
return true, nil
}

// Queues returns the list of queues
func (r *RabbitMQBroker) Queues() []string {
return r.options.Queues
}

// QueueLength returns the number of jobs in a queue
func (r *RabbitMQBroker) QueueLength(ctx context.Context, name string) (int64, error) {
channel, err := r.getChannel()
Expand Down Expand Up @@ -325,11 +297,14 @@ func (r *RabbitMQBroker) ensureQueue(name string) error {
return nil
}

// Start implements the Poller interface for push-based consumption
// Start begins consuming jobs and sending them to the job channel
func (r *RabbitMQBroker) Start(ctx context.Context, jobChan chan<- job.Job) error {
slog.Info("Starting RabbitMQ consumer", "queues", r.consumerQueues)
slog.Info("Starting RabbitMQ consumer", "queues", r.options.Queues)
if len(r.options.Queues) == 0 {
return errors.ErrNoQueues
}

for _, queue := range r.consumerQueues {
for _, queue := range r.options.Queues {
if err := r.ensureQueue(queue); err != nil {
return fmt.Errorf("failed to ensure queue %s: %w", queue, err)
}
Expand Down Expand Up @@ -425,11 +400,6 @@ func (r *RabbitMQBroker) convertDeliveryToJob(delivery amqp.Delivery, queue stri
return rmqJob
}

// SetConsumerQueues sets the queues that this broker will consume from
func (r *RabbitMQBroker) SetConsumerQueues(queues []string) {
r.consumerQueues = queues
}

// RMQJob wraps a job with RabbitMQ-specific delivery information
type RMQJob struct {
job.Job
Expand Down
49 changes: 20 additions & 29 deletions brokers/rabbitmq/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/BranchIntl/goworker2/core"
"github.com/BranchIntl/goworker2/errors"
"github.com/BranchIntl/goworker2/job"
"github.com/stretchr/testify/assert"
Expand All @@ -17,7 +16,6 @@ type mockSerializer struct {
serializeErr error
deserializeErr error
format string
useNumber bool
}

func (m *mockSerializer) Serialize(j job.Job) ([]byte, error) {
Expand All @@ -40,9 +38,7 @@ func (m *mockSerializer) Deserialize(data []byte, metadata job.Metadata) (job.Jo
}, nil
}

func (m *mockSerializer) GetFormat() string { return m.format }
func (m *mockSerializer) UseNumber() bool { return m.useNumber }
func (m *mockSerializer) SetUseNumber(useNumber bool) { m.useNumber = useNumber }
func (m *mockSerializer) GetFormat() string { return m.format }

// mockJob is a simple job implementation for testing
type mockJob struct {
Expand Down Expand Up @@ -79,25 +75,6 @@ func TestNewBroker(t *testing.T) {
assert.NotNil(t, broker.declaredQueues)
}

func TestRabbitMQBroker_Type(t *testing.T) {
broker := NewBroker(DefaultOptions(), &mockSerializer{})
assert.Equal(t, "rabbitmq", broker.Type())
}

func TestRabbitMQBroker_Capabilities(t *testing.T) {
broker := NewBroker(DefaultOptions(), &mockSerializer{})
capabilities := broker.Capabilities()

expected := core.BrokerCapabilities{
SupportsAck: true,
SupportsDelay: true,
SupportsPriority: true,
SupportsDeadLetter: true,
}

assert.Equal(t, expected, capabilities)
}

func TestRabbitMQBroker_Connect_InvalidURI(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -181,7 +158,7 @@ func TestRabbitMQBroker_CreateQueue_NotConnected(t *testing.T) {
broker := NewBroker(DefaultOptions(), &mockSerializer{})
ctx := context.Background()

queueOptions := core.QueueOptions{
queueOptions := QueueOptions{
MaxRetries: 3,
MessageTTL: 5 * time.Minute,
DeadLetterQueue: "dlq",
Expand Down Expand Up @@ -232,13 +209,27 @@ func TestRabbitMQBroker_Enqueue_NotConnected(t *testing.T) {
assert.ErrorIs(t, err, errors.ErrNotConnected)
}

func TestRabbitMQBroker_Dequeue_NotConnected(t *testing.T) {
broker := NewBroker(DefaultOptions(), &mockSerializer{})
func TestRabbitMQBroker_Start_NotConnected(t *testing.T) {
options := DefaultOptions()
options.Queues = []string{"test_queue"}
broker := NewBroker(options, &mockSerializer{})
ctx := context.Background()
jobChan := make(chan job.Job, 10)

job, err := broker.Dequeue(ctx, "test_queue")
err := broker.Start(ctx, jobChan)
assert.ErrorIs(t, err, errors.ErrNotConnected)
assert.Nil(t, job)
}

func TestRabbitMQBroker_Start_EmptyQueues(t *testing.T) {
options := DefaultOptions()
options.Queues = []string{}
broker := NewBroker(options, &mockSerializer{})
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
jobChan := make(chan job.Job, 10)

err := broker.Start(ctx, jobChan)
assert.ErrorIs(t, err, errors.ErrNoQueues)
}

func TestRabbitMQBroker_SerializationError(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions brokers/rabbitmq/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type Options struct {
PrefetchCount int
Exchange string
ExchangeType string
Queues []string
}

// DefaultOptions returns default RabbitMQ options
Expand All @@ -15,5 +16,6 @@ func DefaultOptions() Options {
PrefetchCount: 1,
Exchange: "activejob",
ExchangeType: "direct",
Queues: []string{},
}
}
Loading