Skip to content

Commit db91df0

Browse files
Hedging: Fixes Concurrency Issue (#5189)
# Pull Request Template ## Description This pull request refactors the `CrossRegionHedgingAvailabilityStrategy` class in the Azure Cosmos SDK to fix a rare concurrency issue regarding session tokens and improve diagnostics for improved debugging.. It also updates related tests to align with the changes. The most important changes include replacing the `Response Region` diagnostic with a new `Hedge Config` diagnostic, modifying the `HedgingResponse` class, and updating test cases to validate the new behavior. the `Response Region` field can sometimes be inaccurate with internal cross regional retries so it has been removed. To find the response region, you can look at the existing store response to see where the response came from. ## Type of change Please delete options that are not relevant. - [] Bug fix (non-breaking change which fixes an issue) - [] New feature (non-breaking change which adds functionality) ## Closing issues To automatically close an issue: closes #IssueNumber
1 parent 6fca24b commit db91df0

File tree

2 files changed

+200
-117
lines changed

2 files changed

+200
-117
lines changed

Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs

Lines changed: 37 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace Microsoft.Azure.Cosmos
2424
internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInternal
2525
{
2626
private const string HedgeContext = "Hedge Context";
27-
private const string ResponseRegion = "Response Region";
27+
private const string HedgeConfig = "Hedge Config";
2828

2929
/// <summary>
3030
/// Latency threshold which activates the first region hedging
@@ -44,6 +44,8 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
4444
/// </summary>
4545
public bool EnableMultiWriteRegionHedge { get; private set; }
4646

47+
private readonly string HedgeConfigText;
48+
4749
/// <summary>
4850
/// Constructor for hedging availability strategy
4951
/// </summary>
@@ -68,6 +70,8 @@ public CrossRegionHedgingAvailabilityStrategy(
6870
this.Threshold = threshold;
6971
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
7072
this.EnableMultiWriteRegionHedge = enableMultiWriteRegionHedge;
73+
74+
this.HedgeConfigText = $"t:{this.Threshold.TotalMilliseconds}ms, s:{this.ThresholdStep.TotalMilliseconds}ms, w:{this.EnableMultiWriteRegionHedge}";
7175
}
7276

7377
/// <inheritdoc/>
@@ -134,13 +138,12 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
134138
: await StreamExtension.AsClonableStreamAsync(request.Content)))
135139
{
136140
IReadOnlyCollection<string> hedgeRegions = client.DocumentClient.GlobalEndpointManager
137-
.GetApplicableRegions(
138-
request.RequestOptions?.ExcludeRegions,
139-
OperationTypeExtensions.IsReadOperation(request.OperationType));
141+
.GetApplicableRegions(
142+
request.RequestOptions?.ExcludeRegions,
143+
OperationTypeExtensions.IsReadOperation(request.OperationType));
140144

141145
List<Task> requestTasks = new List<Task>(hedgeRegions.Count + 1);
142146

143-
Task<HedgingResponse> primaryRequest = null;
144147
HedgingResponse hedgeResponse = null;
145148

146149
//Send out hedged requests
@@ -153,33 +156,17 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
153156
CancellationToken timerToken = timerTokenSource.Token;
154157
using (Task hedgeTimer = Task.Delay(awaitTime, timerToken))
155158
{
156-
if (requestNumber == 0)
157-
{
158-
primaryRequest = this.RequestSenderAndResultCheckAsync(
159-
sender,
160-
request,
161-
hedgeRegions.ElementAt(requestNumber),
162-
cancellationToken,
163-
cancellationTokenSource,
164-
trace);
165-
166-
requestTasks.Add(primaryRequest);
167-
}
168-
else
169-
{
170-
Task<HedgingResponse> requestTask = this.CloneAndSendAsync(
171-
sender: sender,
172-
request: request,
173-
clonedBody: clonedBody,
174-
hedgeRegions: hedgeRegions,
175-
requestNumber: requestNumber,
176-
trace: trace,
177-
cancellationToken: cancellationToken,
178-
cancellationTokenSource: cancellationTokenSource);
179-
180-
requestTasks.Add(requestTask);
181-
}
182-
159+
Task<HedgingResponse> requestTask = this.CloneAndSendAsync(
160+
sender: sender,
161+
request: request,
162+
clonedBody: clonedBody,
163+
hedgeRegions: hedgeRegions,
164+
requestNumber: requestNumber,
165+
trace: trace,
166+
cancellationToken: cancellationToken,
167+
cancellationTokenSource: cancellationTokenSource);
168+
169+
requestTasks.Add(requestTask);
183170
requestTasks.Add(hedgeTimer);
184171

185172
Task completedTask = await Task.WhenAny(requestTasks);
@@ -202,13 +189,14 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
202189
if (hedgeResponse.IsNonTransient)
203190
{
204191
cancellationTokenSource.Cancel();
192+
193+
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
194+
HedgeConfig,
195+
this.HedgeConfigText);
205196
//Take is not inclusive, so we need to add 1 to the request number which starts at 0
206197
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
207198
HedgeContext,
208199
hedgeRegions.Take(requestNumber + 1));
209-
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
210-
ResponseRegion,
211-
hedgeResponse.ResponseRegion);
212200
return hedgeResponse.ResponseMessage;
213201
}
214202
}
@@ -231,12 +219,12 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
231219
if (hedgeResponse.IsNonTransient || requestTasks.Count == 0)
232220
{
233221
cancellationTokenSource.Cancel();
222+
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
223+
HedgeConfig,
224+
this.HedgeConfigText);
234225
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
235226
HedgeContext,
236227
hedgeRegions);
237-
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
238-
ResponseRegion,
239-
hedgeResponse.ResponseRegion);
240228
return hedgeResponse.ResponseMessage;
241229
}
242230
}
@@ -270,15 +258,17 @@ private async Task<HedgingResponse> CloneAndSendAsync(
270258
{
271259
clonedRequest.RequestOptions ??= new RequestOptions();
272260

273-
List<string> excludeRegions = new List<string>(hedgeRegions);
274-
string region = excludeRegions[requestNumber];
275-
excludeRegions.RemoveAt(requestNumber);
276-
clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;
261+
//we do not want to exclude any regions for the primary request
262+
if (requestNumber > 0)
263+
{
264+
List<string> excludeRegions = new List<string>(hedgeRegions);
265+
excludeRegions.RemoveAt(requestNumber);
266+
clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;
267+
}
277268

278269
return await this.RequestSenderAndResultCheckAsync(
279270
sender,
280271
clonedRequest,
281-
region,
282272
cancellationToken,
283273
cancellationTokenSource,
284274
trace);
@@ -288,7 +278,6 @@ private async Task<HedgingResponse> CloneAndSendAsync(
288278
private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
289279
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
290280
RequestMessage request,
291-
string hedgedRegion,
292281
CancellationToken cancellationToken,
293282
CancellationTokenSource cancellationTokenSource,
294283
ITrace trace)
@@ -303,12 +292,12 @@ private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
303292
cancellationTokenSource.Cancel();
304293
}
305294

306-
return new HedgingResponse(true, response, hedgedRegion);
295+
return new HedgingResponse(true, response);
307296
}
308297

309-
return new HedgingResponse(false, response, hedgedRegion);
298+
return new HedgingResponse(false, response);
310299
}
311-
catch (OperationCanceledException oce ) when (cancellationTokenSource.IsCancellationRequested)
300+
catch (OperationCanceledException oce) when (cancellationTokenSource.IsCancellationRequested)
312301
{
313302
throw new CosmosOperationCanceledException(oce, trace);
314303
}
@@ -348,13 +337,11 @@ private sealed class HedgingResponse
348337
{
349338
public readonly bool IsNonTransient;
350339
public readonly ResponseMessage ResponseMessage;
351-
public readonly string ResponseRegion;
352340

353-
public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage, string responseRegion)
341+
public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage)
354342
{
355343
this.IsNonTransient = isNonTransient;
356344
this.ResponseMessage = responseMessage;
357-
this.ResponseRegion = responseRegion;
358345
}
359346
}
360347
}

0 commit comments

Comments
 (0)