
[vLLM]feat: support micro batch for vllm #1818


Closed
wants to merge 3 commits

Conversation

Irvingwangjr
Collaborator

No description provided.

exceptoin: Exception


class MicroBatchChatCompletionScheduler(NaiveChatCompletionScheduler):
Collaborator

NaiveChatCompletionScheduler is only meant to be a demo, which is why it lives under example/. What you've written here should be the formal version, so it would be better to move it to the verl/workers/rollout/ directory and inherit directly from ChatCompletionScheduler.
Then make MicroBatchChatCompletionScheduler the default scheduler. A rough sketch of that restructuring is shown below.
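
A rough sketch of the suggested restructuring, assuming a hypothetical module path and base-class import (the names below are assumptions for illustration, not the actual verl API):

# verl/workers/rollout/micro_batch_scheduler.py  (hypothetical location per the suggestion above)
from verl.workers.rollout.chat_scheduler import ChatCompletionScheduler  # import path is an assumption

class MicroBatchChatCompletionScheduler(ChatCompletionScheduler):
    # promoted from the example demo: inherit the base scheduler directly
    def __init__(self, config, model_path, server_addresses, max_cache_size=10000, max_inflight_req=8):
        super().__init__(config, model_path, server_addresses)
        self.max_inflight_req = max_inflight_req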

self.max_inflight_req = max_inflight_req
self.server_addresses = server_addresses
self.proxy_agents_coros = self._init_proxy_group(self.server_addresses, self.send_queue, self.reduce_queue, self.max_inflight_req)
print(self.proxy_agents_coros)
Collaborator

There are a lot of print statements here. If they don't help with training, just delete them; if they are needed, switch them to a logger, along the lines of the sketch below.
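
A minimal sketch of the logger variant, assuming Python's standard logging module rather than an existing verl helper:

import logging

logger = logging.getLogger(__name__)

# instead of: print(self.proxy_agents_coros)
logger.debug("initialized proxy agent coroutines: %s", self.proxy_agents_coros)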



class MicroBatchChatCompletionScheduler(NaiveChatCompletionScheduler):
    def __init__(self, config, model_path, server_addresses, max_cache_size=10000, max_inflight_req=8):
Collaborator

Add a config entry for max_inflight_req in the configuration file, and write some usage documentation for it.
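
For illustration only, a sketch of wiring max_inflight_req through config; the key name and default below are assumptions, not an existing verl option:

# hypothetical rollout config key, e.g. rollout.max_inflight_req in the YAML
max_inflight_req = config.get("max_inflight_req", 8)
scheduler = MicroBatchChatCompletionScheduler(config, model_path, server_addresses, max_inflight_req=max_inflight_req)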

if exception is not None:
    print("[MicroBatchChatCompletionScheduler] _consumer process callback get exception", idx)
    await reduce_queue.put(RolloutSample(completions, info, None, None, None, exceptoin=exception))
else:
Collaborator

Doesn't this if/else branch do nothing here? Wouldn't
await reduce_queue.put(RolloutSample(completions, info, None, None, None, exceptoin=exception))
alone be sufficient?
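
The collapsed form being suggested, sketched with the same RolloutSample signature quoted in the diff (the exceptoin keyword mirrors the spelling in the PR and looks like a typo worth fixing as well):

# exception is None in the success case, so a single put covers both branches
await reduce_queue.put(RolloutSample(completions, info, None, None, None, exceptoin=exception))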

semaphore = asyncio.Semaphore(max_inflight_req)
local_queue = asyncio.Queue(max_inflight_req)
coros.append(self.get_element(addr, send_queue, local_queue, semaphore))
coros.extend([self.process(local_queue, reduce_queue, semaphore, addr, i) for i in range(max_inflight_req)])
Collaborator

Since the semaphore is acquired for every sample anyway, doesn't that mean max_inflight_req effectively has no effect? See the sketch below for the pattern I'd expect.
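
For reference, a pattern that does cap in-flight requests is to acquire the semaphore before dispatching and release it only once the response has been handled; a minimal standalone sketch with hypothetical helper names, not the PR's code:

import asyncio

async def dispatch_with_cap(requests, max_inflight_req, send_one):
    semaphore = asyncio.Semaphore(max_inflight_req)

    async def one(req):
        async with semaphore:  # at most max_inflight_req requests are awaited concurrently
            return await send_one(req)  # slot is released only after the response comes back

    return await asyncio.gather(*(one(r) for r in requests))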

@eric-haibin-lin
Collaborator

A feature PR without a description and usage example is not acceptable.
