Skip to content

Commit 9a19b45

Browse files
authored
Update deployment concurrency limit lease maintenance to raise on failure (#18811)
1 parent f72e6f1 commit 9a19b45

File tree

3 files changed

+36
-34
lines changed

3 files changed

+36
-34
lines changed

docs/v3/api-ref/python/prefect-flow_engine.mdx

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,37 +25,37 @@ load_flow(flow_run: FlowRun) -> Flow[..., Any]
2525
load_flow_and_flow_run(flow_run_id: UUID) -> tuple[FlowRun, Flow[..., Any]]
2626
```
2727

28-
### `run_flow_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1378" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
28+
### `run_flow_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1376" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
2929

3030
```python
3131
run_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]
3232
```
3333

34-
### `run_flow_async` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1402" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
34+
### `run_flow_async` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1400" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
3535

3636
```python
3737
run_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]
3838
```
3939

40-
### `run_generator_flow_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1426" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
40+
### `run_generator_flow_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1424" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
4141

4242
```python
4343
run_generator_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Generator[R, None, None]
4444
```
4545

46-
### `run_generator_flow_async` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1467" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
46+
### `run_generator_flow_async` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1465" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
4747

4848
```python
4949
run_generator_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> AsyncGenerator[R, None]
5050
```
5151

52-
### `run_flow` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1510" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
52+
### `run_flow` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1508" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
5353

5454
```python
5555
run_flow(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', error_logger: Optional[logging.Logger] = None, context: Optional[dict[str, Any]] = None) -> R | State | None | Coroutine[Any, Any, R | State | None] | Generator[R, None, None] | AsyncGenerator[R, None]
5656
```
5757

58-
### `run_flow_in_subprocess` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1583" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
58+
### `run_flow_in_subprocess` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1581" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
5959

6060
```python
6161
run_flow_in_subprocess(flow: 'Flow[..., Any]', flow_run: 'FlowRun | None' = None, parameters: dict[str, Any] | None = None, wait_for: Iterable[PrefectFuture[Any]] | None = None, context: dict[str, Any] | None = None) -> multiprocessing.context.SpawnProcess
@@ -127,7 +127,7 @@ state(self) -> State
127127
begin_run(self) -> State
128128
```
129129

130-
#### `call_flow_fn` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L789" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
130+
#### `call_flow_fn` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L788" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
131131

132132
```python
133133
call_flow_fn(self) -> Union[R, Coroutine[Any, Any, R]]
@@ -179,7 +179,7 @@ handle_success(self, result: R) -> R
179179
handle_timeout(self, exc: TimeoutError) -> None
180180
```
181181

182-
#### `initialize_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L682" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
182+
#### `initialize_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L681" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
183183

184184
```python
185185
initialize_run(self)
@@ -212,7 +212,7 @@ then no flow run is returned.
212212
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'
213213
```
214214

215-
#### `run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L770" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
215+
#### `run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L769" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
216216

217217
```python
218218
run_context(self)
@@ -233,13 +233,13 @@ set_state(self, state: State, force: bool = False) -> State
233233
setup_run_context(self, client: Optional[SyncPrefectClient] = None)
234234
```
235235

236-
#### `start` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L758" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
236+
#### `start` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L757" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
237237

238238
```python
239239
start(self) -> Generator[None, None, None]
240240
```
241241

242-
### `AsyncFlowRunEngine` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L807" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
242+
### `AsyncFlowRunEngine` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L806" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
243243

244244

245245
Async version of the flow run engine.
@@ -250,13 +250,13 @@ not being fully asyncified.
250250

251251
**Methods:**
252252

253-
#### `begin_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L864" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
253+
#### `begin_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L863" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
254254

255255
```python
256256
begin_run(self) -> State
257257
```
258258

259-
#### `call_flow_fn` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1366" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
259+
#### `call_flow_fn` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1364" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
260260

261261
```python
262262
call_flow_fn(self) -> Coroutine[Any, Any, R]
@@ -266,49 +266,49 @@ Convenience method to call the flow function. Returns a coroutine if the
266266
flow is async.
267267

268268

269-
#### `call_hooks` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1107" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
269+
#### `call_hooks` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1106" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
270270

271271
```python
272272
call_hooks(self, state: Optional[State] = None) -> None
273273
```
274274

275-
#### `client` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L820" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
275+
#### `client` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L819" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
276276

277277
```python
278278
client(self) -> PrefectClient
279279
```
280280

281-
#### `create_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1074" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
281+
#### `create_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1073" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
282282

283283
```python
284284
create_flow_run(self, client: PrefectClient) -> FlowRun
285285
```
286286

287-
#### `handle_crash` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1012" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
287+
#### `handle_crash` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1011" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
288288

289289
```python
290290
handle_crash(self, exc: BaseException) -> None
291291
```
292292

293-
#### `handle_exception` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L962" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
293+
#### `handle_exception` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L961" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
294294

295295
```python
296296
handle_exception(self, exc: Exception, msg: Optional[str] = None, result_store: Optional[ResultStore] = None) -> State
297297
```
298298

299-
#### `handle_success` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L945" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
299+
#### `handle_success` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L944" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
300300

301301
```python
302302
handle_success(self, result: R) -> R
303303
```
304304

305-
#### `handle_timeout` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L993" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
305+
#### `handle_timeout` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L992" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
306306

307307
```python
308308
handle_timeout(self, exc: TimeoutError) -> None
309309
```
310310

311-
#### `initialize_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1251" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
311+
#### `initialize_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1249" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
312312

313313
```python
314314
initialize_run(self)
@@ -317,7 +317,7 @@ initialize_run(self)
317317
Enters a client context and creates a flow run if needed.
318318

319319

320-
#### `load_subflow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1025" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
320+
#### `load_subflow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1024" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
321321

322322
```python
323323
load_subflow_run(self, parent_task_run: TaskRun, client: PrefectClient, context: FlowRunContext) -> Union[FlowRun, None]
@@ -335,19 +335,19 @@ If no existing flow run is found, or if the subflow should be rerun,
335335
then no flow run is returned.
336336

337337

338-
#### `result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L920" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
338+
#### `result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L919" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
339339

340340
```python
341341
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'
342342
```
343343

344-
#### `run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1347" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
344+
#### `run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1345" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
345345

346346
```python
347347
run_context(self)
348348
```
349349

350-
#### `set_state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L903" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
350+
#### `set_state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L902" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
351351

352352
```python
353353
set_state(self, state: State, force: bool = False) -> State
@@ -356,13 +356,13 @@ set_state(self, state: State, force: bool = False) -> State
356356

357357

358358

359-
#### `setup_run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1164" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
359+
#### `setup_run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1163" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
360360

361361
```python
362362
setup_run_context(self, client: Optional[PrefectClient] = None)
363363
```
364364

365-
#### `start` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1335" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
365+
#### `start` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1333" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
366366

367367
```python
368368
start(self) -> AsyncGenerator[None, None]

src/prefect/flow_engine.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -631,8 +631,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
631631
if lease_id := self.state.state_details.deployment_concurrency_lease_id:
632632
stack.enter_context(
633633
maintain_concurrency_lease(
634-
lease_id,
635-
300,
634+
lease_id, 300, raise_on_lease_renewal_failure=True
636635
)
637636
)
638637

@@ -1202,8 +1201,7 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None):
12021201
if lease_id := self.state.state_details.deployment_concurrency_lease_id:
12031202
await stack.enter_async_context(
12041203
amaintain_concurrency_lease(
1205-
lease_id,
1206-
300,
1204+
lease_id, 300, raise_on_lease_renewal_failure=True
12071205
)
12081206
)
12091207

tests/test_flow_engine.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from textwrap import dedent
77
from typing import Literal, Optional
88
from unittest import mock
9-
from unittest.mock import MagicMock
9+
from unittest.mock import ANY, MagicMock
1010
from uuid import UUID
1111

1212
import anyio
@@ -2252,7 +2252,9 @@ def foo():
22522252

22532253
run_flow(foo, flow_run)
22542254

2255-
mock_maintain_concurrency_lease.assert_called_once()
2255+
mock_maintain_concurrency_lease.assert_called_once_with(
2256+
ANY, 300, raise_on_lease_renewal_failure=True
2257+
)
22562258

22572259
async def test_lease_renewal_async(
22582260
self, prefect_client: PrefectClient, monkeypatch: pytest.MonkeyPatch
@@ -2285,4 +2287,6 @@ async def foo():
22852287

22862288
await run_flow(foo, flow_run)
22872289

2288-
mock_maintain_concurrency_lease.assert_called_once()
2290+
mock_maintain_concurrency_lease.assert_called_once_with(
2291+
ANY, 300, raise_on_lease_renewal_failure=True
2292+
)

0 commit comments

Comments
 (0)