Skip to content
1 change: 1 addition & 0 deletions python/ray/dashboard/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

NODE_TAG_KEYS = ["ip", "Version", "SessionName", "IsHeadNode"]
GPU_TAG_KEYS = NODE_TAG_KEYS + ["GpuDeviceName", "GpuIndex"]
TPU_TAG_KEYS = NODE_TAG_KEYS + ["TpuDeviceName", "TpuIndex", "TpuType", "TpuTopology"]
CLUSTER_TAG_KEYS = ["node_type", "Version", "SessionName"]
COMPONENT_METRICS_TAG_KEYS = ["ip", "pid", "Version", "Component", "SessionName"]

Expand Down
151 changes: 151 additions & 0 deletions python/ray/dashboard/modules/reporter/reporter_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import os
import requests
import socket
import sys
import traceback
Expand All @@ -12,6 +13,7 @@

from opencensus.stats import stats as stats_module
from prometheus_client.core import REGISTRY
from prometheus_client.parser import text_string_to_metric_families
from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2
from grpc.aio import ServicerContext

Expand Down Expand Up @@ -44,6 +46,7 @@
COMPONENT_METRICS_TAG_KEYS,
GCS_RPC_TIMEOUT_SECONDS,
GPU_TAG_KEYS,
TPU_TAG_KEYS,
NODE_TAG_KEYS,
)
from ray.dashboard.modules.reporter.gpu_profile_manager import GpuProfilingManager
Expand All @@ -58,6 +61,8 @@

enable_gpu_usage_check = True

enable_tpu_usage_check = True

# Are we in a K8s pod?
IN_KUBERNETES_POD = "KUBERNETES_SERVICE_HOST" in os.environ
# Flag to enable showing disk usage when running in a K8s pod,
Expand All @@ -76,6 +81,10 @@
"RAY_DASHBOARD_REPORTER_AGENT_TPE_MAX_WORKERS", 1
)

TPU_DEVICE_PLUGIN_ADDR = os.environ.get("TPU_DEVICE_PLUGIN_ADDR", None)

TPU_DEVICE_PLUGIN_PORT = os.environ.get("TPU_DEVICE_PLUGIN_PORT", "2112")


def recursive_asdict(o):
if isinstance(o, tuple) and hasattr(o, "_asdict"):
Expand Down Expand Up @@ -163,6 +172,19 @@ def jsonify_asdict(o) -> str:
"bytes",
GPU_TAG_KEYS,
),
# TPU metrics
"tpu_tensorcore_utilization": Gauge(
"tpu_tensorcore_utilization",
"Total TPU tensorcore utilization on a ray node",
"percentage",
TPU_TAG_KEYS,
),
"tpu_memory_bandwidth_utilization": Gauge(
"tpu_memory_bandwidth_utilization",
"Total TPU memory bandwidth usage on a ray node",
"percentage",
TPU_TAG_KEYS,
),
# Disk I/O metrics
"node_disk_io_read": Gauge(
"node_disk_io_read",
Expand Down Expand Up @@ -348,6 +370,16 @@ class GpuUtilizationInfo(TypedDict):
processes_pids: Optional[List[ProcessGPUInfo]]


# tpu utilization for google tpu
class TpuUtilizationInfo(TypedDict):
index: int
name: str
tpu_type: str
tpu_topology: str
tensorcore_utilization: Percentage
hbm_utilization: Percentage


class ReporterAgent(
dashboard_utils.DashboardAgentModule,
reporter_pb2_grpc.ReporterServiceServicer,
Expand Down Expand Up @@ -614,6 +646,87 @@ def decode(b: Union[str, bytes]) -> str:

return gpu_utilizations

@staticmethod
def _get_tpu_usage():

global enable_tpu_usage_check
if not enable_tpu_usage_check:
return []
tpu_utilizations = []

if not TPU_DEVICE_PLUGIN_ADDR:
enable_tpu_usage_check = False
return []

endpoint = f"http://{TPU_DEVICE_PLUGIN_ADDR}:{TPU_DEVICE_PLUGIN_PORT}/metrics"
try:
metrics = requests.get(endpoint).content
metrics = metrics.decode("utf-8")
except Exception as e:
logger.debug(
f"Failed to retrieve TPU information from device plugin: {endpoint} {e}"
)
enable_tpu_usage_check = False
return []

try:
for family in text_string_to_metric_families(metrics):
for sample in family.samples:
if sample.name == "memory_bandwidth_utilization":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seem weird to have all of these sample name hardcoded here; how often these name changes; if they change will we have malformed metric collected

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

labels = sample.labels
accelerator_id = labels["accelerator_id"]
index = accelerator_id.split("-")[1]
info = TpuUtilizationInfo(
index=index,
name=accelerator_id,
tpu_type=labels["model"],
tpu_topology=labels["tpu_topology"],
tensorcore_utilization=0.0,
hbm_utilization=sample.value,
)
tpu_utilizations.append(info)

if sample.name == "tensorcore_utilization":
labels = sample.labels
accelerator_id = labels["accelerator_id"]
index = accelerator_id.split("-")[1]
info = TpuUtilizationInfo(
index=index,
name=accelerator_id,
tpu_type=labels["model"],
tpu_topology=labels["tpu_topology"],
tensorcore_utilization=sample.value,
hbm_utilization=0.0,
)
tpu_utilizations.append(info)
except Exception as e:
logger.debug(f"Failed to parse metrics from device plugin: {metrics} {e}")
return []

# Merge metrics
merged_tpu_utilizations = []

for info in tpu_utilizations:
index = int(info.get("index"))
if len(merged_tpu_utilizations) > index:
merged_info = merged_tpu_utilizations[index]
merged_info["tensorcore_utilization"] += info.get(
"tensorcore_utilization"
)
merged_info["hbm_utilization"] += info.get("hbm_utilization")
else:
merged_info = TpuUtilizationInfo(
index=info.get("index"),
name=info.get("name"),
tpu_type=info.get("tpu_type"),
tpu_topology=info.get("tpu_topology"),
tensorcore_utilization=info.get("tensorcore_utilization"),
hbm_utilization=info.get("hbm_utilization"),
)
merged_tpu_utilizations.append(merged_info)

return merged_tpu_utilizations

@staticmethod
def _get_boot_time():
if IN_KUBERNETES_POD:
Expand Down Expand Up @@ -835,6 +948,7 @@ def _collect_stats(self):
"disk_io": disk_stats,
"disk_io_speed": disk_speed_stats,
"gpus": self._get_gpu_usage(),
"tpus": self._get_tpu_usage(),
"network": network_stats,
"network_speed": network_speed_stats,
# Deprecated field, should be removed with frontend.
Expand Down Expand Up @@ -1161,6 +1275,43 @@ def _to_records(self, stats, cluster_stats) -> List[Record]:
]
)

