-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathsleepc
executable file
·148 lines (136 loc) · 6.03 KB
/
sleepc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python
import os, sys, re, math, time, datetime as dt
# Unit suffixes accepted in short relative offsets such as "3d 5h" or "1h 20m".
# Calendar-like units are valued in days, clock units in seconds, which lines up
# with the first two positional arguments of datetime.timedelta(days, seconds).
_td_days = dict(
    y=365.2422, yr=365.2422, year=365.2422,
    mo=30.5, month=30.5, w=7, week=7, d=1, day=1 )
_td_s = dict( h=3600, hr=3600, hour=3600,
    m=60, min=60, minute=60, s=1, sec=1, second=1 )

# Orders (name, value) pairs from the largest unit to the smallest one,
# with longer aliases listed before shorter ones of the same size.
_td_usort = lambda d: sorted(
    d.items(), key=lambda kv: (kv[1], len(kv[0])), reverse=True )

# Optional sign, then optional "<digits><unit>" chunks (one named group per
# unit alias) in the fixed order produced by _td_usort, days-based units first.
_td_re = re.compile('(?i)^[-+]?' + ''.join( fr'(?P<{k}>\d+{k}\s*)?'
    for k, v in [*_td_usort(_td_days), *_td_usort(_td_s)] ) + '$')


def _td_sum(m, units):
    """Return the total of all `units` aliases present in _td_re match `m`, in units' own scale."""
    return sum(
        v * int(''.join(filter(str.isdigit, m[k])))
        for k, v in units.items() if m[k] )


def ts_parse(ts_str, ts_now=None):
    """Parse a relative or absolute time specification into a naive datetime.

    Parsers are tried in this order, and the first one that succeeds wins:
      1. Plain number - offset in seconds from ts_now (can be negative/fractional).
      2. Short offset made of unit suffixes, e.g. "3d 5h", "1h 20m" or "-30s".
         A leading "-" puts the result before ts_now, same as with plain numbers.
      3. "YYYY-MM-DD" or "YY-MM-DD" date, optionally followed by
         a space or "T" and "HH", "HH:MM" or "HH:MM:SS".
      4. 24h time of day like "7:30" or "07:30:15.25" - taken on the date of ts_now,
         or on the following day if that would be earlier than ts_now.
      5. Anything else is handed to the external "date -d" command (retried with
         commas replaced by spaces on failure). If its result is up to 24h in
         the past and the string looks like a bare time of day, one day is added.

    Args:
        ts_str: time specification string.
        ts_now: datetime to use as "now" for relative values (default: datetime.now()).

    Returns:
        datetime.datetime for the parsed time.

    Raises:
        ValueError: if none of the parsers above can handle ts_str.
    """
    if ts_now is None: ts_now = dt.datetime.now()
    ts, ts_str_orig = None, ts_str # original input is kept for the error message

    # Only parsing/range errors are expected here - nan raises ValueError, inf OverflowError
    try: ts = ts_now + dt.timedelta(seconds=float(ts_str))
    except (ValueError, OverflowError): pass

    if ts is None and ( # short time offset like "3d 5h"
            (m := _td_re.search(ts_str)) and any(m.groups()) ):
        try:
            delta = dt.timedelta(_td_sum(m, _td_days), _td_sum(m, _td_s))
            # _td_re is anchored at the start, so the sign can only be the first char
            ts = ts_now + (-delta if ts_str.startswith('-') else delta)
        except OverflowError: ts = None # too large for datetime - let other parsers reject it

    if ts is None and (m := re.search( # common BE format
            r'^(?P<date>(?:\d{2}|(?P<Y>\d{4}))-\d{2}-\d{2})'
            r'(?:[ T](?P<time>\d{2}(?::\d{2}(?::\d{2})?)?)?)?$', ts_str )):
        tpl, tss = '%Y-%m-%d' if m['Y'] else '%y-%m-%d', m['date']
        if m['time']:
            tss_time = m['time'].split(':')
            tss += ' ' + ':'.join(tss_time)
            tpl += ' ' + ':'.join(['%H', '%M', '%S'][:len(tss_time)])
        try: ts = dt.datetime.strptime(tss, tpl)
        except ValueError: pass # right shape but not a valid date - try parsers below

    if ts is None and (m := re.search( # just time without AM/PM - treat as 24h format
            r'^\d{1,2}:\d{2}(?::\d{2}(?P<us>\.\d+)?)?$', ts_str )):
        us, tpl = 0, ':'.join(['%H', '%M', '%S'][:len(ts_str.split(':'))])
        if m['us']:
            # strptime template has no fraction, so split it off and pad/cut to microseconds
            ts_str, us = ts_str.rsplit('.', 1)
            us = us[:6].ljust(6, '0')
        try: tod = dt.datetime.strptime(ts_str, tpl)
        except ValueError: pass
        else:
            ts = ts_now.replace( hour=tod.hour,
                minute=tod.minute, second=tod.second, microsecond=int(us) )
            if ts < ts_now: ts += dt.timedelta(1) # already passed today - use tomorrow

    if ts is None: # coreutils' "date" parses everything, but is more expensive to use
        import subprocess as sp
        while True:
            res = sp.run( ['date', '+%s', '-d', ts_str],
                stdout=sp.PIPE, stderr=sp.DEVNULL )
            if res.returncode:
                if ',' in ts_str:
                    ts_str = ts_str.replace(',', ' ')
                    continue
            else:
                ts = dt.datetime.fromtimestamp(int(res.stdout.strip()))
                if 0 < (ts_now - ts).total_seconds() <= 24*3600 and re.search(
                        r'(?i)^[\d:]+\s*(am|pm)?\s*([-+][\d:]+|\w+|\w+[-+][\d:]+)?$', ts_str.strip() ):
                    ts += dt.timedelta(1)
            break

    if ts: return ts
    raise ValueError(f'Failed to parse date/time string: {ts_str_orig}')
