Skip to content

Commit 5e85480

Browse files
Merge pull request #6 from BranchIntl/fix-logging-interface
Remove seelog and use the new slog standard logger
2 parents 36e8369 + 71f6124 commit 5e85480

File tree

23 files changed

+118
-310
lines changed

23 files changed

+118
-310
lines changed

.golangci.yml

Lines changed: 0 additions & 11 deletions
This file was deleted.

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ engine := core.NewEngine(
180180
core.WithPollInterval(5*time.Second), // Polling frequency
181181
core.WithShutdownTimeout(30*time.Second), // Graceful shutdown timeout
182182
core.WithJobBufferSize(100), // Job channel buffer
183-
core.WithExitOnComplete(false), // Exit when queues empty
184183
)
185184
```
186185

@@ -202,6 +201,10 @@ options.Exchange = "jobs"
202201
options.PrefetchCount = 1
203202
```
204203

204+
## Logging
205+
206+
goworker2 uses Go's standard `log/slog` library for structured logging. By default, it uses the default slog logger. To customize logging, configure your logger before creating the engine using `slog.SetDefault(logger)`. For example: `slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})))`.
207+
205208
## Worker Functions
206209

207210
Worker functions must match this signature:

brokers/rabbitmq/broker.go

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package rabbitmq
33
import (
44
"context"
55
"fmt"
6+
"log/slog"
67
"time"
78

89
"github.com/BranchIntl/goworker2/core"
910
"github.com/BranchIntl/goworker2/errors"
1011
"github.com/BranchIntl/goworker2/job"
11-
"github.com/cihub/seelog"
1212
amqp "github.com/rabbitmq/amqp091-go"
1313
)
1414

@@ -21,7 +21,6 @@ type RabbitMQBroker struct {
2121
declaredQueues map[string]bool // Track declared queues
2222
consumerQueues []string // Queues to consume from
2323
consumerTags map[string]string // Track consumer tags
24-
logger seelog.LoggerInterface
2524
}
2625

2726
// NewBroker creates a new RabbitMQ broker
@@ -104,11 +103,6 @@ func (r *RabbitMQBroker) Type() string {
104103
return "rabbitmq"
105104
}
106105

