Skip to content

Commit 57a5aa1

Browse files
authored
Merge branch 'master' into users/nalutripician/directUpgrade3.33.0
2 parents dc30caa + dbdf806 commit 57a5aa1

16 files changed

+394
-44
lines changed

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,8 @@ private async Task InitializeLeaseStoreAsync(ITrace trace, CancellationToken can
334334
monitoredContainer: this.monitoredContainer,
335335
leaseContainer: this.leaseContainer,
336336
leaseContainerPrefix: leasePrefix,
337-
instanceName: ChangeFeedEstimatorIterator.EstimatorDefaultHostName);
337+
instanceName: ChangeFeedEstimatorIterator.EstimatorDefaultHostName,
338+
changeFeedMode: ChangeFeedMode.LatestVersion);
338339

339340
this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer;
340341
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,14 @@ private async Task InitializeLeaseStoreAsync()
139139
{
140140
string monitoredContainerAndDatabaseRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(default);
141141
string leasePrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredContainerAndDatabaseRid);
142-
DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(
143-
monitoredContainer: this.monitoredContainer,
144-
leaseContainer: this.leaseContainer,
145-
leaseContainerPrefix: leasePrefix,
146-
instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName);
142+
DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder
143+
.InitializeAsync(
144+
monitoredContainer: this.monitoredContainer,
145+
leaseContainer: this.leaseContainer,
146+
leaseContainerPrefix: leasePrefix,
147+
instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName,
148+
changeFeedMode: ChangeFeedMode.LatestVersion)
149+
.ConfigureAwait(false);
147150

148151
this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer;
149152
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,15 @@ public ChangeFeedProcessorBuilder WithInstanceName(string instanceName)
7070
}
7171

7272
/// <summary>
73-
/// Sets the mode for the change freed processor.
73+
/// Sets the mode for the change feed processor.
7474
/// </summary>
7575
/// <param name="changeFeedMode"></param>
7676
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
7777
internal ChangeFeedProcessorBuilder WithChangeFeedMode(ChangeFeedMode changeFeedMode)
7878
{
7979
this.changeFeedProcessorOptions.Mode = changeFeedMode;
80+
this.changeFeedLeaseOptions.Mode = changeFeedMode;
81+
8082
return this;
8183
}
8284

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Microsoft.Azure.Cosmos.ChangeFeed
66
{
77
using System;
8+
using System.Collections.Generic;
89
using System.Threading.Tasks;
910
using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping;
1011
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
@@ -14,6 +15,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
1415
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
1516
using Microsoft.Azure.Cosmos.Core.Trace;
1617
using Microsoft.Azure.Cosmos.Tracing;
18+
using Microsoft.Azure.Documents;
1719

1820
internal sealed class ChangeFeedProcessorCore : ChangeFeedProcessor
1921
{
@@ -75,19 +77,36 @@ public override async Task StopAsync()
7577
private async Task InitializeAsync()
7678
{
7779
string containerRid = await this.monitoredContainer.GetCachedRIDAsync(
78-
forceRefresh: false,
79-
NoOpTrace.Singleton,
80+
forceRefresh: false,
81+
NoOpTrace.Singleton,
8082
default);
83+
8184
string monitoredDatabaseAndContainerRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync();
8285
string leaseContainerPrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredDatabaseAndContainerRid);
8386
Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.monitoredContainer.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton);
8487
if (this.documentServiceLeaseStoreManager == null)
8588
{
86-
this.documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(this.monitoredContainer, this.leaseContainer, leaseContainerPrefix, this.instanceName).ConfigureAwait(false);
89+
this.documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder
90+
.InitializeAsync(
91+
this.monitoredContainer,
92+
this.leaseContainer,
93+
leaseContainerPrefix,
94+
this.instanceName,
95+
changeFeedMode: this.changeFeedProcessorOptions.Mode)
96+
.ConfigureAwait(false);
8797
}
8898

99+
this.documentServiceLeaseStoreManager
100+
.LeaseManager
101+
.ChangeFeedModeSwitchingCheck(
102+
documentServiceLeases: await this.documentServiceLeaseStoreManager
103+
.LeaseContainer
104+
.GetAllLeasesAsync()
105+
.ConfigureAwait(false),
106+
changeFeedLeaseOptionsMode: this.changeFeedLeaseOptions.Mode);
107+
89108
this.partitionManager = this.BuildPartitionManager(
90-
containerRid,
109+
containerRid,
91110
partitionKeyRangeCache);
92111
this.initialized = true;
93112
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,10 @@ public ChangeFeedLeaseOptions()
4444
/// instances pointing at the same feed while using the same auxiliary collection.
4545
/// </summary>
4646
public string LeasePrefix { get; set; }
47+
48+
/// <summary>
49+
/// Gets or sets the <see cref="ChangeFeedMode"/>.
50+
/// </summary>
51+
public ChangeFeedMode Mode { get; set; }
4752
}
4853
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,10 @@ internal abstract class DocumentServiceLease
7070
/// Gets or sets custom lease properties which can be managed from <see cref="LoadBalancingStrategy"/>.
7171
/// </summary>
7272
public abstract Dictionary<string, string> Properties { get; set; }
73+
74+
/// <summary>
75+
/// Gets or sets the ChangeFeedMode.
76+
/// </summary>
77+
public abstract string Mode { get; set; }
7378
}
7479
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public override DateTime Timestamp
9595
[JsonProperty("_ts")]
9696
private long TS { get; set; }
9797

