Skip to content

Commit 5c464f7

Browse files
authored
Merge branch 'master' into optional-logger
2 parents 1ec1926 + ae5434c commit 5c464f7

File tree

11 files changed

+1361
-97
lines changed

11 files changed

+1361
-97
lines changed

async_handoff_integration_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
5353
Dialer: func(ctx context.Context) (net.Conn, error) {
5454
return &mockNetConn{addr: "original:6379"}, nil
5555
},
56-
PoolSize: int32(5),
57-
PoolTimeout: time.Second,
56+
PoolSize: int32(5),
57+
MaxConcurrentDials: 5,
58+
PoolTimeout: time.Second,
5859
})
5960

6061
// Add the hook to the pool after creation
@@ -153,8 +154,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
153154
return &mockNetConn{addr: "original:6379"}, nil
154155
},
155156

156-
PoolSize: int32(10),
157-
PoolTimeout: time.Second,
157+
PoolSize: int32(10),
158+
MaxConcurrentDials: 10,
159+
PoolTimeout: time.Second,
158160
})
159161
defer testPool.Close()
160162

@@ -225,8 +227,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
225227
return &mockNetConn{addr: "original:6379"}, nil
226228
},
227229

228-
PoolSize: int32(3),
229-
PoolTimeout: time.Second,
230+
PoolSize: int32(3),
231+
MaxConcurrentDials: 3,
232+
PoolTimeout: time.Second,
230233
})
231234
defer testPool.Close()
232235

@@ -288,8 +291,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
288291
return &mockNetConn{addr: "original:6379"}, nil
289292
},
290293

291-
PoolSize: int32(2),
292-
PoolTimeout: time.Second,
294+
PoolSize: int32(2),
295+
MaxConcurrentDials: 2,
296+
PoolTimeout: time.Second,
293297
})
294298
defer testPool.Close()
295299

