-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfgbg
executable file
·880 lines (786 loc) · 35.9 KB
/
fgbg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
#!/usr/bin/env python
import itertools as it, functools as ft, ctypes as ct, pathlib as pl
import subprocess as sp, dataclasses as dcs, hashlib as hl
import os, sys, re, random, math, time, signal, base64, errno, warnings
# print() variant that writes to stderr and flushes after every call
p_err = ft.partial(print, file=sys.stderr, flush=True)


def gather_wrap(t):
    '''Decorator factory: wrapped callable returns t(result) instead of result.
    Used to e.g. collect all values from a generator function into a list.'''
    def _decorator(func):
        def _gathered(*args, **kws):
            return t(func(*args, **kws))
        return _gathered
    return _decorator
def hash_str(s, c=None, person=b'fgbg'):
    '''Return url-safe base64 text with all "-", "_" and "=" characters removed.

    s: bytes get encoded as-is, any other value (str) is encoded
        to bytes and hashed via blake2s with "person" personalization first.
    c: exact length for the returned text, None returns all of it.
        If text is shorter than that, it is hashed again to produce more
        characters, chaining as many digests as necessary to get to "c".
    person: blake2s personalization bytes.'''
    strip_chars = dict.fromkeys(b'-_=')  # char codes mapped to None are dropped by translate()

    def _b64_text(raw):
        return base64.urlsafe_b64encode(raw).decode().translate(strip_chars)

    if not isinstance(s, bytes):
        s = hl.blake2s(s.encode(), person=person).digest()
    s = _b64_text(s)
    if c is None:
        return s
    # Max text length that one digest can produce - its unpadded base64 length
    digest_chars = -(-hl.blake2s().digest_size * 4 // 3)
    res = s
    while len(res) < c:
        s = _b64_text(hl.blake2s(s.encode(), person=person).digest())
        # Short-enough "c" gets a fresh hash of the too-short text as a replacement,
        #  while "c" that no single digest can satisfy needs these to be joined together
        #  (simple re-hashing can never get there, and used to recurse until RecursionError)
        res = res + s if c > digest_chars else s
    return res[:c]
def hash_file(p, c=None, person=b'fgbg', chunk=1*2**20):
    '''Hash contents of a file at path "p" with blake2s, reading it in "chunk"-sized pieces.
    Returns hash_str() text for the resulting digest, passing "c" and "person" to it.'''
    digest = hl.blake2s(person=person)
    with p.open('rb') as src:
        while True:
            buff = src.read(chunk)
            if buff == b'':
                break
            digest.update(buff)
    return hash_str(digest.digest(), c, person)
def p_mtime(p):
    '''Return modification time of path "p", or 0 if it does not exist.
    Any OSError from stat() other than ENOENT is raised as-is.'''
    try:
        st = p.stat()
    except OSError as err:
        if err.errno == errno.ENOENT:
            return 0
        raise
    return st.st_mtime
def parse_time_delta(spec):
    'Convert "((hh:)mm:)ss" time interval string to a number of seconds.'
    parts = [float(v) for v in spec.split(':', 2)]
    # Multipliers go from seconds up, so parts are matched to them in reverse
    return sum(mul * v for mul, v in zip([1, 60, 3600], parts[::-1]))
@dcs.dataclass
class DisplayInfo:
    'Position, size and name of one display, as produced by dpy_info().'
    n: int  # sequence number among displays returned by dpy_info, starting from 0
    index: int  # index of the xrandr output
    x: int  # crtc position
    y: int
    w: int  # crtc size
    h: int
    w_mm: int  # size reported in xrandr output info
    h_mm: int
    name: str  # xrandr output name
@gather_wrap(list)
def dpy_info():
    '''Yield DisplayInfo for every connected xrandr output that has a crtc
        (all of them are collected into a list by the decorator).
    Uses libX11.so and libXrandr.so via ctypes.
    Raises RuntimeError if XOpenDisplay fails to open the display.'''
    # Simplified randr module from https://github.com/rr-/screeninfo/

    def make_struct(name, fields):
        '''Create ctypes.Structure subclass from a string of space-separated
            "name:type" fields, where type is a ctypes type name without "c_" prefix.
        Type names with "_p" suffix that ctypes does not have become pointers to base type.'''
        fields = fields.split()
        for n, fs in enumerate(fields):
            k, t = fs.split(':', 1)
            try: fields[n] = k, getattr(ct, t := f'c_{t}')
            except AttributeError:
                if not t.endswith('_p'): raise
                fields[n] = k, ct.POINTER(getattr(ct, t[:-2]))
        return type(name, (ct.Structure,), dict(_fields_=fields))

    XRRCrtcInfo = make_struct('XRRCrtcInfo', 'ts:ulong x:int y:int w:int h:int')
    XRRScreenResources = make_struct( 'XRRScreenResources',
        'ts:ulong ts_conf:ulong nctrc:int ctrcs:ulong_p nout:int out:ulong_p' )
    XRROutputInfo = make_struct( 'XRROutputInfo',
        'ts:ulong crtc:ulong name:char_p name_len:int w_mm:ulong h_mm:ulong conn:ushort' )

    xlib, xrandr = ct.CDLL('libX11.so'), ct.CDLL('libXrandr.so')
    xlib.XOpenDisplay.argtypes = [ct.c_char_p]
    xlib.XOpenDisplay.restype = ct.POINTER(ct.c_void_p)
    xrandr.XRRGetScreenResourcesCurrent.restype = ct.POINTER(XRRScreenResources)
    xrandr.XRRGetOutputInfo.restype = ct.POINTER(XRROutputInfo)
    xrandr.XRRGetCrtcInfo.restype = ct.POINTER(XRRCrtcInfo)

    dpy = xlib.XOpenDisplay(b'')
    if not dpy: raise RuntimeError('XOpenDisplay failed')
    # out_crtcs entries are [output-index, output-info, crtc-info],
    #  where crtc-info is None for outputs with non-zero "conn" value,
    #  and Ellipsis for ones that have conn=0 but crtc=0 (resolved below)
    out_crtcs, crtcs, root_win = list(), list(), xlib.XDefaultRootWindow(dpy)
    try:
        n, res = -1, xrandr.XRRGetScreenResourcesCurrent(dpy, root_win)
        for out_idx in range(res.contents.nout):
            out_raw = xrandr.XRRGetOutputInfo(dpy, res, res.contents.out[out_idx])
            out_crtcs.append([out_idx, out_raw, None])
            if (out := out_raw.contents).conn == 0:
                crtc = ... # sometimes output is connected with crtc=0, idk why
                if out.crtc: crtcs.append( crtc :=
                    xrandr.XRRGetCrtcInfo(dpy, ct.byref(out_raw), out.crtc) )
                out_crtcs[-1][-1] = crtc
        # Output index is stored with each entry above, because
        #  loop variable of the first loop only has index of the last output here
        for out_idx, out, crtc in out_crtcs:
            if crtc is ...: crtc = len(crtcs) == 1 and crtcs[0] # for crtc=0 cases
            if not crtc: continue
            out, crtc, n = out.contents, crtc.contents, n + 1
            yield DisplayInfo( n=n, index=out_idx,
                x=crtc.x, y=crtc.y, w=crtc.w, h=crtc.h,
                w_mm=out.w_mm, h_mm=out.h_mm,
                name=out.name.decode(sys.getfilesystemencoding()) )
    finally:
        for crtc in crtcs: xrandr.XRRFreeCrtcInfo(crtc)
        for out_idx, out, crtc in out_crtcs: xrandr.XRRFreeOutputInfo(out)
        xlib.XCloseDisplay(dpy)
def set_bg_image(images, dpy_list, dx=None, dy=None):
    '''Set backgrounds via Enlightenment DBus API, using sd-bus from libsystemd.so.

    images: object with get_iter(dpy_list, dx, dy) method,
        returning (image-path, display, x, y) tuples to set backgrounds for.
    dpy_list: displays to pass to images.get_iter(), with "n" attribute used in DBus call.
    dx, dy: int or list of virtual desktop offsets. If either one is empty,
        GetVirtualCount DBus call is made and all offsets up to returned count are used.
    Raises OSError for negative values returned from sd-bus calls.'''

    class sd_bus(ct.Structure): pass
    class sd_bus_msg(ct.Structure): pass
    class sd_bus_error(ct.Structure):
        _fields_ = [
            ('name', ct.c_char_p),
            ('message', ct.c_char_p),
            ('need_free', ct.c_int) ]

    lib = ct.CDLL('libsystemd.so')

    def run(call, *args, sig=None, check=True):
        'Call libsystemd function by name, with optional argtypes and result check.'
        func = getattr(lib, call)
        if sig: func.argtypes = sig
        res = func(*args)
        if check and res < 0: raise OSError(-res, os.strerror(-res))
        return res

    # Destination/path and leading argtypes are same for both sd_bus_call_method uses here
    e_service = b'org.enlightenment.wm.service'
    e_object = b'/org/enlightenment/wm/RemoteObject'
    call_sig = [
        ct.POINTER(sd_bus),
        ct.c_char_p, ct.c_char_p, ct.c_char_p, ct.c_char_p,
        ct.POINTER(sd_bus_error),
        ct.POINTER(ct.POINTER(sd_bus_msg)),
        ct.c_char_p ]

    bus = ct.POINTER(sd_bus)()
    run('sd_bus_open_user', ct.byref(bus), sig=[ct.POINTER(ct.POINTER(sd_bus))])
    error = sd_bus_error()
    reply = ct.POINTER(sd_bus_msg)()
    try:
        if isinstance(dx, int): dx = [dx]
        if isinstance(dy, int): dy = [dy]
        if not (dx and dy):
            run( 'sd_bus_call_method', bus, e_service, e_object,
                b'org.enlightenment.wm.Desktop', b'GetVirtualCount',
                ct.byref(error), ct.byref(reply), b'', sig=list(call_sig) )
            dxc, dyc = ct.c_int(), ct.c_int()
            run( 'sd_bus_message_read', reply, b'ii', ct.byref(dxc), ct.byref(dyc),
                sig=[ ct.POINTER(sd_bus_msg),
                    ct.c_char_p, ct.POINTER(ct.c_int), ct.POINTER(ct.c_int) ] )
            if not dx: dx = list(range(dxc.value))
            if not dy: dy = list(range(dyc.value))
        add_sig = call_sig + [ct.c_int, ct.c_int, ct.c_int, ct.c_char_p]
        for p, dpy, x, y in images.get_iter(dpy_list, dx, dy):
            run( 'sd_bus_call_method', bus, e_service, e_object,
                b'org.enlightenment.wm.Desktop.Background', b'Add',
                ct.byref(error), ct.byref(reply),
                b'iiis', dpy.n, x, y, str(p).encode(), sig=list(add_sig) )
    finally:
        run('sd_bus_flush_close_unref', bus, check=False)
class ImageMagickSkip(Exception):
    'Raised by image_magick() for images that should be skipped instead of processed.'
@dcs.dataclass
class ImageMagickTallScale:
    'Options for image_magick() scaling of images that are much taller than display.'
    # 0-1.0, <0 to disable
    gravity: float = 0.25
    # empty - disabled
    bg_color: str = '#000'
    # scaled/blended edge width in px
    edge_blend: int = 25
    # 0-1.0, 0 - disable
    edge_stretch_opacity: float = 0.7
    # 0 - disable
    edge_stretch_blur: float = 3.0
@dcs.dataclass
class ImageMagickOpts:
    'Image processing parameters for image_magick() function.'
    label_font_path: str
    label_font_size: int
    label_color_font: str
    label_color_outline: str
    label_offset_x: int
    label_offset_y: int
    max_aspect_diff: float = 0.9
    max_aspect_diff_scale: float = 0.3
    h_flip: bool = False
    # None - disabled
    tall_scale: any = dcs.field(default_factory=ImageMagickTallScale)

    def opts_hash(self):
        'Return string with all option values, for use in hashes of processing results.'
        return str(dcs.astuple(self))
def image_magick(p_src, p_dst, opts, dpy, desktop_label=None, dst_fmt='png24'):
    '''Process image from source path
        into a desktop background with specified parameters.
    Uses ImageMagick and requires wand-py module.
    Specifying "desktop_label" will just add that label in the corner.

    p_src: source image path object (its "name" is used for filename label).
    p_dst: path to save resulting image to, in "dst_fmt" format.
    opts: ImageMagickOpts-like object with processing parameters.
    dpy: display info object, with w/h attributes for target image size.
    desktop_label: text to put into top-right corner, disables all other processing.
    dst_fmt: format prefix for the destination, saved as "{dst_fmt}:{p_dst}".
    Raises ImageMagickSkip if image aspect differs from display aspect by more
        than opts.max_aspect_diff, and opts.tall_scale is disabled or does not apply.'''
    from wand.image import Image
    from wand.drawing import Drawing

    @dcs.dataclass
    class Box:
        # Box corner coordinates, all values get rounded on init
        x1:int; y1:int; x2:int; y2:int
        def __post_init__(self):
            for k, v in dcs.asdict(self).items(): setattr(self, k, round(v))

    ts_last = None
    def _print_ts_delta(label): # to benchmark various ops
        # Prints time since previous call of this function (0 for the first call)
        nonlocal ts_last
        ts = time.monotonic()
        if ts_last is None: ts_last = ts
        ts_last, delta = ts, ts - ts_last
        print(f'--- {label}: {delta:.3f}', flush=True)

    def _pset(ctx, *base, **props):
        'Set specified attributes on ctx, checking that they exist first'
        if base:
            # Merge order: first base dict, then other base dicts, then keywords on top
            props, props_update = base[0].copy(), props
            for update in base[1:]: props.update(update)
            props.update(props_update)
        for k,v in props.items():
            getattr(ctx, k) # make sure prop name is valid
            setattr(ctx, k, v)

    def _tall_scale(img, opts, w, h, img_flip):
        # Scales img to height h, then places it onto a w-by-h canvas at a position
        #  set by opts.gravity, with sides filled by bg color and/or stretched edges.
        # "opts" here is tall-scale options object, not the outer ImageMagickOpts.
        # Flip is done before resize if image is taller than h, otherwise after it
        if img.height > h: img_flip = img_flip() and None
        img.resize(round(img.width * (h / img.height)), h)
        if img_flip: img_flip()
        canvas = img
        with canvas.clone() as img: # scaled image
            edge_w = min(img.width, opts.edge_blend)
            # x0/x1 - left/right edge of the image on canvas, kept within it
            x0 = round(max(0, min(
                w - img.width, w * opts.gravity - (img.width / 2) )))
            x1 = x0 + img.width
            img.alpha_channel = 'opaque'

            canvas.extent(w, h, x0)
            canvas.alpha_channel = 'transparent' # basically clear it
            if opts.bg_color: canvas.colorize(opts.bg_color, '#fff')

            if opts.edge_stretch_opacity:
                def _edge_stretch(img, ew, w, blur):
                    # Crop edge stripe (left if ew > 0, right otherwise), stretch it and blur
                    img.crop(0 if ew > 0 else img.width + ew, width=abs(ew))
                    img.resize(w + abs(ew), img.height)
                    if blur: img.gaussian_blur(blur, blur)
                canvas_add = ft.partial( canvas.composite,
                    operator='blend', arguments=f'{round(opts.edge_stretch_opacity * 100)}%' )
                with img.clone() as pad:
                    _edge_stretch(pad, edge_w, x0, opts.edge_stretch_blur)
                    canvas_add(pad, gravity='west')
                with img.clone() as pad:
                    _edge_stretch(pad, -edge_w, w - x1, opts.edge_stretch_blur)
                    canvas_add(pad, gravity='east')

            if opts.edge_blend:
                # Grayscale mask with gradients on the sides is copied to image alpha
                with Image(width=img.width, height=img.height, colorspace='gray') as mask:
                    def _edge_blend(mask, ew):
                        with Image(colorspace='gray') as grad:
                            grad.options['gradient:vector'] = f'-1,0,{abs(ew)},0'
                            grad_dir, pos = ('black-white', 'west') if ew > 0 else ('white-black', 'east')
                            grad.pseudo(abs(ew), img.height, f'gradient:{grad_dir}')
                            mask.composite(grad, gravity=pos)
                    mask.alpha_channel = 'off'
                    mask.colorize('#fff', '#fff')
                    # Gradient is only added on sides where image does not touch canvas edge
                    if x0 > 0: _edge_blend(mask, edge_w)
                    if x1 < canvas.width: _edge_blend(mask, -edge_w)
                    img.composite(mask, operator='copy_alpha')

            canvas.composite(img, x0, 0)

    def _crop_resize(img, w, h):
        # Resizes img to exactly w-by-h, picking how to do it from aspect difference.
        # Uses "opts" of the image_magick() call for max_aspect_diff_scale value.
        if img.width == w and img.height == h: return
        asp_img, asp_dpy = (round(a, 2) for a in [img.width / img.height, w / h])
        if asp_img == asp_dpy: img.resize(w, h)
        elif abs(asp_img - asp_dpy) < opts.max_aspect_diff_scale:
            # Aspect is close-enough - do resize with aspect + lqr scaling
            w1, h1, w2, h2 = map(round, [h * asp_img, h, w, w / asp_img])
            if w1*h1 > w2*h2: img.resize(w1, h1)
            else: img.resize(w2, h2)
            img.liquid_rescale(w, h, delta_x=round(math.sqrt(w*h)/400))
        elif asp_img > asp_dpy:
            # Wider than display - cut vertical stripe from the middle
            crop_w = round(img.height * asp_dpy)
            img.crop(round((img.width - crop_w)/2), 0, width=crop_w, height=img.height)
            img.resize(w, h)
        else:
            # Taller than display - cut horizontal stripe from the middle
            crop_h = round(img.height * (asp_img / asp_dpy))
            img.crop(0, round((img.height - crop_h)/2), width=img.width, height=crop_h)
            img.resize(w, h)

    def _add_text_label( img, *, x=None, x_right=None, y, label,
            text_color, outline_color, outline_hard, outline_blur, font_size, font_path ):
        'Note: x/y are for top-left corner of the text box, x_right is for top-right label.'
        with Drawing() as ctx,\
                Image(width=img.width, height=img.height) as img_text:

            # Calculate text position within cropped img_text box
            # Cropping is done to avoid expensive
            #  operations like blur being performed on full-sized image
            text_font = dict(font_size=font_size, font=font_path)
            _pset(ctx, text_font, stroke_width=0)
            text_box = ctx.get_font_metrics(img_text, label)
            if x_right is not None: x = x_right - text_box.text_width
            text_box = Box( # offset of text box corners from its origin point
                0, -(text_box.ascender+1),
                text_box.text_width, -(text_box.descender-1) )
            # Margin leaves space around the text for both outlines
            text_margin = outline_hard + outline_blur
            text_args = (
                text_margin - text_box.x1,
                text_margin - text_box.y1, label )
            img_text_box = Box(
                max(0, x - text_margin), max(0, y - text_margin),
                min(img.width, x + (text_box.x2 - text_box.x1) + text_margin),
                min(img.height, y + (text_box.y2 - text_box.y1) + text_margin) )
            img_text.crop(*dcs.astuple(img_text_box))

            # Hard inner outline edge
            _pset( ctx, fill_color=outline_color,
                stroke_width=outline_hard, stroke_color=outline_color )
            ctx.text(*text_args)
            ctx(img_text)

            # Soft (blurred) outer outline edge, aka "glow"
            with img_text.clone() as img_text_glow:
                _pset(ctx, stroke_width=outline_hard)
                ctx.text(*text_args)
                ctx(img_text_glow)
                _pset(img_text_glow, background_color=outline_color)
                img_text_glow.gaussian_blur(outline_blur, outline_blur)
                img_text.composite(img_text_glow)

            # Actual text on top of outline
            ctx.clear()
            _pset(ctx, text_font, fill_color=text_color, stroke_width=0)
            ctx.text(*text_args)
            ctx(img_text)

            # Put the cropped label image back at its position on the original
            img.composite(img_text, img_text_box.x1, img_text_box.y1)

    with Image(filename=str(p_src)) as img:
        if desktop_label:
            # Label-only mode: source image is not scaled/cropped here
            x_right = dpy.w - (opts.label_offset_x or opts.label_font_size)
            _add_text_label( img,
                x_right=x_right, y=(opts.label_offset_y or opts.label_font_size),
                label=desktop_label, text_color=opts.label_color_font,
                outline_color=opts.label_color_outline, outline_hard=1, outline_blur=5,
                font_size=opts.label_font_size, font_path=opts.label_font_path )

        else:
            img_flip = img.flop if opts.h_flip else lambda: None
            asp_img, asp_dpy = img.width / img.height, dpy.w / dpy.h
            if abs(asp_img - asp_dpy) > opts.max_aspect_diff:
                # Special scaling to one screen side or skip
                if opts.tall_scale and asp_img < asp_dpy:
                    _tall_scale(img, opts.tall_scale, dpy.w, dpy.h, img_flip)
                else: raise ImageMagickSkip
            else:
                # Normal crop/scale, flipping smaller image, if enabled
                step1, step2 = img_flip, ft.partial(_crop_resize, img, dpy.w, dpy.h)
                if asp_img > asp_dpy: step1, step2 = step2, step1
                step1(); step2()

            if opts.label_font_path and opts.label_font_size:
                # Filename text label
                _add_text_label( img,
                    x=(opts.label_offset_x or opts.label_font_size),
                    y=(opts.label_offset_y or opts.label_font_size),
                    label=p_src.name, text_color=opts.label_color_font,
                    outline_color=opts.label_color_outline, outline_hard=1, outline_blur=5,
                    font_size=opts.label_font_size, font_path=opts.label_font_path )

        img.strip() # mostly to remove icc color profiles that cause libpng warnings in WMs
        img.save(filename=f'{dst_fmt}:{p_dst}')
class ImagePickerError(Exception):
    'Raised by ImagePicker when it fails to find or process any images to return.'
@dcs.dataclass
class ImageProcessingOpts:
    'Image processing options that ImagePicker reads from its "opts.proc" value.'
    cache: bool
    hash_person: str
    desktop_label: bool
    flip_chance: float = 0.5
    magick: any = None
@dcs.dataclass
class ImagePickerOpts:
    'Options for ImagePicker, with image processing ones under "proc".'
    desktop_same: bool
    proc: any
    dir_refresh_interval: float = 12 * 3600
    proc_retries: int = 8
    cleanup: bool = True
    cleanup_chance: float = 0.05
    cleanup_size: int = 1000 * 2**20
class ImagePath(pl.PosixPath):
    'PosixPath with an additional "img_src" attribute, to store source image path in.'
    __slots__ = ('img_src',)

    @classmethod
    def with_src_path(cls, p_src, p):
        'Return ImagePath for path "p", with its img_src attribute set to "p_src".'
        path = cls(p)
        path.img_src = p_src
        return path
class ImagePicker:
    '''Random image picker for a list of file/directory paths,
        with optional processing of picked images via image_magick().

    Images are picked without repeats from a shuffle list, which is refilled when it runs out.
    Should be used as a context manager, which sets up directories for processed images.
    Raises ImagePickerError when there are no images to pick or all processing attempts fail.'''

    image_ext_re = re.compile(r'(?i)\.(bmp|png|jpe?g|webp)$')

    def __init__(self, paths, opts):
        'paths: image file or directory paths. opts: ImagePickerOpts-like object.'
        self.p_skip, self.opts = set(), opts
        self.boot_tag = hash_str(pl.Path('/proc/sys/kernel/random/boot_id').read_text(), 6)
        self._p_list_init(paths)

    def __enter__(self):
        self.proc_dir = None
        if self.opts.proc and self.opts.proc.cache:
            self.proc_dir = pl.Path('~/.cache/fgbg').expanduser()
            self.proc_dir.mkdir(exist_ok=True)
        self.proc_dir_tmp = pl.Path(os.environ.get('XDG_RUNTIME_DIR') or '/tmp') / 'fgbg'
        if not self.proc_dir or self.opts.proc.desktop_label:
            self.proc_dir_tmp.mkdir(0o700, exist_ok=True)
        if not self.proc_dir: self.proc_dir = self.proc_dir_tmp
        self.proc_files = dict()
        return self

    def __exit__(self, *err):
        # Processed files are only removed here when they are not stored in cache dir
        if self.opts.proc and not self.opts.proc.cache and self.opts.cleanup:
            for p in self.proc_files.values(): p.unlink(missing_ok=True)

    def _p_list_init(self, paths):
        '''Initialize path containers from a list of paths:
            p_list - current shuffle, list of (dir, image-path) tuples.
            p_map - {dir: set-of-image-paths}, with None key for direct file paths.
            p_map_mtimes - {dir: mtime}, to detect changes in directories.'''
        self.p_list, self.p_map, self.p_map_mtimes = list(), dict(), dict()
        for p in map(pl.Path, paths): self._p_list_refresh(p)
        self.p_map_ts = time.monotonic()
        if not any(it.chain.from_iterable(self.p_map.values())):
            raise ImagePickerError(f'No images matched for path(s): {paths}')

    def _p_list_refresh(self, p, p_list_nx_cleanup=False):
        '''Update p_map/p_list for changes under path "p".
        p=None is for direct file paths (None key in p_map),
            and removes ones that do not exist anymore.
        p_list_nx_cleanup=True also removes paths that are gone from current shuffle.'''
        if p is None:
            # This is how picking a removed direct file path ends up calling this method
            files = self.p_map.get(None, set())
            nx_paths = set(f for f in files if not f.exists())
            files.difference_update(nx_paths)
            if p_list_nx_cleanup and nx_paths:
                self.p_list = list((root, f) for root, f in self.p_list if f not in nx_paths)
            return
        if not p.is_dir():
            # Add to the set, so that all direct file paths are stored, not just the first one
            self.p_map.setdefault(None, set()).add(p)
            return
        dir_updates = dict()
        for root, dirs, files in os.walk(p, followlinks=True):
            root = pl.Path(root).resolve()
            if (ts := root.stat().st_mtime) == self.p_map_mtimes.get(root): continue
            dir_updates[root] = self.p_map.get(root, set())
            self.p_map[root], self.p_map_mtimes[root] = set(), ts
            for name in files:
                if not self.image_ext_re.search(name): continue
                self.p_map[root].add(root / name)
        # Add/remove files to/from current p_list shuffle
        for root, p_set in dir_updates.items():
            self.p_list.extend((root, p) for p in (self.p_map[root] - p_set))
            if p_list_nx_cleanup: # only used when bumping into removed files
                nx_paths = p_set - self.p_map[root]
                self.p_list = list((root, p) for root, p in self.p_list if p not in nx_paths)

    def _p_list_get(self):
        '''Return random existing image path that is not in
            p_skip set, removing it (and all rejected ones) from the shuffle.
        Raises ImagePickerError if there is nothing suitable after refilling the shuffle.'''
        delta, p_list_updated = self.opts.dir_refresh_interval, False
        if delta and self.p_map_ts < time.monotonic() - delta:
            # Iterating over a copy here, as refresh adds keys for new subdirs to the dict
            for p, mtime in list(self.p_map_mtimes.items()):
                if p is None: continue # direct file paths
                try:
                    if mtime != p.stat().st_mtime: self._p_list_refresh(p)
                except OSError: continue # assuming temporarily unavailable
            self.p_map_ts = time.monotonic()
        while True:
            if not self.p_list and not p_list_updated:
                self.p_list = list(it.chain.from_iterable(
                    ((root, p) for p in path_set) for root, path_set in self.p_map.items() ))
                p_list_updated = True
            if not self.p_list and p_list_updated:
                raise ImagePickerError('No suitable images in the shuffle') from None
            # Random element is replaced by the last one, to avoid shifting the list
            n = random.randint(0, len(self.p_list) - 1)
            if n == len(self.p_list) - 1: p = self.p_list.pop()
            else: p, self.p_list[n] = self.p_list[n], self.p_list.pop()
            p_root, p = p
            if not p.exists():
                self._p_list_refresh(p_root, True)
                continue
            if p not in self.p_skip: break
        return p

    def _cache_dir_cleanup(self):
        '''Remove oldest files (by mtime) from proc_dir until their total size
            is within opts.cleanup_size, keeping all files listed in proc_files.'''
        files, files_keep = list(), set(self.proc_files.values())
        for p in self.proc_dir.iterdir():
            try: s = p.stat()
            except OSError: continue
            else: files.append((s.st_mtime, s.st_size, p))
        files.sort(reverse=True)
        files_size = sum(f[1] for f in files)
        while files and files_size > self.opts.cleanup_size:
            mtime, size, p = files.pop() # oldest one
            if p in files_keep: continue
            files_size -= size
            p.unlink(missing_ok=True)

    def get_random_image(self, dpy):
        'Return random image path and remove it from current shuffle.'
        for n in range(self.opts.proc_retries):
            p_src = self._p_list_get()
            p = self.proc_image_base(p_src, dpy)
            if p:
                # Result for a different image can be returned here, if p_src was
                #  skipped in processing, and it has correct img_src set on it already
                if isinstance(p, ImagePath): return p
                return ImagePath.with_src_path(p_src, p)
            p_err(f'----- [retry count: {n+1} / {self.opts.proc_retries}]')
        # Not using loop variable here, as it is not set with proc_retries=0
        raise ImagePickerError( 'Failed to process bg'
            f' image(s) after {self.opts.proc_retries} attempts' )

    def get_iter(self, dpy_list, dx, dy):
        '''Returns iterator of scaled image paths
            for specified displays and x/y count of virtual desktops.'''
        cache, label_fmt = dict(), '{x}x{y}' if len(dy) > 1 else '{x}'
        for dpy, x, y in it.product(dpy_list, dx, dy):
            if self.opts.desktop_same:
                p = cache.get(('ds', dpy.n))
                if not p: p = cache['ds', dpy.n] = self.get_random_image(dpy)
            else: p = self.get_random_image(dpy)
            if self.opts.proc.desktop_label:
                p = self.proc_desktop_label(p, label_fmt, dpy, x, y)
                if not p: raise ImagePickerError('Failed to add desktop-id label to pre-processed image')
            yield p, dpy, x, y

    def _proc_error_wrapper(func):
        '''Decorator for image-processing methods, to print
            errors and warnings from these to stderr instead of raising them.
        Wrapped method returns None on any exception from it.
        Same warning message is only printed once for one source image.'''
        warn_skip_img, warn_skip = None, set()
        @ft.wraps(func)
        def _wrapper(self, p, *args, **kws):
            nonlocal warn_skip_img, warn_skip
            p_src = getattr(p, 'img_src', p)
            if warn_skip and warn_skip_img != p_src: warn_skip.clear()
            try:
                with warnings.catch_warnings(record=True) as wrns:
                    # Change always -> error here to raise exceptions for full tracebacks
                    warnings.filterwarnings('always', message='.', module=r'^wand\.')
                    warn_skip_img, res = p_src, func(self, p, *args, **kws)
                for w in wrns: # can usually be fixed by running "mogrify" on source
                    if (wm := str(w.message)) in warn_skip or warn_skip.add(wm): continue
                    p_err(f'----- WARNING: Non-fatal issue in image processing [ {p_src} ] - {wm}')
                return res
            except Exception as err:
                p_err(f'----- ERROR: Failed to process image [ {p_src} ] - {err}')
                import traceback
                traceback.print_exc(file=sys.stderr)
                sys.stderr.flush()
        return _wrapper

    @_proc_error_wrapper
    def proc_image_base(self, p, dpy):
        '''Return path to use as a background for image "p" on display "dpy".
        With opts.proc enabled, that is a path to processed
            image in proc_dir, with a name based on hash of file and options.
        If image gets skipped in processing, different random image is returned instead.'''
        if self.opts.proc:
            if isinstance(magick := self.opts.proc.magick, dict):
                magick = magick.get(dpy.n, magick.get(None))
            magick.h_flip = random.random() < self.opts.proc.flip_chance
            img_hash = hash_file(p, person=self.opts.proc.hash_person)
            img_hash = '\0'.join([img_hash, magick.opts_hash(), *map(str, [dpy.w, dpy.h])])
            img_hash = hash_str(img_hash, 16, self.opts.proc.hash_person)
            # proc_files tracks current and previous processed file for each display
            k, k_prev, p_src = f'current.{dpy.n}', f'last.{dpy.n}', p
            if k in self.proc_files:
                if self.opts.proc.cache and k_prev in self.proc_files:
                    self.proc_files[k_prev].unlink(missing_ok=True)
                self.proc_files[k_prev] = self.proc_files[k]
            p = self.proc_files[k] = self.proc_dir / f'fgbg.{img_hash}.png'
            if not p.exists():
                # Result is written to a tmp file first and then renamed to destination
                p_tmp = self.proc_files['tmp'] = p.parent / (p.name + '.tmp')
                try: image_magick(p_src, p_tmp, magick, dpy)
                except ImageMagickSkip:
                    self.p_skip.add(p_src)
                    return self.get_random_image(dpy)
                p_tmp.rename(p)
            else: p.touch()
            if self.opts.cleanup and random.random() < self.opts.cleanup_chance:
                self._cache_dir_cleanup()
        return p

    @_proc_error_wrapper
    def proc_desktop_label(self, p, label_fmt, dpy, x, y):
        'Return path to a copy of image "p" with a virtual desktop label added to it.'
        # Keep two sets of paths and switch between them,
        #  as E detects same path and doesn't actually change background.
        p_dst = list(( self.proc_dir_tmp /
            f'fgbg.desktop.{self.boot_tag}.{v}.m{dpy.n}-x{x}-y{y}.png' ) for v in 'ab')
        p_dst = sorted(p_dst, key=p_mtime)[0]
        label = label_fmt.format(x=x+1, y=y+1) # to 1-indexed human value
        image_magick(p, p_dst, self.opts.proc.desktop_label, dpy, desktop_label=label)
        return p_dst
class SDDaemon:
    '''Small daemon framework to manage
        sleep/wakeup cycles and forking/systemd integration.'''

    def _time_diff_str( self, ts, ts0=None, now='now', ext=None,
            _units=dict( h=3600, m=60, s=1,
                y=365.2422*86400, mo=30.5*86400, w=7*86400, d=1*86400 ) ):
        '''Format time interval as a short string with at most two largest units, e.g. "1h 2m".
        ts: number of seconds or object with total_seconds() method,
            or a timestamp to get difference from "ts0" for, if that is specified.
        now: value to return for intervals that are <1s.
        ext: optional extra string to add at the end of a non-"now" result.'''
        res, s = list(), abs( (ts - ts0) if ts0 is not None
            and not getattr(ts, 'total_seconds', False) else ts )
        if not isinstance(s, (int, float)): s = s.total_seconds()
        if s <= 0: return now
        for unit, unit_s in sorted(_units.items(), key=lambda v: v[1], reverse=True):
            val = math.floor(s / float(unit_s))
            if not val: continue
            res.append('{:.0f}{}'.format(val, unit))
            if len(res) >= 2: break
            s -= val * unit_s
        if not res: return now
        if ext: res.append(ext)
        return ' '.join(res)

    def __init__(self, wakeup_interval=None, fork=False, sd_status_tpl='Wakeup in {delta_str}'):
        'fork=True would only do double-fork if not under systemd Type=notify control.'
        import threading
        self.sleep_event = threading.Event()
        self.fork, self.wu_interval = fork, wakeup_interval
        # sd_ready: None - systemd notifications are not used, False/True - ready was sent
        self.sd_ready, self.sd_status_tpl = None, sd_status_tpl
        self.wu_next = time.monotonic() + self.wu_interval
        if os.environ.get('NOTIFY_SOCKET'): self._init_systemd()

    def _init_systemd(self):
        'Enable systemd notifications, and watchdog pings if these are set for this pid in env.'
        import systemd.daemon
        self.daemon = systemd.daemon
        self.sd_ready = self.sd_ping_ts = self.sd_ping_interval = False
        sd_pid, sd_usec = (os.environ.get(k) for k in ['WATCHDOG_PID', 'WATCHDOG_USEC'])
        if sd_pid and sd_pid.isdigit() and int(sd_pid) == os.getpid():
            self.sd_ping_interval = float(sd_usec) * 0.45 / 1e6 # ~ half of interval in seconds
            if self.sd_ping_interval <= 0:
                raise RuntimeError('Passed WATCHDOG_USEC interval <= 0')
        if self.sd_ping_interval: self.sd_ping_ts = time.monotonic() + self.sd_ping_interval

    def ping(self, status=None, ts=None):
        '''Sends systemd ready/watchdog/status updates.
        Returns delay before next required call if watchdog is enabled.
        Can be called at any time, will only ping when necessary.'''
        if self.sd_ready is None: return
        if not self.sd_ready:
            self.daemon.notify('READY=1')
            if not status: status = 'Running...'
            self.daemon.notify(f'STATUS={status}')
            self.sd_ready = True
        elif status: self.daemon.notify(f'STATUS={status}')
        if self.sd_ping_ts:
            if not ts: ts = time.monotonic()
            delay = self.sd_ping_ts - ts
            if delay <= 0:
                self.daemon.notify('WATCHDOG=1')
                # Skip over any missed intervals to a next one in the future
                while self.sd_ping_ts <= ts: self.sd_ping_ts += self.sd_ping_interval
                delay = self.sd_ping_ts - ts
            return delay

    def wakeup_check(self, status=None):
        '''Returns whether wakeup should be triggered and next check timestamp.
        Also sets systemd status and pings sd watchdog if necessary.'''
        if self.fork and self.sd_ready is None:
            for n in range(2):
                pid = os.fork()
                if pid: os._exit(0)
            # Double-fork is only needed once, and was
            #  repeated on every check without resetting the flag
            self.fork = False
        ts = time.monotonic()
        wakeup_event, delay_wu = False, self.wu_next - ts
        while self.wu_next <= ts:
            wakeup_event = True
            self.wu_next += self.wu_interval
            delay_wu = self.wu_next - ts
        if not status:
            status = ( None if not self.sd_status_tpl else
                self.sd_status_tpl.format(delta_str=self._time_diff_str(delay_wu)) )
        ts_next = ts + min(self.ping(status, ts) or delay_wu, delay_wu)
        return wakeup_event, ts_next

    def wakeup_sleep(self, ts_next, delay_max=None):
        'sleep() until ts_next. Can be interrupted by wakeup_signal, returning True in that case.'
        if not ts_next: return
        delay = ts_next - time.monotonic()
        if delay_max is not None: delay = min(delay, delay_max)
        if delay > 0:
            self.sleep_event.clear()
            return self.sleep_event.wait(delay)

    def wakeup_signal(self, *sig_args):
        'Sets wakeup to trigger on next check and stops wakeup_sleep().'
        self.wu_next = time.monotonic()
        self.sleep_event.set()

    def wakeup_reset(self):
        'Reschedules wakeup after wakeup_interval from now.'
        self.wu_next = time.monotonic() + self.wu_interval
def main(args=None):
def_proc_label = 'Liberation Sans-16', '#baebf9', 'black', 0, 0
def_desktop_label = 'Permanent Marker-60', '#a8563a', 'black', 20, 1
import argparse, textwrap
dd = lambda text: (textwrap.dedent(text).strip('\n') + '\n').replace('\t', ' ')
fill = lambda s,w=90,ind='',ind_next=' ',**k: textwrap.fill(
s, w, initial_indent=ind, subsequent_indent=ind if ind_next is None else ind_next, **k )
parser = argparse.ArgumentParser(
usage='%(prog)s [options] path ...',
formatter_class=argparse.RawTextHelpFormatter,
description='Set E desktop background via DBus API.' )
parser.add_argument('path', nargs='*', help=dd('''
Background image or directory path(s).
If path is a directory, images get matches by
bmp/png/jpe?g/webp extension (case-insensitive)
and get picked at random (with no repeats if possible).'''))
group = parser.add_argument_group('Monitor and virtual desktop')
group.add_argument('-m', '--monitor',
action='append', type=int, metavar='n', help=dd('''
Physical monitor offset number (starts from 0)
to set bg on (default: all). Can be specified multiple times.'''))
group.add_argument('-x', '--desktop-x',
action='append', type=int, metavar='n', help=dd('''
Desktop X offset to set bg on. Can be specified multiple times.
If not specified, all X values will be used (default).'''))
group.add_argument('-y', '--desktop-y',
action='append', type=int, metavar='n', help=dd('''
Desktop Y offset to set bg on. Can be specified multiple times.
If not specified, all Y values will be used (default).'''))
group.add_argument('-s', '--desktop-same', action='store_true',
help='Pick and use same one image for all virtual desktops within monitor.')
group = parser.add_argument_group('Continuous operation mode')
group.add_argument('-d', '--daemon', action='store_true',
help=fill('Run in continous daemon mode,'
' setting new background on interval or when receiving HUP/QUIT signal.'))
group.add_argument('-i', '--interval', metavar='((hh:)mm:)ss', default='4:23:47',
help='Interval between switching to a new background image(s). Default: %(default)s')
group.add_argument('-n', '--next', action='store_true',
help='Find running daemon pid(s) in /proc by script name, send SIGHUP and exit.')
group.add_argument('--fork', action='store_true',
help='Fork or indicate systemd startup only after setting initial background.')
group.add_argument('--no-image-proc-fork', action='store_true', help=dd('''
Do not fork for image processing - run as a single process.
Normally subprocess allows to not allocate memory required by imagemagick
in the main process, which will otherwise never be freed to the OS, causing
long-running process to grow to the requirements of the largest handled image.'''))
group.add_argument('--initial-delay', action='store_true',
help='Only set initial background after first interval of time passes.')
group.add_argument('--xprintidle-interval',
metavar='((hh:)mm:)ss', default='30:00', help=dd('''
Run "xprintidle" tool to check for activity, if it is available,
and only change backgrounds if something happened within specified interval.
Specify 0 or "-" to explicitly disable these checks. Default: %(default)s'''))
group.add_argument('--dir-refresh-interval',
metavar='((hh:)mm:)ss', default='12:00:00', help=dd('''
Min interval between checking mtime values on directories under specified paths.
It's checked before bg changes to detect any new/removed images.
Can be set to zero to disable this check entirely. Default: %(default)s'''))
group = parser.add_argument_group('Image processing options',
description='Enabling this requires ImageMagick and wand-py bindings for it.')
group.add_argument('-p', '--process', action='store_true', help=dd('''
Enable processing for image to desktop background using ImageMagick/wand-py.
This includes smart scaling, adding text label(s) to the corner and such stuff.'''))
group.add_argument('--no-cache', action='store_true',
help='Disable using ~/.cache/fgbg for caching processed images.')
group.add_argument('--label-style',
metavar='font-spec(:color)(:outline)(:ox)(:oy)',
default=':'.join(map(str, def_proc_label)), help=dd('''
Font specification for fontconfig.
Use command like "fc-match -b \'sans-15\'" to check what it\'d resolve to.
Special value "none" disables it. Default: %(default)s'''))
group.add_argument('--random-name', action='store_true',
help='Generate randomized filenames'
' for resulting tmp-images, instead of predictable one(s).')
group.add_argument('--hflip-chance',
type=float, metavar='0-1.0', default=ImageProcessingOpts.flip_chance,
help='Chance of mirroring image horizontally, for extra variety. Default: %(default)s')
group.add_argument('--desktop-label-disable', action='store_true',
help=dd('''
Disable putting virtual desktop number overlay in the corner of the image.
It takes an extra image-processing pass to add these,
and resulting one-off images are stored in tmpfs instead of shared cache.'''))
group.add_argument('--desktop-label-style',
metavar='font-spec(:color)(:outline)(:ox)(:oy)',
default=':'.join(map(str, def_desktop_label)), help=dd('''
Font specification for per-virtual-desktop label for fontconfig.
Same format as --label-style. Default: %(default)s'''))
group.add_argument('-o', '--magick-opts', action='append',
metavar='[mN;]key1=value1;key2.key3=value2;...', help=dd('''
Any free-form processing options to assign as-is to ImageMagickOpts object.
Example: -o 'm0; tall_scale.gravity=0.7' -o 'm1; label_font_size=23; tall_scale=None'
Keys are strings, which can be dot-separated to assign value to a child object.
Values should be in a "python literal" format, with no extra type conversion.
Optional m0, m1, ... prefix can be added to set monitor to apply these to.
Can be used multiple times, to apply all specified parameters.'''))
opts = parser.parse_args(sys.argv[1:] if args is None else args)
if opts.next:
def _find_pids(name, self_pid):
    '''Yield pids of other processes with a command-line argument named like the script.

    Scans every /proc/<pid>/cmdline file, and yields int pid (once per process)
    if the last "/"-separated component of any of its NUL-separated arguments is
    equal to name. Process with self_pid and non-numeric /proc entries are skipped.'''
    self_pid = str(self_pid)
    for p in pl.Path('/proc').glob('*/cmdline'):
        pid = p.parent.name
        # Glob also matches non-pid dirs like /proc/self, and signalling own pid is not wanted
        if pid == self_pid or not pid.isdigit(): continue
        # Process can exit between glob and read - skip it instead of
        #  falling through with an unset or stale value from previous iteration
        try: cmdline = p.read_bytes()
        except OSError: continue
        # cmdline of unrelated processes can be arbitrary bytes, so decoding must not raise
        for arg in cmdline.decode(errors='replace').split('\0'):
            if arg.rsplit('/', 1)[-1] == name:
                yield int(pid)
                break # one match is enough, avoids yielding same pid repeatedly
name = pl.Path(sys.argv[0]).name
pid_set = set(_find_pids(name, os.getpid()))
if not pid_set: parser.error( 'Failed to find'
f' running daemon pid(s) by script name: {name!r}' )
for pid in pid_set: os.kill(pid, signal.SIGHUP)
return
if not opts.path: parser.error('At least one image file/dir path argument is required')
mon_indexes, dx, dy = opts.monitor, opts.desktop_x, opts.desktop_y
image_proc_opts = opts.process
if image_proc_opts:
def _parse_label_style(style, defaults):
    '''Return ImageMagickOpts built from a colon-separated label style string.

    Fields of the style string are matched by position against defaults,
    with any missing or empty field replaced by the corresponding default value.
    Unless font field is "none", it is resolved via "fc-match" tool,
    which is expected to print font size and font file path on two lines.'''
    fields = list()
    for val, val_default in it.zip_longest(style.split(':'), defaults):
        fields.append(val or val_default)
    font, color_font, color_label, ox, oy = fields
    if font == 'none': font_path, font_size = None, 0
    else:
        fc_match = sp.run(
            ['fc-match', '-f', '%{size}\n%{file}\n', font], stdout=sp.PIPE )
        fc_lines = fc_match.stdout.decode().splitlines()
        font_size, font_path = fc_lines
    return ImageMagickOpts(
        font_path, int(font_size), color_font, color_label, int(ox), int(oy) )
magick = _parse_label_style(opts.label_style, def_proc_label)
if opts.magick_opts:
import ast
updates = dict()
for opt_raw in map(str.strip, opts.magick_opts):
try:
dpy, opt = ( (int(m.group(1)), opt_raw[m.end():])
if (m := re.search(r'^[Mm](\d+)\s*;', opt_raw)) else (None, opt_raw) )
for opt in map(str.strip, opt.split(';')):
if not (opt := opt.strip()): continue
k, v = map(str.strip, opt.split('=', 1))
k, v = tuple(s.strip() for s in k.split('.', 1)), ast.literal_eval(v)
updates.setdefault(dpy, dict())[k] = v
except: parser.error(f'Failed to parse -o/--magick-opt string: {opt_raw}')
for dpy, upd in updates.items():
m = magick
for k, v in sorted(upd.items(), key=lambda v: (v is not None, v)):
if len(k) != 1: v = dcs.replace(getattr(m, k[0]), **{k[1]: v})
m = dcs.replace(m, **{k[0]: v})
updates[dpy] = m
if not set(updates).difference([None]): magick = updates[None]
else: magick, updates[None] = updates, magick
desktop_label_opts = None
if not opts.desktop_label_disable:
desktop_label_opts = _parse_label_style(opts.desktop_label_style, def_desktop_label)
proc_hash_person = 'fgbg.1'.encode() if not opts.random_name else os.urandom(8)
image_proc_opts = ImageProcessingOpts(
cache=not opts.no_cache, hash_person=proc_hash_person,
desktop_label=desktop_label_opts, flip_chance=opts.hflip_chance, magick=magick )
picker_opts = ImagePickerOpts(
proc=image_proc_opts, desktop_same=opts.desktop_same,
dir_refresh_interval=parse_time_delta(opts.dir_refresh_interval) )
try: images = ImagePicker(opts.path, picker_opts)
except ImagePickerError as err: parser.error(str(err))
with images:
daemon = False
if opts.daemon:
daemon = SDDaemon( parse_time_delta(opts.interval),
opts.fork, sd_status_tpl='Next background cycle in {delta_str}' )
for sig in 'int term'.upper().split():
signal.signal(getattr(signal, f'SIG{sig}'), lambda sig,frm: sys.exit(0))
for sig in 'hup quit'.upper().split():
signal.signal(getattr(signal, f'SIG{sig}'), daemon.wakeup_signal)
if not opts.fork: daemon.ping() # send "ready" immediately
idle_interval = None
if opts.xprintidle_interval and '-' not in opts.xprintidle_interval:
try: sp.run(['xprintidle'], stdout=sp.PIPE, check=True) # check if available
except (OSError, sp.CalledProcessError): pass
else: idle_interval = parse_time_delta(opts.xprintidle_interval)
bg_cycle = not daemon or not opts.initial_delay
ts_next = ts_idle = delay_max = None
while True:
if bg_cycle or ts_idle:
if daemon and idle_interval:
ts, ts_idle_old = time.monotonic(), True
if ts_idle: # in "idle delay" mode
if ts >= ts_idle: ts_idle = ts_idle_old = delay_max = None
else: delay_max = ts_idle - ts
if not ts_idle: # check and maybe set ts_idle for when to check next
try: xidle = int(sp.run(['xprintidle'], stdout=sp.PIPE).stdout) / 1000
except (OSError, ValueError): xidle = 0
if xidle > idle_interval: ts_idle = ts + idle_interval / 2
if not ts_idle and not ts_idle_old: daemon.wakeup_reset() # new interval from now
if not ts_idle:
dpy_list = dpy_info()
if mon_indexes: dpy_list = list(dpy_list[n] for n in mon_indexes)
if opts.no_image_proc_fork: set_bg_image(images, dpy_list, dx, dy)
elif pid := os.fork():
if os.waitpid(pid, 0)[1]: p_err('Failed to process/set bg image in a subprocess')
else:
set_bg_image(images, dpy_list, dx, dy)
exit(0)
if not daemon:
picker_opts.cleanup = False # keep processed files after exit, if any
break
if daemon.wakeup_sleep(ts_next, delay_max): ts_idle = None
status = None if not ts_idle else 'DE is idle, will update background after activity'
bg_cycle, ts_next = daemon.wakeup_check(status)
# Script entry point - value returned by main() is passed to sys.exit()
if __name__ == '__main__':
    sys.exit(main())