Skip to content

Commit 87239bb

Browse files
committed
feat: add optional logger wherever possible
This commit introduces an optional logger parameter to various structs. This enhancement allows users to provide custom logging implementations.
1 parent c176672 commit 87239bb

File tree

13 files changed

+271
-110
lines changed

13 files changed

+271
-110
lines changed

internal/pool/pool.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ type Options struct {
115115
// DialerRetryTimeout is the backoff duration between retry attempts.
116116
// Default: 100ms
117117
DialerRetryTimeout time.Duration
118+
119+
// Optional logger for connection pool operations.
120+
Logger internal.Logging
118121
}
119122

120123
type lastDialErrorWrap struct {
@@ -223,7 +226,7 @@ func (p *ConnPool) checkMinIdleConns() {
223226
p.idleConnsLen.Add(-1)
224227

225228
p.freeTurn()
226-
internal.Logger.Printf(context.Background(), "addIdleConn panic: %+v", err)
229+
p.logf(context.Background(), "addIdleConn panic: %+v", err)
227230
}
228231
}()
229232

@@ -379,7 +382,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
379382
return cn, nil
380383
}
381384

382-
internal.Logger.Printf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", maxRetries, lastErr)
385+
p.logf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", maxRetries, lastErr)
383386
// All retries failed - handle error tracking
384387
p.setLastDialError(lastErr)
385388
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) {
@@ -452,7 +455,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
452455

453456
for {
454457
if attempts >= getAttempts {
455-
internal.Logger.Printf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts)
458+
p.logf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts)
456459
break
457460
}
458461
attempts++
@@ -479,12 +482,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
479482
if hookManager != nil {
480483
acceptConn, err := hookManager.ProcessOnGet(ctx, cn, false)
481484
if err != nil {
482-
internal.Logger.Printf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
485+
p.logf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
483486
_ = p.CloseConn(cn)
484487
continue
485488
}
486489
if !acceptConn {
487-
internal.Logger.Printf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
490+
p.logf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
488491
p.Put(ctx, cn)
489492
cn = nil
490493
continue
@@ -509,7 +512,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
509512
// this should not happen with a new connection, but we handle it gracefully
510513
if err != nil || !acceptConn {
511514
// Failed to process connection, discard it
512-
internal.Logger.Printf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
515+
p.logf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
513516
_ = p.CloseConn(newcn)
514517
return nil, err
515518
}
@@ -703,7 +706,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {
703706

704707
// If we exhausted all attempts without finding a usable connection, return nil
705708
if attempts > 1 && attempts >= maxAttempts && int32(attempts) >= p.poolSize.Load() {
706-
internal.Logger.Printf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
709+
p.logf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
707710
return nil, nil
708711
}
709712

@@ -720,7 +723,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
720723
// Peek at the reply type to check if it's a push notification
721724
if replyType, err := cn.PeekReplyTypeSafe(); err != nil || replyType != proto.RespPush {
722725
// Not a push notification or error peeking, remove connection
723-
internal.Logger.Printf(ctx, "Conn has unread data (not push notification), removing it")
726+
p.logf(ctx, "Conn has unread data (not push notification), removing it")
724727
p.Remove(ctx, cn, err)
725728
}
726729
// It's a push notification, allow pooling (client will handle it)
@@ -733,7 +736,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
733736
if hookManager != nil {
734737
shouldPool, shouldRemove, err = hookManager.ProcessOnPut(ctx, cn)
735738
if err != nil {
736-
internal.Logger.Printf(ctx, "Connection hook error: %v", err)
739+
p.logf(ctx, "Connection hook error: %v", err)
737740
p.Remove(ctx, cn, err)
738741
return
739742
}
@@ -835,7 +838,7 @@ func (p *ConnPool) removeConn(cn *Conn) {
835838
// this can be idle conn
836839
for idx, ic := range p.idleConns {
837840
if ic.GetID() == cid {
838-
internal.Logger.Printf(context.Background(), "redis: connection pool: removing idle conn[%d]", cid)
841+
p.logf(context.Background(), "redis: connection pool: removing idle conn[%d]", cid)
839842
p.idleConns = append(p.idleConns[:idx], p.idleConns[idx+1:]...)
840843
p.idleConnsLen.Add(-1)
841844
break
@@ -951,7 +954,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
951954
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
952955
// For RESP3 connections with push notifications, we allow some buffered data
953956
// The client will process these notifications before using the connection
954-
internal.Logger.Printf(context.Background(), "push: conn[%d] has buffered data, likely push notifications - will be processed by client", cn.GetID())
957+
p.logf(context.Background(), "push: conn[%d] has buffered data, likely push notifications - will be processed by client", cn.GetID())
955958
return true // Connection is healthy, client will handle notifications
956959
}
957960
return false // Unexpected data, not push notifications, connection is unhealthy
@@ -961,3 +964,11 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
961964
}
962965
return true
963966
}
967+
968+
func (p *ConnPool) logf(ctx context.Context, format string, args ...any) {
969+
if p.cfg.Logger != nil {
970+
p.cfg.Logger.Printf(ctx, format, args...)
971+
} else {
972+
internal.Logger.Printf(ctx, format, args...)
973+
}
974+
}

maintnotifications/circuit_breaker.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (cb *CircuitBreaker) Execute(fn func() error) error {
103103
cb.requests.Store(0)
104104
cb.successes.Store(0)
105105
if internal.LogLevel.InfoOrAbove() {
106-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerTransitioningToHalfOpen(cb.endpoint))
106+
cb.logf(context.Background(), logs.CircuitBreakerTransitioningToHalfOpen(cb.endpoint))
107107
}
108108
// Fall through to half-open logic
109109
} else {
@@ -145,15 +145,15 @@ func (cb *CircuitBreaker) recordFailure() {
145145
if failures >= int64(cb.failureThreshold) {
146146
if cb.state.CompareAndSwap(int32(CircuitBreakerClosed), int32(CircuitBreakerOpen)) {
147147
if internal.LogLevel.WarnOrAbove() {
148-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerOpened(cb.endpoint, failures))
148+
cb.logf(context.Background(), logs.CircuitBreakerOpened(cb.endpoint, failures))
149149
}
150150
}
151151
}
152152
case CircuitBreakerHalfOpen:
153153
// Any failure in half-open state immediately opens the circuit
154154
if cb.state.CompareAndSwap(int32(CircuitBreakerHalfOpen), int32(CircuitBreakerOpen)) {
155155
if internal.LogLevel.WarnOrAbove() {
156-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerReopened(cb.endpoint))
156+
cb.logf(context.Background(), logs.CircuitBreakerReopened(cb.endpoint))
157157
}
158158
}
159159
}
@@ -177,7 +177,7 @@ func (cb *CircuitBreaker) recordSuccess() {
177177
if cb.state.CompareAndSwap(int32(CircuitBreakerHalfOpen), int32(CircuitBreakerClosed)) {
178178
cb.failures.Store(0)
179179
if internal.LogLevel.InfoOrAbove() {
180-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerClosed(cb.endpoint, successes))
180+
cb.logf(context.Background(), logs.CircuitBreakerClosed(cb.endpoint, successes))
181181
}
182182
}
183183
}
@@ -202,6 +202,14 @@ func (cb *CircuitBreaker) GetStats() CircuitBreakerStats {
202202
}
203203
}
204204

