Skip to content

Commit 9fd1767

Browse files
committed
support sync limits
1 parent 6cb675a commit 9fd1767

File tree

6 files changed

+124
-7
lines changed

6 files changed

+124
-7
lines changed

cmd/sync.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ Supported storage systems: https://juicefs.com/docs/community/how_to_setup_objec
142142
Aliases: []string{"l"},
143143
Usage: "copy symlinks as symlinks",
144144
},
145+
&cli.Int64Flag{
146+
Name: "limits",
147+
Usage: "limit the number of objects that will be processed",
148+
Value: -1,
149+
},
145150
&cli.StringFlag{
146151
Name: "manager",
147152
Usage: "manager address",

docs/en/reference/command_reference.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,9 @@ The order in which `--exclude` and `--include` are set will affect the result. E
564564
`--links, -l`<br />
565565
copy symlinks as symlinks (default: false)
566566

567+
` --limits value`<br />
568+
limit the number of objects that will be processed (default: -1)
569+
567570
`--manager value`<br />
568571
manager address
569572

docs/zh_cn/reference/command_reference.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,9 @@ juicefs sync [command options] SRC DST
562562
`--links, -l`<br />
563563
将符号链接复制为符号链接 (默认: false)
564564

565+
` --limits value`<br />
566+
限制将要处理的对象的数量 (默认: -1)
567+
565568
`--manager value`<br />
566569
管理者地址
567570

pkg/sync/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Config struct {
3535
Exclude []string
3636
Include []string
3737
Links bool
38+
Limits int64
3839
Manager string
3940
Workers []string
4041
BWLimit int
@@ -46,6 +47,10 @@ type Config struct {
4647
}
4748

4849
func NewConfigFromCli(c *cli.Context) *Config {
50+
if c.IsSet("limits") && c.Int64("limits") < 0 {
51+
logger.Fatal("The limits parameter must be a non-negative integer")
52+
}
53+
4954
return &Config{
5055
Start: c.String("start"),
5156
End: c.String("end"),
@@ -60,6 +65,7 @@ func NewConfigFromCli(c *cli.Context) *Config {
6065
Exclude: c.StringSlice("exclude"),
6166
Include: c.StringSlice("include"),
6267
Links: c.Bool("links"),
68+
Limits: c.Int64("limits"),
6369
Workers: c.StringSlice("worker"),
6470
Manager: c.String("manager"),
6571
BWLimit: c.Int("bwlimit"),

pkg/sync/sync.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -584,13 +584,20 @@ func (o *withFSize) Size() int64 {
584584
return o.nsize
585585
}
586586

587-
func deleteFromDst(tasks chan<- object.Object, dstobj object.Object, dirs bool) {
588-
if !dirs && dstobj.IsDir() {
587+
func deleteFromDst(tasks chan<- object.Object, dstobj object.Object, config *Config) bool {
588+
if !config.Dirs && dstobj.IsDir() {
589589
logger.Debug("Ignore deleting dst directory ", dstobj.Key())
590-
return
590+
return false
591+
}
592+
if config.Limits != -1 {
593+
if config.Limits == 0 {
594+
return true
595+
}
596+
config.Limits--
591597
}
592598
tasks <- &withSize{dstobj, markDeleteDst}
593599
handled.IncrTotal(1)
600+
return false
594601
}
595602

596603
func producer(tasks chan<- object.Object, src, dst object.ObjectStorage, config *Config) {
@@ -630,11 +637,19 @@ func producer(tasks chan<- object.Object, src, dst object.ObjectStorage, config
630637
logger.Debug("Ignore directory ", obj.Key())
631638
continue
632639
}
640+
if config.Limits != -1 {
641+
if config.Limits == 0 {
642+
return
643+
}
644+
config.Limits--
645+
}
633646
handled.IncrTotal(1)
634647

635648
if dstobj != nil && obj.Key() > dstobj.Key() {
636649
if config.DeleteDst {
637-
deleteFromDst(tasks, dstobj, config.Dirs)
650+
if deleteFromDst(tasks, dstobj, config) {
651+
return
652+
}
638653
}
639654
dstobj = nil
640655
}
@@ -648,7 +663,9 @@ func producer(tasks chan<- object.Object, src, dst object.ObjectStorage, config
648663
break
649664
}
650665
if config.DeleteDst {
651-
deleteFromDst(tasks, dstobj, config.Dirs)
666+
if deleteFromDst(tasks, dstobj, config) {
667+
return
668+
}
652669
}
653670
dstobj = nil
654671
}
@@ -680,11 +697,15 @@ func producer(tasks chan<- object.Object, src, dst object.ObjectStorage, config
680697
}
681698
if config.DeleteDst {
682699
if dstobj != nil {
683-
deleteFromDst(tasks, dstobj, config.Dirs)
700+
if deleteFromDst(tasks, dstobj, config) {
701+
return
702+
}
684703
}
685704
for dstobj = range dstkeys {
686705
if dstobj != nil {
687-
deleteFromDst(tasks, dstobj, config.Dirs)
706+
if deleteFromDst(tasks, dstobj, config) {
707+
return
708+
}
688709
}
689710
}
690711
}

pkg/sync/sync_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ package sync
1717

1818
import (
1919
"bytes"
20+
"fmt"
2021
"io/ioutil"
2122
"math"
2223
"os"
2324
"reflect"
25+
"strings"
2426
"testing"
2527

2628
"github.com/juicedata/juicefs/pkg/object"
@@ -454,5 +456,82 @@ func TestSingleLink(t *testing.T) {
454456
if readlink != readlink2 || readlink != "/tmp/aa" {
455457
t.Fatalf("sync link failed")
456458
}
459+
}
460+
461+
func TestLimits(t *testing.T) {
462+
defer func() {
463+
_ = os.RemoveAll("/tmp/a/")
464+
_ = os.RemoveAll("/tmp/b/")
465+
_ = os.RemoveAll("/tmp/c/")
466+
}()
467+
a, _ := object.CreateStorage("file", "/tmp/a/", "", "")
468+
b, _ := object.CreateStorage("file", "/tmp/b/", "", "")
469+
c, _ := object.CreateStorage("file", "/tmp/c/", "", "")
470+
put := func(storage object.ObjectStorage, keys []string) {
471+
for _, key := range keys {
472+
if key != "" {
473+
_ = storage.Put(key, bytes.NewReader([]byte{}))
474+
}
475+
}
476+
}
477+
commonKeys := []string{"", "a1", "a2", "a3", "a4", "a5", "a6"}
478+
put(a, commonKeys)
479+
put(c, []string{"c1", "c2", "c3"})
480+
type subConfig struct {
481+
dst object.ObjectStorage
482+
limit int64
483+
deleteDst bool
484+
expectedKeys []string
485+
}
486+
testCases := []subConfig{
487+
{b, 2, false, []string{"", "a1", "a2"}},
488+
{b, -1, false, commonKeys},
489+
{b, 0, false, commonKeys},
490+
{c, 7, true, append(commonKeys, "c2", "c3")},
491+
}
492+
config := &Config{
493+
Threads: 50,
494+
Update: true,
495+
Perms: true,
496+
}
497+
setConfig := func(config *Config, subC subConfig) {
498+
config.Limits = subC.limit
499+
config.DeleteDst = subC.deleteDst
500+
}
457501

502+
for _, tcase := range testCases {
503+
setConfig(config, tcase)
504+
if err := Sync(a, tcase.dst, config); err != nil {
505+
t.Fatalf("sync: %s", err)
506+
}
507+
508+
all, err := tcase.dst.ListAll("", "")
509+
if err != nil {
510+
t.Fatalf("list all b: %s", err)
511+
}
512+
513+
err = testKeysEqual(all, tcase.expectedKeys)
514+
if err != nil {
515+
t.Fatalf("testKeysEqual fail: %s", err)
516+
}
517+
}
518+
}
519+
520+
func testKeysEqual(objsCh <-chan object.Object, expectedKeys []string) error {
521+
var gottenKeys []string
522+
for obj := range objsCh {
523+
gottenKeys = append(gottenKeys, obj.Key())
524+
}
525+
if len(gottenKeys) != len(expectedKeys) {
526+
return fmt.Errorf("expected {%s}, got {%s}", strings.Join(expectedKeys, ", "),
527+
strings.Join(gottenKeys, ", "))
528+
}
529+
530+
for idx, key := range gottenKeys {
531+
if key != expectedKeys[idx] {
532+
return fmt.Errorf("expected {%s}, got {%s}", strings.Join(expectedKeys, ", "),
533+
strings.Join(gottenKeys, ", "))
534+
}
535+
}
536+
return nil
458537
}

0 commit comments

Comments
 (0)