Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ def __init__(
if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
self._closed = False
self._started: bool = False

if topics:
topics = self._validate_topics(topics)
Expand Down Expand Up @@ -446,6 +447,11 @@ async def start(self):
await self._client.force_metadata_update()
self._coordinator.assign_all_partitions(check_unknown=True)

self._started = True

def _assert_started(self):
assert self._started, "The consumer needs to start() before it can be used"

async def _wait_topics(self):
if self._subscription.subscription is not None:
for topic in self._subscription.subscription.topics:
Expand Down Expand Up @@ -510,6 +516,7 @@ async def stop(self):
* Commit last consumed message if autocommit enabled
* Leave group if used Consumer Groups
"""
self._assert_started()
if self._closed:
return
log.debug("Closing the KafkaConsumer.")
Expand Down Expand Up @@ -575,6 +582,7 @@ async def commit(self, offsets=None):

.. _kafka-python: https://github.com/dpkp/kafka-python
"""
self._assert_started()
if self._group_id is None:
raise IllegalOperation("Requires group_id")

Expand Down Expand Up @@ -615,6 +623,7 @@ async def committed(self, partition):
Raises:
IllegalOperation: If used with ``group_id == None``
"""
self._assert_started()
if self._group_id is None:
raise IllegalOperation("Requires group_id")

Expand All @@ -633,6 +642,7 @@ async def topics(self):
Returns:
set: topics
"""
self._assert_started()
cluster = await self._client.fetch_all_metadata()
return cluster.topics()

Expand Down Expand Up @@ -669,6 +679,7 @@ async def position(self, partition):
:exc:`~aiokafka.errors.IllegalStateError` in case of unassigned
partition
"""
self._assert_started()
while True:
if not self._subscription.is_assigned(partition):
raise IllegalStateError(f"Partition {partition} is not assigned")
Expand Down Expand Up @@ -799,6 +810,7 @@ async def seek_to_beginning(self, *partitions):
.. versionadded:: 0.3.0

"""
self._assert_started()
if not all(isinstance(p, TopicPartition) for p in partitions):
raise TypeError("partitions must be TopicPartition instances")

Expand Down Expand Up @@ -839,6 +851,7 @@ async def seek_to_end(self, *partitions):
.. versionadded:: 0.3.0

"""
self._assert_started()
if not all(isinstance(p, TopicPartition) for p in partitions):
raise TypeError("partitions must be TopicPartition instances")

Expand Down Expand Up @@ -883,6 +896,7 @@ async def seek_to_committed(self, *partitions):
:exc:`~aiokafka.errors.IllegalStateError` in case of unassigned
partition
"""
self._assert_started()
if not all(isinstance(p, TopicPartition) for p in partitions):
raise TypeError("partitions must be TopicPartition instances")

Expand Down Expand Up @@ -939,6 +953,7 @@ async def offsets_for_times(self, timestamps):
.. versionadded:: 0.3.0

"""
self._assert_started()
if self._client.api_version <= (0, 10, 0):
raise UnsupportedVersionError(
"offsets_for_times API not supported"
Expand Down Expand Up @@ -981,6 +996,7 @@ async def beginning_offsets(self, partitions):
.. versionadded:: 0.3.0

"""
self._assert_started()
if self._client.api_version <= (0, 10, 0):
raise UnsupportedVersionError(
"offsets_for_times API not supported"
Expand Down Expand Up @@ -1018,6 +1034,7 @@ async def end_offsets(self, partitions):
.. versionadded:: 0.3.0

"""
self._assert_started()
if self._client.api_version <= (0, 10, 0):
raise UnsupportedVersionError(
"offsets_for_times API not supported"
Expand Down Expand Up @@ -1149,6 +1166,7 @@ async def getone(self, *partitions) -> ConsumerRecord:
print(message.offset, message.key, message.value)

"""
self._assert_started()
assert all(isinstance(k, TopicPartition) for k in partitions)
if self._closed:
raise ConsumerStoppedError()
Expand Down Expand Up @@ -1196,6 +1214,7 @@ async def getmany(
print(message.offset, message.key, message.value)

"""
self._assert_started()
assert all(isinstance(k, TopicPartition) for k in partitions)
if self._closed:
raise ConsumerStoppedError()
Expand Down