Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ Supported storage systems: https://juicefs.com/docs/community/how_to_setup_objec
Aliases: []string{"l"},
Usage: "copy symlinks as symlinks",
},
&cli.Int64Flag{
Name: "limit",
Usage: "limit the number of objects that will be processed",
Value: -1,
},
&cli.StringFlag{
Name: "manager",
Usage: "manager address",
Expand Down
3 changes: 3 additions & 0 deletions docs/en/reference/command_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,9 @@ The order in which `--exclude` and `--include` are set will affect the result. E
`--links, -l`<br />
copy symlinks as symlinks (default: false)

` --limit value`<br />
limit the number of objects that will be processed (default: -1)

`--manager value`<br />
manager address

Expand Down
3 changes: 3 additions & 0 deletions docs/zh_cn/reference/command_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,9 @@ juicefs sync [command options] SRC DST
`--links, -l`<br />
将符号链接复制为符号链接 (默认: false)

` --limit value`<br />
限制将要处理的对象的数量 (默认: -1)

`--manager value`<br />
管理者地址

Expand Down
6 changes: 6 additions & 0 deletions pkg/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
Exclude []string
Include []string
Links bool
Limit int64
Manager string
Workers []string
BWLimit int
Expand All @@ -46,6 +47,10 @@ type Config struct {
}

func NewConfigFromCli(c *cli.Context) *Config {
if c.IsSet("limits") && c.Int64("limits") < 0 {
logger.Fatal("The limits parameter must be a non-negative integer")
}

return &Config{
Start: c.String("start"),
End: c.String("end"),
Expand All @@ -60,6 +65,7 @@ func NewConfigFromCli(c *cli.Context) *Config {
Exclude: c.StringSlice("exclude"),
Include: c.StringSlice("include"),
Links: c.Bool("links"),
Limit: c.Int64("limit"),
Workers: c.StringSlice("worker"),
Manager: c.String("manager"),
BWLimit: c.Int("bwlimit"),
Expand Down
35 changes: 28 additions & 7 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,13 +584,20 @@ func (o *withFSize) Size() int64 {
return o.nsize
}

func deleteFromDst(tasks chan<- object.Object, dstobj object.Object, dirs bool) {
if !dirs && dstobj.IsDir() {
func deleteFromDst(tasks chan<- object.Object, dstobj object.Object, config *Config) bool {
if !config.Dirs && dstobj.IsDir() {
logger.Debug("Ignore deleting dst directory ", dstobj.Key())
return
return false
}
if config.Limit != -1 {
if config.Limit == 0 {
return true
}
config.Limit--
}
tasks <- &withSize{dstobj, markDeleteDst}
handled.IncrTotal(1)
return false
}

func producer(tasks chan<- object.Object, src, dst object.ObjectStorage, config *Config) {
Expand Down Expand Up @@ -630,11 +637,19 @@ func producer(tasks chan<- object.Object, src, dst object.ObjectStorage, config
logger.Debug("Ignore directory ", obj.Key())
continue
}
if config.Limit != -1 {
if config.Limit == 0 {
return
}
config.Limit--
}
handled.IncrTotal(1)

if dstobj != nil && obj.Key() > dstobj.Key() {
if config.DeleteDst {
deleteFromDst(tasks, dstobj, config.Dirs)
if deleteFromDst(tasks, dstobj, config) {
return
}
}
dstobj = nil
}
Expand All @@ -648,7 +663,9 @@ func producer(tasks chan<- object.Object, src, dst object.ObjectStorage, config
break
}
if config.DeleteDst {
deleteFromDst(tasks, dstobj, config.Dirs)
if deleteFromDst(tasks, dstobj, config) {
return
}
}
dstobj = nil
}
Expand Down Expand Up @@ -680,11 +697,15 @@ func producer(tasks chan<- object.Object, src, dst object.ObjectStorage, config
}
if config.DeleteDst {
if dstobj != nil {
deleteFromDst(tasks, dstobj, config.Dirs)
if deleteFromDst(tasks, dstobj, config) {
return
}
}
for dstobj = range dstkeys {
if dstobj != nil {
deleteFromDst(tasks, dstobj, config.Dirs)
if deleteFromDst(tasks, dstobj, config) {
return
}
}
}
}
Expand Down
84 changes: 84 additions & 0 deletions pkg/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package sync

import (
"bytes"
"fmt"
"io/ioutil"
"math"
"os"
"reflect"
"strings"
"testing"

"github.com/juicedata/juicefs/pkg/object"
Expand Down Expand Up @@ -98,6 +100,7 @@ func TestSync(t *testing.T) {
Perms: true,
Dry: false,
DeleteSrc: false,
Limit: -1,
DeleteDst: false,
Exclude: []string{"c*"},
Include: []string{"a[1-9]", "a*"},
Expand Down Expand Up @@ -185,6 +188,7 @@ func TestSyncIncludeAndExclude(t *testing.T) {
DeleteSrc: false,
DeleteDst: false,
Verbose: false,
Limit: -1,
Quiet: true,
Exclude: []string{"1"},
}
Expand Down Expand Up @@ -353,6 +357,7 @@ func TestSyncLink(t *testing.T) {
Perms: true,
Links: true,
Quiet: true,
Limit: -1,
ForceUpdate: true,
}); err != nil {
t.Fatalf("sync: %s", err)
Expand Down Expand Up @@ -408,6 +413,7 @@ func TestSyncLinkWithOutFollow(t *testing.T) {
Perms: true,
Quiet: true,
ForceUpdate: true,
Limit: -1,
}); err != nil {
t.Fatalf("sync: %s", err)
}
Expand Down Expand Up @@ -441,6 +447,7 @@ func TestSingleLink(t *testing.T) {
Perms: true,
Links: true,
Quiet: true,
Limit: -1,
ForceUpdate: true,
}); err != nil {
t.Fatalf("sync: %s", err)
Expand All @@ -454,5 +461,82 @@ func TestSingleLink(t *testing.T) {
if readlink != readlink2 || readlink != "/tmp/aa" {
t.Fatalf("sync link failed")
}
}

func TestLimits(t *testing.T) {
defer func() {
_ = os.RemoveAll("/tmp/a/")
_ = os.RemoveAll("/tmp/b/")
_ = os.RemoveAll("/tmp/c/")
}()
a, _ := object.CreateStorage("file", "/tmp/a/", "", "")
b, _ := object.CreateStorage("file", "/tmp/b/", "", "")
c, _ := object.CreateStorage("file", "/tmp/c/", "", "")
put := func(storage object.ObjectStorage, keys []string) {
for _, key := range keys {
if key != "" {
_ = storage.Put(key, bytes.NewReader([]byte{}))
}
}
}
commonKeys := []string{"", "a1", "a2", "a3", "a4", "a5", "a6"}
put(a, commonKeys)
put(c, []string{"c1", "c2", "c3"})
type subConfig struct {
dst object.ObjectStorage
limit int64
deleteDst bool
expectedKeys []string
}
testCases := []subConfig{
{b, 2, false, []string{"", "a1", "a2"}},
{b, -1, false, commonKeys},
{b, 0, false, commonKeys},
{c, 7, true, append(commonKeys, "c2", "c3")},
}
config := &Config{
Threads: 50,
Update: true,
Perms: true,
}
setConfig := func(config *Config, subC subConfig) {
config.Limit = subC.limit
config.DeleteDst = subC.deleteDst
}

for _, tcase := range testCases {
setConfig(config, tcase)
if err := Sync(a, tcase.dst, config); err != nil {
t.Fatalf("sync: %s", err)
}

all, err := tcase.dst.ListAll("", "")
if err != nil {
t.Fatalf("list all b: %s", err)
}

err = testKeysEqual(all, tcase.expectedKeys)
if err != nil {
t.Fatalf("testKeysEqual fail: %s", err)
}
}
}

func testKeysEqual(objsCh <-chan object.Object, expectedKeys []string) error {
var gottenKeys []string
for obj := range objsCh {
gottenKeys = append(gottenKeys, obj.Key())
}
if len(gottenKeys) != len(expectedKeys) {
return fmt.Errorf("expected {%s}, got {%s}", strings.Join(expectedKeys, ", "),
strings.Join(gottenKeys, ", "))
}

for idx, key := range gottenKeys {
if key != expectedKeys[idx] {
return fmt.Errorf("expected {%s}, got {%s}", strings.Join(expectedKeys, ", "),
strings.Join(gottenKeys, ", "))
}
}
return nil
}