Skip to content

Commit 3da62ec

Browse files
committed
fix dedup bug as of v1.13.8:
* v1.13.8 broke collision resolution for non-identical files; the correct filename was reserved but not symlinked to the original file, leaving a zero-byte file instead. See the v1.14.3 GitHub release notes for remediation info * add sanity checks for early detection of index/fs desync; improves performance and gives less confusing logs
1 parent 0123399 commit 3da62ec

File tree

5 files changed

+215
-17
lines changed

5 files changed

+215
-17
lines changed

copyparty/svchub.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ def __init__(
106106
self.no_ansi = args.no_ansi
107107
self.logf: Optional[typing.TextIO] = None
108108
self.logf_base_fn = ""
109+
self.is_dut = False # running in unittest; always False
109110
self.stop_req = False
110111
self.stopping = False
111112
self.stopped = False

copyparty/up2k.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ def deferred_init(self) -> None:
236236
if not self.pp and self.args.exit == "idx":
237237
return self.hub.sigterm()
238238

239+
if self.hub.is_dut:
240+
return
241+
239242
Daemon(self._snapshot, "up2k-snapshot")
240243
if have_e2d:
241244
Daemon(self._hasher, "up2k-hasher")
@@ -1405,7 +1408,7 @@ def _build_dir(
14051408
if dts == lmod and dsz == sz and (nohash or dw[0] != "#" or not sz):
14061409
continue
14071410

1408-
t = "reindex [{}] => [{}] ({}/{}) ({}/{})".format(
1411+
t = "reindex [{}] => [{}] mtime({}/{}) size({}/{})".format(
14091412
top, rp, dts, lmod, dsz, sz
14101413
)
14111414
self.log(t)
@@ -2664,11 +2667,19 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]:
26642667
if stat.S_ISLNK(st.st_mode):
26652668
# broken symlink
26662669
raise Exception()
2667-
except:
2670+
if st.st_size != dsize:
2671+
t = "candidate ignored (db/fs desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}"
2672+
t = t.format(
2673+
wark, st.st_size, dsize, st.st_mtime, dtime, dp_abs
2674+
)
2675+
self.log(t)
2676+
raise Exception("desync")
2677+
except Exception as ex:
26682678
if n4g:
26692679
st = os.stat_result((0, -1, -1, 0, 0, 0, 0, 0, 0, 0))
26702680
else:
2671-
lost.append((cur, dp_dir, dp_fn))
2681+
if str(ex) != "desync":
2682+
lost.append((cur, dp_dir, dp_fn))
26722683
continue
26732684

26742685
j = {
@@ -2726,13 +2737,16 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]:
27262737
ptop = None # use cj or job as appropriate
27272738

27282739
if not job and wark in reg:
2729-
# ensure the files haven't been deleted manually
2740+
# ensure the files haven't been edited or deleted
2741+
path = ""
2742+
st = None
27302743
rj = reg[wark]
27312744
names = [rj[x] for x in ["name", "tnam"] if x in rj]
27322745
for fn in names:
27332746
path = djoin(rj["ptop"], rj["prel"], fn)
27342747
try:
2735-
if bos.path.getsize(path) > 0 or not rj["need"]:
2748+
st = bos.stat(path)
2749+
if st.st_size > 0 or not rj["need"]:
27362750
# upload completed or both present
27372751
break
27382752
except:
@@ -2743,6 +2757,14 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]:
27432757
del reg[wark]
27442758
break
27452759

2760+
if st and not self.args.nw and not n4g and st.st_size != rj["size"]:
2761+
t = "will not dedup (fs index desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}"
2762+
t = t.format(
2763+
wark, st.st_size, rj["size"], st.st_mtime, rj["lmod"], path
2764+
)
2765+
self.log(t)
2766+
del reg[wark]
2767+
27462768
if job or wark in reg:
27472769
job = job or reg[wark]
27482770
if (
@@ -2850,6 +2872,7 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]:
28502872
return self._handle_json(job, depth + 1)
28512873

28522874
job["name"] = self._untaken(pdir, job, now)
2875+
dst = djoin(job["ptop"], job["prel"], job["name"])
28532876

28542877
if not self.args.nw:
28552878
dvf: dict[str, Any] = vfs.flags

tests/test_dedup.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
#!/usr/bin/env python3
2+
# coding: utf-8
3+
from __future__ import print_function, unicode_literals
4+
5+
import json
6+
import os
7+
import shutil
8+
import tempfile
9+
import unittest
10+
from itertools import product
11+
12+
from copyparty.authsrv import AuthSrv
13+
from copyparty.httpcli import HttpCli
14+
from tests import util as tu
15+
from tests.util import Cfg
16+
17+
18+
class TestDedup(unittest.TestCase):
    """Smoke-test up2k dedup / filename-collision handling.

    Each scenario starts from an empty volume and performs three uploads
    (handshake, optional chunk upload, then a GET to verify the contents)
    using every combination of two directories, two filenames and two
    distinct payloads, with and without the e2d option.
    """

    def setUp(self):
        # defined up front so tearDown is safe even if test() never ran
        self.conn = None
        self.td = tu.get_ramdisk()

    def tearDown(self):
        # test() only shuts down the *previous* server at the start of each
        # iteration, so the last one is still alive here; stop it before
        # the ramdisk it works on is deleted
        if self.conn:
            self.conn.hsrv.hub.up2k.shutdown()
            self.conn = None

        os.chdir(tempfile.gettempdir())
        shutil.rmtree(self.td)

    def reset(self):
        """Recreate an empty "vfs" directory in the ramdisk, chdir into it, and return its path."""
        td = os.path.join(self.td, "vfs")
        if os.path.exists(td):
            shutil.rmtree(td)
        os.mkdir(td)
        os.chdir(td)
        return td

    def test(self):
        quick = True  # sufficient for regular smoketests
        # quick = False

        dirnames = ["d1", "d2"]
        filenames = ["f1", "f2"]
        # (data, chash, wark)
        files = [
            (
                "one",
                "BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w",
                "XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc",
            ),
            (
                "two",
                "ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj",
                "fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou",
            ),
        ]

        # 3072 uploads in total when quick is False
        # (2 e2d modes * 8 * 8 * 8 combinations * 3 uploads each)
        self.ctr = 3072
        for e2d in [True, False]:
            for dn1, fn1, f1 in product(dirnames, filenames, files):
                for dn2, fn2, f2 in product(dirnames, filenames, files):
                    for dn3, fn3, f3 in product(dirnames, filenames, files):
                        self.reset()
                        if self.conn:
                            self.conn.hsrv.hub.up2k.shutdown()
                        self.args = Cfg(v=[".::A"], a=[], e2d=e2d)
                        self.asrv = AuthSrv(self.args, self.log)
                        self.conn = tu.VHttpConn(
                            self.args, self.asrv, self.log, b"", True
                        )
                        self.do_post(dn1, fn1, f1, True)
                        self.do_post(dn2, fn2, f2, False)
                        self.do_post(dn3, fn3, f3, False)
                        if quick:
                            # only the first choice for the third upload
                            break

    def do_post(self, dn, fn, fi, first):
        """Upload payload `fi` as `dn/fn` and verify it can be read back.

        `first` marks the first upload into a fresh volume; that one must
        keep its requested filename and must have its chunk requested.
        """
        print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first)))
        self.ctr -= 1

        data, chash, wark = fi
        hs = self.handshake(dn, fn, fi)
        self.assertEqual(hs["wark"], wark)

        sfn = hs["name"]
        if sfn == fn:
            print("using original name " + fn)
        else:
            print(fn + " got renamed to " + sfn)
            if first:
                self.fail("wait what")

        if hs["hash"]:
            # server is missing the chunk; send it
            self.assertEqual(hs["hash"][0], chash)
            self.put_chunk(dn, wark, chash, data)
        elif first:
            self.fail("found first; %r, %r" % ((dn, fn, fi), hs))

        _, b = self.curl("%s/%s" % (dn, sfn))
        self.assertEqual(b, data)

    def handshake(self, dn, fn, fi):
        """POST the up2k handshake json for `fi` to directory `dn`; return the decoded json reply."""
        hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n"
        msg = {
            "name": fn,
            # derived from the payload rather than assuming 3 bytes
            "size": len(fi[0].encode("utf-8")),
            "lmod": 1234567890,
            "life": 0,
            "hash": [fi[1]],
        }
        buf = json.dumps(msg).encode("utf-8")
        buf = (hdr % (dn, len(buf))).encode("utf-8") + buf
        print("HS -->", buf)
        HttpCli(self.conn.setbuf(buf)).run()
        ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
        print("HS <--", ret)
        return json.loads(ret[1])

    def put_chunk(self, dn, wark, chash, data):
        """POST the single chunk `data` for upload `wark` and expect the "thank" reply."""
        msg = [
            "POST /%s/ HTTP/1.1" % (dn,),
            "Connection: close",
            "Content-Type: application/octet-stream",
            "Content-Length: %d" % (len(data.encode("utf-8")),),
            "X-Up2k-Hash: " + chash,
            "X-Up2k-Wark: " + wark,
            "",
            data,
        ]
        buf = "\r\n".join(msg).encode("utf-8")
        print("PUT -->", buf)
        HttpCli(self.conn.setbuf(buf)).run()
        ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
        self.assertEqual(ret[1], "thank")

    def curl(self, url, binary=False):
        """GET `url`; return [headers, body], with the body left as bytes if `binary`."""
        h = "GET /%s HTTP/1.1\r\nConnection: close\r\n\r\n"
        HttpCli(self.conn.setbuf((h % (url,)).encode("utf-8"))).run()
        if binary:
            h, b = self.conn.s._reply.split(b"\r\n\r\n", 1)
            return [h.decode("utf-8"), b]

        return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)

    def log(self, src, msg, c=0):
        """Logger callback given to the server objects; prints the message only."""
        print(msg)

