Skip to content

Commit bda8290

Browse files
authored
Switch non streaming order by to use flag from query plan instead of _streaming in the response (#4459)
1 parent bf2f5ee commit bda8290

File tree

5 files changed

+63
-37
lines changed

5 files changed

+63
-37
lines changed

Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,17 @@ private sealed class InitializationParameters
5555

5656
public int MaxConcurrency { get; }
5757

58+
public bool NonStreamingOrderBy { get; }
59+
5860
public InitializationParameters(
5961
IDocumentContainer documentContainer,
6062
SqlQuerySpec sqlQuerySpec,
6163
IReadOnlyList<FeedRangeEpk> targetRanges,
6264
PartitionKey? partitionKey,
6365
IReadOnlyList<OrderByColumn> orderByColumns,
6466
QueryPaginationOptions queryPaginationOptions,
65-
int maxConcurrency)
67+
int maxConcurrency,
68+
bool nonStreamingOrderBy)
6669
{
6770
this.DocumentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
6871
this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec));
@@ -71,6 +74,7 @@ public InitializationParameters(
7174
this.OrderByColumns = orderByColumns ?? throw new ArgumentNullException(nameof(orderByColumns));
7275
this.QueryPaginationOptions = queryPaginationOptions ?? throw new ArgumentNullException(nameof(queryPaginationOptions));
7376
this.MaxConcurrency = maxConcurrency;
77+
this.NonStreamingOrderBy = nonStreamingOrderBy;
7478
}
7579
}
7680

@@ -181,6 +185,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
181185
IReadOnlyList<OrderByColumn> orderByColumns,
182186
QueryPaginationOptions queryPaginationOptions,
183187
int maxConcurrency,
188+
bool nonStreamingOrderBy,
184189
CosmosElement continuationToken)
185190
{
186191
if (documentContainer == null)
@@ -233,24 +238,28 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
233238
partitionKey,
234239
orderByColumns,
235240
queryPaginationOptions,
236-
maxConcurrency);
241+
maxConcurrency,
242+
nonStreamingOrderBy);
237243

238244
return TryCatch<IQueryPipelineStage>.FromResult(new OrderByCrossPartitionQueryPipelineStage(init));
239245
}
240246

241-
private static async ValueTask<(TryCatch<IQueryPipelineStage>, Queue<QueryPage>)> MoveNextAsync_InitializeAsync(InitializationParameters init, ITrace trace, CancellationToken cancellationToken)
247+
private static async ValueTask<(TryCatch<IQueryPipelineStage>, Queue<QueryPage>)> MoveNextAsync_InitializeAsync(
248+
InitializationParameters parameters,
249+
ITrace trace,
250+
CancellationToken cancellationToken)
242251
{
243252
SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec(
244-
init.SqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
245-
init.SqlQuerySpec.Parameters);
253+
parameters.SqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
254+
parameters.SqlQuerySpec.Parameters);
246255

247-
List<OrderByQueryPartitionRangePageAsyncEnumerator> uninitializedEnumerators = init.TargetRanges
256+
List<OrderByQueryPartitionRangePageAsyncEnumerator> uninitializedEnumerators = parameters.TargetRanges
248257
.Select(range => OrderByQueryPartitionRangePageAsyncEnumerator.Create(
249-
init.DocumentContainer,
258+
parameters.DocumentContainer,
250259
rewrittenQueryForOrderBy,
251260
new FeedRangeState<QueryState>(range, state: default),
252-
init.PartitionKey,
253-
init.QueryPaginationOptions,
261+
parameters.PartitionKey,
262+
parameters.QueryPaginationOptions,
254263
TrueFilter,
255264
PrefetchPolicy.PrefetchSinglePage))
256265
.ToList();
@@ -259,13 +268,12 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
259268
uninitializedEnumerators
260269
.Select(x => (x, (OrderByContinuationToken)null)));
261270

262-
await ParallelPrefetch.PrefetchInParallelAsync(uninitializedEnumerators, init.MaxConcurrency, trace, cancellationToken);
271+
await ParallelPrefetch.PrefetchInParallelAsync(uninitializedEnumerators, parameters.MaxConcurrency, trace, cancellationToken);
263272