107-
// SetLogger sets the logger for the broker
108-
func (r *RabbitMQBroker) SetLogger(logger seelog.LoggerInterface) {
109-
r.logger = logger
110-
}
111-
112106
// Capabilities returns RabbitMQ broker capabilities
113107
func (r *RabbitMQBroker) Capabilities() core.BrokerCapabilities {
114108
return core.BrokerCapabilities{
@@ -331,16 +325,9 @@ func (r *RabbitMQBroker) ensureQueue(name string) error {
331325
return nil
332326
}
333327

334-
// logError logs an error message if logger is available
335-
func (r *RabbitMQBroker) logError(format string, args ...interface{}) {
336-
if r.logger != nil {
337-
r.logger.Errorf(format, args...)
338-
}
339-
}
340-
341328
// Start implements the Poller interface for push-based consumption
342329
func (r *RabbitMQBroker) Start(ctx context.Context, jobChan chan<- job.Job) error {
343-
r.logger.Infof("Starting RabbitMQ consumer for queues: %v", r.consumerQueues)
330+
slog.Info("Starting RabbitMQ consumer", "queues", r.consumerQueues)
344331

345332
for _, queue := range r.consumerQueues {
346333
if err := r.ensureQueue(queue); err != nil {
@@ -370,7 +357,7 @@ func (r *RabbitMQBroker) Start(ctx context.Context, jobChan chan<- job.Job) erro
370357

371358
// Keep running until context is cancelled
372359
<-ctx.Done()
373-
r.logger.Info("RabbitMQ consumer stopped")
360+
slog.Info("RabbitMQ consumer stopped")
374361
close(jobChan)
375362
return nil
376363
}
@@ -383,7 +370,7 @@ func (r *RabbitMQBroker) handleDeliveries(ctx context.Context, queue string, del
383370
return
384371
case delivery, ok := <-deliveries:
385372
if !ok {
386-
r.logger.Warnf("Delivery channel closed for queue %s", queue)
373+
slog.Warn("Delivery channel closed", "queue", queue)
387374
return
388375
}
389376

@@ -394,11 +381,11 @@ func (r *RabbitMQBroker) handleDeliveries(ctx context.Context, queue string, del
394381
case <-ctx.Done():
395382
// Put job back on queue
396383
if err := delivery.Nack(false, true); err != nil {
397-
r.logger.Errorf("Failed to nack job during shutdown: %v", err)
384+
slog.Error("Failed to nack job during shutdown", "error", err)
398385
}
399386
return
400387
case jobChan <- job:
401-
r.logger.Debugf("Job sent to workers: %s", job.GetClass())
388+
slog.Debug("Job sent to workers", "class", job.GetClass())
402389
}
403390
}
404391
}
@@ -422,9 +409,9 @@ func (r *RabbitMQBroker) convertDeliveryToJob(delivery amqp.Delivery, queue stri
422409
if err != nil {
423410
// Reject message if we can't deserialize it
424411
if nackErr := delivery.Nack(false, false); nackErr != nil {
425-
r.logError("Failed to nack message after deserialization error: %v", nackErr)
412+
slog.Error("Failed to nack message after deserialization error", "error", nackErr)
426413
}
427-
r.logError("Failed to deserialize job: %v", err)
414+
slog.Error("Failed to deserialize job", "error", err)
428415
return nil
429416
}
430417

brokers/redis/broker.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ package redis
33
import (
44
"context"
55
"fmt"
6+
"log/slog"
67
"time"
78

89
"github.com/BranchIntl/goworker2/core"
910
"github.com/BranchIntl/goworker2/errors"
1011
redisUtils "github.com/BranchIntl/goworker2/internal/redis"
1112
"github.com/BranchIntl/goworker2/job"
12-
"github.com/cihub/seelog"
1313
"github.com/gomodule/redigo/redis"
1414
)
1515

@@ -19,7 +19,6 @@ type RedisBroker struct {
1919
namespace string
2020
options Options
2121
serializer core.Serializer
22-
logger seelog.LoggerInterface
2322
}
2423

2524
// NewBroker creates a new Redis broker
@@ -61,7 +60,7 @@ func (r *RedisBroker) Close() error {
6160
return nil
6261
}
6362

64-
// Health checks the Redis connection health
63+
// Health checks Redis connection health
6564
func (r *RedisBroker) Health() error {
6665
if r.pool == nil {
6766
return errors.ErrNotConnected
@@ -83,11 +82,6 @@ func (r *RedisBroker) Type() string {
8382
return "redis"
8483
}
8584

86-
// SetLogger sets the logger for the broker
87-
func (r *RedisBroker) SetLogger(logger seelog.LoggerInterface) {
88-
r.logger = logger
89-
}
90-
9185
// Capabilities returns Redis broker capabilities
9286
func (r *RedisBroker) Capabilities() core.BrokerCapabilities {
9387
return core.BrokerCapabilities{
@@ -123,7 +117,7 @@ func (r *RedisBroker) Enqueue(ctx context.Context, j job.Job) error {
123117

124118
// Add queue to set of known queues (best effort)
125119
if _, err := conn.Do("SADD", r.queuesKey(), j.GetQueue()); err != nil {
126-
r.logError("Failed to track queue %s: %v", j.GetQueue(), err)
120+
slog.Error("Failed to track queue %s: %v", j.GetQueue(), err)
127121
}
128122

129123
return nil
@@ -213,7 +207,7 @@ func (r *RedisBroker) DeleteQueue(ctx context.Context, name string) error {
213207

214208
// Remove from set of queues (best effort)
215209
if _, err := conn.Do("SREM", r.queuesKey(), name); err != nil {
216-
r.logError("Failed to remove queue %s from set: %v", name, err)
210+
slog.Error("Failed to remove queue %s from set: %v", name, err)
217211
}
218212

219213
return nil
@@ -264,9 +258,3 @@ func (r *RedisBroker) queueKey(queue string) string {
264258
func (r *RedisBroker) queuesKey() string {
265259
return fmt.Sprintf("%squeues", r.namespace)
266260
}
267-
268-
func (r *RedisBroker) logError(format string, args ...interface{}) {
269-
if r.logger != nil {
270-
r.logger.Errorf(format, args...)
271-
}
272-
}

core/engine.go

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package core
33
import (
44
"context"
55
"fmt"
6+
"log/slog"
67
"os"
78
"os/signal"
89
"sync"
@@ -11,7 +12,6 @@ import (
1112

1213
"github.com/BranchIntl/goworker2/errors"
1314
"github.com/BranchIntl/goworker2/job"
14-
"github.com/cihub/seelog"
1515
)
1616

1717
// Engine is the main orchestration engine
@@ -27,8 +27,6 @@ type Engine struct {
2727
ctx context.Context
2828
cancel context.CancelFunc
2929
wg sync.WaitGroup
30-
31-
logger seelog.LoggerInterface
3230
}
3331

3432
// NewEngine creates a new engine with dependency injection
@@ -44,25 +42,19 @@ func NewEngine(
4442
opt(config)
4543
}
4644

47-
logger, _ := seelog.LoggerFromWriterWithMinLevel(config.LogOutput, config.LogLevel)
48-
4945
return &Engine{
5046
broker: broker,
5147
stats: stats,
5248
registry: registry,
5349
serializer: serializer,
5450
config: config,
55-
logger: logger,
5651
}
5752
}
5853

5954
// Start begins processing jobs
6055
func (e *Engine) Start(ctx context.Context) error {
6156
e.ctx, e.cancel = context.WithCancel(ctx)
6257

63-
// Set logger on broker
64-
e.broker.SetLogger(e.logger)
65-
6658
// If broker implements SetConsumerQueues (for push-based consumption), set the queues
6759
if setQueues, ok := e.broker.(interface{ SetConsumerQueues([]string) }); ok {
6860
setQueues.SetConsumerQueues(e.config.Queues)
@@ -94,7 +86,6 @@ func (e *Engine) Start(ctx context.Context) error {
9486
e.stats,
9587
e.config.Queues,
9688
e.config.PollInterval,
97-
e.logger,
9889
)
9990
}
10091

@@ -106,7 +97,6 @@ func (e *Engine) Start(ctx context.Context) error {
10697
e.config.Concurrency,
10798
e.config.Queues,
10899
jobChan,
109-
e.logger,
110100
e.broker,
111101
)
112102

@@ -115,18 +105,18 @@ func (e *Engine) Start(ctx context.Context) error {
115105
go func() {
116106
defer e.wg.Done()
117107
if err := poller.Start(e.ctx, jobChan); err != nil {
118-
e.logger.Errorf("Poller error: %v", err)
108+
slog.Error("Poller error", "error", err)
119109
}
120110
}()
121111

122112
go func() {
123113
defer e.wg.Done()
124114
if err := e.workerPool.Start(e.ctx); err != nil {
125-
e.logger.Errorf("Worker pool error: %v", err)
115+
slog.Error("Worker pool error", "error", err)
126116
}
127117
}()
128118

129-
e.logger.Info("Engine started")
119+
slog.Info("Engine started")
130120
return nil
131121
}
132122

@@ -145,18 +135,18 @@ func (e *Engine) Stop() error {
145135

146136
select {
147137
case <-done:
148-
e.logger.Info("Engine stopped gracefully")
138+
slog.Info("Engine stopped gracefully")
149139
case <-time.After(e.config.ShutdownTimeout):
150-
e.logger.Warn("Engine shutdown timeout exceeded")
140+
slog.Warn("Engine shutdown timeout exceeded")
151141
}
152142

153143
// Close connections
154144
if err := e.broker.Close(); err != nil {
155-
e.logger.Errorf("Error closing broker: %v", err)
145+
slog.Error("Error closing broker", "error", err)
156146
}
157147

158148
if err := e.stats.Close(); err != nil {
159-
e.logger.Errorf("Error closing statistics: %v", err)
149+
slog.Error("Error closing statistics", "error", err)
160150
}
161151

162152
return nil
@@ -209,9 +199,9 @@ func (e *Engine) Run(ctx context.Context) error {
209199
// Wait for either context cancellation or signal
210200
select {
211201
case <-ctx.Done():
212-
e.logger.Info("Context cancelled, shutting down...")
202+
slog.Info("Context cancelled, shutting down...")
213203
case sig := <-sigChan:
214-
e.logger.Infof("Received signal %v, shutting down...", sig)
204+
slog.Info("Received signal, shutting down...", "signal", sig)
215205
}
216206

217207
// Graceful shutdown

core/engine_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,6 @@ func TestEngine_Integration_MultipleQueues(t *testing.T) {
289289
WithQueues([]string{"high", "medium", "low"}),
290290
WithConcurrency(2),
291291
WithPollInterval(50*time.Millisecond),
292-
WithStrictQueues(true), // Process in queue order
293292
)
294293

295294
// Register workers

core/helpers_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package core
22

33
import (
44
"context"
5+
"log/slog"
6+
"os"
57
"testing"
68
"time"
79

810
"github.com/BranchIntl/goworker2/job"
9-
"github.com/cihub/seelog"
1011
)
1112

1213
// TestSetup provides common test dependencies
@@ -15,17 +16,21 @@ type TestSetup struct {
1516
Stats *MockStatistics
1617
Registry *MockRegistry
1718
Serializer *MockSerializer
18-
Logger seelog.LoggerInterface
1919
}
2020

2121
// NewTestSetup creates a standard test setup with all mocks
2222
func NewTestSetup() *TestSetup {
23+
// Set up a discard logger for tests to avoid noise
24+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
25+
Level: slog.LevelError, // Only show errors in tests
26+
}))
27+
slog.SetDefault(logger)
28+
2329
return &TestSetup{
2430
Broker: NewMockBroker(),
2531
Stats: NewMockStatistics(),
2632
Registry: NewMockRegistry(),
2733
Serializer: NewMockSerializer(),
28-
Logger: seelog.Disabled,
2934
}
3035
}
3136

@@ -133,7 +138,7 @@ func (b *WorkerBuilder) WithQueues(queues []string) *WorkerBuilder {
133138

134139
// Build creates the worker
135140
func (b *WorkerBuilder) Build() *Worker {
136-
return NewWorker(b.id, b.queues, b.setup.Registry, b.setup.Stats, b.setup.Logger, b.setup.Broker)
141+
return NewWorker(b.id, b.queues, b.setup.Registry, b.setup.Stats, b.setup.Broker)
137142
}
138143

139144
// PollerBuilder helps create pollers for testing
@@ -168,7 +173,7 @@ func (b *PollerBuilder) WithInterval(interval time.Duration) *PollerBuilder {
168173

169174
// Build creates the poller
170175
func (b *PollerBuilder) Build() *StandardPoller {
171-
return NewStandardPoller(b.setup.Broker, b.setup.Stats, b.queues, b.interval, b.setup.Logger)
176+
return NewStandardPoller(b.setup.Broker, b.setup.Stats, b.queues, b.interval)
172177
}
173178

174179
// WorkerPoolBuilder helps create worker pools for testing
@@ -204,7 +209,7 @@ func (b *WorkerPoolBuilder) WithQueues(queues []string) *WorkerPoolBuilder {
204209
// Build creates the worker pool
205210
func (b *WorkerPoolBuilder) Build() *WorkerPool {
206211
return NewWorkerPool(b.setup.Registry, b.setup.Stats, b.setup.Serializer,
207-
b.concurrency, b.queues, b.jobChan, b.setup.Logger, b.setup.Broker)
212+
b.concurrency, b.queues, b.jobChan, b.setup.Broker)
208213
}
209214

210215
// ErrorTestCase represents a common error test scenario

0 commit comments

Comments
 (0)