Skip to content

Commit 158dbe2

Browse files
authored
Merge pull request #24 from Cysharp/feature/ShardedPubSub
Adds support for Redis Sharded Pub/Sub
2 parents c419bc0 + 10bc452 commit 158dbe2

File tree

9 files changed

+521
-37
lines changed

9 files changed

+521
-37
lines changed

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="8.6.0" />
1313
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
1414
<PackageVersion Include="NATS.Net" Version="2.2.2" />
15-
<PackageVersion Include="StackExchange.Redis" Version="2.7.33" />
15+
<PackageVersion Include="StackExchange.Redis" Version="2.8.41" />
1616
<PackageVersion Include="Testcontainers.Nats" Version="3.8.0" />
1717
<PackageVersion Include="Testcontainers.Redis" Version="3.8.0" />
1818
<PackageVersion Include="xunit" Version="2.9.2" />
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using StackExchange.Redis;
2+
3+
namespace Cysharp.Runtime.Multicast.Distributed.Redis;
4+
5+
/// <summary>
6+
/// Provides factory methods for creating <see cref="RedisChannel"/> instances with predefined naming conventions.
7+
/// </summary>
8+
public static class RedisChannelFactory
9+
{
10+
private const string Prefix = "Multicaster.Group?name=";
11+
12+
/// <summary>
13+
/// Gets a factory that creates <see cref="RedisChannel"/> instances.
14+
/// </summary>
15+
public static Func<string, RedisChannel> Default { get; } = static (groupName) => RedisChannel.Literal($"{Prefix}{groupName}");
16+
17+
/// <summary>
18+
/// Gets a factory that creates <see cref="RedisChannel"/> instances for use with Sharded Pub/Sub.
19+
/// </summary>
20+
public static Func<string, RedisChannel> Sharded { get; } = static (groupName) => RedisChannel.Sharded($"{Prefix}{groupName}");
21+
}

src/Multicaster.Distributed.Redis/RedisGroup.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ internal class RedisGroup<TKey, T> : IMulticastAsyncGroup<TKey, T>, IMulticastSy
2828

2929
internal string Name { get; }
3030

31-
public RedisGroup(string name, ISubscriber subscriber, IRemoteProxyFactory proxyFactory, IRemoteSerializer serializer, MessagePackSerializerOptions messagePackSerializerOptions, Action<RedisGroup<TKey, T>> onDisposeAction)
31+
public RedisGroup(string name, Func<string, RedisChannel> redisChannelFactory, ISubscriber subscriber, IRemoteProxyFactory proxyFactory, IRemoteSerializer serializer, MessagePackSerializerOptions messagePackSerializerOptions, Action<RedisGroup<TKey, T>> onDisposeAction)
3232
{
3333
Name = name;
3434
_subscriber = subscriber;
3535
_proxyFactory = proxyFactory;
3636
_serializer = serializer;
37-
_channel = new RedisChannel($"Multicaster.Group?name={name}", RedisChannel.PatternMode.Literal);
37+
_channel = redisChannelFactory(name);
3838
_messagePackSerializerOptionsForKey = messagePackSerializerOptions;
3939
_onDisposeAction = onDisposeAction;
4040

@@ -354,4 +354,4 @@ private void ThrowIfDisposed()
354354
throw new ObjectDisposedException(nameof(RedisGroup<TKey, T>));
355355
}
356356
}
357-
}
357+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using MessagePack;
2+
using StackExchange.Redis;
3+
4+
namespace Cysharp.Runtime.Multicast.Distributed.Redis;
5+
6+
/// <summary>
7+
/// Represents configuration options for connecting to and interacting with a Redis group.
8+
/// </summary>
9+
/// <remarks>This class provides various settings to configure Redis connectivity, key serialization, and channel
10+
/// creation. Use these options to customize the behavior of Redis-based operations, such as specifying a connection
11+
/// string, providing a custom <see cref="ConnectionMultiplexer"/>, or defining a key prefix.</remarks>
12+
public class RedisGroupOptions
13+
{
14+
/// <summary>
15+
/// Gets or sets the connection string to connect to Redis. If <see cref="ConnectionMultiplexer"/> property is not set, this will be used.
16+
/// </summary>
17+
public string ConnectionString { get; set; } = "localhost:6379";
18+
19+
/// <summary>
20+
/// Gets or sets a ConnectionMultiplexer instance to connect to Redis. If this is set, <see cref="ConnectionString"/> property will be ignored.
21+
/// </summary>
22+
public ConnectionMultiplexer? ConnectionMultiplexer { get; set; }
23+
24+
/// <summary>
25+
/// Gets or sets a prefix for the Redis key.
26+
/// </summary>
27+
public string? Prefix { get; set; }
28+
29+
/// <summary>
30+
/// Gets or sets a MessagePackSerializerOptions used for serializing the key.
31+
/// </summary>
32+
public MessagePackSerializerOptions? MessagePackSerializerOptionsForKey { get; set; }
33+
34+
/// <summary>
35+
/// Gets or sets the factory method used to create <see cref="RedisChannel"/> instances. Default is <see cref="RedisChannelFactory.Default"/>.
36+
/// </summary>
37+
/// <remarks>Use this property to customize how <see cref="RedisChannel"/> instances are created, such as
38+
/// applying specific naming conventions or transformations to channel names.</remarks>
39+
public Func<string, RedisChannel> ChannelFactory { get; set; } = RedisChannelFactory.Default;
40+
}

