1515import json
1616import os
1717import queue
18+ import shutil
1819import subprocess
1920import threading
2021import time
22+ from distutils .dir_util import copy_tree
2123from enum import Enum
2224from typing import List
2325
2830TRAIN_CONFIG = "/root/paddlejob/workspace/env_run/longjob/train.conf"
2931TAR_BIN = "tar"
3032
33+ FLASH_DEVICE = os .getenv ("PDC_FLASH_DEVICE" , "/shared/dev/shm/flash" )
34+
35+
36+ def pdc_flash_device_available ():
37+ # TODO(@gexiao): need better check
38+ return os .path .exists (FLASH_DEVICE )
39+
3140
3241class PDCErrorCode (Enum ):
3342 """Error Code For PDCTools usage"""
@@ -48,6 +57,7 @@ class PDCErrorCode(Enum):
4857 InvalidArgument = 1503
4958 CommandTimeout = 1504
5059 CheckSumCommandFail = 1505
60+ CopyTreeFailed = 1506
5161
5262 UnknownError = 1999
5363
@@ -493,14 +503,60 @@ def _download_file(self, remote_path: str, local_path: str) -> PDCErrorCode:
493503 raise Exception (f"exec cmd { download_cmd_args } with error: { e } " )
494504 return error_code
495505
496- def pdc_fc_generate_checksum (self , path : str ) -> PDCErrorCode :
506+ def _pdc_backup_failed_directory (self , path ):
507+ base_dir , target_path = os .path .split (os .path .normpath (path ))
508+ failed_path = os .path .join (base_dir , f"{ target_path } _failed" )
509+ if os .path .exists (path ):
510+ if os .path .exists (failed_path ):
511+ shutil .rmtree (failed_path )
512+ # Backup failed files for debug
513+ os .rename (path , failed_path )
514+
515+ def pdc_backup_to_flash_device (self , persistent_path : str , flash_device_path : str ) -> PDCErrorCode :
516+ """backup data to flash device
517+
518+ Args:
519+ persistent_path str: persistent path
520+ flash_device_path str: flash device path
521+ """
522+ if not os .path .exists (persistent_path ):
523+ logger .error (f"{ persistent_path } not exist" )
524+ return PDCErrorCode .LocalPathNotExist
525+
526+ logger .info ("starting backup to flash device..." )
527+
528+ # step 1: generate checksum for recovery
529+ result = self .pdc_generate_dir_checksum (persistent_path )
530+ if result != PDCErrorCode .Success :
531+ logger .error (f"[Error] [pdc_sdk] generating checksum for { persistent_path } failed" )
532+ return result
533+
534+ # step 2: copy persistent data to flash device
535+ try :
536+ copy_tree (persistent_path , flash_device_path )
537+ logger .info (f"backup { persistent_path } to { flash_device_path } successed." )
538+ except Exception as e :
539+ logger .error (f"[Error] [pdc_sdk] copy tree { persistent_path } to { flash_device_path } failed, error: { e } " )
540+ self ._pdc_backup_failed_directory (flash_device_path )
541+ return PDCErrorCode .CopyTreeFailed
542+
543+ # step 3: do checksum for storage on flash device
544+ result = self .pdc_flash_do_check (flash_device_path )
545+ if result == PDCErrorCode .Success :
546+ return result
547+
548+ logger .error (f"[Error] [pdc_sdk] checksum failed on { flash_device_path } after copy, backup for debug" )
549+ self ._pdc_backup_failed_directory (flash_device_path )
550+ return result
551+
552+ def pdc_generate_dir_checksum (self , path : str ) -> PDCErrorCode :
497553 """
498554 Args
499555 :param localPath:
500556 :return:
501557 """
502558 if not os .path .exists (path ):
503- logger .error (f"pdc_fc_generate_checksum gi{ path } not exist" )
559+ logger .error (f"pdc_generate_dir_checksum gi{ path } not exist" )
504560 return PDCErrorCode .CommandFail
505561 generate_checksum_args = [self ._pdc_agent_bin , "-mode" , "command" , "-type" , "generateSum" , "-path" , f"{ path } " ]
506562 error_code = PDCErrorCode .Success
@@ -514,14 +570,14 @@ def pdc_fc_generate_checksum(self, path: str) -> PDCErrorCode:
514570 return PDCErrorCode .CheckSumCommandFail
515571 return error_code
516572
517- def pdc_fc_do_check (self , path : str ) -> PDCErrorCode :
573+ def pdc_flash_do_check (self , path : str ) -> PDCErrorCode :
518574 """
519575 Args
520576 :param localPath:
521577 :return:
522578 """
523579 if not os .path .exists (path ):
524- logger .error (f"pdc_fc_do_check { path } not exist" )
580+ logger .error (f"pdc_flash_do_check { path } not exist" )
525581 return PDCErrorCode .CommandFail
526582 generate_checksum_args = [self ._pdc_agent_bin , "-mode" , "command" , "-type" , "checkSum" , "-path" , f"{ path } " ]
527583 error_code = PDCErrorCode .Success
@@ -530,8 +586,12 @@ def pdc_fc_do_check(self, path: str) -> PDCErrorCode:
530586 res , error_code = self ._exec_cmd (generate_checksum_args )
531587 if error_code == PDCErrorCode .Success :
532588 logger .info (f"check_sum { path } successfully" )
589+ else :
590+ logger .error (f"[Error] [pdc_sdk] check_sum { path } failed, error code: { error_code } " )
591+ self ._pdc_backup_failed_directory (path )
533592 except Exception as e :
534- logger .error (f"exec cmd { generate_checksum_args } with error: { e } " )
593+ logger .error (f"[Error] [pdc_sdk] exec cmd { generate_checksum_args } with error: { e } " )
594+ self ._pdc_backup_failed_directory (path )
535595 return PDCErrorCode .CheckSumCommandFail
536596 return error_code
537597
@@ -560,8 +620,10 @@ def _clean_tmp_files(self, tmp_files: List[str]):
560620 PDCErrorCode .AFSToolsNotExist : "afs tools not exist" ,
561621 PDCErrorCode .TrainConfigNotExist : "train config not exist" ,
562622 PDCErrorCode .LocalPathNotExist : "local path not exist" ,
563- PDCErrorCode .CommandFail : "download command fail" ,
623+ PDCErrorCode .CommandFail : "pdc agent command fail" ,
564624 PDCErrorCode .CalculateHashFail : "calculate hash fail" ,
565625 PDCErrorCode .InvalidArgument : "invalid argument" ,
566- PDCErrorCode .CommandTimeout : "command timeout" ,
626+ PDCErrorCode .CommandTimeout : "pdc agent command timeout" ,
627+ PDCErrorCode .CheckSumCommandFail : "checksum command fail" ,
628+ PDCErrorCode .CopyTreeFailed : "copy directory failed" ,
567629}
0 commit comments