Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference/flagd-cli/flagd_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ flagd start [flags]
-H, --context-from-header stringToString add key-value pairs to map header values to context values, where key is Header name, value is context key (default [])
-X, --context-value stringToString add arbitrary key value pairs to the flag evaluation context (default [])
-C, --cors-origin strings CORS allowed origins, * will allow all origins
--disable-sync-metadata Disables the getMetadata endpoint of the sync service. Defaults to false, but will default to true in later versions.
-h, --help help for start
-z, --log-format string Set the logging format, e.g. console or json (default "console")
-m, --management-port int32 Port for management operations (default 8014)
Expand Down
4 changes: 4 additions & 0 deletions flagd/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
syncPortFlagName = "sync-port"
syncSocketPathFlagName = "sync-socket-path"
uriFlagName = "uri"
disableSyncMetadata = "disable-sync-metadata"
contextValueFlagName = "context-value"
headerToContextKeyFlagName = "context-from-header"
streamDeadlineFlagName = "stream-deadline"
Expand Down Expand Up @@ -89,6 +90,7 @@ func init() {
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map "+
"header values to context values, where key is Header name, value is context key")
flags.Duration(streamDeadlineFlagName, 0, "Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).")
flags.Bool(disableSyncMetadata, false, "Disables the getMetadata endpoint of the sync service. Defaults to false, but will default to true in later versions.")

bindFlags(flags)
}
Expand All @@ -114,6 +116,7 @@ func bindFlags(flags *pflag.FlagSet) {
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
_ = viper.BindPFlag(disableSyncMetadata, flags.Lookup(disableSyncMetadata))
}

