Skip to content

Commit 68faed8

Browse files
committed
Add test, more logging, better cancellation.
Signed-off-by: Tom Wilkie <[email protected]>
1 parent d9f78a5 commit 68faed8

File tree

3 files changed

+156
-22
lines changed

3 files changed

+156
-22
lines changed

pkg/querier/frontend/frontend.go

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package frontend
22

33
import (
4+
"context"
45
"flag"
56
"math/rand"
67
"net/http"
@@ -139,28 +140,38 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error {
139140

140141
case resp = <-request.response:
141142
case lastErr = <-request.err:
142-
level.Error(f.log).Log("msg", "error processing request", "try", tries, "err", lastErr)
143143
resp, _ = httpgrpc.HTTPResponseFromError(lastErr)
144144
}
145145

146-
// Only fail is we get a valid HTTP non-500; otherwise retry.
147-
if resp != nil && resp.Code/100 != 5 {
148-
retries.Observe(float64(tries))
149-
server.WriteResponse(w, resp)
150-
return nil
146+
// Only retry if we get an HTTP 5xx response or a non-HTTP error.
147+
if resp == nil || resp.Code/100 == 5 {
148+
level.Error(f.log).Log("msg", "error processing request", "try", tries, "err", lastErr, "resp", resp)
149+
continue
151150
}
151+
152+
retries.Observe(float64(tries))
153+
server.WriteResponse(w, resp)
154+
return nil
152155
}
153156

154157
return lastErr
155158
}
156159

157160
// Process allows backends to pull requests from the frontend.
158161
func (f *Frontend) Process(server Frontend_ProcessServer) error {
162+
163+
// If this request is canceled, ping the condition to unblock. This is done
164+
// once, here (instead of in getNextRequest) as we expect calls to Process to
165+
// process many requests.
166+
go func() {
167+
<-server.Context().Done()
168+
f.cond.Broadcast()
169+
}()
170+
159171
for {
160-
request := f.getNextRequest()
161-
if request == nil {
162-
// Occurs when server is shutting down.
163-
return nil
172+
request, err := f.getNextRequest(server.Context())
173+
if err != nil {
174+
return err
164175
}
165176

166177
if err := server.Send(&ProcessRequest{
@@ -208,16 +219,20 @@ func (f *Frontend) queueRequest(userID string, req *request) error {
208219

209220
// getNextRequest picks a random queue and takes the next request off of it, so we
210221
// fairly process users' queries. Will block if there are no requests.
211-
func (f *Frontend) getNextRequest() *request {
222+
func (f *Frontend) getNextRequest(ctx context.Context) (*request, error) {
212223
f.mtx.Lock()
213224
defer f.mtx.Unlock()
214225

215-
for len(f.queues) == 0 && !f.closed {
226+
for len(f.queues) == 0 && !f.closed && ctx.Err() == nil {
216227
f.cond.Wait()
217228
}
218229

230+
if err := ctx.Err(); err != nil {
231+
return nil, err
232+
}
233+
219234
if f.closed {
220-
return nil
235+
return nil, errServerClosing
221236
}
222237

223238
i, n := 0, rand.Intn(len(f.queues))
@@ -234,7 +249,7 @@ func (f *Frontend) getNextRequest() *request {
234249

235250
queueDutation.Observe(time.Now().Sub(request.enqueueTime).Seconds())
236251
queueLength.Add(-1)
237-
return request
252+
return request, nil
238253
}
239254

240255
panic("should never happen")

pkg/querier/frontend/frontend_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package frontend
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io/ioutil"
7+
"net"
8+
"net/http"
9+
"sync/atomic"
10+
"testing"
11+
12+
"github.com/go-kit/kit/log"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
16+
"github.com/weaveworks/common/middleware"
17+
"google.golang.org/grpc"
18+
19+
"github.com/weaveworks/common/user"
20+
"github.com/weaveworks/cortex/pkg/util"
21+
)
22+
23+
func TestFrontend(t *testing.T) {
24+
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
25+
w.Write([]byte("Hello World"))
26+
})
27+
test := func(addr string) {
28+
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil)
29+
require.NoError(t, err)
30+
err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req)
31+
require.NoError(t, err)
32+
33+
resp, err := http.DefaultClient.Do(req)
34+
require.NoError(t, err)
35+
require.Equal(t, 200, resp.StatusCode)
36+
37+
body, err := ioutil.ReadAll(resp.Body)
38+
require.NoError(t, err)
39+
40+
assert.Equal(t, "Hello World", string(body))
41+
}
42+
testFrontend(t, handler, test)
43+
}
44+
45+
func TestFrontendRetries(t *testing.T) {
46+
try := int32(0)
47+
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
48+
if atomic.AddInt32(&try, 1) == 5 {
49+
w.Write([]byte("Hello World"))
50+
return
51+
}
52+
53+
w.WriteHeader(http.StatusInternalServerError)
54+
})
55+
test := func(addr string) {
56+
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil)
57+
require.NoError(t, err)
58+
err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req)
59+
require.NoError(t, err)
60+
61+
resp, err := http.DefaultClient.Do(req)
62+
require.NoError(t, err)
63+
require.Equal(t, 200, resp.StatusCode)
64+
65+
body, err := ioutil.ReadAll(resp.Body)
66+
require.NoError(t, err)
67+
68+
assert.Equal(t, "Hello World", string(body))
69+
}
70+
testFrontend(t, handler, test)
71+
}
72+
73+
func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
74+
logger := log.NewNopLogger() //log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
75+
76+
var (
77+
config Config
78+
workerConfig WorkerConfig
79+
)
80+
util.DefaultValues(&config, &workerConfig)
81+
82+
grpcListen, err := net.Listen("tcp", "")
83+
require.NoError(t, err)
84+
workerConfig.Address = grpcListen.Addr().String()
85+
86+
httpListen, err := net.Listen("tcp", "")
87+
require.NoError(t, err)
88+
89+
frontend, err := New(config, logger)
90+
require.NoError(t, err)
91+
92+
grpcServer := grpc.NewServer()
93+
defer grpcServer.GracefulStop()
94+
95+
RegisterFrontendServer(grpcServer, frontend)
96+
97+
httpServer := http.Server{
98+
Handler: middleware.AuthenticateUser.Wrap(frontend),
99+
}
100+
defer httpServer.Shutdown(context.Background())
101+
102+
go httpServer.Serve(httpListen)
103+
go grpcServer.Serve(grpcListen)
104+
105+
worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger)
106+
require.NoError(t, err)
107+
defer worker.Stop()
108+
109+
test(httpListen.Addr().String())
110+
}