tests/test_dots.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def hdr(query, uname):
2424

2525

2626
class TestDots(unittest.TestCase):
27+
def __init__(self, *a, **ka):
28+
super(TestDots, self).__init__(*a, **ka)
29+
self.is_dut = True
30+
2731
def setUp(self):
2832
self.td = tu.get_ramdisk()
2933

tests/util.py

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from __future__ import print_function, unicode_literals
44

55
import os
6-
import platform
76
import re
87
import shutil
98
import socket
@@ -16,9 +15,7 @@
1615

1716
import jinja2
1817

19-
WINDOWS = platform.system() == "Windows"
20-
ANYWIN = WINDOWS or sys.platform in ["msys"]
21-
MACOS = platform.system() == "Darwin"
18+
from copyparty.__init__ import MACOS, WINDOWS, E
2219

2320
J2_ENV = jinja2.Environment(loader=jinja2.BaseLoader) # type: ignore
2421
J2_FILES = J2_ENV.from_string("{{ files|join('\n') }}\nJ2EOT")
@@ -42,10 +39,11 @@ def eprint(*a, **ka):
4239
# 25% faster; until any tests do symlink stuff
4340

4441

45-
from copyparty.__init__ import E
4642
from copyparty.__main__ import init_E
43+
from copyparty.broker_thr import BrokerThr
4744
from copyparty.ico import Ico
4845
from copyparty.u2idx import U2idx
46+
from copyparty.up2k import Up2k
4947
from copyparty.util import FHC, CachedDict, Garda, Unrecv
5048

