Skip to content

Commit c8a5107

Browse files
edoakes authored and landscapepainter committed
[serve] Remove usage of ray._private.state (ray-project#54140)
Replace with `ray.util.state`. There is one remaining:

```
(ray) eoakes@eoakes-CQTYX0Y0RV serve % grep -r "_private.state" *
_private/cluster_node_info_cache.py:        ray._private.state.available_resources_per_node()
```

---------

Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: doyoung <[email protected]>
1 parent f26e4c5 commit c8a5107

File tree

6 files changed

+53
-82
lines changed

6 files changed

+53
-82
lines changed

python/ray/serve/tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def ray_shutdown():
5252
@pytest.fixture
5353
def ray_cluster():
5454
cluster = Cluster()
55-
yield Cluster()
55+
yield cluster
5656
serve.shutdown()
5757
ray.shutdown()
5858
cluster.shutdown()

python/ray/serve/tests/test_cluster.py

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from ray.serve.context import _get_global_client
2323
from ray.serve.handle import DeploymentHandle
2424
from ray.serve.schema import ServeDeploySchema
25+
from ray.util.state import list_actors
2526

2627

2728
def get_pids(expected, deployment_name="D", app_name="default", timeout=30):
@@ -220,14 +221,13 @@ def test_intelligent_scale_down(ray_cluster):
220221
client = _get_global_client()
221222

222223
def get_actor_distributions():
223-
actors = ray._private.state.actors()
224224
node_to_actors = defaultdict(list)
225-
for actor in actors.values():
226-
if "ServeReplica" not in actor["ActorClassName"]:
225+
for actor in list_actors(
226+
address=cluster.address, filters=[("STATE", "=", "ALIVE")]
227+
):
228+
if "ServeReplica" not in actor.class_name:
227229
continue
228-
if actor["State"] != "ALIVE":
229-
continue
230-
node_to_actors[actor["Address"]["NodeID"]].append(actor)
230+
node_to_actors[actor.node_id].append(actor)
231231

232232
return set(map(len, node_to_actors.values()))
233233

@@ -446,19 +446,22 @@ def __call__(self):
446446

447447
# Ensure worker node has both replicas.
448448
def check_replicas_on_worker_nodes():
449-
_actors = ray._private.state.actors().values()
450-
replica_nodes = [
451-
a["Address"]["NodeID"]
452-
for a in _actors
453-
if a["ActorClassName"].startswith("ServeReplica")
454-
]
455-
return len(set(replica_nodes)) == 1
449+
return (
450+
len(
451+
{
452+
a.node_id
453+
for a in list_actors(address=cluster.address)
454+
if a.class_name.startswith("ServeReplica")
455+
}
456+
)
457+
== 1
458+
)
456459

457460
wait_for_condition(check_replicas_on_worker_nodes)
458461

459462
# Ensure total actors of 2 proxies, 1 controller, and 2 replicas,
460463
# and 2 nodes exist.
461-
wait_for_condition(lambda: len(ray._private.state.actors()) == 5)
464+
wait_for_condition(lambda: len(list_actors(address=cluster.address)) == 5)
462465
assert len(ray.nodes()) == 2
463466

464467
# Ensure `/-/healthz` and `/-/routes` return 200 and expected responses
@@ -490,21 +493,12 @@ def check_request(url: str, expected_code: int, expected_text: str):
490493
# replicas on all nodes.
491494
serve.delete(name=SERVE_DEFAULT_APP_NAME)
492495

493-
def _check():
494-
_actors = ray._private.state.actors().values()
495-
return (
496-
len(
497-
list(
498-
filter(
499-
lambda a: a["State"] == "ALIVE",
500-
_actors,
501-
)
502-
)
503-
)
504-
== 3
496+
wait_for_condition(
497+
lambda: len(
498+
list_actors(address=cluster.address, filters=[("STATE", "=", "ALIVE")])
505499
)
506-
507-
wait_for_condition(_check)
500+
== 3,
501+
)
508502

509503
# Ensure head node `/-/healthz` and `/-/routes` continue to
510504
# return 200 and expected responses. Also, the worker node

python/ray/serve/tests/test_grpc.py

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import ray
1111
from ray import serve
1212
from ray._common.test_utils import SignalActor, wait_for_condition
13-
from ray.cluster_utils import Cluster
1413
from ray.serve._private.constants import SERVE_NAMESPACE
1514
from ray.serve._private.test_utils import (
1615
ping_fruit_stand,
@@ -26,6 +25,7 @@
2625
from ray.serve.generated import serve_pb2, serve_pb2_grpc
2726
from ray.serve.grpc_util import RayServegRPCContext
2827
from ray.serve.tests.test_config_files.grpc_deployment import g, g2
28+
from ray.util.state import list_actors
2929

3030

3131
def test_serving_request_through_grpc_proxy(ray_cluster):
@@ -246,7 +246,7 @@ def test_grpc_proxy_on_draining_nodes(ray_cluster):
246246
os.environ["TEST_WORKER_NODE_GRPC_PORT"] = str(worker_node_grpc_port)
247247

248248
# Set up a cluster with 2 nodes.
249-
cluster = Cluster()
249+
cluster = ray_cluster
250250
cluster.add_node(num_cpus=0)
251251
cluster.add_node(num_cpus=2)
252252
cluster.wait_for_nodes()
@@ -277,18 +277,21 @@ def __call__(self):
277277

278278
# Ensure worker node has both replicas.
279279
def check_replicas_on_worker_nodes():
280-
_actors = ray._private.state.actors().values()
281-
replica_nodes = [
282-
a["Address"]["NodeID"]
283-
for a in _actors
284-
if a["ActorClassName"].startswith("ServeReplica")
285-
]
286-
return len(set(replica_nodes)) == 1
280+
return (
281+
len(
282+
{
283+
a.node_id
284+
for a in list_actors(address=cluster.address)
285+
if a.class_name.startswith("ServeReplica")
286+
}
287+
)
288+
== 1
289+
)
287290

288291
wait_for_condition(check_replicas_on_worker_nodes)
289292

290293
# Ensure total actors of 2 proxies, 1 controller, and 2 replicas, and 2 nodes exist.
291-
wait_for_condition(lambda: len(ray._private.state.actors()) == 5)
294+
wait_for_condition(lambda: len(list_actors(address=cluster.address)) == 5)
292295
assert len(ray.nodes()) == 2
293296

294297
# Set up gRPC channels.
@@ -318,21 +321,12 @@ def check_replicas_on_worker_nodes():
318321
# replicas on all nodes.
319322
serve.delete(name=app_name)
320323

321-
def _check():
322-
_actors = ray._private.state.actors().values()
323-
return (
324-
len(
325-
list(
326-
filter(
327-
lambda a: a["State"] == "ALIVE",
328-
_actors,
329-
)
330-
)
331-
)
332-
== 3
324+
wait_for_condition(
325+
lambda: len(
326+
list_actors(address=cluster.address, filters=[("STATE", "=", "ALIVE")])
333327
)
334-
335-
wait_for_condition(_check)
328+
== 3,
329+
)
336330

337331
# Ensures ListApplications method on the head node is succeeding.
338332
wait_for_condition(

python/ray/serve/tests/test_proxy.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,15 @@
99
DEFAULT_UVICORN_KEEP_ALIVE_TIMEOUT_S,
1010
SERVE_NAMESPACE,
1111
)
12+
from ray.util.state import list_actors
1213

1314

1415
class TestTimeoutKeepAliveConfig:
1516
"""Test setting keep_alive_timeout_s in config and env."""
1617

1718
def get_proxy_actor(self) -> ActorHandle:
18-
proxy_actor_name = None
19-
for actor in ray._private.state.actors().values():
20-
if actor["ActorClassName"] == "ProxyActor":
21-
proxy_actor_name = actor["Name"]
22-
return ray.get_actor(proxy_actor_name, namespace=SERVE_NAMESPACE)
19+
[proxy_actor] = list_actors(filters=[("class_name", "=", "ProxyActor")])
20+
return ray.get_actor(proxy_actor.name, namespace=SERVE_NAMESPACE)
2321

2422
def test_default_keep_alive_timeout_s(self, ray_shutdown):
2523
"""Test when no keep_alive_timeout_s is set.

python/ray/serve/tests/test_standalone_2.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import pytest
88

99
import ray
10-
import ray._private.state
1110
import ray.actor
1211
from ray import serve
1312
from ray._common.test_utils import wait_for_condition

python/ray/serve/tests/test_standalone_3.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import pytest
99

1010
import ray
11-
import ray._private.state
1211
import ray.actor
1312
from ray import serve
1413
from ray._common.test_utils import SignalActor, wait_for_condition
@@ -20,6 +19,7 @@
2019
from ray.serve.context import _get_global_client
2120
from ray.serve.schema import ProxyStatus, ServeInstanceDetails
2221
from ray.tests.conftest import call_ray_stop_only # noqa: F401
22+
from ray.util.state import list_actors
2323

2424

2525
@pytest.fixture
@@ -287,23 +287,15 @@ def __call__(self, *args):
287287
serve.run(A.bind(), name="app_f")
288288

289289
# 2 proxies, 1 controller, 2 replicas.
290-
wait_for_condition(lambda: len(ray._private.state.actors()) == 5)
290+
wait_for_condition(lambda: len(list_actors()) == 5)
291291
assert len(ray.nodes()) == 2
292292

293293
# Stop all deployment replicas.
294294
serve.delete("app_f")
295295

296296
# The http proxy on worker node should exit as well.
297297
wait_for_condition(
298-
lambda: len(
299-
list(
300-
filter(
301-
lambda a: a["State"] == "ALIVE",
302-
ray._private.state.actors().values(),
303-
)
304-
)
305-
)
306-
== 2
298+
lambda: len(list_actors(filters=[("STATE", "=", "ALIVE")])) == 2,
307299
)
308300

309301
client = _get_global_client()
@@ -356,7 +348,7 @@ def __call__(self):
356348
serve.run(HelloModel.options(num_replicas=2).bind())
357349

358350
# 3 proxies, 1 controller, 2 replicas.
359-
wait_for_condition(lambda: len(ray._private.state.actors()) == 6)
351+
wait_for_condition(lambda: len(list_actors()) == 6)
360352
assert len(ray.nodes()) == 3
361353

362354
client = _get_global_client()
@@ -435,7 +427,7 @@ def __call__(self):
435427
serve.run(target=model)
436428

437429
# Ensure total actors of 2 proxies, 1 controller, and 2 replicas
438-
wait_for_condition(lambda: len(ray._private.state.actors()) == 5)
430+
wait_for_condition(lambda: len(list_actors()) == 5)
439431
assert len(ray.nodes()) == 2
440432

441433
# Call `graceful_shutdown()` on the controller, so it will start shutdown.
@@ -450,9 +442,7 @@ def __call__(self):
450442

451443
# Ensure the all resources are shutdown.
452444
wait_for_condition(
453-
lambda: all(
454-
[actor["State"] == "DEAD" for actor in ray._private.state.actors().values()]
455-
)
445+
lambda: len(list_actors(filters=[("STATE", "=", "ALIVE")])) == 0,
456446
)
457447

458448
# Clean up serve.
@@ -496,7 +486,7 @@ def __call__(self):
496486
serve.run(target=model)
497487

498488
# Ensure total actors of 2 proxies, 1 controller, and 2 replicas
499-
wait_for_condition(lambda: len(ray._private.state.actors()) == 5)
489+
wait_for_condition(lambda: len(list_actors()) == 5)
500490
assert len(ray.nodes()) == 2
501491

502492
# Ensure client times out if the controller does not shutdown within timeout.
@@ -510,9 +500,7 @@ def __call__(self):
510500

511501
# Ensure the all resources are shutdown gracefully.
512502
wait_for_condition(
513-
lambda: all(
514-
[actor["State"] == "DEAD" for actor in ray._private.state.actors().values()]
515-
),
503+
lambda: len(list_actors(filters=[("STATE", "=", "ALIVE")])) == 0,
516504
)
517505

518506
# Clean up serve.
@@ -543,9 +531,7 @@ def __call__(self):
543531

544532
# Ensure the all resources are shutdown gracefully.
545533
wait_for_condition(
546-
lambda: all(
547-
[actor["State"] == "DEAD" for actor in ray._private.state.actors().values()]
548-
),
534+
lambda: len(list_actors(filters=[("STATE", "=", "ALIVE")])) == 0,
549535
)
550536

551537
all_serve_logs = ""

0 commit comments

Comments
 (0)