def ts_repr( ts, ts0=None, units_max=2, units_res=None, printf=None,
        _units=dict( h=3600, m=60, s=1,
            y=365.2422*86400, mo=30.5*86400, w=7*86400, d=1*86400 ) ):
    """Format a time span as a short human-readable string like "3d 5h" or "1m 30s".

    Args:
        ts: datetime, timedelta or a number of seconds.
        ts0: value subtracted from ts to get the span.
            Defaults to datetime.now() when ts is a datetime, otherwise ts is the span itself.
        units_max: the unit at this position in the output is rounded and ends it.
        units_res: unit name (a key of _units) to stop at - nothing smaller is printed.
        printf: optional %-template with a single "%s" to wrap the result into.
        _units: unit name to size-in-seconds mapping, not intended to be overridden.

    Returns:
        Space-separated "<count><unit>" parts for the absolute value of the span,
        or "now" (without printf applied) if no unit produced a non-zero count.
    """
    if isinstance(ts, dt.datetime) and ts0 is None:
        ts0 = dt.datetime.now()
    span = ts - ts0 if ts0 is not None else ts
    if isinstance(span, dt.timedelta):
        span = span.total_seconds()

    remaining, last_idx, parts = abs(span), units_max - 1, []
    for name, size in sorted(_units.items(), key=lambda kv: kv[1], reverse=True):
        ratio = remaining / size
        count = math.floor(ratio)
        if not count:
            # Unit is too large for what is left - stop if it is the resolution limit
            if name == units_res: break
            continue
        if ratio - count > 0.98:
            count += 1 # close enough to the next whole unit to show it as such
        is_final = len(parts) == last_idx or name == units_res
        if is_final:
            count = round(remaining / size) # last shown unit absorbs the remainder
        parts.append(f'{count:.0f}{name}')
        if is_final: break
        remaining -= count * size
        if remaining < 1: break

    if not parts: return 'now'
    text = ' '.join(parts)
    return printf % text if printf else text
