Skip to content

Commit e83f8e1

Browse files
authored
feat(offline-download): SimpleHttp: download stream direct upload (#523)
* feat(offline-download): stream download to upload * 重命名stream_put为upload_download_stream * chore
1 parent d707f00 commit e83f8e1

File tree

6 files changed

+80
-7
lines changed

6 files changed

+80
-7
lines changed

internal/fs/copy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import (
1616
"github.com/OpenListTeam/OpenList/v4/internal/task"
1717
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
1818
"github.com/OpenListTeam/OpenList/v4/server/common"
19-
"github.com/pkg/errors"
2019
"github.com/OpenListTeam/tache"
20+
"github.com/pkg/errors"
2121
)
2222

2323
type CopyTask struct {

internal/fs/move.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import (
1616
"github.com/OpenListTeam/OpenList/v4/internal/task"
1717
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
1818
"github.com/OpenListTeam/OpenList/v4/server/common"
19-
"github.com/pkg/errors"
2019
"github.com/OpenListTeam/tache"
20+
"github.com/pkg/errors"
2121
)
2222

2323
type MoveTask struct {

internal/offline_download/http/client.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/OpenListTeam/OpenList/v4/internal/model"
1313
"github.com/OpenListTeam/OpenList/v4/internal/offline_download/tool"
14+
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
1415
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
1516
)
1617

@@ -53,10 +54,18 @@ func (s SimpleHttp) Run(task *tool.DownloadTask) error {
5354
if err != nil {
5455
return err
5556
}
56-
req, err := http.NewRequestWithContext(task.Ctx(), http.MethodGet, u, nil)
57+
streamPut := task.DeletePolicy == tool.UploadDownloadStream
58+
method := http.MethodGet
59+
if streamPut {
60+
method = http.MethodHead
61+
}
62+
req, err := http.NewRequestWithContext(task.Ctx(), method, u, nil)
5763
if err != nil {
5864
return err
5965
}
66+
if streamPut {
67+
req.Header.Set("Range", "bytes=0-")
68+
}
6069
resp, err := s.client.Do(req)
6170
if err != nil {
6271
return err
@@ -74,6 +83,17 @@ func (s SimpleHttp) Run(task *tool.DownloadTask) error {
7483
if n, err := parseFilenameFromContentDisposition(resp.Header.Get("Content-Disposition")); err == nil {
7584
filename = n
7685
}
86+
fileSize := resp.ContentLength
87+
if streamPut {
88+
if fileSize == 0 {
89+
start, end, _ := http_range.ParseContentRange(resp.Header.Get("Content-Range"))
90+
fileSize = start + end
91+
}
92+
task.SetTotalBytes(fileSize)
93+
task.TempDir = filename
94+
return nil
95+
}
96+
task.SetTotalBytes(fileSize)
7797
// save to temp dir
7898
_ = os.MkdirAll(task.TempDir, os.ModePerm)
7999
filePath := filepath.Join(task.TempDir, filename)
@@ -82,8 +102,6 @@ func (s SimpleHttp) Run(task *tool.DownloadTask) error {
82102
return err
83103
}
84104
defer file.Close()
85-
fileSize := resp.ContentLength
86-
task.SetTotalBytes(fileSize)
87105
err = utils.CopyWithCtx(task.Ctx(), file, resp.Body, fileSize, task.SetProgress)
88106
return err
89107
}

internal/offline_download/tool/add.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929
DeleteOnUploadFailed DeletePolicy = "delete_on_upload_failed"
3030
DeleteNever DeletePolicy = "delete_never"
3131
DeleteAlways DeletePolicy = "delete_always"
32+
UploadDownloadStream DeletePolicy = "upload_download_stream"
3233
)
3334

3435
type AddURLArgs struct {

internal/offline_download/tool/download.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ import (
66

77
"github.com/OpenListTeam/OpenList/v4/internal/conf"
88
"github.com/OpenListTeam/OpenList/v4/internal/errs"
9+
"github.com/OpenListTeam/OpenList/v4/internal/model"
10+
"github.com/OpenListTeam/OpenList/v4/internal/op"
911
"github.com/OpenListTeam/OpenList/v4/internal/setting"
1012
"github.com/OpenListTeam/OpenList/v4/internal/task"
13+
"github.com/OpenListTeam/tache"
1114
"github.com/pkg/errors"
1215
log "github.com/sirupsen/logrus"
13-
"github.com/OpenListTeam/tache"
1416
)
1517

1618
type DownloadTask struct {
@@ -171,6 +173,27 @@ func (t *DownloadTask) Transfer() error {
171173
}
172174
return nil
173175
}
176+
if t.DeletePolicy == UploadDownloadStream {
177+
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(t.DstDirPath)
178+
if err != nil {
179+
return errors.WithMessage(err, "failed get dst storage")
180+
}
181+
taskCreator, _ := t.Ctx().Value("user").(*model.User)
182+
task := &TransferTask{
183+
TaskExtension: task.TaskExtension{
184+
Creator: taskCreator,
185+
},
186+
SrcObjPath: t.TempDir,
187+
DstDirPath: dstDirActualPath,
188+
DstStorage: dstStorage,
189+
DstStorageMp: dstStorage.GetStorage().MountPath,
190+
DeletePolicy: t.DeletePolicy,
191+
Url: t.Url,
192+
}
193+
task.SetTotalBytes(t.GetTotalBytes())
194+
TransferTaskManager.Add(task)
195+
return nil
196+
}
174197
return transferStd(t.Ctx(), t.TempDir, t.DstDirPath, t.DeletePolicy)
175198
}
176199

internal/offline_download/tool/transfer.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ import (
1414
"github.com/OpenListTeam/OpenList/v4/internal/op"
1515
"github.com/OpenListTeam/OpenList/v4/internal/stream"
1616
"github.com/OpenListTeam/OpenList/v4/internal/task"
17+
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
1718
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
19+
"github.com/OpenListTeam/tache"
1820
"github.com/pkg/errors"
1921
log "github.com/sirupsen/logrus"
20-
"github.com/OpenListTeam/tache"
2122
)
2223

2324
type TransferTask struct {
@@ -30,6 +31,7 @@ type TransferTask struct {
3031
SrcStorageMp string `json:"src_storage_mp"`
3132
DstStorageMp string `json:"dst_storage_mp"`
3233
DeletePolicy DeletePolicy `json:"delete_policy"`
34+
Url string `json:"-"`
3335
}
3436

3537
func (t *TransferTask) Run() error {
@@ -40,13 +42,42 @@ func (t *TransferTask) Run() error {
4042
t.SetStartTime(time.Now())
4143
defer func() { t.SetEndTime(time.Now()) }()
4244
if t.SrcStorage == nil {
45+
if t.DeletePolicy == UploadDownloadStream {
46+
rrc, err := stream.GetRangeReadCloserFromLink(t.GetTotalBytes(), &model.Link{URL: t.Url})
47+
if err != nil {
48+
return err
49+
}
50+
r, err := rrc.RangeRead(t.Ctx(), http_range.Range{Length: t.GetTotalBytes()})
51+
if err != nil {
52+
return err
53+
}
54+
name := t.SrcObjPath
55+
mimetype := utils.GetMimeType(name)
56+
s := &stream.FileStream{
57+
Ctx: nil,
58+
Obj: &model.Object{
59+
Name: name,
60+
Size: t.GetTotalBytes(),
61+
Modified: time.Now(),
62+
IsFolder: false,
63+
},
64+
Reader: r,
65+
Mimetype: mimetype,
66+
Closers: utils.NewClosers(rrc),
67+
}
68+
defer s.Close()
69+
return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress)
70+
}
4371
return transferStdPath(t)
4472
} else {
4573
return transferObjPath(t)
4674
}
4775
}
4876

4977
func (t *TransferTask) GetName() string {
78+
if t.DeletePolicy == UploadDownloadStream {
79+
return fmt.Sprintf("upload [%s](%s) to [%s](%s)", t.SrcObjPath, t.Url, t.DstStorageMp, t.DstDirPath)
80+
}
5081
return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
5182
}
5283

0 commit comments

Comments
 (0)