Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions python/ray/dashboard/modules/log/log_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,19 @@ async def _resolve_actor_filename(
suffix: str,
timeout: int,
):
"""
Resolve actor log file
Args:
actor_id: The actor id.
get_actor_fn: The function to get actor information.
suffix: The suffix of the log file.
timeout: Timeout in seconds.
Returns:
The log file name and node id.

Raises:
ValueError if actor data is not found or get_actor_fn is not provided.
"""Resolve actor log file.

Args:
actor_id: The actor id.
get_actor_fn: The function to get actor information.
suffix: The suffix of the log file.
timeout: Timeout in seconds.

Returns:
The log file name and node id.

Raises:
ValueError: If actor data is not found or get_actor_fn is not provided.
"""
if get_actor_fn is None:
raise ValueError("get_actor_fn needs to be specified for actor_id")
Expand Down Expand Up @@ -249,23 +250,21 @@ async def _resolve_actor_filename(
async def _resolve_task_filename(
self, task_id: str, attempt_number: int, suffix: str, timeout: int
):
"""
Resolve log file for a task.
"""Resolve log file for a task.

Args:
task_id: The task id.
attempt_number: The attempt number.
suffix: The suffix of the log file, e.g. out or err
suffix: The suffix of the log file, e.g. out or err.
timeout: Timeout in seconds.

Returns:
The log file name, node id, the start and end offsets of the
corresponding task log in the file.

Raises:
FileNotFoundError if the log file is not found.
ValueError if the suffix is not out or err.

FileNotFoundError: If the log file is not found.
ValueError: If the suffix is not out or err.
"""
log_filename = None
node_id = None
Expand Down
92 changes: 41 additions & 51 deletions python/ray/dashboard/modules/reporter/reporter_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,18 @@ async def get_task_ids_running_in_a_worker(self, worker_id: str) -> List[str]:
async def get_worker_details_for_running_task(
self, task_id: str, attempt_number: int
) -> Tuple[Optional[int], Optional[str]]:
"""
Retrieves worker details for a specific task and attempt number.
"""Retrieves worker details for a specific task and attempt number.

Args:
task_id: The ID of the task.
attempt_number: The attempt number of the task.

Returns:
Tuple[Optional[int], Optional[str]]: A tuple
containing the worker's PID (process ID),
and worker's ID.
Tuple[Optional[int], Optional[str]]: A tuple containing the worker's PID
(process ID), and worker's ID.

Raises:
ValueError: If the task attempt is not running or
the state APi is not initialized.
ValueError: If the task attempt is not running or the state API is not initialized.
"""
if self._state_api is None:
raise ValueError("The state API is not initialized yet. Please retry.")
Expand Down Expand Up @@ -217,35 +214,27 @@ async def get_worker_details_for_running_task(
return pid, worker_id

@routes.get("/task/traceback")
async def get_task_traceback(self, req) -> aiohttp.web.Response:
"""
Retrieves the traceback information for a specific task.
async def get_task_traceback(
self, req: aiohttp.web.Request
) -> aiohttp.web.Response:
"""Retrieves the traceback information for a specific task.
Note that a worker process works on either one task at a time
or multiple async tasks.

Args:
req (aiohttp.web.Request): The HTTP request object.

Params:
task_id: The ID of the task.
attempt_number: The attempt number of the task.
node_id: The ID of the node.

Returns:
aiohttp.web.Response: The HTTP response containing
the traceback information.
aiohttp.web.Response: The HTTP response containing the traceback information.

Raises:
ValueError: If the "task_id" parameter
is missing in the request query.
ValueError: If the "attempt_number" parameter
is missing in the request query.
ValueError: If the worker begins working on
another task during the traceback retrieval.
aiohttp.web.HTTPInternalServerError: If there is
an internal server error during the traceback retrieval.
ValueError: If the "task_id" parameter is missing in the request query.
ValueError: If the "attempt_number" parameter is missing in the request query.
ValueError: If the worker begins working on another task during the traceback retrieval.
aiohttp.web.HTTPInternalServerError: If there is an internal server error during the traceback retrieval.
"""

if "task_id" not in req.query:
raise ValueError("task_id is required")
if "attempt_number" not in req.query:
Expand Down Expand Up @@ -317,30 +306,23 @@ async def get_task_traceback(self, req) -> aiohttp.web.Response:
)

