Skip to content

Commit fd14d5a

Browse files
committed
poc comfystream.pipeline implementation
1 parent e209aa3 commit fd14d5a

File tree

5 files changed

+46
-27
lines changed

5 files changed

+46
-27
lines changed

runner/app/live/pipelines/comfyui.py

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
from typing import Union
77
from pydantic import BaseModel, field_validator
88
import pathlib
9+
import av
910

1011
from .interface import Pipeline
1112
from comfystream.pipeline import Pipeline as ComfyStreamPipeline
12-
from trickle import VideoFrame, VideoOutput
13+
from trickle import VideoFrame, VideoOutput, AudioFrame, AudioOutput
1314

1415
import logging
1516

@@ -64,22 +65,22 @@ async def initialize(self, **params):
6465
await self.pipeline.warm_video()
6566
logging.info("Pipeline initialization and warmup complete")
6667

68+
6769
async def put_video_frame(self, frame: VideoFrame, request_id: str):
68-
# Convert VideoFrame to format expected by comfystream
69-
image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0
70-
frame.side_data.input = torch.tensor(image_np).unsqueeze(0)
71-
frame.side_data.skipped = True
72-
frame.side_data.request_id = request_id
73-
await self.pipeline.put_video_frame(frame)
74-
75-
async def get_processed_video_frame(self) -> VideoOutput:
76-
processed_frame = await self.pipeline.get_processed_video_frame()
77-
# Convert back to VideoOutput format
78-
result_tensor = processed_frame.side_data.input
79-
result_tensor = result_tensor.squeeze(0)
80-
result_image_np = (result_tensor * 255).byte()
81-
result_image = Image.fromarray(result_image_np.cpu().numpy())
82-
return VideoOutput(processed_frame, processed_frame.side_data.request_id).replace_image(result_image)
70+
await self.pipeline.put_video_frame(self._convert_to_av_frame(frame))
71+
72+
async def put_audio_frame(self, frame: AudioFrame, request_id: str):
73+
await self.pipeline.put_audio_frame(self._convert_to_av_frame(frame))
74+
75+
async def get_processed_video_frame(self, request_id: str) -> VideoOutput:
76+
av_frame = await self.pipeline.get_processed_video_frame()
77+
video_frame = VideoFrame.from_av_video(av_frame)
78+
video_frame.side_data.request_id = request_id
79+
return VideoOutput(video_frame).replace_image(av_frame.to_image())
80+
81+
async def get_processed_audio_frame(self, request_id: str) -> AudioOutput:
82+
av_frame = await self.pipeline.get_processed_audio_frame()
83+
return AudioOutput(av_frame, request_id)
8384

8485
async def update_params(self, **params):
8586
new_params = ComfyUIParams(**params)
@@ -91,3 +92,22 @@ async def stop(self):
9192
logging.info("Stopping ComfyUI pipeline")
9293
await self.pipeline.cleanup()
9394
logging.info("ComfyUI pipeline stopped")
95+
96+
def _convert_to_av_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Union[av.VideoFrame, av.AudioFrame]:
97+
"""Convert trickle frame to av frame"""
98+
if isinstance(frame, VideoFrame):
99+
av_frame = av.VideoFrame.from_ndarray(
100+
np.array(frame.image.convert("RGB")),
101+
format='rgb24'
102+
)
103+
elif isinstance(frame, AudioFrame):
104+
av_frame = av.AudioFrame.from_ndarray(
105+
frame.samples.reshape(-1, 1),
106+
layout='mono',
107+
rate=frame.rate
108+
)
109+
110+
# Common frame properties
111+
av_frame.pts = frame.timestamp
112+
av_frame.time_base = frame.time_base
113+
return av_frame

runner/app/live/pipelines/interface.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self, **params):
2323
pass
2424

2525
@abstractmethod
26-
async def put_video_frame(self, frame: VideoFrame, request_id: str):
26+
async def put_video_frame(self, frame: VideoFrame):
2727
"""Put a frame into the pipeline.
2828
2929
Args:
@@ -32,7 +32,7 @@ async def put_video_frame(self, frame: VideoFrame, request_id: str):
3232
pass
3333

3434
@abstractmethod
35-
async def get_processed_video_frame(self) -> VideoOutput:
35+
async def get_processed_video_frame(self, request_id: str) -> VideoOutput:
3636
"""Get a processed frame from the pipeline.
3737
3838
Returns:

runner/app/live/pipelines/noop.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ def __init__(self):
1313
async def put_video_frame(self, frame: VideoFrame, request_id: str):
1414
await self.frame_queue.put(VideoOutput(frame, request_id))
1515

16-
async def get_processed_video_frame(self) -> VideoOutput:
16+
async def get_processed_video_frame(self, request_id: str) -> VideoOutput:
1717
out = await self.frame_queue.get()
1818
processed_frame = out.image.convert("RGB")
19-
return out.replace_image(processed_frame)
19+
return VideoOutput(out.frame.replace_image(processed_frame), request_id)
2020

2121
async def initialize(self, **params):
2222
logging.info(f"Initializing Noop pipeline with params: {params}")

runner/app/live/streamer/process.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def get_recent_logs(self, n=None) -> list[str]:
109109

110110
def process_loop(self):
111111
self._setup_logging()
112-
pipeline = None
112+
# pipeline = None
113113

114114
# Ensure CUDA environment is available inside the subprocess.
115115
# Multiprocessing (spawn mode) does not inherit environment variables by default,
@@ -146,7 +146,7 @@ async def _initialize_pipeline(self):
146146
params = {}
147147
try:
148148
params = self.param_update_queue.get_nowait()
149-
logging.info(f"PipelineProcess: Got params from param_update_queue {params}")
149+
logging.info(f"PipelineProcess: Got params from param_update_queue {params}")
150150
params = self._handle_logging_params(params)
151151
except queue.Empty:
152152
logging.info("PipelineProcess: No params found in param_update_queue, loading with default params")
@@ -206,8 +206,8 @@ async def _input_loop(self, pipeline: Pipeline):
206206
async def _output_loop(self, pipeline: Pipeline):
207207
while not self.is_done():
208208
try:
209-
output_frame = await pipeline.get_processed_video_frame()
210-
output_frame.log_timestamps["post_process_frame"] = time.time()
209+
output_frame = await pipeline.get_processed_video_frame(self.request_id)
210+
#output_frame.log_timestamps["post_process_frame"] = time.time()
211211
await asyncio.to_thread(self.output_queue.put, output_frame)
212212
except Exception as e:
213213
self._report_error(f"Error processing output frame: {e}")

runner/app/live/streamer/streamer.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,11 @@ async def start(self, params: dict):
5454
self.main_tasks = [
5555
run_in_background("ingress_loop", self.run_ingress_loop()),
5656
run_in_background("egress_loop", self.run_egress_loop()),
57-
run_in_background("report_status_loop", self.report_status_loop()),
58-
run_in_background("control_loop", self.run_control_loop()),
57+
run_in_background("report_status_loop", self.report_status_loop())
5958
]
6059
# auxiliary tasks that are not critical to the supervisor, but which we want to run
6160
self.auxiliary_tasks = [
62-
61+
run_in_background("control_loop", self.run_control_loop())
6362
]
6463
self.tasks_supervisor_task = run_in_background(
6564
"tasks_supervisor", self.tasks_supervisor()

0 commit comments

Comments (0)