Skip to content
5 changes: 5 additions & 0 deletions python/ray/dashboard/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@

NODE_TAG_KEYS = ["ip", "Version", "SessionName", "IsHeadNode"]
GPU_TAG_KEYS = [*NODE_TAG_KEYS, "GpuDeviceName", "GpuIndex"]

# Tag cardinality: the number of distinct TpuDeviceName / TpuIndex values is
# expected to equal the number of TPU chips in the cluster, while the number
# of distinct TpuType / TpuTopology values is proportional to the number of
# node pools.
TPU_TAG_KEYS = [
    *NODE_TAG_KEYS,
    "TpuDeviceName",
    "TpuIndex",
    "TpuType",
    "TpuTopology",
]
CLUSTER_TAG_KEYS = ["node_type", "Version", "SessionName"]
COMPONENT_METRICS_TAG_KEYS = ["ip", "pid", "Version", "Component", "SessionName"]

Expand Down
258 changes: 258 additions & 0 deletions python/ray/dashboard/modules/reporter/reporter_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import os
import requests
import socket
import sys
import traceback
Expand All @@ -12,6 +13,7 @@

from opencensus.stats import stats as stats_module
from prometheus_client.core import REGISTRY
from prometheus_client.parser import text_string_to_metric_families
from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2
from grpc.aio import ServicerContext

Expand Down Expand Up @@ -44,6 +46,7 @@
COMPONENT_METRICS_TAG_KEYS,
GCS_RPC_TIMEOUT_SECONDS,
GPU_TAG_KEYS,
TPU_TAG_KEYS,
NODE_TAG_KEYS,
)
from ray.dashboard.modules.reporter.gpu_profile_manager import GpuProfilingManager
Expand All @@ -58,6 +61,8 @@

# NOTE(review): presumably the GPU counterpart of the TPU switch below; its
# readers are not visible in this chunk -- confirm against _get_gpu_usage().
enable_gpu_usage_check = True

# Module-level switch read by ReporterAgent._get_tpu_usage(). That method sets
# it to False when TPU_DEVICE_PLUGIN_ADDR is unset or when the metrics request
# fails, after which it returns an empty list without contacting the plugin.
enable_tpu_usage_check = True

# Are we in a K8s pod?
IN_KUBERNETES_POD = "KUBERNETES_SERVICE_HOST" in os.environ
# Flag to enable showing disk usage when running in a K8s pod,
Expand All @@ -76,6 +81,9 @@
"RAY_DASHBOARD_REPORTER_AGENT_TPE_MAX_WORKERS", 1
)

# Address of the TPU device plugin metrics endpoint, expected in the form
# "{HOST_IP}:2112". Stays None when the environment variable is not set.
TPU_DEVICE_PLUGIN_ADDR = os.environ.get("TPU_DEVICE_PLUGIN_ADDR")


