Skip to content

Commit d3cbd27

Browse files
committed
fixup: review feedback from giovanni
Signed-off-by: Todd Baert <[email protected]>
1 parent 7363dc3 commit d3cbd27

File tree

6 files changed

+123
-56
lines changed

6 files changed

+123
-56
lines changed

CONTRIBUTING.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags
8383
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/ResolveAll | jq
8484
```
8585

86+
#### Remote event streaming via gRPC
87+
88+
```sh
89+
# notifies of flag changes (but does not evaluate)
90+
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/EventStream
91+
```
92+
8693
#### Flag configuration fetch via gRPC
8794

8895
```sh

core/pkg/evaluator/json.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,13 +308,18 @@ func (je *Resolver) evaluateVariant(ctx context.Context, reqID string, flagKey s
308308
variant string, variants map[string]interface{}, reason string, metadata map[string]interface{}, err error,
309309
) {
310310

311-
selector := store.NewSelector("")
311+
var selector store.Selector
312312
s := ctx.Value(store.SelectorContextKey{})
313313
if s != nil {
314314
selector = s.(store.Selector)
315+
} else {
316+
selector = store.NewSelector("")
315317
}
316-
flag, metadata, ok := je.store.Get(ctx, flagKey, selector)
317-
if !ok {
318+
if s != nil {
319+
selector = s.(store.Selector)
320+
}
321+
flag, metadata, err := je.store.Get(ctx, flagKey, selector)
322+
if err != nil {
318323
// flag not found
319324
je.Logger.DebugWithID(reqID, fmt.Sprintf("requested flag could not be found: %s", flagKey))
320325
return "", map[string]interface{}{}, model.ErrorReason, metadata, errors.New(model.FlagNotFoundErrorCode)

core/pkg/store/store.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type FlagQueryResult struct {
2222
}
2323

2424
type IStore interface {
25-
Get(ctx context.Context, key string, selector Selector) (model.Flag, model.Metadata, bool)
25+
Get(ctx context.Context, key string, selector Selector) (model.Flag, model.Metadata, error)
2626
GetAll(ctx context.Context, selector Selector) (map[string]model.Flag, model.Metadata, error)
2727
Watch(ctx context.Context, selector Selector, watcher chan FlagQueryResult)
2828
}
@@ -151,7 +151,7 @@ func NewFlags() *Store {
151151
return state
152152
}
153153

154-
func (s *Store) Get(_ context.Context, key string, selector Selector) (model.Flag, model.Metadata, bool) {
154+
func (s *Store) Get(_ context.Context, key string, selector Selector) (model.Flag, model.Metadata, error) {
155155
s.logger.Debug(fmt.Sprintf("getting flag %s", key))
156156
txn := s.db.Txn(false)
157157
queryMeta := model.Metadata{}
@@ -165,33 +165,36 @@ func (s *Store) Get(_ context.Context, key string, selector Selector) (model.Fla
165165
raw, err := txn.First(flagsTable, indexId, constraints...)
166166
flag, ok := raw.(model.Flag)
167167
if (err != nil) || !ok {
168-
return model.Flag{}, queryMeta, false
168+
return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found: %w", key, err)
169169
}
170-
return flag, queryMeta, true
170+
return flag, queryMeta, nil
171171

172172
}
173173
// otherwise, get all flags with the given key, and keep the last one with the highest priority
174174
s.logger.Debug(fmt.Sprintf("getting highest priority flag with key: %s", key))
175175
it, err := txn.Get(flagsTable, keyIndex, key)
176176
if err != nil {
177-
return model.Flag{}, queryMeta, false
177+
return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found: %w", key, err)
178178
}
179179
flag := model.Flag{}
180-
var found bool
180+
found := false
181181
for raw := it.Next(); raw != nil; raw = it.Next() {
182-
s.logger.Debug(fmt.Sprintf("got range scan: %v", raw))
183182
nextFlag, ok := raw.(model.Flag)
184-
found = true
185183
if !ok {
186184
continue
187185
}
186+
found = true
188187
if nextFlag.Priority >= flag.Priority {
189188
flag = nextFlag
190189
} else {
191190
s.logger.Debug(fmt.Sprintf("discarding flag %s from lower priority source %s in favor of flag from source %s", nextFlag.Key, s.sources[nextFlag.Priority], s.sources[flag.Priority]))
192191
}
193192
}
194-
return flag, queryMeta, found
193+
194+
if !found {
195+
return flag, queryMeta, fmt.Errorf("flag %s not found", key)
196+
}
197+
return flag, queryMeta, nil
195198
}
196199

197200
func (f *Store) String() (string, error) {

core/pkg/store/store_test.go

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -174,46 +174,46 @@ func TestGet(t *testing.T) {
174174

175175
t.Parallel()
176176
tests := []struct {
177-
name string
178-
key string
179-
selector Selector
180-
wantFlag model.Flag
181-
wantFound bool
177+
name string
178+
key string
179+
selector Selector
180+
wantFlag model.Flag
181+
wantErr bool
182182
}{
183183
{
184-
name: "nil selector",
185-
key: "flagA",
186-
selector: nil,
187-
wantFlag: model.Flag{Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
188-
wantFound: true,
184+
name: "nil selector",
185+
key: "flagA",
186+
selector: nil,
187+
wantFlag: model.Flag{Key: "flagA", DefaultVariant: "off", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
188+
wantErr: false,
189189
},
190190
{
191-
name: "flagSetId selector",
192-
key: "dupe",
193-
selector: flagSetIdCSelector,
194-
wantFlag: model.Flag{Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
195-
wantFound: true,
191+
name: "flagSetId selector",
192+
key: "dupe",
193+
selector: flagSetIdCSelector,
194+
wantFlag: model.Flag{Key: "dupe", DefaultVariant: "off", Source: sourceC, FlagSetId: flagSetIdC, Priority: 2, Metadata: model.Metadata{"flagSetId": flagSetIdC}},
195+
wantErr: false,
196196
},
197197
{
198-
name: "source selector",
199-
key: "dupe",
200-
selector: sourceASelector,
201-
wantFlag: model.Flag{Key: "dupe", DefaultVariant: "on", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
202-
wantFound: true,
198+
name: "source selector",
199+
key: "dupe",
200+
selector: sourceASelector,
201+
wantFlag: model.Flag{Key: "dupe", DefaultVariant: "on", Source: sourceA, FlagSetId: nilFlagSetId, Priority: 0},
202+
wantErr: false,
203203
},
204204
{
205-
name: "flag not found with source selector",
206-
key: "flagB",
207-
selector: sourceASelector,
208-
wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
209-
wantFound: false,
205+
name: "flag not found with source selector",
206+
key: "flagB",
207+
selector: sourceASelector,
208+
wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
209+
wantErr: true,
210210
},
211211
{
212-
name: "flag not found with flagSetId selector",
213-
key: "flagB",
214-
selector: flagSetIdCSelector,
215-
wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
216-
wantFound: false,
212+
name: "flag not found with flagSetId selector",
213+
key: "flagB",
214+
selector: flagSetIdCSelector,
215+
wantFlag: model.Flag{Key: "flagB", DefaultVariant: "off", Source: sourceB, FlagSetId: flagSetIdB, Priority: 1, Metadata: model.Metadata{"flagSetId": flagSetIdB}},
216+
wantErr: true,
217217
},
218218
}
219219

@@ -242,12 +242,12 @@ func TestGet(t *testing.T) {
242242
store.Update(sourceA, sourceAFlags, nil)
243243
store.Update(sourceB, sourceBFlags, nil)
244244
store.Update(sourceC, sourceCFlags, nil)
245-
gotFlag, _, found := store.Get(context.Background(), tt.key, tt.selector)
246-
247-
require.Equal(t, tt.wantFound, found, "expected found to be %v, got %v", tt.wantFound, found)
245+
gotFlag, _, err := store.Get(context.Background(), tt.key, tt.selector)
248246

249-
if tt.wantFound {
247+
if !tt.wantErr {
250248
require.Equal(t, tt.wantFlag, gotFlag)
249+
} else {
250+
require.Error(t, err, "expected an error for key %s with selector %v", tt.key, tt.selector)
251251
}
252252
})
253253
}

flagd/pkg/service/flag-evaluation/connect_service_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"net/http"
88
"os"
9+
"sync"
910
"testing"
1011
"time"
1112

@@ -214,6 +215,53 @@ func TestConnectServiceNotify(t *testing.T) {
214215
}
215216
}
216217

218+
func TestConnectServiceWatcher(t *testing.T) {
219+
sources := []string{"source1", "source2"}
220+
log := logger.NewLogger(nil, false)
221+
s, err := store.NewStore(log, sources)
222+
223+
if err != nil {
224+
t.Fatalf("NewStore failed: %v", err)
225+
}
226+
227+
sChan := make(chan iservice.Notification, 1)
228+
eventing := eventingConfiguration{
229+
store: s,
230+
logger: log,
231+
mu: &sync.RWMutex{},
232+
subs: make(map[any]chan iservice.Notification),
233+
}
234+
235+
// subscribe and wait for for the sub to be active
236+
eventing.Subscribe(context.Background(), "anything", nil, sChan)
237+
time.Sleep(100 * time.Millisecond)
238+
239+
// make a change
240+
s.Update(sources[0], map[string]model.Flag{
241+
"flag1": {
242+
Key: "flag1",
243+
DefaultVariant: "off",
244+
},
245+
}, model.Metadata{})
246+
247+
// notification type
248+
ofType := iservice.ConfigurationChange
249+
250+
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
251+
defer cancel()
252+
253+
select {
254+
case n := <-sChan:
255+
require.Equal(t, ofType, n.Type, "expected notification type: %s, but received %s", ofType, n.Type)
256+
notifications := n.Data["flags"].(map[string]interface{})
257+
flag1, ok := notifications["flag1"].(map[string]interface{})
258+
require.True(t, ok, "flag1 notification should be a map[string]interface{}")
259+
require.Equal(t, flag1["type"], string(model.NotificationCreate), "expected notification type: %s, but received %s", model.NotificationCreate, flag1["type"])
260+
case <-timeout.Done():
261+
t.Error("timeout while waiting for notifications")
262+
}
263+
}
264+
217265
func TestConnectServiceShutdown(t *testing.T) {
218266
// given
219267
ctrl := gomock.NewController(t)

flagd/pkg/service/flag-evaluation/eventing.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,21 @@ func (eventing *eventingConfiguration) Subscribe(ctx context.Context, id any, se
3737
watcher := make(chan store.FlagQueryResult, 1)
3838
go func() {
3939
// store the previous flags to compare against new notifications, to compute proper diffs for RPC mode
40-
oldFlags := map[string]model.Flag{}
41-
for payload := range watcher {
42-
newFlags := payload.Flags
43-
notifications := notifications.NewFromFlags(oldFlags, newFlags)
44-
notifier <- iservice.Notification{
45-
Type: iservice.ConfigurationChange,
46-
Data: map[string]interface{}{
47-
"flags": notifications,
48-
},
40+
var oldFlags map[string]model.Flag
41+
for result := range watcher {
42+
newFlags := result.Flags
43+
44+
// ignore the first notification (nil old flags), the watcher emits on initialization, but for RPC we don't care until there's a change
45+
if oldFlags != nil {
46+
notifications := notifications.NewFromFlags(oldFlags, newFlags)
47+
notifier <- iservice.Notification{
48+
Type: iservice.ConfigurationChange,
49+
Data: map[string]interface{}{
50+
"flags": notifications,
51+
},
52+
}
4953
}
50-
oldFlags = newFlags
54+
oldFlags = result.Flags
5155
}
5256

5357
eventing.logger.Debug(fmt.Sprintf("closing notify channel for id %v", id))

0 commit comments

Comments
 (0)