13 changes: 10 additions & 3 deletions paddlenlp/trainer/plugins/unified_checkpoint.py
@@ -41,7 +41,7 @@
     get_checkpoint_shard_files,
     is_safetensors_available,
 )
-from paddlenlp.utils.distributed import distributed_gather
+from paddlenlp.utils.distributed import distributed_allgather, distributed_gather
 from paddlenlp.utils.env import (
     LORA_WEIGHTS_NAME,
     PADDLE_MASTER_WEIGHTS_INDEX_NAME,
@@ -64,6 +64,7 @@
 )
 from paddlenlp.utils.log import logger
 from paddlenlp.utils.nested import nested_copy, nested_copy_place
+from paddlenlp.utils.tools import get_env_device

 if is_safetensors_available():
     # from safetensors import safe_open
@@ -1753,7 +1754,10 @@ def merge_tensor_parallel_with_shard(state_dict, tp_actions, all_filter_keys):
             key = filter_keys[i]
             tensor = state_dict[key]
             if key in tp_actions:
-                ret = distributed_gather(tensor, dst=j, group=tp_group, offload=False)
+                if get_env_device() == "xpu":
+                    ret = distributed_allgather(tensor, group=tp_group, offload=False)
+                else:
+                    ret = distributed_gather(tensor, dst=j, group=tp_group, offload=False)
                 action = tp_actions.pop(key)
                 tensor = action(ret) if is_dst else None
             else:
@@ -1790,7 +1794,10 @@ def merge_tensor_parallel_for_optimizer(state_dict, tp_actions, all_filter_keys):
             if tensor.numel().item() == 1:
                 tensor = tensor._copy_to(DEST_PLACE, False) if is_dst else None  # Need broadcast when loaded
             else:
-                ret = distributed_gather(tensor, dst=j, group=tp_group, offload=False)
+                if get_env_device() == "xpu":
+                    ret = distributed_allgather(tensor, group=tp_group, offload=False)
+                else:
+                    ret = distributed_gather(tensor, dst=j, group=tp_group, offload=False)
                 action = tp_actions[model_key]
                 tensor = action(ret) if is_dst else None
             else:
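Taken together, the two hunks apply the same change in both merge paths: when the runtime device reported by get_env_device() is "xpu", the tensor-parallel shards are collected with distributed_allgather instead of a rank-targeted distributed_gather. A minimal sketch of that pattern, factored into a single helper, is shown below; the helper name _gather_tp_shards is hypothetical, and the call signatures are assumed from the diff above rather than from the library documentation.

# Sketch only: `_gather_tp_shards` is a hypothetical helper illustrating the
# branch added in both merge_tensor_parallel_with_shard and
# merge_tensor_parallel_for_optimizer; signatures follow the diff above.
from paddlenlp.utils.distributed import distributed_allgather, distributed_gather
from paddlenlp.utils.tools import get_env_device


def _gather_tp_shards(tensor, dst_rank, tp_group):
    """Collect a tensor's tensor-parallel shards for checkpoint merging."""
    if get_env_device() == "xpu":
        # On XPU, gather to every rank in the tensor-parallel group; the
        # destination rank is the one that goes on to use the result.
        return distributed_allgather(tensor, group=tp_group, offload=False)
    # On other devices, keep the original gather to the destination rank.
    return distributed_gather(tensor, dst=dst_rank, group=tp_group, offload=False)

In the diff, the gathered result then feeds the corresponding tp_actions merge function on the destination rank, e.g. tensor = action(ret) if is_dst else None.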