Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session Consistency: Adds SessionTokenMismatchRetryPolicy optimization through customer supplied region switch hints #4976

Draft
wants to merge 5 commits into
base: msdata/direct
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ public bool? EnableAdvancedReplicaSelectionForTcp
set;
}


/// <summary>
/// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end.
/// </summary>
Expand All @@ -530,6 +531,15 @@ internal CosmosClientTelemetryOptions CosmosClientTelemetryOptions
set;
}

/// <summary>
/// provides SessionTokenMismatchRetryPolicy optimization through customer supplied region switch hints
/// </summary>
internal SessionRetryOptions SessionRetryOptions
{
get;
set;
}

/// <summary>
/// GlobalEndpointManager will subscribe to this event if user updates the preferredLocations list in the Azure Cosmos DB service.
/// </summary>
Expand Down
62 changes: 49 additions & 13 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class CosmosClientOptions
private const string ConnectionStringAccountKey = "AccountKey";
private const string ConnectionStringDisableServerCertificateValidation = "DisableServerCertificateValidation";

private const ApiType DefaultApiType = ApiType.None;
private const ApiType DefaultApiType = ApiType.None;

/// <summary>
/// Default request timeout
Expand All @@ -65,14 +65,15 @@ public class CosmosClientOptions

private ConnectionMode connectionMode;
private Protocol connectionProtocol;
private TimeSpan? idleTcpConnectionTimeout;
private TimeSpan? idleTcpConnectionTimeout;
private TimeSpan? openTcpConnectionTimeout;
private int? maxRequestsPerTcpConnection;
private int? maxTcpConnectionsPerEndpoint;
private PortReuseMode? portReuseMode;
private IWebProxy webProxy;
private Func<HttpClient> httpClientFactory;
private string applicationName;
private string applicationName;
private SessionRetryOptions sessionRetryOptions;

/// <summary>
/// Creates a new CosmosClientOptions
Expand All @@ -86,7 +87,8 @@ public CosmosClientOptions()
this.ConnectionProtocol = CosmosClientOptions.DefaultProtocol;
this.ApiType = CosmosClientOptions.DefaultApiType;
this.CustomHandlers = new Collection<RequestHandler>();
this.CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions();
this.CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions();
this.sessionRetryOptions = new SessionRetryOptions();
}

/// <summary>
Expand Down Expand Up @@ -315,8 +317,8 @@ public ConnectionMode ConnectionMode
/// This can be used to weaken the database account consistency level for read operations.
/// If this is not set the database account consistency level will be used for all requests.
/// </summary>
public ConsistencyLevel? ConsistencyLevel { get; set; }

public ConsistencyLevel? ConsistencyLevel { get; set; }
/// <summary>
/// Sets the priority level for requests created using cosmos client.
/// </summary>
Expand Down Expand Up @@ -460,8 +462,8 @@ public TimeSpan? IdleTcpConnectionTimeout
this.idleTcpConnectionTimeout = value;
this.ValidateDirectTCPSettings();
}
}

}
/// <summary>
/// (Direct/TCP) Controls the amount of time allowed for trying to establish a connection.
/// </summary>
Expand Down Expand Up @@ -726,7 +728,39 @@ public Func<HttpClient> HttpClientFactory
internal
#endif
AvailabilityStrategy AvailabilityStrategy { get; set; }


