Skip to content

Commit 6299dc8

Browse files
dayshahelliot-barn
authored andcommitted
[core] Remove actor task path in normal task submitter (#53996)
Actor tasks are never submitted into the normal task submitter and never should be. This removes that path and asserts it. Also changing a variable name from `client_cache_` to `core_worker_client_pool_`. Signed-off-by: dayshah <[email protected]> Signed-off-by: elliot-barn <[email protected]>
1 parent 5226960 commit 6299dc8

File tree

2 files changed

+17
-24
lines changed

2 files changed

+17
-24
lines changed

src/ray/core_worker/transport/normal_task_submitter.cc

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ void NormalTaskSubmitter::AddWorkerLeaseClient(
9494
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources,
9595
const SchedulingKey &scheduling_key,
9696
const TaskID &task_id) {
97-
client_cache_->GetOrConnect(addr);
97+
core_worker_client_pool_->GetOrConnect(addr);
9898
int64_t expiration = current_time_ms() + lease_timeout_ms_;
9999
LeaseEntry new_lease_entry = LeaseEntry(
100100
std::move(lease_client), expiration, assigned_resources, scheduling_key, task_id);
@@ -166,7 +166,7 @@ void NormalTaskSubmitter::OnWorkerIdle(
166166
ReturnWorker(addr, was_error, error_detail, worker_exiting, scheduling_key);
167167
}
168168
} else {
169-
auto client = client_cache_->GetOrConnect(addr);
169+
auto client = core_worker_client_pool_->GetOrConnect(addr);
170170

171171
while (!current_queue.empty() && !lease_entry.is_busy) {
172172
auto task_spec = std::move(current_queue.front());
@@ -556,7 +556,6 @@ void NormalTaskSubmitter::PushNormalTask(
556556
<< NodeID::FromBinary(addr.raylet_id());
557557
auto task_id = task_spec.TaskId();
558558
auto request = std::make_unique<rpc::PushTaskRequest>();
559-
bool is_actor = task_spec.IsActorTask();
560559
bool is_actor_creation = task_spec.IsActorCreationTask();
561560

562561
// NOTE(swang): CopyFrom is needed because if we use Swap here and the task
@@ -573,7 +572,6 @@ void NormalTaskSubmitter::PushNormalTask(
573572
[this,
574573
task_spec = std::move(task_spec),
575574
task_id,
576-
is_actor,
577575
is_actor_creation,
578576
scheduling_key,
579577
addr,
@@ -600,11 +598,10 @@ void NormalTaskSubmitter::PushNormalTask(
600598
if (!status.ok()) {
601599
RAY_LOG(DEBUG) << "Getting error from raylet for task " << task_id;
602600
const ray::rpc::ClientCallback<ray::rpc::GetTaskFailureCauseReply> callback =
603-
[this, status, is_actor, task_id, addr](
601+
[this, status, task_id, addr](
604602
const Status &get_task_failure_cause_reply_status,
605603
const rpc::GetTaskFailureCauseReply &get_task_failure_cause_reply) {
606604
HandleGetTaskFailureCause(status,
607-
is_actor,
608605
task_id,
609606
addr,
610607
get_task_failure_cause_reply_status,
@@ -648,7 +645,6 @@ void NormalTaskSubmitter::PushNormalTask(
648645

649646
void NormalTaskSubmitter::HandleGetTaskFailureCause(
650647
const Status &task_execution_status,
651-
const bool is_actor,
652648
const TaskID &task_id,
653649
const rpc::Address &addr,
654650
const Status &get_task_failure_cause_reply_status,
@@ -690,13 +686,12 @@ void NormalTaskSubmitter::HandleGetTaskFailureCause(
690686
error_info->set_error_message(buffer.str());
691687
error_info->set_error_type(rpc::ErrorType::NODE_DIED);
692688
}
693-
RAY_UNUSED(task_finisher_.FailOrRetryPendingTask(
694-
task_id,
695-
is_actor ? rpc::ErrorType::ACTOR_DIED : task_error_type,
696-
&task_execution_status,
697-
error_info.get(),
698-
/*mark_task_object_failed*/ true,
699-
fail_immediately));
689+
RAY_UNUSED(task_finisher_.FailOrRetryPendingTask(task_id,
690+
task_error_type,
691+
&task_execution_status,
692+
error_info.get(),
693+
/*mark_task_object_failed*/ true,
694+
fail_immediately));
700695
}
701696

702697
Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec,
@@ -752,7 +747,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec,
752747
return Status::OK();
753748
}
754749
// Looks for an RPC handle for the worker executing the task.
755-
client = client_cache_->GetOrConnect(rpc_client->second);
750+
client = core_worker_client_pool_->GetOrConnect(rpc_client->second);
756751
}
757752

758753
RAY_CHECK(client != nullptr);
@@ -812,7 +807,7 @@ Status NormalTaskSubmitter::CancelRemoteTask(const ObjectID &object_id,
812807
const rpc::Address &worker_addr,
813808
bool force_kill,
814809
bool recursive) {
815-
auto client = client_cache_->GetOrConnect(worker_addr);
810+
auto client = core_worker_client_pool_->GetOrConnect(worker_addr);
816811
auto request = rpc::RemoteCancelTaskRequest();
817812
request.set_force_kill(force_kill);
818813
request.set_recursive(recursive);

src/ray/core_worker/transport/normal_task_submitter.h

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,17 @@ class NormalTaskSubmitter {
9595
const TensorTransportGetter &tensor_transport_getter,
9696
std::optional<boost::asio::steady_timer> cancel_timer = absl::nullopt)
9797
: rpc_address_(std::move(rpc_address)),
98-
local_lease_client_(lease_client),
99-
lease_client_factory_(lease_client_factory),
98+
local_lease_client_(std::move(lease_client)),
99+
lease_client_factory_(std::move(lease_client_factory)),
100100
lease_policy_(std::move(lease_policy)),
101101
resolver_(*store, task_finisher, *actor_creator, tensor_transport_getter),
102102
task_finisher_(task_finisher),
103103
lease_timeout_ms_(lease_timeout_ms),
104104
local_raylet_id_(local_raylet_id),
105105
worker_type_(worker_type),
106-
client_cache_(core_worker_client_pool),
106+
core_worker_client_pool_(std::move(core_worker_client_pool)),
107107
job_id_(job_id),
108-
lease_request_rate_limiter_(lease_request_rate_limiter),
108+
lease_request_rate_limiter_(std::move(lease_request_rate_limiter)),
109109
cancel_retry_timer_(std::move(cancel_timer)) {}
110110

111111
/// Schedule a task for direct submission to a worker.
@@ -218,7 +218,7 @@ class NormalTaskSubmitter {
218218
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
219219

220220
/// Check that the scheduling_key_entries_ hashmap is empty.
221-
inline bool CheckNoSchedulingKeyEntries() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
221+
bool CheckNoSchedulingKeyEntries() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
222222
return scheduling_key_entries_.empty();
223223
}
224224

@@ -233,7 +233,6 @@ class NormalTaskSubmitter {
233233
/// Handles result from GetTaskFailureCause.
234234
void HandleGetTaskFailureCause(
235235
const Status &task_execution_status,
236-
const bool is_actor,
237236
const TaskID &task_id,
238237
const rpc::Address &addr,
239238
const Status &get_task_failure_cause_reply_status,
@@ -276,8 +275,7 @@ class NormalTaskSubmitter {
276275
// Protects task submission state below.
277276
absl::Mutex mu_;
278277

279-
/// Cache of gRPC clients to other workers.
280-
std::shared_ptr<rpc::CoreWorkerClientPool> client_cache_;
278+
std::shared_ptr<rpc::CoreWorkerClientPool> core_worker_client_pool_;
281279

282280
/// The ID of the job.
283281
const JobID job_id_;

0 commit comments

Comments
 (0)