Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ConnectionPolicy()
this.RetryOptions = new RetryOptions();
this.EnableReadRequestsFallback = null;
this.ServerCertificateCustomValidationCallback = null;

this.AvailabilityStrategy = null;
this.CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions();
}

Expand Down Expand Up @@ -512,6 +512,34 @@ public bool? EnableAdvancedReplicaSelectionForTcp
set;
}

/// <summary>
/// Gets or sets the availability strategy to be used for periods of high latency.
/// </summary>
/// <example>
/// An example of how to configure an availability strategy through the client builder.
/// <code language="c#">
/// <![CDATA[
/// CosmosClient client = new CosmosClientBuilder("connection string")
///     .WithApplicationPreferredRegions(
///         new List<string> { "East US", "Central US", "West US" } )
///     .WithAvailabilityStrategy(
///         AvailabilityStrategy.CrossRegionHedgingStrategy(
///             threshold: TimeSpan.FromMilliseconds(500),
///             thresholdStep: TimeSpan.FromMilliseconds(100)
///         ))
///     .Build();
/// ]]>
/// </code>
/// </example>
/// <remarks>
/// The availability strategy in the example is a Cross Region Hedging Strategy.
/// These strategies take two values, a threshold and a threshold step. When a request that is sent
/// out takes longer than the threshold time, the SDK will hedge to the second region in the application preferred regions list.
/// If a response from either the primary request or the first hedged request is not received
/// after the threshold step time, the SDK will hedge to the third region and so on.
/// </remarks>
public AvailabilityStrategy AvailabilityStrategy { get; set; }

/// <summary>
/// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end.
/// </summary>
Expand Down Expand Up @@ -545,6 +573,15 @@ internal SessionRetryOptions SessionRetryOptions
set;
}

/// <summary>
/// Gets or sets the application name string carried by this connection policy.
/// </summary>
internal string ApplicationName { get; set; }

/// <summary>
/// GlobalEndpointManager will subscribe to this event if user updates the preferredLocations list in the Azure Cosmos DB service.
/// </summary>
Expand Down
69 changes: 8 additions & 61 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,7 @@ public class CosmosClientOptions
private const string ConnectionStringAccountKey = "AccountKey";
private const string ConnectionStringDisableServerCertificateValidation = "DisableServerCertificateValidation";

private const ApiType DefaultApiType = ApiType.None;

/// <summary>
/// Default thresholds for PPAF request hedging.
/// </summary>
private const int DefaultHedgingThresholdInMilliseconds = 1000;
private const int DefaultHedgingThresholdStepInMilliseconds = 500;
private const ApiType DefaultApiType = ApiType.None;

/// <summary>
/// Default request timeout
Expand Down Expand Up @@ -771,15 +765,6 @@ bool EnableRemoteRegionPreferredForSessionRetry
set => this.SessionRetryOptions.RemoteRegionPreferred = value;
}

/// <summary>
/// Gets or sets a value indicating whether partition-level failover is enabled. When this feature is enabled,
/// the SDK by default applies a cross-region hedging strategy with a default threshold of 1 seconds.
/// If an availability strategy is provided explicitly, then it will be honored, and the default policy wouldn't be applied. Note that
/// the default availability strategy can be opted out by setting <see cref="DisabledAvailabilityStrategy"/> as the availability strategy in
/// cosmos client options.
/// </summary>
internal bool EnablePartitionLevelFailover { get; set; } = ConfigurationManager.IsPartitionLevelFailoverEnabled(defaultValue: false);

