Skip to content

Commit 3d4da4f

Browse files
j2rong4cnSuyunmeng
authored andcommitted
feat(fs): full support webdav cross-driver copy and move (#823)
* fix(fs): restore webdav cross-driver copy and move * fix bug * webdav支持复制、移动 文件夹 * 优化 * 。
1 parent 8638fe6 commit 3d4da4f

File tree

4 files changed

+78
-115
lines changed

4 files changed

+78
-115
lines changed

internal/fs/archive.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,6 @@ func (t *ArchiveDownloadTask) Run() error {
5555
}
5656

5757
func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadTask, error) {
58-
var err error
59-
if t.SrcStorage == nil {
60-
t.SrcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
61-
if err != nil {
62-
return nil, err
63-
}
64-
}
6558
srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{})
6659
if err != nil {
6760
return nil, err
@@ -111,7 +104,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
111104
baseName := strings.TrimSuffix(srcObj.GetName(), stdpath.Ext(srcObj.GetName()))
112105
uploadTask := &ArchiveContentUploadTask{
113106
TaskExtension: task.TaskExtension{
114-
Creator: t.GetCreator(),
107+
Creator: t.Creator,
115108
ApiUrl: t.ApiUrl,
116109
},
117110
ObjName: baseName,
@@ -179,13 +172,6 @@ func (t *ArchiveContentUploadTask) SetRetry(retry int, maxRetry int) {
179172
}
180173

181174
func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *ArchiveContentUploadTask) error) error {
182-
var err error
183-
if t.dstStorage == nil {
184-
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
185-
if err != nil {
186-
return err
187-
}
188-
}
189175
info, err := os.Stat(t.FilePath)
190176
if err != nil {
191177
return err
@@ -224,7 +210,7 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *Arch
224210
}
225211
err = f(&ArchiveContentUploadTask{
226212
TaskExtension: task.TaskExtension{
227-
Creator: t.GetCreator(),
213+
Creator: t.Creator,
228214
ApiUrl: t.ApiUrl,
229215
},
230216
ObjName: entry.Name(),
@@ -243,11 +229,11 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *Arch
243229
return es
244230
}
245231
} else {
246-
t.SetTotalBytes(info.Size())
247232
file, err := os.Open(t.FilePath)
248233
if err != nil {
249234
return err
250235
}
236+
t.SetTotalBytes(info.Size())
251237
fs := &stream.FileStream{
252238
Obj: &model.Object{
253239
Name: t.ObjName,
@@ -379,13 +365,8 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args
379365
return nil, err
380366
}
381367
}
382-
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
383368
tsk := &ArchiveDownloadTask{
384369
TaskData: TaskData{
385-
TaskExtension: task.TaskExtension{
386-
Creator: taskCreator,
387-
ApiUrl: common.GetApiUrl(ctx),
388-
},
389370
SrcStorage: srcStorage,
390371
DstStorage: dstStorage,
391372
SrcActualPath: srcObjActualPath,
@@ -396,19 +377,24 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args
396377
ArchiveDecompressArgs: args,
397378
}
398379
if ctx.Value(conf.NoTaskKey) != nil {
380+
tsk.Base.SetCtx(ctx)
399381
uploadTask, err := tsk.RunWithoutPushUploadTask()
400382
if err != nil {
401383
return nil, errors.WithMessagef(err, "failed download [%s]", srcObjPath)
402384
}
403385
defer uploadTask.deleteSrcFile()
404386
var callback func(t *ArchiveContentUploadTask) error
405387
callback = func(t *ArchiveContentUploadTask) error {
388+
t.Base.SetCtx(ctx)
406389
e := t.RunWithNextTaskCallback(callback)
407390
t.deleteSrcFile()
408391
return e
409392
}
393+
uploadTask.Base.SetCtx(ctx)
410394
return nil, uploadTask.RunWithNextTaskCallback(callback)
411395
} else {
396+
tsk.Creator, _ = ctx.Value(conf.UserKey).(*model.User)
397+
tsk.ApiUrl = common.GetApiUrl(ctx)
412398
ArchiveDownloadTaskManager.Add(tsk)
413399
return tsk, nil
414400
}

internal/fs/copy_move.go

Lines changed: 66 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"time"
88

99
"github.com/OpenListTeam/OpenList/v4/internal/conf"
10-
"github.com/OpenListTeam/OpenList/v4/internal/driver"
1110
"github.com/OpenListTeam/OpenList/v4/internal/errs"
1211
"github.com/OpenListTeam/OpenList/v4/internal/model"
1312
"github.com/OpenListTeam/OpenList/v4/internal/op"
@@ -52,17 +51,16 @@ func (t *FileTransferTask) Run() error {
5251
t.ClearEndTime()
5352
t.SetStartTime(time.Now())
5453
defer func() { t.SetEndTime(time.Now()) }()
55-
var err error
56-
if t.SrcStorage == nil {
57-
t.SrcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
58-
}
59-
if t.DstStorage == nil {
60-
t.DstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
61-
}
62-
if err != nil {
63-
return errors.WithMessage(err, "failed get storage")
64-
}
65-
return putBetween2Storages(t, t.SrcStorage, t.DstStorage, t.SrcActualPath, t.DstActualPath)
54+
return t.RunWithNextTaskCallback(func(nextTask *FileTransferTask) error {
55+
nextTask.groupID = t.groupID
56+
task_group.TransferCoordinator.AddTask(t.groupID, nil)
57+
if t.TaskType == copy {
58+
CopyTaskManager.Add(nextTask)
59+
} else {
60+
MoveTaskManager.Add(nextTask)
61+
}
62+
return nil
63+
})
6664
}
6765

6866
func (t *FileTransferTask) OnSucceeded() {
@@ -109,51 +107,11 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
109107
return nil, err
110108
}
111109
}
112-
} else if ctx.Value(conf.NoTaskKey) != nil {
113-
return nil, fmt.Errorf("can't %s files between two storages, please use the front-end ", taskType)
114110
}
115111

116-
// if ctx.Value(conf.NoTaskKey) != nil { // webdav
117-
// srcObj, err := op.Get(ctx, srcStorage, srcObjActualPath)
118-
// if err != nil {
119-
// return nil, errors.WithMessagef(err, "failed get src [%s] file", srcObjPath)
120-
// }
121-
// if !srcObj.IsDir() {
122-
// // copy file directly
123-
// link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{})
124-
// if err != nil {
125-
// return nil, errors.WithMessagef(err, "failed get [%s] link", srcObjPath)
126-
// }
127-
// // any link provided is seekable
128-
// ss, err := stream.NewSeekableStream(&stream.FileStream{
129-
// Obj: srcObj,
130-
// Ctx: ctx,
131-
// }, link)
132-
// if err != nil {
133-
// _ = link.Close()
134-
// return nil, errors.WithMessagef(err, "failed get [%s] stream", srcObjPath)
135-
// }
136-
// if taskType == move {
137-
// defer func() {
138-
// task_group.TransferCoordinator.Done(dstDirPath, err == nil)
139-
// }()
140-
// task_group.TransferCoordinator.AddTask(dstDirPath, task_group.SrcPathToRemove(srcObjPath))
141-
// }
142-
// err = op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, taskType == move)
143-
// return nil, err
144-
// } else {
145-
// return nil, fmt.Errorf("can't %s dir two storages, please use the front-end ", taskType)
146-
// }
147-
// }
148-
149112
// not in the same storage
150-
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
151113
t := &FileTransferTask{
152114
TaskData: TaskData{
153-
TaskExtension: task.TaskExtension{
154-
Creator: taskCreator,
155-
ApiUrl: common.GetApiUrl(ctx),
156-
},
157115
SrcStorage: srcStorage,
158116
DstStorage: dstStorage,
159117
SrcActualPath: srcObjActualPath,
@@ -162,8 +120,34 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
162120
DstStorageMp: dstStorage.GetStorage().MountPath,
163121
},
164122
TaskType: taskType,
165-
groupID: dstDirPath,
166123
}
124+
125+
if ctx.Value(conf.NoTaskKey) != nil {
126+
var callback func(nextTask *FileTransferTask) error
127+
hasSuccess := false
128+
callback = func(nextTask *FileTransferTask) error {
129+
nextTask.Base.SetCtx(ctx)
130+
err := nextTask.RunWithNextTaskCallback(callback)
131+
if err == nil {
132+
hasSuccess = true
133+
}
134+
return err
135+
}
136+
t.Base.SetCtx(ctx)
137+
err = t.RunWithNextTaskCallback(callback)
138+
if hasSuccess || err == nil {
139+
if taskType == move {
140+
task_group.RefreshAndRemove(dstDirPath, task_group.SrcPathToRemove(srcObjPath))
141+
} else {
142+
op.DeleteCache(t.DstStorage, dstDirActualPath)
143+
}
144+
}
145+
return nil, err
146+
}
147+
148+
t.Creator, _ = ctx.Value(conf.UserKey).(*model.User)
149+
t.ApiUrl = common.GetApiUrl(ctx)
150+
t.groupID = dstDirPath
167151
if taskType == copy {
168152
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
169153
CopyTaskManager.Add(t)
@@ -174,76 +158,69 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
174158
return t, nil
175159
}
176160

177-
func putBetween2Storages(t *FileTransferTask, srcStorage, dstStorage driver.Driver, srcActualPath, dstDirActualPath string) error {
161+
func (t *FileTransferTask) RunWithNextTaskCallback(f func(nextTask *FileTransferTask) error) error {
178162
t.Status = "getting src object"
179-
srcObj, err := op.Get(t.Ctx(), srcStorage, srcActualPath)
163+
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath)
180164
if err != nil {
181-
return errors.WithMessagef(err, "failed get src [%s] file", srcActualPath)
165+
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath)
182166
}
183167
if srcObj.IsDir() {
184168
t.Status = "src object is dir, listing objs"
185-
objs, err := op.List(t.Ctx(), srcStorage, srcActualPath, model.ListArgs{})
169+
objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.ListArgs{})
186170
if err != nil {
187-
return errors.WithMessagef(err, "failed list src [%s] objs", srcActualPath)
171+
return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcActualPath)
188172
}
189-
dstActualPath := stdpath.Join(dstDirActualPath, srcObj.GetName())
173+
dstActualPath := stdpath.Join(t.DstActualPath, srcObj.GetName())
190174
if t.TaskType == copy {
191-
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath))
175+
if t.Ctx().Value(conf.NoTaskKey) != nil {
176+
defer op.DeleteCache(t.DstStorage, dstActualPath)
177+
} else {
178+
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath))
179+
}
192180
}
193181
for _, obj := range objs {
194182
if utils.IsCanceled(t.Ctx()) {
195183
return nil
196184
}
197-
task := &FileTransferTask{
185+
err = f(&FileTransferTask{
198186
TaskType: t.TaskType,
199187
TaskData: TaskData{
200188
TaskExtension: task.TaskExtension{
201-
Creator: t.GetCreator(),
189+
Creator: t.Creator,
202190
ApiUrl: t.ApiUrl,
203191
},
204-
SrcStorage: srcStorage,
205-
DstStorage: dstStorage,
206-
SrcActualPath: stdpath.Join(srcActualPath, obj.GetName()),
192+
SrcStorage: t.SrcStorage,
193+
DstStorage: t.DstStorage,
194+
SrcActualPath: stdpath.Join(t.SrcActualPath, obj.GetName()),
207195
DstActualPath: dstActualPath,
208-
SrcStorageMp: srcStorage.GetStorage().MountPath,
209-
DstStorageMp: dstStorage.GetStorage().MountPath,
196+
SrcStorageMp: t.SrcStorageMp,
197+
DstStorageMp: t.DstStorageMp,
210198
},
211-
groupID: t.groupID,
212-
}
213-
task_group.TransferCoordinator.AddTask(t.groupID, nil)
214-
if t.TaskType == copy {
215-
CopyTaskManager.Add(task)
216-
} else {
217-
MoveTaskManager.Add(task)
199+
})
200+
if err != nil {
201+
return err
218202
}
219203
}
220204
t.Status = fmt.Sprintf("src object is dir, added all %s tasks of objs", t.TaskType)
221205
return nil
222206
}
223-
return putFileBetween2Storages(t, srcStorage, dstStorage, srcActualPath, dstDirActualPath)
224-
}
225207

