Skip to content
Merged
Show file tree
Hide file tree
Changes from 150 commits
Commits
Show all changes
158 commits
Select commit Hold shift + click to select a range
7eb4775
Bridge & Connection stats: cleanup
NickCraver Oct 26, 2021
b3038df
Bump SDK versions
NickCraver Oct 26, 2021
50def47
Dammit
NickCraver Oct 26, 2021
69d907c
Let's try this...
NickCraver Oct 26, 2021
133c4bd
C# 10 is too hard.
NickCraver Oct 26, 2021
4f4be8b
Fully revert to non-branch
NickCraver Oct 26, 2021
265c028
Backlog; v3 implementation start
NickCraver Oct 26, 2021
f692312
Woops, good catch!
NickCraver Oct 27, 2021
a207d8b
Merge branch 'craver/stats-cleanup' into craver/backlog-v3
NickCraver Oct 27, 2021
4b5a410
Fix reader/writer states on catastrophic stat failure.
NickCraver Oct 27, 2021
0084522
Merge branch 'craver/stats-cleanup' into craver/backlog-v3
NickCraver Oct 27, 2021
75545f0
Fix tests (token disposal oops) and tweak for speed
NickCraver Oct 27, 2021
6c8edf6
Revert ServerEndPoint (bad commit split)
NickCraver Oct 27, 2021
c86c7d2
Merge remote-tracking branch 'origin/main' into craver/backlog-v3
NickCraver Nov 2, 2021
076bc00
Backlog add hook points past .IsConnected and initial tests
NickCraver Nov 2, 2021
3353ce2
Merge branch 'main' into craver/backlog-v3
NickCraver Nov 12, 2021
9f86830
Remove server-specific for now
NickCraver Nov 12, 2021
f7b0975
Cleanup
NickCraver Nov 12, 2021
eb9c0c3
Update ServerEndPoint.cs
NickCraver Nov 16, 2021
7e17689
ServerEndPoint: clear flags much faster
NickCraver Nov 16, 2021
dc32e0d
Tweak so sync messages get ejected from the queue
NickCraver Nov 16, 2021
6f26239
Add prerelease label
NickCraver Nov 16, 2021
2d15806
Typo fix
NickCraver Dec 12, 2021
bed33f8
ProcessBacklogs comment
NickCraver Dec 12, 2021
1a343c6
Name/comment fixes
NickCraver Dec 12, 2021
b7c3dff
Fix TieBreaker queue - this was going into a never-started backlog
NickCraver Dec 13, 2021
dc3f9b3
Yeah....that'd be bad
NickCraver Dec 13, 2021
fc8a100
Tiebreakers: move into the handshake
NickCraver Dec 14, 2021
77005e7
Merge branch 'craver/handshake-tiebreaker' into craver/backlog-v2.5
NickCraver Dec 14, 2021
7b98bc2
Remove handshake changes
NickCraver Dec 14, 2021
5931685
Rename isHandshake to bypassBacklog
NickCraver Dec 14, 2021
e830cb0
Add release notes
NickCraver Dec 14, 2021
ed60b39
Merge branch 'main' into craver/handshake-tiebreaker
NickCraver Dec 14, 2021
a79c89d
PR fixes!
NickCraver Dec 15, 2021
fa5200c
Tiebreaker: add tests
NickCraver Dec 28, 2021
fb6a135
Add incorrect tiebreaker type test
NickCraver Dec 28, 2021
3bd116b
Merge branch 'craver/handshake-tiebreaker' into craver/backlog-v2.5
NickCraver Dec 28, 2021
0ca6c47
Merge branch 'main' into craver/handshake-tiebreaker
NickCraver Jan 2, 2022
ff59515
Merge branch 'main' into craver/backlog-v2.5
NickCraver Jan 4, 2022
7fccba9
Fix ServerTakesPrecendenceOverSnapshot behavior
NickCraver Jan 4, 2022
0ed0c4c
Merge branch 'craver/handshake-tiebreaker' into craver/backlog-v2.5
NickCraver Jan 4, 2022
410bf80
Pull in the Area Fix
NickCraver Jan 4, 2022
b364d47
Fix naming and remove unused asyncState
NickCraver Jan 4, 2022
b280c87
Issue922_ReconnectRaised: Add logging
NickCraver Jan 4, 2022
7b42366
Backlog: handle timeouts better
NickCraver Jan 4, 2022
784d78a
Revert subscribe -> internal
NickCraver Jan 4, 2022
582e5f1
Issue922: Fix assumptions
NickCraver Jan 4, 2022
b8d2636
Increase PubSubGetAllCorrectOrder_OnMessage_Async gap
NickCraver Jan 4, 2022
91339f1
PubSub tests: make it more specific and resilient
NickCraver Jan 4, 2022
3018442
Add more data to this bugger...
NickCraver Jan 4, 2022
ade853b
Subscriber: simplify
NickCraver Jan 5, 2022
1f946d4
add new client flags
mgravell Jan 6, 2022
272f1a3
Merge remote-tracking branch 'origin/main' into craver/backlog-v2.5
NickCraver Jan 6, 2022
a4425ec
Changes for #1912
NickCraver Jan 10, 2022
b0fb2a1
Revert 1 mismatch
NickCraver Jan 10, 2022
b567956
Merge branch 'craver/backlog-prep' into craver/backlog-v2.5
NickCraver Jan 10, 2022
65d6268
Merge remote-tracking branch 'origin/main' into craver/backlog-v2.5
NickCraver Jan 10, 2022
990f5e8
WIP: Pub/Sub portion of #1912
NickCraver Jan 10, 2022
38132a3
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 10, 2022
d552097
Lots of things - need to writeup in PR
NickCraver Jan 10, 2022
fac5a1b
Fix KeepAlive on PhysicalBridge
NickCraver Jan 10, 2022
1cb00ff
Fix default version tests
NickCraver Jan 10, 2022
a2d0943
Merge remote-tracking branch 'origin/main' into craver/backlog-v2.5
NickCraver Jan 11, 2022
7fdb45a
Fix up Isue922 test now that we ping the right things
NickCraver Jan 11, 2022
85c5a4d
Migrate PubSub tests off sync threads
NickCraver Jan 11, 2022
98701c9
Fix shared connections with simulated failures (cross-test noise)
NickCraver Jan 11, 2022
377c813
Compensate for delay removal
NickCraver Jan 11, 2022
d9c68e1
Add logging to pubsub methods
NickCraver Jan 11, 2022
3f6e030
Add logging to PubSubGetAllCorrectOrder
NickCraver Jan 11, 2022
b63648a
Tidy exception messages
NickCraver Jan 11, 2022
25a7058
Fix stupid
NickCraver Jan 12, 2022
a38fac2
Remove unneeded retry change
NickCraver Jan 12, 2022
148c975
Eliminate writer here
NickCraver Jan 12, 2022
bf9fa07
Writer: switch back to SemaphoreSlim
NickCraver Jan 15, 2022
e4e6d73
Ignore message ordering on the hot paths
NickCraver Jan 16, 2022
62c6b7d
Change up Issue 922 for better reporting and accuracy
NickCraver Jan 16, 2022
5141cc8
Merge + code cleanup
NickCraver Jan 17, 2022
532c01f
Move to Task.Run() for .NET 6, cleanup, and backlog queue contention …
NickCraver Jan 17, 2022
78ebf5e
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 18, 2022
78efb6b
Merge remote-tracking branch 'origin/main' into craver/backlog-v2.5
NickCraver Jan 18, 2022
d496348
Merge remote-tracking branch 'origin/main' into craver/backlog-v2.5
NickCraver Jan 18, 2022
bc11b96
Fix merge
NickCraver Jan 18, 2022
47f0a12
Format options
NickCraver Jan 18, 2022
60b3c42
Merge remote-tracking branch 'origin/main' into craver/backlog-v2.5
NickCraver Jan 18, 2022
8e168d9
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 18, 2022
412f9e8
Merge remote-tracking branch 'origin/main' into craver/backlog-v2.5
NickCraver Jan 19, 2022
21d36f0
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 19, 2022
49a917d
Sync tests
NickCraver Jan 19, 2022
8f5bf58
Debug: lots of pruning
NickCraver Jan 19, 2022
3d20350
Merge remote-tracking branch 'origin/craver/backlog-v2.5' into craver…
NickCraver Jan 20, 2022
c958320
Fix revert
NickCraver Jan 20, 2022
185c62c
Merge remote-tracking branch 'origin/craver/debug-cleanup' into crave…
NickCraver Jan 20, 2022
2b57b0b
Fix merge
NickCraver Jan 20, 2022
144c22e
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 20, 2022
92cecfc
WIP: This could all be a bad idea
NickCraver Jan 20, 2022
7d7f020
Gap commit
NickCraver Jan 20, 2022
f247980
Pub/Sub: default to 3.0, fix PING, fix server selection in cluster, a…
NickCraver Jan 20, 2022
6ecde2a
Include PING routing
NickCraver Jan 20, 2022
f91e4c5
Revert testing change
NickCraver Jan 20, 2022
bca9de0
Merge remote-tracking branch 'origin/main' into craver/pub-sub-prep
NickCraver Jan 20, 2022
5a6db1c
Merge branch 'craver/pub-sub-prep' into craver/pub-sub-issues
NickCraver Jan 20, 2022
daa1b9c
Revert that bandaid test
NickCraver Jan 20, 2022
f36b6d9
Merge remote-tracking branch 'origin/craver/pub-sub-prep' into craver…
NickCraver Jan 20, 2022
70e1735
Nope.
NickCraver Jan 20, 2022
a814231
Bits
NickCraver Jan 20, 2022
1d4b4ad
Sync work stop commit (moving to laptop!)
NickCraver Jan 21, 2022
5de45a2
Tests: profiler logging made easier
NickCraver Jan 22, 2022
64565dd
Tests: use profiling and add more logging
NickCraver Jan 22, 2022
00f851c
Pub/Sub: Register immediately, but complete async
NickCraver Jan 22, 2022
029c2a3
Pre-clear rather than await
NickCraver Jan 22, 2022
341f532
Fix more things
NickCraver Jan 22, 2022
48127a1
Merge branch 'craver/pub-sub-wip' into craver/pub-sub-issues
NickCraver Jan 22, 2022
5dbf575
Add logging to TestPublishWithSubscribers
NickCraver Jan 22, 2022
237848f
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 22, 2022
95b39b8
Merge branch 'craver/pub-sub-issues' into craver/backlog-v2.5
NickCraver Jan 22, 2022
ab41493
Meeeeeeerge fun
NickCraver Jan 22, 2022
51c927b
Light up more pubsub tests
NickCraver Jan 23, 2022
20d2d28
Several subtle but big changes
NickCraver Jan 23, 2022
814b401
More PubSub logging
NickCraver Jan 23, 2022
667aa34
TestPatternPubSub: give reception a moment
NickCraver Jan 23, 2022
cf96bba
Moar!
NickCraver Jan 23, 2022
a6be64a
Add logging to Issue1101 pipe
NickCraver Jan 23, 2022
42a9267
Merge branch 'craver/pub-sub-issues' into craver/backlog-v2.5
NickCraver Jan 23, 2022
31b38dc
PubSubGetAllAnyOrder: don't share conn
NickCraver Jan 23, 2022
79e5090
TextWriterOutputHelper: fix end-of-tests race case
NickCraver Jan 23, 2022
aef7832
Merge branch 'craver/pub-sub-issues' into craver/backlog-v2.5
NickCraver Jan 23, 2022
8ea3b6d
Bump Docker images to 6.2.6
NickCraver Jan 23, 2022
d9caedb
Nick, you idiot.
NickCraver Jan 23, 2022
3fa77a4
Fix log message too
NickCraver Jan 23, 2022
04233fe
Sentinel: Fire and Forget on startup
NickCraver Jan 23, 2022
ff54012
ExplicitPublishMode: remove delay
NickCraver Jan 23, 2022
dc26751
Merge branch 'craver/pub-sub-issues' into craver/backlog-v2.5
NickCraver Jan 23, 2022
84439d4
Cleanup and comments!
NickCraver Jan 23, 2022
60d9b80
More comments!
NickCraver Jan 23, 2022
c6fb569
Annnnnd the other Sentinel one
NickCraver Jan 23, 2022
c0206fa
Start PubSubMultiserver
NickCraver Jan 24, 2022
b484e5f
Tests, yay!
NickCraver Jan 24, 2022
837a654
Assert up front
NickCraver Jan 24, 2022
b0001ab
PubSub tests: log everything
NickCraver Jan 24, 2022
1bc971d
Primary/Replica tests
NickCraver Jan 25, 2022
907cd20
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 25, 2022
9c31958
Fix PubSub tests: can't share that connection yo
NickCraver Jan 25, 2022
eed3ba0
Remove the .NET 5.0 from Windows build too...
NickCraver Jan 25, 2022
cd29add
Merge branch 'craver/pub-sub-issues' into craver/backlog-v2.5
NickCraver Jan 25, 2022
2fe8d13
Sentinel: account for multi-suite failover states
NickCraver Jan 25, 2022
6f9ae70
Merge branch 'craver/pub-sub-issues' into craver/backlog-v2.5
NickCraver Jan 25, 2022
d234235
ExecuteWithUnsubscribeViaSubscriber: don't share conn
NickCraver Jan 25, 2022
33f52af
Re-disable TestMassivePublishWithWithoutFlush_Local
NickCraver Jan 25, 2022
6140700
Merge remote-tracking branch 'origin/main' into craver/pub-sub-issues
NickCraver Jan 25, 2022
5d0ee35
Merge remote-tracking branch 'origin/craver/pub-sub-issues' into crav…
NickCraver Jan 26, 2022
f88e8e1
PR feedback!
NickCraver Jan 26, 2022
2185911
Adjust RedisServer.ExecuteSync/Async<T> for "don't care" cases
NickCraver Jan 26, 2022
3f694c5
Nick you dummy
NickCraver Jan 26, 2022
2edf892
PubSubMultiserver: separate channels on publishers
NickCraver Jan 26, 2022
221509a
Merge remote-tracking branch 'origin/main' into craver/backlog-v2.5
NickCraver Feb 4, 2022
b77d0a3
CI Tweaks (#1970)
NickCraver Feb 4, 2022
04ce70b
Seal backlog
NickCraver Feb 4, 2022
2c2b0ea
Backlog tests: add QueuesAndFlushesAfterReconnectingClusterAsync
NickCraver Feb 7, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ jobs:
with:
dotnet-version: |
3.1.x
5.0.x
6.0.x
- name: .NET Build
run: dotnet build Build.csproj -c Release /p:CI=true
Expand Down
43 changes: 43 additions & 0 deletions src/StackExchange.Redis/BacklogPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace StackExchange.Redis
{
/// <summary>
/// The backlog policy to use for commands. This policy comes into effect when a connection is unhealthy or unavailable.
/// The policy can choose to backlog commands and wait to try them (within their timeout) against a connection when it comes up,
/// or it could choose to fail fast and throw ASAP. Different apps desire different behaviors with backpressure and how to handle
/// large amounts of load, so this is configurable to optimize the happy path but avoid spiral-of-death queue scenarios for others.
/// </summary>
public class BacklogPolicy
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this designed for inheritance? if not, recommend sealed

{
/// <summary>
/// Backlog behavior matching StackExchange.Redis's 2.x line, failing fast and not attempting to queue
/// and retry when a connection is available again.
/// </summary>
public static BacklogPolicy FailFast { get; } = new()
{
QueueWhileDisconnected = false,
AbortPendingOnConnectionFailure = true,
};

/// <summary>
/// Default backlog policy which will allow commands to be issues against an endpoint and queue up.
/// Commands are still subject to their async timeout (which serves as a queue size check).
/// </summary>
public static BacklogPolicy Default { get; } = new()
{
QueueWhileDisconnected = true,
AbortPendingOnConnectionFailure = false,
};

/// <summary>
/// Whether to queue commands while disconnected.
/// True means queue for attempts up until their timeout.
/// False means to fail ASAP and queue nothing.
/// </summary>
public bool QueueWhileDisconnected { get; init; }

/// <summary>
/// Whether to immediately abandon (with an exception) all pending commands when a connection goes unhealthy.
/// </summary>
public bool AbortPendingOnConnectionFailure { get; init; }
}
}
12 changes: 12 additions & 0 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public static string TryNormalize(string value)

private IReconnectRetryPolicy reconnectRetryPolicy;

private BacklogPolicy backlogPolicy;

/// <summary>
/// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note
/// that this cannot be specified in the configuration-string.
Expand Down Expand Up @@ -372,6 +374,15 @@ public IReconnectRetryPolicy ReconnectRetryPolicy
set => reconnectRetryPolicy = value;
}

