Skip to content

Commit 1651ec3

Browse files
israbbanielliot-barn
authored andcommitted
[core] Fix race condition b/w object eviction & repinning for recovery. (#53934)
This PR has two components: ### Fixing the Race Condition between Object Eviction and Repinning for Object Recovery The following sequence of events traces the life of an object called A. 1. Owner sends object eviction notifications to all locations as part of Object A going out of scope (done through worker<->worker pubsub). 2. Workers that have a copy of Object A mark it for asynchronous deletion. 3. Owner decides to recover the object and repins a copy of Object A that has already been marked for asynchronous deletion. 4. Owner submits a task that has Object A as an argument. 5. The task executor tries to fetch Object A and the pull manager hangs forever. The fix is to check whether an object is pending deletion before pinning it. If it is, the owner cannot pin the object and has to either try a different location or trigger reconstruction by resubmitting the task that created the object. ### (The Joys of) Writing a Unit Test in Node Manager There are a few problems with writing unit tests in the NodeManager: 1. Despite heroics from @rueian and @dayshah recently, the NodeManager still had a few dependencies that needed to be put behind interfaces so fakes could be injected. 2. The gRPC handlers are all private to the class, so they cannot be called directly. Instead, the unit tests need to create a gRPC client. 3. The NodeManager has a lot of dependencies, and the dependencies have interdependencies, so we currently dedicate 200 lines to constructing a test fixture. 4. There is a lack of usable fakes (e.g. for the PlasmaClient) for writing unit tests. ### Follow Ups 1. Create a shim between the NodeManager and its gRPC handlers so they can be unit tested directly without creating gRPC clients. 2. Create a utility function that lets you construct a NodeManager for testing, optionally overriding interesting dependencies (otherwise they will default to a Noop fake). 3. Move the fakes created in this PR into their own (more complete) implementations. 4. Create a Fake for SubscriberInterface.
--------- Signed-off-by: irabbani <[email protected]> Signed-off-by: Ibrahim Rabbani <[email protected]> Signed-off-by: elliot-barn <[email protected]>
1 parent f1faeae commit 1651ec3

File tree

8 files changed

+346
-73
lines changed

8 files changed

+346
-73
lines changed

BUILD.bazel

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,11 +567,22 @@ ray_cc_library(
567567
],
568568
)
569569