/// <summary>
/// provides SessionTokenMismatchRetryPolicy optimization through customer supplied region switch hints
/// </summary>
internal SessionRetryOptions SessionRetryOptions
{
get => this.sessionRetryOptions;
set
{
if (value.RemoteRegionPreferred)
{

if (value.MinInRegionRetryTime == null)
{
throw new ArgumentException($" Argument 'MinInRegionRetryTime' must not be null when RemoteRegionPreferred option is selected.");
}

if (value.MinInRegionRetryTime.TotalMilliseconds < ConfigurationManager.MinMinInRegionRetryTimeForWritesInMs)
{
throw new ArgumentException($" Argument 'MinInRegionRetryTime' in the SessionRetryOptions must be set and have at least a value of " +
"{ConfigurationManager.MinMinInRegionRetryTimeForWritesInMs} ms");
}

if (value.MaxInRegionRetryCount < ConfigurationManager.MinMaxRetriesInLocalRegionWhenRemoteRegionPreferred)
{
throw new ArgumentException($" Argument 'MaxInRegionRetryCount' in the SessionRetryOptions must have at least a value of " +
"{ConfigurationManager.MinMaxRetriesInLocalRegionWhenRemoteRegionPreferred}");
}
}

this.sessionRetryOptions = value;
}
}
/// <summary>
/// Enable partition key level failover
/// </summary>
Expand Down Expand Up @@ -919,7 +953,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
this.ValidateDirectTCPSettings();
this.ValidateLimitToEndpointSettings();
this.ValidatePartitionLevelFailoverSettings();
this.ValidateAvailabilityStrategy();
this.ValidateAvailabilityStrategy();

ConnectionPolicy connectionPolicy = new ConnectionPolicy()
{
Expand All @@ -929,7 +963,8 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
ConnectionProtocol = this.ConnectionProtocol,
UserAgentContainer = this.CreateUserAgentContainerWithFeatures(clientId),
UseMultipleWriteLocations = true,
IdleTcpConnectionTimeout = this.IdleTcpConnectionTimeout,
IdleTcpConnectionTimeout = this.IdleTcpConnectionTimeout,
SessionRetryOptions = this.SessionRetryOptions,
OpenTcpConnectionTimeout = this.OpenTcpConnectionTimeout,
MaxRequestsPerTcpConnection = this.MaxRequestsPerTcpConnection,
MaxTcpConnectionsPerEndpoint = this.MaxTcpConnectionsPerEndpoint,
Expand All @@ -940,7 +975,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
EnableAdvancedReplicaSelectionForTcp = this.EnableAdvancedReplicaSelectionForTcp,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback,
CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions()
CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions()
};

if (this.CosmosClientTelemetryOptions != null)
Expand Down Expand Up @@ -1105,7 +1140,8 @@ private void ValidateAvailabilityStrategy()
{
throw new ArgumentException($"{nameof(this.ApplicationPreferredRegions)} or {nameof(this.ApplicationRegion)} must be set to use {nameof(this.AvailabilityStrategy)}");
}
}
}


private void ValidateDirectTCPSettings()
{
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6723,7 +6723,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus)
!this.enableRntbdChannel,
this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong),
true,
enableReplicaValidation: this.isReplicaAddressValidationEnabled);
enableReplicaValidation: this.isReplicaAddressValidationEnabled,
this.ConnectionPolicy.SessionRetryOptions);