/// <summary>
/// Enable partition level circuit breaker (aka PPCB). For compute gateway use case, by default per partition automatic failover will be disabled, so does the PPCB.
/// If compute gateway chooses to enable PPAF, then the .NET SDK will enable PPCB by default, which will improve the read availability and latency. This would mean
Expand Down Expand Up @@ -1028,10 +1013,10 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
{
this.ValidateDirectTCPSettings();
this.ValidateLimitToEndpointSettings();
this.InitializePartitionLevelFailoverWithDefaultHedging();

ConnectionPolicy connectionPolicy = new ConnectionPolicy()
{
{
ApplicationName = this.ApplicationName,
MaxConnectionLimit = this.GatewayModeMaxConnectionLimit,
RequestTimeout = this.RequestTimeout,
ConnectionMode = this.ConnectionMode,
Expand All @@ -1044,14 +1029,14 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
MaxRequestsPerTcpConnection = this.MaxRequestsPerTcpConnection,
MaxTcpConnectionsPerEndpoint = this.MaxTcpConnectionsPerEndpoint,
EnableEndpointDiscovery = !this.LimitToEndpoint,
EnablePartitionLevelFailover = this.EnablePartitionLevelFailover,
EnablePartitionLevelCircuitBreaker = this.EnablePartitionLevelFailover || this.EnablePartitionLevelCircuitBreaker,
EnablePartitionLevelCircuitBreaker = this.EnablePartitionLevelCircuitBreaker,
PortReuseMode = this.portReuseMode,
EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery,
EnableAdvancedReplicaSelectionForTcp = this.EnableAdvancedReplicaSelectionForTcp,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback,
CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions()
CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions(),
AvailabilityStrategy = this.AvailabilityStrategy,
};

if (this.CosmosClientTelemetryOptions != null)
Expand Down Expand Up @@ -1262,47 +1247,9 @@ internal UserAgentContainer CreateUserAgentContainerWithFeatures(int clientId)
return new UserAgentContainer(
clientId: clientId,
features: featureString,
regionConfiguration: regionConfiguration,
suffix: this.GetUserAgentSuffix());
}

internal void InitializePartitionLevelFailoverWithDefaultHedging()
{
if (this.EnablePartitionLevelFailover
&& this.AvailabilityStrategy == null)
{
// The default threshold is the minimum value of 1 second and a fraction (currently it's half) of
// the request timeout value provided by the end customer.
double defaultThresholdInMillis = Math.Min(CosmosClientOptions.DefaultHedgingThresholdInMilliseconds, this.RequestTimeout.TotalMilliseconds / 2);

this.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
thresholdStep: TimeSpan.FromMilliseconds(CosmosClientOptions.DefaultHedgingThresholdStepInMilliseconds));
}
regionConfiguration: regionConfiguration,
suffix: this.ApplicationName);
}

internal string GetUserAgentSuffix()
{
int featureFlag = 0;
if (this.EnablePartitionLevelFailover)
{
featureFlag += (int)UserAgentFeatureFlags.PerPartitionAutomaticFailover;
}

if (this.EnablePartitionLevelFailover || this.EnablePartitionLevelCircuitBreaker)
{
featureFlag += (int)UserAgentFeatureFlags.PerPartitionCircuitBreaker;
}

if (featureFlag == 0)
{
return this.ApplicationName;
}

return string.IsNullOrEmpty(this.ApplicationName) ?
$"F{featureFlag:X}" :
$"F{featureFlag:X}|{this.ApplicationName}";
}

/// <summary>
/// This generates a key that added to the user agent to make it
Expand Down
97 changes: 74 additions & 23 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
private const int DefaultRntbdSendHangDetectionTimeSeconds = 10;
private const bool DefaultEnableCpuMonitor = true;
private const string DefaultInitTaskKey = "InitTaskKey";

/// <summary>
/// Default thresholds for PPAF request hedging.
/// </summary>
private const int DefaultHedgingThresholdInMilliseconds = 1000;
private const int DefaultHedgingThresholdStepInMilliseconds = 500;

private static readonly char[] resourceIdOrFullNameSeparators = new char[] { '/' };
private static readonly char[] resourceIdSeparators = new char[] { '/', '\\', '?', '#' };
Expand Down Expand Up @@ -425,8 +431,8 @@ internal DocumentClient(Uri serviceEndpoint,
transportClientHandlerFactory,
storeClientFactory)
{
}

}
/// <summary>
/// Initializes a new instance of the <see cref="DocumentClient"/> class using the
/// specified service endpoint, an authorization key (or resource token) and a connection policy
Expand Down Expand Up @@ -508,7 +514,7 @@ internal DocumentClient(Uri serviceEndpoint,
enableAsyncCacheExceptionNoSharing: this.enableAsyncCacheExceptionNoSharing);
this.chaosInterceptorFactory = chaosInterceptorFactory;
this.chaosInterceptor = chaosInterceptorFactory?.CreateInterceptor(this);
this.isThinClientEnabled = ConfigurationManager.IsThinClientEnabled(defaultValue: false);
this.isThinClientEnabled = ConfigurationManager.IsThinClientEnabled(defaultValue: false);

this.Initialize(
serviceEndpoint: serviceEndpoint,
Expand Down Expand Up @@ -954,15 +960,8 @@ internal virtual void Initialize(Uri serviceEndpoint,
ServicePointAccessor servicePoint = ServicePointAccessor.FindServicePoint(this.ServiceEndpoint);
servicePoint.ConnectionLimit = this.ConnectionPolicy.MaxConnectionLimit;
}
#endif

this.GlobalEndpointManager = new GlobalEndpointManager(this, this.ConnectionPolicy, this.enableAsyncCacheExceptionNoSharing);
this.PartitionKeyRangeLocation = this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.GlobalEndpointManager,
this.ConnectionPolicy.EnablePartitionLevelFailover,
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker)
: GlobalPartitionEndpointManagerNoOp.Instance;
#endif
this.GlobalEndpointManager = new GlobalEndpointManager(this, this.ConnectionPolicy, this.enableAsyncCacheExceptionNoSharing);

