Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/launchdarkly/go-server-sdk-dynamodb/v4 v4.0.0
github.com/launchdarkly/go-server-sdk-evaluation/v3 v3.0.1
github.com/launchdarkly/go-server-sdk-redis-redigo/v3 v3.0.0
github.com/launchdarkly/go-server-sdk/v7 v7.14.2
github.com/launchdarkly/go-server-sdk/v7 v7.14.3
github.com/launchdarkly/go-test-helpers/v3 v3.1.0
github.com/launchdarkly/opencensus-go-exporter-stackdriver v0.14.5
github.com/pborman/uuid v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,8 @@ github.com/launchdarkly/go-server-sdk-evaluation/v3 v3.0.1 h1:rTgcYAFraGFj7sBMB2
github.com/launchdarkly/go-server-sdk-evaluation/v3 v3.0.1/go.mod h1:fPS5d+zOsgFnMunj+Ki6jjlZtFvo4h9iNbtNXxzYn58=
github.com/launchdarkly/go-server-sdk-redis-redigo/v3 v3.0.0 h1:ItkPbTEzz0ObNzIpA3DfPqgEgeNyqno/Lnfd7BjC0Ns=
github.com/launchdarkly/go-server-sdk-redis-redigo/v3 v3.0.0/go.mod h1:ho3n0ML1YbV0QRnidDNF9ooFIC66FiVzZGW0u4behG0=
github.com/launchdarkly/go-server-sdk/v7 v7.14.2 h1:9LOCC9U1l8WcYLBg/TbJUTiwLW/1Ti61UikpD+4gttk=
github.com/launchdarkly/go-server-sdk/v7 v7.14.2/go.mod h1:0CUdE5PI0SVG1Tb6CwKz8wZ9zEHUzfMutl6wY2MzUF0=
github.com/launchdarkly/go-server-sdk/v7 v7.14.3 h1:a+0hA9Jk2bg+LgfXl8o7xeAolK1bOf6BY/j83MlmmSI=
github.com/launchdarkly/go-server-sdk/v7 v7.14.3/go.mod h1:0CUdE5PI0SVG1Tb6CwKz8wZ9zEHUzfMutl6wY2MzUF0=
github.com/launchdarkly/go-test-helpers/v2 v2.3.2 h1:WX6qSzt7v8xz6d94nVcoil9ljuLTC/6OzQt0MhWYxsQ=
github.com/launchdarkly/go-test-helpers/v2 v2.3.2/go.mod h1:L7+th5govYp5oKU9iN7To5PgznBuIjBPn+ejqKR0avw=
github.com/launchdarkly/go-test-helpers/v3 v3.1.0 h1:E3bxJMzMoA+cJSF3xxtk2/chr1zshl1ZWa0/oR+8bvg=
Expand Down
204 changes: 204 additions & 0 deletions internal/filedata/offline_mode_synchronizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package filedata

import (
"context"
"sync"
"sync/atomic"

"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
)

// simpleBroadcaster is a simple implementation of a broadcaster for changeSets and status updates.
type simpleBroadcaster[T any] struct {
mu sync.RWMutex
listeners []chan T
}

func newSimpleBroadcaster[T any]() *simpleBroadcaster[T] {
return &simpleBroadcaster[T]{
listeners: make([]chan T, 0),
}
}

func (b *simpleBroadcaster[T]) AddListener() <-chan T {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan T, 10)
b.listeners = append(b.listeners, ch)
return ch
}

func (b *simpleBroadcaster[T]) Broadcast(value T) {
b.mu.RLock()
defer b.mu.RUnlock()
for _, ch := range b.listeners {
select {
case ch <- value:
default:
// If channel is full, skip this listener
}
}
}

// OfflineModeSynchronizerFactory creates a synchronizer that loads data from an external source
// (the relay's archive file) without making any network connections.
type OfflineModeSynchronizerFactory struct {
Synchronizer *OfflineModeSynchronizer
}

// OfflineModeSynchronizer implements subsystems.DataSynchronizer for offline mode.
// It loads pre-populated data from the archive file without connecting to LaunchDarkly.
type OfflineModeSynchronizer struct {
dataCh <-chan []ldstoretypes.Collection
changeSetBroadcaster *simpleBroadcaster[subsystems.ChangeSet]
statusBroadcaster *simpleBroadcaster[interfaces.DataSynchronizerStatus]
version int32
quit chan struct{}
closed atomic.Bool
}

