Skip to content

Commit 132a835

Browse files
committed
add chunk stitching; twice as fast long-distance uploads:
rather than sending each file chunk as a separate HTTP request, sibling chunks will now be fused together into larger HTTP POSTs which results in unreasonably huge speed boosts on some routes ( `2.6x` from Norway to US-East, `1.6x` from US-West to Finland ) the `x-up2k-hash` request header now takes a comma-separated list of chunk hashes, which must all be sibling chunks, resulting in one large consecutive range of file data as the post body a new global-option `--u2sz`, default `1,64,96`, sets the target request size as 64 MiB, allowing the settings ui to specify any value between 1 and 96 MiB, which is cloudflare's max value this does not cause any issues for resumable uploads; thanks to the streaming HTTP POST parser, each chunk will be verified and written to disk as they arrive, meaning only the untransmitted chunks will have to be resent in the event of a connection drop -- of course assuming there are no misconfigured WAFs or caching-proxies the previous up2k approach of uploading each chunk in a separate HTTP POST was inefficient in many real-world scenarios, mainly due to TCP window-scaling behaving erratically in some IXPs / along some routes a particular link from Norway to Virginia,US is unusably slow for the first 4 MiB, only reaching optimal speeds after 100 MiB, and then immediately resets the scale when the request has been sent; connection reuse does not help in this case on this route, the basic-uploader was somehow faster than up2k with 6 parallel uploads; only time i've seen this
1 parent e565ad5 commit 132a835

File tree

9 files changed

+217
-115
lines changed

9 files changed

+217
-115
lines changed

bin/u2c.py

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#!/usr/bin/env python3
22
from __future__ import print_function, unicode_literals
33

4-
S_VERSION = "1.18"
5-
S_BUILD_DT = "2024-06-01"
4+
S_VERSION = "1.19"
5+
S_BUILD_DT = "2024-07-21"
66

