Skip to content

Commit 36e8369

Browse files
Merge pull request #7 from BranchIntl/push-based-rabbitmq
Add support for push based brokers like RabbitMQ
2 parents 0b3f6ac + 0a24c47 commit 36e8369

File tree

8 files changed

+203
-93
lines changed

8 files changed

+203
-93
lines changed

brokers/rabbitmq/broker.go

Lines changed: 138 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,36 @@ import (
1212
amqp "github.com/rabbitmq/amqp091-go"
1313
)
1414

15-
// RabbitMQBroker implements the Broker interface for RabbitMQ
15+
// RabbitMQBroker implements the Broker and Poller interfaces for RabbitMQ
1616
type RabbitMQBroker struct {
17-
connection *amqp.Connection
18-
channel *amqp.Channel
19-
options Options
20-
serializer core.Serializer
21-
queues map[string]bool // Track declared queues
22-
logger seelog.LoggerInterface
17+
connection *amqp.Connection
18+
channel *amqp.Channel
19+
options Options
20+
serializer core.Serializer
21+
declaredQueues map[string]bool // Track declared queues
22+
consumerQueues []string // Queues to consume from
23+
consumerTags map[string]string // Track consumer tags
24+
logger seelog.LoggerInterface
2325
}
2426

2527
// NewBroker creates a new RabbitMQ broker
2628
func NewBroker(options Options, serializer core.Serializer) *RabbitMQBroker {
2729
return &RabbitMQBroker{
28-
options: options,
29-
serializer: serializer,
30-
queues: make(map[string]bool),
30+
options: options,
31+
serializer: serializer,
32+
declaredQueues: make(map[string]bool),
33+
consumerTags: make(map[string]string),
34+
}
35+
}
36+
37+
// NewBrokerWithQueues creates a new RabbitMQ broker with consumer queues
38+
func NewBrokerWithQueues(options Options, serializer core.Serializer, queues []string) *RabbitMQBroker {
39+
return &RabbitMQBroker{
40+
options: options,
41+
serializer: serializer,
42+
declaredQueues: make(map[string]bool),
43+
consumerQueues: queues,
44+
consumerTags: make(map[string]string),
3145
}
3246
}
3347

@@ -168,36 +182,14 @@ func (r *RabbitMQBroker) Dequeue(ctx context.Context, queue string) (job.Job, er
168182
return nil, nil // No message available
169183
}
170184

171-
// Create metadata
172-
metadata := job.Metadata{
173-
Queue: queue,
174-
EnqueuedAt: delivery.Timestamp,
175-
}
176-
177-
if delivery.MessageId != "" {
178-
metadata.ID = delivery.MessageId
179-
}
180-
181-
// Deserialize job
182-
j, err := r.serializer.Deserialize(delivery.Body, metadata)
183-
if err != nil {
184-
// Reject message if we can't deserialize it
185-
if nackErr := delivery.Nack(false, false); nackErr != nil {
186-
r.logError("Failed to nack message after deserialization error: %v", nackErr)
187-
}
185+
// Convert delivery to job using the shared method
186+
job := r.convertDeliveryToJob(delivery, queue)
187+
if job == nil {
188188
return nil, errors.NewSerializationError(r.serializer.GetFormat(),
189-
fmt.Errorf("deserialize job: %w", err))
189+
fmt.Errorf("failed to convert delivery to job"))
190190
}
191191

192-
// Store delivery tag for ACK/NACK
193-
// Always wrap in RMQJob
194-
rmqJob := &RMQJob{
195-
Job: j,
196-
deliveryTag: delivery.DeliveryTag,
197-
channel: channel,
198-
}
199-
200-
return rmqJob, nil
192+
return job, nil
201193
}
202194

203195
// Ack acknowledges job completion
@@ -254,7 +246,7 @@ func (r *RabbitMQBroker) CreateQueue(ctx context.Context, name string, options c
254246
return errors.NewBrokerError("create_queue", name, err)
255247
}
256248

257-
r.queues[name] = true
249+
r.declaredQueues[name] = true
258250
return nil
259251
}
260252

@@ -270,7 +262,7 @@ func (r *RabbitMQBroker) DeleteQueue(ctx context.Context, name string) error {
270262
return errors.NewBrokerError("delete_queue", name, err)
271263
}
272264

273-
delete(r.queues, name)
265+
delete(r.declaredQueues, name)
274266
return nil
275267
}
276268

