-
Notifications
You must be signed in to change notification settings - Fork 1.1k
DistributedPubSub
: clearer logging when DeadLetter
publishing due to no subscribers
#7646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e1f13f4
9f625df
e51228f
9bbba7e
9bdd9f9
99abc7d
77fdd9f
6706326
3cb5e84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
# Akka.NET Agent Guidelines | ||
|
||
## Build/Test Commands | ||
- Build solution: `dotnet build` | ||
- Build with warnings as errors: `dotnet build -warnaserror` | ||
- Run all tests: `dotnet test -c Release` | ||
- Run specific test: `dotnet test -c Release --filter DisplayName="TestName"` or `dotnet test path/to/project.csproj` | ||
- Format check: `dotnet format --verify-no-changes` | ||
|
||
## Git Repository Management | ||
- Setup remotes: | ||
- `git remote add upstream https://github.com/akkadotnet/akka.net.git` (main repository) | ||
- `git remote add origin https://github.com/yourusername/akka.net.git` (your fork) | ||
- Sync with upstream: | ||
- `git fetch upstream` (get latest changes from main repo) | ||
- `git checkout dev` (switch to dev branch) | ||
- `git merge upstream/dev` (merge changes from upstream) | ||
- `git push origin dev` (update your fork) | ||
- Create feature branch: | ||
- `git checkout -b feature/your-feature-name` (create and switch to new branch) | ||
- `git push -u origin feature/your-feature-name` (push branch to your fork) | ||
|
||
## Code Style Guidelines | ||
- Use Allman style brackets for C# code (opening brace on new line) | ||
- 4 spaces for indentation | ||
- Prefer "var" everywhere when type is apparent | ||
- Private fields start with `_` (underscore), PascalCase for public/protected members | ||
- No "this." qualifier when unnecessary | ||
- Use exceptions for error handling (IllegalStateException for invalid states) | ||
- Sort using statements with System.* appearing first | ||
- XML comments for public APIs | ||
- Name tests with descriptive `DisplayName=` attributes | ||
- Default to `sealed` classes and records for data objects | ||
- Enable nullability in new/modified files with `#nullable enable` | ||
- Never use `async void`, `.Result`, or `.Wait()` - these cause deadlocks | ||
- Always pass `CancellationToken` in async methods | ||
|
||
## API Approvals | ||
- Run API approval tests when making public API changes: `dotnet test -c Release src/core/Akka.API.Tests` | ||
- Approval files are located at `src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt` | ||
- Install a diff viewer like WinMerge or TortoiseMerge to approve API changes | ||
- Follow extend-only design principles - don't modify existing public APIs, only extend them | ||
- Mark deprecated APIs with `[Obsolete("Obsolete since v{current-akka-version}")]` | ||
|
||
## Conventions | ||
- Stay close to JVM Akka where applicable but be .NET idiomatic | ||
- Use Task<T> instead of Future, TimeSpan instead of Duration | ||
- Include unit tests with changes | ||
- Preserve public API and wire compatibility | ||
- Keep pull requests small and focused (<300 lines when possible) | ||
- Fix warnings instead of suppressing them | ||
- Treat TBD comments as action items to be resolved | ||
- Benchmark performance-critical code changes with BenchmarkDotNet | ||
- Avoid adding new dependencies without license/security checks | ||
|
||
## Akka.NET TestKit Guidelines | ||
- Actor tests should derive from `AkkaSpec` or `TestKit` to access actor testing facilities | ||
- Pass `ITestOutputHelper output` to the constructor and base constructor: `public MySpec(ITestOutputHelper output) : base(config, output)` | ||
- Use the `ITestOutputHelper` output for debugging: it captures all test output including actor system logs | ||
- Configure proper logging in tests: `akka.loglevel = DEBUG` or `akka.loglevel = INFO` | ||
- Use `EventFilter` to assert on log messages (e.g., `EventFilter.Error().ExpectOne(() => { /* test code */ });`) | ||
- For testing deadletters, use `EventFilter.DeadLetter().Expect(1, () => { /* code that should produce dead letter */ });` | ||
- Test message assertions using `ExpectMsg<T>()`, `ExpectNoMsg()`, or `FishForMessage<T>()` | ||
- Set explicit timeouts for message expectations to avoid long-running tests | ||
- Use `TestProbe` to create lightweight test actors to verify interactions | ||
- Tests should clean up after themselves (stop created actors, reset state) | ||
- To test specialized message types, verify the type wrapper in logs: `wrapped in [$TypeName]` | ||
|
||
## Repository Landmarks | ||
- `src/` - All runtime / library code | ||
- `src/benchmark/` - Micro-benchmarks (BenchmarkDotNet) | ||
- `src/…Tests/` - xUnit test projects | ||
- `docs/community/contributing/` - Contributor policies & style guides | ||
- `docs/` - Public facing documentation |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="DistributedPubSubDeadLetterSpec.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Cluster.Tools.PublishSubscribe; | ||
using Akka.Configuration; | ||
using Akka.Event; | ||
using Akka.TestKit; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
|
||
namespace Akka.Cluster.Tools.Tests.PublishSubscribe | ||
{ | ||
public class DistributedPubSubDeadLetterSpec : AkkaSpec | ||
{ | ||
public DistributedPubSubDeadLetterSpec(ITestOutputHelper output) : base(GetConfig(), output) | ||
{ | ||
} | ||
|
||
public static Config GetConfig() | ||
{ | ||
return ConfigurationFactory.ParseString( | ||
@"akka.actor.provider = cluster" | ||
+ "\nakka.loglevel = INFO" | ||
+ "\nakka.log-dead-letters = on"); | ||
} | ||
|
||
[Fact] | ||
public async Task DistributedPubSubMediator_should_send_specialized_dead_letter_message_when_no_subscribers() | ||
{ | ||
// arrange | ||
var mediator = DistributedPubSub.Get(Sys).Mediator; | ||
var testMessage = "test-message"; | ||
|
||
// act - publish to a topic that no one is subscribed to | ||
await EventFilter.Info(contains: "DeadLetterWithNoSubscribers") | ||
.ExpectAsync(1, () => | ||
{ | ||
mediator.Tell(new Publish("unused-topic", testMessage)); | ||
return Task.CompletedTask; | ||
}); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="DeadLetterWithNoSubscribers.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using Akka.Actor; | ||
using Akka.Event; | ||
|
||
namespace Akka.Cluster.Tools.PublishSubscribe | ||
{ | ||
/// <summary> | ||
/// Special case of Dead Letter that explicitly indicates the message was sent to | ||
/// DeadLetters because there were no subscribers for the topic in DistributedPubSub, | ||
/// NOT because the mediator itself is dead. | ||
/// </summary> | ||
internal sealed class DeadLetterWithNoSubscribers : AllDeadLetters | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
{ | ||
/// <summary> | ||
/// Initializes a new instance of the <see cref="DeadLetterWithNoSubscribers"/> class. | ||
/// </summary> | ||
/// <param name="message">The original message that could not be delivered.</param> | ||
/// <param name="topic">The topic that the message was sent to.</param> | ||
/// <param name="sender">The actor that sent the message.</param> | ||
/// <param name="recipient">The actor that was to receive the message (usually the mediator itself).</param> | ||
public DeadLetterWithNoSubscribers(object message, string? topic, IActorRef sender, IActorRef recipient) | ||
: base(message, sender, recipient) | ||
{ | ||
Topic = topic; | ||
} | ||
|
||
public string? Topic { get; } | ||
|
||
/// <summary> | ||
/// Returns a string that represents the current object. | ||
/// </summary> | ||
/// <returns>A string that represents the current object.</returns> | ||
public override string ToString() | ||
{ | ||
return $"DeadLetterWithNoSubscribers from {Sender} to {Recipient}: <{Message}> - No subscribers found for topic {Topic}"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Topic should never actually be |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
//----------------------------------------------------------------------- | ||
//----------------------------------------------------------------------- | ||
// <copyright file="DistributedPubSubMediator.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
|
@@ -500,10 +500,23 @@ private void PutToRegistry(string key, IActorRef value) | |
_registry[_cluster.SelfAddress] = new Bucket(bucket.Owner, v, bucket.Content.SetItem(key, new ValueHolder(v, value))); | ||
} | ||
|
||
private void IgnoreOrSendToDeadLetters(object message) | ||
private void IgnoreOrSendToDeadLetters(IWrappedMessage message) | ||
{ | ||
if (_settings.SendToDeadLettersWhenNoSubscribers) | ||
Context.System.DeadLetters.Tell(new DeadLetter(message, Sender, Context.Self)); | ||
{ | ||
var topic = message switch | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tries to extract the topic from the message so we can be as accurate as possible during the |
||
{ | ||
Publish publish => publish.Topic, | ||
Send send => $"Send:{send.Path}", | ||
_ => null | ||
}; | ||
|
||
// Use the specialized DeadLetterWithNoSubscribers class to clearly indicate | ||
// that the message was not delivered because there were no subscribers, | ||
// not because the mediator itself is dead. | ||
var deadLetter = new DeadLetterWithNoSubscribers(message, topic, Sender, Context.Self); | ||
Context.System.DeadLetters.Tell(deadLetter); | ||
} | ||
} | ||
|
||
private void PublishMessage(string path, IWrappedMessage publish, bool allButSelf = false) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validates that we get a more descriptive log message indicating that this message got
DeadLetter
'd because there were no subscribers