1
+ # analytics.py (версия с устранением задержки и оптимизациями)
2
+ import cv2
3
+ import sys
4
+ import json
5
+ import time
6
+ import base64
7
+ import threading # <--- Импортируем модуль для работы с потоками
8
+
9
+ try :
10
+ from ultralytics import YOLO
11
+ except ImportError :
12
+ print (json .dumps ({
13
+ "status" : "error" ,
14
+ "message" : "Ultralytics YOLO library not found. Please run 'pip install ultralytics'."
15
+ }), flush = True )
16
+ sys .exit (1 )
17
+
18
+ # Класс для чтения видеопотока в отдельном потоке
19
+ class FrameGrabber :
20
+ """
21
+ Класс для захвата кадров из видеопотока в отдельном потоке,
22
+ чтобы избежать накопления задержки в буфере cv2.VideoCapture.
23
+ """
24
+ def __init__ (self , src = 0 ):
25
+ self .stream = cv2 .VideoCapture (src )
26
+ if not self .stream .isOpened ():
27
+ raise IOError ("Cannot open video stream" )
28
+
29
+ self .ret , self .frame = self .stream .read ()
30
+ self .stopped = False
31
+ self .thread = threading .Thread (target = self .update , args = ())
32
+ self .thread .daemon = True
33
+
34
+ def start (self ):
35
+ self .stopped = False
36
+ self .thread .start ()
37
+
38
+ def update (self ):
39
+ while not self .stopped :
40
+ ret , frame = self .stream .read ()
41
+ if not ret :
42
+ self .stop ()
43
+ break
44
+ self .ret = ret
45
+ self .frame = frame
46
+
47
+ def read (self ):
48
+ return self .ret , self .frame
49
+
50
+ def stop (self ):
51
+ self .stopped = True
52
+ if self .thread .is_alive ():
53
+ self .thread .join (timeout = 1.0 )
54
+ self .stream .release ()
55
+
56
+
57
+ def run_analytics (rtsp_url , config_str ):
58
+ try :
59
+ model = YOLO ("yolov8n.pt" )
60
+ except Exception as e :
61
+ print (json .dumps ({"status" : "error" , "message" : f"Failed to load YOLOv8 model: { e } " }), flush = True )
62
+ sys .exit (1 )
63
+
64
+ config = {}
65
+ if config_str :
66
+ try :
67
+ config_json = base64 .b64decode (config_str ).decode ('utf-8' )
68
+ config = json .loads (config_json )
69
+ except Exception as e :
70
+ print (json .dumps ({"status" : "error" , "message" : f"Invalid config provided: { e } " }), flush = True )
71
+
72
+ roi = config .get ('roi' )
73
+ objects_to_detect = config .get ('objects' , None )
74
+ confidence_threshold = config .get ('confidence' , 0.5 )
75
+ frame_skip = int (config .get ('frame_skip' , 5 ))
76
+ if frame_skip < 1 :
77
+ frame_skip = 1
78
+ resize_width = int (config .get ('resize_width' , 640 ))
79
+
80
+ # Используем наш новый класс FrameGrabber
81
+ try :
82
+ frame_grabber = FrameGrabber (rtsp_url )
83
+ frame_grabber .start ()
84
+ time .sleep (2 ) # Даем время на подключение и заполнение первого кадра
85
+ except IOError as e :
86
+ print (json .dumps ({"status" : "error" , "message" : str (e )}), flush = True )
87
+ sys .exit (1 )
88
+
89
+ frame_count = 0
90
+
91
+ # Основной цикл и блок finally для корректного завершения
92
+ try :
93
+ while not frame_grabber .stopped :
94
+ ret , frame = frame_grabber .read ()
95
+ if not ret or frame is None :
96
+ # Поток мог завершиться, даем ему немного времени и проверяем снова
97
+ time .sleep (0.5 )
98
+ if frame_grabber .stopped :
99
+ break
100
+ continue
101
+
102
+ frame_count += 1
103
+ if frame_count % frame_skip != 0 :
104
+ # VVV ИЗМЕНЕНИЕ: Замена sleep на continue VVV
105
+ # Это более эффективно, так как не вносит искусственную задержку.
106
+ # Цикл просто перейдет к следующей итерации.
107
+ continue
108
+ # ^^^ КОНЕЦ ИЗМЕНЕНИЯ ^^^
109
+
110
+ original_height , original_width = frame .shape [:2 ]
111
+
112
+ scale_x , scale_y = 1.0 , 1.0
113
+ if resize_width > 0 and original_width > resize_width :
114
+ scale_x = original_width / resize_width
115
+ new_height = int (original_height / scale_x )
116
+ scale_y = original_height / new_height
117
+ frame_to_process = cv2 .resize (frame , (resize_width , new_height ), interpolation = cv2 .INTER_AREA )
118
+ else :
119
+ frame_to_process = frame
120
+
121
+ results = model (frame_to_process , verbose = False , conf = confidence_threshold )
122
+
123
+ detected_objects = []
124
+ for box in results [0 ].boxes :
125
+ class_id = int (box .cls [0 ])
126
+ label = model .names [class_id ]
127
+
128
+ x1 , y1 , x2 , y2 = box .xyxy [0 ]
129
+ x , y , w , h = int (x1 ), int (y1 ), int (x2 - x1 ), int (y2 - y1 )
130
+
131
+ detected_objects .append ({
132
+ 'label' : label ,
133
+ 'confidence' : float (box .conf [0 ]),
134
+ 'box' : {
135
+ 'x' : int (x * scale_x ),
136
+ 'y' : int (y * scale_y ),
137
+ 'w' : int (w * scale_x ),
138
+ 'h' : int (h * scale_y )
139
+ }
140
+ })
141
+
142
+ filtered_objects = []
143
+ for obj in detected_objects :
144
+ if objects_to_detect and obj ['label' ] not in objects_to_detect :
145
+ continue
146
+
147
+ if roi :
148
+ box = obj ['box' ]
149
+ obj_center_x = box ['x' ] + box ['w' ] / 2
150
+ obj_center_y = box ['y' ] + box ['h' ] / 2
151
+
152
+ roi_x1 = roi ['x' ] * original_width
153
+ roi_y1 = roi ['y' ] * original_height
154
+ roi_x2 = (roi ['x' ] + roi ['w' ]) * original_width
155
+ roi_y2 = (roi ['y' ] + roi ['h' ]) * original_height
156
+
157
+ if not (roi_x1 < obj_center_x < roi_x2 and roi_y1 < obj_center_y < roi_y2 ):
158
+ continue
159
+
160
+ filtered_objects .append (obj )
161
+
162
+ if len (filtered_objects ) > 0 :
163
+ result = {
164
+ "status" : "objects_detected" ,
165
+ "timestamp" : time .time (),
166
+ "objects" : filtered_objects
167
+ }
168
+ print (json .dumps (result ), flush = True )
169
+
170
+ finally :
171
+ print (json .dumps ({"status" : "info" , "message" : "Analytics process stopping." }), flush = True )
172
+ frame_grabber .stop ()
173
+
174
+ if __name__ == "__main__" :
175
+ if len (sys .argv ) > 1 :
176
+ rtsp_stream_url = sys .argv [1 ]
177
+ config_arg = sys .argv [2 ] if len (sys .argv ) > 2 else None
178
+ run_analytics (rtsp_stream_url , config_arg )
179
+ else :
180
+ print (json .dumps ({"status" : "error" , "message" : "RTSP URL not provided" }), flush = True )
181
+ sys .exit (1 )
0 commit comments