98+
[JsonProperty("Mode", NullValueHandling = NullValueHandling.Ignore)]
99+
public override string Mode { get; set; }
100+
98101
public override string ToString()
99102
{
100103
return string.Format(

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ public override DateTime Timestamp
6666
[JsonProperty("properties")]
6767
public override Dictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();
6868

69+
[JsonProperty("Mode", NullValueHandling = NullValueHandling.Ignore)]
70+
public override string Mode { get; set; }
71+
6972
[JsonProperty("timestamp")]
7073
private DateTime? ExplicitTimestamp { get; set; }
7174

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
7+
using System;
8+
using System.Collections.Generic;
79
using System.Threading.Tasks;
810
using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions;
911
using Microsoft.Azure.Documents;
@@ -63,5 +65,57 @@ internal abstract class DocumentServiceLeaseManager
6365
/// <returns>Updated lease.</returns>
6466
/// <exception cref="LeaseLostException">Thrown if other host acquired the lease</exception>
6567
public abstract Task<DocumentServiceLease> UpdatePropertiesAsync(DocumentServiceLease leaseToUpdatePropertiesFrom);
68+
69+
/// <summary>
70+
/// If the lease container's lease document is found, this method checks for lease
71+
/// document's ChangeFeedMode and if the new ChangeFeedMode is different
72+
/// from the current ChangeFeedMode, an exception is thrown.
73+
/// This is based on an issue located at <see href="https://github.com/Azure/azure-cosmos-dotnet-v3/issues/4308"/>.
74+
/// </summary>
75+
public void ChangeFeedModeSwitchingCheck(
76+
IReadOnlyList<DocumentServiceLease> documentServiceLeases,
77+
ChangeFeedMode changeFeedLeaseOptionsMode)
78+
{
79+
// No lease documents. Return.
80+
81+
if (documentServiceLeases.Count == 0)
82+
{
83+
return;
84+
}
85+
86+
DocumentServiceLease documentServiceLease = documentServiceLeases[0];
87+
88+
// Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because
89+
// AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are
90+
// AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, an exception should thrown.
91+
// If the ChangeFeedProcessor mode is not the mode in the lease document, an exception should be thrown.
92+
93+
bool shouldThrowException = this.VerifyChangeFeedProcessorMode(
94+
changeFeedMode:
95+
string.IsNullOrEmpty(documentServiceLease.Mode)
96+
? ChangeFeedMode.LatestVersion
97+
: changeFeedLeaseOptionsMode,
98+
leaseChangeFeedMode: documentServiceLease.Mode,
99+
normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode);
100+
101+
// If shouldThrowException is true, throw the exception.
102+
103+
if (shouldThrowException)
104+
{
105+
throw new ArgumentException(message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {normalizedProcessorChangeFeedMode} is not allowed.");
106+
}
107+
}
108+
109+
private bool VerifyChangeFeedProcessorMode(
110+
ChangeFeedMode changeFeedMode,
111+
string leaseChangeFeedMode,
112+
out string normalizedProcessorChangeFeedMode)
113+
{
114+
normalizedProcessorChangeFeedMode = changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes
115+
? HttpConstants.A_IMHeaderValues.FullFidelityFeed
116+
: HttpConstants.A_IMHeaderValues.IncrementalFeed;
117+
118+
return string.Compare(leaseChangeFeedMode, normalizedProcessorChangeFeedMode, StringComparison.OrdinalIgnoreCase) != 0;
119+
}
66120
}
67121
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
124124
LeaseId = leaseDocId,
125125
LeaseToken = leaseToken,
126126
ContinuationToken = continuationToken,
127-
FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange())
127+
FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange()),
128+
Mode = this.GetChangeFeedMode()
128129
};
129130

130131
this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());
@@ -148,14 +149,22 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
148149
LeaseId = leaseDocId,
149150
LeaseToken = leaseToken,
150151
ContinuationToken = continuationToken,
151-
FeedRange = feedRange
152+
FeedRange = feedRange,
153+
Mode = this.GetChangeFeedMode()
152154
};
153155

154156
this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());
155157

156158
return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease);
157159
}
158160

161+
private string GetChangeFeedMode()
162+
{
163+
return this.options.Mode == ChangeFeedMode.AllVersionsAndDeletes
164+
? HttpConstants.A_IMHeaderValues.FullFidelityFeed
165+
: HttpConstants.A_IMHeaderValues.IncrementalFeed;
166+
}
167+
159168
public override async Task ReleaseAsync(DocumentServiceLease lease)
160169
{
161170
if (lease == null)

0 commit comments

Comments
 (0)