def recursive_asdict(o):
if isinstance(o, tuple) and hasattr(o, "_asdict"):
Expand Down Expand Up @@ -163,6 +171,37 @@ def jsonify_asdict(o) -> str:
"bytes",
GPU_TAG_KEYS,
),
# TPU metrics
"tpu_tensorcore_utilization": Gauge(
"tpu_tensorcore_utilization",
"Percentage TPU tensorcore utilization on a ray node, value should be between 0 and 100",
"percentage",
TPU_TAG_KEYS,
),
"tpu_memory_bandwidth_utilization": Gauge(
"tpu_memory_bandwidth_utilization",
"Percentage TPU memory bandwidth utilization on a ray node, value should be between 0 and 100",
"percentage",
TPU_TAG_KEYS,
),
"tpu_duty_cycle": Gauge(
"tpu_duty_cycle",
"Percentage of time during which the TPU was actively processing, value should be between 0 and 100",
"percentage",
TPU_TAG_KEYS,
),
"tpu_memory_used": Gauge(
"tpu_memory_used",
"Total memory used by the accelerator in bytes",
"bytes",
TPU_TAG_KEYS,
),
"tpu_memory_total": Gauge(
"tpu_memory_total",
"Total memory allocatable by the accelerator in bytes",
"bytes",
TPU_TAG_KEYS,
),
# Disk I/O metrics
"node_disk_io_read": Gauge(
"node_disk_io_read",
Expand Down Expand Up @@ -329,6 +368,7 @@ def jsonify_asdict(o) -> str:
# Types
# Plain aliases of `int`: they carry no runtime behavior and only name the
# unit a field is meant to hold (e.g. `Bytes` for the TPU memory fields).
Percentage = int
Megabytes = int
Bytes = int


# gpu utilization for nvidia gpu from a single process
Expand All @@ -348,6 +388,19 @@ class GpuUtilizationInfo(TypedDict):
processes_pids: Optional[List[ProcessGPUInfo]]


# tpu utilization for google tpu
class TpuUtilizationInfo(TypedDict):
    """Utilization record for one TPU accelerator, built by ``_get_tpu_usage``.

    Every field is filled from samples scraped from the TPU device plugin
    metrics endpoint; a metric that was not present in the scrape is left
    at zero.
    """

    # Suffix of the ``accelerator_id`` label after the "-" separator.
    # NOTE(review): _get_tpu_usage stores the string produced by ``split``
    # here, not an ``int`` -- confirm which type consumers expect.
    index: int
    # Full ``accelerator_id`` label value.
    name: str
    # Value of the ``model`` label.
    tpu_type: str
    # Value of the ``tpu_topology`` label.
    tpu_topology: str
    # Value of the ``tensorcore_utilization`` sample.
    # NOTE(review): the sample comment in _get_tpu_usage shows a value of
    # 0.0, so the percentage fields may hold floats rather than ints.
    tensorcore_utilization: Percentage
    # Value of the ``memory_bandwidth_utilization`` sample.
    hbm_utilization: Percentage
    # Value of the ``duty_cycle`` sample.
    duty_cycle: Percentage
    # Value of the ``memory_used`` sample.
    memory_used: Bytes
    # Value of the ``memory_total`` sample.
    memory_total: Bytes


class ReporterAgent(
dashboard_utils.DashboardAgentModule,
reporter_pb2_grpc.ReporterServiceServicer,
Expand Down Expand Up @@ -614,6 +667,154 @@ def decode(b: Union[str, bytes]) -> str:

return gpu_utilizations

@staticmethod
def _get_tpu_usage() -> List[TpuUtilizationInfo]:
    """Collect per-accelerator TPU utilization from the TPU device plugin.

    Scrapes the Prometheus text endpoint at
    ``http://{TPU_DEVICE_PLUGIN_ADDR}/metrics`` and folds every sample that
    carries an ``accelerator_id`` label into one record per TPU index.

    Returns:
        One ``TpuUtilizationInfo`` per TPU index, ordered by the integer
        value of the index. An empty list is returned when the check is
        disabled, when no plugin address is configured, or when fetching
        or parsing the metrics fails.

    Side effects:
        Sets the module-level ``enable_tpu_usage_check`` flag to ``False``
        when the address is unset or the HTTP request fails, so subsequent
        calls return immediately without contacting the plugin.
    """
    global enable_tpu_usage_check
    if not enable_tpu_usage_check:
        return []

    if not TPU_DEVICE_PLUGIN_ADDR:
        enable_tpu_usage_check = False
        return []

    # Upper bound (seconds) on the scrape. Without a timeout, a plugin that
    # accepts the connection but never answers would block this call, and
    # therefore the stats collection that invokes it, indefinitely.
    request_timeout_s = 5

    # Sample name exported by the device plugin -> record field it feeds.
    # NOTE(review): these names are hardcoded. If the plugin renames a
    # metric, the matching field silently stays at zero instead of failing,
    # so the names must be kept in sync with the plugin's schema.
    field_by_sample_name = {
        "memory_bandwidth_utilization": "hbm_utilization",
        "tensorcore_utilization": "tensorcore_utilization",
        "duty_cycle": "duty_cycle",
        "memory_used": "memory_used",
        "memory_total": "memory_total",
    }

    endpoint = f"http://{TPU_DEVICE_PLUGIN_ADDR}/metrics"
    try:
        metrics = requests.get(endpoint, timeout=request_timeout_s).content
        metrics = metrics.decode("utf-8")
    except Exception as e:
        logger.debug(
            f"Failed to retrieve TPU information from device plugin: {endpoint} {e}"
        )
        enable_tpu_usage_check = False
        return []

    # Sample should look like:
    # Name: tensorcore_utilization_node Labels: {'accelerator_id': '4804690994094478883-0', 'make': 'cloud-tpu', 'model': 'tpu-v6e-slice', 'tpu_topology': '2x4'} Value: 0.0
    # See https://cloud.google.com/monitoring/api/metrics_gcp#gcp-tpu for
    # schema.
    # NOTE(review): the example above carries a "_node" suffix that none of
    # the names in field_by_sample_name match -- confirm which names the
    # plugin actually exports.
    #
    # Each sample records a single metric (e.g. duty cycle) for one TPU, so
    # samples are aggregated into one record per accelerator index, keyed by
    # the integer value of that index.
    merged_tpu_utilizations = {}
    try:
        for family in text_string_to_metric_families(metrics):
            for sample in family.samples:
                # Skip irrelevant metrics
                if not hasattr(sample, "labels"):
                    continue
                labels = sample.labels
                if "accelerator_id" not in labels:
                    continue
                accelerator_id = labels["accelerator_id"]
                index = accelerator_id.split("-")[1]

                field = field_by_sample_name.get(sample.name)
                if field is None:
                    continue

                tpu_type = labels["model"]
                tpu_topology = labels["tpu_topology"]
                # Convert inside the try block so that a non-numeric suffix
                # is handled like any other malformed payload instead of
                # raising out of this method.
                key = int(index)

                merged_info = merged_tpu_utilizations.get(key)
                if merged_info is None:
                    # The first sample seen for an index fixes the
                    # descriptive fields. ``index`` is deliberately kept as
                    # the parsed string so the returned records are
                    # unchanged for existing consumers.
                    merged_info = TpuUtilizationInfo(
                        index=index,
                        name=accelerator_id,
                        tpu_type=tpu_type,
                        tpu_topology=tpu_topology,
                        tensorcore_utilization=0.0,
                        hbm_utilization=0.0,
                        duty_cycle=0.0,
                        memory_used=0,
                        memory_total=0,
                    )
                    merged_info[field] = sample.value
                    merged_tpu_utilizations[key] = merged_info
                else:
                    # Repeated samples for the same index and metric are
                    # summed.
                    merged_info[field] += sample.value
    except Exception as e:
        logger.debug(f"Failed to parse metrics from device plugin: {metrics} {e}")
        return []

    return [merged_tpu_utilizations[key] for key in sorted(merged_tpu_utilizations)]

@staticmethod
def _get_boot_time():
if IN_KUBERNETES_POD:
Expand Down Expand Up @@ -835,6 +1036,7 @@ def _collect_stats(self):
"disk_io": disk_stats,
"disk_io_speed": disk_speed_stats,
"gpus": self._get_gpu_usage(),
"tpus": self._get_tpu_usage(),
"network": network_stats,
"network_speed": network_speed_stats,
# Deprecated field, should be removed with frontend.
Expand Down Expand Up @@ -1161,6 +1363,62 @@ def _to_records(self, stats, cluster_stats) -> List[Record]:
]
)

