
Commit 10028d8

Merge pull request #507 from camunda-community-hub/poc-streamer
feat: add job streaming
2 parents: 2701e7f + 7b31fe5 · commit 10028d8

File tree

10 files changed: +293 -15 lines

pyzeebe/errors/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -3,6 +3,7 @@
     ActivateJobsRequestInvalidError,
     JobAlreadyDeactivatedError,
     JobNotFoundError,
+    StreamActivateJobsRequestInvalidError,
 )
 from .message_errors import MessageAlreadyExistsError
 from .process_errors import (
@@ -34,6 +35,7 @@
 __all__ = (
     "InvalidOAuthCredentialsError",
     "ActivateJobsRequestInvalidError",
+    "StreamActivateJobsRequestInvalidError",
     "JobAlreadyDeactivatedError",
     "JobNotFoundError",
     "MessageAlreadyExistsError",

pyzeebe/errors/job_errors.py

Lines changed: 13 additions & 0 deletions
@@ -16,6 +16,19 @@ def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activa
         super().__init__(msg)


+class StreamActivateJobsRequestInvalidError(PyZeebeError):
+    def __init__(self, task_type: str, worker: str, timeout: int):
+        msg = "Failed to activate jobs. Reasons:"
+        if task_type == "" or task_type is None:
+            msg = msg + "task_type is empty, "
+        if worker == "" or worker is None:
+            msg = msg + "worker is empty, "
+        if timeout < 1:
+            msg = msg + "job timeout is smaller than 0ms, "
+
+        super().__init__(msg)
+
+
 class JobAlreadyDeactivatedError(PyZeebeError):
     def __init__(self, job_key: int) -> None:
         super().__init__(f"Job {job_key} was already stopped (Completed/Failed/Error)")
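For reference, a minimal sketch of the message this new error composes (assuming the class is imported through the pyzeebe.errors export added above; values are illustrative):

from pyzeebe.errors import StreamActivateJobsRequestInvalidError

# An empty task_type and a non-positive timeout each append a reason to the message.
err = StreamActivateJobsRequestInvalidError(task_type="", worker="my-worker", timeout=0)
print(str(err))
# Failed to activate jobs. Reasons:task_type is empty, job timeout is smaller than 0ms,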

pyzeebe/grpc_internals/zeebe_adapter_base.py

Lines changed: 5 additions & 2 deletions
@@ -1,5 +1,5 @@
 import logging
-from typing import NoReturn
+from typing import TYPE_CHECKING, NoReturn, cast

 import grpc

@@ -14,13 +14,16 @@
 from pyzeebe.grpc_internals.grpc_utils import is_error_status
 from pyzeebe.proto.gateway_pb2_grpc import GatewayStub

+if TYPE_CHECKING:
+    from pyzeebe.proto.gateway_pb2_grpc import GatewayAsyncStub
+
 logger = logging.getLogger(__name__)


 class ZeebeAdapterBase:
     def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = -1):
         self._channel = grpc_channel
-        self._gateway_stub = GatewayStub(grpc_channel)
+        self._gateway_stub = cast("GatewayAsyncStub", GatewayStub(grpc_channel))
         self._connected = True
         self.retrying_connection = False
         self._max_connection_retries = max_connection_retries
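The cast here only affects static typing: GatewayAsyncStub is imported under TYPE_CHECKING, so nothing extra is loaded at runtime. A self-contained sketch of the same pattern with stand-in types (not pyzeebe code):

from typing import TYPE_CHECKING, cast

if TYPE_CHECKING:
    # Seen only by type checkers (mypy/pyright); never imported at runtime.
    from fractions import Fraction

def widen(value: int) -> "Fraction":
    # cast() is a no-op at runtime; it only tells the checker which interface to assume,
    # the same way the adapter treats GatewayStub as GatewayAsyncStub.
    return cast("Fraction", value)

print(widen(2))  # prints 2; no Fraction is actually constructed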

pyzeebe/grpc_internals/zeebe_job_adapter.py

Lines changed: 30 additions & 0 deletions
@@ -11,6 +11,7 @@
     ActivateJobsRequestInvalidError,
     JobAlreadyDeactivatedError,
     JobNotFoundError,
+    StreamActivateJobsRequestInvalidError,
 )
 from pyzeebe.grpc_internals.grpc_utils import is_error_status
 from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase
@@ -20,6 +21,7 @@
     ActivateJobsRequest,
     CompleteJobRequest,
     FailJobRequest,
+    StreamActivatedJobsRequest,
     ThrowErrorRequest,
 )
 from pyzeebe.types import Variables
@@ -65,6 +67,34 @@ async def activate_jobs(
                 raise ActivateJobsRequestInvalidError(task_type, worker, timeout, max_jobs_to_activate) from grpc_error
             await self._handle_grpc_error(grpc_error)

+    async def stream_activate_jobs(
+        self,
+        task_type: str,
+        worker: str,
+        timeout: int,
+        variables_to_fetch: Iterable[str],
+        stream_request_timeout: int,
+        tenant_ids: Iterable[str] | None = None,
+    ) -> AsyncGenerator[Job]:
+        try:
+            async for raw_job in self._gateway_stub.StreamActivatedJobs(
+                StreamActivatedJobsRequest(
+                    type=task_type,
+                    worker=worker,
+                    timeout=timeout,
+                    fetchVariable=variables_to_fetch,
+                    tenantIds=tenant_ids or [],
+                ),
+                timeout=stream_request_timeout,
+            ):
+                job = self._create_job_from_raw_job(raw_job)
+                logger.debug("Got job: %s from zeebe", job)
+                yield job
+        except grpc.aio.AioRpcError as grpc_error:
+            if is_error_status(grpc_error, grpc.StatusCode.INVALID_ARGUMENT):
+                raise StreamActivateJobsRequestInvalidError(task_type, worker, timeout) from grpc_error
+            await self._handle_grpc_error(grpc_error)
+
     def _create_job_from_raw_job(self, response: ActivatedJob) -> Job:
         return Job(
             key=response.key,
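For orientation, a rough sketch of consuming this async generator directly (the adapter instance and task type are assumed; inside the worker it is driven by the JobStreamer below):

from pyzeebe.errors import StreamActivateJobsRequestInvalidError

async def consume(adapter):
    # `adapter` is assumed to be an already connected ZeebeJobAdapter.
    try:
        async for job in adapter.stream_activate_jobs(
            task_type="payment",           # illustrative task type
            worker="my-worker",
            timeout=30_000,                # per-job timeout, in milliseconds
            variables_to_fetch=[],
            stream_request_timeout=3600,   # passed to the gRPC call as its deadline
        ):
            print("streamed job", job.key)
    except StreamActivateJobsRequestInvalidError:
        # The gateway rejected the stream request (empty type/worker or bad timeout).
        raise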

pyzeebe/worker/job_poller.py

Lines changed: 61 additions & 0 deletions
@@ -5,6 +5,7 @@

 from pyzeebe.errors import (
     ActivateJobsRequestInvalidError,
+    StreamActivateJobsRequestInvalidError,
     ZeebeBackPressureError,
     ZeebeDeadlineExceeded,
     ZeebeGatewayUnavailableError,
@@ -94,3 +95,63 @@ def calculate_max_jobs_to_activate(self) -> int:
     async def stop(self) -> None:
         self.stop_event.set()
         await self.queue.join()
+
+
+class JobStreamer:
+    def __init__(
+        self,
+        zeebe_adapter: ZeebeJobAdapter,
+        task: Task,
+        queue: asyncio.Queue[Job],
+        worker_name: str,
+        stream_request_timeout: int,
+        task_state: TaskState,
+        tenant_ids: list[str] | None,
+    ) -> None:
+        self.zeebe_adapter = zeebe_adapter
+        self.task = task
+        self.queue = queue
+        self.worker_name = worker_name
+        self.stream_request_timeout = stream_request_timeout
+        self.task_state = task_state
+        self.tenant_ids = tenant_ids
+        self.stop_event = asyncio.Event()
+
+    async def poll(self) -> None:
+        while self.should_poll():
+            await self.activate_stream()
+
+    async def activate_stream(self) -> None:
+        try:
+            jobs = self.zeebe_adapter.stream_activate_jobs(
+                task_type=self.task.type,
+                worker=self.worker_name,
+                timeout=self.task.config.timeout_ms,
+                variables_to_fetch=self.task.config.variables_to_fetch or [],
+                stream_request_timeout=self.stream_request_timeout,
+                tenant_ids=self.tenant_ids,
+            )
+            async for job in jobs:
+                self.task_state.add(job)
+                await self.queue.put(job)
+        except StreamActivateJobsRequestInvalidError:
+            logger.warning("Stream job requests was invalid for task %s", self.task.type)
+            raise
+        except (
+            ZeebeBackPressureError,
+            ZeebeGatewayUnavailableError,
+            ZeebeInternalError,
+            ZeebeDeadlineExceeded,
+        ) as error:
+            logger.warning(
+                "Failed to strean jobs from the gateway. Exception: %s. Retrying in 5 seconds...",
+                repr(error),
+            )
+            await asyncio.sleep(5)
+
+    def should_poll(self) -> bool:
+        return not self.stop_event.is_set() and (self.zeebe_adapter.connected or self.zeebe_adapter.retrying_connection)
+
+    async def stop(self) -> None:
+        self.stop_event.set()
+        await self.queue.join()
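A rough sketch of how a JobStreamer feeds the shared queue (the streamer and queue are assumed to be built as in worker.py below; in the real worker the queue is consumed by a JobExecutor):

import asyncio

async def stream_one_job(streamer, queue) -> None:
    producer = asyncio.create_task(streamer.poll())  # opens the long-lived job stream
    job = await queue.get()                          # first job pushed by activate_stream()
    print("streamed job", job.key)
    queue.task_done()        # stop() awaits queue.join(), so every consumer must ack
    await streamer.stop()    # sets the stop event and waits for the queue to drain
    producer.cancel()        # cancelling the task also tears down the open stream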

pyzeebe/worker/worker.py

Lines changed: 36 additions & 10 deletions
@@ -13,7 +13,7 @@
 from pyzeebe.task import task_builder
 from pyzeebe.task.exception_handler import ExceptionHandler
 from pyzeebe.worker.job_executor import JobExecutor
-from pyzeebe.worker.job_poller import JobPoller
+from pyzeebe.worker.job_poller import JobPoller, JobStreamer
 from pyzeebe.worker.task_router import ZeebeTaskRouter
 from pyzeebe.worker.task_state import TaskState

@@ -34,6 +34,8 @@ def __init__(
         poll_retry_delay: int = 5,
         tenant_ids: list[str] | None = None,
         exception_handler: ExceptionHandler | None = None,
+        stream_enabled: bool = False,
+        stream_request_timeout: int = 3600,
     ):
         """
         Args:
@@ -46,6 +48,9 @@ def __init__(
             max_connection_retries (int): Amount of connection retries before worker gives up on connecting to zeebe. To setup with infinite retries use -1
             poll_retry_delay (int): The number of seconds to wait before attempting to poll again when reaching max amount of running jobs
             tenant_ids (list[str]): A list of tenant IDs for which to activate jobs. New in Zeebe 8.3.
+            stream_enabled (bool): Enables the job worker to stream jobs. It will still poll for older jobs, but streaming is favored. New in Zeebe 8.4.
+            stream_request_timeout (int): If streaming is enabled, this sets the timeout on the underlying job stream.
+                It's useful to set a few hours to load-balance your streams over time. New in Zeebe 8.4.
         """
         super().__init__(before, after, exception_handler)
         self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries)
@@ -54,31 +59,46 @@ def __init__(
         self.poll_retry_delay = poll_retry_delay
         self.tenant_ids = tenant_ids
         self._job_pollers: list[JobPoller] = []
+        self._job_streamers: list[JobStreamer] = []
         self._job_executors: list[JobExecutor] = []
         self._stop_event = anyio.Event()
+        self._stream_enabled = stream_enabled
+        self._stream_request_timeout = stream_request_timeout

     def _init_tasks(self) -> None:
-        self._job_executors, self._job_pollers = [], []
+        self._job_executors, self._job_pollers, self._job_streamers = [], [], []

         for task in self.tasks:
             jobs_queue = asyncio.Queue[Job]()
             task_state = TaskState()

             poller = JobPoller(
-                self.zeebe_adapter,
-                task,
-                jobs_queue,
-                self.name,
-                self.request_timeout,
-                task_state,
-                self.poll_retry_delay,
-                self.tenant_ids,
+                zeebe_adapter=self.zeebe_adapter,
+                task=task,
+                queue=jobs_queue,
+                worker_name=self.name,
+                request_timeout=self.request_timeout,
+                task_state=task_state,
+                poll_retry_delay=self.poll_retry_delay,
+                tenant_ids=self.tenant_ids,
             )
             executor = JobExecutor(task, jobs_queue, task_state, self.zeebe_adapter)

             self._job_pollers.append(poller)
             self._job_executors.append(executor)

+            if self._stream_enabled:
+                streamer = JobStreamer(
+                    zeebe_adapter=self.zeebe_adapter,
+                    task=task,
+                    queue=jobs_queue,
+                    worker_name=self.name,
+                    stream_request_timeout=self._stream_request_timeout,
+                    task_state=task_state,
+                    tenant_ids=self.tenant_ids,
+                )
+                self._job_streamers.append(streamer)
+
     async def work(self) -> None:
         """
         Start the worker. The worker will poll zeebe for jobs of each task in a different asyncio task.
@@ -97,6 +117,9 @@ async def work(self) -> None:
            for poller in self._job_pollers:
                tg.start_soon(poller.poll)

+            for streamer in self._job_streamers:
+                tg.start_soon(streamer.poll)
+
            for executor in self._job_executors:
                tg.start_soon(executor.execute)

@@ -113,6 +136,9 @@ async def stop(self) -> None:
         for poller in self._job_pollers:
             await poller.stop()

+        for streamer in self._job_streamers:
+            await streamer.stop()
+
         for executor in self._job_executors:
             await executor.stop()

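From user code, enabling streaming could look roughly like this (the channel helper, task type, and handler are illustrative; stream_enabled and stream_request_timeout are the parameters added in this commit):

import asyncio

from pyzeebe import ZeebeWorker, create_insecure_channel

channel = create_insecure_channel()  # assumes a gateway reachable on the default address

worker = ZeebeWorker(
    channel,
    stream_enabled=True,          # open a job stream per task, in addition to polling
    stream_request_timeout=3600,  # recreate the stream every hour to rebalance
)

@worker.task(task_type="example")
async def example(value: int) -> dict:
    return {"doubled": value * 2}

asyncio.run(worker.work())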

tests/unit/grpc_internals/zeebe_job_adapter_test.py

Lines changed: 48 additions & 0 deletions
@@ -7,6 +7,7 @@
     ActivateJobsRequestInvalidError,
     JobAlreadyDeactivatedError,
     JobNotFoundError,
+    StreamActivateJobsRequestInvalidError,
 )
 from pyzeebe.grpc_internals.types import (
     CompleteJobResponse,
@@ -89,6 +90,53 @@ async def test_raises_on_invalid_max_jobs(self):
             await jobs.__anext__()


+@pytest.mark.asyncio
+class TestStreamActivateJobs:
+    zeebe_job_adapter: ZeebeJobAdapter
+
+    @pytest.fixture(autouse=True)
+    def set_up(self, zeebe_adapter: ZeebeJobAdapter):
+        self.zeebe_job_adapter = zeebe_adapter
+
+    def stream_activate_jobs(
+        self,
+        task_type=str(uuid4()),
+        worker=str(uuid4()),
+        timeout=randint(10, 100),
+        request_timeout=100,
+        variables_to_fetch=[],
+        tenant_ids=None,
+    ):
+        return self.zeebe_job_adapter.stream_activate_jobs(
+            task_type, worker, timeout, variables_to_fetch, request_timeout, tenant_ids
+        )
+
+    async def test_returns_correct_amount_of_jobs(self, grpc_servicer: GatewayMock, task: Task):
+        active_jobs_count = randint(4, 100)
+        for _ in range(0, active_jobs_count):
+            job = random_job(task)
+            grpc_servicer.active_jobs[job.key] = job
+
+        jobs = self.stream_activate_jobs(task_type=task.type)
+
+        assert len([job async for job in jobs]) == active_jobs_count
+
+    async def test_raises_on_invalid_worker(self):
+        with pytest.raises(StreamActivateJobsRequestInvalidError):
+            jobs = self.stream_activate_jobs(worker=None)
+            await jobs.__anext__()
+
+    async def test_raises_on_invalid_job_timeout(self):
+        with pytest.raises(StreamActivateJobsRequestInvalidError):
+            jobs = self.stream_activate_jobs(timeout=0)
+            await jobs.__anext__()
+
+    async def test_raises_on_invalid_task_type(self):
+        with pytest.raises(StreamActivateJobsRequestInvalidError):
+            jobs = self.stream_activate_jobs(task_type=None)
+            await jobs.__anext__()
+
+
 @pytest.mark.asyncio
 class TestCompleteJob:
     async def test_response_is_of_correct_type(self, zeebe_adapter: ZeebeJobAdapter, first_active_job: Job):
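One detail worth noting in these tests: stream_activate_jobs is an async generator, so no request reaches the (mocked) gateway until iteration begins, which is why each negative test calls __anext__() inside pytest.raises. A generic, self-contained illustration of that laziness:

import asyncio

async def failing_stream():
    # The body only runs once the generator is iterated.
    raise ValueError("rejected")
    yield  # unreachable, but makes this function an async generator

async def main() -> None:
    gen = failing_stream()     # no error yet: the generator object is merely created
    try:
        await gen.__anext__()  # first iteration executes the body and raises
    except ValueError as exc:
        print("raised on first __anext__():", exc)

asyncio.run(main())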

tests/unit/utils/gateway_mock.py

Lines changed: 31 additions & 0 deletions
@@ -90,6 +90,37 @@ def ActivateJobs(self, request, context):
                )
         yield ActivateJobsResponse(jobs=jobs)

+    def StreamActivatedJobs(self, request, context):
+        if not request.type:
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+            return ActivatedJob()
+
+        if request.timeout <= 0:
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+            return ActivatedJob()
+
+        if not request.worker:
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+            return ActivatedJob()
+
+        for active_job in self.active_jobs.values():
+            if active_job.type == request.type:
+                yield ActivatedJob(
+                    key=active_job.key,
+                    type=active_job.type,
+                    processInstanceKey=active_job.process_instance_key,
+                    bpmnProcessId=active_job.bpmn_process_id,
+                    processDefinitionVersion=active_job.process_definition_version,
+                    processDefinitionKey=active_job.process_definition_key,
+                    elementId=active_job.element_id,
+                    elementInstanceKey=active_job.element_instance_key,
+                    customHeaders=json.dumps(active_job.custom_headers),
+                    worker=active_job.worker,
+                    retries=active_job.retries,
+                    deadline=active_job.deadline,
+                    variables=json.dumps(active_job.variables),
+                )
+
     def CompleteJob(self, request, context):
         if request.jobKey in self.active_jobs.keys():
             active_job = self.active_jobs.get(request.jobKey)