this.httpClient = CosmosHttpClientCore.CreateWithConnectionPolicy(
this.ApiType,
Expand Down Expand Up @@ -1003,13 +1002,6 @@ internal virtual void Initialize(Uri serviceEndpoint,
this.sessionContainer = new SessionContainer(this.ServiceEndpoint.Host);
}

this.retryPolicy = new RetryPolicy(
globalEndpointManager: this.GlobalEndpointManager,
connectionPolicy: this.ConnectionPolicy,
partitionKeyRangeLocationCache: this.PartitionKeyRangeLocation);

this.ResetSessionTokenRetryPolicy = this.retryPolicy;

this.desiredConsistencyLevel = desiredConsistencyLevel;
// Setup the proxy to be used based on connection mode.
// For gateway: GatewayProxy.
Expand Down Expand Up @@ -1062,7 +1054,34 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
if (this.desiredConsistencyLevel.HasValue)
{
this.EnsureValidOverwrite(this.desiredConsistencyLevel.Value);
}
}

bool isPPafEnabled = ConfigurationManager.IsPartitionLevelFailoverEnabled(defaultValue: false);
if (this.accountServiceConfiguration != null && this.accountServiceConfiguration.AccountProperties.EnablePartitionLevelFailover.HasValue)
{
isPPafEnabled = this.accountServiceConfiguration.AccountProperties.EnablePartitionLevelFailover.Value;
}

this.ConnectionPolicy.EnablePartitionLevelFailover = isPPafEnabled;
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker |= this.ConnectionPolicy.EnablePartitionLevelFailover;
this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures());
this.InitializePartitionLevelFailoverWithDefaultHedging();

this.PartitionKeyRangeLocation =
this.ConnectionPolicy.EnablePartitionLevelFailover
|| this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.GlobalEndpointManager,
this.ConnectionPolicy.EnablePartitionLevelFailover,
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker)
: GlobalPartitionEndpointManagerNoOp.Instance;

this.retryPolicy = new RetryPolicy(
globalEndpointManager: this.GlobalEndpointManager,
connectionPolicy: this.ConnectionPolicy,
partitionKeyRangeLocationCache: this.PartitionKeyRangeLocation);

this.ResetSessionTokenRetryPolicy = this.retryPolicy;

GatewayStoreModel gatewayStoreModel = new GatewayStoreModel(
this.GlobalEndpointManager,
Expand Down Expand Up @@ -6820,12 +6839,44 @@ private async Task InitializeGatewayConfigurationReaderAsync()

this.accountServiceConfiguration = new CosmosAccountServiceConfiguration(accountReader.InitializeReaderAsync);

await this.accountServiceConfiguration.InitializeAsync();
AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties;
await this.accountServiceConfiguration.InitializeAsync();
AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties;
this.UseMultipleWriteLocations = this.ConnectionPolicy.UseMultipleWriteLocations && accountProperties.EnableMultipleWriteLocations;

this.GlobalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(accountProperties);
}

/// <summary>
/// Builds the user agent feature token from the partition level settings on the connection policy.
/// </summary>
/// <returns>
/// An empty string when neither feature is enabled; otherwise the letter "F" followed by the
/// combined <see cref="UserAgentFeatureFlags"/> value formatted as upper-case hexadecimal.
/// </returns>
internal string GetUserAgentFeatures()
{
    bool failoverEnabled = this.ConnectionPolicy.EnablePartitionLevelFailover;

    // Enabling partition level failover also counts as enabling the circuit breaker feature.
    bool circuitBreakerEnabled = failoverEnabled || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker;

    int enabledFeatures =
        (failoverEnabled ? (int)UserAgentFeatureFlags.PerPartitionAutomaticFailover : 0)
        + (circuitBreakerEnabled ? (int)UserAgentFeatureFlags.PerPartitionCircuitBreaker : 0);

    if (enabledFeatures == 0)
    {
        return string.Empty;
    }

    return $"F{enabledFeatures:X}";
}