@@ -318,7 +310,7 @@ func (r *RabbitMQBroker) ensureQueue(name string) error {
318310
return err
319311
}
320312

321-
if r.queues[name] {
313+
if r.declaredQueues[name] {
322314
return nil // Already declared
323315
}
324316

@@ -335,7 +327,7 @@ func (r *RabbitMQBroker) ensureQueue(name string) error {
335327
return err
336328
}
337329

338-
r.queues[name] = true
330+
r.declaredQueues[name] = true
339331
return nil
340332
}
341333

@@ -346,6 +338,111 @@ func (r *RabbitMQBroker) logError(format string, args ...interface{}) {
346338
}
347339
}
348340

341+
// Start implements the Poller interface for push-based consumption
342+
func (r *RabbitMQBroker) Start(ctx context.Context, jobChan chan<- job.Job) error {
343+
r.logger.Infof("Starting RabbitMQ consumer for queues: %v", r.consumerQueues)
344+
345+
for _, queue := range r.consumerQueues {
346+
if err := r.ensureQueue(queue); err != nil {
347+
return fmt.Errorf("failed to ensure queue %s: %w", queue, err)
348+
}
349+
350+
deliveries, err := r.channel.Consume(
351+
queue, // queue
352+
"", // consumer tag (auto-generated)
353+
false, // auto-ack
354+
false, // exclusive
355+
false, // no-local
356+
false, // no-wait
357+
nil, // args
358+
)
359+
if err != nil {
360+
return fmt.Errorf("failed to start consumer for queue %s: %w", queue, err)
361+
}
362+
363+
// Store consumer tag for cleanup
364+
if r.consumerTags == nil {
365+
r.consumerTags = make(map[string]string)
366+
}
367+
368+
go r.handleDeliveries(ctx, queue, deliveries, jobChan)
369+
}
370+
371+
// Keep running until context is cancelled
372+
<-ctx.Done()
373+
r.logger.Info("RabbitMQ consumer stopped")
374+
close(jobChan)
375+
return nil
376+
}
377+
378+
// handleDeliveries processes incoming messages from RabbitMQ
379+
func (r *RabbitMQBroker) handleDeliveries(ctx context.Context, queue string, deliveries <-chan amqp.Delivery, jobChan chan<- job.Job) {
380+
for {
381+
select {
382+
case <-ctx.Done():
383+
return
384+
case delivery, ok := <-deliveries:
385+
if !ok {
386+
r.logger.Warnf("Delivery channel closed for queue %s", queue)
387+
return
388+
}
389+
390+
// Convert delivery to job
391+
job := r.convertDeliveryToJob(delivery, queue)
392+
if job != nil {
393+
select {
394+
case <-ctx.Done():
395+
// Put job back on queue
396+
if err := delivery.Nack(false, true); err != nil {
397+
r.logger.Errorf("Failed to nack job during shutdown: %v", err)
398+
}
399+
return
400+
case jobChan <- job:
401+
r.logger.Debugf("Job sent to workers: %s", job.GetClass())
402+
}
403+
}
404+
}
405+
}
406+
}
407+
408+
// convertDeliveryToJob converts an AMQP delivery to a Job
409+
func (r *RabbitMQBroker) convertDeliveryToJob(delivery amqp.Delivery, queue string) job.Job {
410+
// Create metadata
411+
metadata := job.Metadata{
412+
Queue: queue,
413+
EnqueuedAt: delivery.Timestamp,
414+
}
415+
416+
if delivery.MessageId != "" {
417+
metadata.ID = delivery.MessageId
418+
}
419+
420+
// Deserialize job
421+
j, err := r.serializer.Deserialize(delivery.Body, metadata)
422+
if err != nil {
423+
// Reject message if we can't deserialize it
424+
if nackErr := delivery.Nack(false, false); nackErr != nil {
425+
r.logError("Failed to nack message after deserialization error: %v", nackErr)
426+
}
427+
r.logError("Failed to deserialize job: %v", err)
428+
return nil
429+
}
430+
431+
// Wrap in RMQJob for proper ACK/NACK handling
432+
rmqJob := &RMQJob{
433+
Job: j,
434+
deliveryTag: delivery.DeliveryTag,
435+
channel: r.channel,
436+
}
437+
438+
return rmqJob
439+
}
440+
441+
// SetConsumerQueues sets the queues that this broker will consume from
442+
func (r *RabbitMQBroker) SetConsumerQueues(queues []string) {
443+
r.consumerQueues = queues
444+
}
445+
349446
// RMQJob wraps a job with RabbitMQ-specific delivery information
350447
type RMQJob struct {
351448
job.Job

brokers/rabbitmq/broker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func TestNewBroker(t *testing.T) {
7676
require.NotNil(t, broker)
7777
assert.Equal(t, options, broker.options)
7878
assert.Equal(t, serializer, broker.serializer)
79-
assert.NotNil(t, broker.queues)
79+
assert.NotNil(t, broker.declaredQueues)
8080
}
8181

8282
func TestRabbitMQBroker_Type(t *testing.T) {

core/engine.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ type Engine struct {
2222
serializer Serializer
2323
config *Config
2424

25-
poller *Poller
2625
workerPool *WorkerPool
2726

2827
ctx context.Context
@@ -61,6 +60,14 @@ func NewEngine(
6160
func (e *Engine) Start(ctx context.Context) error {
6261
e.ctx, e.cancel = context.WithCancel(ctx)
6362

63+
// Set logger on broker
64+
e.broker.SetLogger(e.logger)
65+
66+
// If broker implements SetConsumerQueues (for push-based consumption), set the queues
67+
if setQueues, ok := e.broker.(interface{ SetConsumerQueues([]string) }); ok {
68+
setQueues.SetConsumerQueues(e.config.Queues)
69+
}
70+
6471
// Connect broker and statistics
6572
if err := e.broker.Connect(e.ctx); err != nil {
6673
return errors.NewConnectionError("",
@@ -75,15 +82,21 @@ func (e *Engine) Start(ctx context.Context) error {
7582
// Create job channel
7683
jobChan := make(chan job.Job, e.config.JobBufferSize)
7784

78-
// Create and start poller
79-
e.poller = NewPoller(
80-
e.broker,
81-
e.stats,
82-
e.config.Queues,
83-
e.config.PollInterval,
84-
jobChan,
85-
e.logger,
86-
)
85+
// Check if broker implements Poller interface
86+
var poller Poller
87+
if brokerPoller, ok := e.broker.(Poller); ok {
88+
// Broker can poll/consume directly
89+
poller = brokerPoller
90+
} else {
91+
// Use StandardPoller wrapper for pull-based brokers
92+
poller = NewStandardPoller(
93+
e.broker,
94+
e.stats,
95+
e.config.Queues,
96+
e.config.PollInterval,
97+
e.logger,
98+
)
99+
}
87100

88101
// Create and start worker pool
89102
e.workerPool = NewWorkerPool(
@@ -101,7 +114,7 @@ func (e *Engine) Start(ctx context.Context) error {
101114
e.wg.Add(2)
102115
go func() {
103116
defer e.wg.Done()
104-
if err := e.poller.Start(e.ctx); err != nil {
117+
if err := poller.Start(e.ctx, jobChan); err != nil {
105118
e.logger.Errorf("Poller error: %v", err)
106119
}
107120
}()

core/engine_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ func TestEngine_Start_Success(t *testing.T) {
2323

2424
err := engine.Start(ctx)
2525
assert.NoError(t, err)
26-
assert.NotNil(t, engine.poller)
2726
assert.NotNil(t, engine.workerPool)
2827

2928
err = engine.Stop()

core/helpers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ func (b *PollerBuilder) WithInterval(interval time.Duration) *PollerBuilder {
167167
}
168168

169169
// Build creates the poller
170-
func (b *PollerBuilder) Build() *Poller {
171-
return NewPoller(b.setup.Broker, b.setup.Stats, b.queues, b.interval, b.jobChan, b.setup.Logger)
170+
func (b *PollerBuilder) Build() *StandardPoller {
171+
return NewStandardPoller(b.setup.Broker, b.setup.Stats, b.queues, b.interval, b.setup.Logger)
172172
}
173173

174174
// WorkerPoolBuilder helps create worker pools for testing

core/interfaces.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ type Registry interface {
8181
Clear()
8282
}
8383

84+
// Poller interface defines what core needs from a job poller/consumer
85+
type Poller interface {
86+
// Start begins consuming jobs and sending them to the job channel
87+
Start(ctx context.Context, jobChan chan<- job.Job) error
88+
}
89+
8490
// Serializer interface defines what core needs from a serializer
8591
type Serializer interface {
8692
// Serialize converts a job to bytes

0 commit comments

Comments
 (0)