Skip to content

Commit f9d1ff1

Browse files
authored
[core] Fix check fail when task buffer periodical runner runs before RayEvent is initialized (#55249)
Signed-off-by: dayshah <[email protected]>
1 parent aeae43f commit f9d1ff1

File tree

3 files changed

+55
-62
lines changed

3 files changed

+55
-62
lines changed

src/ray/core_worker/core_worker_process.cc

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,18 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
722722
// We need init stats before using it/spawning threads.
723723
stats::Init(global_tags, options_.metrics_agent_port, worker_id_);
724724

725+
// Initialize event framework before starting up worker.
726+
if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {
727+
const std::vector<SourceTypeVariant> source_types = {
728+
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
729+
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
730+
RayEventInit(source_types,
731+
/*custom_fields=*/{},
732+
options_.log_dir,
733+
RayConfig::instance().event_level(),
734+
RayConfig::instance().emit_event_to_log_file());
735+
}
736+
725737
{
726738
// Notify that core worker is initialized.
727739
absl::Cleanup initialzed_scope_guard = [this] {
@@ -732,18 +744,6 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
732744
auto write_locked = core_worker_.LockForWrite();
733745
write_locked.Get() = worker;
734746
}
735-
736-
// Initialize event framework.
737-
if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {
738-
const std::vector<SourceTypeVariant> source_types = {
739-
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
740-
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
741-
RayEventInit(source_types,
742-
absl::flat_hash_map<std::string, std::string>(),
743-
options_.log_dir,
744-
RayConfig::instance().event_level(),
745-
RayConfig::instance().emit_event_to_log_file());
746-
}
747747
}
748748

749749
CoreWorkerProcessImpl::~CoreWorkerProcessImpl() {

src/ray/util/event.cc

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
#include <string>
2424
#include <vector>
2525

26-
#include "absl/base/call_once.h"
27-
#include "absl/time/time.h"
2826
#include "ray/util/random.h"
2927
#include "ray/util/string_utils.h"
3028
#include "ray/util/timestamp_utils.h"
@@ -52,7 +50,7 @@ LogEventReporter::LogEventReporter(SourceTypeVariant source_type,
5250
// Generate the file name. If the source type is RAYLET or GCS, the file name looks like
5351
// event_GCS.log or event_RAYLET.log; in other cases it looks like
5452
// event_CORE_WORKER_{pid}.log
55-
std::string source_type_name = "";
53+
std::string source_type_name;
5654
bool add_pid_to_file = false;
5755
if (auto event_source_type_ptr = std::get_if<rpc::Event_SourceType>(&source_type)) {
5856
rpc::Event_SourceType event_source_type = *event_source_type_ptr;
@@ -215,14 +213,11 @@ void EventManager::Publish(const rpc::Event &event, const json &custom_fields) {
215213

216214
void EventManager::PublishExportEvent(const rpc::ExportEvent &export_event) {
217215
auto element = export_log_reporter_map_.find(export_event.source_type());
218-
if (element != export_log_reporter_map_.end()) {
219-
(element->second)->ReportExportEvent(export_event);
220-
} else {
221-
RAY_LOG(FATAL)
222-
<< "RayEventInit wasn't called with the necessary source type "
223-
<< ExportEvent_SourceType_Name(export_event.source_type())
224-
<< ". This indicates a bug in the code, and the event will be dropped.";
225-
}
216+
RAY_CHECK(element != export_log_reporter_map_.end())
217+
<< "RayEventInit wasn't called with the necessary source type "
218+
<< ExportEvent_SourceType_Name(export_event.source_type())
219+
<< ". This indicates a bug in the code, and the event will be dropped.";
220+
element->second->ReportExportEvent(export_event);
226221
}
227222

228223
void EventManager::AddReporter(std::shared_ptr<BaseEventReporter> reporter) {
@@ -270,7 +265,7 @@ void RayEventContext::SetEventContext(
270265
SetSourceType(source_type);
271266
UpdateCustomFields(custom_fields);
272267

273-
if (!global_context_started_setting_.fetch_or(1)) {
268+
if (global_context_started_setting_.fetch_or(1) == 0) {
274269
global_context_ = std::make_unique<RayEventContext>();
275270
global_context_->SetSourceType(source_type);
276271
global_context_->UpdateCustomFields(custom_fields);
@@ -471,15 +466,13 @@ void RayExportEvent::SendEvent() {
471466
EventManager::Instance().PublishExportEvent(export_event);
472467
}
473468

474-
static absl::once_flag init_once_;
475-
476-
void RayEventInit_(const std::vector<SourceTypeVariant> source_types,
469+
void RayEventInit_(const std::vector<SourceTypeVariant> &source_types,
477470
const absl::flat_hash_map<std::string, std::string> &custom_fields,
478471
const std::string &log_dir,
479472
const std::string &event_level,
480473
bool emit_event_to_log_file) {
481474
for (const auto &source_type : source_types) {
482-
std::string source_type_name = "";
475+
std::string source_type_name;
483476
auto event_dir = std::filesystem::path(log_dir) / std::filesystem::path("events");
484477
if (auto event_source_type_ptr = std::get_if<rpc::Event_SourceType>(&source_type)) {
485478
// Set custom fields for non export events
@@ -503,17 +496,16 @@ void RayEventInit_(const std::vector<SourceTypeVariant> source_types,
503496
SetEmitEventToLogFile(emit_event_to_log_file);
504497
}
505498

506-
void RayEventInit(const std::vector<SourceTypeVariant> source_types,
499+
void RayEventInit(const std::vector<SourceTypeVariant> &source_types,
507500
const absl::flat_hash_map<std::string, std::string> &custom_fields,
508501
const std::string &log_dir,
509502
const std::string &event_level,
510503
bool emit_event_to_log_file) {
511-
absl::call_once(
512-
init_once_,
513-
[&source_types, &custom_fields, &log_dir, &event_level, emit_event_to_log_file]() {
514-
RayEventInit_(
515-
source_types, custom_fields, log_dir, event_level, emit_event_to_log_file);
516-
});
504+
static std::once_flag init_once_;
505+
std::call_once(init_once_, [&]() {
506+
RayEventInit_(
507+
source_types, custom_fields, log_dir, event_level, emit_event_to_log_file);
508+
});
517509
}
518510

519511
bool IsExportAPIEnabledSourceType(

src/ray/util/event.h

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <memory>
2626
#include <sstream>
2727
#include <string>
28+
#include <utility>
2829
#include <variant>
2930
#include <vector>
3031

@@ -143,6 +144,12 @@ class EventManager final {
143144
public:
144145
static EventManager &Instance();
145146

147+
EventManager(const EventManager &manager) = delete;
148+
149+
const EventManager &operator=(const EventManager &manager) = delete;
150+
151+
~EventManager() = default;
152+
146153
bool IsEmpty();
147154

148155
// We added `const json &custom_fields` here because we need to support typed custom
@@ -167,11 +174,6 @@ class EventManager final {
167174
private:
168175
EventManager();
169176

170-
EventManager(const EventManager &manager) = delete;
171-
172-
const EventManager &operator=(const EventManager &manager) = delete;
173-
174-
private:
175177
absl::flat_hash_map<std::string, std::shared_ptr<BaseEventReporter>> reporter_map_;
176178
absl::flat_hash_map<rpc::ExportEvent_SourceType, std::shared_ptr<LogEventReporter>>
177179
export_log_reporter_map_;
@@ -183,7 +185,7 @@ class RayEventContext final {
183185
public:
184186
static RayEventContext &Instance();
185187

186-
RayEventContext() {}
188+
RayEventContext() = default;
187189

188190
void SetEventContext(
189191
rpc::Event_SourceType source_type,
@@ -201,31 +203,31 @@ class RayEventContext final {
201203
void UpdateCustomFields(
202204
const absl::flat_hash_map<std::string, std::string> &custom_fields);
203205

204-
inline void SetSourceType(rpc::Event_SourceType source_type) {
205-
source_type_ = source_type;
206-
}
206+
void SetSourceType(rpc::Event_SourceType source_type) { source_type_ = source_type; }
207207

208-
inline const rpc::Event_SourceType &GetSourceType() const { return source_type_; }
208+
const rpc::Event_SourceType &GetSourceType() const { return source_type_; }
209209

210-
inline const std::string &GetSourceHostname() const { return source_hostname_; }
210+
const std::string &GetSourceHostname() const { return source_hostname_; }
211211

212-
inline int32_t GetSourcePid() const { return source_pid_; }
212+
int32_t GetSourcePid() const { return source_pid_; }
213213

214-
inline const absl::flat_hash_map<std::string, std::string> &GetCustomFields() const {
214+
const absl::flat_hash_map<std::string, std::string> &GetCustomFields() const {
215215
return custom_fields_;
216216
}
217217

218-
inline bool GetInitialzed() const {
218+
bool GetInitialzed() const {
219219
return source_type_ != rpc::Event_SourceType::Event_SourceType_COMMON;
220220
}
221221

222-
private:
223-
static RayEventContext &GlobalInstance();
224-
225222
RayEventContext(const RayEventContext &event_context) = delete;
226223

227224
const RayEventContext &operator=(const RayEventContext &event_context) = delete;
228225

226+
~RayEventContext() = default;
227+
228+
private:
229+
static RayEventContext &GlobalInstance();
230+
229231
rpc::Event_SourceType source_type_ = rpc::Event_SourceType::Event_SourceType_COMMON;
230232
std::string source_hostname_ = boost::asio::ip::host_name();
231233
int32_t source_pid_ = getpid();
@@ -251,12 +253,12 @@ class RayEvent {
251253
// destroyed. Otherwise we might have memory issues.
252254
RayEvent(rpc::Event_Severity severity,
253255
RayLogLevel log_severity,
254-
const std::string &label,
256+
std::string label,
255257
const char *file_name,
256258
int line_number)
257259
: severity_(severity),
258260
log_severity_(log_severity),
259-
label_(label),
261+
label_(std::move(label)),
260262
file_name_(file_name),
261263
line_number_(line_number) {}
262264

@@ -296,15 +298,15 @@ class RayEvent {
296298

297299
~RayEvent();
298300

301+
RayEvent(const RayEvent &event) = delete;
302+
303+
const RayEvent &operator=(const RayEvent &event) = delete;
304+
299305
private:
300306
RayEvent() = default;
301307

302308
void SendMessage(const std::string &message);
303309

304-
RayEvent(const RayEvent &event) = delete;
305-
306-
const RayEvent &operator=(const RayEvent &event) = delete;
307-
308310
// Only for test
309311
static void SetLevel(const std::string &event_level);
310312
// Only for test
@@ -331,13 +333,12 @@ using ExportEventDataPtr = std::variant<std::shared_ptr<rpc::ExportTaskEventData
331333
class RayExportEvent {
332334
public:
333335
explicit RayExportEvent(ExportEventDataPtr event_data_ptr)
334-
: event_data_ptr_(event_data_ptr) {}
336+
: event_data_ptr_(std::move(event_data_ptr)) {}
335337

336338
~RayExportEvent();
337339

338340
void SendEvent();
339341

340-
private:
341342
RayExportEvent(const RayExportEvent &event) = delete;
342343

343344
const RayExportEvent &operator=(const RayExportEvent &event) = delete;
@@ -366,7 +367,7 @@ bool IsExportAPIEnabledSourceType(
366367
/// \param emit_event_to_log_file if True, it will emit the event to the process log file
367368
/// (e.g., gcs_server.out). Otherwise, event will only be recorded to the event log file.
368369
/// \return void.
369-
void RayEventInit(const std::vector<SourceTypeVariant> source_types,
370+
void RayEventInit(const std::vector<SourceTypeVariant> &source_types,
370371
const absl::flat_hash_map<std::string, std::string> &custom_fields,
371372
const std::string &log_dir,
372373
const std::string &event_level = "warning",
@@ -376,7 +377,7 @@ void RayEventInit(const std::vector<SourceTypeVariant> source_types,
376377
/// and has been separated out so RayEventInit can be called multiple times in
377378
/// tests.
378379
/// **Note**: This should only be called from tests.
379-
void RayEventInit_(const std::vector<SourceTypeVariant> source_types,
380+
void RayEventInit_(const std::vector<SourceTypeVariant> &source_types,
380381
const absl::flat_hash_map<std::string, std::string> &custom_fields,
381382
const std::string &log_dir,
382383
const std::string &event_level,

0 commit comments

Comments
 (0)