Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 41 additions & 32 deletions runner/app/live/pipelines/comfyui.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import os
import json
import torch
import asyncio
import numpy as np
from PIL import Image
from typing import Union
from pydantic import BaseModel, field_validator
import pathlib
import av

from .interface import Pipeline
from comfystream.client import ComfyStreamClient
from trickle import VideoFrame, VideoOutput
from comfystream.pipeline import Pipeline as ComfyStreamPipeline
from trickle import VideoFrame, VideoOutput, AudioFrame, AudioOutput

import logging

Expand Down Expand Up @@ -52,52 +52,61 @@ def validate_prompt(cls, v) -> dict:
class ComfyUI(Pipeline):
def __init__(self):
comfy_ui_workspace = os.getenv(COMFY_UI_WORKSPACE_ENV)
self.client = ComfyStreamClient(cwd=comfy_ui_workspace)
self.comfystream = ComfyStreamPipeline(width=512, height=512, cwd=comfy_ui_workspace)
self.params: ComfyUIParams
self.video_incoming_frames: asyncio.Queue[VideoOutput] = asyncio.Queue()

async def initialize(self, **params):
new_params = ComfyUIParams(**params)
logging.info(f"Initializing ComfyUI Pipeline with prompt: {new_params.prompt}")
# TODO: currently its a single prompt, but need to support multiple prompts
await self.client.set_prompts([new_params.prompt])
await self.comfystream.set_prompts([new_params.prompt])
self.params = new_params

# Warm up the pipeline
dummy_frame = VideoFrame(None, 0, 0)
dummy_frame.side_data.input = torch.randn(1, 512, 512, 3)

for _ in range(WARMUP_RUNS):
self.client.put_video_input(dummy_frame)
_ = await self.client.get_video_output()
await self.comfystream.warm_video()
logging.info("Pipeline initialization and warmup complete")

async def put_video_frame(self, frame: VideoFrame, request_id: str):
image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0
frame.side_data.input = torch.tensor(image_np).unsqueeze(0)
frame.side_data.skipped = True
self.client.put_video_input(frame)
await self.video_incoming_frames.put(VideoOutput(frame, request_id))

async def get_processed_video_frame(self):
result_tensor = await self.client.get_video_output()
out = await self.video_incoming_frames.get()
while out.frame.side_data.skipped:
out = await self.video_incoming_frames.get()
Comment on lines -84 to -86
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need this logic anymore?

Copy link
Collaborator Author

@eliteprox eliteprox May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need this logic anymore?

The frame skipping logic is in comfystream here https://github.com/livepeer/comfystream/blob/e9b87bc7bc0adb32c5731f70b1e11e44c1f2e3c1/src/comfystream/pipeline.py#L169-L170, so this method already implements the logic internally


result_tensor = result_tensor.squeeze(0)
result_image_np = (result_tensor * 255).byte()
result_image = Image.fromarray(result_image_np.cpu().numpy())
return out.replace_image(result_image)
await self.comfystream.put_video_frame(self._convert_to_av_frame(frame), request_id)

async def put_audio_frame(self, frame: AudioFrame, request_id: str):
await self.comfystream.put_audio_frame(self._convert_to_av_frame(frame), request_id)

async def get_processed_video_frame(self) -> VideoOutput:
av_frame = await self.comfystream.get_processed_video_frame()
video_frame = VideoFrame.from_av_video(av_frame)
return VideoOutput(video_frame, av_frame.side_data.request_id)

async def get_processed_audio_frame(self) -> AudioOutput:
av_frame = await self.comfystream.get_processed_audio_frame()
audio_frame = AudioFrame.from_av_audio(av_frame)
return AudioOutput(audio_frame, av_frame.side_data.request_id)

async def update_params(self, **params):
new_params = ComfyUIParams(**params)
logging.info(f"Updating ComfyUI Pipeline Prompt: {new_params.prompt}")
# TODO: currently its a single prompt, but need to support multiple prompts
await self.client.update_prompts([new_params.prompt])
await self.comfystream.update_prompts([new_params.prompt])
self.params = new_params

async def stop(self):
logging.info("Stopping ComfyUI pipeline")
await self.client.cleanup()
await self.comfystream.cleanup()
logging.info("ComfyUI pipeline stopped")

def _convert_to_av_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Union[av.VideoFrame, av.AudioFrame]:
"""Convert trickle frame to av frame"""
if isinstance(frame, VideoFrame):
av_frame = av.VideoFrame.from_ndarray(
np.array(frame.image.convert("RGB")),
format='rgb24'
)
elif isinstance(frame, AudioFrame):
av_frame = av.AudioFrame.from_ndarray(
frame.samples.reshape(-1, 1),
layout='mono',
rate=frame.rate
)

# Common frame properties
av_frame.pts = frame.timestamp
av_frame.time_base = frame.time_base
return av_frame
2 changes: 1 addition & 1 deletion runner/app/live/pipelines/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def put_video_frame(self, frame: VideoFrame, request_id: str):
async def get_processed_video_frame(self) -> VideoOutput:
out = await self.frame_queue.get()
processed_frame = out.image.convert("RGB")
return out.replace_image(processed_frame)
return VideoOutput(out.frame.replace_image(processed_frame), out.request_id)

async def initialize(self, **params):
logging.info(f"Initializing Noop pipeline with params: {params}")
Expand Down
2 changes: 1 addition & 1 deletion runner/app/live/streamer/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ def get_recent_logs(self, n=None) -> list[str]:

def process_loop(self):
self._setup_logging()
pipeline = None

# Ensure CUDA environment is available inside the subprocess.
# Multiprocessing (spawn mode) does not inherit environment variables by default,
Expand Down Expand Up @@ -146,6 +145,7 @@ def _handle_logging_params(self, params: dict) -> dict:

async def _initialize_pipeline(self):
try:
pipeline = None
stream_id = ""
params = {}
try:
Expand Down
6 changes: 4 additions & 2 deletions runner/app/live/streamer/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ async def start(self, params: dict):
run_in_background("ingress_loop", self.run_ingress_loop()),
run_in_background("egress_loop", self.run_egress_loop()),
run_in_background("report_status_loop", self.report_status_loop()),
run_in_background("control_loop", self.run_control_loop()),
run_in_background("control_loop", self.run_control_loop())
]
# auxiliary tasks that are not critical to the supervisor, but which we want to run
# TODO: maybe remove this since we had to move the control loop to main tasks
self.auxiliary_tasks: list[asyncio.Task] = []
self.auxiliary_tasks = [

]
self.tasks_supervisor_task = run_in_background(
"tasks_supervisor", self.tasks_supervisor()
)
Expand Down
1 change: 1 addition & 0 deletions runner/app/live/trickle/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class SideData:
"""
skipped: bool = True
input: Image.Image | np.ndarray | None
request_id: str = ''

class InputFrame:
"""
Expand Down
2 changes: 1 addition & 1 deletion runner/docker/Dockerfile.live-base-comfyui
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG BASE_IMAGE=livepeer/comfyui-base@sha256:4435bad85c3a2fce2b491135bee49eedb8edbd8bdf5d124cb0a95a1d4ecb6856
ARG BASE_IMAGE=livepeer/comfyui-base@sha256:9364f249297ea9aebc053f361a41efa78ce0fbbb15b73b4a73215f3edb8ada2f
FROM ${BASE_IMAGE}

# -----------------------------------------------------------------------------
Expand Down