77
"""
88
u2c.py: upload to copyparty
@@ -119,6 +119,7 @@ def __init__(self, top, rel, size, lmod):
119119
self.nhs = 0
120120

121121
# set by upload
122+
self.nojoin = 0 # type: int
122123
self.up_b = 0 # type: int
123124
self.up_c = 0 # type: int
124125
self.cd = 0
@@ -130,10 +131,20 @@ def __init__(self, top, rel, size, lmod):
130131
class FileSlice(object):
131132
"""file-like object providing a fixed window into a file"""
132133

133-
def __init__(self, file, cid):
134+
def __init__(self, file, cids):
134135
# type: (File, str) -> None
135136

136-
self.car, self.len = file.kchunks[cid]
137+
self.file = file
138+
self.cids = cids
139+
140+
self.car, tlen = file.kchunks[cids[0]]
141+
for cid in cids[1:]:
142+
ofs, clen = file.kchunks[cid]
143+
if ofs != self.car + tlen:
144+
raise Exception(9)
145+
tlen += clen
146+
147+
self.len = tlen
137148
self.cdr = self.car + self.len
138149
self.ofs = 0 # type: int
139150
self.f = open(file.abs, "rb", 512 * 1024)
@@ -636,13 +647,13 @@ def handshake(ar, file, search):
636647
return r["hash"], r["sprs"]
637648

638649

639-
def upload(file, cid, pw, stats):
640-
# type: (File, str, str, str) -> None
641-
"""upload one specific chunk, `cid` (a chunk-hash)"""
650+
def upload(fsl, pw, stats):
651+
# type: (FileSlice, str, str) -> None
652+
"""upload a range of file data, defined by one or more `cid` (chunk-hash)"""
642653

643654
headers = {
644-
"X-Up2k-Hash": cid,
645-
"X-Up2k-Wark": file.wark,
655+
"X-Up2k-Hash": ",".join(fsl.cids),
656+
"X-Up2k-Wark": fsl.file.wark,
646657
"Content-Type": "application/octet-stream",
647658
}
648659

@@ -652,15 +663,20 @@ def upload(file, cid, pw, stats):
652663
if pw:
653664
headers["Cookie"] = "=".join(["cppwd", pw])
654665

655-
f = FileSlice(file, cid)
656666
try:
657-
r = req_ses.post(file.url, headers=headers, data=f)
667+
r = req_ses.post(fsl.file.url, headers=headers, data=fsl)
668+
669+
if r.status_code == 400:
670+
txt = r.text
671+
if "already got that" in txt or "already being written" in txt:
672+
fsl.file.nojoin = 1
673+
658674
if not r:
659675
raise Exception(repr(r))
660676

661677
_ = r.content
662678
finally:
663-
f.f.close()
679+
fsl.f.close()
664680

665681

666682
class Ctl(object):
@@ -743,7 +759,7 @@ def __init__(self, ar, stats=None):
743759

744760
self.mutex = threading.Lock()
745761
self.q_handshake = Queue() # type: Queue[File]
746-
self.q_upload = Queue() # type: Queue[tuple[File, str]]
762+
self.q_upload = Queue() # type: Queue[FileSlice]
747763

748764
self.st_hash = [None, "(idle, starting...)"] # type: tuple[File, int]
749765
self.st_up = [None, "(idle, starting...)"] # type: tuple[File, int]
@@ -788,7 +804,8 @@ def _safe(self):
788804
for nc, cid in enumerate(hs):
789805
print(" {0} up {1}".format(ncs - nc, cid))
790806
stats = "{0}/0/0/{1}".format(nf, self.nfiles - nf)
791-
upload(file, cid, self.ar.a, stats)
807+
fslice = FileSlice(file, [cid])
808+
upload(fslice, self.ar.a, stats)
792809

793810
print(" ok!")
794811
if file.recheck:
@@ -1062,13 +1079,24 @@ def handshaker(self):
10621079
if not hs:
10631080
kw = "uploaded" if file.up_b else " found"
10641081
print("{0} {1}".format(kw, upath))
1065-
for cid in hs:
1066-
self.q_upload.put([file, cid])
1082+
1083+
cs = hs[:]
1084+
while cs:
1085+
fsl = FileSlice(file, cs[:1])
1086+
try:
1087+
if file.nojoin:
1088+
raise Exception()
1089+
for n in range(2, self.ar.sz + 1):
1090+
fsl = FileSlice(file, cs[:n])
1091+
except:
1092+
pass
1093+
cs = cs[len(fsl.cids):]
1094+
self.q_upload.put(fsl)
10671095

10681096
def uploader(self):
10691097
while True:
1070-
task = self.q_upload.get()
1071-
if not task:
1098+
fsl = self.q_upload.get()
1099+
if not fsl:
10721100
self.st_up = [None, "(finished)"]
10731101
break
10741102

@@ -1086,22 +1114,23 @@ def uploader(self):
10861114
self.eta,
10871115
)
10881116

1089-
file, cid = task
1117+
file = fsl.file
1118+
cids = fsl.cids
10901119
try:
1091-
upload(file, cid, self.ar.a, stats)
1120+
upload(fsl, self.ar.a, stats)
10921121
except Exception as ex:
1093-
t = "upload failed, retrying: {0} #{1} ({2})\n"
1094-
eprint(t.format(file.name, cid[:8], ex))
1122+
t = "upload failed, retrying: %s #%d+%d (%s)\n"
1123+
eprint(t % (file.name, cids[0][:8], len(cids) - 1, ex))
10951124
file.cd = time.time() + self.ar.cd
10961125
# handshake will fix it
10971126

10981127
with self.mutex:
1099-
sz = file.kchunks[cid][1]
1100-
file.ucids = [x for x in file.ucids if x != cid]
1128+
sz = fsl.len
1129+
file.ucids = [x for x in file.ucids if x not in cids]
11011130
if not file.ucids:
11021131
self.q_handshake.put(file)
11031132

1104-
self.st_up = [file, cid]
1133+
self.st_up = [file, cids[0]]
11051134
file.up_b += sz
11061135
self.up_b += sz
11071136
self.up_br += sz
@@ -1164,6 +1193,7 @@ def main():
11641193
ap = app.add_argument_group("performance tweaks")
11651194
ap.add_argument("-j", type=int, metavar="CONNS", default=2, help="parallel connections")
11661195
ap.add_argument("-J", type=int, metavar="CORES", default=hcores, help="num cpu-cores to use for hashing; set 0 or 1 for single-core hashing")
1196+
ap.add_argument("--sz", type=int, metavar="MiB", default=64, help="try to make each POST this big")
11671197
ap.add_argument("-nh", action="store_true", help="disable hashing while uploading")
11681198
ap.add_argument("-ns", action="store_true", help="no status panel (for slow consoles and macos)")
11691199
ap.add_argument("--cd", type=float, metavar="SEC", default=5, help="delay before reattempting a failed handshake/upload")

copyparty/__main__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,7 @@ def add_upload(ap):
942942
ap2.add_argument("--sparse", metavar="MiB", type=int, default=4, help="windows-only: minimum size of incoming uploads through up2k before they are made into sparse files")
943943
ap2.add_argument("--turbo", metavar="LVL", type=int, default=0, help="configure turbo-mode in up2k client; [\033[32m-1\033[0m] = forbidden/always-off, [\033[32m0\033[0m] = default-off and warn if enabled, [\033[32m1\033[0m] = default-off, [\033[32m2\033[0m] = on, [\033[32m3\033[0m] = on and disable datecheck")
944944
ap2.add_argument("--u2j", metavar="JOBS", type=int, default=2, help="web-client: number of file chunks to upload in parallel; 1 or 2 is good for low-latency (same-country) connections, 4-8 for android clients, 16 for cross-atlantic (max=64)")
945+
ap2.add_argument("--u2sz", metavar="N,N,N", type=u, default="1,64,96", help="web-client: default upload chunksize (MiB); sets \033[33mmin,default,max\033[0m in the settings gui. Each HTTP POST will aim for this size. Cloudflare max is 96. Big values are good for cross-atlantic but may increase HDD fragmentation on some FS. Disable this optimization with [\033[32m1,1,1\033[0m]")
945946
ap2.add_argument("--u2sort", metavar="TXT", type=u, default="s", help="upload order; [\033[32ms\033[0m]=smallest-first, [\033[32mn\033[0m]=alphabetical, [\033[32mfs\033[0m]=force-s, [\033[32mfn\033[0m]=force-n -- alphabetical is a bit slower on fiber/LAN but makes it easier to eyeball if everything went fine")
946947
ap2.add_argument("--write-uplog", action="store_true", help="write POST reports to textfiles in working-directory")
947948

copyparty/httpcli.py

Lines changed: 47 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2199,33 +2199,36 @@ def handle_search(self, body: dict[str, Any]) -> bool:
21992199

22002200
def handle_post_binary(self) -> bool:
22012201
try:
2202-
remains = int(self.headers["content-length"])
2202+
postsize = remains = int(self.headers["content-length"])
22032203
except:
22042204
raise Pebkac(400, "you must supply a content-length for binary POST")
22052205

22062206
try:
2207-
chash = self.headers["x-up2k-hash"]
2207+
chashes = self.headers["x-up2k-hash"].split(",")
22082208
wark = self.headers["x-up2k-wark"]
22092209
except KeyError:
22102210
raise Pebkac(400, "need hash and wark headers for binary POST")
22112211

2212+
chashes = [x.strip() for x in chashes]
2213+
22122214
vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
22132215
ptop = (vfs.dbv or vfs).realpath
22142216

2215-
x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash)
2217+
x = self.conn.hsrv.broker.ask("up2k.handle_chunks", ptop, wark, chashes)
22162218
response = x.get()
2217-
chunksize, cstart, path, lastmod, sprs = response
2219+
chunksize, cstarts, path, lastmod, sprs = response
2220+
maxsize = chunksize * len(chashes)
2221+
cstart0 = cstarts[0]
22182222

22192223
try:
22202224
if self.args.nw:
22212225
path = os.devnull
22222226

2223-
if remains > chunksize:
2224-
raise Pebkac(400, "your chunk is too big to fit")
2225-
2226-
self.log("writing {} #{} @{} len {}".format(path, chash, cstart, remains))
2227+
if remains > maxsize:
2228+
t = "your client is sending %d bytes which is too much (server expected %d bytes at most)"
2229+
raise Pebkac(400, t % (remains, maxsize))
22272230

2228-
reader = read_socket(self.sr, self.args.s_rd_sz, remains)
2231+
self.log("writing {} {} @{} len {}".format(path, chashes, cstart0, remains))
22292232

22302233
f = None
22312234
fpool = not self.args.no_fpool and sprs
@@ -2239,37 +2242,43 @@ def handle_post_binary(self) -> bool:
22392242
f = f or open(fsenc(path), "rb+", self.args.iobuf)
22402243

22412244
try:
2242-
f.seek(cstart[0])
2243-
post_sz, _, sha_b64 = hashcopy(reader, f, self.args.s_wr_slp)
2245+
for chash, cstart in zip(chashes, cstarts):
2246+
f.seek(cstart[0])
2247+
reader = read_socket(
2248+
self.sr, self.args.s_rd_sz, min(remains, chunksize)
2249+
)
2250+
post_sz, _, sha_b64 = hashcopy(reader, f, self.args.s_wr_slp)
22442251

2245-
if sha_b64 != chash:
2246-
try:
2247-
self.bakflip(f, cstart[0], post_sz, sha_b64, vfs.flags)
2248-
except:
2249-
self.log("bakflip failed: " + min_ex())
2252+
if sha_b64 != chash:
2253+
try:
2254+
self.bakflip(f, cstart[0], post_sz, sha_b64, vfs.flags)
2255+
except:
2256+
self.log("bakflip failed: " + min_ex())
22502257

2251-
t = "your chunk got corrupted somehow (received {} bytes); expected vs received hash:\n{}\n{}"
2252-
raise Pebkac(400, t.format(post_sz, chash, sha_b64))
2258+
t = "your chunk got corrupted somehow (received {} bytes); expected vs received hash:\n{}\n{}"
2259+
raise Pebkac(400, t.format(post_sz, chash, sha_b64))
22532260

2254-
if len(cstart) > 1 and path != os.devnull:
2255-
self.log(
2256-
"clone {} to {}".format(
2257-
cstart[0], " & ".join(unicode(x) for x in cstart[1:])
2261+
remains -= chunksize
2262+
2263+
if len(cstart) > 1 and path != os.devnull:
2264+
self.log(
2265+
"clone {} to {}".format(
2266+
cstart[0], " & ".join(unicode(x) for x in cstart[1:])
2267+
)
22582268
)
2259-
)
2260-
ofs = 0
2261-
while ofs < chunksize:
2262-
bufsz = max(4 * 1024 * 1024, self.args.iobuf)
2263-
bufsz = min(chunksize - ofs, bufsz)
2264-
f.seek(cstart[0] + ofs)
2265-
buf = f.read(bufsz)
2266-
for wofs in cstart[1:]:
2267-
f.seek(wofs + ofs)
2268-
f.write(buf)
2269+
ofs = 0
2270+
while ofs < chunksize:
2271+
bufsz = max(4 * 1024 * 1024, self.args.iobuf)
2272+
bufsz = min(chunksize - ofs, bufsz)
2273+
f.seek(cstart[0] + ofs)
2274+
buf = f.read(bufsz)
2275+
for wofs in cstart[1:]:
2276+
f.seek(wofs + ofs)
2277+
f.write(buf)
22692278

2270-
ofs += len(buf)
2279+
ofs += len(buf)
22712280

2272-
self.log("clone {} done".format(cstart[0]))
2281+
self.log("clone {} done".format(cstart[0]))
22732282

22742283
if not fpool:
22752284
f.close()
@@ -2281,10 +2290,10 @@ def handle_post_binary(self) -> bool:
22812290
f.close()
22822291
raise
22832292
finally:
2284-
x = self.conn.hsrv.broker.ask("up2k.release_chunk", ptop, wark, chash)
2293+
x = self.conn.hsrv.broker.ask("up2k.release_chunks", ptop, wark, chashes)
22852294
x.get() # block client until released
22862295

2287-
x = self.conn.hsrv.broker.ask("up2k.confirm_chunk", ptop, wark, chash)
2296+
x = self.conn.hsrv.broker.ask("up2k.confirm_chunks", ptop, wark, chashes)
22882297
ztis = x.get()
22892298
try:
22902299
num_left, fin_path = ztis
@@ -2303,7 +2312,7 @@ def handle_post_binary(self) -> bool:
23032312

23042313
cinf = self.headers.get("x-up2k-stat", "")
23052314

2306-
spd = self._spd(post_sz)
2315+
spd = self._spd(postsize)
23072316
self.log("{:70} thank {}".format(spd, cinf))
23082317
self.reply(b"thank")
23092318
return True
@@ -4500,6 +4509,7 @@ def tx_browser(self) -> bool:
45004509
"themes": self.args.themes,
45014510
"turbolvl": self.args.turbo,
45024511
"u2j": self.args.u2j,
4512+
"u2sz": self.args.u2sz,
45034513
"idxh": int(self.args.ih),
45044514
"u2sort": self.args.u2sort,
45054515
}

0 commit comments

Comments
 (0)