
Commit 6011d52

njhill and Murali Andoorveedu authored and committed
[Core] Multiprocessing Pipeline Parallel support (vllm-project#6130)
Co-authored-by: Murali Andoorveedu <[email protected]>
1 parent d653d7f commit 6011d52

9 files changed (+152, -99 lines)


.buildkite/test-pipeline.yaml

Lines changed: 3 additions & 3 deletions
@@ -54,7 +54,7 @@ steps:
 - label: Core Test
   mirror_hardwares: [amd]
   fast_check: true
-  commands:
+  commands:
   - pytest -v -s core
   - pytest -v -s distributed/test_parallel_state.py
 
@@ -73,7 +73,7 @@ steps:
   commands:
   - # the following commands are for the first node, with ip 192.168.10.10 (ray environment already set up)
   - VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py
-  - pytest -v -s distributed/test_pipeline_parallel.py
+  - VLLM_MULTI_NODE=1 pytest -v -s distributed/test_pipeline_parallel.py
   - # the following commands are for the second node, with ip 192.168.10.11 (ray environment already set up)
   - VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py
 
@@ -123,7 +123,7 @@ steps:
 
 - label: Engine Test
   mirror_hardwares: [amd]
-  commands:
+  commands:
   - pytest -v -s engine test_sequence.py test_config.py test_logger.py
   # OOM in the CI unless we run this separately
   - pytest -v -s tokenization

tests/distributed/test_pipeline_parallel.py

Lines changed: 23 additions & 9 deletions
@@ -1,28 +1,42 @@
+import os
+
 import pytest
 
 from ..utils import compare_two_settings
 
+VLLM_MULTI_NODE = os.getenv("VLLM_MULTI_NODE", "0") == "1"
+
 
 @pytest.mark.parametrize(
-    "TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME", [
-        (2, 2, 0, 1, "meta-llama/Meta-Llama-3-8B"),
-        (2, 2, 1, 0, "meta-llama/Meta-Llama-3-8B"),
-        (1, 3, 0, 0, "meta-llama/Meta-Llama-3-8B"),
-        (1, 4, 0, 1, "meta-llama/Meta-Llama-3-8B"),
-        (1, 4, 1, 0, "meta-llama/Meta-Llama-3-8B"),
+    "TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME, DIST_BACKEND",
+    [
+        (2, 2, 0, 1, "meta-llama/Meta-Llama-3-8B", "ray"),
+        (2, 2, 1, 0, "meta-llama/Meta-Llama-3-8B", "ray"),
+        (1, 3, 0, 0, "meta-llama/Meta-Llama-3-8B", "ray"),
+        (1, 4, 0, 1, "meta-llama/Meta-Llama-3-8B", "ray"),
+        (1, 4, 1, 0, "meta-llama/Meta-Llama-3-8B", "ray"),
+        (2, 2, 0, 1, "meta-llama/Meta-Llama-3-8B", "mp"),
+        (2, 2, 1, 0, "meta-llama/Meta-Llama-3-8B", "mp"),
+        (1, 3, 0, 0, "meta-llama/Meta-Llama-3-8B", "mp"),
+        (1, 4, 0, 1, "meta-llama/Meta-Llama-3-8B", "mp"),
+        (1, 4, 1, 0, "meta-llama/Meta-Llama-3-8B", "mp"),
     ])
-def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME):
+def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME,
+                    DIST_BACKEND):
+    if VLLM_MULTI_NODE and DIST_BACKEND == "mp":
+        pytest.skip("Skipping multi-node pipeline parallel test for "
+                    "multiprocessing distributed backend")
 
     pp_args = [
         # use half precision for speed and memory savings in CI environment
         "--dtype",
-        "bfloat16",
+        "float16",
         "--pipeline-parallel-size",
         str(PP_SIZE),
         "--tensor-parallel-size",
        str(TP_SIZE),
         "--distributed-executor-backend",
-        "ray",
+        DIST_BACKEND,
     ]
 
     # compare without pipeline parallelism
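The same comparison can be exercised outside of pytest. Below is a minimal sketch, assuming compare_two_settings(model, args_a, args_b) starts one OpenAI-compatible server per argument list and checks that their outputs match; the import path and the TP-only baseline are illustrative assumptions, not part of this diff.

# Hypothetical standalone run mirroring one "mp" case of test_compare_tp.
# Assumption: compare_two_settings(model, args_a, args_b) launches a server
# per argument list and asserts that their completions agree.
from tests.utils import compare_two_settings  # assumed import path

MODEL = "meta-llama/Meta-Llama-3-8B"

pp_args = [
    "--dtype", "float16",
    "--pipeline-parallel-size", "2",
    "--tensor-parallel-size", "2",
    "--distributed-executor-backend", "mp",
]
# Baseline for comparison: same TP size, no pipeline parallelism
# (illustrative choice).
tp_args = [
    "--dtype", "float16",
    "--tensor-parallel-size", "2",
    "--distributed-executor-backend", "mp",
]

compare_two_settings(MODEL, pp_args, tp_args)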

vllm/config.py

Lines changed: 0 additions & 4 deletions
@@ -712,10 +712,6 @@ def __init__(
         self.rank = 0
 
     def _verify_args(self) -> None:
-        if (self.pipeline_parallel_size > 1
-                and self.distributed_executor_backend == "mp"):
-            raise NotImplementedError("Pipeline parallelism is not supported "
-                                      "yet with multiprocessing.")
         if self.distributed_executor_backend not in ("ray", "mp", None):
             raise ValueError(
                 "Unrecognized distributed executor backend. Supported values "

vllm/executor/executor_base.py

Lines changed: 0 additions & 21 deletions
@@ -1,4 +1,3 @@
-import asyncio
 from abc import ABC, abstractmethod
 from typing import List, Optional, Set, Tuple
 
@@ -132,26 +131,6 @@ def __del__(self):
 
 class ExecutorAsyncBase(ExecutorBase):
 
-    def __init__(
-        self,
-        model_config: ModelConfig,
-        cache_config: CacheConfig,
-        parallel_config: ParallelConfig,
-        scheduler_config: SchedulerConfig,
-        device_config: DeviceConfig,
-        load_config: LoadConfig,
-        lora_config: Optional[LoRAConfig],
-        multimodal_config: Optional[MultiModalConfig],
-        speculative_config: Optional[SpeculativeConfig],
-        prompt_adapter_config: Optional[PromptAdapterConfig],
-    ) -> None:
-        self.pp_locks: Optional[List[asyncio.Lock]] = None
-
-        super().__init__(model_config, cache_config, parallel_config,
-                         scheduler_config, device_config, load_config,
-                         lora_config, multimodal_config, speculative_config,
-                         prompt_adapter_config)
-
     @abstractmethod
     async def execute_model_async(
         self,

vllm/executor/gpu_executor.py

Lines changed: 29 additions & 15 deletions
@@ -12,6 +12,15 @@
 logger = init_logger(__name__)
 
 
+def create_worker(worker_module_name, worker_class_name, **kwargs):
+    wrapper = WorkerWrapperBase(
+        worker_module_name=worker_module_name,
+        worker_class_name=worker_class_name,
+    )
+    wrapper.init_worker(**kwargs)
+    return wrapper.worker
+
+
 class GPUExecutor(ExecutorBase):
 
     def _init_executor(self) -> None:
@@ -51,25 +60,30 @@ def _get_worker_kwargs(
             or (rank % self.parallel_config.tensor_parallel_size == 0),
         )
 
+    def _get_create_worker_kwargs(
+            self,
+            local_rank: int = 0,
+            rank: int = 0,
+            distributed_init_method: Optional[str] = None) -> Dict:
+        worker_kwargs = self._get_worker_kwargs(local_rank, rank,
+                                                distributed_init_method)
+        if self.speculative_config is None:
+            worker_kwargs.update(worker_module_name="vllm.worker.worker",
+                                 worker_class_name="Worker")
+        else:
+            worker_kwargs.update(
+                worker_module_name="vllm.spec_decode.spec_decode_worker",
+                worker_class_name="create_spec_worker")
+        return worker_kwargs
+
     def _create_worker(self,
                        local_rank: int = 0,
                        rank: int = 0,
                        distributed_init_method: Optional[str] = None):
-
-        if self.speculative_config is None:
-            worker_module_name = "vllm.worker.worker"
-            worker_class_name = "Worker"
-        else:
-            worker_module_name = "vllm.spec_decode.spec_decode_worker"
-            worker_class_name = "create_spec_worker"
-
-        wrapper = WorkerWrapperBase(
-            worker_module_name=worker_module_name,
-            worker_class_name=worker_class_name,
-        )
-        wrapper.init_worker(**self._get_worker_kwargs(local_rank, rank,
-                                                      distributed_init_method))
-        return wrapper.worker
+        return create_worker(**self._get_create_worker_kwargs(
+            local_rank=local_rank,
+            rank=rank,
+            distributed_init_method=distributed_init_method))
 
     def determine_num_available_blocks(self) -> Tuple[int, int]:
         """Determine the number of available KV blocks by invoking the

vllm/executor/multiproc_gpu_executor.py

Lines changed: 75 additions & 20 deletions
@@ -7,12 +7,13 @@
 
 from vllm.executor.distributed_gpu_executor import (  # yapf: disable
     DistributedGPUExecutor, DistributedGPUExecutorAsync)
+from vllm.executor.gpu_executor import create_worker
 from vllm.executor.multiproc_worker_utils import (ProcessWorkerWrapper,
                                                   ResultHandler, WorkerMonitor)
 from vllm.logger import init_logger
 from vllm.sequence import ExecuteModelRequest, SamplerOutput
 from vllm.triton_utils import maybe_set_triton_cache_manager
-from vllm.utils import (cuda_device_count_stateless,
+from vllm.utils import (_run_task_with_lock, cuda_device_count_stateless,
                         error_on_invalid_device_count_status,
                         get_distributed_init_method, get_open_port,
                         get_vllm_instance_id, make_async,
 
@@ -26,7 +27,8 @@ class MultiprocessingGPUExecutor(DistributedGPUExecutor):
 
     def _init_executor(self) -> None:
         # Create the parallel GPU workers.
-        world_size = self.parallel_config.tensor_parallel_size
+        world_size = self.parallel_config.world_size
+        tensor_parallel_size = self.parallel_config.tensor_parallel_size
 
         # Set CUDA_VISIBLE_DEVICES for the driver, inherited by workers
         if "CUDA_VISIBLE_DEVICES" not in os.environ:
 
@@ -49,8 +51,15 @@ def _init_executor(self) -> None:
         if world_size > 1:
             maybe_set_triton_cache_manager()
 
-        assert world_size <= cuda_device_count_stateless(), (
-            "please set tensor_parallel_size to less than max local gpu count")
+        cuda_device_count = cuda_device_count_stateless()
+        # Use confusing message for more common TP-only case.
+        assert tensor_parallel_size <= cuda_device_count, (
+            f"please set tensor_parallel_size ({tensor_parallel_size}) "
+            f"to less than max local gpu count ({cuda_device_count})")
+
+        assert world_size <= cuda_device_count, (
+            f"please ensure that world_size ({world_size}) "
+            f"is less than than max local gpu count ({cuda_device_count})")
 
         error_on_invalid_device_count_status()
 
6069
distributed_init_method = get_distributed_init_method(
6170
"127.0.0.1", get_open_port())
6271

72+
self.workers: List[ProcessWorkerWrapper] = []
73+
# This is the list of workers that are rank 0 of each TP group EXCEPT
74+
# global rank 0. These are the workers that will broadcast to the
75+
# rest of the workers.
76+
self.tp_driver_workers: List[ProcessWorkerWrapper] = []
77+
# This is the list of workers that are not drivers and not the first
78+
# worker in a TP group. These are the workers that will be
79+
# broadcasted to.
80+
self.non_driver_workers: List[ProcessWorkerWrapper] = []
81+
6382
if world_size == 1:
64-
self.workers = []
6583
self.worker_monitor = None
6684
else:
6785
result_handler = ResultHandler()
68-
self.workers = [
69-
ProcessWorkerWrapper(
86+
for rank in range(1, world_size):
87+
worker = ProcessWorkerWrapper(
7088
result_handler,
7189
partial(
72-
self._create_worker,
73-
rank=rank,
74-
local_rank=rank,
75-
distributed_init_method=distributed_init_method,
76-
)) for rank in range(1, world_size)
77-
]
90+
create_worker,
91+
**self._get_create_worker_kwargs(
92+
rank=rank,
93+
local_rank=rank,
94+
distributed_init_method=distributed_init_method,
95+
)))
96+
self.workers.append(worker)
97+
if rank % tensor_parallel_size == 0:
98+
self.tp_driver_workers.append(worker)
99+
else:
100+
self.non_driver_workers.append(worker)
78101

79102
self.worker_monitor = WorkerMonitor(self.workers, result_handler)
80103
result_handler.start()
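The grouping above is easiest to see with a concrete shape. The following is only an illustration of the rank % tensor_parallel_size == 0 rule for pipeline_parallel_size=2 and tensor_parallel_size=2 (world_size=4), not executor code.

# Rank 0 stays in-process as the driver worker and is not in self.workers.
tensor_parallel_size = 2
world_size = 4  # pipeline_parallel_size (2) * tensor_parallel_size (2)

tp_driver_ranks, non_driver_ranks = [], []
for rank in range(1, world_size):
    if rank % tensor_parallel_size == 0:
        tp_driver_ranks.append(rank)   # rank 0 of a later TP group
    else:
        non_driver_ranks.append(rank)  # broadcast target within its TP group

print(tp_driver_ranks)   # [2]
print(non_driver_ranks)  # [1, 3]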
@@ -136,16 +159,19 @@ def _run_workers(
             raise NotImplementedError(
                 "max_concurrent_workers is not supported yet.")
 
-        # Start the workers first.
+        if async_run_tensor_parallel_workers_only:
+            # Run only non-driver workers and just return futures.
+            return [
+                worker.execute_method(method, *args, **kwargs)
+                for worker in self.non_driver_workers
+            ]
+
+        # Start all remote workers first.
         worker_outputs = [
             worker.execute_method(method, *args, **kwargs)
             for worker in self.workers
         ]
 
-        if async_run_tensor_parallel_workers_only:
-            # Just return futures
-            return worker_outputs
-
         driver_worker_method = getattr(self.driver_worker, method)
         driver_worker_output = driver_worker_method(*args, **kwargs)
 
@@ -172,16 +198,45 @@ class MultiprocessingGPUExecutorAsync(MultiprocessingGPUExecutor,
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.driver_exec_model = make_async(self.driver_worker.execute_model)
+        self.pp_locks: Optional[List[asyncio.Lock]] = None
 
     async def _driver_execute_model_async(
         self,
         execute_model_req: Optional[ExecuteModelRequest] = None
     ) -> List[SamplerOutput]:
-        return await self.driver_exec_model(execute_model_req)
+        if not self.tp_driver_workers:
+            return await self.driver_exec_model(execute_model_req)
+
+        if self.pp_locks is None:
+            # This locks each pipeline parallel stage so multiple virtual
+            # engines can't execute on the same stage at the same time
+            # We create the locks here to avoid creating them in the constructor
+            # which uses a different asyncio loop.
+            self.pp_locks = [
+                asyncio.Lock()
+                for _ in range(self.parallel_config.pipeline_parallel_size)
+            ]
+
+        tasks = [
+            asyncio.create_task(
+                _run_task_with_lock(self.driver_exec_model, self.pp_locks[0],
+                                    execute_model_req))
+        ]
+        for pp_rank, driver_worker in enumerate(self.tp_driver_workers,
+                                                start=1):
+            tasks.append(
+                asyncio.create_task(
+                    _run_task_with_lock(driver_worker.execute_method_async,
+                                        self.pp_locks[pp_rank],
+                                        "execute_model", execute_model_req)))
+        results = await asyncio.gather(*tasks)
+
+        # Only the last PP stage has the final results.
+        return results[-1]
 
     async def _start_worker_execution_loop(self):
         coros = [
             worker.execute_method_async("start_worker_execution_loop")
-            for worker in self.workers
+            for worker in self.non_driver_workers
         ]
         return await asyncio.gather(*coros)
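_run_task_with_lock is imported from vllm.utils in the first hunk of this file, but its body is not part of the diff shown here. Below is a minimal sketch of the helper as it is used above, assuming it simply awaits a callable while holding the per-stage asyncio.Lock.

# Sketch only; the actual vllm.utils implementation may differ in details.
import asyncio


async def _run_task_with_lock(task, lock: asyncio.Lock, *args, **kwargs):
    # Holding the lock serializes execution of one pipeline stage across
    # virtual engines, which is how pp_locks is used above.
    async with lock:
        return await task(*args, **kwargs)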