/// <summary>
/// Assigns a default cross region hedging strategy to the connection policy when partition level
/// failover is enabled and no availability strategy has been set on the policy.
/// </summary>
/// <remarks>
/// An availability strategy that is already present on the connection policy is left untouched.
/// </remarks>
internal void InitializePartitionLevelFailoverWithDefaultHedging()
{
    if (!this.ConnectionPolicy.EnablePartitionLevelFailover
        || this.ConnectionPolicy.AvailabilityStrategy != null)
    {
        return;
    }

    // The threshold is the smaller of the default hedging threshold and half of the
    // request timeout configured on the connection policy.
    double halfOfRequestTimeoutInMillis = this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2;
    TimeSpan hedgingThreshold = TimeSpan.FromMilliseconds(
        Math.Min(DocumentClient.DefaultHedgingThresholdInMilliseconds, halfOfRequestTimeoutInMillis));
    TimeSpan hedgingThresholdStep = TimeSpan.FromMilliseconds(
        DocumentClient.DefaultHedgingThresholdStepInMilliseconds);

    this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
        threshold: hedgingThreshold,
        thresholdStep: hedgingThresholdStep);
}

internal void CaptureSessionToken(DocumentServiceRequest request, DocumentServiceResponse response)
{
Expand Down
9 changes: 0 additions & 9 deletions Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -754,15 +754,6 @@ internal CosmosClientBuilder WithCpuMonitorDisabled()
return this;
}

/// <summary>
/// Enabled partition level failover in the SDK
/// </summary>
internal CosmosClientBuilder WithPartitionLevelFailoverEnabled()
{
this.clientOptions.EnablePartitionLevelFailover = true;
return this;
}

/// <summary>
/// Enables SDK to inject fault. Used for testing applications.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public override async Task<ResponseMessage> SendAsync(
public AvailabilityStrategyInternal AvailabilityStrategy(RequestMessage request)
{
AvailabilityStrategy strategy = request.RequestOptions?.AvailabilityStrategy
?? this.client.ClientOptions.AvailabilityStrategy;
?? this.client.DocumentClient.ConnectionPolicy.AvailabilityStrategy;

if (strategy == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ internal long ProvisionedDocumentStorageInMB
[JsonProperty(PropertyName = Constants.Properties.EnableMultipleWriteLocations)]
internal bool EnableMultipleWriteLocations { get; set; }

/// <summary>
/// Gets or sets the feature-enabled value for Per Partition Automatic Failover.
/// </summary>
/// <remarks>
/// Mapped to the JSON property named by <c>Constants.Properties.EnablePerPartitionFailoverBehavior</c>.
/// The value is nullable; presumably null means the account payload did not carry the setting (TODO confirm).
/// </remarks>
[JsonProperty(PropertyName = Constants.Properties.EnablePerPartitionFailoverBehavior)]
internal bool? EnablePartitionLevelFailover { get; set; }

private IDictionary<string, object> QueryStringToDictConverter()
{
if (!string.IsNullOrEmpty(this.QueryEngineConfigurationString))
Expand Down
8 changes: 8 additions & 0 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,14 @@ public virtual void InitializeAccountPropertiesAndStartBackgroundRefresh(Account
{
return;
}

bool isPPafEnabled = ConfigurationManager.IsPartitionLevelFailoverEnabled(defaultValue: false);
if (databaseAccount.EnablePartitionLevelFailover.HasValue)
{
isPPafEnabled = databaseAccount.EnablePartitionLevelFailover.Value;
}

this.connectionPolicy.EnablePartitionLevelFailover = isPPafEnabled;
GlobalEndpointManager.ParseThinClientLocationsFromAdditionalProperties(databaseAccount);

this.locationCache.OnDatabaseAccountRead(databaseAccount);
Expand Down
11 changes: 11 additions & 0 deletions Microsoft.Azure.Cosmos/src/UserAgentContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ public UserAgentContainer(
this.Suffix = suffix ?? string.Empty;
}

/// <summary>
/// Places the given feature token at the front of the current suffix.
/// </summary>
/// <param name="features">
/// The feature token to add. A null or empty value leaves the suffix unchanged.
/// </param>
/// <remarks>
/// When the suffix is already non-empty, the token and the existing suffix are joined with a '|' separator.
/// Every call with a non-empty token prepends it again; the method does not check for an existing token.
/// </remarks>
public void AppendFeatures(
    string features)
{
    if (string.IsNullOrEmpty(features))
    {
        return;
    }

    if (string.IsNullOrEmpty(this.Suffix))
    {
        this.Suffix = features;
        return;
    }

    this.Suffix = $"{features}|{this.Suffix}";
}

internal override string BaseUserAgent => this.cosmosBaseUserAgent ?? string.Empty;

protected virtual void GetEnvironmentInformation(
Expand Down
Loading
Loading