Skip to content

Commit 750ed4b

Browse files
committed
Add round robin partitioner
1 parent 1862620 commit 750ed4b

File tree

3 files changed

+77
-4
lines changed

3 files changed

+77
-4
lines changed

aiokafka/partitioner.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import logging
12
import random
23

4+
log = logging.getLogger(__name__)
5+
36

47
class DefaultPartitioner:
58
"""Default partitioner.
@@ -94,3 +97,37 @@ def murmur2(data):
9497
h &= 0xFFFFFFFF
9598

9699
return h
100+
101+
102+
class RoundRobinPartitioner:
103+
"""
104+
Round robin partitioner.
105+
106+
If partial ordering is not needed in your use case, round robin partitioning
107+
achieves a more even distribution of messages across partitions and enables
108+
a higher rate of consumption through a more uniform temporal spacing between
109+
messages in a single partition.
110+
"""
111+
112+
def __init__(self):
113+
self._index = 0
114+
115+
def __call__(self, key, all_partitions, available_partitions):
116+
"""
117+
Get the next partition according to the round robin algorithm.
118+
:param key: partitioning key, expects `None`
119+
:param all_partitions: list of all partitions
120+
:param available_partitions: list of available partitions
121+
:return: one of the values from available_partitions or all_partitions
122+
"""
123+
if key:
124+
log.warning(
125+
"Partitioning key is ignored by RoundRobinPartitioner - "
126+
"use DefaultPartitioner instead."
127+
)
128+
partitions = available_partitions or all_partitions
129+
if self._index >= len(partitions):
130+
self._index = 0
131+
partition = partitions[self._index]
132+
self._index += 1
133+
return partition

benchmark/simple_produce_bench.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from collections import Counter
66
import random
77

8+
from aiokafka.partitioner import DefaultPartitioner, RoundRobinPartitioner
9+
810

911
class Benchmark:
1012

@@ -26,9 +28,19 @@ def __init__(self, args):
2628
else:
2729
self._is_transactional = False
2830

31+
if args.partitioner == "default":
32+
self._producer_kwargs["partitioner"] = DefaultPartitioner()
33+
elif args.partitioner == "round-robin":
34+
self._producer_kwargs["partitioner"] = RoundRobinPartitioner()
35+
36+
if args.key:
37+
self._key = b"abc"
38+
else:
39+
self._key = None
40+
2941
self.transaction_size = args.transaction_size
3042

31-
self._partition = args.partition
43+
self._partition = args.partition if args.partition != -1 else None
3244
self._stats_interval = 1
3345
self._stats = [Counter()]
3446

@@ -76,7 +88,7 @@ async def bench_simple(self):
7688
if not self._is_transactional:
7789
for i in range(self._num):
7890
# payload[i % self._size] = random.randint(0, 255)
79-
await producer.send(topic, payload, partition=partition)
91+
await producer.send(topic, payload, partition=partition, key=self._key)
8092
self._stats[-1]['count'] += 1
8193
else:
8294
for i in range(self._num // transaction_size):
@@ -117,7 +129,7 @@ def parse_args():
117129
help='Topic to produce messages to. Default {default}.')
118130
parser.add_argument(
119131
'--partition', type=int, default=0,
120-
help='Partition to produce messages to. Default {default}.')
132+
help='Partition to produce messages to. Default {default}. Set to -1 to omit.')
121133
parser.add_argument(
122134
'--uvloop', action='store_true',
123135
help='Use uvloop instead of asyncio default loop.')
@@ -130,6 +142,12 @@ def parse_args():
130142
parser.add_argument(
131143
'--transaction-size', type=int, default=100,
132144
help='Number of messages in transaction')
145+
parser.add_argument(
146+
'--partitioner', type=str, default="default", choices=["default", "round-robin"],
147+
help='Partitioner, either `default` or `round-robin`')
148+
parser.add_argument(
149+
'--key', action='store_true',
150+
help='Whether to use a partitioning key')
133151
return parser.parse_args()
134152

135153

tests/test_partitioner.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import pytest
22

3-
from aiokafka.partitioner import DefaultPartitioner, murmur2
3+
from aiokafka.partitioner import DefaultPartitioner, RoundRobinPartitioner, murmur2
44

55

66
def test_default_partitioner():
@@ -41,3 +41,21 @@ def test_murmur2_not_ascii():
4141
# Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode
4242
murmur2(b"\xa4")
4343
murmur2(b"\x81" * 1000)
44+
45+
46+
def test_round_robin_partitioner():
47+
partitioner = RoundRobinPartitioner()
48+
49+
all_partitions = available_partitions = list(range(2))
50+
assert partitioner(None, all_partitions, available_partitions) == 0
51+
assert partitioner(None, all_partitions, available_partitions) == 1
52+
assert partitioner(None, all_partitions, available_partitions) == 0
53+
assert partitioner(None, all_partitions, available_partitions) == 1
54+
55+
all_partitions = available_partitions = list(range(4))
56+
assert partitioner(None, all_partitions, available_partitions) == 2
57+
assert partitioner(None, all_partitions, available_partitions) == 3
58+
59+
all_partitions = [50]
60+
available_partitions = [70]
61+
assert partitioner(None, all_partitions, available_partitions) == 70

0 commit comments

Comments
 (0)