Skip to content
Merged
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,14 @@ public System.Text.Json.JsonSerializerOptions UseSystemTextJsonSerializerWithOpt
/// <remarks>
/// <para>This is optimal for latency-sensitive workloads. Does not apply if <see cref="ConnectionMode.Gateway"/> is used.</para>
/// </remarks>
internal bool? EnableAdvancedReplicaSelectionForTcp { get; set; }
internal bool? EnableAdvancedReplicaSelectionForTcp { get; set; }

/// <summary>
/// Gets or sets stack trace optimization to reduce stack trace proliferation in high-concurrency scenarios where exceptions are frequently thrown.
/// When enabled, critical SDK components optimize exception handling to minimize performance overhead.
/// The default value is 'false'.
/// </summary>
internal bool EnableAsyncCacheExceptionSharing { get; set; }

/// <summary>
/// (Direct/TCP) Controls the amount of idle time after which unused connections are closed.
Expand Down
29 changes: 20 additions & 9 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider

private readonly bool IsLocalQuorumConsistency = false;
private readonly bool isReplicaAddressValidationEnabled;
private readonly bool enableStackTraceOptimization;

//Fault Injection
private readonly IChaosInterceptorFactory chaosInterceptorFactory;
Expand Down Expand Up @@ -243,7 +244,9 @@ public DocumentClient(Uri serviceEndpoint,
}

this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(
cancellationToken: this.cancellationTokenSource.Token,
enableStackTraceOptimization: this.enableStackTraceOptimization);
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy);
}

Expand Down Expand Up @@ -444,6 +447,7 @@ internal DocumentClient(Uri serviceEndpoint,
/// <param name="remoteCertificateValidationCallback">This delegate responsible for validating the third party certificate. </param>
/// <param name="cosmosClientTelemetryOptions">This is distributed tracing flag</param>
/// <param name="chaosInterceptorFactory">This is the chaos interceptor used for fault injection</param>
/// <param name="enableStackTraceOptimization">A boolean flag indicating if stack trace optimization is enabled.</param>
/// <remarks>
/// The service endpoint can be obtained from the Azure Management Portal.
/// If you are connecting using one of the Master Keys, these can be obtained along with the endpoint from the Azure Management Portal
Expand Down Expand Up @@ -472,7 +476,8 @@ internal DocumentClient(Uri serviceEndpoint,
string cosmosClientId = null,
RemoteCertificateValidationCallback remoteCertificateValidationCallback = null,
CosmosClientTelemetryOptions cosmosClientTelemetryOptions = null,
IChaosInterceptorFactory chaosInterceptorFactory = null)
IChaosInterceptorFactory chaosInterceptorFactory = null,
bool enableStackTraceOptimization = false)
{
if (sendingRequestEventArgs != null)
{
Expand All @@ -491,10 +496,13 @@ internal DocumentClient(Uri serviceEndpoint,
this.receivedResponse += receivedResponseEventArgs;
}

this.enableStackTraceOptimization = enableStackTraceOptimization;
this.cosmosAuthorization = cosmosAuthorization ?? throw new ArgumentNullException(nameof(cosmosAuthorization));
this.transportClientHandlerFactory = transportClientHandlerFactory;
this.IsLocalQuorumConsistency = isLocalQuorumConsistency;
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(
cancellationToken: this.cancellationTokenSource.Token,
enableStackTraceOptimization: this.enableStackTraceOptimization);
this.chaosInterceptorFactory = chaosInterceptorFactory;
this.chaosInterceptor = chaosInterceptorFactory?.CreateInterceptor(this);

Expand Down Expand Up @@ -675,8 +683,9 @@ private async Task OpenPrivateAsync(CancellationToken cancellationToken)
storeModel: this.GatewayStoreModel,
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);
telemetryToServiceHelper: this.telemetryToServiceHelper,
enableStackTraceOptimization: this.enableStackTraceOptimization);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager, this.enableStackTraceOptimization);

DefaultTrace.TraceWarning("{0} occurred while OpenAsync. Exception Message: {1}", ex.ToString(), ex.Message);
}
Expand Down Expand Up @@ -938,7 +947,7 @@ internal virtual void Initialize(Uri serviceEndpoint,
servicePoint.ConnectionLimit = this.ConnectionPolicy.MaxConnectionLimit;
#endif

this.GlobalEndpointManager = new GlobalEndpointManager(this, this.ConnectionPolicy);
this.GlobalEndpointManager = new GlobalEndpointManager(this, this.ConnectionPolicy, this.enableStackTraceOptimization);
this.PartitionKeyRangeLocation = this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.GlobalEndpointManager,
Expand Down Expand Up @@ -1059,8 +1068,9 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
storeModel: this.GatewayStoreModel,
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);
telemetryToServiceHelper: this.telemetryToServiceHelper,
enableStackTraceOptimization: this.enableStackTraceOptimization);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager, this.enableStackTraceOptimization);
this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

gatewayStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);
Expand Down Expand Up @@ -6722,7 +6732,8 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory
this.accountServiceConfiguration,
this.ConnectionPolicy,
this.httpClient,
this.storeClientFactory.GetConnectionStateListener());
this.storeClientFactory.GetConnectionStateListener(),
this.enableStackTraceOptimization);

this.CreateStoreModel(subscribeRntbdStatus: true);
}
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ internal static CosmosClientContext Create(
cosmosClientId: cosmosClient.Id,
remoteCertificateValidationCallback: ClientContextCore.SslCustomValidationCallBack(clientOptions.GetServerCertificateCustomValidationCallback()),
cosmosClientTelemetryOptions: clientOptions.CosmosClientTelemetryOptions,
chaosInterceptorFactory: clientOptions.ChaosInterceptorFactory);
chaosInterceptorFactory: clientOptions.ChaosInterceptorFactory,
enableStackTraceOptimization: clientOptions.EnableAsyncCacheExceptionSharing);

return ClientContextCore.Create(
cosmosClient,
Expand Down
12 changes: 9 additions & 3 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,26 @@ namespace Microsoft.Azure.Cosmos.Common
/// <typeparam name="TValue">Type of values.</typeparam>
internal sealed class AsyncCache<TKey, TValue>
{
private readonly bool isStackTraceOptimizationEnabled;
private readonly IEqualityComparer<TValue> valueEqualityComparer;
private readonly IEqualityComparer<TKey> keyEqualityComparer;

private ConcurrentDictionary<TKey, AsyncLazy<TValue>> values;

public AsyncCache(IEqualityComparer<TValue> valueEqualityComparer, IEqualityComparer<TKey> keyEqualityComparer = null)
public AsyncCache(
IEqualityComparer<TValue> valueEqualityComparer,
IEqualityComparer<TKey> keyEqualityComparer = null,
bool enableStackTraceOptimization = false)
{
this.keyEqualityComparer = keyEqualityComparer ?? EqualityComparer<TKey>.Default;
this.values = new ConcurrentDictionary<TKey, AsyncLazy<TValue>>(this.keyEqualityComparer);
this.valueEqualityComparer = valueEqualityComparer;
this.isStackTraceOptimizationEnabled = enableStackTraceOptimization;
}

public AsyncCache()
: this(EqualityComparer<TValue>.Default)
public AsyncCache(bool enableStackTraceOptimization = false)
: this(valueEqualityComparer: EqualityComparer<TValue>.Default,
enableStackTraceOptimization: enableStackTraceOptimization)
{
}

Expand Down
13 changes: 9 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class AsyncCacheNonBlocking<TKey, TValue> : IDisposable
{
private readonly bool isStackTraceOptimizationEnabled;
private readonly CancellationTokenSource cancellationTokenSource;
private readonly ConcurrentDictionary<TKey, AsyncLazyWithRefreshTask<TValue>> values;
private readonly Func<Exception, bool> removeFromCacheOnBackgroundRefreshException;
Expand All @@ -30,18 +31,22 @@ internal sealed class AsyncCacheNonBlocking<TKey, TValue> : IDisposable
public AsyncCacheNonBlocking(
Func<Exception, bool> removeFromCacheOnBackgroundRefreshException = null,
IEqualityComparer<TKey> keyEqualityComparer = null,
CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default,
bool enableStackTraceOptimization = false)
{
this.keyEqualityComparer = keyEqualityComparer ?? EqualityComparer<TKey>.Default;
this.values = new ConcurrentDictionary<TKey, AsyncLazyWithRefreshTask<TValue>>(this.keyEqualityComparer);
this.removeFromCacheOnBackgroundRefreshException = removeFromCacheOnBackgroundRefreshException ?? AsyncCacheNonBlocking<TKey, TValue>.RemoveNotFoundFromCacheOnException;
this.cancellationTokenSource = cancellationToken == default
? new CancellationTokenSource()
: CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
this.isStackTraceOptimizationEnabled = enableStackTraceOptimization;
}

public AsyncCacheNonBlocking()
: this(removeFromCacheOnBackgroundRefreshException: null, keyEqualityComparer: null)
public AsyncCacheNonBlocking(bool enableStackTraceOptimization = false)
: this(removeFromCacheOnBackgroundRefreshException: null,
keyEqualityComparer: null,
enableStackTraceOptimization: enableStackTraceOptimization)
{
}

Expand Down Expand Up @@ -279,7 +284,7 @@ public AsyncLazyWithRefreshTask(

public bool IsValueCreated => this.value != null;

public Task<T> GetValueAsync(
public Task<T> GetValueAsync(
Func<T, Task<T>> createValueFunc)
{
// The task was already created so just return it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public ClientCollectionCache(
IStoreModel storeModel,
ICosmosAuthorizationTokenProvider tokenProvider,
IRetryPolicyFactory retryPolicy,
TelemetryToServiceHelper telemetryToServiceHelper)
TelemetryToServiceHelper telemetryToServiceHelper,
bool enableStackTraceOptimization)
: base(enableStackTraceOptimization)
{
this.storeModel = storeModel ?? throw new ArgumentNullException("storeModel");
this.tokenProvider = tokenProvider;
Expand Down
20 changes: 14 additions & 6 deletions Microsoft.Azure.Cosmos/src/Routing/CollectionCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@ internal abstract class CollectionCache
/// </summary>
protected class InternalCache
{
internal InternalCache()
internal InternalCache(
bool enableStackTraceOptimization)
{
this.collectionInfoByName = new AsyncCache<string, ContainerProperties>(new CollectionRidComparer());
this.collectionInfoById = new AsyncCache<string, ContainerProperties>(new CollectionRidComparer());
this.collectionInfoByName = new AsyncCache<string, ContainerProperties>(
new CollectionRidComparer(),
enableStackTraceOptimization: enableStackTraceOptimization);

this.collectionInfoById = new AsyncCache<string, ContainerProperties>(
new CollectionRidComparer(),
enableStackTraceOptimization: enableStackTraceOptimization);

this.collectionInfoByNameLastRefreshTime = new ConcurrentDictionary<string, DateTime>();
this.collectionInfoByIdLastRefreshTime = new ConcurrentDictionary<string, DateTime>();
}
Expand All @@ -48,11 +55,12 @@ internal InternalCache()
/// </summary>
protected readonly InternalCache[] cacheByApiList;

protected CollectionCache()
protected CollectionCache(
bool enableStackTraceOptimization)
{
this.cacheByApiList = new InternalCache[2];
this.cacheByApiList[0] = new InternalCache(); // for API version < 2018-12-31
this.cacheByApiList[1] = new InternalCache(); // for API version >= 2018-12-31
this.cacheByApiList[0] = new InternalCache(enableStackTraceOptimization); // for API version < 2018-12-31
this.cacheByApiList[1] = new InternalCache(enableStackTraceOptimization); // for API version >= 2018-12-31
}

/// <summary>
Expand Down
5 changes: 3 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ public GatewayAddressCache(
IConnectionStateListener connectionStateListener,
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
bool enableTcpConnectionEndpointRediscovery = false,
bool replicaAddressValidationEnabled = false)
bool replicaAddressValidationEnabled = false,
bool enableStackTraceOptimization = false)
{
this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment);
this.protocol = protocol;
this.tokenProvider = tokenProvider;
this.serviceEndpoint = serviceEndpoint;
this.serviceConfigReader = serviceConfigReader;
this.serverPartitionAddressCache = new AsyncCacheNonBlocking<PartitionKeyRangeIdentity, PartitionAddressInformation>();
this.serverPartitionAddressCache = new AsyncCacheNonBlocking<PartitionKeyRangeIdentity, PartitionAddressInformation>(enableStackTraceOptimization);
this.suboptimalServerPartitionTimestamps = new ConcurrentDictionary<PartitionKeyRangeIdentity, DateTime>();
this.serverPartitionAddressToPkRangeIdMap = new ConcurrentDictionary<ServerKey, HashSet<PartitionKeyRangeIdentity>>();
this.suboptimalMasterPartitionTimestamp = DateTime.MaxValue;
Expand Down
9 changes: 7 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
private readonly bool isReplicaAddressValidationEnabled;
private readonly bool enableStackTraceOptimization;
private readonly IConnectionStateListener connectionStateListener;
private IOpenConnectionsHandler openConnectionsHandler;

Expand All @@ -52,7 +53,8 @@ public GlobalAddressResolver(
IServiceConfigurationReader serviceConfigReader,
ConnectionPolicy connectionPolicy,
CosmosHttpClient httpClient,
IConnectionStateListener connectionStateListener)
IConnectionStateListener connectionStateListener,
bool enableStackTraceOptimization = false)
{
this.endpointManager = endpointManager;
this.partitionKeyRangeLocationCache = partitionKeyRangeLocationCache;
Expand All @@ -72,6 +74,8 @@ public GlobalAddressResolver(

this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy);

this.enableStackTraceOptimization = enableStackTraceOptimization;

this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)

this.addressCacheByEndpoint = new ConcurrentDictionary<Uri, EndpointCache>();
Expand Down Expand Up @@ -344,7 +348,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.openConnectionsHandler,
this.connectionStateListener,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled);
replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled,
enableStackTraceOptimization: this.enableStackTraceOptimization);

string location = this.endpointManager.GetLocation(endpoint);
AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location);
Expand Down
Loading
Loading