src/Multicaster.Distributed.Redis/RedisGroupProvider.cs

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
using System.Collections.Concurrent;
2+
23
using Cysharp.Runtime.Multicast.Remoting;
34

45
using MessagePack;
56
using MessagePack.Formatters;
67
using MessagePack.Resolvers;
78

89
using Microsoft.Extensions.Options;
10+
911
using StackExchange.Redis;
1012

1113
namespace Cysharp.Runtime.Multicast.Distributed.Redis;
1214

15+
/// <summary>
16+
/// Provides functionality for managing multicast groups using Redis as the underlying communication mechanism.
17+
/// </summary>
18+
/// <remarks>This class allows the creation and management of asynchronous and synchronous multicast groups, where
19+
/// messages can be sent to multiple receivers. It uses Redis channels for communication and supports custom
20+
/// serialization and deserialization of keys and messages.</remarks>
1321
public class RedisGroupProvider : IMulticastGroupProvider, IDisposable
1422
{
1523
private readonly ConcurrentDictionary<(string Name, Type KeyType, Type ReceiverType), object> _groups = new();
@@ -19,11 +27,32 @@ public class RedisGroupProvider : IMulticastGroupProvider, IDisposable
1927
private readonly ISubscriber _subscriber;
2028
private readonly string? _prefix;
2129
private readonly MessagePackSerializerOptions _messagePackSerializerOptionsForKey;
30+
private readonly Func<string, RedisChannel> _channelFactory;
2231

32+
/// <summary>
33+
/// Initializes a new instance of the <see cref="RedisGroupProvider"/> class with the specified proxy factory,
34+
/// serializer, and configuration options.
35+
/// </summary>
36+
/// <param name="proxyFactory">The factory used to create remote proxies for interacting with Redis.</param>
37+
/// <param name="serializer">The serializer used to serialize and deserialize data for Redis operations.</param>
38+
/// <param name="options">The configuration options for the Redis group provider.</param>
2339
public RedisGroupProvider(IRemoteProxyFactory proxyFactory, IRemoteSerializer serializer, IOptions<RedisGroupOptions> options)
2440
: this(proxyFactory, serializer, options.Value)
2541
{}
2642

43+
/// <summary>
44+
/// Initializes a new instance of the <see cref="RedisGroupProvider"/> class, which provides functionality for
45+
/// managing Redis-based group communication using a specified remote proxy factory, serializer, and configuration
46+
/// options.
47+
/// </summary>
48+
/// <remarks>If the <see cref="RedisGroupOptions.ConnectionMultiplexer"/> property in <paramref
49+
/// name="options"/> is <see langword="null"/>, a new connection multiplexer is created using the connection string
50+
/// specified in <see cref="RedisGroupOptions.ConnectionString"/>. Otherwise, the provided connection multiplexer is
51+
/// used.</remarks>
52+
/// <param name="proxyFactory">The factory used to create remote proxies for communication. This parameter cannot be <see langword="null"/>.</param>
53+
/// <param name="serializer">The serializer used to serialize and deserialize messages. This parameter cannot be <see langword="null"/>.</param>
54+
/// <param name="options">The configuration options for the Redis group provider, including connection settings, channel factory, and
55+
/// serialization options. This parameter cannot be <see langword="null"/>.</param>
2756
public RedisGroupProvider(IRemoteProxyFactory proxyFactory, IRemoteSerializer serializer, RedisGroupOptions options)
2857
{
2958
_proxyFactory = proxyFactory;
@@ -39,52 +68,33 @@ public RedisGroupProvider(IRemoteProxyFactory proxyFactory, IRemoteSerializer se
3968
_subscriber = options.ConnectionMultiplexer.GetSubscriber();
4069
}
4170
_prefix = options.Prefix;
71+
_channelFactory = options.ChannelFactory;
4272

4373
var messagePackSerializerOptions = (options.MessagePackSerializerOptionsForKey ?? MessagePackSerializer.DefaultOptions);
4474
_messagePackSerializerOptionsForKey = messagePackSerializerOptions.WithResolver(
4575
CompositeResolver.Create([NativeGuidFormatter.Instance], [messagePackSerializerOptions.Resolver])
4676
);
4777
}
4878

79+
/// <inheritdoc />
4980
public IMulticastAsyncGroup<TKey, TReceiver> GetOrAddGroup<TKey, TReceiver>(string name)
5081
where TKey : IEquatable<TKey>
51-
=> (IMulticastAsyncGroup<TKey, TReceiver>)_groups.GetOrAdd((name, typeof(TKey), typeof(TReceiver)), _ => new RedisGroup<TKey, TReceiver>(_prefix + name, _subscriber, _proxyFactory, _serializer, _messagePackSerializerOptionsForKey, Remove));
82+
=> (IMulticastAsyncGroup<TKey, TReceiver>)_groups.GetOrAdd((name, typeof(TKey), typeof(TReceiver)), _ => new RedisGroup<TKey, TReceiver>(_prefix + name, _channelFactory, _subscriber, _proxyFactory, _serializer, _messagePackSerializerOptionsForKey, Remove));
5283

84+
/// <inheritdoc />
5385
public IMulticastSyncGroup<TKey, TReceiver> GetOrAddSynchronousGroup<TKey, TReceiver>(string name)
5486
where TKey : IEquatable<TKey>
55-
=> (IMulticastSyncGroup<TKey, TReceiver>)_groups.GetOrAdd((name, typeof(TKey), typeof(TReceiver)), _ => new RedisGroup<TKey, TReceiver>(_prefix + name, _subscriber, _proxyFactory, _serializer, _messagePackSerializerOptionsForKey, Remove));
87+
=> (IMulticastSyncGroup<TKey, TReceiver>)_groups.GetOrAdd((name, typeof(TKey), typeof(TReceiver)), _ => new RedisGroup<TKey, TReceiver>(_prefix + name, _channelFactory, _subscriber, _proxyFactory, _serializer, _messagePackSerializerOptionsForKey, Remove));
5688

5789
private void Remove<TKey, TReceiver>(RedisGroup<TKey, TReceiver> group)
5890
where TKey : IEquatable<TKey>
5991
{
6092
_groups.TryRemove((group.Name, typeof(TKey), typeof(TReceiver)), out _);
6193
}
6294

95+
/// <inheritdoc />
6396
public void Dispose()
6497
{
6598
_createdConnectionMultiplexer?.Dispose();
6699
}
67100
}
68-
69-
public class RedisGroupOptions
70-
{
71-
/// <summary>
72-
/// Gets or sets the connection string to connect to Redis. If <see cref="ConnectionMultiplexer"/> property is not set, this will be used.
73-
/// </summary>
74-
public string ConnectionString { get; set; } = "localhost:6379";
75-
76-
/// <summary>
77-
/// Gets or sets a ConnectionMultiplexer instance to connect to Redis. If this is set, <see cref="ConnectionString"/> property will be ignored.
78-
/// </summary>
79-
public ConnectionMultiplexer? ConnectionMultiplexer { get; set; }
80-
81-
/// <summary>
82-
/// Gets or sets a prefix for the Redis key.
83-
/// </summary>
84-
public string? Prefix { get; set; }
85-
86-
/// <summary>
87-
/// Gets or sets a MessagePackSerializerOptions used for serializing the key.
88-
/// </summary>
89-
public MessagePackSerializerOptions? MessagePackSerializerOptionsForKey { get; set; }
90-
}

0 commit comments

Comments
 (0)