205+
func (cb *CircuitBreaker) logf(ctx context.Context, format string, args ...interface{}) {
206+
if cb.config != nil && cb.config.Logger != nil {
207+
cb.config.Logger.Printf(ctx, format, args...)
208+
} else {
209+
internal.Logger.Printf(ctx, format, args...)
210+
}
211+
}
212+
205213
// CircuitBreakerStats provides statistics about a circuit breaker
206214
type CircuitBreakerStats struct {
207215
Endpoint string
@@ -326,7 +334,7 @@ func (cbm *CircuitBreakerManager) cleanup() {
326334

327335
// Log cleanup results
328336
if len(toDelete) > 0 && internal.LogLevel.InfoOrAbove() {
329-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerCleanup(len(toDelete), count))
337+
cbm.logf(context.Background(), logs.CircuitBreakerCleanup(len(toDelete), count))
330338
}
331339

332340
cbm.lastCleanup.Store(now.Unix())
@@ -351,3 +359,11 @@ func (cbm *CircuitBreakerManager) Reset() {
351359
return true
352360
})
353361
}
362+
363+
func (cbm *CircuitBreakerManager) logf(ctx context.Context, format string, args ...interface{}) {
364+
if cbm.config != nil && cbm.config.Logger != nil {
365+
cbm.config.Logger.Printf(ctx, format, args...)
366+
} else {
367+
internal.Logger.Printf(ctx, format, args...)
368+
}
369+
}

maintnotifications/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ type Config struct {
128128
// After this many retries, the connection will be removed from the pool.
129129
// Default: 3
130130
MaxHandoffRetries int
131+
132+
// Logger is an optional custom logger for maintenance notifications.
133+
Logger internal.Logging
131134
}
132135

133136
func (c *Config) IsEnabled() bool {
@@ -341,6 +344,8 @@ func (c *Config) Clone() *Config {
341344

342345
// Configuration fields
343346
MaxHandoffRetries: c.MaxHandoffRetries,
347+
348+
Logger: c.Logger,
344349
}
345350
}
346351