func NewOfflineModeSynchronizer(dataCh <-chan []ldstoretypes.Collection) *OfflineModeSynchronizer {
return &OfflineModeSynchronizer{
dataCh: dataCh,
changeSetBroadcaster: newSimpleBroadcaster[subsystems.ChangeSet](),
statusBroadcaster: newSimpleBroadcaster[interfaces.DataSynchronizerStatus](),
quit: make(chan struct{}),
}
}

func (f OfflineModeSynchronizerFactory) Build(
ctx subsystems.ClientContext,
) (subsystems.DataSynchronizer, error) {
return f.Synchronizer, nil
}

func (s *OfflineModeSynchronizer) Close() error {
if s.closed.Swap(true) {
return nil
}
close(s.quit)
return nil
}

func (s *OfflineModeSynchronizer) Name() string {
return "OfflineModeSynchronizer"
}

// Fetch returns the current basis (full dataset) from the offline data source.
func (s *OfflineModeSynchronizer) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) {
// Wait for data to arrive from the archive
select {
case data := <-s.dataCh:
changeSet, err := s.makeChangeSetFromCollections(data)
if err != nil {
return nil, err
}
return &subsystems.Basis{
ChangeSet: *changeSet,
Persist: false,
}, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-s.quit:
return nil, context.Canceled
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Single-use data channel shared across synchronizer calls

The dataCh channel is read from by both the Fetch() and Sync() methods, but only a single data item is ever sent to this channel (in AddEnvironment). Additionally, the same OfflineModeSynchronizer instance is used for both primary and fallback synchronizers via Synchronizers(syncFactory, syncFactory). If the SDK calls Sync() on both (or calls both Fetch() and Sync()), multiple goroutines will race to receive from dataCh. Only one will succeed; the others will block forever waiting for data that never arrives, potentially causing initialization hangs.

Additional Locations (2)

Fix in Cursor Fix in Web


// Sync starts the synchronizer and returns a channel for receiving updates.
// For offline mode, this immediately provides the data from the archive file.
func (s *OfflineModeSynchronizer) Sync(ds subsystems.DataSelector) <-chan subsystems.DataSynchronizerResult {
resultChan := make(chan subsystems.DataSynchronizerResult)
changeSetChan := s.changeSetBroadcaster.AddListener()
statusChan := s.statusBroadcaster.AddListener()

go func() {
defer close(resultChan)

result := subsystems.DataSynchronizerResult{
State: interfaces.DataSourceStateInitializing,
}

// Wait for initial data from archive
select {
case data := <-s.dataCh:
changeSet, err := s.makeChangeSetFromCollections(data)
if err != nil {
result.State = interfaces.DataSourceStateOff
result.Error = interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindUnknown,
Message: err.Error(),
}
} else {
result.State = interfaces.DataSourceStateValid
result.ChangeSet = changeSet
}
resultChan <- result

case <-s.quit:
result.State = interfaces.DataSourceStateOff
resultChan <- result
return
}

// Listen for updates (in offline mode, these would be file changes)
for {
select {
case <-s.quit:
return
case changeSet, ok := <-changeSetChan:
if !ok {
return
}
result.ChangeSet = &changeSet
result.State = interfaces.DataSourceStateValid
resultChan <- result
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Error state persists incorrectly across result updates

The result struct is reused across loop iterations in Sync(). When a changeSet is received (lines 150-156), the code sets result.ChangeSet and result.State but does not clear result.Error. If a previous status change set an error, that stale error will be included when sending subsequent valid changeSet updates. Consumers may incorrectly interpret the result as containing an error when the data is actually valid.

Fix in Cursor Fix in Web

case statusChange, ok := <-statusChan:
if !ok {
return
}
if statusChange.State != interfaces.DataSourceStateValid {
result.ChangeSet = nil
}
result.State = statusChange.State
result.Error = statusChange.Error
resultChan <- result
}
}
}()

return resultChan
}

// makeChangeSetFromCollections converts old-style Collection data to a new-style ChangeSet.
// This uses the SDK's NewChangeSetFromCollections which pre-caches the collections,
// avoiding redundant conversions when the data is accessed later.
func (s *OfflineModeSynchronizer) makeChangeSetFromCollections(
collections []ldstoretypes.Collection,
) (*subsystems.ChangeSet, error) {
version := int(atomic.AddInt32(&s.version, 1))

return subsystems.NewChangeSetFromCollections(
subsystems.ServerIntent{
Payload: subsystems.Payload{
ID: "",
Target: version,
Code: subsystems.IntentTransferFull,
Reason: "offline-mode-init",
},
},
subsystems.NewSelector("offline", version),
collections,
)
}

// UpdateData allows external updates to the data (e.g., when the archive file changes).
func (s *OfflineModeSynchronizer) UpdateData(collections []ldstoretypes.Collection) error {
changeSet, err := s.makeChangeSetFromCollections(collections)
if err != nil {
return err
}
s.changeSetBroadcaster.Broadcast(*changeSet)
return nil
}
80 changes: 31 additions & 49 deletions relay/filedata_actions.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package relay

import (
"time"

"github.com/launchdarkly/ld-relay/v8/internal/relayenv"

"github.com/launchdarkly/ld-relay/v8/internal/sdkauth"
Expand All @@ -13,9 +11,8 @@ import (
"github.com/launchdarkly/ld-relay/v8/internal/filedata"

ld "github.com/launchdarkly/go-server-sdk/v7"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/ldcomponents"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
)

const (
Expand All @@ -29,48 +26,50 @@ const (
// methods on this object to let us know when environments have been read from the file for the
// first time and also if environments have changed due to a file update.
type relayFileDataActions struct {
r *Relay
envUpdates map[config.EnvironmentID]subsystems.DataSourceUpdateSink
r *Relay
envSynchronizers map[config.EnvironmentID]*filedata.OfflineModeSynchronizer
}

type dataSourceFactoryToCaptureUpdates struct {
updatesCh chan<- subsystems.DataSourceUpdateSink
}
func (a *relayFileDataActions) AddEnvironment(ae filedata.ArchiveEnvironment) {
// Create a channel to pass the archive data to the offline synchronizer
dataCh := make(chan []ldstoretypes.Collection, 1)

type stubDataSourceToCaptureUpdates struct {
dataSourceUpdates subsystems.DataSourceUpdateSink
updatesCh chan<- subsystems.DataSourceUpdateSink
}
// Create the synchronizer instance that we can later update when the file changes
synchronizer := filedata.NewOfflineModeSynchronizer(dataCh)

func (a *relayFileDataActions) AddEnvironment(ae filedata.ArchiveEnvironment) {
updatesCh := make(chan subsystems.DataSourceUpdateSink)
transformConfig := func(baseConfig ld.Config) ld.Config {
config := baseConfig
config.DataSource = dataSourceFactoryToCaptureUpdates{updatesCh}
// In offline mode, replace the DataSystem with our custom offline synchronizer.
// This synchronizer loads data from the archive file without making network connections.
syncFactory := filedata.OfflineModeSynchronizerFactory{Synchronizer: synchronizer}
config.DataSystem = ldcomponents.DataSystem().
Custom().
Synchronizers(syncFactory, nil) // primary and fallback use the same synchronizer
config.Events = ldcomponents.NoEvents()
return config
}

envConfig := envfactory.NewEnvConfigFactoryForOfflineMode(a.r.config.OfflineMode).MakeEnvironmentConfig(ae.Params)
env, _, err := a.r.addEnvironment(ae.Params.Identifiers, envConfig, transformConfig)
if err != nil {
a.r.loggers.Errorf(logMsgAutoConfEnvInitError, ae.Params.Identifiers.GetDisplayName(), err)
return
}

if ae.Params.ExpiringSDKKey.Defined() {
update := relayenv.NewCredentialUpdate(ae.Params.SDKKey)
env.UpdateCredential(update.WithGracePeriod(ae.Params.ExpiringSDKKey.Key, ae.Params.ExpiringSDKKey.Expiration))
}
select {
case updates := <-updatesCh:
if a.envUpdates == nil {
a.envUpdates = make(map[config.EnvironmentID]subsystems.DataSourceUpdateSink)
}
a.envUpdates[ae.Params.EnvID] = updates
updates.Init(ae.SDKData)
updates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
case <-time.After(time.Second * 2):
a.r.loggers.Errorf(logMsgOfflineEnvTimeoutError, ae.Params.Identifiers.GetDisplayName())

// Store the synchronizer so we can update it later when the file changes
if a.envSynchronizers == nil {
a.envSynchronizers = make(map[config.EnvironmentID]*filedata.OfflineModeSynchronizer)
}
a.envSynchronizers[ae.Params.EnvID] = synchronizer

// Send the initial archive data to the synchronizer
// The synchronizer will be waiting for this data in its Sync() method
dataCh <- ae.SDKData
}

func (a *relayFileDataActions) UpdateEnvironment(ae filedata.ArchiveEnvironment) {
Expand All @@ -79,8 +78,8 @@ func (a *relayFileDataActions) UpdateEnvironment(ae filedata.ArchiveEnvironment)
a.r.loggers.Errorf(logMsgInternalErrorUpdatedEnvNotFound, ae.Params.EnvID)
return
}
updates := a.envUpdates[ae.Params.EnvID]
if updates == nil { // COVERAGE: this should never happen and can't be covered in unit tests
synchronizer := a.envSynchronizers[ae.Params.EnvID]
if synchronizer == nil { // COVERAGE: this should never happen and can't be covered in unit tests
a.r.loggers.Errorf(logMsgInternalErrorNoUpdatesForEnv, ae.Params.EnvID)
return
}
Expand All @@ -102,7 +101,9 @@ func (a *relayFileDataActions) UpdateEnvironment(ae filedata.ArchiveEnvironment)

// SDKData will be non-nil only if the flag/segment data for the environment has actually changed.
if ae.SDKData != nil {
updates.Init(ae.SDKData)
if err := synchronizer.UpdateData(ae.SDKData); err != nil {
a.r.loggers.Errorf("Error updating offline environment data: %v", err)
}
}
}

Expand All @@ -112,24 +113,5 @@ func (a *relayFileDataActions) EnvironmentFailed(id config.EnvironmentID, err er

func (a *relayFileDataActions) DeleteEnvironment(id config.EnvironmentID, filter config.FilterKey) {
a.r.removeEnvironment(sdkauth.NewScoped(filter, id))
delete(a.envUpdates, id)
}

func (d dataSourceFactoryToCaptureUpdates) Build(
ctx subsystems.ClientContext,
) (subsystems.DataSource, error) {
return stubDataSourceToCaptureUpdates{ctx.GetDataSourceUpdateSink(), d.updatesCh}, nil
}

func (s stubDataSourceToCaptureUpdates) Close() error {
return nil
}

func (s stubDataSourceToCaptureUpdates) IsInitialized() bool {
return true
}

func (s stubDataSourceToCaptureUpdates) Start(readyCh chan<- struct{}) {
s.updatesCh <- s.dataSourceUpdates
close(readyCh)
delete(a.envSynchronizers, id)
}
10 changes: 4 additions & 6 deletions relay/filedata_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,16 @@ func (p offlineModeTestParams) shouldNotCreateClient(timeout time.Duration) {
func TestOfflineModeDeleteEnvironment(t *testing.T) {
offlineModeTest(t, config.Config{}, func(p offlineModeTestParams) {
p.updateHandler.AddEnvironment(testFileDataEnv1)
p.updateHandler.AddEnvironment(testFileDataEnv2)

client1 := p.awaitClient()
client2 := p.awaitClient()
assert.Equal(t, testFileDataEnv1.Params.SDKKey, client1.Key)
assert.Equal(t, testFileDataEnv2.Params.SDKKey, client2.Key)

_ = p.awaitEnvironment(testFileDataEnv1.Params.EnvID)

p.updateHandler.AddEnvironment(testFileDataEnv2)
client2 := p.awaitClient()
assert.Equal(t, testFileDataEnv2.Params.SDKKey, client2.Key)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Test awaits wrong environment ID after adding second env

After adding testFileDataEnv2, the test calls awaitEnvironment(testFileDataEnv1.Params.EnvID) instead of awaitEnvironment(testFileDataEnv2.Params.EnvID). This means the test doesn't actually verify that the second environment was properly initialized - it just re-checks the first environment. The test will pass even if adding the second environment fails to complete.

Fix in Cursor Fix in Web

_ = p.awaitEnvironment(testFileDataEnv1.Params.EnvID)

p.updateHandler.DeleteEnvironment(testFileDataEnv1.Params.EnvID, testFileDataEnv1.Params.Identifiers.FilterKey)

p.shouldNotHaveEnvironment(testFileDataEnv1.Params.EnvID, time.Second)
})
}
Expand Down
Loading
Loading