5149
init_E(E)
@@ -119,10 +117,10 @@ class Cfg(Namespace):
119117
def __init__(self, a=None, v=None, c=None, **ka0):
120118
ka = {}
121119

122-
ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_dedup no_del no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol"
120+
ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_db_ip no_dedup no_del no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol zs"
123121
ka.update(**{k: False for k in ex.split()})
124122

125-
ex = "dotpart dotsrch hook_v no_dhash no_fastboot no_rescan no_sendfile no_snap no_voldump re_dhash plain_ip"
123+
ex = "dotpart dotsrch hook_v no_dhash no_fastboot no_fpool no_htp no_rescan no_sendfile no_snap no_voldump re_dhash plain_ip"
126124
ka.update(**{k: True for k in ex.split()})
127125

128126
ex = "ah_cli ah_gen css_browser hist js_browser js_other mime mimes no_forget no_hash no_idx nonsus_urls og_tpl og_ua"
@@ -137,9 +135,12 @@ def __init__(self, a=None, v=None, c=None, **ka0):
137135
ex = "db_act k304 loris re_maxage rproxy rsp_jtr rsp_slp s_wr_slp snap_wri theme themes turbo"
138136
ka.update(**{k: 0 for k in ex.split()})
139137

140-
ex = "ah_alg bname chpw_db doctitle df exit favico idp_h_usr html_head lg_sbf log_fk md_sbf name og_desc og_site og_th og_title og_title_a og_title_v og_title_i shr tcolor textfiles unlist vname R RS SR"
138+
ex = "ah_alg bname chpw_db doctitle df exit favico idp_h_usr ipa html_head lg_sbf log_fk md_sbf name og_desc og_site og_th og_title og_title_a og_title_v og_title_i shr tcolor textfiles unlist vname xff_src R RS SR"
141139
ka.update(**{k: "" for k in ex.split()})
142140

