Skip to content

Commit 71e58ee

Browse files
ChangeFeedProcessor: Adds AllVersionsAndDeletes support to ChangeFeedProcessor (#4370)
* preview cfp ffcf * ran updatecontracts * including this in Encryption * fixing name onChangesDelegate * sdkproject on encryptioncontainer * try this again * try, try, try again * with impl
1 parent 75a2e5f commit 71e58ee

File tree

6 files changed

+100
-86
lines changed

6 files changed

+100
-86
lines changed

Microsoft.Azure.Cosmos.Encryption.Custom/src/EncryptionContainer.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos.Encryption.Custom
1010
using System.Linq;
1111
using System.Threading;
1212
using System.Threading.Tasks;
13-
using Microsoft.Azure.Cosmos;
1413
using Newtonsoft.Json.Linq;
1514

1615
internal sealed class EncryptionContainer : Container
@@ -1023,6 +1022,16 @@ public override Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
10231022
}
10241023
#endif
10251024

1025+
#if SDKPROJECTREF
1026+
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
1027+
string processorName,
1028+
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
1029+
{
1030+
return this.container.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
1031+
processorName,
1032+
onChangesDelegate);
1033+
}
1034+
#endif
10261035
private async Task<ResponseMessage> ReadManyItemsHelperAsync(
10271036
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
10281037
ReadManyRequestOptions readManyRequestOptions = null,

Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ namespace Microsoft.Azure.Cosmos.Encryption
1111
using System.Net;
1212
using System.Threading;
1313
using System.Threading.Tasks;
14-
using Microsoft.Azure.Cosmos;
1514
using Newtonsoft.Json.Linq;
1615

1716
internal sealed class EncryptionContainer : Container
@@ -756,6 +755,14 @@ public override Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
756755
}
757756
#endif
758757

758+
#if SDKPROJECTREF
759+
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
760+
string processorName,
761+
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
762+
{
763+
throw new NotImplementedException();
764+
}
765+
#endif
759766
/// <summary>
760767
/// This function handles the scenario where a container is deleted(say from different Client) and recreated with same Id but with different client encryption policy.
761768
/// The idea is to have the container Rid cached and sent out as part of RequestOptions with Container Rid set in "x-ms-cosmos-intended-collection-rid" header.

Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos
1010
using System.Linq;
1111
using System.Threading;
1212
using System.Threading.Tasks;
13-
using Microsoft.Azure.Cosmos.Serializer;
1413

1514
/// <summary>
1615
/// Operations for reading, replacing, or deleting a specific, existing container or item in a container by id.
@@ -1681,6 +1680,81 @@ public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
16811680
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
16821681
FeedRange feedRange,
16831682
CancellationToken cancellationToken = default);
1683+
1684+
/// <summary>
1685+
/// Initializes a <see cref="GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes"/> for change feed processing with all versions and deletes.
1686+
/// </summary>
1687+
/// <typeparam name="T">Document type</typeparam>
1688+
/// <param name="processorName">A name that identifies the Processor and the particular work it will do.</param>
1689+
/// <param name="onChangesDelegate">Delegate to receive all changes and deletes</param>
1690+
/// <example>
1691+
/// <code language="c#">
1692+
/// <![CDATA[
1693+
/// Container leaseContainer = await this.database.CreateContainerAsync(
1694+
/// new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
1695+
/// cancellationToken: this.cancellationToken);
1696+
///
1697+
/// ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
1698+
///
1699+
/// ChangeFeedProcessor changeFeedProcessor = this.Container
1700+
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) =>
1701+
/// {
1702+
/// Console.WriteLine($"number of documents processed: {documents.Count}");
1703+
///
1704+
/// string id = default;
1705+
/// string pk = default;
1706+
/// string description = default;
1707+
///
1708+
/// foreach (ChangeFeedItemChange<dynamic> changeFeedItem in documents)
1709+
/// {
1710+
/// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
1711+
/// {
1712+
/// id = changeFeedItem.Current.id.ToString();
1713+
/// pk = changeFeedItem.Current.pk.ToString();
1714+
/// description = changeFeedItem.Current.description.ToString();
1715+
/// }
1716+
/// else
1717+
/// {
1718+
/// id = changeFeedItem.Previous.id.ToString();
1719+
/// pk = changeFeedItem.Previous.pk.ToString();
1720+
/// description = changeFeedItem.Previous.description.ToString();
1721+
/// }
1722+
///
1723+
/// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
1724+
/// long previousLsn = changeFeedItem.Metadata.PreviousLsn;
1725+
/// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
1726+
/// long lsn = changeFeedItem.Metadata.Lsn;
1727+
/// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
1728+
/// }
1729+
///
1730+
/// return Task.CompletedTask;
1731+
/// })
1732+
/// .WithInstanceName(Guid.NewGuid().ToString())
1733+
/// .WithLeaseContainer(leaseContainer)
1734+
/// .WithErrorNotification((leaseToken, error) =>
1735+
/// {
1736+
/// Console.WriteLine(error.ToString());
1737+
///
1738+
/// return Task.CompletedTask;
1739+
/// })
1740+
/// .Build();
1741+
///
1742+
/// await changeFeedProcessor.StartAsync();
1743+
/// await Task.Delay(1000);
1744+
/// await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
1745+
/// await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
1746+
/// await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));
1747+
///
1748+
/// allProcessedDocumentsEvent.WaitOne(10 * 1000);
1749+
///
1750+
/// await changeFeedProcessor.StopAsync();
1751+
/// ]]>
1752+
/// </code>
1753+
/// </example>
1754+
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
1755+
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
1756+
string processorName,
1757+
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
16841758
#endif
16851759
}
1686-
}
1760+
}

Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ namespace Microsoft.Azure.Cosmos
1414
using Microsoft.Azure.Cosmos.Query.Core.Monads;
1515
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
1616
using Microsoft.Azure.Cosmos.ReadFeed;
17-
using Microsoft.Azure.Cosmos.Serializer;
1817
using Microsoft.Azure.Cosmos.Tracing;
1918

