Skip to content
Merged
13 changes: 13 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -567,11 +567,22 @@ ray_cc_library(
],
)

ray_cc_library(
name ="local_object_manager_interface",
hdrs = ["src/ray/raylet/local_object_manager_interface.h"],
deps = [
"//src/ray/common:id",
"//src/ray/common:ray_object",
"//src/ray/protobuf:node_manager_cc_proto",
],
)

ray_cc_library(
name = "local_object_manager",
srcs = ["src/ray/raylet/local_object_manager.cc"],
hdrs = ["src/ray/raylet/local_object_manager.h"],
deps = [
":local_object_manager_interface",
":worker_pool",
":worker_rpc",
"//src/ray/common:id",
Expand Down Expand Up @@ -615,6 +626,7 @@ ray_cc_library(
],
}),
deps = [
":local_object_manager_interface",
":local_object_manager",
":node_manager_fbs",
":node_manager_rpc",
Expand Down Expand Up @@ -764,6 +776,7 @@ ray_cc_test(
tags = ["team:core"],
deps = [
":ray_mock",
":local_object_manager_interface",
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
Expand Down
21 changes: 15 additions & 6 deletions src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,33 @@ void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) {

// Try to evict all copies of the object from the cluster.
if (free_objects_period_ms_ >= 0) {
objects_to_free_.push_back(object_id);
objects_pending_deletion_.emplace(object_id);
}
if (objects_to_free_.size() == free_objects_batch_size_ ||
if (objects_pending_deletion_.size() == free_objects_batch_size_ ||
free_objects_period_ms_ == 0) {
FlushFreeObjects();
}
}

void LocalObjectManager::FlushFreeObjects() {
if (!objects_to_free_.empty()) {
RAY_LOG(DEBUG) << "Freeing " << objects_to_free_.size() << " out-of-scope objects";
on_objects_freed_(objects_to_free_);
objects_to_free_.clear();
if (!objects_pending_deletion_.empty()) {
RAY_LOG(DEBUG) << "Freeing " << objects_pending_deletion_.size()
<< " out-of-scope objects";
// TODO(irabbani): CORE-1640 will modify as much as the plasma API as is
// reasonable to remove usage of vectors in favor of sets.
Copy link
Contributor

@dayshah dayshah Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably no jira tickets in oss code, make it a gh issue or don't keep in code

std::vector<ObjectID> objects_to_delete(objects_pending_deletion_.begin(),
objects_pending_deletion_.end());
on_objects_freed_(objects_to_delete);
objects_pending_deletion_.clear();
}
ProcessSpilledObjectsDeleteQueue(free_objects_batch_size_);
last_free_objects_at_ms_ = current_time_ms();
}

bool LocalObjectManager::ObjectPendingDeletion(const ObjectID &object_id) {
return objects_pending_deletion_.find(object_id) != objects_pending_deletion_.end();
}

void LocalObjectManager::SpillObjectUptoMaxThroughput() {
if (RayConfig::instance().object_spilling_config().empty()) {
return;
Expand Down
50 changes: 28 additions & 22 deletions src/ray/raylet/local_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include "ray/common/id.h"
Expand All @@ -26,9 +27,9 @@
#include "ray/object_manager/common.h"
#include "ray/object_manager/object_directory.h"
#include "ray/pubsub/subscriber.h"
#include "ray/raylet/local_object_manager_interface.h"
#include "ray/raylet/worker_pool.h"
#include "ray/rpc/worker/core_worker_client_pool.h"
#include "src/ray/protobuf/node_manager.pb.h"

namespace ray {

Expand All @@ -39,7 +40,7 @@ inline constexpr int64_t kDefaultSpilledObjectDeleteRetries = 3;

/// This class implements memory management for primary objects, objects that
/// have been freed, and objects that have been spilled.
class LocalObjectManager {
class LocalObjectManager : public LocalObjectManagerInterface {
public:
LocalObjectManager(
const NodeID &node_id,
Expand All @@ -58,19 +59,19 @@ class LocalObjectManager {
pubsub::SubscriberInterface *core_worker_subscriber,
IObjectDirectory *object_directory)
: self_node_id_(node_id),
self_node_address_(self_node_address),
self_node_address_(std::move(self_node_address)),
self_node_port_(self_node_port),
io_service_(io_service),
free_objects_period_ms_(free_objects_period_ms),
free_objects_batch_size_(free_objects_batch_size),
io_worker_pool_(io_worker_pool),
owner_client_pool_(owner_client_pool),
on_objects_freed_(on_objects_freed),
on_objects_freed_(std::move(on_objects_freed)),
last_free_objects_at_ms_(current_time_ms()),
min_spilling_size_(RayConfig::instance().min_spilling_size()),
num_active_workers_(0),
max_active_workers_(max_io_workers),
is_plasma_object_spillable_(is_plasma_object_spillable),
is_plasma_object_spillable_(std::move(is_plasma_object_spillable)),
is_external_storage_type_fs_(is_external_storage_type_fs),
max_fused_object_count_(max_fused_object_count),
next_spill_error_log_bytes_(RayConfig::instance().verbose_spill_logs()),
Expand All @@ -95,12 +96,12 @@ class LocalObjectManager {
void PinObjectsAndWaitForFree(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> &&objects,
const rpc::Address &owner_address,
const ObjectID &generator_id = ObjectID::Nil());
const ObjectID &generator_id = ObjectID::Nil()) override;

/// Spill objects as much as possible as fast as possible up to the max throughput.
///
/// \return True if spilling is in progress.
void SpillObjectUptoMaxThroughput();
void SpillObjectUptoMaxThroughput() override;

/// TODO(dayshah): This function is only used for testing, we should remove and just
/// keep SpillObjectsInternal.
Expand All @@ -110,7 +111,7 @@ class LocalObjectManager {
/// \param callback A callback to call once the objects have been spilled, or
/// there is an error.
void SpillObjects(const std::vector<ObjectID> &objects_ids,
std::function<void(const ray::Status &)> callback);
std::function<void(const ray::Status &)> callback) override;

/// Restore a spilled object from external storage back into local memory.
/// Note: This is no-op if the same restoration request is in flight or the requested
Expand All @@ -121,14 +122,19 @@ class LocalObjectManager {
/// \param object_url The URL where the object is spilled.
/// \param callback A callback to call when the restoration is done.
/// Status will contain the error during restoration, if any.
void AsyncRestoreSpilledObject(const ObjectID &object_id,
int64_t object_size,
const std::string &object_url,
std::function<void(const ray::Status &)> callback);
void AsyncRestoreSpilledObject(
const ObjectID &object_id,
int64_t object_size,
const std::string &object_url,
std::function<void(const ray::Status &)> callback) override;

/// Clear any freed objects. This will trigger the callback for freed
/// objects.
void FlushFreeObjects();
void FlushFreeObjects() override;

/// Returns true if the object has been marked for deletion through the
/// eviction notification.
bool ObjectPendingDeletion(const ObjectID &object_id) override;

/// Judge if objects are deletable from pending_delete_queue and delete them if
/// necessary.
Expand All @@ -138,7 +144,7 @@ class LocalObjectManager {
///
/// \param max_batch_size Maximum number of objects that can be deleted by one
/// invocation.
void ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size);
void ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size) override;

/// Return True if spilling is in progress.
/// This is a narrow interface that is accessed by plasma store.
Expand All @@ -147,31 +153,31 @@ class LocalObjectManager {
/// which is against the general raylet design.
///
/// \return True if spilling is still in progress. False otherwise.
bool IsSpillingInProgress();
bool IsSpillingInProgress() override;

/// Populate object store stats.
///
/// \param reply Output parameter.
void FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const;
void FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const override;

/// Record object spilling stats to metrics.
void RecordMetrics() const;
void RecordMetrics() const override;

/// Return the spilled object URL if the object is spilled locally,
/// or the empty string otherwise.
/// If the external storage is cloud, this will always return an empty string.
/// In that case, the URL is supposed to be obtained by the object directory.
std::string GetLocalSpilledObjectURL(const ObjectID &object_id);
std::string GetLocalSpilledObjectURL(const ObjectID &object_id) override;

/// Get the current bytes used by primary object copies. This number includes
/// bytes used by objects currently being spilled.
int64_t GetPrimaryBytes() const;
int64_t GetPrimaryBytes() const override;

/// Returns true if we have objects spilled to the local
/// filesystem.
bool HasLocallySpilledObjects() const;
bool HasLocallySpilledObjects() const override;

std::string DebugString() const;
std::string DebugString() const override;

private:
struct LocalObjectInfo {
Expand Down Expand Up @@ -284,7 +290,7 @@ class LocalObjectManager {
/// from plasma. The cache is flushed when it reaches the
/// free_objects_batch_size, or if objects have been in the cache for longer
/// than the config's free_objects_period, whichever occurs first.
std::vector<ObjectID> objects_to_free_;
absl::flat_hash_set<ObjectID> objects_pending_deletion_;

/// The total size of the objects that are currently being
/// spilled from this node, in bytes.
Expand Down
74 changes: 74 additions & 0 deletions src/ray/raylet/local_object_manager_interface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "ray/common/id.h"
#include "ray/common/ray_object.h"
#include "src/ray/protobuf/node_manager.pb.h"

namespace ray {

namespace raylet {

class LocalObjectManagerInterface {
public:
virtual ~LocalObjectManagerInterface() = default;

virtual void PinObjectsAndWaitForFree(const std::vector<ObjectID> &,
std::vector<std::unique_ptr<RayObject>> &&,
const rpc::Address &,
const ObjectID & = ObjectID::Nil()) = 0;

virtual void SpillObjectUptoMaxThroughput() = 0;

/// TODO(dayshah): This function is only used for testing, we should remove and just
/// keep SpillObjectsInternal.
virtual void SpillObjects(const std::vector<ObjectID> &,
std::function<void(const ray::Status &)>) = 0;

virtual void AsyncRestoreSpilledObject(const ObjectID &,
int64_t,
const std::string &,
std::function<void(const ray::Status &)>) = 0;

virtual void FlushFreeObjects() = 0;

virtual bool ObjectPendingDeletion(const ObjectID &) = 0;

virtual void ProcessSpilledObjectsDeleteQueue(uint32_t) = 0;

virtual bool IsSpillingInProgress() = 0;

virtual void FillObjectStoreStats(rpc::GetNodeStatsReply *) const = 0;

virtual void RecordMetrics() const = 0;

virtual std::string GetLocalSpilledObjectURL(const ObjectID &) = 0;

virtual int64_t GetPrimaryBytes() const = 0;

virtual bool HasLocallySpilledObjects() const = 0;

virtual std::string DebugString() const = 0;
};

}; // namespace raylet

}; // namespace ray
5 changes: 3 additions & 2 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "ray/common/task/task_common.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/object_manager/ownership_object_directory.h"
#include "ray/raylet/local_object_manager.h"
#include "ray/raylet/local_object_manager_interface.h"
#include "ray/raylet/raylet.h"
#include "ray/stats/stats.h"
#include "ray/util/cmd_line_utils.h"
Expand All @@ -40,7 +42,6 @@
#include "ray/util/stream_redirection_options.h"
#include "ray/util/subreaper.h"
#include "scheduling/cluster_task_manager.h"
#include "src/ray/protobuf/gcs.pb.h"

using json = nlohmann::json;

Expand Down Expand Up @@ -262,7 +263,7 @@ int main(int argc, char *argv[]) {
std::unique_ptr<ray::raylet::WorkerPoolInterface> worker_pool;
/// Manages all local objects that are pinned (primary
/// copies), freed, and/or spilled.
std::unique_ptr<ray::raylet::LocalObjectManager> local_object_manager;
std::unique_ptr<ray::raylet::LocalObjectManagerInterface> local_object_manager;
/// These classes make up the new scheduler. ClusterResourceScheduler is
/// responsible for maintaining a view of the cluster state w.r.t resource
/// usage. ClusterTaskManager is responsible for queuing, spilling back, and
Expand Down
14 changes: 10 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "ray/gcs/pb_util.h"
#include "ray/object_manager/ownership_object_directory.h"
#include "ray/raylet/format/node_manager_generated.h"
#include "ray/raylet/local_object_manager_interface.h"
#include "ray/raylet/scheduling/cluster_task_manager.h"
#include "ray/raylet/worker_killing_policy.h"
#include "ray/raylet/worker_pool.h"
Expand Down Expand Up @@ -121,7 +122,7 @@ NodeManager::NodeManager(
ClusterTaskManagerInterface &cluster_task_manager,
IObjectDirectory &object_directory,
ObjectManagerInterface &object_manager,
LocalObjectManager &local_object_manager,
LocalObjectManagerInterface &local_object_manager,
DependencyManager &dependency_manager,
WorkerPoolInterface &worker_pool,
absl::flat_hash_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers,
Expand Down Expand Up @@ -2490,11 +2491,16 @@ void NodeManager::HandlePinObjectIDs(rpc::PinObjectIDsRequest request,
auto object_id_it = object_ids.begin();
auto result_it = results.begin();
while (object_id_it != object_ids.end()) {
if (*result_it == nullptr) {
// Note: It is safe to call ObjectPendingDeletion here because the asynchronous
// deletion can only happen on the same thread as the call to HandlePinObjectIDs.
// Therefore, a new object cannot be marked for deletion while this function is
// executing.
if (*result_it == nullptr ||
local_object_manager_.ObjectPendingDeletion(*object_id_it)) {
Copy link
Contributor Author

@israbbani israbbani Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix and logic change.

RAY_LOG(DEBUG).WithField(*object_id_it)
<< "Failed to get object in the object store. This should only happen when "
"the owner tries to pin a "
<< "secondary copy and it's evicted in the meantime";
"the owner tries to pin an object and it's already been deleted or is "
"marked for deletion.";
object_id_it = object_ids.erase(object_id_it);
result_it = results.erase(result_it);
reply->add_successes(false);
Expand Down
Loading