Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public static Props Props(DistributedPubSubSettings settings)
private readonly ILoggingAdapter _log;
private readonly Dictionary<Address, Bucket> _registry = new();

private readonly string _topicPrefix;
private readonly PubSubCache _cache;

public ITimerScheduler Timers { get; set; }

/// <summary>
Expand Down Expand Up @@ -161,6 +164,9 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
_pruneInterval = new TimeSpan(_settings.RemovedTimeToLive.Ticks / 2);
_buffer = new PerGroupingBuffer();

_topicPrefix = Self.Path.ToStringWithoutAddress();
_cache = new PubSubCache();

Receive<Send>(send =>
{
var routees = new List<Routee>();
Expand Down Expand Up @@ -193,15 +199,17 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
});
Receive<SendToAll>(sendToAll =>
{
// TODO: Investigate this, this code looks very sketchy
PublishMessage(sendToAll.Path, sendToAll, sendToAll.ExcludeSelf);
});
Receive<Publish>(publish =>
{
var path = Internal.Utils.MakeKey(Self.Path / Internal.Utils.EncodeName(publish.Topic));
var encodedTopic = _cache.EncodeName(publish.Topic);
var key = _cache.MakeKey(Self.Path, encodedTopic);
if (publish.SendOneMessageToEachGroup)
PublishToEachGroup(path, publish);
PublishToEachGroup(key, publish);
else
PublishMessage(path, publish);
PublishMessage(key, publish);
});
Receive<Put>(put =>
{
Expand All @@ -226,9 +234,9 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
Receive<Subscribe>(subscribe =>
{
// each topic is managed by a child actor with the same name as the topic
var encodedTopic = Internal.Utils.EncodeName(subscribe.Topic);
var encodedTopic = _cache.EncodeName(subscribe.Topic);

_buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encodedTopic), subscribe, Sender, () =>
_buffer.BufferOr(_cache.MakeKey(Self.Path, encodedTopic), subscribe, Sender, () =>
{
var child = Context.Child(encodedTopic);
if (!child.IsNobody())
Expand Down Expand Up @@ -262,16 +270,17 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
});
Receive<Unsubscribe>(unsubscribe =>
{
var encodedTopic = Internal.Utils.EncodeName(unsubscribe.Topic);
var encodedTopic = _cache.EncodeName(unsubscribe.Topic);

_buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encodedTopic), unsubscribe, Sender, () =>
_buffer.BufferOr(_cache.MakeKey(Self.Path, encodedTopic), unsubscribe, Sender, () =>
{
var child = Context.Child(encodedTopic);
if (!child.IsNobody())
child.Forward(unsubscribe);
else
{
// no such topic here
_cache.TryRemoveTopic(unsubscribe.Topic);
}
});
});
Expand Down Expand Up @@ -324,8 +333,13 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
var key = Internal.Utils.MakeKey(terminated.ActorRef);

if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
{
if (bucket.Content.TryGetValue(key, out var holder) && terminated.ActorRef.Equals(holder.Ref))
{
PutToRegistry(key, null); // remove
_cache.TryRemoveKey(key, _topicPrefix);
}
}

_buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewTopicActor(terminated.ActorRef.Path.Name));
});
Expand Down Expand Up @@ -365,7 +379,10 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
{
var member = removed.Member;
if (member.Address == _cluster.SelfAddress)
{
Context.Stop(Self);
_cache.Clear();
}
else if (IsMatchingRole(member, _role))
{
_nodes.Remove(member.Address);
Expand All @@ -384,8 +401,8 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
});
Receive<CountSubscribers>(msg =>
{
var encTopic = Internal.Utils.EncodeName(msg.Topic);
_buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encTopic), msg, Sender, () =>
var encTopic = _cache.EncodeName(msg.Topic);
_buffer.BufferOr(_cache.MakeKey(Self.Path, encTopic), msg, Sender, () =>
{
var child = Context.Child(encTopic);
if (!child.IsNobody())
Expand Down Expand Up @@ -454,19 +471,15 @@ private IEnumerable<Bucket> CollectDelta(IImmutableDictionary<Address, long> ver

private IEnumerable<string> GetCurrentTopics()
{
var topicPrefix = Self.Path.ToStringWithoutAddress();
var topicPrefix = _topicPrefix;
foreach (var (_, bucket) in _registry)
{
foreach (var (key, _) in bucket.Content)
{
if (!key.StartsWith(topicPrefix))
var encodedTopic = Internal.Utils.KeyToEncodedTopic(key, topicPrefix);
if (key is null)
continue;

var topic = key[(topicPrefix.Length + 1)..];
if (!topic.Contains('/'))
{
yield return topic;
}
yield return encodedTopic;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ internal class Topic : TopicLike
{
private readonly RoutingLogic _routingLogic;
private readonly PerGroupingBuffer _buffer;
private readonly PubSubCache _cache;
private readonly string _topicPrefix;

/// <summary>
/// Creates a new topic actor
Expand All @@ -205,6 +207,9 @@ public Topic(TimeSpan emptyTimeToLive, RoutingLogic routingLogic, bool sendToDea
{
_routingLogic = routingLogic;
_buffer = new PerGroupingBuffer();

_topicPrefix = Self.Path.ToStringWithoutAddress();
_cache = new PubSubCache();
}

/// <inheritdoc cref="TopicLike.Business"/>
Expand All @@ -213,8 +218,8 @@ protected override bool Business(object message)
switch (message)
{
case Subscribe { Group: not null } subscribe:
var encodedGroup = Utils.EncodeName(subscribe.Group);
_buffer.BufferOr(Utils.MakeKey(Self.Path / encodedGroup), subscribe, Sender, () =>
var encodedGroup = _cache.EncodeName(subscribe.Group);
_buffer.BufferOr(_cache.MakeKey(Self.Path, encodedGroup), subscribe, Sender, () =>
{
var child = Context.Child(encodedGroup);
if (!child.IsNobody())
Expand All @@ -234,8 +239,8 @@ protected override bool Business(object message)
return true;

case Unsubscribe { Group: not null } unsubscribe:
encodedGroup = Utils.EncodeName(unsubscribe.Group);
_buffer.BufferOr(Utils.MakeKey(Self.Path / encodedGroup), unsubscribe, Sender, () =>
encodedGroup = _cache.EncodeName(unsubscribe.Group);
_buffer.BufferOr(_cache.MakeKey(Self.Path, encodedGroup), unsubscribe, Sender, () =>
{
var child = Context.Child(encodedGroup);
if (!child.IsNobody())
Expand All @@ -245,6 +250,7 @@ protected override bool Business(object message)
else
{
// no such group here
_cache.TryRemoveTopic(unsubscribe.Group);
}
});
return true;
Expand All @@ -268,6 +274,7 @@ protected override bool Business(object message)
key = Utils.MakeKey(terminated.ActorRef);
_buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewGroupActor(terminated.ActorRef.Path.Name));
Remove(terminated.ActorRef);
_cache.TryRemoveKey(key, _topicPrefix);
return true;
}

Expand Down Expand Up @@ -308,17 +315,19 @@ public Group(TimeSpan emptyTimeToLive, RoutingLogic routingLogic, bool sendToDea
/// <inheritdoc cref="TopicLike.Business"/>
protected override bool Business(object message)
{
if (message is SendToOneSubscriber send)
switch (message)
{
if (Subscribers.Count != 0)
{
var routees = Subscribers.Select(sub => (Routee)new ActorRefRoutee(sub)).ToArray();
case SendToOneSubscriber when Subscribers.Count == 0:
return true;

case SendToOneSubscriber send:
var routees = Subscribers.Select(Routee (sub) => new ActorRefRoutee(sub)).ToArray();
new Router(_routingLogic, routees).Route(Utils.WrapIfNeeded(send.Message), Sender);
}
return true;

default:
return false;
}
else return false;

return true;
}
}

Expand All @@ -329,6 +338,8 @@ protected override bool Business(object message)
/// </summary>
internal static class Utils
{
public readonly record struct MakeKeyInfo(ActorPath Path, string Topic);

private static readonly System.Text.RegularExpressions.Regex PathRegex = new("^/remote/.+(/user/.+)");

/// <summary>
Expand All @@ -344,40 +355,118 @@ internal static class Utils
/// </para>
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
[MethodImpl(MethodImplOptions.NoInlining)]
public static object WrapIfNeeded(object message)
{
return message is RouterEnvelope ? new MediatorRouterEnvelope(message) : message;
}

[MethodImpl(MethodImplOptions.NoInlining)]
public static string? KeyToEncodedTopic(string key, string topicPrefix)
{
if (!key.StartsWith(topicPrefix))
return null;

var topic = key[(topicPrefix.Length + 1)..];
return !topic.Contains('/') ? topic : null;
}

/// <summary>
/// TBD
/// </summary>
/// <param name="actorRef">TBD</param>
/// <returns>TBD</returns>
#region Key related methods

[MethodImpl(MethodImplOptions.NoInlining)]
public static string MakeKey(IActorRef actorRef)
{
return MakeKey(actorRef.Path);
return PathRegex.Replace(actorRef.Path.ToStringWithoutAddress(), "$1");
}

/// <summary>
/// TBD
/// </summary>
/// <param name="name">TBD</param>
/// <returns>TBD</returns>
public static string EncodeName(string name)
[MethodImpl(MethodImplOptions.NoInlining)]
public static string MakeKey(this PubSubCache cache, ActorPath path, string topic)
{
return name == null ? null : Uri.EscapeDataString(name);
var info = new MakeKeyInfo(path, topic);
if(cache.MakeKeyMap.TryGetValue(info, out var key))
return key;

key = PathRegex.Replace((path / topic).ToStringWithoutAddress(), "$1");
cache.MakeKeyMap[info] = key;
cache.MakeKeyReverseMap[key] = info;
return key;
}

/// <summary>
/// TBD
/// </summary>
/// <param name="path">TBD</param>
/// <returns>TBD</returns>
public static string MakeKey(ActorPath path)
[MethodImpl(MethodImplOptions.NoInlining)]
public static void TryRemoveKey(this PubSubCache cache, string key, string topicPrefix)
{
return PathRegex.Replace(path.ToStringWithoutAddress(), "$1");
if (cache.MakeKeyReverseMap.TryGetValue(key, out var keyInfo))
{
cache.MakeKeyMap.Remove(keyInfo);
cache.MakeKeyReverseMap.Remove(key);
}

var encodedTopic = Utils.KeyToEncodedTopic(key, topicPrefix);
if (encodedTopic == null)
return;

if (!cache.EncodedToTopicMap.TryGetValue(encodedTopic, out var topic))
return;

cache.TopicToEncodedMap.Remove(topic);
cache.EncodedToTopicMap.Remove(encodedTopic);
}

#endregion

#region Topic/group name related methods

[MethodImpl(MethodImplOptions.NoInlining)]
public static string EncodeName(this PubSubCache cache, string name)
{
if (string.IsNullOrWhiteSpace(name))
return null;

if (cache.TopicToEncodedMap.TryGetValue(name, out var encoded))
return encoded;

encoded = Uri.EscapeDataString(name);
cache.TopicToEncodedMap[name] = encoded;
cache.EncodedToTopicMap[encoded] = name;
return encoded;
}

[MethodImpl(MethodImplOptions.NoInlining)]
public static void TryRemoveEncodedTopic(this PubSubCache cache, string encodedTopic)
{
if (!cache.EncodedToTopicMap.TryGetValue(encodedTopic, out var topic))
return;

cache.TopicToEncodedMap.Remove(topic);
cache.EncodedToTopicMap.Remove(encodedTopic);
}

[MethodImpl(MethodImplOptions.NoInlining)]
public static void TryRemoveTopic(this PubSubCache cache, string topic)
{
if(!cache.TopicToEncodedMap.TryGetValue(topic, out var encodedTopic))
return;

cache.EncodedToTopicMap.Remove(encodedTopic);
cache.TopicToEncodedMap.Remove(topic);
}

#endregion

public static void Clear(this PubSubCache cache)
{
cache.TopicToEncodedMap.Clear();
cache.EncodedToTopicMap.Clear();
cache.MakeKeyMap.Clear();
cache.MakeKeyReverseMap.Clear();
}
}

internal sealed class PubSubCache
{
public readonly Dictionary<string, string> TopicToEncodedMap = new();
public readonly Dictionary<string, string> EncodedToTopicMap = new();
public readonly Dictionary<Utils.MakeKeyInfo, string> MakeKeyMap = new();
public readonly Dictionary<string, Utils.MakeKeyInfo> MakeKeyReverseMap = new();
}
}