Skip to content

Commit 00c14b3

Browse files
committed
fix: use canceled_tasks_ set only
Signed-off-by: machichima <[email protected]>
1 parent f999af9 commit 00c14b3

File tree

4 files changed

+10
-52
lines changed

4 files changed

+10
-52
lines changed

src/ray/core_worker/context.cc

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -98,25 +98,16 @@ struct WorkerThreadContext {
9898
SetCurrentPlacementGroupId(task_spec.PlacementGroupBundleId().first);
9999
SetPlacementGroupCaptureChildTasks(task_spec.PlacementGroupCaptureChildTasks());
100100
current_task_ = std::make_shared<const TaskSpecification>(task_spec);
101-
102-
is_current_task_canceled_.store(false);
103101
}
104102

105103
void ResetCurrentTask() {
106104
SetCurrentTaskId(TaskID::Nil(), /*attempt_number=*/0);
107105
task_index_ = 0;
108106
put_counter_ = 0;
109-
is_current_task_canceled_.store(false);
110107
}
111108

112109
uint32_t GetMaxNumGeneratorReturnIndex() const { return max_num_generator_returns_; }
113110

114-
void SetCurrentTaskCanceled(bool is_canceled) {
115-
is_current_task_canceled_.store(is_canceled);
116-
}
117-
118-
bool IsCurrentTaskCanceled() const { return is_current_task_canceled_.load(); }
119-
120111
private:
121112
/// The task ID for current task.
122113
TaskID current_task_id_;
@@ -156,12 +147,6 @@ struct WorkerThreadContext {
156147

157148
/// The maximum number of generator return values.
158149
uint32_t max_num_generator_returns_;
159-
160-
/// Whether the current task has been canceled via ray.cancel().
161-
/// This is set to true when a CancelTask RPC is received for the currently executing task.
162-
/// Using atomic because the cancellation request may come from a different thread
163-
/// (e.g., via the gRPC handler thread) than the task execution thread.
164-
std::atomic<bool> is_current_task_canceled_{false};
165150
};
166151

167152
thread_local std::unique_ptr<WorkerThreadContext> WorkerContext::thread_context_ =
@@ -490,18 +475,5 @@ WorkerThreadContext &WorkerContext::GetThreadContext() const {
490475
return *thread_context_;
491476
}
492477

493-
void WorkerContext::SetCurrentTaskCanceled(bool is_canceled) {
494-
if (thread_context_ != nullptr) {
495-
thread_context_->SetCurrentTaskCanceled(is_canceled);
496-
}
497-
}
498-
499-
bool WorkerContext::IsCurrentTaskCanceled() const {
500-
if (thread_context_ != nullptr) {
501-
return thread_context_->IsCurrentTaskCanceled();
502-
}
503-
return false;
504-
}
505-
506478
} // namespace core
507479
} // namespace ray

src/ray/core_worker/context.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,6 @@ class WorkerContext {
136136

137137
int64_t GetTaskDepth() const;
138138

139-
void SetCurrentTaskCanceled(bool is_canceled);
140-
bool IsCurrentTaskCanceled() const;
141-
142139
private:
143140
const WorkerType worker_type_;
144141
const WorkerID worker_id_;

src/ray/core_worker/core_worker.cc

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,24 +2495,10 @@ Status CoreWorker::CancelTask(const ObjectID &object_id,
24952495
}
24962496

24972497
bool CoreWorker::IsTaskCanceled(const TaskID &task_id) const {
2498-
auto current_task_id = worker_context_->GetCurrentTaskID();
2499-
2500-
if (current_task_id == task_id) {
2501-
// We're checking if the currently executing task is canceled (executor side).
2502-
// First check the thread-local state (for tasks canceled before execution started).
2503-
if (worker_context_->IsCurrentTaskCanceled()) {
2504-
return true;
2505-
}
2506-
// Then check the global canceled_tasks_ set (for tasks canceled during execution,
2507-
// possibly from a different thread via CancelTask RPC).
2508-
{
2509-
absl::MutexLock lock(&mutex_);
2510-
return canceled_tasks_.find(task_id) != canceled_tasks_.end();
2511-
}
2512-
}
2513-
2514-
// For submitted tasks (submitter side), check TaskManager.
2515-
return task_manager_->IsTaskCanceled(task_id);
2498+
// Check if the task is canceled on executor side. Check the canceled_tasks_ which is
2499+
// populated when CancelTask RPC is received.
2500+
absl::MutexLock lock(&mutex_);
2501+
return canceled_tasks_.find(task_id) != canceled_tasks_.end();
25162502
}
25172503

25182504
Status CoreWorker::CancelChildren(const TaskID &task_id, bool force_kill) {

src/ray/core_worker/core_worker.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1845,9 +1845,12 @@ class CoreWorker {
18451845
/// contexts from GetCoreWorkerStats().
18461846
absl::flat_hash_map<TaskID, TaskSpecification> running_tasks_ ABSL_GUARDED_BY(mutex_);
18471847

1848-
/// Tracks which tasks have been marked as canceled. This is needed because cancellation
1849-
/// requests may come from a different thread (e.g., via gRPC handler) than the thread
1850-
/// executing the task, so we can't rely solely on thread-local WorkerThreadContext.
1848+
/// Tracks which tasks have been marked as canceled. For single-threaded, non-async actors this will
1849+
/// contain at most one task ID.
1850+
///
1851+
/// We have to track this separately because cancellation requests come from gRPC
1852+
/// thread than the thread executing the task, so we cannot get the cancellation status
1853+
/// from the thread-local WorkerThreadContext.
18511854
absl::flat_hash_set<TaskID> canceled_tasks_ ABSL_GUARDED_BY(mutex_);
18521855

18531856
/// Key value pairs to be displayed on Web UI.

0 commit comments

Comments
 (0)