Skip to content

Commit 4d6691a

Browse files
committed
Update buffer refactoring for PATCH
1 parent 28091a0 commit 4d6691a

File tree

2 files changed

+29
-38
lines changed

2 files changed

+29
-38
lines changed

internal/buffer_list.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,19 @@ func (l *BufferList) Filter(cb func(buf, prev *FileBuffer) (cont bool, del bool)
175175
})
176176
}
177177

178+
func (l *BufferList) Select(start, end uint64, cb func(buf *FileBuffer) (good bool)) (bufs []*FileBuffer) {
179+
l.Ascend(start, func(bufEnd uint64, buf *FileBuffer) (cont bool, chg bool) {
180+
if buf.offset >= end {
181+
return false, false
182+
}
183+
if cb(buf) {
184+
bufs = append(bufs, buf)
185+
}
186+
return true, false
187+
})
188+
return bufs
189+
}
190+
178191
func (l *BufferList) nextID() uint64 {
179192
l.curQueueID = l.curQueueID + 1
180193
return l.curQueueID
@@ -525,6 +538,7 @@ func (l *BufferList) SplitAt(offset uint64) {
525538
})
526539
}
527540

541+
// FIXME: Do not scan all buffers
528542
func (l *BufferList) AnyDirty() (dirty bool) {
529543
l.at.Scan(func(end uint64, b *FileBuffer) bool {
530544
if b.state == BUF_DIRTY {

internal/file.go

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,12 +1088,7 @@ func (inode *Inode) patchObjectRanges() (initiated bool) {
10881088
if inode.flushLimitsExceeded() {
10891089
return
10901090
}
1091-
var flushBufs []*FileBuffer
1092-
for _, buf := range inode.buffers {
1093-
if buf.state == BUF_DIRTY {
1094-
flushBufs = append(flushBufs, buf)
1095-
}
1096-
}
1091+
flushBufs := inode.buffers.Select(0, inode.Attributes.Size, func(buf *FileBuffer) bool { return buf.state == BUF_DIRTY; })
10971092
inode.patchSimpleObj(flushBufs)
10981093
return true
10991094
}
@@ -1133,27 +1128,11 @@ func (inode *Inode) patchObjectRanges() (initiated bool) {
11331128
continue
11341129
}
11351130

1136-
var flushBufs []*FileBuffer
1137-
for pos := locateBuffer(inode.buffers, partStart); pos < len(inode.buffers); pos++ {
1138-
buf := inode.buffers[pos]
1139-
if buf.offset >= partEnd {
1140-
break
1141-
}
1142-
if buf.state != BUF_DIRTY || buf.zero && !wantFlush && !appendPatch {
1143-
continue
1144-
}
1145-
1146-
if buf.offset < partStart {
1147-
inode.splitBuffer(pos, partStart-buf.offset)
1148-
continue
1149-
}
1150-
if buf.offset+buf.length > partEnd {
1151-
inode.splitBuffer(pos, partEnd-buf.offset)
1152-
}
1153-
1154-
flushBufs = append(flushBufs, buf)
1155-
}
1156-
1131+
inode.buffers.SplitAt(partStart)
1132+
inode.buffers.SplitAt(partEnd)
1133+
flushBufs := inode.buffers.Select(partStart, partEnd, func(buf *FileBuffer) bool {
1134+
return buf.state == BUF_DIRTY && (!buf.zero || wantFlush || appendPatch)
1135+
})
11571136
if len(flushBufs) != 0 {
11581137
inode.patchPart(partStart, partSize, flushBufs)
11591138
initiated = true
@@ -1222,7 +1201,7 @@ func (inode *Inode) patchFromBuffers(bufs []*FileBuffer, partSize uint64) {
12221201
// If bufs is a contiguous range of buffers then we can send them as PATCH immediately,
12231202
// otherwise we need to read missing ranges first.
12241203
var (
1225-
reader io.ReadSeeker
1204+
reader io.ReadSeeker
12261205
dirtyBufs map[uint64]bool
12271206
)
12281207
if contiguous {
@@ -1256,21 +1235,19 @@ func (inode *Inode) patchFromBuffers(bufs []*FileBuffer, partSize uint64) {
12561235
log.Warnf("Local state of file %s (inode %d) changed, aborting patch", key, inode.Id)
12571236
return
12581237
}
1259-
reader, dirtyBufs = inode.GetMultiReader(offset, size)
1238+
reader, dirtyBufs, err = inode.getMultiReader(offset, size)
1239+
if err != nil {
1240+
log.Errorf("File %s data in %v+%v is missing during PATCH attempt: %v", key, offset, size, err)
1241+
return
1242+
}
12601243
}
12611244

12621245
if ok := inode.sendPatch(offset, size, reader, partSize); !ok {
12631246
return
12641247
}
12651248

1266-
inodeClean := inode.userMetadataDirty == 0 && inode.oldParent == nil
1267-
for _, buf := range inode.buffers {
1268-
if dirtyBufs[buf.dirtyID] {
1269-
buf.dirtyID, buf.state = 0, BUF_CLEAN
1270-
}
1271-
inodeClean = inodeClean && buf.state == BUF_CLEAN
1272-
}
1273-
if inodeClean {
1249+
inode.buffers.SetState(offset, size, dirtyBufs, BUF_CLEAN)
1250+
if !inode.isStillDirty() {
12741251
inode.SetCacheState(ST_CACHED)
12751252
}
12761253
}
@@ -1323,7 +1300,7 @@ func (inode *Inode) sendPatch(offset, size uint64, r io.ReadSeeker, partSize uin
13231300
}
13241301

13251302
func (inode *Inode) discardChanges(offset, size uint64) {
1326-
allocated := inode.removeRange(offset, size, BUF_DIRTY)
1303+
allocated := inode.buffers.RemoveRange(offset, size, nil)
13271304
inode.fs.bufferPool.Use(allocated, true)
13281305
}
13291306

0 commit comments

Comments
 (0)