Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions aiokafka/partitioner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import logging
import random

log = logging.getLogger(__name__)


class DefaultPartitioner:
"""Default partitioner.
Expand Down Expand Up @@ -94,3 +97,37 @@ def murmur2(data):
h &= 0xFFFFFFFF

return h


class RoundRobinPartitioner:
"""
Round robin partitioner.
If partial ordering is not needed in your use case, round robin partitioning
achieves a more even distribution of messages across partitions and enables
a higher rate of consumption through a more uniform temporal spacing between
messages in a single partition.
"""

def __init__(self):
self._index = 0

def __call__(self, key, all_partitions, available_partitions):
"""
Get the next partition according to the round robin algorithm.
:param key: partitioning key, expects `None`
:param all_partitions: list of all partitions
:param available_partitions: list of available partitions
:return: one of the values from available_partitions or all_partitions
"""
if key:
log.warning(
"Partitioning key is ignored by RoundRobinPartitioner - "
"use DefaultPartitioner instead."
)
partitions = available_partitions or all_partitions
if self._index >= len(partitions):
self._index = 0
partition = partitions[self._index]
self._index += 1
return partition
24 changes: 21 additions & 3 deletions benchmark/simple_produce_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from collections import Counter
import random

from aiokafka.partitioner import DefaultPartitioner, RoundRobinPartitioner


class Benchmark:

Expand All @@ -26,9 +28,19 @@ def __init__(self, args):
else:
self._is_transactional = False

if args.partitioner == "default":
self._producer_kwargs["partitioner"] = DefaultPartitioner()
elif args.partitioner == "round-robin":
self._producer_kwargs["partitioner"] = RoundRobinPartitioner()

if args.key:
self._key = b"abc"
else:
self._key = None

self.transaction_size = args.transaction_size

self._partition = args.partition
self._partition = args.partition if args.partition != -1 else None
self._stats_interval = 1
self._stats = [Counter()]

Expand Down Expand Up @@ -76,7 +88,7 @@ async def bench_simple(self):
if not self._is_transactional:
for i in range(self._num):
# payload[i % self._size] = random.randint(0, 255)
await producer.send(topic, payload, partition=partition)
await producer.send(topic, payload, partition=partition, key=self._key)
self._stats[-1]['count'] += 1
else:
for i in range(self._num // transaction_size):
Expand Down Expand Up @@ -117,7 +129,7 @@ def parse_args():
help='Topic to produce messages to. Default {default}.')
parser.add_argument(
'--partition', type=int, default=0,
help='Partition to produce messages to. Default {default}.')
help='Partition to produce messages to. Default {default}. Set to -1 to omit.')
parser.add_argument(
'--uvloop', action='store_true',
help='Use uvloop instead of asyncio default loop.')
Expand All @@ -130,6 +142,12 @@ def parse_args():
parser.add_argument(
'--transaction-size', type=int, default=100,
help='Number of messages in transaction')
parser.add_argument(
'--partitioner', type=str, default="default", choices=["default", "round-robin"],
help='Partitioner, either `default` or `round-robin`')
parser.add_argument(
'--key', action='store_true',
help='Whether to use a partitioning key')
return parser.parse_args()


Expand Down
20 changes: 19 additions & 1 deletion tests/test_partitioner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from aiokafka.partitioner import DefaultPartitioner, murmur2
from aiokafka.partitioner import DefaultPartitioner, RoundRobinPartitioner, murmur2


def test_default_partitioner():
Expand Down Expand Up @@ -41,3 +41,21 @@ def test_murmur2_not_ascii():
# Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode
murmur2(b"\xa4")
murmur2(b"\x81" * 1000)


def test_round_robin_partitioner():
partitioner = RoundRobinPartitioner()

all_partitions = available_partitions = list(range(2))
assert partitioner(None, all_partitions, available_partitions) == 0
assert partitioner(None, all_partitions, available_partitions) == 1
assert partitioner(None, all_partitions, available_partitions) == 0
assert partitioner(None, all_partitions, available_partitions) == 1

all_partitions = available_partitions = list(range(4))
assert partitioner(None, all_partitions, available_partitions) == 2
assert partitioner(None, all_partitions, available_partitions) == 3

all_partitions = [50]
available_partitions = [70]
assert partitioner(None, all_partitions, available_partitions) == 70