/// <summary>
/// The backlog policy to be used for commands when a connection is unhealthy.
/// </summary>
public BacklogPolicy BacklogPolicy
{
get => backlogPolicy ?? BacklogPolicy.Default;
set => backlogPolicy = value;
}

/// <summary>
/// Indicates whether endpoints should be resolved via DNS before connecting.
/// If enabled the ConnectionMultiplexer will not re-resolve DNS
Expand Down Expand Up @@ -541,6 +552,7 @@ public ConfigurationOptions Clone()
responseTimeout = responseTimeout,
DefaultDatabase = DefaultDatabase,
ReconnectRetryPolicy = reconnectRetryPolicy,
BacklogPolicy = backlogPolicy,
SslProtocols = SslProtocols,
checkCertificateRevocation = checkCertificateRevocation,
};
Expand Down
20 changes: 14 additions & 6 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -804,15 +804,15 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
/// <param name="key">The key to get a hash slot ID for.</param>
public int HashSlot(RedisKey key) => ServerSelectionStrategy.HashSlot(key);

internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags)
internal ServerEndPoint AnyServer(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags, bool allowDisconnected)
{
var tmp = GetServerSnapshot();
int len = tmp.Length;
ServerEndPoint fallback = null;
for (int i = 0; i < len; i++)
{
var server = tmp[(int)(((uint)i + startOffset) % len)];
if (server != null && server.ServerType == serverType && server.IsSelectable(command))
if (server != null && server.ServerType == serverType && server.IsSelectable(command, allowDisconnected))
{
if (server.IsReplica)
{
Expand Down Expand Up @@ -1898,7 +1898,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
}
if (!first)
{
long subscriptionChanges = ValidateSubscriptions();
long subscriptionChanges = await EnsureSubscriptionsAsync();
if (subscriptionChanges == 0)
{
log?.WriteLine("No subscription changes necessary");
Expand Down Expand Up @@ -2168,6 +2168,12 @@ private bool PrepareToPushMessageToBridge<T>(Message message, ResultProcessor<T>
{
// Infer a server automatically
server = SelectServer(message);

// If we didn't find one successfully, and we're allowed, queue for any viable server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am understand it correctly, I think it will be interesting to test this logic with a clustered cache. In case of a failure, the logic will select any viable server. This viable server might return a moved exception and the message will get redirected to the right endpoint. It should be fine if eventually it gets to the right endpoint. Does that sound correct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct! Let's make sure in tests though - I'll add some cluster tests around this to the PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added additional testing in 2c2b0ea!

if (server == null && message != null && RawConfig.BacklogPolicy.QueueWhileDisconnected)
{
server = ServerSelectionStrategy.Select(message, allowDisconnected: true);
}
}
else // a server was specified; do we trust their choice, though?
{
Expand All @@ -2185,7 +2191,9 @@ private bool PrepareToPushMessageToBridge<T>(Message message, ResultProcessor<T>
}
break;
}
if (!server.IsConnected)

// If we're not allowed to queue while disconnected, we'll bomb out below.
if (!server.IsConnected && !RawConfig.BacklogPolicy.QueueWhileDisconnected)
{
// well, that's no use!
server = null;
Expand Down Expand Up @@ -2325,7 +2333,7 @@ internal void InitializeSentinel(LogProxy logProxy)
}
}
}
});
}, CommandFlags.FireAndForget);
}