def main(argv=None):
    """Command-line entry point: parse time-spec, then wait until it, printing a countdown.

    Args:
        argv: list of command-line arguments to parse,
            with None (default) meaning sys.argv[1:].

    Returns:
        None, after the target time has been reached.

    Raises:
        SystemExit: from argparse, on --help or on invalid options,
            unparseable time-spec or time-spec that is in the past.
    """
    import argparse, textwrap
    # Dedents a triple-quoted help text, trims newlines around it, adds a final one,
    # and replaces space + tabs runs and then any remaining tabs with spaces.
    dd = lambda text: re.sub( r' \t+', ' ',
        textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter,
        usage='%(prog)s [opts] time-spec',
        description=dd('''
            Verbose "sleep" tool. Waits for/until time-spec, printing countdown to stdout.
            Exits with code 0 upon reaching the time, non-0 if interrupted.
            Intended for running something with known delay in interactive consoles,
            to avoid needing to calculate time offset, and be able to check back on it later.'''))
    parser.add_argument('time_spec', nargs='+', help=dd('''
        Absolute or relative time until exiting.
        Parses timestamps as relative short times (e.g. "30s", "10min", "1h 20m", etc),
        iso8601-ish times/dates or falls back to just using "date" binary,
        which parses a lot of different other absolute/relative formats.'''))
    parser.add_argument('-q', '--quiet', action='store_true',
        help='Print parsed time, but suppress countdown, acting more like "sleep" binary.')
    parser.add_argument('-i', '--update-interval', type=float, metavar='sec', default=3,
        help='Countdown timer update interval, in seconds. Default: %(default)ss')
    opts = parser.parse_args(argv) # argv=None makes argparse use sys.argv[1:]

    # Zero would make the loop below spin, negative/nan make time.sleep() raise
    if not opts.update_interval > 0:
        parser.error('-i/--update-interval value must be a positive number')

    # Arguments are joined into one string, with underscores usable instead of spaces
    ts_str = ' '.join(' '.join(opts.time_spec).replace('_', ' ').split())
    ts_now = dt.datetime.now().replace(microsecond=0)
    try: ts = ts_parse(ts_str, ts_now)
    except ValueError as err: parser.error(str(err))
    if ts < ts_now:
        parser.error(
            'Specified time-spec argument(s) cannot be in the past:\n'
            f' {ts_str!r} - parsed as {ts} [{ts_repr(ts, printf="%s ago")}]' )
    print(f'Parsed time-spec {ts_str!r} as {ts} [{ts_repr(ts, printf="in %s")}]', flush=True)

    # line_wipe is a terminal control sequence written before each countdown redraw
    delay_min, line_wipe = 0.1, '\x1b[2K\x1b[\rb'
    tsx, tdx_str = ts.timestamp(), ''
    while (tsx_now := time.time()) < tsx:
        tdx, tdx_pad = tsx - tsx_now, len(tdx_str)
        delay = max(tdx, delay_min)
        if opts.quiet:
            time.sleep(delay)
            continue
        tdx_str = f'{dt.timedelta(seconds=int(tdx))} [{ts_repr(tdx, printf="%s left")}]'
        sys.stdout.write(f'{line_wipe}Countdown: {tdx_str}')
        # Pad with spaces up to the length of the previous, longer countdown string
        if (tdx_pad := tdx_pad - len(tdx_str)) > 0: sys.stdout.write(' '*tdx_pad)
        sys.stdout.flush()
        time.sleep(min(delay, opts.update_interval))

    if not opts.quiet:
        done_str = 'finished'
        if (tdx_pad := len(tdx_str) - len(done_str)) > 0: done_str += ' '*tdx_pad
        sys.stdout.write(f'{line_wipe}Countdown: {done_str}\n')
if __name__ == '__main__':
    # Exit code is 0 when main() returns after the wait is over,
    # and 1 whenever the wait is cut short, as the --help description promises.
    try: sys.exit(main())
    except KeyboardInterrupt:
        print() # countdown line is written without a newline - finish it
        sys.exit(1)
    except BrokenPipeError: # stdout pipe closed
        # Redirect stdout to devnull, so that flushing it at exit does not fail again
        os.dup2(os.open(os.devnull, os.O_WRONLY), sys.stdout.fileno())
        sys.exit(1)