Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions ddtrace/internal/datadog/profiling/stack/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ if(NOT Threads_FOUND OR NOT CMAKE_USE_PTHREADS_INIT)
endif()

# Specify the target C-extension that we want to build
add_library(${EXTENSION_NAME} SHARED src/echion/danger.cc src/echion/frame.cc src/sampler.cpp src/stack.cpp
src/stack_renderer.cpp src/thread_span_links.cpp)
add_library(
${EXTENSION_NAME} SHARED
src/echion/danger.cc
src/echion/frame.cc
src/echion/interp.cc
src/echion/threads.cc
src/sampler.cpp
src/stack.cpp
src/stack_renderer.cpp
src/thread_span_links.cpp)

# Add common config
add_ddup_config(${EXTENSION_NAME})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include <cstdint>

#include <unordered_map>

#include <echion/threads.h>

class EchionSampler
{
// Thread Info map (Thread ID -> ThreadInfo)
std::unordered_map<uintptr_t, ThreadInfo::Ptr> thread_info_map_;
std::mutex thread_info_map_lock_;

// Task Link map (Task -> Task relationships)
// std::unordered_map<PyObject*, PyObject*> task_link_map_;
// std::mutex task_link_map_lock_;

public:
EchionSampler() = default;
~EchionSampler() = default;

std::unordered_map<uintptr_t, ThreadInfo::Ptr>& thread_info_map() { return thread_info_map_; }
std::mutex& thread_info_map_lock() { return thread_info_map_lock_; }

// std::unordered_map<PyObject*, PyObject*>& task_link_map() { return task_link_map_; }
// std::mutex& task_link_map_lock() { return task_link_map_lock_; }

void postfork_child()
{
new (&thread_info_map_lock_) std::mutex;
// new (&task_link_map_lock_) std::mutex;
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,3 @@ inline std::unordered_map<uintptr_t, GreenletInfo::ID>& greenlet_thread_map =
inline std::mutex greenlet_info_map_lock;

// ----------------------------------------------------------------------------

inline std::vector<std::unique_ptr<StackInfo>> current_greenlets;

// ----------------------------------------------------------------------------
32 changes: 7 additions & 25 deletions ddtrace/internal/datadog/profiling/stack/echion/echion/interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
#pragma once

#define PY_SSIZE_T_CLEAN
#define Py_BUILD_CORE
#include <Python.h>

#if PY_VERSION_HEX >= 0x03090000
#define Py_BUILD_CORE
#if defined __GNUC__ && defined HAVE_STD_ATOMIC
#undef HAVE_STD_ATOMIC
#endif

#if PY_VERSION_HEX >= 0x03090000
#include <internal/pycore_interp.h>
#endif

Expand All @@ -20,6 +21,8 @@
#include <echion/state.h>
#include <echion/vm.h>

class EchionSampler;

class InterpreterInfo
{
public:
Expand All @@ -28,26 +31,5 @@ class InterpreterInfo
void* next = NULL;
};

static void
for_each_interp(std::function<void(InterpreterInfo& interp)> callback)
{
InterpreterInfo interpreter_info = { 0 };

for (char* interp_addr = reinterpret_cast<char*>(runtime->interpreters.head); interp_addr != NULL;
interp_addr = reinterpret_cast<char*>(interpreter_info.next)) {
if (copy_type(interp_addr + offsetof(PyInterpreterState, id), interpreter_info.id))
continue;

#if PY_VERSION_HEX >= 0x030b0000
if (copy_type(interp_addr + offsetof(PyInterpreterState, threads.head), interpreter_info.tstate_head))
#else
if (copy_type(interp_addr + offsetof(PyInterpreterState, tstate_head), interpreter_info.tstate_head))
#endif
continue;

if (copy_type(interp_addr + offsetof(PyInterpreterState, next), interpreter_info.next))
continue;

callback(interpreter_info);
};
}
void
for_each_interp(_PyRuntimeState* runtime, std::function<void(InterpreterInfo& interp)> callback);
11 changes: 0 additions & 11 deletions ddtrace/internal/datadog/profiling/stack/echion/echion/stacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ class FrameStack : public std::deque<Frame::Ref>
}
};

// ----------------------------------------------------------------------------

inline FrameStack python_stack;

