Skip to content

Commit

Permalink
u2c: add hashgen mode + fix shutdown lag
Browse files Browse the repository at this point in the history
  • Loading branch information
9001 committed Sep 6, 2024
1 parent b599fba commit 08848be
Showing 1 changed file with 109 additions and 42 deletions.
151 changes: 109 additions & 42 deletions bin/u2c.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/env python3
from __future__ import print_function, unicode_literals

S_VERSION = "1.23"
S_BUILD_DT = "2024-08-22"
S_VERSION = "1.24"
S_BUILD_DT = "2024-09-05"

"""
u2c.py: upload to copyparty
Expand Down Expand Up @@ -41,19 +41,25 @@

try:
import requests

req_ses = requests.Session()
except ImportError as ex:
if EXE:
if "-" in sys.argv or "-h" in sys.argv:
m = ""
elif EXE:
raise
elif sys.version_info > (2, 7):
m = "\nERROR: need 'requests'; please run this command:\n {0} -m pip install --user requests\n"
m = "\nERROR: need 'requests'{0}; please run this command:\n {1} -m pip install --user requests\n"
else:
m = "requests/2.18.4 urllib3/1.23 chardet/3.0.4 certifi/2020.4.5.1 idna/2.7"
m = [" https://pypi.org/project/" + x + "/#files" for x in m.split()]
m = "\n ERROR: need these:\n" + "\n".join(m) + "\n"
m = "\n ERROR: need these{0}:\n" + "\n".join(m) + "\n"
m += "\n for f in *.whl; do unzip $f; done; rm -r *.dist-info\n"

print(m.format(sys.executable), "\nspecifically,", ex)
sys.exit(1)
if m:
t = " when not running with '-h' or url '-'"
print(m.format(t, sys.executable), "\nspecifically,", ex)
sys.exit(1)


# from copyparty/__init__.py
Expand All @@ -76,7 +82,22 @@
VT100 = platform.system() != "Windows"


req_ses = requests.Session()
# timezone-aware UTC tzinfo; datetime.timezone is python3-only,
# so build an equivalent by hand when running on python2
try:
    UTC = datetime.timezone.utc
except:
    TD_ZERO = datetime.timedelta(0)

    class _UTC(datetime.tzinfo):
        """minimal py2 stand-in for datetime.timezone.utc"""

        def tzname(self, dt):
            return "UTC"

        def utcoffset(self, dt):
            return TD_ZERO

        def dst(self, dt):
            return TD_ZERO

    UTC = _UTC()


class Daemon(threading.Thread):
Expand Down Expand Up @@ -271,6 +292,12 @@ def hash_at(self, nch):
_print = print


def safe_print(*a, **ka):
    """
    print() replacement which appends the newline to the payload and
    hands the terminal one single string, so lines from concurrent
    threads cannot interleave with each other's line terminators
    """
    txt = " ".join(unicode(x) for x in a) + "\n"
    ka["end"] = ""
    _print(txt, **ka)


def eprint(*a, **ka):
ka["file"] = sys.stderr
ka["end"] = ""
Expand All @@ -284,18 +311,17 @@ def eprint(*a, **ka):

def flushing_print(*a, **ka):
    """
    safe_print wrapper for non-VT100 consoles: if the terminal rejects
    the text (typically a codepage that cannot encode it), retry with
    an ascii-only rendition, then flush stdout unless the caller has
    taken over flushing by passing flush= themselves
    """
    try:
        safe_print(*a, **ka)
    except:
        # console encoding failed; degrade the message to plain ascii
        fallback = " ".join(str(x) for x in a)
        safe_print(fallback.encode("ascii", "replace").decode("ascii"), **ka)

    if "flush" not in ka:
        sys.stdout.flush()


if not VT100:
print = flushing_print
print = safe_print if VT100 else flushing_print


def termsize():
Expand Down Expand Up @@ -770,8 +796,6 @@ def __init__(self, ar, stats=None):
self.up_c = 0
self.up_b = 0
self.up_br = 0
self.hasher_busy = 1
self.handshaker_busy = 0
self.uploader_busy = 0
self.serialized = False

Expand All @@ -781,6 +805,9 @@ def __init__(self, ar, stats=None):
self.eta = "99:99:99"

self.mutex = threading.Lock()
self.exit_cond = threading.Condition()
self.uploader_alive = ar.j
self.handshaker_alive = ar.j
self.q_handshake = Queue() # type: Queue[File]
self.q_upload = Queue() # type: Queue[FileSlice]

Expand Down Expand Up @@ -851,27 +878,21 @@ def _fancy(self):
Daemon(self.handshaker)
Daemon(self.uploader)

idles = 0
while idles < 3:
time.sleep(0.07)
while True:
with self.exit_cond:
self.exit_cond.wait(0.07)
with self.mutex:
if (
self.q_handshake.empty()
and self.q_upload.empty()
and not self.hasher_busy
and not self.handshaker_busy
and not self.uploader_busy
):
idles += 1
else:
idles = 0
if not self.handshaker_alive and not self.uploader_alive:
break
st_hash = self.st_hash[:]
st_up = self.st_up[:]

if VT100 and not self.ar.ns:
maxlen = ss.w - len(str(self.nfiles)) - 14
txt = "\033[s\033[{0}H".format(ss.g)
for y, k, st, f in [
[0, "hash", self.st_hash, self.hash_f],
[1, "send", self.st_up, self.up_f],
[0, "hash", st_hash, self.hash_f],
[1, "send", st_up, self.up_f],
]:
txt += "\033[{0}H{1}:".format(ss.g + y, k)
file, arg = st
Expand Down Expand Up @@ -1027,20 +1048,53 @@ def hasher(self):
self.hash_f += 1
self.hash_c += len(file.cids)
self.hash_b += file.size
if self.ar.wlist:
self.up_f = self.hash_f
self.up_c = self.hash_c
self.up_b = self.hash_b

if self.ar.wlist:
zsl = [self.ar.wsalt, str(file.size)] + [x[0] for x in file.kchunks]
zb = hashlib.sha512("\n".join(zsl).encode("utf-8")).digest()[:33]
wark = base64.urlsafe_b64encode(zb).decode("utf-8")
vp = file.rel.decode("utf-8")
if self.ar.jw:
print("%s %s" % (wark, vp))
else:
zd = datetime.datetime.fromtimestamp(file.lmod, UTC)
dt = "%04d-%02d-%02d %02d:%02d:%02d" % (
zd.year,
zd.month,
zd.day,
zd.hour,
zd.minute,
zd.second,
)
print("%s %12d %s %s" % (dt, file.size, wark, vp))
continue

self.q_handshake.put(file)

self.hasher_busy = 0
self.st_hash = [None, "(finished)"]
self._check_if_done()

def _check_if_done(self):
with self.mutex:
if self.nfiles - self.up_f:
return
for _ in range(self.ar.j):
self.q_handshake.put(None)

def handshaker(self):
search = self.ar.s
burl = self.ar.url[:8] + self.ar.url[8:].split("/")[0] + "/"
while True:
file = self.q_handshake.get()
if not file:
with self.mutex:
self.handshaker_alive -= 1
self.q_upload.put(None)
break
return

upath = file.abs.decode("utf-8", "replace")
if not VT100:
Expand All @@ -1052,27 +1106,24 @@ def handshaker(self):
self.errs += 1
continue

with self.mutex:
self.handshaker_busy += 1

while time.time() < file.cd:
time.sleep(0.1)

hs, sprs = handshake(self.ar, file, search)
if search:
if hs:
for hit in hs:
t = "found: {0}\n {1}{2}\n"
print(t.format(upath, burl, hit["rp"]), end="")
t = "found: {0}\n {1}{2}"
print(t.format(upath, burl, hit["rp"]))
else:
print("NOT found: {0}\n".format(upath), end="")
print("NOT found: {0}".format(upath))

with self.mutex:
self.up_f += 1
self.up_c += len(file.cids)
self.up_b += file.size
self.handshaker_busy -= 1

self._check_if_done()
continue

if file.recheck:
Expand Down Expand Up @@ -1104,7 +1155,6 @@ def handshaker(self):
file.up_b -= sz

file.ucids = hs
self.handshaker_busy -= 1

if not hs:
self.at_hash += file.t_hash
Expand All @@ -1130,6 +1180,9 @@ def handshaker(self):
kw = "uploaded" if file.up_b else " found"
print("{0} {1}".format(kw, upath))

self._check_if_done()
continue

chunksz = up2k_chunksize(file.size)
njoin = (self.ar.sz * 1024 * 1024) // chunksz
cs = hs[:]
Expand All @@ -1149,8 +1202,16 @@ def uploader(self):
while True:
fsl = self.q_upload.get()
if not fsl:
self.st_up = [None, "(finished)"]
break
done = False
with self.mutex:
self.uploader_alive -= 1
if not self.uploader_alive:
done = not self.handshaker_alive
self.st_up = [None, "(finished)"]
if done:
with self.exit_cond:
self.exit_cond.notify_all()
return

file = fsl.file
cids = fsl.cids
Expand Down Expand Up @@ -1252,6 +1313,10 @@ def main():
ap.add_argument("--dr", action="store_true", help="delete remote files which don't exist locally (implies --ow)")
ap.add_argument("--drd", action="store_true", help="delete remote files during upload instead of afterwards; reduces peak disk space usage, but will reupload instead of detecting renames")

ap = app.add_argument_group("file-ID calculator; enable with url '-' to list warks (file identifiers) instead of upload/search")
ap.add_argument("--wsalt", type=unicode, metavar="S", default="hunter2", help="salt to use when creating warks; must match server config")
ap.add_argument("--jw", action="store_true", help="just identifier+filepath, not mtime/size too")

ap = app.add_argument_group("performance tweaks")
ap.add_argument("-j", type=int, metavar="CONNS", default=2, help="parallel connections")
ap.add_argument("-J", type=int, metavar="CORES", default=hcores, help="num cpu-cores to use for hashing; set 0 or 1 for single-core hashing")
Expand Down Expand Up @@ -1285,7 +1350,9 @@ def main():

ar.x = "|".join(ar.x or [])

for k in "dl dr drd".split():
setattr(ar, "wlist", ar.url == "-")

for k in "dl dr drd wlist".split():
errs = []
if ar.safe and getattr(ar, k):
errs.append(k)
Expand Down

0 comments on commit 08848be

Please sign in to comment.