@routes.get("/task/cpu_profile")
async def get_task_cpu_profile(self, req) -> aiohttp.web.Response:
"""
Retrieves the CPU profile for a specific task.
async def get_task_cpu_profile(
self, req: aiohttp.web.Request
) -> aiohttp.web.Response:
"""Retrieves the CPU profile for a specific task.
Note that a worker process works on either one task at a time
or multiple async tasks.

Args:
req (aiohttp.web.Request): The HTTP request object.

Returns:
aiohttp.web.Response: The HTTP response containing the CPU profile data.

Raises:
ValueError: If the "task_id" parameter is
missing in the request query.
ValueError: If the "attempt_number" parameter is
missing in the request query.
ValueError: If the "task_id" parameter is missing in the request query.
ValueError: If the "attempt_number" parameter is missing in the request query.
ValueError: If the maximum duration allowed is exceeded.
ValueError: If the worker begins working on
another task during the profile retrieval.
aiohttp.web.HTTPInternalServerError: If there is
an internal server error during the profile retrieval.
aiohttp.web.HTTPInternalServerError: If the CPU Flame
Graph information for the task is not found.
ValueError: If the worker begins working on another task during the profile retrieval.
aiohttp.web.HTTPInternalServerError: If there is an internal server error during the profile retrieval.
aiohttp.web.HTTPInternalServerError: If the CPU Flame Graph information for the task is not found.
"""
if "task_id" not in req.query:
raise ValueError("task_id is required")
Expand Down Expand Up @@ -420,8 +402,9 @@ async def get_task_cpu_profile(self, req) -> aiohttp.web.Response:
)

@routes.get("/worker/traceback")
async def get_traceback(self, req) -> aiohttp.web.Response:
"""
async def get_traceback(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
"""Retrieves the traceback information for a specific worker.

Params:
pid: Required. The PID of the worker.
ip: Required. The IP address of the node.
Expand Down Expand Up @@ -457,11 +440,21 @@ async def get_traceback(self, req) -> aiohttp.web.Response:
return aiohttp.web.HTTPInternalServerError(text=reply.output)

@routes.get("/worker/cpu_profile")
async def cpu_profile(self, req) -> aiohttp.web.Response:
"""
async def cpu_profile(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
"""Retrieves the CPU profile for a specific worker.

Params:
pid: Required. The PID of the worker.
ip: Required. The IP address of the node.
duration: Optional. Duration in seconds for profiling (default: 5, max: 60).
format: Optional. Output format (default: "flamegraph").
native: Optional. Whether to use native profiling (default: false).

Raises:
ValueError: If pid is not provided.
ValueError: If ip is not provided.
ValueError: If duration exceeds 60 seconds.
aiohttp.web.HTTPInternalServerError: If there is an internal server error during the profile retrieval.
"""
pid = req.query.get("pid")
ip = req.query.get("ip")
Expand Down Expand Up @@ -510,21 +503,18 @@ async def cpu_profile(self, req) -> aiohttp.web.Response:
return aiohttp.web.HTTPInternalServerError(text=reply.output)

@routes.get("/memory_profile")
async def memory_profile(self, req) -> aiohttp.web.Response:
"""
Retrieves the memory profile for a specific worker or task.
async def memory_profile(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
"""Retrieves the memory profile for a specific worker or task.
Note that for tasks, a worker process works on either one task at a time
or multiple async tasks.

Args:
req (aiohttp.web.Request): The HTTP request object.

Returns:
aiohttp.web.Response: The HTTP response containing the memory profile data.

Params (1):
pid: The PID of the worker.
ip: The IP address of the node.

Params (2):
task_id: The ID of the task.
attempt_number: The attempt number of the task.
Expand All @@ -538,7 +528,7 @@ async def memory_profile(self, req) -> aiohttp.web.Response:
or "node id" is missing in the request query.
aiohttp.web.HTTPInternalServerError: If the maximum
duration allowed is exceeded.
aiohttp.web.HTTPInternalServerError If requesting task
aiohttp.web.HTTPInternalServerError: If task profiling was
requested and the worker begins working on another task
during the profile retrieval.
aiohttp.web.HTTPInternalServerError: If there is
Expand Down