pkg/querier/frontend/worker.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ import (
1212
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
1313
"github.com/mwitkow/go-grpc-middleware"
1414
"github.com/opentracing/opentracing-go"
15-
"github.com/weaveworks/common/httpgrpc"
16-
"github.com/weaveworks/common/httpgrpc/server"
17-
"github.com/weaveworks/common/middleware"
1815
"google.golang.org/grpc"
1916
"google.golang.org/grpc/naming"
2017

18+
"github.com/weaveworks/common/httpgrpc"
19+
"github.com/weaveworks/common/httpgrpc/server"
20+
"github.com/weaveworks/common/middleware"
2121
"github.com/weaveworks/cortex/pkg/util"
2222
)
2323

@@ -38,7 +38,7 @@ type WorkerConfig struct {
3838
// RegisterFlags adds the flags required to config this to the given FlagSet.
3939
func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
4040
f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service.")
41-
f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 1, "Number of simultaneous queries to process.")
41+
f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process.")
4242
f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.")
4343
}
4444

@@ -66,6 +66,7 @@ func (noopWorker) Stop() {}
6666
// NewWorker creates a new Worker.
6767
func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker, error) {
6868
if cfg.Address == "" {
69+
level.Info(log).Log("msg", "no address specified, not starting worker")
6970
return noopWorker{}, nil
7071
}
7172

@@ -113,15 +114,23 @@ func (w *worker) watchDNSLoop() {
113114
}
114115
}()
115116

116-
for updates, err := w.watcher.Next(); err != nil; {
117+
for {
118+
updates, err := w.watcher.Next()
119+
if err != nil {
120+
level.Error(w.log).Log("msg", "error from DNS watcher", "err", err)
121+
return
122+
}
123+
117124
for _, update := range updates {
118125
switch update.Op {
119126
case naming.Add:
127+
level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr)
120128
ctx, cancel := context.WithCancel(w.ctx)
121129
cancels[update.Addr] = cancel
122130
w.runMany(ctx, update.Addr)
123131

124132
case naming.Delete:
133+
level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr)
125134
if cancel, ok := cancels[update.Addr]; ok {
126135
cancel()
127136
}
@@ -134,17 +143,17 @@ func (w *worker) watchDNSLoop() {
134143
}
135144

136145
// runMany starts N runOne loops for a given address.
137-
func (w *worker) runMany(ctx context.Context, address string) error {
146+
func (w *worker) runMany(ctx context.Context, address string) {
138147
client, err := connect(address)
139148
if err != nil {
140-
return err
149+
level.Error(w.log).Log("msg", "error connecting", "addr", address, "err", err)
150+
return
141151
}
142152

143153
w.wg.Add(w.cfg.Parallelism)
144154
for i := 0; i < w.cfg.Parallelism; i++ {
145155
go w.runOne(ctx, client)
146156
}
147-
return nil
148157
}
149158

150159
// runOne loops, trying to establish a stream to the frontend to begin

0 commit comments

Comments
 (0)