@@ -21,6 +21,7 @@ import (
 	"os"
 	"path"
 	"sort"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"syscall"
@@ -1516,7 +1517,25 @@ func (inode *Inode) SendUpload() bool {
 		}
 	}
 
-	if inode.Attributes.Size <= inode.fs.flags.SinglePartMB*1024*1024 && inode.mpu == nil {
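+	// Enforce the parallel-flush limit before choosing any flush strategy (PATCH or upload).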
+	if inode.IsFlushing >= inode.fs.flags.MaxParallelParts {
+		return false
+	}
+
+	smallFile := inode.Attributes.Size <= inode.fs.flags.SinglePartMB*1024*1024
+	canPatch := inode.fs.flags.UsePatch &&
+		// Can only patch modified inodes with completed MPUs.
+		inode.CacheState == ST_MODIFIED && inode.mpu == nil &&
+		// In the current implementation we should not patch big simple objects. Reupload them as multiparts first.
+		// If the current ETag is unknown, try patching anyway, so that we don't trigger an unnecessary MPU.
+		(inode.uploadedAsMultipart() || inode.knownETag == "" || smallFile) &&
+		// Currently PATCH does not support truncates. If the file was truncated, reupload it.
+		inode.knownSize <= inode.Attributes.Size
+
+	if canPatch {
+		return inode.patchObjectRanges()
+	}
+
+	if smallFile && inode.mpu == nil {
 		// Don't flush small files with active file handles (if not under memory pressure)
 		if inode.IsFlushing == 0 && (inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.wantFree) > 0) {
 			// Don't accidentally trigger a parallel multipart flush
@@ -1528,12 +1547,12 @@ func (inode *Inode) SendUpload() bool {
 		return false
 	}
 
-	if inode.IsFlushing >= inode.fs.flags.MaxParallelParts {
-		return false
-	}
-
 	// Initiate multipart upload, if not yet
 	if inode.mpu == nil {
+		// Wait for other updates to complete.
+		if inode.IsFlushing > 0 {
+			return false
+		}
 		inode.IsFlushing += inode.fs.flags.MaxParallelParts
 		atomic.AddInt64(&inode.fs.activeFlushers, 1)
 		go func() {
@@ -1662,6 +1681,246 @@ func (inode *Inode) SendUpload() bool {
 	return initiated
 }
 
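+// Multipart ETags in S3-compatible storage have the form "<md5>-<part count>",
+// so a "-" in the known ETag means the object was uploaded as a multipart.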
+func (inode *Inode) uploadedAsMultipart() bool {
+	return strings.Contains(inode.knownETag, "-")
+}
+
+func (inode *Inode) patchObjectRanges() (initiated bool) {
+	smallFile := inode.Attributes.Size <= inode.fs.flags.SinglePartMB*1024*1024
+	wantFlush := inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.wantFree) > 0
+
+	if smallFile && wantFlush {
+		if inode.flushLimitsExceeded() {
+			return
+		}
+		var flushBufs []*FileBuffer
+		for _, buf := range inode.buffers {
+			if buf.state == BUF_DIRTY {
+				flushBufs = append(flushBufs, buf)
+			}
+		}
+		inode.patchSimpleObj(flushBufs)
+		return true
+	}
+
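+	// For larger files, walk the object part by part and PATCH each modified range separately.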
+	updatedPartID := inode.fs.partNum(inode.lastWriteEnd)
+	endPartID := inode.fs.partNum(inode.Attributes.Size - 1)
+
+	var prevSize uint64
+	for part := uint64(0); part <= endPartID; part++ {
+		if inode.flushLimitsExceeded() {
+			break
+		}
+
+		partStart, partSize := inode.fs.partRange(part)
+		// In its current implementation PATCH doesn't support ranges with a start offset larger than the object size.
+		if partStart > inode.knownSize {
+			break
+		}
+
+		partEnd, rangeBorder := partStart+partSize, partSize != prevSize
+		appendPatch, newPart := partEnd > inode.knownSize, partStart == inode.knownSize
+
+		// When entering a new part range, we can't immediately switch to the new part size,
+		// because we need to init a new part first.
+		if newPart && rangeBorder && prevSize > 0 {
+			partEnd, partSize = partStart+prevSize, prevSize
+		}
+		prevSize = partSize
+
+		smallTail := appendPatch && inode.Attributes.Size-partStart < partSize
+		if smallTail && !wantFlush {
+			break
+		}
+
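+		// Skip the part that is still being appended to (unless we have to flush)
+		// and parts already locked by another flusher.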
+		partLocked := inode.IsRangeLocked(partStart, partEnd, true)
+		if !wantFlush && part == updatedPartID || partLocked {
+			continue
+		}
+
+		var flushBufs []*FileBuffer
+		for pos := locateBuffer(inode.buffers, partStart); pos < len(inode.buffers); pos++ {
+			buf := inode.buffers[pos]
+			if buf.offset >= partEnd {
+				break
+			}
+			if buf.state != BUF_DIRTY || buf.zero && !wantFlush && !appendPatch {
+				continue
+			}
+
+			if buf.offset < partStart {
+				inode.splitBuffer(pos, partStart-buf.offset)
+				continue
+			}
+			if buf.offset+buf.length > partEnd {
+				inode.splitBuffer(pos, partEnd-buf.offset)
+			}
+
+			flushBufs = append(flushBufs, buf)
+		}
+
+		if len(flushBufs) != 0 {
+			inode.patchPart(partStart, partSize, flushBufs)
+			initiated = true
+		}
+	}
+	return
+}
+
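+// flushLimitsExceeded reports whether starting one more flush would exceed the
+// global flusher limit or this inode's parallel-part limit.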
+func (inode *Inode) flushLimitsExceeded() bool {
+	return atomic.LoadInt64(&inode.fs.activeFlushers) >= inode.fs.flags.MaxFlushers ||
+		inode.IsFlushing >= inode.fs.flags.MaxParallelParts
+}
+
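+// patchSimpleObj flushes a small object with a single whole-file PATCH.
+// Reserving all MaxParallelParts slots excludes any concurrent per-part flush.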
+func (inode *Inode) patchSimpleObj(bufs []*FileBuffer) {
+	inode.LockRange(0, inode.Attributes.Size, true)
+	inode.IsFlushing += inode.fs.flags.MaxParallelParts
+	atomic.AddInt64(&inode.fs.activeFlushers, 1)
+
+	go func() {
+		inode.mu.Lock()
+		inode.patchFromBuffers(bufs, 0)
+
+		inode.UnlockRange(0, inode.Attributes.Size, true)
+		inode.IsFlushing -= inode.fs.flags.MaxParallelParts
+		inode.mu.Unlock()
+
+		atomic.AddInt64(&inode.fs.activeFlushers, -1)
+		inode.fs.WakeupFlusher()
+	}()
+}
+
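+// patchPart flushes a single part range with one PATCH request, taking one
+// parallelism slot so several parts can be patched concurrently.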
+func (inode *Inode) patchPart(partOffset, partSize uint64, bufs []*FileBuffer) {
+	inode.LockRange(partOffset, partSize, true)
+	inode.IsFlushing++
+	atomic.AddInt64(&inode.fs.activeFlushers, 1)
+
+	go func() {
+		inode.mu.Lock()
+		inode.patchFromBuffers(bufs, partSize)
+
+		inode.UnlockRange(partOffset, partSize, true)
+		inode.IsFlushing--
+		inode.mu.Unlock()
+
+		atomic.AddInt64(&inode.fs.activeFlushers, -1)
+		inode.fs.WakeupFlusher()
+	}()
+}
+
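+// patchFromBuffers sends the given dirty buffers as one PATCH request; if the
+// buffers are not contiguous, the missing ranges are loaded from the server first.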
+func (inode *Inode) patchFromBuffers(bufs []*FileBuffer, partSize uint64) {
+	if len(bufs) == 0 {
+		return
+	}
+
+	first, last := bufs[0], bufs[len(bufs)-1]
+	offset, size := first.offset, last.offset+last.length-first.offset
+
+	var bufsSize uint64
+	for _, b := range bufs {
+		bufsSize += b.length
+	}
+	contiguous := bufsSize == size
+
+	// If bufs is a contiguous range of buffers then we can send them as PATCH immediately,
+	// otherwise we need to read missing ranges first.
+	var reader io.ReadSeeker
+	if contiguous {
+		r := NewMultiReader()
+		for _, buf := range bufs {
+			if !buf.zero {
+				r.AddBuffer(buf.data)
+			} else {
+				r.AddZero(buf.length)
+			}
+		}
+		reader = r
+	} else {
+		key := inode.FullName()
+		_, err := inode.LoadRange(offset, size, 0, true)
+		if err != nil {
+			switch mapAwsError(err) {
+			case syscall.ENOENT, syscall.ERANGE:
+				s3Log.Warnf("File %s (inode %d) is deleted or resized remotely, discarding all local changes", key, inode.Id)
+				inode.resetCache()
+			default:
+				log.Errorf("Failed to load range %d-%d of file %s (inode %d) to patch it: %s", offset, offset+size, key, inode.Id, err)
+			}
+			return
+		}
+		// File size or inode state may have been changed again, abort patch. These are local changes,
+		// so we don't need to drop any cached state here.
+		if inode.Attributes.Size < offset || inode.CacheState != ST_MODIFIED {
+			log.Warnf("Local state of file %s (inode %d) changed, aborting patch", key, inode.Id)
+			return
+		}
+		reader, _ = inode.GetMultiReader(offset, size)
+	}
+
+	if ok := inode.sendPatch(offset, size, reader, partSize); !ok {
+		return
+	}
+
+	for _, b := range bufs {
+		b.state, b.dirtyID = BUF_CLEAN, 0
+	}
+	if !inode.isStillDirty() {
+		inode.SetCacheState(ST_CACHED)
+	}
+}
+
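+// sendPatch issues the PatchBlob call and applies the server's response (new
+// ETag and modification time) back to the inode, releasing the inode lock
+// around the network round trip.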
+func (inode *Inode) sendPatch(offset, size uint64, r io.ReadSeeker, partSize uint64) bool {
+	cloud, key := inode.cloud()
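+	// A renamed-but-not-yet-flushed file still lives under its old key on the server.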
+	if inode.oldParent != nil {
+		_, key = inode.oldParent.cloud()
+		key = appendChildName(key, inode.oldName)
+	}
+	log.Debugf("Patching range %d-%d of file %s (inode %d)", offset, offset+size, key, inode.Id)
+
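+	// Drop the inode lock for the duration of the network call.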
+	inode.mu.Unlock()
+	inode.fs.addInflightChange(key)
+	resp, err := cloud.PatchBlob(&PatchBlobInput{
+		Key:            key,
+		Offset:         offset,
+		Size:           size,
+		AppendPartSize: int64(partSize),
+		Body:           r,
+	})
+	inode.fs.completeInflightChange(key)
+	inode.mu.Lock()
+
+	// File was deleted while we were flushing it
+	if inode.CacheState == ST_DELETED {
+		return false
+	}
+
+	inode.recordFlushError(err)
+	if err != nil {
+		switch mapAwsError(err) {
+		case syscall.ENOENT, syscall.ERANGE:
+			s3Log.Warnf("File %s (inode %d) is deleted or resized remotely, discarding all local changes", key, inode.Id)
+			inode.resetCache()
+		case syscall.EBUSY:
+			s3Log.Warnf("Failed to patch range %d-%d of file %s (inode %d) due to concurrent updates", offset, offset+size, key, inode.Id)
+			if inode.fs.flags.DropPatchConflicts {
+				inode.discardChanges(offset, size)
+			}
+		default:
+			log.Errorf("Failed to patch range %d-%d of file %s (inode %d): %s", offset, offset+size, key, inode.Id, err)
+		}
+		return false
+	}
+
+	log.Debugf("Successfully patched range %d-%d of file %s (inode %d), etag: %s", offset, offset+size, key, inode.Id, NilStr(resp.ETag))
+	inode.updateFromFlush(MaxUInt64(inode.knownSize, offset+size), resp.ETag, resp.LastModified, nil)
+	return true
+}
+
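+// discardChanges drops dirty buffers in the given range and updates buffer
+// pool accounting for the freed memory.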
+func (inode *Inode) discardChanges(offset, size uint64) {
+	allocated := inode.removeRange(offset, size, BUF_DIRTY)
+	inode.fs.bufferPool.Use(allocated, true)
+}
+
 func (inode *Inode) isStillDirty() bool {
 	if inode.userMetadataDirty != 0 || inode.oldParent != nil {
 		return true