Skip to content

Commit 1361629

Browse files
raulchenlandscapepainter
authored andcommitted
[data] Extract backpressure-related code from ResourceManager as a policy (ray-project#54376)
* Extract backpressure related methods (`can_submit_new_tasks` and `max_task_output_bytes_to_read`) from ResourceManager, as a standalone policy `ResourceBudgetBackpressurePolicy`) * Report task_output_backpressure_time metric. --------- Signed-off-by: Hao Chen <[email protected]> Signed-off-by: doyoung <[email protected]>
1 parent 4d9d845 commit 1361629

14 files changed

+230
-93
lines changed

ci/lint/pydoclint-baseline.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1202,7 +1202,6 @@ python/ray/data/_internal/execution/streaming_executor_state.py
12021202
DOC201: Method `OpBufferQueue.has_next` does not have a return section in docstring
12031203
DOC101: Method `OpState.get_output_blocking`: Docstring contains fewer arguments than in function signature.
12041204
DOC103: Method `OpState.get_output_blocking`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [output_split_idx: Optional[int]].
1205-
DOC103: Function `process_completed_tasks`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [resource_manager: ResourceManager]. Arguments in the docstring but not in the function signature: [backpressure_policies: ].
12061205
--------------------
12071206
python/ray/data/_internal/iterator/stream_split_iterator.py
12081207
DOC101: Method `SplitCoordinator.start_epoch`: Docstring contains fewer arguments than in function signature.

python/ray/data/_internal/execution/backpressure_policy/__init__.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,33 @@
1-
from typing import TYPE_CHECKING
1+
from typing import TYPE_CHECKING, List
22

3-
import ray
43
from .backpressure_policy import BackpressurePolicy
54
from .concurrency_cap_backpressure_policy import ConcurrencyCapBackpressurePolicy
5+
from .resource_budget_backpressure_policy import ResourceBudgetBackpressurePolicy
6+
from ray.data.context import DataContext
67

78
if TYPE_CHECKING:
9+
from ray.data._internal.execution.resource_manager import ResourceManager
810
from ray.data._internal.execution.streaming_executor_state import Topology
911

1012
# Default enabled backpressure policies and its config key.
1113
# Use `DataContext.set_config` to config it.
1214
ENABLED_BACKPRESSURE_POLICIES = [
1315
ConcurrencyCapBackpressurePolicy,
16+
ResourceBudgetBackpressurePolicy,
1417
]
1518
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY = "backpressure_policies.enabled"
1619

1720

18-
def get_backpressure_policies(topology: "Topology"):
19-
data_context = ray.data.DataContext.get_current()
21+
def get_backpressure_policies(
22+
data_context: DataContext,
23+
topology: "Topology",
24+
resource_manager: "ResourceManager",
25+
) -> List[BackpressurePolicy]:
2026
policies = data_context.get_config(
2127
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, ENABLED_BACKPRESSURE_POLICIES
2228
)
2329

24-
return [policy(topology) for policy in policies]
30+
return [policy(data_context, topology, resource_manager) for policy in policies]
2531

2632