570+
ray_cc_library(
571+
name ="local_object_manager_interface",
572+
hdrs = ["src/ray/raylet/local_object_manager_interface.h"],
573+
deps = [
574+
"//src/ray/common:id",
575+
"//src/ray/common:ray_object",
576+
"//src/ray/protobuf:node_manager_cc_proto",
577+
],
578+
)
579+
570580
ray_cc_library(
571581
name = "local_object_manager",
572582
srcs = ["src/ray/raylet/local_object_manager.cc"],
573583
hdrs = ["src/ray/raylet/local_object_manager.h"],
574584
deps = [
585+
":local_object_manager_interface",
575586
":worker_pool",
576587
":worker_rpc",
577588
"//src/ray/common:id",
@@ -615,6 +626,7 @@ ray_cc_library(
615626
],
616627
}),
617628
deps = [
629+
":local_object_manager_interface",
618630
":local_object_manager",
619631
":node_manager_fbs",
620632
":node_manager_rpc",
@@ -765,6 +777,8 @@ ray_cc_test(
765777
tags = ["team:core"],
766778
deps = [
767779
":ray_mock",
780+
":local_object_manager_interface",
781+
"//src/ray/util:macros",
768782
":raylet_lib",
769783
"@com_google_googletest//:gtest_main",
770784
],

src/ray/raylet/local_object_manager.cc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,24 +139,33 @@ void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) {
139139

140140
// Try to evict all copies of the object from the cluster.
141141
if (free_objects_period_ms_ >= 0) {
142-
objects_to_free_.push_back(object_id);
142+
objects_pending_deletion_.emplace(object_id);
143143
}
144-
if (objects_to_free_.size() == free_objects_batch_size_ ||
144+
if (objects_pending_deletion_.size() == free_objects_batch_size_ ||
145145
free_objects_period_ms_ == 0) {
146146
FlushFreeObjects();
147147
}
148148
}
149149

150150
void LocalObjectManager::FlushFreeObjects() {
151-
if (!objects_to_free_.empty()) {
152-
RAY_LOG(DEBUG) << "Freeing " << objects_to_free_.size() << " out-of-scope objects";
153-
on_objects_freed_(objects_to_free_);
154-
objects_to_free_.clear();
151+
if (!objects_pending_deletion_.empty()) {
152+
RAY_LOG(DEBUG) << "Freeing " << objects_pending_deletion_.size()
153+
<< " out-of-scope objects";
154+
// TODO(irabbani): CORE-1640 will modify as much of the plasma API as is
155+
// reasonable to remove usage of vectors in favor of sets.
156+
std::vector<ObjectID> objects_to_delete(objects_pending_deletion_.begin(),
157+
objects_pending_deletion_.end());
158+
on_objects_freed_(objects_to_delete);
159+
objects_pending_deletion_.clear();
155160
}
156161
ProcessSpilledObjectsDeleteQueue(free_objects_batch_size_);
157162
last_free_objects_at_ms_ = current_time_ms();
158163
}
159164

165+
bool LocalObjectManager::ObjectPendingDeletion(const ObjectID &object_id) {
166+
return objects_pending_deletion_.find(object_id) != objects_pending_deletion_.end();
167+
}
168+
160169
void LocalObjectManager::SpillObjectUptoMaxThroughput() {
161170
if (RayConfig::instance().object_spilling_config().empty()) {
162171
return;

src/ray/raylet/local_object_manager.h

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <memory>
1919
#include <queue>
2020
#include <string>
21+
#include <utility>
2122
#include <vector>
2223

2324
#include "ray/common/id.h"
@@ -26,9 +27,9 @@
2627
#include "ray/object_manager/common.h"
2728
#include "ray/object_manager/object_directory.h"
2829
#include "ray/pubsub/subscriber.h"
30+
#include "ray/raylet/local_object_manager_interface.h"
2931
#include "ray/raylet/worker_pool.h"
3032
#include "ray/rpc/worker/core_worker_client_pool.h"
31-
#include "src/ray/protobuf/node_manager.pb.h"
3233

3334
namespace ray {
3435

@@ -39,7 +40,7 @@ inline constexpr int64_t kDefaultSpilledObjectDeleteRetries = 3;
3940

4041
/// This class implements memory management for primary objects, objects that
4142
/// have been freed, and objects that have been spilled.
42-
class LocalObjectManager {
43+
class LocalObjectManager : public LocalObjectManagerInterface {
4344
public:
4445
LocalObjectManager(
4546
const NodeID &node_id,
@@ -58,19 +59,19 @@ class LocalObjectManager {
5859
pubsub::SubscriberInterface *core_worker_subscriber,
5960
IObjectDirectory *object_directory)
6061
: self_node_id_(node_id),
61-
self_node_address_(self_node_address),
62+
self_node_address_(std::move(self_node_address)),
6263
self_node_port_(self_node_port),
6364
io_service_(io_service),
6465
free_objects_period_ms_(free_objects_period_ms),
6566
free_objects_batch_size_(free_objects_batch_size),
6667
io_worker_pool_(io_worker_pool),
6768
owner_client_pool_(owner_client_pool),
68-
on_objects_freed_(on_objects_freed),
69+
on_objects_freed_(std::move(on_objects_freed)),
6970
last_free_objects_at_ms_(current_time_ms()),
7071
min_spilling_size_(RayConfig::instance().min_spilling_size()),
7172
num_active_workers_(0),
7273
max_active_workers_(max_io_workers),
73-
is_plasma_object_spillable_(is_plasma_object_spillable),
74+
is_plasma_object_spillable_(std::move(is_plasma_object_spillable)),
7475
is_external_storage_type_fs_(is_external_storage_type_fs),
7576
max_fused_object_count_(max_fused_object_count),
7677
next_spill_error_log_bytes_(RayConfig::instance().verbose_spill_logs()),
@@ -95,12 +96,12 @@ class LocalObjectManager {
9596
void PinObjectsAndWaitForFree(const std::vector<ObjectID> &object_ids,
9697
std::vector<std::unique_ptr<RayObject>> &&objects,
9798
const rpc::Address &owner_address,
98-
const ObjectID &generator_id = ObjectID::Nil());
99+
const ObjectID &generator_id = ObjectID::Nil()) override;
99100

100101
/// Spill objects as much as possible as fast as possible up to the max throughput.
101102
///
102103
/// This function returns nothing; use IsSpillingInProgress() to check whether spilling is in progress.
103-
void SpillObjectUptoMaxThroughput();
104+
void SpillObjectUptoMaxThroughput() override;
104105

105106
/// TODO(dayshah): This function is only used for testing, we should remove and just
106107
/// keep SpillObjectsInternal.
@@ -110,7 +111,7 @@ class LocalObjectManager {
110111
/// \param callback A callback to call once the objects have been spilled, or
111112
/// there is an error.
112113
void SpillObjects(const std::vector<ObjectID> &objects_ids,
113-
std::function<void(const ray::Status &)> callback);
114+
std::function<void(const ray::Status &)> callback) override;
114115

115116
/// Restore a spilled object from external storage back into local memory.
116117
/// Note: This is no-op if the same restoration request is in flight or the requested
@@ -121,14 +122,19 @@ class LocalObjectManager {
121122
/// \param object_url The URL where the object is spilled.
122123
/// \param callback A callback to call when the restoration is done.
123124
/// Status will contain the error during restoration, if any.
124-
void AsyncRestoreSpilledObject(const ObjectID &object_id,
125-
int64_t object_size,
126-
const std::string &object_url,
127-
std::function<void(const ray::Status &)> callback);
125+
void AsyncRestoreSpilledObject(
126+
const ObjectID &object_id,
127+
int64_t object_size,
128+
const std::string &object_url,
129+
std::function<void(const ray::Status &)> callback) override;
128130

129131
/// Clear any freed objects. This will trigger the callback for freed
130132
/// objects.
131-
void FlushFreeObjects();
133+
void FlushFreeObjects() override;
134+
135+
/// Returns true if the object has been marked for deletion through the
136+
/// eviction notification.
137+
bool ObjectPendingDeletion(const ObjectID &object_id) override;
132138

133139
/// Judge if objects are deletable from pending_delete_queue and delete them if
134140
/// necessary.
@@ -138,7 +144,7 @@ class LocalObjectManager {
138144
///
139145
/// \param max_batch_size Maximum number of objects that can be deleted by one
140146
/// invocation.
141-
void ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size);
147+
void ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size) override;
142148

143149
/// Return True if spilling is in progress.
144150
/// This is a narrow interface that is accessed by plasma store.
@@ -147,31 +153,31 @@ class LocalObjectManager {
147153
/// which is against the general raylet design.
148154
///
149155
/// \return True if spilling is still in progress. False otherwise.
150-
bool IsSpillingInProgress();
156+
bool IsSpillingInProgress() override;
151157

152158
/// Populate object store stats.
153159
///
154160
/// \param reply Output parameter.
155-
void FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const;
161+
void FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const override;
156162

157163
/// Record object spilling stats to metrics.
158-
void RecordMetrics() const;
164+
void RecordMetrics() const override;
159165

160166
/// Return the spilled object URL if the object is spilled locally,
161167
/// or the empty string otherwise.
162168
/// If the external storage is cloud, this will always return an empty string.
163169
/// In that case, the URL is supposed to be obtained by the object directory.
164-
std::string GetLocalSpilledObjectURL(const ObjectID &object_id);
170+
std::string GetLocalSpilledObjectURL(const ObjectID &object_id) override;
165171

166172
/// Get the current bytes used by primary object copies. This number includes
167173
/// bytes used by objects currently being spilled.
168-
int64_t GetPrimaryBytes() const;
174+
int64_t GetPrimaryBytes() const override;
169175

170176
/// Returns true if we have objects spilled to the local
171177
/// filesystem.
172-
bool HasLocallySpilledObjects() const;
178+
bool HasLocallySpilledObjects() const override;
173179

174-
std::string DebugString() const;
180+
std::string DebugString() const override;
175181

176182
private:
177183
struct LocalObjectInfo {
@@ -284,7 +290,7 @@ class LocalObjectManager {
284290
/// from plasma. The cache is flushed when it reaches the
285291
/// free_objects_batch_size, or if objects have been in the cache for longer
286292
/// than the config's free_objects_period, whichever occurs first.
287-
std::vector<ObjectID> objects_to_free_;
293+
absl::flat_hash_set<ObjectID> objects_pending_deletion_;
288294

289295
/// The total size of the objects that are currently being
290296
/// spilled from this node, in bytes.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2025 The Ray Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <functional>
18+
#include <memory>
19+
#include <string>
20+
#include <vector>
21+
22+
#include "ray/common/id.h"
23+
#include "ray/common/ray_object.h"
24+
#include "src/ray/protobuf/node_manager.pb.h"
25+
26+
namespace ray {
27+
28+
namespace raylet {
29+
30+
class LocalObjectManagerInterface {
31+
public:
32+
virtual ~LocalObjectManagerInterface() = default;
33+
34+
virtual void PinObjectsAndWaitForFree(const std::vector<ObjectID> &,
35+
std::vector<std::unique_ptr<RayObject>> &&,
36+
const rpc::Address &,
37+
const ObjectID & = ObjectID::Nil()) = 0;
38+
39+
virtual void SpillObjectUptoMaxThroughput() = 0;
40+
41+
/// TODO(dayshah): This function is only used for testing, we should remove and just
42+
/// keep SpillObjectsInternal.
43+
virtual void SpillObjects(const std::vector<ObjectID> &,
44+
std::function<void(const ray::Status &)>) = 0;
45+
46+
virtual void AsyncRestoreSpilledObject(const ObjectID &,
47+
int64_t,
48+
const std::string &,
49+
std::function<void(const ray::Status &)>) = 0;
50+
51+
virtual void FlushFreeObjects() = 0;
52+
53+
virtual bool ObjectPendingDeletion(const ObjectID &) = 0;
54+
55+
virtual void ProcessSpilledObjectsDeleteQueue(uint32_t) = 0;
56+
57+
virtual bool IsSpillingInProgress() = 0;
58+
59+
virtual void FillObjectStoreStats(rpc::GetNodeStatsReply *) const = 0;
60+
61+
virtual void RecordMetrics() const = 0;
62+
63+
virtual std::string GetLocalSpilledObjectURL(const ObjectID &) = 0;
64+
65+
virtual int64_t GetPrimaryBytes() const = 0;
66+
67+
virtual bool HasLocallySpilledObjects() const = 0;
68+
69+
virtual std::string DebugString() const = 0;
70+
};
71+
72+
}; // namespace raylet
73+
74+
}; // namespace ray

src/ray/raylet/main.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
#include "ray/common/task/task_common.h"
3232
#include "ray/gcs/gcs_client/gcs_client.h"
3333
#include "ray/object_manager/ownership_object_directory.h"
34+
#include "ray/raylet/local_object_manager.h"
35+
#include "ray/raylet/local_object_manager_interface.h"
3436
#include "ray/raylet/raylet.h"
3537
#include "ray/stats/stats.h"
3638
#include "ray/util/cmd_line_utils.h"
@@ -40,7 +42,6 @@
4042
#include "ray/util/stream_redirection_options.h"
4143
#include "ray/util/subreaper.h"
4244
#include "scheduling/cluster_task_manager.h"
43-
#include "src/ray/protobuf/gcs.pb.h"
4445

4546
using json = nlohmann::json;
4647

@@ -262,7 +263,7 @@ int main(int argc, char *argv[]) {
262263
std::unique_ptr<ray::raylet::WorkerPoolInterface> worker_pool;
263264
/// Manages all local objects that are pinned (primary
264265
/// copies), freed, and/or spilled.
265-
std::unique_ptr<ray::raylet::LocalObjectManager> local_object_manager;
266+
std::unique_ptr<ray::raylet::LocalObjectManagerInterface> local_object_manager;
266267
/// These classes make up the new scheduler. ClusterResourceScheduler is
267268
/// responsible for maintaining a view of the cluster state w.r.t resource
268269
/// usage. ClusterTaskManager is responsible for queuing, spilling back, and

src/ray/raylet/node_manager.cc

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "ray/gcs/pb_util.h"
4343
#include "ray/object_manager/ownership_object_directory.h"
4444
#include "ray/raylet/format/node_manager_generated.h"
45+
#include "ray/raylet/local_object_manager_interface.h"
4546
#include "ray/raylet/scheduling/cluster_task_manager.h"
4647
#include "ray/raylet/worker_killing_policy.h"
4748
#include "ray/raylet/worker_pool.h"
@@ -121,7 +122,7 @@ NodeManager::NodeManager(
121122
ClusterTaskManagerInterface &cluster_task_manager,
122123
IObjectDirectory &object_directory,
123124
ObjectManagerInterface &object_manager,
124-
LocalObjectManager &local_object_manager,
125+
LocalObjectManagerInterface &local_object_manager,
125126
DependencyManager &dependency_manager,
126127
WorkerPoolInterface &worker_pool,
127128
absl::flat_hash_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers,
@@ -2490,11 +2491,16 @@ void NodeManager::HandlePinObjectIDs(rpc::PinObjectIDsRequest request,
24902491
auto object_id_it = object_ids.begin();
24912492
auto result_it = results.begin();
24922493
while (object_id_it != object_ids.end()) {
2493-
if (*result_it == nullptr) {
2494+
// Note: It is safe to call ObjectPendingDeletion here because the asynchronous
2495+
// deletion can only happen on the same thread as the call to HandlePinObjectIDs.
2496+
// Therefore, a new object cannot be marked for deletion while this function is
2497+
// executing.
2498+
if (*result_it == nullptr ||
2499+
local_object_manager_.ObjectPendingDeletion(*object_id_it)) {
24942500
RAY_LOG(DEBUG).WithField(*object_id_it)
24952501
<< "Failed to get object in the object store. This should only happen when "
2496-
"the owner tries to pin a "
2497-
<< "secondary copy and it's evicted in the meantime";
2502+
"the owner tries to pin an object and it's already been deleted or is "
2503+
"marked for deletion.";
24982504
object_id_it = object_ids.erase(object_id_it);
24992505
result_it = results.erase(result_it);
25002506
reply->add_successes(false);

0 commit comments

Comments
 (0)