264-
IReadOnlyList<SortOrder> sortOrders = init.OrderByColumns.Select(column => column.SortOrder).ToList();
273+
IReadOnlyList<SortOrder> sortOrders = parameters.OrderByColumns.Select(column => column.SortOrder).ToList();
265274
PriorityQueue<OrderByQueryPartitionRangePageAsyncEnumerator> initializedEnumerators = new PriorityQueue<OrderByQueryPartitionRangePageAsyncEnumerator>(new OrderByEnumeratorComparer(sortOrders));
266275
Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens = new Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)>();
267276

268-
bool nonStreaming = false;
269277
Queue<QueryPage> bufferedPages = new Queue<QueryPage>();
270278
QueryPageParameters queryPageParameters = null;
271279
while (uninitializedEnumeratorsAndTokens.Count != 0)
@@ -278,7 +286,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
278286
if (IsSplitException(enumerator.Current.Exception))
279287
{
280288
await MoveNextAsync_InitializeAsync_HandleSplitAsync(
281-
init.DocumentContainer,
289+
parameters.DocumentContainer,
282290
uninitializedEnumeratorsAndTokens,
283291
enumerator,
284292
token,
@@ -307,9 +315,6 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync(
307315
additionalHeaders: page.AdditionalHeaders);
308316
}
309317

310-
// For backwards compatibility the default value of streaming for ORDER BY is _true_
311-
nonStreaming = nonStreaming || (!page.Streaming.GetValueOrDefault(true) && (page.State != null));
312-
313318
if (enumerator.Current.Result.Enumerator.MoveNext())
314319
{
315320
// the page is non-empty then we need to enqueue the enumerator in the PriorityQueue
@@ -335,7 +340,7 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync(
335340
}
336341

337342
IQueryPipelineStage pipelineStage;
338-
if (nonStreaming)
343+
if (parameters.NonStreamingOrderBy)
339344
{
340345
Queue<OrderByQueryPartitionRangePageAsyncEnumerator> orderbyEnumerators = new Queue<OrderByQueryPartitionRangePageAsyncEnumerator>();
341346
foreach ((OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken _) in enumeratorsAndTokens)
@@ -350,10 +355,10 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync(
350355
orderbyEnumerators.Enqueue(bufferedEnumerator);
351356
}
352357

353-
await ParallelPrefetch.PrefetchInParallelAsync(orderbyEnumerators, init.MaxConcurrency, trace, cancellationToken);
358+
await ParallelPrefetch.PrefetchInParallelAsync(orderbyEnumerators, parameters.MaxConcurrency, trace, cancellationToken);
354359

355360
pipelineStage = await NonStreamingOrderByPipelineStage.CreateAsync(
356-
init.QueryPaginationOptions,
361+
parameters.QueryPaginationOptions,
357362
sortOrders,
358363
orderbyEnumerators,
359364
queryPageParameters,
@@ -363,12 +368,12 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync(
363368
else
364369
{
365370
pipelineStage = StreamingOrderByCrossPartitionQueryPipelineStage.Create(
366-
init.DocumentContainer,
371+
parameters.DocumentContainer,
367372
sortOrders,
368373
initializedEnumerators,
369374
enumeratorsAndTokens,
370-
init.QueryPaginationOptions,
371-
init.MaxConcurrency);
375+
parameters.QueryPaginationOptions,
376+
parameters.MaxConcurrency);
372377
}
373378

374379
return (TryCatch<IQueryPipelineStage>.FromResult(pipelineStage), bufferedPages);

Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
7878
.OrderByExpressions
7979
.Zip(queryInfo.OrderBy, (expression, sortOrder) => new OrderByColumn(expression, sortOrder)).ToList(),
8080
queryPaginationOptions: queryPaginationOptions,
81-
maxConcurrency: maxConcurrency,
81+
maxConcurrency: maxConcurrency,
82+
nonStreamingOrderBy: queryInfo.HasNonStreamingOrderBy,
8283
continuationToken: continuationToken);
8384
}
8485
else

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,16 @@ FROM c
4444
[Benchmark(Baseline = true)]
4545
public Task StreamingOrderByPipelineStage()
4646
{
47-
return CreateAndRunPipeline(StreamingContainer);
47+
return CreateAndRunPipeline(StreamingContainer, nonStreamingOrderBy: false);
4848
}
4949

5050
[Benchmark]
5151
public Task NonStreamingOrderByPipelineStage()
5252
{
53-
return CreateAndRunPipeline(NonStreamingContainer);
53+
return CreateAndRunPipeline(NonStreamingContainer, nonStreamingOrderBy: true);
5454
}
5555

56-
private static async Task CreateAndRunPipeline(IDocumentContainer documentContainer)
56+
private static async Task CreateAndRunPipeline(IDocumentContainer documentContainer, bool nonStreamingOrderBy)
5757
{
5858
IReadOnlyList<FeedRangeEpk> ranges = await documentContainer.GetFeedRangesAsync(
5959
trace: NoOpTrace.Singleton,
@@ -67,6 +67,7 @@ private static async Task CreateAndRunPipeline(IDocumentContainer documentContai
6767
orderByColumns: OrderByColumns,
6868
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: EndUserPageSize),
6969
maxConcurrency: MaxConcurrency,
70+
nonStreamingOrderBy: nonStreamingOrderBy,
7071
continuationToken: null);
7172

7273
IQueryPipelineStage pipeline = pipelineStage.Result;

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,14 +228,16 @@ private static async Task RunParityTests(
228228
ranges: ranges,
229229
queryText: testCase.QueryText,
230230
orderByColumns: testCase.OrderByColumns,
231-
pageSize: pageSize);
231+
pageSize: pageSize,
232+
nonStreamingOrderBy: true);
232233

233234
IReadOnlyList<CosmosElement> streamingResult = await CreateAndRunPipelineStage(
234235
documentContainer: documentContainer,
235236
ranges: ranges,
236237
queryText: testCase.QueryText,
237238
orderByColumns: testCase.OrderByColumns,
238-
pageSize: pageSize);
239+
pageSize: pageSize,
240+
nonStreamingOrderBy: false);
239241

240242
if (!streamingResult.SequenceEqual(nonStreamingResult))
241243
{
@@ -255,7 +257,8 @@ private static async Task<IReadOnlyList<CosmosElement>> CreateAndRunPipelineStag
255257
IReadOnlyList<FeedRangeEpk> ranges,
256258
string queryText,
257259
IReadOnlyList<OrderByColumn> orderByColumns,
258-
int pageSize)
260+
int pageSize,
261+
bool nonStreamingOrderBy)
259262
{
260263
TryCatch<IQueryPipelineStage> pipelineStage = OrderByCrossPartitionQueryPipelineStage.MonadicCreate(
261264
documentContainer: documentContainer,
@@ -265,6 +268,7 @@ private static async Task<IReadOnlyList<CosmosElement>> CreateAndRunPipelineStag
265268
orderByColumns: orderByColumns,
266269
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: pageSize),
267270
maxConcurrency: MaxConcurrency,
271+
nonStreamingOrderBy: nonStreamingOrderBy,
268272
continuationToken: null);
269273

