Skip to content

Commit 4f38b0b

Browse files
committed
Simplify file provider synchronization to use only mutex
- Remove atomic operations in favor of simple boolean flags - Fix potential deadlock in Watch() by releasing lock before goroutine spawn - Use minimal locking in cleanup goroutine - Cleaner, more maintainable synchronization pattern - All tests pass with race detector
1 parent 00f18ae commit 4f38b0b

File tree

2 files changed

+136
-19
lines changed

2 files changed

+136
-19
lines changed

providers/file/file.go

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"os"
1010
"path/filepath"
1111
"sync"
12-
"sync/atomic"
1312
"time"
1413

1514
"github.com/fsnotify/fsnotify"
@@ -21,11 +20,8 @@ type File struct {
2120
w *fsnotify.Watcher
2221

2322
// Mutex to protect concurrent access to watcher state
24-
mu sync.Mutex
25-
26-
// Using Go 1.18 atomic functions for backwards compatibility.
27-
isWatching uint32
28-
isUnwatched uint32
23+
mu sync.Mutex
24+
isWatching bool
2925
}
3026

3127
// Provider returns a file provider.
@@ -47,10 +43,10 @@ func (f *File) Read() (map[string]interface{}, error) {
4743
// blocking function that internally spawns a goroutine to watch for changes.
4844
func (f *File) Watch(cb func(event interface{}, err error)) error {
4945
f.mu.Lock()
50-
defer f.mu.Unlock()
51-
46+
5247
// If a watcher already exists, return an error.
53-
if atomic.LoadUint32(&f.isWatching) == 1 {
48+
if f.isWatching {
49+
f.mu.Unlock()
5450
return errors.New("file is already being watched")
5551
}
5652

@@ -68,10 +64,24 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
6864

6965
f.w, err = fsnotify.NewWatcher()
7066
if err != nil {
67+
f.mu.Unlock()
7168
return err
7269
}
7370

74-
atomic.StoreUint32(&f.isWatching, 1)
71+
f.isWatching = true
72+
73+
// Set up the directory watch before releasing the lock
74+
err = f.w.Add(fDir)
75+
if err != nil {
76+
f.w.Close()
77+
f.w = nil
78+
f.isWatching = false
79+
f.mu.Unlock()
80+
return err
81+
}
82+
83+
// Release the lock before spawning goroutine
84+
f.mu.Unlock()
7585

7686
var (
7787
lastEvent string
@@ -84,8 +94,12 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
8494
select {
8595
case event, ok := <-f.w.Events:
8696
if !ok {
87-
// Only throw an error if it was not an explicit unwatch.
88-
if atomic.LoadUint32(&f.isUnwatched) == 0 {
97+
// Only throw an error if we were still supposed to be watching.
98+
f.mu.Lock()
99+
stillWatching := f.isWatching
100+
f.mu.Unlock()
101+
102+
if stillWatching {
89103
cb(nil, errors.New("fsnotify watch channel closed"))
90104
}
91105

@@ -133,8 +147,12 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
133147
// There's an error.
134148
case err, ok := <-f.w.Errors:
135149
if !ok {
136-
// Only throw an error if it was not an explicit unwatch.
137-
if atomic.LoadUint32(&f.isUnwatched) == 0 {
150+
// Only throw an error if we were still supposed to be watching.
151+
f.mu.Lock()
152+
stillWatching := f.isWatching
153+
f.mu.Unlock()
154+
155+
if stillWatching {
138156
cb(nil, errors.New("fsnotify err channel closed"))
139157
}
140158

@@ -148,25 +166,23 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
148166
}
149167

150168
f.mu.Lock()
151-
atomic.StoreUint32(&f.isWatching, 0)
152-
atomic.StoreUint32(&f.isUnwatched, 0)
169+
f.isWatching = false
153170
if f.w != nil {
154171
f.w.Close()
155172
f.w = nil
156173
}
157174
f.mu.Unlock()
158175
}()
159176

160-
// Watch the directory for changes.
161-
return f.w.Add(fDir)
177+
return nil
162178
}
163179

164180
// Unwatch stops watching the files and closes fsnotify watcher.
165181
func (f *File) Unwatch() error {
166182
f.mu.Lock()
167183
defer f.mu.Unlock()
168184

169-
atomic.StoreUint32(&f.isUnwatched, 1)
185+
f.isWatching = false
170186
if f.w != nil {
171187
return f.w.Close()
172188
}

tests/koanf_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1942,3 +1942,104 @@ func TestNoDeadlock(t *testing.T) {
19421942
t.Fatal("DEADLOCK DETECTED: Goroutines did not complete within timeout")
19431943
}
19441944
}
1945+
1946+
// TestFileProviderConcurrency specifically tests the file provider's synchronization
1947+
// under heavy concurrent load to catch any races in Watch/Unwatch
1948+
func TestFileProviderConcurrency(t *testing.T) {
1949+
if testing.Short() {
1950+
t.Skip("skipping file provider concurrency test in short mode")
1951+
}
1952+
1953+
// Create temp config file
1954+
tmpDir := t.TempDir()
1955+
tmpFile := filepath.Join(tmpDir, "config.json")
1956+
initialData := []byte(`{"test": "value"}`)
1957+
err := os.WriteFile(tmpFile, initialData, 0600)
1958+
if err != nil {
1959+
t.Fatal(err)
1960+
}
1961+
1962+
var wg sync.WaitGroup
1963+
done := make(chan struct{})
1964+
1965+
// Stress test: multiple goroutines rapidly watch/unwatch the same file
1966+
for i := 0; i < 5; i++ {
1967+
wg.Add(1)
1968+
go func(id int) {
1969+
defer wg.Done()
1970+
1971+
for {
1972+
select {
1973+
case <-done:
1974+
return
1975+
default:
1976+
// Create new file provider for each iteration
1977+
f := file.Provider(tmpFile)
1978+
1979+
// Try to watch
1980+
var watchErr error
1981+
watchErr = f.Watch(func(event interface{}, err error) {
1982+
// Simple callback that doesn't do much
1983+
_ = event
1984+
_ = err
1985+
})
1986+
1987+
// Sometimes watch will fail if another goroutine is already watching
1988+
if watchErr == nil {
1989+
// If watch succeeded, unwatch after a short delay
1990+
time.Sleep(time.Millisecond)
1991+
unwatchErr := f.Unwatch()
1992+
if unwatchErr != nil {
1993+
// Log but don't fail - this can happen during cleanup
1994+
t.Logf("Unwatch error (normal during stress test): %v", unwatchErr)
1995+
}
1996+
} else if watchErr.Error() != "file is already being watched" {
1997+
// Unexpected error
1998+
t.Errorf("Unexpected watch error: %v", watchErr)
1999+
}
2000+
2001+
// Small delay to prevent tight loop
2002+
time.Sleep(time.Microsecond)
2003+
}
2004+
}
2005+
}(i)
2006+
}
2007+
2008+
// Also test concurrent file modifications while watching
2009+
wg.Add(1)
2010+
go func() {
2011+
defer wg.Done()
2012+
counter := 0
2013+
2014+
for {
2015+
select {
2016+
case <-done:
2017+
return
2018+
default:
2019+
// Write new data to trigger file events
2020+
newData := fmt.Sprintf(`{"test": "value", "counter": %d}`, counter)
2021+
os.WriteFile(tmpFile, []byte(newData), 0600)
2022+
counter++
2023+
time.Sleep(5 * time.Millisecond)
2024+
}
2025+
}
2026+
}()
2027+
2028+
// Let the stress test run for 200ms
2029+
time.Sleep(200 * time.Millisecond)
2030+
close(done)
2031+
2032+
// Wait with timeout to detect deadlocks
2033+
waitChan := make(chan struct{})
2034+
go func() {
2035+
wg.Wait()
2036+
close(waitChan)
2037+
}()
2038+
2039+
select {
2040+
case <-waitChan:
2041+
t.Log("File provider concurrency test completed successfully")
2042+
case <-time.After(5 * time.Second):
2043+
t.Fatal("FILE PROVIDER DEADLOCK: Goroutines did not complete within timeout")
2044+
}
2045+
}

0 commit comments

Comments
 (0)