@@ -13,6 +13,7 @@ import (
13
13
"net/url"
14
14
"strconv"
15
15
"strings"
16
+ "sync/atomic"
16
17
"time"
17
18
18
19
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
@@ -25,14 +26,17 @@ type configurationClientManager struct {
25
26
replicaDiscoveryEnabled bool
26
27
clientOptions * azappconfig.ClientOptions
27
28
staticClient * configurationClientWrapper
28
- dynamicClients [ ]* configurationClientWrapper
29
+ dynamicClients atomic. Pointer [[ ]* configurationClientWrapper ] // atomic pointer to slice
29
30
endpoint string
30
31
validDomain string
31
32
credential azcore.TokenCredential
32
33
secret string
33
34
id string
34
35
lastFallbackClientAttempt time.Time
35
36
lastFallbackClientRefresh time.Time
37
+
38
+ // discoveryInProgress prevents concurrent replica discovery operations
39
+ discoveryInProgress atomic.Bool
36
40
}
37
41
38
42
// configurationClientWrapper wraps an Azure App Configuration client with additional metadata
@@ -110,7 +114,12 @@ func (manager *configurationClientManager) initializeClient(authOptions Authenti
110
114
111
115
func (manager * configurationClientManager ) getClients (ctx context.Context ) ([]* configurationClientWrapper , error ) {
112
116
currentTime := time .Now ()
113
- clients := make ([]* configurationClientWrapper , 0 , 1 + len (manager .dynamicClients ))
117
+ dynamicClients := manager .dynamicClients .Load () // atomic read
118
+ clientsCapacity := 1
119
+ if dynamicClients != nil {
120
+ clientsCapacity += len (* dynamicClients )
121
+ }
122
+ clients := make ([]* configurationClientWrapper , 0 , clientsCapacity )
114
123
115
124
// Add the static client if it is not in backoff
116
125
if currentTime .After (manager .staticClient .backOffEndTime ) {
@@ -122,16 +131,18 @@ func (manager *configurationClientManager) getClients(ctx context.Context) ([]*c
122
131
}
123
132
124
133
if currentTime .After (manager .lastFallbackClientAttempt .Add (minimalClientRefreshInterval )) &&
125
- (manager . dynamicClients == nil ||
134
+ (dynamicClients == nil ||
126
135
currentTime .After (manager .lastFallbackClientRefresh .Add (fallbackClientRefreshExpireInterval ))) {
127
136
manager .lastFallbackClientAttempt = currentTime
128
137
url , _ := url .Parse (manager .endpoint )
129
- manager .discoverFallbackClients (ctx , url .Host )
138
+ manager .discoverFallbackClients (url .Host )
130
139
}
131
140
132
- for _ , clientWrapper := range manager .dynamicClients {
133
- if currentTime .After (clientWrapper .backOffEndTime ) {
134
- clients = append (clients , clientWrapper )
141
+ if dynamicClients != nil {
142
+ for _ , clientWrapper := range * dynamicClients {
143
+ if currentTime .After (clientWrapper .backOffEndTime ) {
144
+ clients = append (clients , clientWrapper )
145
+ }
135
146
}
136
147
}
137
148
@@ -144,21 +155,36 @@ func (manager *configurationClientManager) refreshClients(ctx context.Context) {
144
155
currentTime .After (manager .lastFallbackClientAttempt .Add (minimalClientRefreshInterval )) {
145
156
manager .lastFallbackClientAttempt = currentTime
146
157
url , _ := url .Parse (manager .endpoint )
147
- manager .discoverFallbackClients (ctx , url .Host )
158
+ manager .discoverFallbackClients (url .Host )
148
159
}
149
160
}
150
161
151
- func (manager * configurationClientManager ) discoverFallbackClients (ctx context.Context , host string ) {
152
- newCtx , cancel := context .WithTimeout (ctx , failoverTimeout )
153
- defer cancel ()
154
-
155
- srvTargetHosts , err := querySrvTargetHost (newCtx , host )
156
- if err != nil {
157
- log .Printf ("failed to discover fallback clients for %s: %v" , host , err )
158
- return
162
+ func (manager * configurationClientManager ) discoverFallbackClients (host string ) {
163
+ if ! manager .discoveryInProgress .CompareAndSwap (false , true ) {
164
+ return // Another discovery is already in progress
159
165
}
160
166
161
- manager .processSrvTargetHosts (srvTargetHosts )
167
+ // Reset the flag when we're done
168
+ defer manager .discoveryInProgress .Store (false )
169
+
170
+ go func () {
171
+ defer func () {
172
+ if r := recover (); r != nil {
173
+ log .Printf ("panic in replica discovery: %v" , r )
174
+ }
175
+ }()
176
+
177
+ discoveryCtx , cancel := context .WithTimeout (context .Background (), failoverTimeout )
178
+ defer cancel ()
179
+
180
+ srvTargetHosts , err := querySrvTargetHost (discoveryCtx , host )
181
+ if err != nil {
182
+ log .Printf ("failed to discover fallback clients for %s: %v" , host , err )
183
+ return
184
+ }
185
+
186
+ manager .processSrvTargetHosts (srvTargetHosts )
187
+ }()
162
188
}
163
189
164
190
func (manager * configurationClientManager ) processSrvTargetHosts (srvTargetHosts []string ) {
@@ -188,7 +214,7 @@ func (manager *configurationClientManager) processSrvTargetHosts(srvTargetHosts
188
214
}
189
215
}
190
216
191
- manager .dynamicClients = newDynamicClients
217
+ manager .dynamicClients . Store ( & newDynamicClients ) // atomic write
192
218
manager .lastFallbackClientRefresh = time .Now ()
193
219
}
194
220
0 commit comments