maintnotifications/handoff_worker.go

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (hwm *handoffWorkerManager) onDemandWorker() {
121121
defer func() {
122122
// Handle panics to ensure proper cleanup
123123
if r := recover(); r != nil {
124-
internal.Logger.Printf(context.Background(), logs.WorkerPanicRecovered(r))
124+
hwm.logf(context.Background(), logs.WorkerPanicRecovered(r))
125125
}
126126

127127
// Decrement active worker count when exiting
@@ -146,21 +146,21 @@ func (hwm *handoffWorkerManager) onDemandWorker() {
146146
select {
147147
case <-hwm.shutdown:
148148
if internal.LogLevel.InfoOrAbove() {
149-
internal.Logger.Printf(context.Background(), logs.WorkerExitingDueToShutdown())
149+
hwm.logf(context.Background(), logs.WorkerExitingDueToShutdown())
150150
}
151151
return
152152
case <-timer.C:
153153
// Worker has been idle for too long, exit to save resources
154154
if internal.LogLevel.InfoOrAbove() {
155-
internal.Logger.Printf(context.Background(), logs.WorkerExitingDueToInactivityTimeout(hwm.workerTimeout))
155+
hwm.logf(context.Background(), logs.WorkerExitingDueToInactivityTimeout(hwm.workerTimeout))
156156
}
157157
return
158158
case request := <-hwm.handoffQueue:
159159
// Check for shutdown before processing
160160
select {
161161
case <-hwm.shutdown:
162162
if internal.LogLevel.InfoOrAbove() {
163-
internal.Logger.Printf(context.Background(), logs.WorkerExitingDueToShutdownWhileProcessing())
163+
hwm.logf(context.Background(), logs.WorkerExitingDueToShutdownWhileProcessing())
164164
}
165165
// Clean up the request before exiting
166166
hwm.pending.Delete(request.ConnID)
@@ -178,7 +178,7 @@ func (hwm *handoffWorkerManager) processHandoffRequest(request HandoffRequest) {
178178
// Remove from pending map
179179
defer hwm.pending.Delete(request.Conn.GetID())
180180
if internal.LogLevel.InfoOrAbove() {
181-
internal.Logger.Printf(context.Background(), logs.HandoffStarted(request.Conn.GetID(), request.Endpoint))
181+
hwm.logf(context.Background(), logs.HandoffStarted(request.Conn.GetID(), request.Endpoint))
182182
}
183183

184184
// Create a context with handoff timeout from config
@@ -226,12 +226,12 @@ func (hwm *handoffWorkerManager) processHandoffRequest(request HandoffRequest) {
226226
if hwm.config != nil {
227227
maxRetries = hwm.config.MaxHandoffRetries
228228
}
229-
internal.Logger.Printf(context.Background(), logs.HandoffFailed(request.ConnID, request.Endpoint, currentRetries, maxRetries, err))
229+
hwm.logf(context.Background(), logs.HandoffFailed(request.ConnID, request.Endpoint, currentRetries, maxRetries, err))
230230
}
231231
time.AfterFunc(afterTime, func() {
232232
if err := hwm.queueHandoff(request.Conn); err != nil {
233233
if internal.LogLevel.WarnOrAbove() {
234-
internal.Logger.Printf(context.Background(), logs.CannotQueueHandoffForRetry(err))
234+
hwm.logf(context.Background(), logs.CannotQueueHandoffForRetry(err))
235235
}
236236
hwm.closeConnFromRequest(context.Background(), request, err)
237237
}
@@ -259,7 +259,7 @@ func (hwm *handoffWorkerManager) queueHandoff(conn *pool.Conn) error {
259259
// if shouldHandoff is false and retries is 0, then we are not retrying and not do a handoff
260260
if !shouldHandoff && conn.HandoffRetries() == 0 {
261261
if internal.LogLevel.InfoOrAbove() {
262-
internal.Logger.Printf(context.Background(), logs.ConnectionNotMarkedForHandoff(conn.GetID()))
262+
hwm.logf(context.Background(), logs.ConnectionNotMarkedForHandoff(conn.GetID()))
263263
}
264264
return errors.New(logs.ConnectionNotMarkedForHandoffError(conn.GetID()))
265265
}
@@ -302,7 +302,7 @@ func (hwm *handoffWorkerManager) queueHandoff(conn *pool.Conn) error {
302302
queueLen := len(hwm.handoffQueue)
303303
queueCap := cap(hwm.handoffQueue)
304304
if internal.LogLevel.WarnOrAbove() {
305-
internal.Logger.Printf(context.Background(), logs.HandoffQueueFull(queueLen, queueCap))
305+
hwm.logf(context.Background(), logs.HandoffQueueFull(queueLen, queueCap))
306306
}
307307
}
308308
}
@@ -356,7 +356,7 @@ func (hwm *handoffWorkerManager) performConnectionHandoff(ctx context.Context, c
356356

357357
// Check if circuit breaker is open before attempting handoff
358358
if circuitBreaker.IsOpen() {
359-
internal.Logger.Printf(ctx, logs.CircuitBreakerOpen(connID, newEndpoint))
359+
hwm.logf(ctx, logs.CircuitBreakerOpen(connID, newEndpoint))
360360
return false, ErrCircuitBreakerOpen // Don't retry when circuit breaker is open
361361
}
362362

@@ -385,15 +385,15 @@ func (hwm *handoffWorkerManager) performHandoffInternal(
385385
connID uint64,
386386
) (shouldRetry bool, err error) {
387387
retries := conn.IncrementAndGetHandoffRetries(1)
388-
internal.Logger.Printf(ctx, logs.HandoffRetryAttempt(connID, retries, newEndpoint, conn.RemoteAddr().String()))
388+
hwm.logf(ctx, logs.HandoffRetryAttempt(connID, retries, newEndpoint, conn.RemoteAddr().String()))
389389
maxRetries := 3 // Default fallback
390390
if hwm.config != nil {
391391
maxRetries = hwm.config.MaxHandoffRetries
392392
}
393393

394394
if retries > maxRetries {
395395
if internal.LogLevel.WarnOrAbove() {
396-
internal.Logger.Printf(ctx, logs.ReachedMaxHandoffRetries(connID, newEndpoint, maxRetries))
396+
hwm.logf(ctx, logs.ReachedMaxHandoffRetries(connID, newEndpoint, maxRetries))
397397
}
398398
// won't retry on ErrMaxHandoffRetriesReached
399399
return false, ErrMaxHandoffRetriesReached
@@ -405,7 +405,7 @@ func (hwm *handoffWorkerManager) performHandoffInternal(
405405
// Create new connection to the new endpoint
406406
newNetConn, err := endpointDialer(ctx)
407407
if err != nil {
408-
internal.Logger.Printf(ctx, logs.FailedToDialNewEndpoint(connID, newEndpoint, err))
408+
hwm.logf(ctx, logs.FailedToDialNewEndpoint(connID, newEndpoint, err))
409409
// will retry
410410
// Maybe a network error - retry after a delay
411411
return true, err
@@ -425,7 +425,7 @@ func (hwm *handoffWorkerManager) performHandoffInternal(
425425
conn.SetRelaxedTimeoutWithDeadline(relaxedTimeout, relaxedTimeout, deadline)
426426

427427
if internal.LogLevel.InfoOrAbove() {
428-
internal.Logger.Printf(context.Background(), logs.ApplyingRelaxedTimeoutDueToPostHandoff(connID, relaxedTimeout, deadline.Format("15:04:05.000")))
428+
hwm.logf(context.Background(), logs.ApplyingRelaxedTimeoutDueToPostHandoff(connID, relaxedTimeout, deadline.Format("15:04:05.000")))
429429
}
430430
}
431431

@@ -447,7 +447,7 @@ func (hwm *handoffWorkerManager) performHandoffInternal(
447447
// - clear the handoff state (shouldHandoff, endpoint, seqID)
448448
// - reset the handoff retries to 0
449449
conn.ClearHandoffState()
450-
internal.Logger.Printf(ctx, logs.HandoffSucceeded(connID, newEndpoint))
450+
hwm.logf(ctx, logs.HandoffSucceeded(connID, newEndpoint))
451451

452452
// successfully completed the handoff, no retry needed and no error
453453
return false, nil
@@ -478,15 +478,23 @@ func (hwm *handoffWorkerManager) closeConnFromRequest(ctx context.Context, reque
478478
if pooler != nil {
479479
pooler.Remove(ctx, conn, err)
480480
if internal.LogLevel.WarnOrAbove() {
481-
internal.Logger.Printf(ctx, logs.RemovingConnectionFromPool(conn.GetID(), err))
481+
hwm.logf(ctx, logs.RemovingConnectionFromPool(conn.GetID(), err))
482482
}
483483
} else {
484484
err := conn.Close() // Close the connection if no pool provided
485485
if err != nil {
486-
internal.Logger.Printf(ctx, "redis: failed to close connection: %v", err)
486+
hwm.logf(ctx, "redis: failed to close connection: %v", err)
487487
}
488488
if internal.LogLevel.WarnOrAbove() {
489-
internal.Logger.Printf(ctx, logs.NoPoolProvidedCannotRemove(conn.GetID(), err))
489+
hwm.logf(ctx, logs.NoPoolProvidedCannotRemove(conn.GetID(), err))
490490
}
491491
}
492492
}
493+
494+
func (hwm *handoffWorkerManager) logf(ctx context.Context, format string, args ...interface{}) {
495+
if hwm.config != nil && hwm.config.Logger != nil {
496+
hwm.config.Logger.Printf(ctx, format, args...)
497+
} else {
498+
internal.Logger.Printf(ctx, format, args...)
499+
}
500+
}

0 commit comments

Comments
 (0)