2019
// This class acts as a wrapper for environments that use SynchronizationContext.
@@ -661,14 +660,5 @@ public override Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
661660
task: (trace) => base.DeleteAllItemsByPartitionKeyStreamAsync(partitionKey, trace, requestOptions, cancellationToken),
662661
openTelemetry: (response) => new OpenTelemetryResponse(response));
663662
}
664-
665-
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
666-
string processorName,
667-
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
668-
{
669-
return base.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
670-
processorName,
671-
onChangesDelegate);
672-
}
673663
}
674664
}

Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -147,82 +147,11 @@ public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
147147
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
148148
FeedRange feedRange,
149149
CancellationToken cancellationToken = default);
150-
#endif
151150

152-
/// <summary>
153-
/// Initializes a <see cref="GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes"/> for change feed processing with all versions and deletes.
154-
/// </summary>
155-
/// <typeparam name="T">Document type</typeparam>
156-
/// <param name="processorName">A name that identifies the Processor and the particular work it will do.</param>
157-
/// <param name="onChangesDelegate">Delegate to receive all changes and deletes</param>
158-
/// <example>
159-
/// <code language="c#">
160-
/// <![CDATA[
161-
/// Container leaseContainer = await this.database.CreateContainerAsync(
162-
/// new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
163-
/// cancellationToken: this.cancellationToken);
164-
///
165-
/// ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
166-
///
167-
/// ChangeFeedProcessor changeFeedProcessor = this.Container
168-
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) =>
169-
/// {
170-
/// Console.WriteLine($"number of documents processed: {documents.Count}");
171-
///
172-
/// string id = default;
173-
/// string pk = default;
174-
/// string description = default;
175-
///
176-
/// foreach (ChangeFeedItemChange<dynamic> changeFeedItem in documents)
177-
/// {
178-
/// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
179-
/// {
180-
/// id = changeFeedItem.Current.id.ToString();
181-
/// pk = changeFeedItem.Current.pk.ToString();
182-
/// description = changeFeedItem.Current.description.ToString();
183-
/// }
184-
/// else
185-
/// {
186-
/// id = changeFeedItem.Previous.id.ToString();
187-
/// pk = changeFeedItem.Previous.pk.ToString();
188-
/// description = changeFeedItem.Previous.description.ToString();
189-
/// }
190-
///
191-
/// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
192-
/// long previousLsn = changeFeedItem.Metadata.PreviousLsn;
193-
/// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
194-
/// long lsn = changeFeedItem.Metadata.Lsn;
195-
/// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
196-
/// }
197-
///
198-
/// return Task.CompletedTask;
199-
/// })
200-
/// .WithInstanceName(Guid.NewGuid().ToString())
201-
/// .WithLeaseContainer(leaseContainer)
202-
/// .WithErrorNotification((leaseToken, error) =>
203-
/// {
204-
/// Console.WriteLine(error.ToString());
205-
///
206-
/// return Task.CompletedTask;
207-
/// })
208-
/// .Build();
209-
///
210-
/// await changeFeedProcessor.StartAsync();
211-
/// await Task.Delay(1000);
212-
/// await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
213-
/// await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
214-
/// await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));
215-
///
216-
/// allProcessedDocumentsEvent.WaitOne(10 * 1000);
217-
///
218-
/// await changeFeedProcessor.StopAsync();
219-
/// ]]>
220-
/// </code>
221-
/// </example>
222-
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
223151
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
224152
string processorName,
225153
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
154+
#endif
226155

227156
public abstract class TryExecuteQueryResult
228157
{

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,11 @@
299299
"Microsoft.Azure.Cosmos.Container;System.Object;IsAbstract:True;IsSealed:False;IsInterface:False;IsEnum:False;IsClass:True;IsValueType:False;IsNested:False;IsGenericType:False;IsSerializable:False": {
300300
"Subclasses": {},
301301
"Members": {
302+
"Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes[T](System.String, ChangeFeedHandler`1)": {
303+
"Type": "Method",
304+
"Attributes": [],
305+
"MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes[T](System.String, ChangeFeedHandler`1);IsAbstract:True;IsStatic:False;IsVirtual:True;IsGenericMethod:True;IsConstructor:False;IsFinal:False;"
306+
},
302307
"System.Threading.Tasks.Task`1[Microsoft.Azure.Cosmos.ResponseMessage] DeleteAllItemsByPartitionKeyStreamAsync(Microsoft.Azure.Cosmos.PartitionKey, Microsoft.Azure.Cosmos.RequestOptions, System.Threading.CancellationToken)": {
303308
"Type": "Method",
304309
"Attributes": [],

0 commit comments

Comments
 (0)