"Prefer patch uploads" option #113

Merged
merged 2 commits on May 23, 2024
1 change: 1 addition & 0 deletions internal/cfg/config.go
@@ -103,6 +103,7 @@ type FlagStorage struct {
PartSizes []PartSizeConfig
UsePatch bool
DropPatchConflicts bool
PreferPatchUploads bool

// Debugging
DebugMain bool
10 changes: 9 additions & 1 deletion internal/cfg/flags.go
@@ -433,14 +433,21 @@ MISC OPTIONS:

cli.BoolFlag{
Name: "enable-patch",
Usage: "Use PATCH method to upload object data changes to s3. Yandex only. (default: off)",
Usage: "Use PATCH method to upload object data changes to S3. All PATCH related flags are Yandex only. (default: off)",
},

cli.BoolFlag{
Name: "drop-patch-conflicts",
Usage: "Drop local changes in case of conflicting concurrent PATCH updates. (default: off)",
},

cli.BoolFlag{
Name: "prefer-patch-uploads",
Usage: "When uploading new objects, prefer PATCH requests to standard multipart upload process." +
"This allows for changes to appear faster in exchange for slower upload speed due to limited parallelism." +
"Must be used with --enable-patch flag (default: off)",
},
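// Together with --enable-patch, this flag makes sendUpload() route new-object
// uploads through uploadMinMultipart() instead of the usual multipart path
// (sendStartMultipart(), see internal/file.go below).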

cli.IntFlag{
Name: "max-merge-copy",
Value: 0,
@@ -860,6 +867,7 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
CacheFileMode: os.FileMode(c.Int("cache-file-mode")),
UsePatch: c.Bool("enable-patch"),
DropPatchConflicts: c.Bool("drop-patch-conflicts"),
PreferPatchUploads: c.Bool("prefer-patch-uploads"),

// Common Backend Config
Endpoint: c.String("endpoint"),
107 changes: 88 additions & 19 deletions internal/file.go
@@ -716,7 +716,11 @@ func (inode *Inode) sendUpload(priority int) bool {
if inode.IsFlushing > 0 {
return false
}
inode.sendStartMultipart()
if inode.fs.flags.UsePatch && inode.fs.flags.PreferPatchUploads {
inode.uploadMinMultipart()
} else {
inode.sendStartMultipart()
}
return true
}

@@ -954,6 +958,15 @@ func (inode *Inode) sendStartMultipart() {
atomic.AddInt64(&inode.fs.stats.flushes, 1)
atomic.AddInt64(&inode.fs.activeFlushers, 1)
go func() {
inode.beginMultipartUpload(cloud, key)
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
inode.mu.Unlock()
}()
}

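// beginMultipartUpload starts a multipart upload for key and stores the
// returned handle in inode.mpu; callers detect failure by checking
// inode.mpu == nil.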
func (inode *Inode) beginMultipartUpload(cloud StorageBackend, key string) {
params := &MultipartBlobBeginInput{
Key: key,
ContentType: inode.fs.flags.GetMimeType(key),
@@ -973,11 +986,6 @@
log.Debugf("Started multi-part upload of object %v", key)
inode.mpu = resp
}
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
inode.mu.Unlock()
}()
}

func (inode *Inode) sendUploadParts(priority int) (bool, bool) {
Expand Down Expand Up @@ -1090,6 +1098,61 @@ func (inode *Inode) uploadedAsMultipart() bool {
return strings.Contains(inode.knownETag, "-")
}

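// uploadMinMultipart uploads a new object as a minimal multipart upload:
// it starts the upload, synchronously flushes the first two parts and
// completes the upload right away, so that later changes can be sent as
// PATCH requests. Used when --enable-patch and --prefer-patch-uploads are set.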
func (inode *Inode) uploadMinMultipart() {
inode.IsFlushing += inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, 1)

cloud, key := inode.cloud()
if inode.isDir() {
key += "/"
}

go func() {
defer func() {
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
inode.mu.Unlock()
}()

atomic.AddInt64(&inode.fs.stats.flushes, 1)
inode.beginMultipartUpload(cloud, key)
if inode.mpu == nil {
return
}

if ok := inode.syncFlushPartsUpTo(2); !ok {
inode.abortMultipart()
return
}

partOffset, partSize := inode.fs.partRange(1)
if inode.Attributes.Size < partOffset+partSize {
partSize = inode.Attributes.Size - partOffset
}

atomic.AddInt64(&inode.fs.stats.flushes, 1)
inode.commitMultipartUpload(2, partOffset+partSize)
if inode.mpu != nil {
inode.abortMultipart()
}
}()
}

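// syncFlushPartsUpTo synchronously flushes parts [0, part) of the current
// multipart upload, returning false if the upload is gone or any part fails
// to flush.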
func (inode *Inode) syncFlushPartsUpTo(part uint64) bool {
if inode.mpu == nil {
return false
}
for i := uint64(0); i < part; i++ {
atomic.AddInt64(&inode.fs.stats.flushes, 1)
inode.flushPart(i)
if inode.mpu == nil || inode.mpu.Parts[i] == nil {
return false
}
}
return true
}

func (inode *Inode) patchObjectRanges() (initiated bool) {
smallFile := inode.Attributes.Size <= inode.fs.flags.SinglePartMB*1024*1024
wantFlush := inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.wantFree) > 0
@@ -1116,7 +1179,11 @@ func (inode *Inode) patchObjectRanges() (initiated bool) {
return false
}

_, prevSize := inode.fs.partRange(MaxUInt64(part-1, 0))
var prevPart uint64
if part > 0 {
prevPart = part - 1
}
_, prevSize := inode.fs.partRange(prevPart)

partEnd, rangeBorder := partStart+partSize, partSize != prevSize
appendPatch, newPart := partEnd > inode.knownSize, partStart == inode.knownSize
@@ -1345,6 +1412,15 @@ func (inode *Inode) resetCache() {
}
// And abort multipart upload, too
if inode.mpu != nil {
inode.abortMultipart()
}
inode.userMetadataDirty = 0
inode.SetCacheState(ST_CACHED)
// Invalidate metadata entry
inode.SetAttrTime(time.Time{})
}

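// abortMultipart asynchronously aborts the current multipart upload (logging
// a warning on failure) and clears inode.mpu.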
func (inode *Inode) abortMultipart() {
cloud, key := inode.cloud()
go func(mpu *MultipartBlobCommitInput) {
_, abortErr := cloud.MultipartBlobAbort(mpu)
@@ -1354,11 +1430,6 @@
}(inode.mpu)
inode.mpu = nil
}
inode.userMetadataDirty = 0
inode.SetCacheState(ST_CACHED)
// Invalidate metadata entry
inode.SetAttrTime(time.Time{})
}

func (inode *Inode) flushSmallObject() {

@@ -1422,13 +1493,7 @@ func (inode *Inode) flushSmallObject() {
if inode.mpu != nil {
// Abort and forget the multipart upload, because otherwise we may
// not be able to proceed to rename - it waits until inode.mpu == nil
go func(mpu *MultipartBlobCommitInput) {
_, abortErr := cloud.MultipartBlobAbort(mpu)
if abortErr != nil {
log.Warnf("Failed to abort multi-part upload of object %v: %v", key, abortErr)
}
}(inode.mpu)
inode.mpu = nil
inode.abortMultipart()
}
inode.mu.Unlock()
inode.fs.addInflightChange(key)
@@ -1677,6 +1742,10 @@ func (inode *Inode) completeMultipart() {
// Error, or already flushed, or conflict => do not complete
return
}
inode.commitMultipartUpload(numParts, finalSize)
}

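// commitMultipartUpload completes the current multipart upload with the given
// number of parts and final object size (shared by completeMultipart() and
// uploadMinMultipart()).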
func (inode *Inode) commitMultipartUpload(numParts, finalSize uint64) {
cloud, key := inode.cloud()
if inode.oldParent != nil {
// Always apply modifications before moving