Skip to content

Commit aace8fc

Browse files
authored
Revert "🐛 fix: avoid writing into released Response in core::execFunc() (#3830)"
This reverts commit 7a17195.
1 parent 7a17195 commit aace8fc

File tree

3 files changed

+37
-251
lines changed

3 files changed

+37
-251
lines changed

client/client_test.go

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2297,49 +2297,3 @@ func Benchmark_Client_Request_Parallel(b *testing.B) {
22972297
require.NoError(b, err)
22982298
})
22992299
}
2300-
2301-
func Benchmark_Client_Request_Send_ContextCancel(b *testing.B) {
2302-
app, ln, start := createHelperServer(b)
2303-
2304-
startedCh := make(chan struct{})
2305-
errCh := make(chan error)
2306-
respCh := make(chan *Response)
2307-
2308-
app.Post("/", func(c fiber.Ctx) error {
2309-
startedCh <- struct{}{}
2310-
time.Sleep(time.Millisecond) // let cancel be called
2311-
return c.Status(fiber.StatusOK).SendString("post")
2312-
})
2313-
2314-
go start()
2315-
2316-
client := New().SetDial(ln)
2317-
2318-
b.ReportAllocs()
2319-
b.ResetTimer()
2320-
2321-
for b.Loop() {
2322-
ctx, cancel := context.WithCancel(context.Background())
2323-
2324-
req := AcquireRequest().
2325-
SetClient(client).
2326-
SetURL("http://example.com").
2327-
SetMethod(fiber.MethodPost).
2328-
SetContext(ctx)
2329-
2330-
go func(r *Request) {
2331-
defer ReleaseRequest(r)
2332-
2333-
resp, err := r.Send()
2334-
2335-
respCh <- resp
2336-
errCh <- err
2337-
}(req)
2338-
2339-
<-startedCh // request is made, we can cancel the context now
2340-
cancel()
2341-
2342-
require.Nil(b, <-respCh)
2343-
require.ErrorIs(b, <-errCh, ErrTimeoutOrCancel)
2344-
}
2345-
}

client/core.go

Lines changed: 37 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314

