[Core] Update the attempt number of the actor creation task when the actor restarts #58877
base: master
Conversation
Another issue we found during the investigation is that, because part of the actor creation process happens on the GCS, task events won't be sent for some of the task state transitions (SUBMITTED_TO_WORKER and PENDING_NODE_ASSIGNMENT for restart). I'm planning to create a follow-up PR to expose the missing task events for the actor creation task.
```python
assert event["task_definition_event"]["task_attempt"] == 0
assert (
    event["task_definition_event"]["language"] == "PYTHON"
)
```
Is it possible to just build the expected event and do `event["task_definition_event"] == expected_event` to make it a little cleaner?
Not really, because event fields like event_id are not deterministic.
One solution I can try is to extract all the deterministic fields and compare them against an expected_event generated with just those fields. Do you think that would be better?
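A minimal sketch of that approach, assuming the exported events are plain dicts; the field list, sample values, and helper name are illustrative, not the actual schema:

```python
# Hypothetical event payload as it might arrive from the aggregator;
# only task_definition_event matters for this comparison.
event = {
    "event_id": "b2f9...",      # non-deterministic, excluded from the check
    "task_definition_event": {
        "task_attempt": 0,
        "language": "PYTHON",
        "event_id": "c41a...",  # non-deterministic, excluded from the check
    },
}

# Fields expected to be stable across test runs (illustrative list).
DETERMINISTIC_FIELDS = ("task_attempt", "language")


def deterministic_view(payload: dict) -> dict:
    """Keep only the fields whose values don't change between runs."""
    return {k: payload[k] for k in DETERMINISTIC_FIELDS if k in payload}


expected_event = {"task_attempt": 0, "language": "PYTHON"}
assert deterministic_view(event["task_definition_event"]) == expected_event
```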
```python
# Add a second node to the cluster for the actor to be restarted on
cluster.add_node(num_cpus=2)
cluster.wait_for_nodes()
```
add_node has wait: bool = True anyway, so the separate wait_for_nodes() call shouldn't be needed here.
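If so, the explicit wait can probably be dropped; a minimal sketch, assuming the default wait=True behavior described in the comment:

```python
from ray.cluster_utils import Cluster

cluster = Cluster()
# add_node() already blocks until the new node has registered when
# wait=True (the default), so a separate cluster.wait_for_nodes()
# call should not be needed just for this.
cluster.add_node(num_cpus=2)
```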
```python
driver_task_definition_received = False
actor_creation_task_definition_received = False
for event in events:
```
is the order of events deterministic?
Not really, either. The order of events from the same worker process should be deterministic, but not across processes; events from different processes can be interleaved with each other.
Got it, thanks!
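Given the interleaving, an order-agnostic scan (like the flag pattern in the diff above) seems safest. A sketch, where the field names and task_type values are assumptions rather than the actual exported schema:

```python
def assert_task_definitions_received(events):
    """Order-agnostic check: scan all received events and flip flags when
    the expected task definitions show up, regardless of how events from
    different worker processes are interleaved."""
    driver_seen = False
    actor_creation_seen = False
    for event in events:
        definition = event.get("task_definition_event")
        if not definition:
            continue
        if definition.get("task_type") == "DRIVER_TASK":
            driver_seen = True
        elif definition.get("task_type") == "ACTOR_CREATION_TASK":
            actor_creation_seen = True
    assert driver_seen, "no driver task definition event received"
    assert actor_creation_seen, "no actor creation task definition event received"
```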
```python
script = """
import ray
import time

ray.init()

@ray.remote(num_cpus=2, max_restarts=-1, max_task_retries=-1)
class Actor:
    def __init__(self):
        pass

    def actor_task(self):
        pass

actor = Actor.remote()
time.sleep(999)  # Keep the actor alive
"""
```
I like to use textwrap.dedent to make this readable.
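For example, a sketch of the same script wrapped in textwrap.dedent so it can be indented naturally inside the test body:

```python
import textwrap

script = textwrap.dedent(
    """
    import ray
    import time

    ray.init()

    @ray.remote(num_cpus=2, max_restarts=-1, max_task_retries=-1)
    class Actor:
        def __init__(self):
            pass

        def actor_task(self):
            pass

    actor = Actor.remote()
    time.sleep(999)  # Keep the actor alive
    """
)
```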
```python
    script, httpserver, ray_start_cluster_head_with_env_vars, validate_events
)
```

```python
@_cluster_with_aggregator_target
```
I'm looking at the fixture while reviewing this, and I think the env_var_prefix makes it hard to find the RAY_DASHBOARD_AGGREGATOR_AGENT_PRESERVE_PROTO_FIELD_NAME config. This is one of the few cases in which duplication is probably better.
Good point. Discussed offline. We should fix that in a separate PR.
While working on the test for the review comment about persisting the actor task spec, I realized there are two additional issues.
I'll investigate them and either add the fixes to this PR (if required by the test case) or follow up with separate PRs.
Signed-off-by: Mengjin Yan <[email protected]>
Following the investigation: for this PR, I addressed the comment by persisting the actor creation task spec after updating the attempt number.

I also tested locally with the following test case:

```python
import os

import psutil

import ray
import ray.util.state


def test_actor_creation_task_retry_with_gcs_restart(ray_start_regular_with_external_redis):
    @ray.remote(max_restarts=-1, max_task_retries=-1)
    class Actor:
        def __init__(self):
            pass

        def get_pid(self):
            return os.getpid()

    # Create an actor and make sure it is alive.
    actor = Actor.remote()
    pid_1 = ray.get(actor.get_pid.remote())

    # Kill the actor and check that the actor creation task is retried.
    actor_process_1 = psutil.Process(pid_1)
    actor_process_1.kill()
    actor_process_1.wait(timeout=10)
    pid_2 = ray.get(actor.get_pid.remote())
    assert pid_2 != pid_1

    # Kill and restart the GCS.
    ray._private.worker._global_node.kill_gcs_server()
    ray._private.worker._global_node.start_gcs_server()

    # Check the actor works with the new GCS.
    pid_3 = ray.get(actor.get_pid.remote())
    assert pid_3 == pid_2

    # Kill the actor and check that the actor creation task retry number is 2.
    # ray.kill(actor, no_restart=False)
    # wait_for_condition(lambda: list_actors()[0].state == "RESTARTING")
    actor_process_2 = psutil.Process(pid_2)
    actor_process_2.kill()
    actor_process_2.wait(timeout=10)
    pid_4 = ray.get(actor.get_pid.remote())
    assert pid_4 != pid_2

    tasks = ray.util.state.list_tasks()
    print(tasks)
```

This is not added to the PR because the list_tasks issue mentioned above needs to be fixed first. But with the local test, by emitting logs about the raw task events, I verified that, after the GCS restart, the restart of the actor will have the expected attempt number.
Description
Recently, we realized that during actor restarts we generate duplicate task events for the same actor creation task ID with attempt 0. This happens because the actor restart process runs inside the GCS, and the corresponding actor creation task TaskSpec is not updated with the new attempt number.
This PR adds logic to update the attempt number in the actor creation task TaskSpec and adds a test to verify the change.
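For reference, a sketch of the kind of duplicate-event check this change makes possible. It assumes dict-shaped exported events and reuses the task_definition_event / task_attempt field names from the test diff above; the task_id field and the helper itself are illustrative:

```python
from collections import Counter


def assert_no_duplicate_attempts(events):
    """Verify that each (task_id, attempt) pair appears at most once, i.e.
    an actor restart bumps the attempt number instead of reusing attempt 0.
    Field names follow the test diff above and may differ from the actual
    exported schema."""
    seen = Counter()
    for event in events:
        definition = event.get("task_definition_event")
        if not definition:
            continue
        key = (definition.get("task_id"), definition.get("task_attempt"))
        seen[key] += 1
    duplicates = {key: count for key, count in seen.items() if count > 1}
    assert not duplicates, f"duplicate task events: {duplicates}"
```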
Related issues
N/A
Additional information
N/A