226-
func putFileBetween2Storages(tsk *FileTransferTask, srcStorage, dstStorage driver.Driver, srcActualPath, dstDirActualPath string) error {
227-
srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcActualPath)
228-
if err != nil {
229-
return errors.WithMessagef(err, "failed get src [%s] file", srcActualPath)
230-
}
231-
tsk.SetTotalBytes(srcFile.GetSize())
232-
link, _, err := op.Link(tsk.Ctx(), srcStorage, srcActualPath, model.LinkArgs{})
208+
link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{})
233209
if err != nil {
234-
return errors.WithMessagef(err, "failed get [%s] link", srcActualPath)
210+
return errors.WithMessagef(err, "failed get [%s] link", t.SrcActualPath)
235211
}
236212
// any link provided is seekable
237213
ss, err := stream.NewSeekableStream(&stream.FileStream{
238-
Obj: srcFile,
239-
Ctx: tsk.Ctx(),
214+
Obj: srcObj,
215+
Ctx: t.Ctx(),
240216
}, link)
241217
if err != nil {
242218
_ = link.Close()
243-
return errors.WithMessagef(err, "failed get [%s] stream", srcActualPath)
219+
return errors.WithMessagef(err, "failed get [%s] stream", t.SrcActualPath)
244220
}
245-
tsk.SetTotalBytes(ss.GetSize())
246-
return op.Put(tsk.Ctx(), dstStorage, dstDirActualPath, ss, tsk.SetProgress, true)
221+
t.SetTotalBytes(ss.GetSize())
222+
t.Status = "uploading"
223+
return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, ss, t.SetProgress, true)
247224
}
248225