if (subscribeRntbdStatus)
{
Expand Down
15 changes: 14 additions & 1 deletion Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,20 @@ public CosmosClientBuilder WithSerializerOptions(CosmosSerializationOptions cosm
{
this.clientOptions.SerializerOptions = cosmosSerializerOptions;
return this;
}
}

/// <summary>
/// provides SessionTokenMismatchRetryPolicy optimization through customer supplied region switch hints
/// </summary>
/// <param name="sessionRetryOptions">customer supplied region switch hints </param>
/// <returns>The <see cref="CosmosClientBuilder"/> object</returns>
public CosmosClientBuilder WithSessionRetryOptions(SessionRetryOptions sessionRetryOptions)
{
this.clientOptions.SessionRetryOptions = sessionRetryOptions;
return this;
}



/// <summary>
/// Set a custom JSON serializer.
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/RMResources.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class RequestOptions
internal
#endif
AvailabilityStrategy AvailabilityStrategy { get; set; }

/// <summary>
/// Gets or sets the boolean to use effective partition key routing in the cosmos db request.
/// </summary>
Expand Down
33 changes: 33 additions & 0 deletions Microsoft.Azure.Cosmos/src/SessionRetryOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using Microsoft.Azure.Documents;

/// <summary>
/// Implementation of ISessionRetryOptions interface, do not want clients to subclass.
/// </summary>
public sealed class SessionRetryOptions : ISessionRetryOptions
{
/// <summary>
/// Sets the minimum retry time for 404/1002 retries within each region for read and write operations.
/// The minimum value is 100ms - this minimum is enforced to provide a way for the local region to catch-up on replication lag. The default value is 500ms - as a recommendation ensure that this value is higher than the steady-state
/// replication latency between the regions you chose
/// </summary>
public TimeSpan MinInRegionRetryTime { get; set; } = ConfigurationManager.GetMinRetryTimeInLocalRegionWhenRemoteRegionPreferred();

/// <summary>
/// Sets the maximum number of retries within each region for read and write operations. The minimum value is 1 - the backoff time for the last in-region retry will ensure that the total retry time within the
/// region is at least the min. in-region retry time.
/// </summary>
public int MaxInRegionRetryCount { get; set; } = ConfigurationManager.GetMaxRetriesInLocalRegionWhenRemoteRegionPreferred();


/// <summary>
/// hints which guide SDK-internal retry policies on how early to switch retries to a different region.
/// </summary>
public Boolean RemoteRegionPreferred { get; set; } = false;



}
}
50 changes: 48 additions & 2 deletions Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,25 @@ internal static class ConfigurationManager
/// <summary>
/// Environment variable name to enable distributed query gateway mode.
/// </summary>
internal static readonly string DistributedQueryGatewayModeEnabled = "AZURE_COSMOS_DISTRIBUTED_QUERY_GATEWAY_ENABLED";
internal static readonly string DistributedQueryGatewayModeEnabled = "AZURE_COSMOS_DISTRIBUTED_QUERY_GATEWAY_ENABLED";


/// <summary>
/// intent is If a client specify a value, we will force it to be atleast 100ms, otherwise default is going to be 500ms
/// </summary>
internal static readonly string MinInRegionRetryTimeForWritesInMs = "AZURE_COSMOS_SESSION_TOKEN_MISMATCH_IN_REGION_RETRY_TIME_IN_MILLISECONDS";
internal static readonly int DefaultMinInRegionRetryTimeForWritesInMs = 500;
internal static readonly int MinMinInRegionRetryTimeForWritesInMs = 100;


/// <summary>
/// intent is If a client specify a value, we will force it to be atleast 1, otherwise default is going to be 1(right now both the values are 1 but we have the provision to change them in future).
/// </summary>
internal static readonly string MaxRetriesInLocalRegionWhenRemoteRegionPreferred = "AZURE_COSMOS_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED";
internal static readonly int DefaultMaxRetriesInLocalRegionWhenRemoteRegionPreferred = 1;
internal static readonly int MinMaxRetriesInLocalRegionWhenRemoteRegionPreferred = 1;



public static T GetEnvironmentVariable<T>(string variable, T defaultValue)
{
Expand Down Expand Up @@ -124,6 +142,34 @@ public static bool IsDistributedQueryGatewayModeEnabled(
.GetEnvironmentVariable(
variable: DistributedQueryGatewayModeEnabled,
defaultValue: defaultValue);
}
}


public static int GetMaxRetriesInLocalRegionWhenRemoteRegionPreferred()
{
return Math.Max(
ConfigurationManager
.GetEnvironmentVariable(
variable: MaxRetriesInLocalRegionWhenRemoteRegionPreferred,
defaultValue: DefaultMaxRetriesInLocalRegionWhenRemoteRegionPreferred),
MinMaxRetriesInLocalRegionWhenRemoteRegionPreferred);


}