// startCmd represents the start command
Expand Down Expand Up @@ -186,6 +189,7 @@ var startCmd = &cobra.Command{
SyncServicePort: viper.GetUint16(syncPortFlagName),
SyncServiceSocketPath: viper.GetString(syncSocketPathFlagName),
StreamDeadline: viper.GetDuration(streamDeadlineFlagName),
DisableSyncMetadata: viper.GetBool(disableSyncMetadata),
SyncProviders: syncProviders,
ContextValues: contextValuesToMap,
HeaderToContextKeyMappings: headerToContextKeyMappings,
Expand Down
20 changes: 11 additions & 9 deletions flagd/pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Config struct {
SyncServicePort uint16
SyncServiceSocketPath string
StreamDeadline time.Duration
DisableSyncMetadata bool

SyncProviders []sync.SourceConfig
CORS []string
Expand Down Expand Up @@ -116,15 +117,16 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,

// flag sync service
flagSyncService, err := flagsync.NewSyncService(flagsync.SvcConfigurations{
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: config.SyncServicePort,
Sources: sources,
Store: s,
ContextValues: config.ContextValues,
KeyPath: config.ServiceKeyPath,
CertPath: config.ServiceCertPath,
SocketPath: config.SyncServiceSocketPath,
StreamDeadline: config.StreamDeadline,
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: config.SyncServicePort,
Sources: sources,
Store: s,
ContextValues: config.ContextValues,
KeyPath: config.ServiceKeyPath,
CertPath: config.ServiceCertPath,
SocketPath: config.SyncServiceSocketPath,
StreamDeadline: config.StreamDeadline,
DisableSyncMetadata: config.DisableSyncMetadata,
})
if err != nil {
return nil, fmt.Errorf("error creating sync service: %w", err)
Expand Down
20 changes: 10 additions & 10 deletions flagd/pkg/service/flag-sync/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"context"
"errors"
"fmt"
"maps"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"maps"
"time"

"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
Expand All @@ -17,10 +17,11 @@

// syncHandler implements the sync contract
type syncHandler struct {
mux *Multiplexer
log *logger.Logger
contextValues map[string]any
deadline time.Duration
mux *Multiplexer
log *logger.Logger
contextValues map[string]any
deadline time.Duration
disableSyncMetadata bool
}

func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
Expand All @@ -44,7 +45,6 @@
for {
select {
case payload := <-muxPayload:

metadataSrc := make(map[string]any)
maps.Copy(metadataSrc, s.contextValues)

Expand All @@ -58,10 +58,7 @@
return fmt.Errorf("error constructing metadata response")
}

err = server.Send(&syncv1.SyncFlagsResponse{
FlagConfiguration: payload.flags,
SyncContext: metadata,
})
err = server.Send(&syncv1.SyncFlagsResponse{FlagConfiguration: payload.flags, SyncContext: metadata})
if err != nil {
s.log.Debug(fmt.Sprintf("error sending stream response: %v", err))
return fmt.Errorf("error sending stream response: %w", err)
Expand Down Expand Up @@ -94,9 +91,12 @@

// Deprecated - GetMetadata is deprecated and will be removed in a future release.
// Use the sync_context field in syncv1.SyncFlagsResponse, providing same info.
func (s syncHandler) GetMetadata(_ context.Context, _ *syncv1.GetMetadataRequest) (

Check failure on line 94 in flagd/pkg/service/flag-sync/handler.go

View workflow job for this annotation

GitHub Actions / lint

SA1019: syncv1.GetMetadataRequest is deprecated: Marked as deprecated in flagd/sync/v1/sync.proto. (staticcheck)
*syncv1.GetMetadataResponse, error,

Check failure on line 95 in flagd/pkg/service/flag-sync/handler.go

View workflow job for this annotation

GitHub Actions / lint

SA1019: syncv1.GetMetadataResponse is deprecated: Marked as deprecated in flagd/sync/v1/sync.proto. (staticcheck)
) {
if s.disableSyncMetadata {
return nil, status.Error(codes.Unimplemented, "metadata endpoint disabled")
}
metadataSrc := make(map[string]any)
for k, v := range s.contextValues {
metadataSrc[k] = v
Expand All @@ -111,7 +111,7 @@
return nil, fmt.Errorf("error constructing metadata response")
}

return &syncv1.GetMetadataResponse{

Check failure on line 114 in flagd/pkg/service/flag-sync/handler.go

View workflow job for this annotation

GitHub Actions / lint

SA1019: syncv1.GetMetadataResponse is deprecated: Marked as deprecated in flagd/sync/v1/sync.proto. (staticcheck)
Metadata: metadata,
},
nil
Expand Down
95 changes: 48 additions & 47 deletions flagd/pkg/service/flag-sync/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,54 +54,55 @@ func TestSyncHandler_SyncFlags(t *testing.T) {
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Shared handler for testing both GetMetadata & SyncFlags methods
flagStore := store.NewFlags()
mp, err := NewMux(flagStore, tt.sources)
require.NoError(t, err)

handler := syncHandler{
mux: mp,
contextValues: tt.contextValues,
log: logger.NewLogger(nil, false),
}

// Test getting metadata from `GetMetadata` (deprecated)
// remove when `GetMetadata` is full removed and deprecated
metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
require.NoError(t, err)
respMetadata := metaResp.GetMetadata().AsMap()
assert.Equal(t, tt.wantMetadata, respMetadata)

// Test metadata from sync_context
stream := &mockSyncFlagsServer{
ctx: context.Background(),
mu: sync.Mutex{},
respReady: make(chan struct{}, 1),
}

go func() {
err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
assert.NoError(t, err)
}()

select {
case <-stream.respReady:
syncResp := stream.GetLastResponse()
assert.NotNil(t, syncResp)

syncMetadata := syncResp.GetSyncContext().AsMap()
assert.Equal(t, tt.wantMetadata, syncMetadata)

// Check the two metadatas are equal
for _, disableSyncMetadata := range []bool{true, false} {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Shared handler for testing both GetMetadata & SyncFlags methods
flagStore := store.NewFlags()
mp, err := NewMux(flagStore, tt.sources)
require.NoError(t, err)

handler := syncHandler{
mux: mp,
contextValues: tt.contextValues,
log: logger.NewLogger(nil, false),
disableSyncMetadata: disableSyncMetadata,
}

// Test getting metadata from `GetMetadata` (deprecated)
// remove when `GetMetadata` is full removed and deprecated
assert.Equal(t, respMetadata, syncMetadata)
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}

})
metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
if !disableSyncMetadata {
require.NoError(t, err)
respMetadata := metaResp.GetMetadata().AsMap()
assert.Equal(t, tt.wantMetadata, respMetadata)
} else {
assert.NotNil(t, err)
}

// Test metadata from sync_context
stream := &mockSyncFlagsServer{
ctx: context.Background(),
mu: sync.Mutex{},
respReady: make(chan struct{}, 1),
}

go func() {
err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
assert.NoError(t, err)
}()

select {
case <-stream.respReady:
syncResp := stream.GetLastResponse()
assert.NotNil(t, syncResp)
syncMetadata := syncResp.GetSyncContext().AsMap()
assert.Equal(t, tt.wantMetadata, syncMetadata)
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}
})
}
}
}

Expand Down
28 changes: 15 additions & 13 deletions flagd/pkg/service/flag-sync/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ type ISyncService interface {
}

type SvcConfigurations struct {
Logger *logger.Logger
Port uint16
Sources []string
Store *store.State
ContextValues map[string]any
CertPath string
KeyPath string
SocketPath string
StreamDeadline time.Duration
Logger *logger.Logger
Port uint16
Sources []string
Store *store.State
ContextValues map[string]any
CertPath string
KeyPath string
SocketPath string
StreamDeadline time.Duration
DisableSyncMetadata bool
}

type Service struct {
Expand Down Expand Up @@ -82,10 +83,11 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
}

syncv1grpc.RegisterFlagSyncServiceServer(server, &syncHandler{
mux: mux,
log: l,
contextValues: cfg.ContextValues,
deadline: cfg.StreamDeadline,
mux: mux,
log: l,
contextValues: cfg.ContextValues,
deadline: cfg.StreamDeadline,
disableSyncMetadata: cfg.DisableSyncMetadata,
})

var lis net.Listener
Expand Down
Loading
Loading