Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ Current package versions:

## Unreleased

No pending changes.
- Fix [#2400](https://github.com/StackExchange/StackExchange.Redis/issues/2400): Expose `ChannelMessageQueue` as `IAsyncEnumerable<ChannelMessage>` ([#2402 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2402))

## 2.6.96

- Fix [#2350](https://github.com/StackExchange/StackExchange.Redis/issues/2350): Properly parse lua script paramters in all cultures ([#2351 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2351))
- Fix [#2350](https://github.com/StackExchange/StackExchange.Redis/issues/2350): Properly parse lua script parameters in all cultures ([#2351 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2351))
- Fix [#2362](https://github.com/StackExchange/StackExchange.Redis/issues/2362): Set `RedisConnectionException.FailureType` to `AuthenticationFailure` on all authentication scenarios for better handling ([#2367 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2367))
- Fix [#2368](https://github.com/StackExchange/StackExchange.Redis/issues/2368): Support `RedisValue.Length()` for all storage types ([#2370 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2370))
- Fix [#2376](https://github.com/StackExchange/StackExchange.Redis/issues/2376): Avoid a (rare) deadlock scenario ([#2378 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2378))
Expand Down
26 changes: 21 additions & 5 deletions src/StackExchange.Redis/ChannelMessageQueue.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -66,7 +68,7 @@ internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in R
/// To create a ChannelMessageQueue, use <see cref="ISubscriber.Subscribe(RedisChannel, CommandFlags)"/>
/// or <see cref="ISubscriber.SubscribeAsync(RedisChannel, CommandFlags)"/>.
/// </remarks>
public sealed class ChannelMessageQueue
public sealed class ChannelMessageQueue : IAsyncEnumerable<ChannelMessage>
{
private readonly Channel<ChannelMessage> _queue;
/// <summary>
Expand Down Expand Up @@ -319,10 +321,7 @@ internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = Comm
{
var parent = _parent;
_parent = null;
if (parent != null)
{
parent.UnsubscribeAsync(Channel, null, this, flags);
}
parent?.UnsubscribeAsync(Channel, null, this, flags);
_queue.Writer.TryComplete(error);
}

Expand All @@ -348,5 +347,22 @@ internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags f
/// </summary>
/// <param name="flags">The flags to use when unsubscribing.</param>
public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags);

/// <inheritdoc cref="IAsyncEnumerable{ChannelMessage}.GetAsyncEnumerator(CancellationToken)"/>
#if NETCOREAPP3_0_OR_GREATER
public IAsyncEnumerator<ChannelMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
=> _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
#else
public async IAsyncEnumerator<ChannelMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (_queue.Reader.TryRead(out var item))
{
yield return item;
}
}
}
#endif
}
}
1 change: 1 addition & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ StackExchange.Redis.ChannelMessage.SubscriptionChannel.get -> StackExchange.Redi
StackExchange.Redis.ChannelMessageQueue
StackExchange.Redis.ChannelMessageQueue.Channel.get -> StackExchange.Redis.RedisChannel
StackExchange.Redis.ChannelMessageQueue.Completion.get -> System.Threading.Tasks.Task!
StackExchange.Redis.ChannelMessageQueue.GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Collections.Generic.IAsyncEnumerator<StackExchange.Redis.ChannelMessage>!
StackExchange.Redis.ChannelMessageQueue.OnMessage(System.Action<StackExchange.Redis.ChannelMessage>! handler) -> void
StackExchange.Redis.ChannelMessageQueue.OnMessage(System.Func<StackExchange.Redis.ChannelMessage, System.Threading.Tasks.Task!>! handler) -> void
StackExchange.Redis.ChannelMessageQueue.ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<StackExchange.Redis.ChannelMessage>
Expand Down
34 changes: 34 additions & 0 deletions tests/StackExchange.Redis.Tests/PubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,40 @@ private void TestMassivePublish(ISubscriber sub, string channel, string caption)
Assert.True(withFAF.ElapsedMilliseconds < withAsync.ElapsedMilliseconds + 3000, caption);
}

[Fact]
public async Task SubscribeAsyncEnumerable()
{
using var conn = Create(syncTimeout: 20000, shared: false, log: Writer);

var sub = conn.GetSubscriber();
RedisChannel channel = Me();

const int TO_SEND = 5;
var gotall = new TaskCompletionSource<int>();

var source = await sub.SubscribeAsync(channel);
var op = Task.Run(async () => {
int count = 0;
await foreach (var item in source)
{
count++;
if (count == TO_SEND) gotall.TrySetResult(count);
}
return count;
});

for (int i = 0; i < TO_SEND; i++)
{
await sub.PublishAsync(channel, i);
}
await gotall.Task.WithTimeout(5000);

// check the enumerator exits cleanly
sub.Unsubscribe(channel);
var count = await op.WithTimeout(1000);
Assert.Equal(5, count);
}

[Fact]
public async Task PubSubGetAllAnyOrder()
{
Expand Down