// ----------------------------------------------------------------------------
static size_t
unwind_frame(PyObject* frame_addr, FrameStack& stack)
Expand Down Expand Up @@ -108,13 +104,6 @@ unwind_python_stack(PyThreadState* tstate, FrameStack& stack)
unwind_frame(frame_addr, stack);
}

// ----------------------------------------------------------------------------
static void
unwind_python_stack(PyThreadState* tstate)
{
unwind_python_stack(tstate, python_stack);
}

// ----------------------------------------------------------------------------
class StackInfo
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@
#include <internal/pycore_runtime.h>
#endif

#include <thread>

inline _PyRuntimeState* runtime = &_PyRuntime;
inline PyThreadState* current_tstate = NULL;

inline std::thread* sampler_thread = nullptr;

inline int running = 0;

inline PyObject* asyncio_current_tasks = NULL;
inline PyObject* asyncio_scheduled_tasks = NULL; // WeakSet
inline PyObject* asyncio_eager_tasks = NULL; // set
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,6 @@ TaskInfo::current(PyObject* loop)

// ----------------------------------------------------------------------------

inline std::vector<std::unique_ptr<StackInfo>> current_tasks;

// ----------------------------------------------------------------------------

inline size_t
TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
{
Expand Down
106 changes: 13 additions & 93 deletions ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@

#pragma once

#include <Python.h>
#define Py_BUILD_CORE
#include <Python.h>

#if defined __GNUC__ && defined HAVE_STD_ATOMIC
#undef HAVE_STD_ATOMIC
#endif

#if PY_VERSION_HEX >= 0x030e0000
#include <internal/pycore_tstate.h>
Expand All @@ -15,7 +19,6 @@
#include <cstdint>
#include <functional>
#include <mutex>
#include <optional>
#include <unordered_map>

#if defined PL_LINUX
Expand All @@ -33,13 +36,18 @@
#include <echion/tasks.h>
#include <echion/timing.h>

class EchionSampler;

class ThreadInfo
{
public:
using Ptr = std::unique_ptr<ThreadInfo>;

uintptr_t thread_id;
unsigned long native_id;
FrameStack python_stack;
std::vector<std::unique_ptr<StackInfo>> current_tasks;
std::vector<std::unique_ptr<StackInfo>> current_greenlets;

std::string name;

Expand Down Expand Up @@ -187,21 +195,11 @@ ThreadInfo::is_running()
#endif
}

// ----------------------------------------------------------------------------

// We make this a reference to a heap-allocated object so that we can avoid
// the destruction on exit. We are in charge of cleaning up the object. Note
// that the object will leak, but this is not a problem.
inline std::unordered_map<uintptr_t, ThreadInfo::Ptr>& thread_info_map =
*(new std::unordered_map<uintptr_t, ThreadInfo::Ptr>()); // indexed by thread_id

inline std::mutex thread_info_map_lock;

// ----------------------------------------------------------------------------
inline void
ThreadInfo::unwind(PyThreadState* tstate)
{
unwind_python_stack(tstate);
unwind_python_stack(tstate, python_stack);

if (asyncio_loop) {
// unwind_tasks returns a [[nodiscard]] Result<void>.
Expand Down Expand Up @@ -727,83 +725,5 @@ ThreadInfo::sample(int64_t iid, PyThreadState* tstate, microsecond_t delta)
// ----------------------------------------------------------------------------
using PyThreadStateCallback = std::function<void(PyThreadState*, ThreadInfo&)>;

static void
for_each_thread(InterpreterInfo& interp, PyThreadStateCallback callback)
{
std::unordered_set<PyThreadState*> threads;
std::unordered_set<PyThreadState*> seen_threads;

threads.clear();
seen_threads.clear();

// Start from the thread list head
threads.insert(static_cast<PyThreadState*>(interp.tstate_head));

while (!threads.empty()) {
// Pop the next thread
PyThreadState* tstate_addr = *threads.begin();
threads.erase(threads.begin());

// Mark the thread as seen
seen_threads.insert(tstate_addr);

// Since threads can be created and destroyed at any time, we make
// a copy of the structure before trying to read its fields.
PyThreadState tstate;
if (copy_type(tstate_addr, tstate))
// We failed to copy the thread so we skip it.
continue;

// Enqueue the unseen threads that we can reach from this thread.
if (tstate.next != NULL && seen_threads.find(tstate.next) == seen_threads.end())
threads.insert(tstate.next);
if (tstate.prev != NULL && seen_threads.find(tstate.prev) == seen_threads.end())
threads.insert(tstate.prev);

{
const std::lock_guard<std::mutex> guard(thread_info_map_lock);

if (thread_info_map.find(tstate.thread_id) == thread_info_map.end()) {
// If the threading module was not imported in the target then
// we mistakenly take the hypno thread as the main thread. We
// assume that any missing thread is the actual main thread,
// provided we don't already have a thread with the name
// "MainThread". Note that this can also happen on shutdown, so
// we need to avoid doing anything in that case.
#if PY_VERSION_HEX >= 0x030b0000
auto native_id = tstate.native_thread_id;
#else
auto native_id = getpid();
#endif
bool main_thread_tracked = false;
for (auto& kv : thread_info_map) {
if (kv.second->name == "MainThread") {
main_thread_tracked = true;
break;
}
}
if (main_thread_tracked)
continue;

auto maybe_thread_info = ThreadInfo::create(tstate.thread_id, native_id, "MainThread");
if (!maybe_thread_info) {
// We failed to create the thread info object so we skip it.
// We'll likely try again later with the valid thread
// information.
continue;
}

thread_info_map.emplace(tstate.thread_id, std::move(*maybe_thread_info));
}

// Update the tstate_addr for thread info, so we can access
// asyncio_tasks_head field from `_PyThreadStateImpl` struct
// later when we unwind tasks.
auto thread_info = thread_info_map.find(tstate.thread_id)->second.get();
thread_info->tstate_addr = reinterpret_cast<uintptr_t>(tstate_addr);

// Call back with the copied thread state
callback(&tstate, *thread_info);
}
}
}
void
for_each_thread(EchionSampler& echion, InterpreterInfo& interp, PyThreadStateCallback callback);
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ inline clock_serv_t cclock;

typedef unsigned long microsecond_t;

inline microsecond_t last_time = 0;

#define TS_TO_MICROSECOND(ts) ((ts).tv_sec * 1e6 + (ts).tv_nsec / 1e3)
#define TV_TO_MICROSECOND(tv) ((tv).seconds * 1e6 + (tv).microseconds)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include "constants.hpp"
#include "stack_renderer.hpp"

class EchionSampler;

#include <atomic>

namespace Datadog {
Expand All @@ -11,6 +13,8 @@ class Sampler
// This class manages the initialization of echion as well as the sampling thread.
// The underlying echion instance it manages keeps much of its state globally, so this class is a singleton in order
// to keep it aligned with the echion state.
std::unique_ptr<EchionSampler> echion;

private:
std::shared_ptr<StackRenderer> renderer_ptr;

Expand Down Expand Up @@ -62,6 +66,7 @@ class Sampler
// self-time, and we're not currently accounting for the echion self-time.
void set_interval(double new_interval);
void set_adaptive_sampling(bool value) { do_adaptive_sampling = value; }
void postfork_child();
};

} // namespace Datadog
27 changes: 27 additions & 0 deletions ddtrace/internal/datadog/profiling/stack/src/echion/interp.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include <echion/interp.h>

#include <echion/echion_sampler.h>

void
for_each_interp(_PyRuntimeState* runtime, std::function<void(InterpreterInfo& interp)> callback)
{
InterpreterInfo interpreter_info = { 0 };

for (char* interp_addr = reinterpret_cast<char*>(runtime->interpreters.head); interp_addr != NULL;
interp_addr = reinterpret_cast<char*>(interpreter_info.next)) {
if (copy_type(interp_addr + offsetof(PyInterpreterState, id), interpreter_info.id))
continue;

#if PY_VERSION_HEX >= 0x030b0000
if (copy_type(interp_addr + offsetof(PyInterpreterState, threads.head), interpreter_info.tstate_head))
#else
if (copy_type(interp_addr + offsetof(PyInterpreterState, tstate_head), interpreter_info.tstate_head))
#endif
continue;

if (copy_type(interp_addr + offsetof(PyInterpreterState, next), interpreter_info.next))
continue;

callback(interpreter_info);
};
}
Loading