249226
var (

internal/task_group/group.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"github.com/sirupsen/logrus"
77
)
88

9-
type OnCompletionFunc func(groupID string, payloads []any)
9+
type OnCompletionFunc func(groupID string, payloads ...any)
1010
type TaskGroupCoordinator struct {
1111
name string
1212
mu sync.Mutex
@@ -71,7 +71,7 @@ func (tgc *TaskGroupCoordinator) Done(groupID string, success bool) {
7171
if tgc.onCompletion != nil && state.hasSuccess {
7272
logrus.Debugf("OnCompletion:%s", groupID)
7373
tgc.mu.Unlock()
74-
tgc.onCompletion(groupID, payloads)
74+
tgc.onCompletion(groupID, payloads...)
7575
tgc.mu.Lock()
7676
}
7777
return

internal/task_group/transfer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type SrcPathToRemove string
1717
// ActualPath
1818
type DstPathToRefresh string
1919

20-
func refreshAndRemove(dstPath string, payloads []any) {
20+
func RefreshAndRemove(dstPath string, payloads ...any) {
2121
dstStorage, dstActualPath, err := op.GetStorageAndActualPath(dstPath)
2222
if err != nil {
2323
log.Error(errors.WithMessage(err, "failed get dst storage"))
@@ -100,4 +100,4 @@ func verifyAndRemove(ctx context.Context, srcStorage, dstStorage driver.Driver,
100100
return nil
101101
}
102102

103-
var TransferCoordinator *TaskGroupCoordinator = NewTaskGroupCoordinator("RefreshAndRemove", refreshAndRemove)
103+
var TransferCoordinator *TaskGroupCoordinator = NewTaskGroupCoordinator("RefreshAndRemove", RefreshAndRemove)

0 commit comments

Comments
 (0)