-
Notifications
You must be signed in to change notification settings - Fork 6.8k
Add tpu usage metrics to reporter_agent #53678
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
64f4464
d9a9dd3
140530a
a0ce742
f907ee0
913ef74
ad3fea0
1736fbc
0c4563c
a454867
71431f0
bf311dd
5d2d732
0b7dc95
6a077eb
4996ee0
7d775fc
1a947ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
import json | ||
import logging | ||
import os | ||
import requests | ||
import socket | ||
import sys | ||
import traceback | ||
|
@@ -12,6 +13,7 @@ | |
|
||
from opencensus.stats import stats as stats_module | ||
from prometheus_client.core import REGISTRY | ||
from prometheus_client.parser import text_string_to_metric_families | ||
from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2 | ||
from grpc.aio import ServicerContext | ||
|
||
|
@@ -44,6 +46,7 @@ | |
COMPONENT_METRICS_TAG_KEYS, | ||
GCS_RPC_TIMEOUT_SECONDS, | ||
GPU_TAG_KEYS, | ||
TPU_TAG_KEYS, | ||
NODE_TAG_KEYS, | ||
) | ||
from ray.dashboard.modules.reporter.gpu_profile_manager import GpuProfilingManager | ||
|
@@ -58,6 +61,8 @@ | |
|
||
enable_gpu_usage_check = True | ||
|
||
enable_tpu_usage_check = True | ||
|
||
# Are we in a K8s pod? | ||
IN_KUBERNETES_POD = "KUBERNETES_SERVICE_HOST" in os.environ | ||
# Flag to enable showing disk usage when running in a K8s pod, | ||
|
@@ -76,6 +81,9 @@ | |
"RAY_DASHBOARD_REPORTER_AGENT_TPE_MAX_WORKERS", 1 | ||
) | ||
|
||
# TPU device plugin metric address should be in the format "{HOST_IP}:2112" | ||
TPU_DEVICE_PLUGIN_ADDR = os.environ.get("TPU_DEVICE_PLUGIN_ADDR", None) | ||
|
||
|
||
def recursive_asdict(o): | ||
if isinstance(o, tuple) and hasattr(o, "_asdict"): | ||
|
@@ -163,6 +171,37 @@ def jsonify_asdict(o) -> str: | |
"bytes", | ||
GPU_TAG_KEYS, | ||
), | ||
# TPU metrics | ||
"tpu_tensorcore_utilization": Gauge( | ||
"tpu_tensorcore_utilization", | ||
"Percentage TPU tensorcore utilization on a ray node, value should be between 0 and 100", | ||
"percentage", | ||
TPU_TAG_KEYS, | ||
), | ||
"tpu_memory_bandwidth_utilization": Gauge( | ||
"tpu_memory_bandwidth_utilization", | ||
"Percentage TPU memory bandwidth utilization on a ray node, value should be between 0 and 100", | ||
"percentage", | ||
TPU_TAG_KEYS, | ||
), | ||
"tpu_duty_cycle": Gauge( | ||
"tpu_duty_cycle", | ||
"Percentage of time during which the TPU was actively processing, value should be between 0 and 100", | ||
"percentage", | ||
TPU_TAG_KEYS, | ||
), | ||
"tpu_memory_used": Gauge( | ||
"tpu_memory_used", | ||
"Total memory used by the accelerator in bytes", | ||
"bytes", | ||
TPU_TAG_KEYS, | ||
), | ||
"tpu_memory_total": Gauge( | ||
"tpu_memory_total", | ||
"Total memory allocatable by the accelerator in bytes", | ||
"bytes", | ||
TPU_TAG_KEYS, | ||
), | ||
# Disk I/O metrics | ||
"node_disk_io_read": Gauge( | ||
"node_disk_io_read", | ||
|
@@ -329,6 +368,7 @@ def jsonify_asdict(o) -> str: | |
# Types | ||
Percentage = int | ||
Megabytes = int | ||
Bytes = int | ||
|
||
|
||
# gpu utilization for nvidia gpu from a single process | ||
|
@@ -348,6 +388,19 @@ class GpuUtilizationInfo(TypedDict): | |
processes_pids: Optional[List[ProcessGPUInfo]] | ||
|
||
|
||
# tpu utilization for google tpu | ||
class TpuUtilizationInfo(TypedDict):
    """Utilization snapshot for a single Google TPU chip.

    Instances are built in ``ReporterAgent._get_tpu_usage`` from samples
    scraped from the TPU device plugin's metrics endpoint.
    """

    # Chip index, taken from the part of ``accelerator_id`` after the "-".
    index: int
    # The ``accelerator_id`` label of the sample.
    name: str
    # The ``model`` label of the sample.
    tpu_type: str
    # The ``tpu_topology`` label of the sample.
    tpu_topology: str
    # The utilization fields hold the parsed sample value, which is a float,
    # so they are annotated ``float`` rather than the int ``Percentage``
    # alias. The corresponding gauges describe them as values in 0-100.
    tensorcore_utilization: float
    # Populated from the ``memory_bandwidth_utilization`` sample.
    hbm_utilization: float
    duty_cycle: float
    # Memory figures, reported by the gauges in bytes.
    memory_used: Bytes
    memory_total: Bytes
|
||
|
||
class ReporterAgent( | ||
dashboard_utils.DashboardAgentModule, | ||
reporter_pb2_grpc.ReporterServiceServicer, | ||
|
@@ -614,6 +667,150 @@ def decode(b: Union[str, bytes]) -> str: | |
|
||
return gpu_utilizations | ||
|
||
@staticmethod
def _get_tpu_usage():
    """Collect per-chip TPU utilization from the TPU device plugin.

    Scrapes ``http://{TPU_DEVICE_PLUGIN_ADDR}/metrics``, parses the
    Prometheus text payload and folds the samples of interest into one
    ``TpuUtilizationInfo`` per accelerator.

    Returns:
        A list of ``TpuUtilizationInfo`` dicts, one per ``accelerator_id``
        in order of first appearance. An empty list is returned when the
        check is disabled, no plugin address is configured, the endpoint
        cannot be reached, or the payload cannot be parsed.

    Side effects:
        Sets the module-level ``enable_tpu_usage_check`` flag to ``False``
        when no address is configured or the request fails, so later calls
        return immediately.
    """
    global enable_tpu_usage_check
    if not enable_tpu_usage_check:
        return []

    if not TPU_DEVICE_PLUGIN_ADDR:
        enable_tpu_usage_check = False
        return []

    # Device-plugin sample name -> TpuUtilizationInfo field.
    # The names are hardcoded on purpose: per code review they follow the
    # GCP TPU metric schema, which is not expected to change:
    # https://cloud.google.com/monitoring/api/metrics_gcp#gcp-tpu
    sample_to_field = {
        "memory_bandwidth_utilization": "hbm_utilization",
        "tensorcore_utilization": "tensorcore_utilization",
        "duty_cycle": "duty_cycle",
        "memory_used": "memory_used",
        "memory_total": "memory_total",
    }
    # Bound the request so an unresponsive plugin cannot stall stats
    # collection; requests waits indefinitely when no timeout is given.
    request_timeout_s = 5

    endpoint = f"http://{TPU_DEVICE_PLUGIN_ADDR}/metrics"
    try:
        metrics = requests.get(endpoint, timeout=request_timeout_s).content
        metrics = metrics.decode("utf-8")
    except Exception as e:
        logger.debug(
            "Failed to retrieve TPU information from device plugin: %s %s",
            endpoint,
            e,
        )
        enable_tpu_usage_check = False
        return []

    # Keyed by accelerator id rather than by list position, so merging does
    # not depend on chips being reported in index order 0, 1, 2, ...
    merged_tpu_utilizations = {}
    try:
        for family in text_string_to_metric_families(metrics):
            for sample in family.samples:
                field = sample_to_field.get(sample.name)
                if field is None:
                    continue

                labels = sample.labels
                accelerator_id = labels["accelerator_id"]
                info = merged_tpu_utilizations.get(accelerator_id)
                if info is None:
                    info = TpuUtilizationInfo(
                        # The chip index is the part after the "-".
                        index=int(accelerator_id.split("-")[1]),
                        name=accelerator_id,
                        tpu_type=labels["model"],
                        tpu_topology=labels["tpu_topology"],
                        tensorcore_utilization=0.0,
                        hbm_utilization=0.0,
                        duty_cycle=0.0,
                        memory_used=0,
                        memory_total=0,
                    )
                    merged_tpu_utilizations[accelerator_id] = info
                # Accumulate, so repeated samples for one chip are summed.
                info[field] += sample.value
    except Exception as e:
        logger.debug("Failed to parse metrics from device plugin: %s %s", metrics, e)
        return []

    return list(merged_tpu_utilizations.values())
|
||
@staticmethod | ||
def _get_boot_time(): | ||
if IN_KUBERNETES_POD: | ||
|
@@ -835,6 +1032,7 @@ def _collect_stats(self): | |
"disk_io": disk_stats, | ||
"disk_io_speed": disk_speed_stats, | ||
"gpus": self._get_gpu_usage(), | ||
"tpus": self._get_tpu_usage(), | ||
"network": network_stats, | ||
"network_speed": network_speed_stats, | ||
# Deprecated field, should be removed with frontend. | ||
|
@@ -1161,6 +1359,64 @@ def _to_records(self, stats, cluster_stats) -> List[Record]: | |
] | ||
) | ||
|
||
# -- TPU per node -- | ||
tpus = stats["tpus"] | ||
tpus_available = len(tpus) | ||
|
||
if tpus_available: | ||
richardsliu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for tpu in tpus: | ||
tpu_index = tpu.get("index") | ||
tpu_name = tpu.get("name") | ||
tpu_type = tpu.get("tpu_type") | ||
tpu_topology = tpu.get("tpu_topology") | ||
tensorcore_utilization = tpu.get("tensorcore_utilization") | ||
hbm_utilization = tpu.get("hbm_utilization") | ||
duty_cycle = tpu.get("duty_cycle") | ||
memory_used = tpu.get("memory_used") | ||
memory_total = tpu.get("memory_total") | ||
|
||
tpu_tags = { | ||
**node_tags, | ||
"TpuIndex": str(tpu_index), | ||
"TpuDeviceName": tpu_name, | ||
"TpuType": tpu_type, | ||
"TpuTopology": tpu_topology, | ||
} | ||
tensorcore_utilization_record = Record( | ||
gauge=METRICS_GAUGES["tpu_tensorcore_utilization"], | ||
value=tensorcore_utilization, | ||
tags=tpu_tags, | ||
) | ||
hbm_utilization_record = Record( | ||
gauge=METRICS_GAUGES["tpu_memory_bandwidth_utilization"], | ||
value=hbm_utilization, | ||
tags=tpu_tags, | ||
) | ||
duty_cycle_record = Record( | ||
gauge=METRICS_GAUGES["tpu_duty_cycle"], | ||
value=duty_cycle, | ||
tags=tpu_tags, | ||
) | ||
memory_used_record = Record( | ||
gauge=METRICS_GAUGES["tpu_memory_used"], | ||
value=memory_used, | ||
tags=tpu_tags, | ||
) | ||
memory_total_record = Record( | ||
gauge=METRICS_GAUGES["tpu_memory_total"], | ||
value=memory_total, | ||
tags=tpu_tags, | ||
) | ||
records_reported.extend( | ||
[ | ||
tensorcore_utilization_record, | ||
hbm_utilization_record, | ||
duty_cycle_record, | ||
memory_used_record, | ||
memory_total_record, | ||
] | ||
) | ||
|
||
# -- Disk per node -- | ||
disk_io_stats = stats["disk_io"] | ||
disk_read_record = Record( | ||
|
Uh oh!
There was an error while loading. Please reload this page.