public static TimeSpan GetMinRetryTimeInLocalRegionWhenRemoteRegionPreferred()
{

return TimeSpan.FromMilliseconds(Math.Max(
ConfigurationManager
.GetEnvironmentVariable(
variable: MinInRegionRetryTimeForWritesInMs,
defaultValue: DefaultMinInRegionRetryTimeForWritesInMs),
MinMinInRegionRetryTimeForWritesInMs));


}


}
}
9 changes: 7 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Documents
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents.Collections;

Expand Down Expand Up @@ -138,18 +139,21 @@ internal sealed class ConsistencyReader
private readonly IAuthorizationTokenProvider authorizationTokenProvider;
private readonly StoreReader storeReader;
private readonly QuorumReader quorumReader;
private readonly ISessionRetryOptions sessionRetryOptions;

public ConsistencyReader(
AddressSelector addressSelector,
ISessionContainer sessionContainer,
TransportClient transportClient,
IServiceConfigurationReader serviceConfigReader,
IAuthorizationTokenProvider authorizationTokenProvider,
bool enableReplicaValidation)
bool enableReplicaValidation,
ISessionRetryOptions sessionRetryOptions = null)
{
this.addressSelector = addressSelector;
this.serviceConfigReader = serviceConfigReader;
this.authorizationTokenProvider = authorizationTokenProvider;
this.sessionRetryOptions = sessionRetryOptions;
this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer, enableReplicaValidation);
this.quorumReader = new QuorumReader(transportClient, addressSelector, this.storeReader, serviceConfigReader, authorizationTokenProvider);
}
Expand Down Expand Up @@ -233,7 +237,8 @@ public Task<StoreResponse> ReadAsync(
{
return BackoffRetryUtility<StoreResponse>.ExecuteAsync(
callbackMethod: () => this.ReadSessionAsync(entity, desiredReadMode),
retryPolicy: new SessionTokenMismatchRetryPolicy(),
retryPolicy: new SessionTokenMismatchRetryPolicy(
sessionRetryOptions: this.sessionRetryOptions),
cancellationToken: cancellationToken);
}
else
Expand Down
9 changes: 7 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Documents
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Core.Trace;

/*
Expand Down Expand Up @@ -57,6 +58,7 @@ internal sealed class ConsistencyWriter
private readonly IServiceConfigurationReader serviceConfigReader;
private readonly IAuthorizationTokenProvider authorizationTokenProvider;
private readonly bool useMultipleWriteLocations;
private readonly ISessionRetryOptions sessionRetryOptions;

public ConsistencyWriter(
AddressSelector addressSelector,
Expand All @@ -65,14 +67,16 @@ public ConsistencyWriter(
IServiceConfigurationReader serviceConfigReader,
IAuthorizationTokenProvider authorizationTokenProvider,
bool useMultipleWriteLocations,
bool enableReplicaValidation)
bool enableReplicaValidation,
ISessionRetryOptions sessionRetryOptions = null)
{
this.transportClient = transportClient;
this.addressSelector = addressSelector;
this.sessionContainer = sessionContainer;
this.serviceConfigReader = serviceConfigReader;
this.authorizationTokenProvider = authorizationTokenProvider;
this.useMultipleWriteLocations = useMultipleWriteLocations;
this.sessionRetryOptions = sessionRetryOptions;
this.storeReader = new StoreReader(
transportClient,
addressSelector,
Expand Down Expand Up @@ -130,7 +134,8 @@ public async Task<StoreResponse> WriteAsync(
{
return await BackoffRetryUtility<StoreResponse>.ExecuteAsync(
callbackMethod: () => this.WritePrivateAsync(entity, timeout, forceRefresh),
retryPolicy: new SessionTokenMismatchRetryPolicy(),
retryPolicy: new SessionTokenMismatchRetryPolicy(
sessionRetryOptions: this.sessionRetryOptions),
cancellationToken: cancellationToken);
}
finally
Expand Down
Loading