Skip to content

Commit 7c58370

Browse files
committed
docs: Added AMQP transport benchmarks and updated documentation for v0.8.0
1 parent 1883eb8 commit 7c58370

File tree

4 files changed

+238
-7
lines changed

4 files changed

+238
-7
lines changed

README.md

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
[![Go Reference](https://pkg.go.dev/badge/github.com/Gerfey/messenger.svg)](https://pkg.go.dev/github.com/Gerfey/messenger)
88
[![License](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE)
99

10-
> `v0.7.0` is a pre-release version — feel free to test and report issues!
10+
> `v0.8.0` is a pre-release version — feel free to test and report issues!
1111
1212
> Full documentation available in the [GitHub Wiki](https://github.com/Gerfey/messenger/wiki/Documentation)
1313
1414
🇷🇺 [Русская версия](README.ru.md)
1515

1616
## Features
17-
- **Multiple Transports**: AMQP (RabbitMQ), In-Memory (sync)
17+
- **Multiple Transports**: AMQP (RabbitMQ), Kafka, Redis (Stream), In-Memory (sync)
1818
- **Middleware Chain**: Extensible middleware system for message processing
1919
- **Event-Driven**: Built-in event dispatcher for lifecycle hooks
2020
- **Retry Mechanism**: Configurable retry strategies with exponential backoff
@@ -25,7 +25,7 @@
2525
## Installation
2626
> Requires Go 1.24+
2727
```bash
28-
go get github.com/gerfey/messenger@v0.7.0
28+
go get github.com/gerfey/messenger@v0.8.0
2929
```
3030

3131
## Quick Start
@@ -93,6 +93,10 @@ _, _ = bus.Dispatch(ctx, &HelloMessage{Text: "World"})
9393
9494
> See [Usage Scenarios](https://github.com/Gerfey/messenger/wiki/Usage-Scenarios) for commands, queries, return values and advanced use-cases.
9595
96+
## Efficiency
97+
98+
- AMQP (RabbitMQ): [AMQP Benchmark Report](docs/benchmark/AMQP-Benchmark.md)
99+
96100
## Contributing
97101
98102
1. Fork the repository
@@ -101,7 +105,7 @@ _, _ = bus.Dispatch(ctx, &HelloMessage{Text: "World"})
101105
4. Push to the branch (`git push origin feature/amazing-feature`)
102106
5. Open a Pull Request
103107

104-
## License
108+
## License
105109

106110
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
107111

README.ru.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
[![Go Reference](https://pkg.go.dev/badge/github.com/Gerfey/messenger.svg)](https://pkg.go.dev/github.com/Gerfey/messenger)
88
[![License](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE)
99

10-
> Версия `v0.7.0` — это пре-релиз. Тестируйте и сообщайте о багах!
10+
> Версия `v0.8.0` — это пре-релиз. Тестируйте и сообщайте о багах!
1111
1212
> Полная документация доступна на [GitHub Wiki](https://github.com/Gerfey/messenger/wiki/Documentation)
1313
@@ -16,7 +16,7 @@
1616
---
1717

1818
## Возможности
19-
- **Множественные транспорты**: AMQP (RabbitMQ), In-Memory (`sync`)
19+
- **Множественные транспорты**: AMQP (RabbitMQ), Kafka, Redis (Stream), In-Memory (sync)
2020
- **Цепочка middleware**: Расширяемая система промежуточной обработки
2121
- **Событийный движок**: Встроенный dispatcher событий жизненного цикла
2222
- **Механизм повторов**: Настраиваемые стратегии ретраев с поддержкой DLQ
@@ -27,7 +27,7 @@
2727
## Установка
2828
> Требуется Go версии **1.24+**
2929
```bash
30-
go get github.com/gerfey/messenger@v0.7.0
30+
go get github.com/gerfey/messenger@v0.8.0
3131
```
3232

3333
## Быстрый старт
@@ -95,6 +95,10 @@ _, _ = bus.Dispatch(ctx, &HelloMessage{Text: "World"})
9595
9696
> Смотри [Сценарии использования](https://github.com/Gerfey/messenger/wiki/Сценарии-использования).
9797
98+
## Производительность
99+
100+
- AMQP (RabbitMQ): [AMQP Benchmark Report](docs/benchmark/AMQP-Benchmark.md)
101+
98102
## Как внести вклад
99103
100104
1. Форкните репозиторий

docs/benchmark/AMQP-Benchmark.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# AMQP Benchmark Report
2+
3+
Current performance results of the AMQP transport in Messenger (`v0.8.0`), tested with `RabbitMQ` using the `amqp091-go` client.
4+
5+
## Overall Performance
6+
7+
| Benchmark | Time (ns/op) | Throughput (msg/sec) | Memory (B/op) | Allocs/op |
8+
|------------------------------|---------------|---------------------|----------------|-----------|
9+
| `BenchmarkAMQPSend` | 465,508 | ~2,148 | 13,822 | 267 |
10+
| `BenchmarkAMQPConcurrentSend`| 296,922 | ~3,368 | 13,829 | 266 |
11+
12+
*Concurrent sending is ~36% faster with 100B messages.*
13+
14+
---
15+
16+
## Message Size Impact
17+
18+
| Message Size | Time (ns/op) | Throughput (msg/sec) | Memory (B/op) | Allocs/op |
19+
|------------------|---------------|---------------------|----------------|-----------|
20+
| 100 B | 472,831 | ~2,115 | 13,826 | 267 |
21+
| 1 KB | 557,170 | ~1,794 | 19,874 | 267 |
22+
| 10 KB | 704,831 | ~1,418 | 82,185 | 269 |
23+
| 100 KB | 1,788,004 | ~559 | 726,393 | 276 |
24+
25+
*As the payload size increases, throughput decreases and memory pressure on GC grows, as expected.*
26+
27+
---
28+
29+
## Allocations: `pprof` Analysis
30+
> Collected using `go test -bench=BenchmarkAMQP -benchmem -memprofile mem.out`, analyzed via `pprof`.
31+
32+
(pprof) top
33+
34+
- encoding/json: ~20%
35+
- amqp091-go (sendOpen, Ack, readLongstr): ~25%
36+
- Envelope.WithStamp: 5.75%
37+
- Middleware chain: ~5%
38+
39+
---
40+
41+
## Summary
42+
43+
- AMQP transport in Messenger demonstrates stable and predictable performance
44+
- Memory and allocation optimization opportunities are being addressed in upcoming versions
45+

transport/amqp/benchmark_test.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package amqp_test
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log/slog"
8+
"os"
9+
"sync"
10+
"sync/atomic"
11+
"testing"
12+
"time"
13+
14+
"github.com/gerfey/messenger/api"
15+
"github.com/gerfey/messenger/builder"
16+
"github.com/gerfey/messenger/config"
17+
)
18+
19+
const (
20+
benchmarkAMQPDSN = "amqp://guest:guest@localhost:5672/"
21+
benchmarkTimeout = 60 * time.Second
22+
)
23+
24+
type BenchmarkMessage struct {
25+
ID string
26+
Content string
27+
Data []byte
28+
}
29+
30+
func (m *BenchmarkMessage) RoutingKey() string {
31+
return "benchmark_routing_key"
32+
}
33+
34+
type BenchmarkHandler struct {
35+
processedCount *int64
36+
wg *sync.WaitGroup
37+
}
38+
39+
func (h *BenchmarkHandler) Handle(_ context.Context, _ *BenchmarkMessage) error {
40+
atomic.AddInt64(h.processedCount, 1)
41+
if h.wg != nil {
42+
h.wg.Done()
43+
}
44+
45+
return nil
46+
}
47+
48+
func (h *BenchmarkHandler) GetBusName() string {
49+
return "default"
50+
}
51+
52+
func setupAMQPMessenger(b *testing.B, withWaitGroup bool) api.MessageBus {
53+
b.Helper()
54+
55+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
56+
Level: slog.LevelError,
57+
}))
58+
59+
cfg := &config.MessengerConfig{
60+
DefaultBus: "default",
61+
Buses: map[string]config.BusConfig{
62+
"default": {},
63+
},
64+
Transports: map[string]config.TransportConfig{
65+
"amqp": {
66+
DSN: benchmarkAMQPDSN,
67+
Serializer: "default.transport.serializer",
68+
Options: map[string]any{
69+
"auto_setup": true,
70+
"exchange": map[string]any{
71+
"name": "benchmark_exchange",
72+
"type": "topic",
73+
},
74+
"queues": map[string]any{
75+
"benchmark_queue": map[string]any{
76+
"binding_keys": []string{"benchmark_routing_key"},
77+
},
78+
},
79+
},
80+
},
81+
},
82+
Routing: map[string]string{
83+
"*amqp_test.BenchmarkMessage": "amqp",
84+
},
85+
}
86+
87+
processedCount := int64(0)
88+
var wg *sync.WaitGroup
89+
if withWaitGroup {
90+
wg = &sync.WaitGroup{}
91+
}
92+
93+
handler := &BenchmarkHandler{
94+
processedCount: &processedCount,
95+
wg: wg,
96+
}
97+
98+
builderInstance := builder.NewBuilder(cfg, logger)
99+
if err := builderInstance.RegisterHandler(handler); err != nil {
100+
b.Fatalf("Register handler failed: %v", err)
101+
}
102+
103+
messenger, err := builderInstance.Build()
104+
if err != nil {
105+
b.Fatalf("Build messenger failed: %v", err)
106+
}
107+
108+
ctx, cancel := context.WithTimeout(b.Context(), benchmarkTimeout)
109+
go func() {
110+
defer cancel()
111+
if runErr := messenger.Run(ctx); runErr != nil && !errors.Is(runErr, context.Canceled) {
112+
b.Logf("Messenger run error: %v", runErr)
113+
}
114+
}()
115+
116+
time.Sleep(2 * time.Second)
117+
118+
bus, err := messenger.GetDefaultBus()
119+
if err != nil {
120+
b.Fatalf("Get default bus failed: %v", err)
121+
}
122+
123+
return bus
124+
}
125+
126+
func dispatchMessages(b *testing.B, bus api.MessageBus, size int, parallel bool) {
127+
ctx := b.Context()
128+
b.ResetTimer()
129+
b.ReportAllocs()
130+
131+
if parallel {
132+
concurrency := 10
133+
var wg sync.WaitGroup
134+
messagesPerWorker := b.N / concurrency
135+
for w := range concurrency {
136+
wg.Add(1)
137+
go func(id int) {
138+
defer wg.Done()
139+
for i := range messagesPerWorker {
140+
bus.Dispatch(ctx, &BenchmarkMessage{
141+
ID: fmt.Sprintf("worker-%d-msg-%d", id, i),
142+
Content: "benchmark content",
143+
Data: make([]byte, size),
144+
})
145+
}
146+
}(w)
147+
}
148+
wg.Wait()
149+
} else {
150+
for i := range b.N {
151+
bus.Dispatch(ctx, &BenchmarkMessage{
152+
ID: fmt.Sprintf("msg-%d", i),
153+
Content: "benchmark content",
154+
Data: make([]byte, size),
155+
})
156+
}
157+
}
158+
}
159+
160+
func BenchmarkAMQPSend(b *testing.B) {
161+
bus := setupAMQPMessenger(b, false)
162+
dispatchMessages(b, bus, 100, false)
163+
}
164+
165+
func BenchmarkAMQPConcurrentSend(b *testing.B) {
166+
bus := setupAMQPMessenger(b, false)
167+
dispatchMessages(b, bus, 100, true)
168+
}
169+
170+
func BenchmarkAMQPMessageSizes(b *testing.B) {
171+
sizes := []int{100, 1024, 10240, 102400}
172+
for _, size := range sizes {
173+
b.Run(fmt.Sprintf("Size_%dB", size), func(b *testing.B) {
174+
bus := setupAMQPMessenger(b, false)
175+
dispatchMessages(b, bus, size, false)
176+
})
177+
}
178+
}

0 commit comments

Comments
 (0)