Skip to content

Commit 9988fec

Browse files
committed
feat: Support conversion between dataproto and batchmeta
1 parent eb31070 commit 9988fec

File tree

1 file changed

+58
-0
lines changed

1 file changed

+58
-0
lines changed

verl/utils/transferqueue_utils.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import inspect
16+
from functools import wraps
1517
from typing import Any
1618

19+
from transfer_queue import BatchMeta
20+
1721
from verl.experimental.transfer_queue import ZMQServerInfo
1822

1923
_TRANSFER_QUEUE_CONTROLLER_INFOS = None
@@ -33,3 +37,57 @@ def get_transferqueue_server_info():
3337
"TransferQueue server infos have not been set yet."
3438
)
3539
return _TRANSFER_QUEUE_CONTROLLER_INFOS, _TRANSFER_QUEUE_STORAGE_INFOS
40+
41+
42+
def _find_batchmeta(*args, **kwargs):
43+
for arg in args:
44+
if isinstance(arg, BatchMeta):
45+
return arg
46+
for v in kwargs.values():
47+
if isinstance(v, BatchMeta):
48+
return v
49+
return None
50+
51+
52+
def _batchmeta_to_dataproto(batchmeta: BatchMeta):
53+
...
54+
55+
56+
def _update_batchmeta_with_output(output, batchmeta: BatchMeta):
57+
...
58+
59+
60+
async def _async_update_batchmeta_with_output(output, batchmeta: BatchMeta):
61+
...
62+
63+
64+
def batchmeta_dataproto_pipe():
65+
def decorator(func):
66+
@wraps(func)
67+
def inner(*args, **kwargs):
68+
batchmeta = _find_batchmeta(*args, **kwargs)
69+
if batchmeta is None:
70+
return func(*args, **kwargs)
71+
else:
72+
args = [_batchmeta_to_dataproto(arg) if isinstance(arg, BatchMeta) else arg for arg in args]
73+
kwargs = {k: _batchmeta_to_dataproto(v) if isinstance(v, BatchMeta) else v for k, v in kwargs.items()}
74+
output = func(*args, **kwargs)
75+
_update_batchmeta_with_output(output, batchmeta)
76+
return batchmeta
77+
78+
@wraps(func)
79+
async def async_inner(*args, **kwargs):
80+
batchmeta = _find_batchmeta(*args, **kwargs)
81+
if batchmeta is None:
82+
return await func(*args, **kwargs)
83+
else:
84+
args = [_batchmeta_to_dataproto(arg) if isinstance(arg, BatchMeta) else arg for arg in args]
85+
kwargs = {k: _batchmeta_to_dataproto(v) if isinstance(v, BatchMeta) else v for k, v in kwargs.items()}
86+
output = await func(*args, **kwargs)
87+
await _async_update_batchmeta_with_output(output, batchmeta)
88+
return batchmeta
89+
90+
wrapper = async_inner if inspect.iscoroutinefunction(func) else inner
91+
return wrapper
92+
return decorator
93+

0 commit comments

Comments
 (0)