Skip to content

Commit

Permalink
re-use background worker for recalculate_delays (and remove one thread)
Browse files Browse the repository at this point in the history
git-svn-id: https://xpra.org/svn/Xpra/trunk@4727 3bb7dfac-3a0b-4e04-842a-767bc560f471
totaam committed Nov 10, 2013
1 parent 196a436 commit 40fe2f5
Showing 1 changed file with 50 additions and 58 deletions.
108 changes: 50 additions & 58 deletions src/xpra/server/source.py
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
from xpra.net.protocol import compressed_wrapper, Compressed
from xpra.daemon_thread import make_daemon_thread
from xpra.os_util import platform_name, StringIOClass, thread, Queue, get_machine_id, get_user_uuid
from xpra.server.background_worker import add_work_item
from xpra.util import std, typedict

NOYIELD = os.environ.get("XPRA_YIELD") is None
@@ -291,84 +292,75 @@ def __init__(self, protocol, disconnect_cb, idle_add, timeout_add, source_remove
protocol.set_packet_source(self.next_packet)
self.datapacket_thread = make_daemon_thread(self.data_to_packet, "encode")
self.datapacket_thread.start()
#for managing the recalculate_delays work:
self.calculate_window_ids = set()
self.calculate_event = Event()
self.calculate_thread = make_daemon_thread(self.calculate_delay_thread, "calculate_delay")
self.calculate_thread.start()
self.calculate_due = False
self.calculate_last_time = 0

def __str__(self):
return "ServerSource(%s)" % self.protocol

def is_closed(self):
return self.close_event.isSet()

def calculate_delay_thread(self):
try:
self.do_calculate_delay_thread()
except:
if not self.is_closed():
log.error("error in calculate thread!", exc_info=True)

def do_calculate_delay_thread(self):
def recalculate_delays(self):
""" calls update_averages() on ServerSource.statistics (GlobalStatistics)
and WindowSource.statistics (WindowPerformanceStatistics) for each window id in calculate_window_ids,
no more often than every RECALCULATE_DELAY
"""
RECALCULATE_DELAY = 0.250 #250ms
AFTER_EACH_WINDOW_WAIT = 0.010 #10ms
INITIAL_WAIT = 0.025 #25ms
while not self.is_closed():
self.calculate_event.wait()
if self.is_closed():
return
wait_time = RECALCULATE_DELAY-INITIAL_WAIT
self.close_event.wait(INITIAL_WAIT) #give time for the source/windows to disappear
debug("recalculate_delays()")
if self.is_closed():
return
self.statistics.update_averages()
wids = list(self.calculate_window_ids) #make a copy so we don't clobber new wids
for wid in wids:
self.calculate_window_ids.remove(wid)
ws = self.window_sources.get(wid)
if ws is None:
continue
try:
ws.statistics.update_averages()
ws.calculate_batch_delay()
ws.reconfigure()
except:
log.error("error on window %s", wid, exc_info=True)
if self.is_closed():
return
self.statistics.update_averages()
wids = list(self.calculate_window_ids) #make a copy so we don't clobber new wids
for wid in wids:
self.calculate_window_ids.remove(wid)
ws = self.window_sources.get(wid)
if ws is None:
continue
try:
ws.statistics.update_averages()
ws.calculate_batch_delay()
ws.reconfigure()
except:
log.error("error on window %s", wid, exc_info=True)
wait_time -= AFTER_EACH_WINDOW_WAIT
self.close_event.wait(AFTER_EACH_WINDOW_WAIT)
if self.is_closed():
return
#calculate weighted average as new global default delay:
now = time.time()
wdimsum, wdelay = 0, 0
for ws in list(self.window_sources.values()):
if ws.batch_config.last_updated<=0:
continue
w, h = ws.window_dimensions
time_w = 2.0+(now-ws.batch_config.last_updated) #add 2 seconds to even things out
weight = w*h*time_w
wdelay += ws.batch_config.delay*weight
wdimsum += weight
if wdimsum>0:
delay = wdelay / wdimsum
self.global_batch_config.last_delays.append((now, delay))
self.global_batch_config.delay = delay
self.calculate_event.clear()
if wait_time>0:
#wait before trying to run again:
self.close_event.wait(wait_time)
#calculate weighted average as new global default delay:
now = time.time()
wdimsum, wdelay = 0, 0
for ws in list(self.window_sources.values()):
if ws.batch_config.last_updated<=0:
continue
w, h = ws.window_dimensions
time_w = 2.0+(now-ws.batch_config.last_updated) #add 2 seconds to even things out
weight = w*h*time_w
wdelay += ws.batch_config.delay*weight
wdimsum += weight
if wdimsum>0:
delay = wdelay / wdimsum
self.global_batch_config.last_delays.append((now, delay))
self.global_batch_config.delay = delay

def may_recalculate(self, wid):
self.calculate_window_ids.add(wid)
self.calculate_event.set()
if self.calculate_due:
#already due
return
self.calculate_due = True
def recalculate_work():
self.calculate_due = False
self.calculate_last_time = time.time()
self.recalculate_delays()
delta = time.time() - self.calculate_last_time
RECALCULATE_DELAY = 1.0 #1s
if delta>RECALCULATE_DELAY:
add_work_item(recalculate_work)
else:
self.timeout_add(int(1000*(RECALCULATE_DELAY-delta)), recalculate_work)

def close(self):
self.close_event.set()
self.calculate_event.set()
self.damage_data_queue.put(None, block=False)
for window_source in self.window_sources.values():
window_source.cleanup()

0 comments on commit 40fe2f5

Please sign in to comment.