270274
Assert.IsTrue(pipelineStage.Succeeded);
@@ -311,15 +315,17 @@ private static async Task RunParityTests(IReadOnlyList<ParityTestCase> testCases
311315
ranges: ranges,
312316
queryText: testCase.QueryText,
313317
orderByColumns: testCase.OrderByColumns,
314-
pageSize: pageSize);
318+
pageSize: pageSize,
319+
nonStreamingOrderBy: true);
315320

316321
DebugTraceHelpers.TraceStreamingPipelineStarting();
317322
IReadOnlyList<CosmosElement> streamingResult = await CreateAndRunPipelineStage(
318323
documentContainer: streamingDocumentContainer,
319324
ranges: ranges,
320325
queryText: testCase.QueryText,
321326
orderByColumns: testCase.OrderByColumns,
322-
pageSize: pageSize);
327+
pageSize: pageSize,
328+
nonStreamingOrderBy: false);
323329

324330
if (!streamingResult.SequenceEqual(nonStreamingResult))
325331
{

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public void MonadicCreate_NullContinuationToken()
7777
new OrderByColumn("_ts", SortOrder.Ascending)
7878
},
7979
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
80-
maxConcurrency: 10,
80+
maxConcurrency: 10,
81+
nonStreamingOrderBy: false,
8182
continuationToken: null);
8283
Assert.IsTrue(monadicCreate.Succeeded);
8384
}
@@ -98,6 +99,7 @@ public void MonadicCreate_NonCosmosArrayContinuationToken()
9899
},
99100
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
100101
maxConcurrency: 10,
102+
nonStreamingOrderBy: false,
101103
continuationToken: CosmosObject.Create(new Dictionary<string, CosmosElement>()));
102104
Assert.IsTrue(monadicCreate.Failed);
103105
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
@@ -119,6 +121,7 @@ public void MonadicCreate_EmptyArrayContinuationToken()
119121
},
120122
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
121123
maxConcurrency: 10,
124+
nonStreamingOrderBy: false,
122125
continuationToken: CosmosArray.Create(new List<CosmosElement>()));
123126
Assert.IsTrue(monadicCreate.Failed);
124127
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
@@ -140,6 +143,7 @@ public void MonadicCreate_NonParallelContinuationToken()
140143
},
141144
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
142145
maxConcurrency: 10,
146+
nonStreamingOrderBy: false,
143147
continuationToken: CosmosArray.Create(new List<CosmosElement>() { CosmosString.Create("asdf") }));
144148
Assert.IsTrue(monadicCreate.Failed);
145149
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
@@ -176,6 +180,7 @@ public void MonadicCreate_SingleOrderByContinuationToken()
176180
},
177181
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
178182
maxConcurrency: 10,
183+
nonStreamingOrderBy: false,
179184
continuationToken: CosmosArray.Create(
180185
new List<CosmosElement>()
181186
{
@@ -220,6 +225,7 @@ public void MonadicCreate_SingleOrderByContinuationToken()
220225
},
221226
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
222227
maxConcurrency: 10,
228+
nonStreamingOrderBy: false,
223229
continuationToken: CosmosArray.Create(
224230
new List<CosmosElement>()
225231
{
@@ -279,6 +285,7 @@ public void MonadicCreate_MultipleOrderByContinuationToken()
279285
},
280286
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
281287
maxConcurrency: 10,
288+
nonStreamingOrderBy: false,
282289
continuationToken: CosmosArray.Create(
283290
new List<CosmosElement>()
284291
{
@@ -321,6 +328,7 @@ public void MonadicCreate_OrderByWithResumeValues()
321328
},
322329
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
323330
maxConcurrency: 10,
331+
nonStreamingOrderBy: false,
324332
continuationToken: CosmosArray.Create(
325333
new List<CosmosElement>()
326334
{
@@ -361,7 +369,8 @@ public void MonadicCreate_OrderByWithResumeValues()
361369
new OrderByColumn("item2", SortOrder.Ascending)
362370
},
363371
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
364-
maxConcurrency: 10,
372+
maxConcurrency: 10,
373+
nonStreamingOrderBy: false,
365374
continuationToken: CosmosArray.Create(
366375
new List<CosmosElement>()
367376
{
@@ -416,7 +425,8 @@ public async Task TestFormattedFiltersForTargetPartitionWithContinuationTokenAsy
416425
new OrderByColumn("c._ts", SortOrder.Ascending)
417426
},
418427
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 1),
419-
maxConcurrency: 0,
428+
maxConcurrency: 0,
429+
nonStreamingOrderBy: false,
420430
continuationToken: CosmosElement.Parse(continuationToken));
421431
Assert.IsTrue(monadicCreate.Succeeded);
422432

@@ -451,7 +461,8 @@ FROM c
451461
new OrderByColumn("c._ts", SortOrder.Ascending)
452462
},
453463
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
454-
maxConcurrency: 10,
464+
maxConcurrency: 10,
465+
nonStreamingOrderBy: false,
455466
continuationToken: null);
456467
Assert.IsTrue(monadicCreate.Succeeded);
457468
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
@@ -500,7 +511,8 @@ FROM c
500511
new OrderByColumn("c._ts", SortOrder.Ascending)
501512
},
502513
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
503-
maxConcurrency: 10,
514+
maxConcurrency: 10,
515+
nonStreamingOrderBy: false,
504516
continuationToken: null);
505517
Assert.IsTrue(monadicCreate.Succeeded);
506518
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
@@ -557,7 +569,8 @@ FROM c
557569
new OrderByColumn("c.pk", SortOrder.Ascending)
558570
},
559571
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
560-
maxConcurrency: 10,
572+
maxConcurrency: 10,
573+
nonStreamingOrderBy: false,
561574
continuationToken: continuationToken);
562575
monadicQueryPipelineStage.ThrowIfFailed();
563576
IQueryPipelineStage queryPipelineStage = monadicQueryPipelineStage.Result;

0 commit comments

Comments
 (0)