import json
import logging
import os
+ import requests
import socket
import sys
import traceback
from opencensus.stats import stats as stats_module
from prometheus_client.core import REGISTRY
+ from prometheus_client.parser import text_string_to_metric_families
from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2
from grpc.aio import ServicerContext
    COMPONENT_METRICS_TAG_KEYS,
    GCS_RPC_TIMEOUT_SECONDS,
    GPU_TAG_KEYS,
+     TPU_TAG_KEYS,
    NODE_TAG_KEYS,
)
from ray.dashboard.modules.reporter.gpu_profile_manager import GpuProfilingManager

enable_gpu_usage_check = True

+ enable_tpu_usage_check = True
+
# Are we in a K8s pod?
IN_KUBERNETES_POD = "KUBERNETES_SERVICE_HOST" in os.environ
# Flag to enable showing disk usage when running in a K8s pod,
    "RAY_DASHBOARD_REPORTER_AGENT_TPE_MAX_WORKERS", 1
)

+ # TPU device plugin metric address should be in the format "{HOST_IP}:2112"
+ TPU_DEVICE_PLUGIN_ADDR = os.environ.get("TPU_DEVICE_PLUGIN_ADDR", None)
+

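# A minimal standalone sketch (not part of this change) of what the agent does
# with TPU_DEVICE_PLUGIN_ADDR: fetch the device plugin's Prometheus text
# endpoint and parse it with prometheus_client. The address value is a
# hypothetical example of the "{HOST_IP}:2112" format described above.
#
#     import requests
#     from prometheus_client.parser import text_string_to_metric_families
#
#     addr = "10.0.0.2:2112"  # hypothetical {HOST_IP}:2112
#     text = requests.get(f"http://{addr}/metrics", timeout=5).text
#     for family in text_string_to_metric_families(text):
#         for sample in family.samples:
#             if "accelerator_id" in sample.labels:
#                 print(sample.name, sample.labels, sample.value)
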
def recursive_asdict(o):
    if isinstance(o, tuple) and hasattr(o, "_asdict"):
@@ -163,6 +171,37 @@ def jsonify_asdict(o) -> str:
        "bytes",
        GPU_TAG_KEYS,
    ),
+     # TPU metrics
+     "tpu_tensorcore_utilization": Gauge(
+         "tpu_tensorcore_utilization",
+         "Percentage TPU tensorcore utilization on a ray node, value should be between 0 and 100",
+         "percentage",
+         TPU_TAG_KEYS,
+     ),
+     "tpu_memory_bandwidth_utilization": Gauge(
+         "tpu_memory_bandwidth_utilization",
+         "Percentage TPU memory bandwidth utilization on a ray node, value should be between 0 and 100",
+         "percentage",
+         TPU_TAG_KEYS,
+     ),
+     "tpu_duty_cycle": Gauge(
+         "tpu_duty_cycle",
+         "Percentage of time during which the TPU was actively processing, value should be between 0 and 100",
+         "percentage",
+         TPU_TAG_KEYS,
+     ),
+     "tpu_memory_used": Gauge(
+         "tpu_memory_used",
+         "Total memory used by the accelerator in bytes",
+         "bytes",
+         TPU_TAG_KEYS,
+     ),
+     "tpu_memory_total": Gauge(
+         "tpu_memory_total",
+         "Total memory allocatable by the accelerator in bytes",
+         "bytes",
+         TPU_TAG_KEYS,
+     ),
    # Disk I/O metrics
    "node_disk_io_read": Gauge(
        "node_disk_io_read",
@@ -329,6 +368,7 @@ def jsonify_asdict(o) -> str:
# Types
Percentage = int
Megabytes = int
+ Bytes = int


# gpu utilization for nvidia gpu from a single process
@@ -348,6 +388,19 @@ class GpuUtilizationInfo(TypedDict):
    processes_pids: Optional[List[ProcessGPUInfo]]


+ # tpu utilization for google tpu
+ class TpuUtilizationInfo(TypedDict):
+     index: int
+     name: str
+     tpu_type: str
+     tpu_topology: str
+     tensorcore_utilization: Percentage
+     hbm_utilization: Percentage
+     duty_cycle: Percentage
+     memory_used: Bytes
+     memory_total: Bytes
+
+
class ReporterAgent(
    dashboard_utils.DashboardAgentModule,
    reporter_pb2_grpc.ReporterServiceServicer,
@@ -616,6 +669,154 @@ def decode(b: Union[str, bytes]) -> str:

        return gpu_utilizations

+     @staticmethod
+     def _get_tpu_usage() -> List[TpuUtilizationInfo]:
+
+         global enable_tpu_usage_check
+         if not enable_tpu_usage_check:
+             return []
+
+         if not TPU_DEVICE_PLUGIN_ADDR:
+             enable_tpu_usage_check = False
+             return []
+
+         endpoint = f"http://{TPU_DEVICE_PLUGIN_ADDR}/metrics"
+         try:
+             metrics = requests.get(endpoint).content
+             metrics = metrics.decode("utf-8")
+         except Exception as e:
+             logger.debug(
+                 f"Failed to retrieve TPU information from device plugin: {endpoint} {e}"
+             )
+             enable_tpu_usage_check = False
+             return []
+
+         tpu_utilizations = []
+         # Sample should look like:
+         # Name: tensorcore_utilization_node Labels: {'accelerator_id': '4804690994094478883-0', 'make': 'cloud-tpu', 'model': 'tpu-v6e-slice', 'tpu_topology': '2x4'} Value: 0.0
+         # See https://cloud.google.com/monitoring/api/metrics_gcp#gcp-tpu for
+         # schema.
+         try:
+             for family in text_string_to_metric_families(metrics):
+                 for sample in family.samples:
+                     # Skip irrelevant metrics
+                     if not hasattr(sample, "labels"):
+                         continue
+                     if "accelerator_id" not in sample.labels:
+                         continue
+                     labels = sample.labels
+                     accelerator_id = labels["accelerator_id"]
+                     index = accelerator_id.split("-")[1]
+
+                     if sample.name == "memory_bandwidth_utilization":
+                         info = TpuUtilizationInfo(
+                             index=index,
+                             name=accelerator_id,
+                             tpu_type=labels["model"],
+                             tpu_topology=labels["tpu_topology"],
+                             tensorcore_utilization=0.0,
+                             hbm_utilization=sample.value,
+                             duty_cycle=0.0,
+                             memory_used=0,
+                             memory_total=0,
+                         )
+                         tpu_utilizations.append(info)
+
+                     if sample.name == "tensorcore_utilization":
+                         info = TpuUtilizationInfo(
+                             index=index,
+                             name=accelerator_id,
+                             tpu_type=labels["model"],
+                             tpu_topology=labels["tpu_topology"],
+                             tensorcore_utilization=sample.value,
+                             hbm_utilization=0.0,
+                             duty_cycle=0.0,
+                             memory_used=0,
+                             memory_total=0,
+                         )
+                         tpu_utilizations.append(info)
+
+                     if sample.name == "duty_cycle":
+                         info = TpuUtilizationInfo(
+                             index=index,
+                             name=accelerator_id,
+                             tpu_type=labels["model"],
+                             tpu_topology=labels["tpu_topology"],
+                             tensorcore_utilization=0.0,
+                             hbm_utilization=0.0,
+                             duty_cycle=sample.value,
+                             memory_used=0,
+                             memory_total=0,
+                         )
+                         tpu_utilizations.append(info)
+
+                     if sample.name == "memory_used":
+                         info = TpuUtilizationInfo(
+                             index=index,
+                             name=accelerator_id,
+                             tpu_type=labels["model"],
+                             tpu_topology=labels["tpu_topology"],
+                             tensorcore_utilization=0.0,
+                             hbm_utilization=0.0,
+                             duty_cycle=0.0,
+                             memory_used=sample.value,
+                             memory_total=0,
+                         )
+                         tpu_utilizations.append(info)
+
+                     if sample.name == "memory_total":
+                         info = TpuUtilizationInfo(
+                             index=index,
+                             name=accelerator_id,
+                             tpu_type=labels["model"],
+                             tpu_topology=labels["tpu_topology"],
+                             tensorcore_utilization=0.0,
+                             hbm_utilization=0.0,
+                             duty_cycle=0.0,
+                             memory_used=0,
+                             memory_total=sample.value,
+                         )
+                         tpu_utilizations.append(info)
+         except Exception as e:
+             logger.debug(f"Failed to parse metrics from device plugin: {metrics} {e}")
+             return []
+
+         # Each collected sample records only one metric (e.g. duty cycle) during
+         # the metric interval for one TPU. So here we need to aggregate the
+         # sample records together. The aggregated list should be indexed by the
+         # TPU accelerator index.
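        # Worked example (hypothetical values, not from this change): for
        # accelerator '4804690994094478883-0' the loop above may have produced
        # three partial entries, one with duty_cycle=82.0, one with
        # tensorcore_utilization=35.0, and one with memory_used=1.2e9, each with
        # its other fields zeroed. The merge below folds them into a single
        # TpuUtilizationInfo for index 0 carrying all three values.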
+         merged_tpu_utilizations = {}
+
+         for info in tpu_utilizations:
+             index = int(info.get("index"))
+             if index in merged_tpu_utilizations:
+                 merged_info = merged_tpu_utilizations[index]
+                 merged_info["tensorcore_utilization"] += info.get(
+                     "tensorcore_utilization"
+                 )
+                 merged_info["hbm_utilization"] += info.get("hbm_utilization")
+                 merged_info["duty_cycle"] += info.get("duty_cycle")
+                 merged_info["memory_used"] += info.get("memory_used")
+                 merged_info["memory_total"] += info.get("memory_total")
+             else:
+                 merged_info = TpuUtilizationInfo(
+                     index=info.get("index"),
+                     name=info.get("name"),
+                     tpu_type=info.get("tpu_type"),
+                     tpu_topology=info.get("tpu_topology"),
+                     tensorcore_utilization=info.get("tensorcore_utilization"),
+                     hbm_utilization=info.get("hbm_utilization"),
+                     duty_cycle=info.get("duty_cycle"),
+                     memory_used=info.get("memory_used"),
+                     memory_total=info.get("memory_total"),
+                 )
+                 merged_tpu_utilizations[index] = merged_info
+
+         sorted_tpu_utilizations = [
+             value for _, value in sorted(merged_tpu_utilizations.items())
+         ]
+         return sorted_tpu_utilizations
+
    @staticmethod
    def _get_boot_time():
        if IN_KUBERNETES_POD:
@@ -837,6 +1038,7 @@ def _collect_stats(self):
            "disk_io": disk_stats,
            "disk_io_speed": disk_speed_stats,
            "gpus": self._get_gpu_usage(),
+             "tpus": self._get_tpu_usage(),
            "network": network_stats,
            "network_speed": network_speed_stats,
            # Deprecated field, should be removed with frontend.
@@ -1163,6 +1365,62 @@ def _to_records(self, stats, cluster_stats) -> List[Record]:
            ]
        )

+         # -- TPU per node --
+         tpus = stats["tpus"]
+
+         for tpu in tpus:
+             tpu_index = tpu.get("index")
+             tpu_name = tpu.get("name")
+             tpu_type = tpu.get("tpu_type")
+             tpu_topology = tpu.get("tpu_topology")
+             tensorcore_utilization = tpu.get("tensorcore_utilization")
+             hbm_utilization = tpu.get("hbm_utilization")
+             duty_cycle = tpu.get("duty_cycle")
+             memory_used = tpu.get("memory_used")
+             memory_total = tpu.get("memory_total")
+
+             tpu_tags = {
+                 **node_tags,
+                 "TpuIndex": str(tpu_index),
+                 "TpuDeviceName": tpu_name,
+                 "TpuType": tpu_type,
+                 "TpuTopology": tpu_topology,
+             }
+             tensorcore_utilization_record = Record(
+                 gauge=METRICS_GAUGES["tpu_tensorcore_utilization"],
+                 value=tensorcore_utilization,
+                 tags=tpu_tags,
+             )
+             hbm_utilization_record = Record(
+                 gauge=METRICS_GAUGES["tpu_memory_bandwidth_utilization"],
+                 value=hbm_utilization,
+                 tags=tpu_tags,
+             )
+             duty_cycle_record = Record(
+                 gauge=METRICS_GAUGES["tpu_duty_cycle"],
+                 value=duty_cycle,
+                 tags=tpu_tags,
+             )
+             memory_used_record = Record(
+                 gauge=METRICS_GAUGES["tpu_memory_used"],
+                 value=memory_used,
+                 tags=tpu_tags,
+             )
+             memory_total_record = Record(
+                 gauge=METRICS_GAUGES["tpu_memory_total"],
+                 value=memory_total,
+                 tags=tpu_tags,
+             )
+             records_reported.extend(
+                 [
+                     tensorcore_utilization_record,
+                     hbm_utilization_record,
+                     duty_cycle_record,
+                     memory_used_record,
+                     memory_total_record,
+                 ]
+             )
+
        # -- Disk per node --
        disk_io_stats = stats["disk_io"]
        disk_read_record = Record(