// If we lose connection to a sentinel server,
Expand All @@ -2344,7 +2352,7 @@ internal void InitializeSentinel(LogProxy logProxy)
{
string[] messageParts = ((string)message).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
UpdateSentinelAddressList(messageParts[0]);
});
}, CommandFlags.FireAndForget);
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/StackExchange.Redis/Enums/CommandFlags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ public enum CommandFlags
/// </summary>
NoScriptCache = 512,

// 1024: used for timed-out; never user-specified, so not visible on the public API

// 2048: Use subscription connection type; never user-specified, so not visible on the public API
// 1024: Use subscription connection type; never user-specified, so not visible on the public API
}
}
43 changes: 18 additions & 25 deletions src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis.Profiling;
using static StackExchange.Redis.ConnectionMultiplexer;

Expand Down Expand Up @@ -60,8 +58,7 @@ internal abstract class Message : ICompletable

private const CommandFlags AskingFlag = (CommandFlags)32,
ScriptUnavailableFlag = (CommandFlags)256,
NeedsAsyncTimeoutCheckFlag = (CommandFlags)1024,
DemandSubscriptionConnection = (CommandFlags)2048;
DemandSubscriptionConnection = (CommandFlags)1024;

private const CommandFlags MaskMasterServerPreference = CommandFlags.DemandMaster
| CommandFlags.DemandReplica
Expand Down Expand Up @@ -589,7 +586,7 @@ internal bool TrySetResult<T>(T value)
internal void SetEnqueued(PhysicalConnection connection)
{
SetWriteTime();
performance?.SetEnqueued();
performance?.SetEnqueued(connection?.BridgeCouldBeNull?.ConnectionType);
_enqueuedTo = connection;
if (connection == null)
{
Expand Down Expand Up @@ -645,10 +642,7 @@ internal void SetRequestSent()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void SetWriteTime()
{
if ((Flags & NeedsAsyncTimeoutCheckFlag) != 0)
{
_writeTickCount = Environment.TickCount; // note this might be reset if we resend a message, cluster-moved etc; I'm OK with that
}
_writeTickCount = Environment.TickCount; // note this might be reset if we resend a message, cluster-moved etc; I'm OK with that
}
private int _writeTickCount;
public int GetWriteTime() => Volatile.Read(ref _writeTickCount);
Expand All @@ -662,21 +656,17 @@ internal void SetWriteTime()
/// </summary>
internal void SetForSubscriptionBridge() => Flags |= DemandSubscriptionConnection;

private void SetNeedsTimeoutCheck() => Flags |= NeedsAsyncTimeoutCheckFlag;
internal bool HasAsyncTimedOut(int now, int timeoutMilliseconds, out int millisecondsTaken)
/// <summary>
/// Checks if this message has violated the provided timeout.
/// Whether it's a sync operation in a .Wait() or in the backlog queue or written/pending asynchronously, we need to timeout everything.
/// ...or we get indefinite Task hangs for completions.
/// </summary>
internal bool HasTimedOut(int now, int timeoutMilliseconds, out int millisecondsTaken)
{
if ((Flags & NeedsAsyncTimeoutCheckFlag) != 0)
millisecondsTaken = unchecked(now - _writeTickCount); // note: we can't just check "if sent < cutoff" because of wrap-aro
if (millisecondsTaken >= timeoutMilliseconds)
{
millisecondsTaken = unchecked(now - _writeTickCount); // note: we can't just check "if sent < cutoff" because of wrap-aro
if (millisecondsTaken >= timeoutMilliseconds)
{
Flags &= ~NeedsAsyncTimeoutCheckFlag; // note: we don't remove it from the queue - still might need to marry it up; but: it is toast
return true;
}
}
else
{
millisecondsTaken = default;
return true;
}
return false;
}
Expand All @@ -695,16 +685,17 @@ internal void SetPreferMaster() =>
internal void SetPreferReplica() =>
Flags = (Flags & ~MaskMasterServerPreference) | CommandFlags.PreferReplica;

/// <remarks>
/// Note order here reversed to prevent overload resolution errors
/// </remarks>
internal void SetSource(ResultProcessor resultProcessor, IResultBox resultBox)
{ // note order here reversed to prevent overload resolution errors
if (resultBox != null && resultBox.IsAsync) SetNeedsTimeoutCheck();
{
this.resultBox = resultBox;
this.resultProcessor = resultProcessor;
}

internal void SetSource<T>(IResultBox<T> resultBox, ResultProcessor<T> resultProcessor)
{
if (resultBox != null && resultBox.IsAsync) SetNeedsTimeoutCheck();
this.resultBox = resultBox;
this.resultProcessor = resultProcessor;
}
Expand Down Expand Up @@ -735,6 +726,8 @@ protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, i
}

public override string CommandAndKey => Command + " " + Channel;

public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(Channel);
}

internal abstract class CommandKeyBase : Message
Expand Down
Loading