# -- TPU per node --
tpus = stats["tpus"]

for tpu in tpus:
tpu_index = tpu.get("index")
tpu_name = tpu.get("name")
tpu_type = tpu.get("tpu_type")
tpu_topology = tpu.get("tpu_topology")
tensorcore_utilization = tpu.get("tensorcore_utilization")
hbm_utilization = tpu.get("hbm_utilization")
duty_cycle = tpu.get("duty_cycle")
memory_used = tpu.get("memory_used")
memory_total = tpu.get("memory_total")

tpu_tags = {
**node_tags,
"TpuIndex": str(tpu_index),
"TpuDeviceName": tpu_name,
"TpuType": tpu_type,
"TpuTopology": tpu_topology,
}
tensorcore_utilization_record = Record(
gauge=METRICS_GAUGES["tpu_tensorcore_utilization"],
value=tensorcore_utilization,
tags=tpu_tags,
)
hbm_utilization_record = Record(
gauge=METRICS_GAUGES["tpu_memory_bandwidth_utilization"],
value=hbm_utilization,
tags=tpu_tags,
)
duty_cycle_record = Record(
gauge=METRICS_GAUGES["tpu_duty_cycle"],
value=duty_cycle,
tags=tpu_tags,
)
memory_used_record = Record(
gauge=METRICS_GAUGES["tpu_memory_used"],
value=memory_used,
tags=tpu_tags,
)
memory_total_record = Record(
gauge=METRICS_GAUGES["tpu_memory_total"],
value=memory_total,
tags=tpu_tags,
)
records_reported.extend(
[
tensorcore_utilization_record,
hbm_utilization_record,
duty_cycle_record,
memory_used_record,
memory_total_record,
]
)

# -- Disk per node --
disk_io_stats = stats["disk_io"]
disk_read_record = Record(
Expand Down
Loading