2733
__all__ = [
Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,35 @@
1-
from abc import ABC, abstractmethod
2-
from typing import TYPE_CHECKING
1+
from abc import ABC
2+
from typing import TYPE_CHECKING, Optional
3+
4+
from ray.data.context import DataContext
35

46
if TYPE_CHECKING:
57
from ray.data._internal.execution.interfaces.physical_operator import (
68
PhysicalOperator,
79
)
10+
from ray.data._internal.execution.resource_manager import ResourceManager
811
from ray.data._internal.execution.streaming_executor_state import Topology
912

1013

1114
class BackpressurePolicy(ABC):
1215
"""Interface for back pressure policies."""
1316

14-
@abstractmethod
15-
def __init__(self, topology: "Topology"):
16-
...
17+
def __init__(
18+
self,
19+
data_context: DataContext,
20+
topology: "Topology",
21+
resource_manager: "ResourceManager",
22+
):
23+
"""Initialize the backpressure policy.
24+
25+
Args:
26+
data_context: The data context.
27+
topology: The execution topology.
28+
resource_manager: The resource manager.
29+
"""
30+
self._data_context = data_context
31+
self._topology = topology
32+
self._resource_manager = resource_manager
1733

1834
def can_add_input(self, op: "PhysicalOperator") -> bool:
1935
"""Determine if we can add a new input to the operator. If returns False, the
@@ -26,3 +42,21 @@ def can_add_input(self, op: "PhysicalOperator") -> bool:
2642
backpressured if any of the policies returns False.
2743
"""
2844
return True
45+
46+
def max_task_output_bytes_to_read(self, op: "PhysicalOperator") -> Optional[int]:
47+
"""Return the maximum bytes of pending task outputs can be read for
48+
the given operator. None means no limit.
49+
50+
This is used for output backpressure to limit how much data an operator
51+
can read from its running tasks.
52+
53+
Note, if multiple backpressure policies return non-None values for an operator,
54+
the minimum of those values will be used as the limit.
55+
56+
Args:
57+
op: The operator to get the limit for.
58+
59+
Returns:
60+
The maximum bytes that can be read, or None if no limit.
61+
"""
62+
return None

python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from ray.data._internal.execution.interfaces.physical_operator import (
1111
PhysicalOperator,
1212
)
13-
from ray.data._internal.execution.streaming_executor_state import Topology
1413

1514
logger = logging.getLogger(__name__)
1615

@@ -25,10 +24,11 @@ class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
2524
TODO(chengsu): Consolidate with actor scaling logic of `ActorPoolMapOperator`.
2625
"""
2726

28-
def __init__(self, topology: "Topology"):
27+
def __init__(self, *args, **kwargs):
28+
super().__init__(*args, **kwargs)
2929
self._concurrency_caps: dict["PhysicalOperator", float] = {}
3030

31-
for op, _ in topology.items():
31+
for op, _ in self._topology.items():
3232
if isinstance(op, TaskPoolMapOperator) and op.get_concurrency() is not None:
3333
self._concurrency_caps[op] = op.get_concurrency()
3434
else:
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import logging
2+
from typing import TYPE_CHECKING, Optional
3+
4+
from .backpressure_policy import BackpressurePolicy
5+
6+
if TYPE_CHECKING:
7+
from ray.data._internal.execution.interfaces.physical_operator import (
8+
PhysicalOperator,
9+
)
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class ResourceBudgetBackpressurePolicy(BackpressurePolicy):
15+
"""A backpressure policy based on resource budgets in ResourceManager."""
16+
17+
def can_add_input(self, op: "PhysicalOperator") -> bool:
18+
budget = self._resource_manager.get_budget(op)
19+
if budget is None:
20+
return True
21+
return op.incremental_resource_usage().satisfies_limit(budget)
22+
23+
def max_task_output_bytes_to_read(self, op: "PhysicalOperator") -> Optional[int]:
24+
"""Determine maximum bytes to read based on the resource budgets.
25+
26+
Args:
27+
op: The operator to get the limit for.
28+
29+
Returns:
30+
The maximum bytes that can be read, or None if no limit.
31+
"""
32+
return self._resource_manager.max_task_output_bytes_to_read(op)

python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,11 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
358358
description="Time spent in task submission backpressure.",
359359
metrics_group=MetricsGroup.TASKS,
360360
)
361+
task_output_backpressure_time: float = metric_field(
362+
default=0,
363+
description="Time spent in task output backpressure.",
364+
metrics_group=MetricsGroup.TASKS,
365+
)
361366
histogram_buckets_s = [
362367
0.1,
363368
0.25,
@@ -446,6 +451,8 @@ def __init__(self, op: "PhysicalOperator"):
446451
self._extra_metrics: Dict[str, Any] = {}
447452
# Start time of current pause due to task submission backpressure
448453
self._task_submission_backpressure_start_time = -1
454+
# Start time of current pause due to task output backpressure
455+
self._task_output_backpressure_start_time = -1
449456

450457
self._internal_inqueue = create_bundle_queue()
451458
self._internal_outqueue = create_bundle_queue()
@@ -667,6 +674,17 @@ def on_toggle_task_submission_backpressure(self, in_backpressure):
667674
)
668675
self._task_submission_backpressure_start_time = -1
669676

677+
def on_toggle_task_output_backpressure(self, in_backpressure):
678+
if in_backpressure and self._task_output_backpressure_start_time == -1:
679+
# backpressure starting, start timer
680+
self._task_output_backpressure_start_time = time.perf_counter()
681+
elif self._task_output_backpressure_start_time != -1:
682+
# backpressure stopping, stop timer
683+
self.task_output_backpressure_time += (
684+
time.perf_counter() - self._task_output_backpressure_start_time
685+
)
686+
self._task_output_backpressure_start_time = -1
687+
670688
def on_output_taken(self, output: RefBundle):
671689
"""Callback when an output is taken from the operator."""
672690
self.num_outputs_taken += 1

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,18 @@ def notify_in_task_submission_backpressure(self, in_backpressure: bool) -> None:
633633
self._metrics.on_toggle_task_submission_backpressure(in_backpressure)
634634
self._in_task_submission_backpressure = in_backpressure
635635

636+
def notify_in_task_output_backpressure(self, in_backpressure: bool) -> None:
637+
"""Called periodically from the executor to update internal output backpressure
638+
status for stats collection purposes.
639+
640+
Args:
641+
in_backpressure: Value this operator's output backpressure should be set to.
642+
"""
643+
# only update on change to in_backpressure
644+
if self._in_task_output_backpressure != in_backpressure:
645+
self._metrics.on_toggle_task_output_backpressure(in_backpressure)
646+
self._in_task_output_backpressure = in_backpressure
647+
636648
def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
637649
"""Return a list of `AutoscalingActorPool`s managed by this operator."""
638650
return []

python/ray/data/_internal/execution/resource_manager.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,20 @@ def op_resource_allocator(self) -> "OpResourceAllocator":
306306
assert self._op_resource_allocator is not None
307307
return self._op_resource_allocator
308308

309+
def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
310+
"""Return the maximum bytes of pending task outputs can be read for
311+
the given operator. None means no limit."""
312+
if self._op_resource_allocator is None:
313+
return None
314+
return self._op_resource_allocator.max_task_output_bytes_to_read(op)
315+
316+
def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
317+
"""Return the budget for the given operator, or None if the operator
318+
has unlimited budget."""
319+
if self._op_resource_allocator is None:
320+
return None
321+
return self._op_resource_allocator.get_budget(op)
322+
309323

310324
class OpResourceAllocator(ABC):
311325
"""An interface for dynamic operator resource allocation.
@@ -323,20 +337,16 @@ def update_usages(self):
323337
"""Callback to update resource usages."""
324338
...
325339

326-
@abstractmethod
327-
def can_submit_new_task(self, op: PhysicalOperator) -> bool:
328-
"""Return whether the given operator can submit a new task."""
329-
...
330-
331340
@abstractmethod
332341
def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
333342
"""Return the maximum bytes of pending task outputs can be read for
334343
the given operator. None means no limit."""
335344
...
336345

337346
@abstractmethod
338-
def get_budget(self, op: PhysicalOperator) -> ExecutionResources:
339-
"""Return the budget for the given operator."""
347+
def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
348+
"""Return the budget for the given operator, or None if the operator
349+
has unlimited budget."""
340350
...
341351

342352

@@ -542,15 +552,8 @@ def _update_reservation(self):
542552

543553
self._total_shared = remaining
544554

545-
def can_submit_new_task(self, op: PhysicalOperator) -> bool:
546-
if op not in self._op_budgets:
547-
return True
548-
budget = self._op_budgets[op]
549-
res = op.incremental_resource_usage().satisfies_limit(budget)
550-
return res
551-
552-
def get_budget(self, op: PhysicalOperator) -> ExecutionResources:
553-
return self._op_budgets[op]
555+
def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
556+
return self._op_budgets.get(op, None)
554557

555558
def _should_unblock_streaming_output_backpressure(
556559
self, op: PhysicalOperator

python/ray/data/_internal/execution/streaming_executor.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,17 @@ def execute(
139139
lambda: self._autoscaler.get_total_resources(),
140140
self._data_context,
141141
)
142-
self._backpressure_policies = get_backpressure_policies(self._topology)
142+
self._backpressure_policies = get_backpressure_policies(
143+
self._data_context, self._topology, self._resource_manager
144+
)
143145
self._autoscaler = create_autoscaler(
144146
self._topology,
145147
self._resource_manager,
146148
config=self._data_context.autoscaling_config,
147149
execution_id=self._dataset_id,
148150
)
149151

150-
self._has_op_completed = {op: False for op in self._topology}
152+
self._has_op_completed = dict.fromkeys(self._topology, False)
151153

152154
self._output_node = dag, self._topology[dag]
153155

@@ -337,7 +339,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
337339
# greater parallelism.
338340
num_errored_blocks = process_completed_tasks(
339341
topology,
340-
self._resource_manager,
342+
self._backpressure_policies,
341343
self._max_errored_blocks,
342344
)
343345
if self._max_errored_blocks > 0:

0 commit comments

Comments
 (0)