internal/pool/bench_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ func BenchmarkPoolGetPut(b *testing.B) {
3131
for _, bm := range benchmarks {
3232
b.Run(bm.String(), func(b *testing.B) {
3333
connPool := pool.NewConnPool(&pool.Options{
34-
Dialer: dummyDialer,
35-
PoolSize: int32(bm.poolSize),
36-
PoolTimeout: time.Second,
37-
DialTimeout: 1 * time.Second,
38-
ConnMaxIdleTime: time.Hour,
34+
Dialer: dummyDialer,
35+
PoolSize: int32(bm.poolSize),
36+
MaxConcurrentDials: bm.poolSize,
37+
PoolTimeout: time.Second,
38+
DialTimeout: 1 * time.Second,
39+
ConnMaxIdleTime: time.Hour,
3940
})
4041

4142
b.ResetTimer()
@@ -75,11 +76,12 @@ func BenchmarkPoolGetRemove(b *testing.B) {
7576
for _, bm := range benchmarks {
7677
b.Run(bm.String(), func(b *testing.B) {
7778
connPool := pool.NewConnPool(&pool.Options{
78-
Dialer: dummyDialer,
79-
PoolSize: int32(bm.poolSize),
80-
PoolTimeout: time.Second,
81-
DialTimeout: 1 * time.Second,
82-
ConnMaxIdleTime: time.Hour,
79+
Dialer: dummyDialer,
80+
PoolSize: int32(bm.poolSize),
81+
MaxConcurrentDials: bm.poolSize,
82+
PoolTimeout: time.Second,
83+
DialTimeout: 1 * time.Second,
84+
ConnMaxIdleTime: time.Hour,
8385
})
8486

8587
b.ResetTimer()

internal/pool/buffer_size_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ var _ = Describe("Buffer Size Configuration", func() {
2424

2525
It("should use default buffer sizes when not specified", func() {
2626
connPool = pool.NewConnPool(&pool.Options{
27-
Dialer: dummyDialer,
28-
PoolSize: int32(1),
29-
PoolTimeout: 1000,
27+
Dialer: dummyDialer,
28+
PoolSize: int32(1),
29+
MaxConcurrentDials: 1,
30+
PoolTimeout: 1000,
3031
})
3132

3233
cn, err := connPool.NewConn(ctx)
@@ -46,11 +47,12 @@ var _ = Describe("Buffer Size Configuration", func() {
4647
customWriteSize := 64 * 1024 // 64KB
4748

4849
connPool = pool.NewConnPool(&pool.Options{
49-
Dialer: dummyDialer,
50-
PoolSize: int32(1),
51-
PoolTimeout: 1000,
52-
ReadBufferSize: customReadSize,
53-
WriteBufferSize: customWriteSize,
50+
Dialer: dummyDialer,
51+
PoolSize: int32(1),
52+
MaxConcurrentDials: 1,
53+
PoolTimeout: 1000,
54+
ReadBufferSize: customReadSize,
55+
WriteBufferSize: customWriteSize,
5456
})
5557

5658
cn, err := connPool.NewConn(ctx)
@@ -67,11 +69,12 @@ var _ = Describe("Buffer Size Configuration", func() {
6769

6870
It("should handle zero buffer sizes by using defaults", func() {
6971
connPool = pool.NewConnPool(&pool.Options{
70-
Dialer: dummyDialer,
71-
PoolSize: int32(1),
72-
PoolTimeout: 1000,
73-
ReadBufferSize: 0, // Should use default
74-
WriteBufferSize: 0, // Should use default
72+
Dialer: dummyDialer,
73+
PoolSize: int32(1),
74+
MaxConcurrentDials: 1,
75+
PoolTimeout: 1000,
76+
ReadBufferSize: 0, // Should use default
77+
WriteBufferSize: 0, // Should use default
7578
})
7679

7780
cn, err := connPool.NewConn(ctx)
@@ -103,9 +106,10 @@ var _ = Describe("Buffer Size Configuration", func() {
103106
// Test the scenario where someone creates a pool directly (like in tests)
104107
// without setting ReadBufferSize and WriteBufferSize
105108
connPool = pool.NewConnPool(&pool.Options{
106-
Dialer: dummyDialer,
107-
PoolSize: int32(1),
108-
PoolTimeout: 1000,
109+
Dialer: dummyDialer,
110+
PoolSize: int32(1),
111+
MaxConcurrentDials: 1,
112+
PoolTimeout: 1000,
109113
// ReadBufferSize and WriteBufferSize are not set (will be 0)
110114
})
111115

internal/pool/hooks_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,9 @@ func TestPoolWithHooks(t *testing.T) {
191191
Dialer: func(ctx context.Context) (net.Conn, error) {
192192
return &net.TCPConn{}, nil // Mock connection
193193
},
194-
PoolSize: 1,
195-
DialTimeout: time.Second,
194+
PoolSize: 1,
195+
MaxConcurrentDials: 1,
196+
DialTimeout: time.Second,
196197
}
197198

198199
pool := NewConnPool(opt)

internal/pool/pool.go

Lines changed: 104 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ type Options struct {
9898

9999
PoolFIFO bool
100100
PoolSize int32
101+
MaxConcurrentDials int
101102
DialTimeout time.Duration
102103
PoolTimeout time.Duration
103104
MinIdleConns int32
@@ -129,7 +130,9 @@ type ConnPool struct {
129130
dialErrorsNum uint32 // atomic
130131
lastDialError atomic.Value
131132

132-
queue chan struct{}
133+
queue chan struct{}
134+
dialsInProgress chan struct{}
135+
dialsQueue *wantConnQueue
133136

134137
connsMu sync.Mutex
135138
conns map[uint64]*Conn
@@ -155,9 +158,11 @@ func NewConnPool(opt *Options) *ConnPool {
155158
p := &ConnPool{
156159
cfg: opt,
157160

158-
queue: make(chan struct{}, opt.PoolSize),
159-
conns: make(map[uint64]*Conn),
160-
idleConns: make([]*Conn, 0, opt.PoolSize),
161+
queue: make(chan struct{}, opt.PoolSize),
162+
conns: make(map[uint64]*Conn),
163+
dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials),
164+
dialsQueue: newWantConnQueue(),
165+
idleConns: make([]*Conn, 0, opt.PoolSize),
161166
}
162167

163168
// Only create MinIdleConns if explicitly requested (> 0)
@@ -236,6 +241,7 @@ func (p *ConnPool) checkMinIdleConns() {
236241
return
237242
}
238243
}
244+
239245
}
240246

241247
func (p *ConnPool) addIdleConn() error {
@@ -494,9 +500,8 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
494500

495501
atomic.AddUint32(&p.stats.Misses, 1)
496502

497-
newcn, err := p.newConn(ctx, true)
503+
newcn, err := p.queuedNewConn(ctx)
498504
if err != nil {
499-
p.freeTurn()
500505
return nil, err
501506
}
502507

@@ -515,6 +520,99 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
515520
return newcn, nil
516521
}
517522

523+
func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
524+
select {
525+
case p.dialsInProgress <- struct{}{}:
526+
// Got permission, proceed to create connection
527+
case <-ctx.Done():
528+
p.freeTurn()
529+
return nil, ctx.Err()
530+
}
531+
532+
dialCtx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
533+
534+
w := &wantConn{
535+
ctx: dialCtx,
536+
cancelCtx: cancel,
537+
result: make(chan wantConnResult, 1),
538+
}
539+
var err error
540+
defer func() {
541+
if err != nil {
542+
if cn := w.cancel(); cn != nil {
543+
p.putIdleConn(ctx, cn)
544+
p.freeTurn()
545+
}
546+
}
547+
}()
548+
549+
p.dialsQueue.enqueue(w)
550+
551+
go func(w *wantConn) {
552+
var freeTurnCalled bool
553+
defer func() {
554+
if err := recover(); err != nil {
555+
if !freeTurnCalled {
556+
p.freeTurn()
557+
}
558+
internal.Logger.Printf(context.Background(), "queuedNewConn panic: %+v", err)
559+
}
560+
}()
561+
562+
defer w.cancelCtx()
563+
defer func() { <-p.dialsInProgress }() // Release connection creation permission
564+
565+
dialCtx := w.getCtxForDial()
566+
cn, cnErr := p.newConn(dialCtx, true)
567+
delivered := w.tryDeliver(cn, cnErr)
568+
if cnErr == nil && delivered {
569+
return
570+
} else if cnErr == nil && !delivered {
571+
p.putIdleConn(dialCtx, cn)
572+
p.freeTurn()
573+
freeTurnCalled = true
574+
} else {
575+
p.freeTurn()
576+
freeTurnCalled = true
577+
}
578+
}(w)
579+
580+
select {
581+
case <-ctx.Done():
582+
err = ctx.Err()
583+
return nil, err
584+
case result := <-w.result:
585+
err = result.err
586+
return result.cn, err
587+
}
588+
}
589+
590+
func (p *ConnPool) putIdleConn(ctx context.Context, cn *Conn) {
591+
for {
592+
w, ok := p.dialsQueue.dequeue()
593+
if !ok {
594+
break
595+
}
596+
if w.tryDeliver(cn, nil) {
597+
return
598+
}
599+
}
600+
601+
cn.SetUsable(true)
602+
603+
p.connsMu.Lock()
604+
defer p.connsMu.Unlock()
605+
606+
if p.closed() {
607+
_ = cn.Close()
608+
return
609+
}
610+
611+
// poolSize is increased in newConn
612+
p.idleConns = append(p.idleConns, cn)
613+
p.idleConnsLen.Add(1)
614+
}
615+
518616
func (p *ConnPool) waitTurn(ctx context.Context) error {
519617
select {
520618
case <-ctx.Done():

0 commit comments

Comments
 (0)