Skip to content

Commit d5f17f2

Browse files
authored
Query: Fixes non streaming OrderByCrossPartitionQueryPipelineStage to remove state and handle splits (#4493)
* simplify OrderByCrossPartitionQueryPipelineStage to remove state and be a static class. * Add more test coverage for splits/merge and request charge * Add more test coverage for splits * incorporate code review feedback
1 parent 4746295 commit d5f17f2

File tree

6 files changed

+667
-535
lines changed

6 files changed

+667
-535
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation. All rights reserved.
3+
// ------------------------------------------------------------
4+
5+
namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
6+
{
7+
using System;
8+
using System.Collections.Generic;
9+
10+
internal sealed class BufferedOrderByResults
11+
{
12+
public IEnumerator<OrderByQueryResult> Enumerator { get; }
13+
14+
public int Count { get; }
15+
16+
public double TotalRequestCharge { get; }
17+
18+
public QueryPageParameters QueryPageParameters { get; }
19+
20+
public BufferedOrderByResults(
21+
IEnumerator<OrderByQueryResult> enumerator,
22+
int itemCount,
23+
double totalRequestCharge,
24+
QueryPageParameters queryPageParameters)
25+
{
26+
this.Enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
27+
this.Count = itemCount;
28+
this.TotalRequestCharge = totalRequestCharge;
29+
this.QueryPageParameters = queryPageParameters ?? throw new ArgumentNullException(nameof(queryPageParameters));
30+
}
31+
}
32+
}

Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionEnumerator.cs

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22
// Copyright (c) Microsoft Corporation. All rights reserved.
33
// ------------------------------------------------------------
44

5-
namespace Microsoft.Azure.Cosmos.Pagination
5+
namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
66
{
77
using System;
88
using System.Collections;
99
using System.Collections.Generic;
1010
using System.Threading;
1111
using System.Threading.Tasks;
1212
using Microsoft.Azure.Cosmos.CosmosElements;
13+
using Microsoft.Azure.Cosmos.Pagination;
1314
using Microsoft.Azure.Cosmos.Query.Core.Collections;
1415
using Microsoft.Azure.Cosmos.Query.Core.Monads;
15-
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy;
1616
using Microsoft.Azure.Cosmos.Tracing;
1717

1818
internal sealed class OrderByCrossPartitionEnumerator : IEnumerator<OrderByQueryResult>
@@ -25,69 +25,86 @@ internal sealed class OrderByCrossPartitionEnumerator : IEnumerator<OrderByQuery
2525

2626
object IEnumerator.Current => this.Current;
2727

28-
public OrderByCrossPartitionEnumerator(PriorityQueue<IEnumerator<OrderByQueryResult>> queue)
28+
private OrderByCrossPartitionEnumerator(PriorityQueue<IEnumerator<OrderByQueryResult>> queue)
2929
{
3030
this.queue = queue ?? throw new ArgumentNullException(nameof(queue));
3131
}
3232

33-
public static async Task<(IEnumerator<OrderByQueryResult> orderbyQueryResultEnumerator, double totalRequestCharge)> CreateAsync(
34-
IEnumerable<OrderByQueryPartitionRangePageAsyncEnumerator> enumerators,
33+
public static async Task<BufferedOrderByResults> CreateAsync(
34+
ITracingAsyncEnumerator<TryCatch<OrderByQueryPage>> enumerator,
3535
IComparer<OrderByQueryResult> comparer,
3636
int levelSize,
3737
ITrace trace,
3838
CancellationToken cancellationToken)
3939
{
40-
if (enumerators == null)
40+
if (enumerator == null)
4141
{
42-
throw new ArgumentNullException(nameof(enumerators));
42+
throw new ArgumentNullException(nameof(enumerator));
4343
}
4444

4545
if (comparer == null)
4646
{
4747
throw new ArgumentNullException(nameof(comparer));
4848
}
4949

50+
QueryPageParameters queryPageParameters = null;
5051
double totalRequestCharge = 0;
52+
int bufferedItemCount = 0;
5153
EnumeratorComparer enumeratorComparer = new EnumeratorComparer(comparer);
5254
PriorityQueue<IEnumerator<OrderByQueryResult>> queue = new PriorityQueue<IEnumerator<OrderByQueryResult>>(enumeratorComparer);
53-
foreach (ITracingAsyncEnumerator<TryCatch<OrderByQueryPage>> enumerator in enumerators)
55+
while (await enumerator.MoveNextAsync(trace, cancellationToken))
5456
{
55-
while (await enumerator.MoveNextAsync(trace, cancellationToken))
57+
TryCatch<OrderByQueryPage> currentPage = enumerator.Current;
58+
if (currentPage.Failed)
5659
{
57-
TryCatch<OrderByQueryPage> currentPage = enumerator.Current;
58-
if (currentPage.Failed)
59-
{
60-
throw currentPage.Exception;
61-
}
60+
throw currentPage.Exception;
61+
}
6262

63-
totalRequestCharge += currentPage.Result.RequestCharge;
64-
IReadOnlyList<CosmosElement> page = currentPage.Result.Page.Documents;
63+
if (queryPageParameters == null)
64+
{
65+
queryPageParameters = new QueryPageParameters(
66+
activityId: currentPage.Result.ActivityId,
67+
cosmosQueryExecutionInfo: currentPage.Result.Page.CosmosQueryExecutionInfo,
68+
distributionPlanSpec: currentPage.Result.Page.DistributionPlanSpec,
69+
additionalHeaders: currentPage.Result.AdditionalHeaders);
70+
}
6571

66-
if (page.Count > 0)
67-
{
68-
PageEnumerator pageEnumerator = new PageEnumerator(page);
69-
pageEnumerator.MoveNext();
72+
totalRequestCharge += currentPage.Result.RequestCharge;
73+
IReadOnlyList<CosmosElement> page = currentPage.Result.Page.Documents;
74+
bufferedItemCount += page.Count;
75+
76+
if (page.Count > 0)
77+
{
78+
PageEnumerator pageEnumerator = new PageEnumerator(page);
79+
pageEnumerator.MoveNext();
7080

71-
queue.Enqueue(pageEnumerator);
81+
queue.Enqueue(pageEnumerator);
7282

73-
if (queue.Count >= levelSize)
74-
{
75-
OrderByCrossPartitionEnumerator newEnumerator = new OrderByCrossPartitionEnumerator(queue);
76-
newEnumerator.MoveNext();
83+
if (queue.Count >= levelSize)
84+
{
85+
OrderByCrossPartitionEnumerator newEnumerator = new OrderByCrossPartitionEnumerator(queue);
86+
newEnumerator.MoveNext();
7787

78-
queue = new PriorityQueue<IEnumerator<OrderByQueryResult>>(enumeratorComparer);
79-
queue.Enqueue(newEnumerator);
80-
}
88+
queue = new PriorityQueue<IEnumerator<OrderByQueryResult>>(enumeratorComparer);
89+
queue.Enqueue(newEnumerator);
8190
}
8291
}
8392
}
8493

8594
if (queue.Count == 0)
8695
{
87-
return (EmptyEnumerator.Instance, totalRequestCharge);
96+
return new BufferedOrderByResults(
97+
EmptyEnumerator.Instance,
98+
itemCount: 0,
99+
totalRequestCharge,
100+
queryPageParameters);
88101
}
89102

90-
return (new OrderByCrossPartitionEnumerator(queue), totalRequestCharge);
103+
return new BufferedOrderByResults(
104+
new OrderByCrossPartitionEnumerator(queue),
105+
bufferedItemCount,
106+
totalRequestCharge,
107+
queryPageParameters);
91108
}
92109

93110
public bool MoveNext()

0 commit comments

Comments
 (0)