Skip to content

Commit 0a8cd93

Browse files
committed
Better queue managmenet again
1 parent 3867924 commit 0a8cd93

File tree

1 file changed

+16
-13
lines changed

1 file changed

+16
-13
lines changed

olmocr/pipeline.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -624,12 +624,12 @@ def _kill_proc():
624624

625625
# Shared variables between tasks
626626
last_running_req, last_queue_req = 0, 0
627-
prev_running_req_at_release = 0 # Track running requests at last semaphore release
627+
running_reqs_decreased = False
628628
server_printed_ready_message = False
629629
last_semaphore_release = time.time()
630630

631631
async def process_line(line):
632-
nonlocal last_running_req, last_queue_req, last_semaphore_release, server_printed_ready_message
632+
nonlocal last_running_req, last_queue_req, running_reqs_decreased, last_semaphore_release, server_printed_ready_message
633633
server_logger.info(line)
634634

635635
if "Detected errors during sampling" in line:
@@ -640,12 +640,15 @@ async def process_line(line):
640640
server_printed_ready_message = True
641641
last_semaphore_release = time.time()
642642

643-
match = re.search(r"Running: (\d+)", line)
644-
if match:
645-
last_running_req = int(match.group(1))
643+
if match := re.search(r"Running: (\d+)", line):
644+
current_running = int(match.group(1))
645+
# Check for negative derivative (decrease in running requests), to not overload VLLM
646+
if current_running < last_running_req:
647+
running_reqs_decreased = True
648+
logger.info(f"Running requests decreased: {last_running_req} -> {current_running}")
649+
last_running_req = current_running
646650

647-
match = re.search(r"(?:Waiting|Pending):\s*(\d+)", line)
648-
if match:
651+
if match := re.search(r"(?:Waiting|Pending):\s*(\d+)", line):
649652
last_queue_req = int(match.group(1))
650653
logger.info(f"vllm running req: {last_running_req} queue req: {last_queue_req}")
651654

@@ -661,25 +664,25 @@ async def read_stream(stream):
661664
logger.warning(f"Got {ex} when reading log line from inference server, skipping")
662665

663666
async def timeout_task():
664-
nonlocal last_running_req, last_queue_req, last_semaphore_release, prev_running_req_at_release
667+
nonlocal last_running_req, last_queue_req, last_semaphore_release, running_reqs_decreased
665668
try:
666669
while True:
667670
await asyncio.sleep(1)
668-
671+
669672
# Check if we should release the semaphore
670673
should_release = (
671674
server_printed_ready_message
672675
and last_queue_req == 0
673676
and time.time() - last_semaphore_release > 30
674677
and semaphore.locked()
675-
and (last_running_req == 0 or last_running_req < prev_running_req_at_release)
678+
and (last_running_req == 0 or running_reqs_decreased)
676679
)
677-
680+
678681
if should_release:
679682
semaphore.release()
680-
prev_running_req_at_release = last_running_req
683+
running_reqs_decreased = False # Reset flag after release
681684
last_semaphore_release = time.time()
682-
logger.info(f"Semaphore released, allowing a worker to proceed. Running requests: {last_running_req} (prev: {prev_running_req_at_release})")
685+
logger.info(f"Semaphore released, allowing a worker to proceed. Running requests: {last_running_req}")
683686
except asyncio.CancelledError:
684687
pass # Clean up if the task is cancelled
685688

0 commit comments

Comments
 (0)