# -- TPU per node --
tpus = stats["tpus"]
tpus_available = len(tpus)

if tpus_available:
for tpu in tpus:
tpu_index = tpu.get("index")
tpu_name = tpu.get("name")
tpu_type = tpu.get("tpu_type")
tpu_topology = tpu.get("tpu_topology")
tensorcore_utilization = tpu.get("tensorcore_utilization")
hbm_utilization = tpu.get("hbm_utilization")

tpu_tags = {
**node_tags,
"TpuIndex": str(tpu_index),
"TpuDeviceName": tpu_name,
"TpuType": tpu_type,
"TpuTopology": tpu_topology,
}
tensorcore_utilization_record = Record(
gauge=METRICS_GAUGES["tpu_tensorcore_utilization"],
value=tensorcore_utilization,
tags=tpu_tags,
)
hbm_utilization_record = Record(
gauge=METRICS_GAUGES["tpu_memory_bandwidth_utilization"],
value=hbm_utilization,
tags=tpu_tags,
)
records_reported.extend(
[
tensorcore_utilization_record,
hbm_utilization_record,
]
)

# -- Disk per node --
disk_io_stats = stats["disk_io"]
disk_read_record = Record(
Expand Down
70 changes: 68 additions & 2 deletions python/ray/dashboard/modules/reporter/tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
),
},
"gpus": [],
"tpus": [],
"network": (13621160960, 11914936320),
"network_speed": (8.435062128545095, 7.378462703142336),
}
Expand Down Expand Up @@ -341,12 +342,23 @@ def test_report_stats():
STATS_TEMPLATE["gpus"] = [
{"utilization_gpu": 1, "memory_used": 100, "memory_total": 1000, "index": 0}
]
# Test stats with tpus
STATS_TEMPLATE["tpus"] = [
{
"index": 0,
"name": "foo",
"tpu_type": "v6e",
"tpu_topology": "2x2",
"tensorcore_utilization": 0.25,
"hbm_utilization": 0.50,
}
]
records = agent._to_records(STATS_TEMPLATE, cluster_stats)
assert len(records) == 41
assert len(records) == 43
# Test stats without autoscaler report
cluster_stats = {}
records = agent._to_records(STATS_TEMPLATE, cluster_stats)
assert len(records) == 39
assert len(records) == 41


def test_report_stats_gpu():
Expand Down Expand Up @@ -463,6 +475,60 @@ def test_report_stats_gpu():
assert gpu_metrics_aggregatd["node_gram_available"] == GPU_MEMORY * 4 - 6


def test_report_stats_tpu():
dashboard_agent = MagicMock()
agent = ReporterAgent(dashboard_agent)

STATS_TEMPLATE["tpus"] = [
{
"index": 0,
"name": "tpu-0",
"tpu_type": "v6e",
"tpu_topology": "2x2",
"tensorcore_utilization": 0.1,
"hbm_utilization": 0.1,
},
{
"index": 1,
"name": "tpu-1",
"tpu_type": "v6e",
"tpu_topology": "2x2",
"tensorcore_utilization": 0.2,
"hbm_utilization": 0.1,
},
{
"index": 2,
"name": "tpu-2",
"tpu_type": "v6e",
"tpu_topology": "2x2",
"tensorcore_utilization": 0.3,
"hbm_utilization": 0.1,
},
{
"index": 3,
"name": "tpu-3",
"tpu_type": "v6e",
"tpu_topology": "2x2",
"tensorcore_utilization": 0.4,
"hbm_utilization": 0.1,
},
]
tpu_metrics_aggregated = {
"tpu_tensorcore_utilization": 0.0,
"tpu_memory_bandwidth_utilization": 0.0,
}
records = agent._to_records(STATS_TEMPLATE, {})
num_tpu_records = 0
for record in records:
if record.gauge.name in tpu_metrics_aggregated:
num_tpu_records += 1
tpu_metrics_aggregated[record.gauge.name] += record.value

assert num_tpu_records == 8
assert tpu_metrics_aggregated["tpu_tensorcore_utilization"] == 1.0
assert tpu_metrics_aggregated["tpu_memory_bandwidth_utilization"] == 0.4


def test_report_per_component_stats():
dashboard_agent = MagicMock()
agent = ReporterAgent(dashboard_agent)
Expand Down