141+
ex = "ban_403 ban_404 ban_422 ban_pw ban_url"
142+
ka.update(**{k: "no" for k in ex.split()})
143+
143144
ex = "grp on403 on404 xad xar xau xban xbd xbr xbu xiu xm"
144145
ka.update(**{k: [] for k in ex.split()})
145146

@@ -221,11 +222,29 @@ def settimeout(self, a):
221222
pass
222223

223224

225+
class VHub(object):
    """Bare-bones hub object for unit tests which hosts a real Up2k instance."""

    def __init__(self, args, asrv, log):
        # Up2k.deferred_init returns early (before starting its daemons)
        # when the hub has this flag set
        self.is_dut = True
        self.args, self.asrv, self.log = args, asrv, log

        # created last; Up2k receives this hub and may read the attributes above
        self.up2k = Up2k(self)
232+
233+
234+
class VBrokerThr(BrokerThr):
    """BrokerThr for unit tests, attached to an already-constructed hub.

    NOTE(review): the parent constructor is not called; presumably this is
    to skip whatever BrokerThr.__init__ sets up -- confirm against BrokerThr.
    """

    def __init__(self, hub):
        self.hub = hub
        # mirror the hub's logger, config and auth-server onto the broker
        for attr in ("log", "args", "asrv"):
            setattr(self, attr, getattr(hub, attr))
240+
241+
224242
class VHttpSrv(object):
225243
def __init__(self, args, asrv, log):
226244
self.args = args
227245
self.asrv = asrv
228246
self.log = log
247+
self.hub = None
229248

230249
self.broker = NullBroker(args, asrv)
231250
self.prism = None
@@ -252,18 +271,25 @@ def get_u2idx(self):
252271
return self.u2idx
253272

254273

274+
class VHttpSrvUp2k(VHttpSrv):
    """VHttpSrv with a VHub (and thereby Up2k) behind it, for upload tests."""

    def __init__(self, args, asrv, log):
        VHttpSrv.__init__(self, args, asrv, log)

        # replace the hub/broker placeholders assigned by VHttpSrv
        hub = VHub(args, asrv, log)
        self.hub = hub
        self.broker = VBrokerThr(hub)
279+
280+
255281
class VHttpConn(object):
256-
def __init__(self, args, asrv, log, buf):
282+
def __init__(self, args, asrv, log, buf, use_up2k=False):
257283
self.t0 = time.time()
258-
self.s = VSock(buf)
259-
self.sr = Unrecv(self.s, None) # type: ignore
260284
self.aclose = {}
261285
self.addr = ("127.0.0.1", "42069")
262286
self.args = args
263287
self.asrv = asrv
264288
self.bans = {}
265289
self.freshen_pwd = 0.0
266-
self.hsrv = VHttpSrv(args, asrv, log)
290+
291+
Ctor = VHttpSrvUp2k if use_up2k else VHttpSrv
292+
self.hsrv = Ctor(args, asrv, log)
267293
self.ico = Ico(args)
268294
self.ipa_nm = None
269295
self.lf_url = None
@@ -279,6 +305,12 @@ def __init__(self, args, asrv, log, buf):
279305
self.u2fh = FHC()
280306

281307
self.get_u2idx = self.hsrv.get_u2idx
308+
self.setbuf(buf)
309+
310+
def setbuf(self, buf):
311+
self.s = VSock(buf)
312+
self.sr = Unrecv(self.s, None) # type: ignore
313+
return self
282314

283315

284316
if WINDOWS:

0 commit comments

Comments
 (0)