1415
"github.com/gofiber/fiber/v3"
1516
"github.com/gofiber/fiber/v3/addon/retry"
@@ -72,25 +73,25 @@ func (c *core) getRetryConfig() *RetryConfig {
7273
// execFunc is the core logic to send the request and receive the response.
7374
// It leverages the fasthttp client, optionally with retries or redirects.
7475
func (c *core) execFunc() (*Response, error) {
75-
// do not close, these will be returned to the pool
76-
errChan := acquireErrChan()
77-
respChan := acquireResponseChan()
76+
resp := AcquireResponse()
77+
resp.setClient(c.client)
78+
resp.setRequest(c.req)
7879

79-
cfg := c.getRetryConfig()
80-
go func() {
81-
// retain both channels until they are drained
82-
defer releaseErrChan(errChan)
83-
defer releaseResponseChan(respChan)
80+
done := int32(0)
81+
errCh, reqv := acquireErrChan(), fasthttp.AcquireRequest()
82+
defer releaseErrChan(errCh)
8483

85-
reqv := fasthttp.AcquireRequest()
86-
defer fasthttp.ReleaseRequest(reqv)
84+
c.req.RawRequest.CopyTo(reqv)
85+
cfg := c.getRetryConfig()
8786

87+
var err error
88+
go func() {
8889
respv := fasthttp.AcquireResponse()
89-
defer fasthttp.ReleaseResponse(respv)
90-
91-
c.req.RawRequest.CopyTo(reqv)
90+
defer func() {
91+
fasthttp.ReleaseRequest(reqv)
92+
fasthttp.ReleaseResponse(respv)
93+
}()
9294

93-
var err error
9495
if cfg != nil {
9596
// Use an exponential backoff retry strategy.
9697
err = retry.NewExponentialBackoff(*cfg).Retry(func() error {
@@ -107,31 +108,27 @@ func (c *core) execFunc() (*Response, error) {
107108
}
108109
}
109110

110-
if err != nil {
111-
errChan <- err
112-
return
111+
if atomic.CompareAndSwapInt32(&done, 0, 1) {
112+
if err != nil {
113+
errCh <- err
114+
return
115+
}
116+
respv.CopyTo(resp.RawResponse)
117+
errCh <- nil
113118
}
114-
115-
resp := AcquireResponse()
116-
resp.setClient(c.client)
117-
resp.setRequest(c.req)
118-
respv.CopyTo(resp.RawResponse)
119-
respChan <- resp
120119
}()
121120

122121
select {
123-
case err := <-errChan:
124-
return nil, err
125-
case resp := <-respChan:
122+
case err := <-errCh:
123+
if err != nil {
124+
// Release the response if an error occurs.
125+
ReleaseResponse(resp)
126+
return nil, err
127+
}
126128
return resp, nil
127129
case <-c.ctx.Done():
128-
go func() { // drain the channels and release the response
129-
select {
130-
case resp := <-respChan:
131-
ReleaseResponse(resp)
132-
case <-errChan:
133-
}
134-
}()
130+
atomic.SwapInt32(&done, 1)
131+
ReleaseResponse(resp)
135132
return nil, ErrTimeoutOrCancel
136133
}
137134
}
@@ -222,39 +219,16 @@ func (c *core) execute(ctx context.Context, client *Client, req *Request) (*Resp
222219
return resp, nil
223220
}
224221

225-
var responseChanPool = &sync.Pool{
226-
New: func() any {
227-
return make(chan *Response)
228-
},
229-
}
230-
231-
// acquireResponseChan returns an empty, non-closed *Response channel from the pool.
232-
// The returned channel may be returned to the pool with releaseResponseChan
233-
func acquireResponseChan() chan *Response {
234-
ch, ok := responseChanPool.Get().(chan *Response)
235-
if !ok {
236-
panic(errors.New("failed to type-assert to *Response"))
237-
}
238-
return ch
239-
}
240-
241-
// releaseResponseChan returns the *Response channel to the pool.
242-
// It's the caller's responsibility to ensure that:
243-
// - the channel is not closed
244-
// - the channel is drained before returning it
245-
// - the channel is not reused after returning it
246-
func releaseResponseChan(ch chan *Response) {
247-
responseChanPool.Put(ch)
248-
}
249-
250222
var errChanPool = &sync.Pool{
251223
New: func() any {
252-
return make(chan error)
224+
return make(chan error, 1)
253225
},
254226
}
255227

256-
// acquireErrChan returns an empty, non-closed error channel from the pool.
257-
// The returned channel may be returned to the pool with releaseErrChan
228+
// acquireErrChan returns an empty error channel from the pool.
229+
//
230+
// The returned channel may be returned to the pool with releaseErrChan when no longer needed,
231+
// reducing GC load.
258232
func acquireErrChan() chan error {
259233
ch, ok := errChanPool.Get().(chan error)
260234
if !ok {
@@ -264,10 +238,8 @@ func acquireErrChan() chan error {
264238
}
265239

266240
// releaseErrChan returns the error channel to the pool.
267-
// It's caller's responsibility to ensure that:
268-
// - the channel is not closed
269-
// - the channel is drained before returning it
270-
// - the channel is not reused after returning it
241+
//
242+
// Do not use the released channel afterward to avoid data races.
271243
func releaseErrChan(ch chan error) {
272244
errChanPool.Put(ch)
273245
}

client/core_test.go

Lines changed: 0 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,13 @@ package client
22

33
import (
44
"context"
5-
"crypto/tls"
65
"errors"
76
"net"
8-
"sync"
97
"testing"
108
"time"
119

1210
"github.com/stretchr/testify/assert"
1311
"github.com/stretchr/testify/require"
14-
"github.com/valyala/fasthttp"
1512
"github.com/valyala/fasthttp/fasthttputil"
1613

1714
"github.com/gofiber/fiber/v3"
@@ -73,10 +70,6 @@ func Test_Exec_Func(t *testing.T) {
7370
return errors.New("the request is error")
7471
})
7572

76-
app.Get("/redirect", func(c fiber.Ctx) error {
77-
return c.Redirect().Status(fiber.StatusFound).To("/normal")
78-
})
79-
8073
app.Get("/hang-up", func(c fiber.Ctx) error {
8174
time.Sleep(time.Second)
8275
return c.SendString(c.Hostname() + " hang up")
@@ -104,25 +97,6 @@ func Test_Exec_Func(t *testing.T) {
10497
require.Equal(t, "example.com", string(resp.RawResponse.Body()))
10598
})
10699

107-
t.Run("follow redirect with retry config", func(t *testing.T) {
108-
t.Parallel()
109-
core, client, req := newCore(), New(), AcquireRequest()
110-
core.ctx = context.Background()
111-
core.client = client
112-
core.req = req
113-
114-
client.SetRetryConfig(&RetryConfig{MaxRetryCount: 1})
115-
client.SetDial(func(_ string) (net.Conn, error) { return ln.Dial() })
116-
req.SetMaxRedirects(1)
117-
req.RawRequest.Header.SetMethod(fiber.MethodGet)
118-
req.RawRequest.SetRequestURI("http://example.com/redirect")
119-
120-
resp, err := core.execFunc()
121-
require.NoError(t, err)
122-
require.Equal(t, 200, resp.RawResponse.StatusCode())
123-
require.Equal(t, "example.com", string(resp.RawResponse.Body()))
124-
})
125-
126100
t.Run("the request return an error", func(t *testing.T) {
127101
t.Parallel()
128102
core, client, req := newCore(), New(), AcquireRequest()
@@ -157,59 +131,6 @@ func Test_Exec_Func(t *testing.T) {
157131

158132
require.Equal(t, ErrTimeoutOrCancel, err)
159133
})
160-
161-
t.Run("cancel drains errChan", func(t *testing.T) {
162-
core, client, req := newCore(), New(), AcquireRequest()
163-
ctx, cancel := context.WithCancel(context.Background())
164-
defer cancel()
165-
166-
core.ctx = ctx
167-
core.client = client
168-
core.req = req
169-
170-
req.RawRequest.SetRequestURI("http://example.com/drain-err")
171-
172-
blockingTransport := newBlockingErrTransport(errors.New("upstream failure"))
173-
client.transport = blockingTransport
174-
defer blockingTransport.release()
175-
176-
type execResult struct {
177-
resp *Response
178-
err error
179-
}
180-
181-
resultCh := make(chan execResult, 1)
182-
go func() {
183-
resp, err := core.execFunc()
184-
resultCh <- execResult{resp: resp, err: err}
185-
}()
186-
187-
select {
188-
case <-blockingTransport.called:
189-
case <-time.After(time.Second):
190-
t.Fatal("transport Do was not invoked")
191-
}
192-
193-
cancel()
194-
195-
var result execResult
196-
select {
197-
case result = <-resultCh:
198-
case <-time.After(time.Second):
199-
t.Fatal("execFunc did not return")
200-
}
201-
202-
require.Nil(t, result.resp)
203-
require.ErrorIs(t, result.err, ErrTimeoutOrCancel)
204-
205-
blockingTransport.release()
206-
207-
select {
208-
case <-blockingTransport.finished:
209-
case <-time.After(time.Second):
210-
t.Fatal("transport Do did not finish")
211-
}
212-
})
213134
}
214135

215136
func Test_Execute(t *testing.T) {
@@ -325,64 +246,3 @@ func Test_Execute(t *testing.T) {
325246
require.Equal(t, "example.com hang up", string(resp.RawResponse.Body()))
326247
})
327248
}
328-
329-
type blockingErrTransport struct {
330-
err error
331-
332-
called chan struct{}
333-
unblock chan struct{}
334-
finished chan struct{}
335-
336-
calledOnce sync.Once
337-
releaseOnce sync.Once
338-
finishedOnce sync.Once
339-
}
340-
341-
func newBlockingErrTransport(err error) *blockingErrTransport {
342-
return &blockingErrTransport{
343-
err: err,
344-
called: make(chan struct{}),
345-
unblock: make(chan struct{}),
346-
finished: make(chan struct{}),
347-
}
348-
}
349-
350-
func (b *blockingErrTransport) Do(_ *fasthttp.Request, _ *fasthttp.Response) error {
351-
b.calledOnce.Do(func() { close(b.called) })
352-
<-b.unblock
353-
b.finishedOnce.Do(func() { close(b.finished) })
354-
return b.err
355-
}
356-
357-
func (b *blockingErrTransport) DoTimeout(req *fasthttp.Request, resp *fasthttp.Response, _ time.Duration) error {
358-
return b.Do(req, resp)
359-
}
360-
361-
func (b *blockingErrTransport) DoDeadline(req *fasthttp.Request, resp *fasthttp.Response, _ time.Time) error {
362-
return b.Do(req, resp)
363-
}
364-
365-
func (b *blockingErrTransport) DoRedirects(req *fasthttp.Request, resp *fasthttp.Response, _ int) error {
366-
return b.Do(req, resp)
367-
}
368-
369-
func (*blockingErrTransport) CloseIdleConnections() {
370-
}
371-
372-
func (*blockingErrTransport) TLSConfig() *tls.Config {
373-
return nil
374-
}
375-
376-
func (*blockingErrTransport) SetTLSConfig(_ *tls.Config) {
377-
}
378-
379-
func (*blockingErrTransport) SetDial(_ fasthttp.DialFunc) {
380-
}
381-
382-
func (*blockingErrTransport) Client() any {
383-
return nil
384-
}
385-
386-
func (b *blockingErrTransport) release() {
387-
b.releaseOnce.Do(func() { close(b.unblock) })
388-
}

0 commit comments

Comments
 (0)