Skip to content

Commit

Permalink
Merge pull request #2595 from devos50/fix_resume_data
Browse files Browse the repository at this point in the history
Refactored checkpointing system
  • Loading branch information
devos50 authored Nov 19, 2016
2 parents 105892a + 6b35203 commit 71b5242
Show file tree
Hide file tree
Showing 18 changed files with 314 additions and 310 deletions.
81 changes: 28 additions & 53 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from traceback import print_exc

from twisted.internet import reactor
from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.internet.defer import Deferred, inlineCallbacks, DeferredList
from twisted.internet.task import deferLater, LoopingCall
from twisted.internet.threads import deferToThread
from twisted.python.threadable import isInIOThread
Expand Down Expand Up @@ -368,7 +368,7 @@ def add(self, tdef, dscfg, pstate=None, initialdlstatus=None, setupDelay=0, hidd
self.downloads[infohash] = d
setup_deferred = d.setup(dscfg, pstate, initialdlstatus, wrapperDelay=setupDelay,
share_mode=share_mode, checkpoint_disabled=checkpoint_disabled)
setup_deferred.addCallback(self.on_download_wrapper_created)
setup_deferred.addCallback(self.on_download_handle_created)

if d and not hidden and self.session.get_megacache():
@forceDBThread
Expand All @@ -389,15 +389,12 @@ def write_my_pref():

return d

def on_download_wrapper_created(self, (d, pstate)):
""" Called by network thread """
try:
if pstate is None and not d.get_checkpoint_disabled():
# Checkpoint at startup
(infohash, pstate) = d.network_checkpoint()
self.save_download_pstate(infohash, pstate)
except:
print_exc()
def on_download_handle_created(self, download):
"""
This method is called when the download handle has been created.
Immediately checkpoint the download and write the resume data.
"""
return download.checkpoint()

def remove(self, d, removecontent=False, removestate=True, hidden=False):
""" Called by any thread """
Expand Down Expand Up @@ -570,7 +567,7 @@ def schedule_download():

self.previous_active_downloads = new_active_downloads
if do_checkpoint:
self.session.checkpoint()
self.session.checkpoint_downloads()

if self.state_cb_count % 4 == 0 and self.tunnel_community:
self.tunnel_community.monitor_downloads(states_list)
Expand Down Expand Up @@ -668,49 +665,27 @@ def resume_download(self, filename, initialdlstatus=None, initialdlstatus_dict={
else:
self._logger.info("tlm: could not resume checkpoint %s %s %s", filename, tdef, dscfg)

def checkpoint(self, stop=False, checkpoint=True, gracetime=2.0):
""" Called by any thread, assume sesslock already held """
# Even if the list of Downloads changes in the mean time this is
# no problem. For removals, dllist will still hold a pointer to the
# Download, and additions are no problem (just won't be included
# in list of states returned via callback.
#
dllist = self.downloads.values()
self._logger.debug("tlm: checkpointing %s stopping %s", len(dllist), stop)

return deferLater(reactor, 0, self.network_checkpoint_callback, dllist, stop, checkpoint, gracetime)

def network_checkpoint_callback(self, dllist, stop, checkpoint, gracetime):
""" Called by network thread """
if checkpoint:
for d in dllist:
try:
# Tell all downloads to stop, and save their persistent state
# in a infohash -> pstate dict which is then passed to the user
# for storage.
#
if stop:
(infohash, pstate) = d.network_stop(False, False)
else:
(infohash, pstate) = d.network_checkpoint()

self._logger.debug("tlm: network checkpointing: %s %s", d.get_def().get_name(), pstate)
def checkpoint_downloads(self):
"""
Checkpoints all running downloads in Tribler.
Even if the list of Downloads changes in the mean time this is no problem.
For removals, dllist will still hold a pointer to the download, and additions are no problem
(just won't be included in list of states returned via callback).
"""
downloads = self.downloads.values()
deferred_list = []
self._logger.debug("tlm: checkpointing %s downloads", len(downloads))
for download in downloads:
deferred_list.append(download.checkpoint())

self.save_download_pstate(infohash, pstate)
return DeferredList(deferred_list)

except Exception as e:
self._logger.exception("Exception while checkpointing: %s", d.get_def().get_name())

if stop:
delay = 0
# Some grace time for early shutdown tasks
if self.shutdownstarttime is not None:
now = timemod.time()
diff = now - self.shutdownstarttime
if diff < gracetime:
self._logger.info("tlm: shutdown: delaying for early shutdown tasks %s", gracetime - diff)
delay = gracetime - diff
return deferLater(reactor, delay, self.network_shutdown)
def shutdown_downloads(self):
"""
Shutdown all downloads in Tribler.
"""
for download in self.downloads.values():
download.stop()

def remove_pstate(self, infohash):
def do_remove():
Expand Down
112 changes: 61 additions & 51 deletions Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@

import libtorrent as lt
from twisted.internet import defer, reactor
from twisted.internet.defer import Deferred, CancelledError
from twisted.internet.defer import Deferred, CancelledError, succeed
from twisted.internet.task import LoopingCall

from Tribler.Core import NoDispersyRLock
from Tribler.Core.DownloadConfig import DownloadStartupConfig, DownloadConfigInterface
from Tribler.Core.DownloadState import DownloadState
from Tribler.Core.Libtorrent import checkHandleAndSynchronize, waitForHandleAndSynchronize
from Tribler.Core.Libtorrent import checkHandleAndSynchronize
from Tribler.Core.TorrentDef import TorrentDefNoMetainfo, TorrentDef
from Tribler.Core.Utilities import maketorrent
from Tribler.Core.Utilities.torrent_utils import get_info_from_handle
from Tribler.Core.exceptions import SaveResumeDataError
from Tribler.Core.osutils import fix_filebasename
from Tribler.Core.simpledefs import (DLSTATUS_WAITING4HASHCHECK, DLSTATUS_HASHCHECKING, DLSTATUS_METADATA,
DLSTATUS_DOWNLOADING, DLSTATUS_SEEDING, DLSTATUS_ALLOCATING_DISKSPACE,
Expand Down Expand Up @@ -151,6 +153,9 @@ def __init__(self, session, tdef):
self._checkpoint_disabled = False

self.deferreds_resume = []
self.deferreds_handle = []

self.handle_check_lc = self.register_task("handle_check", LoopingCall(self.check_handle))

def __str__(self):
return "LibtorrentDownloadImpl <name: '%s' hops: %d checkpoint_disabled: %d>" % \
Expand All @@ -168,6 +173,27 @@ def set_checkpoint_disabled(self, disabled=True):
def get_checkpoint_disabled(self):
return self._checkpoint_disabled

def check_handle(self):
"""
Check whether the handle exists and is valid. If so, stop the looping call and fire the deferreds waiting
for the handle.
"""
if self.handle and self.handle.is_valid():
self.handle_check_lc.stop()
for deferred in self.deferreds_handle:
deferred.callback(self.handle)

def get_handle(self):
"""
Returns a deferred that fires with a valid libtorrent download handle.
"""
if self.handle and self.handle.is_valid():
return succeed(self.handle)

deferred = Deferred()
self.deferreds_handle.append(deferred)
return deferred

def setup(self, dcfg=None, pstate=None, initialdlstatus=None,
wrapperDelay=0, share_mode=False, checkpoint_disabled=False):
"""
Expand All @@ -179,6 +205,7 @@ def setup(self, dcfg=None, pstate=None, initialdlstatus=None,
network_create_engine_wrapper.
"""
# Called by any thread, assume sessionlock is held
self.handle_check_lc.start(1, now=False)
self.set_checkpoint_disabled(checkpoint_disabled)

try:
Expand Down Expand Up @@ -338,7 +365,7 @@ def network_create_engine_wrapper(self, pstate, initialdlstatus=None, checkpoint
self.cew_scheduled = False

# Return a deferred with the callback already being called
return defer.succeed((self, pstate))
return defer.succeed(self)

def get_anon_mode(self):
return self.get_hops() > 0
Expand Down Expand Up @@ -496,7 +523,7 @@ def process_alert(self, alert, alert_type):

alert_types = ('tracker_reply_alert', 'tracker_error_alert', 'tracker_warning_alert', 'metadata_received_alert',
'file_renamed_alert', 'performance_alert', 'torrent_checked_alert', 'torrent_finished_alert',
'save_resume_data_alert')
'save_resume_data_alert', 'save_resume_data_failed_alert')

if alert_type in alert_types:
getattr(self, 'on_' + alert_type)(alert)
Expand All @@ -505,15 +532,12 @@ def process_alert(self, alert, alert_type):

def on_save_resume_data_alert(self, alert):
"""
alert to handle save_resume_data_alert
it will assign stored resume data in an attribute,
and write it to a file
Callback for the alert that contains the resume data of a specific download.
This resume data will be written to a file on disk.
"""
resume_data = alert.resume_data

if not self.pstate_for_restart:
self.pstate_for_restart = self.network_get_persistent_state()

self.pstate_for_restart = self.get_persistent_download_config()
self.pstate_for_restart.set('state', 'engineresumedata', resume_data)
self._logger.debug("%s get resume data %s", hexlify(resume_data['info-hash']), resume_data)

Expand All @@ -532,6 +556,14 @@ def on_save_resume_data_alert(self, alert):
# empties the deferred list
self.deferreds_resume = []

def on_save_resume_data_failed_alert(self, alert):
# fire errback for all deferreds_resume
for deferred_r in self.deferreds_resume:
deferred_r.errback(SaveResumeDataError(alert.msg))

# empties the deferred list
self.deferreds_resume = []

def on_tracker_reply_alert(self, alert):
self.tracker_status[alert.url] = [alert.num_peers, 'Working']

Expand Down Expand Up @@ -796,14 +828,13 @@ def _on_resume_err(self, failure):
failure.trap(CancelledError)
self._logger.error("Resume data cancelled")

@waitForHandleAndSynchronize()
def save_resume_data(self):
"""
method to save resume data.
Save the resume data of a download. This method returns a deferred that fires when the resume data is available.
Note that this method only calls save_resume_data once on subsequent calls.
"""
# only call libtorrent save resume in the first call
if not self.deferreds_resume:
self.handle.save_resume_data()
self.get_handle().addCallback(lambda handle: handle.save_resume_data())

defer_resume = Deferred()
defer_resume.addErrback(self._on_resume_err)
Expand Down Expand Up @@ -1018,7 +1049,7 @@ def network_stop(self, removestate, removecontent):
self._logger.debug("LibtorrentDownloadImpl: network_stop %s", self.tdef.get_name())
self.cancel_all_pending_tasks()

pstate = self.network_get_persistent_state()
pstate = self.get_persistent_download_config()
if self.handle is not None:
self._logger.debug("LibtorrentDownloadImpl: network_stop: engineresumedata from torrent handle")
self.pstate_for_restart = pstate
Expand Down Expand Up @@ -1048,12 +1079,9 @@ def network_stop(self, removestate, removecontent):
self._logger.debug(
"LibtorrentDownloadImpl: network_stop: Could not reuse engineresumedata as pstart_for_restart is None")

# Offload the removal of the dlcheckpoint to another thread
if removestate:
self.session.lm.remove_pstate(self.tdef.get_infohash())

return (self.tdef.get_infohash(), pstate)

def get_content_dest(self):
""" Returns the file to which the downloaded content is saved. """
return os.path.join(self.get_dest_dir(), self.correctedinfoname)
Expand All @@ -1078,7 +1106,7 @@ def schedule_create_engine(_):
self.cew_scheduled = True
create_engine_wrapper_deferred = self.network_create_engine_wrapper(
self.pstate_for_restart, initialdlstatus, share_mode=self.get_share_mode())
create_engine_wrapper_deferred.addCallback(self.session.lm.on_download_wrapper_created)
create_engine_wrapper_deferred.addCallback(self.session.lm.on_download_handle_created)

can_create_engine_deferred = self.can_create_engine_wrapper()
can_create_engine_deferred.addCallback(schedule_create_engine)
Expand All @@ -1104,33 +1132,19 @@ def get_dest_files(self, exts=None):
return dest_files

def checkpoint(self):
""" Called by any thread """
if self._checkpoint_disabled:
self._logger.warning("Ignoring checkpoint() call as is checkpointing disabled for this download.")
else:
infohash, pstate = self.network_checkpoint()
reactor.callFromThread(self.session.lm.save_download_pstate, infohash, pstate)

def network_checkpoint(self):
""" Called by network thread """
with self.dllock:
pstate = self.network_get_persistent_state()
resdata = None
if self.handle is None:
if self.pstate_for_restart is not None:
resdata = self.pstate_for_restart.get('state', 'engineresumedata')
elif isinstance(self.tdef, TorrentDef):
self.save_resume_data()

pstate.set('state', 'engineresumedata', resdata)
return (self.tdef.get_infohash(), pstate)
"""
Checkpoint this download. Returns a deferred that fires when the checkpointing is completed.
"""
if self._checkpoint_disabled or not self.handle or not self.handle.is_valid():
self._logger.warning("Ignoring checkpoint() call as checkpointing is disabled for this download "
"or the handle is not ready.")
return succeed(None)

def network_get_persistent_state(self):
# Assume sessionlock is held
return self.save_resume_data()

def get_persistent_download_config(self):
pstate = self.dlconfig.copy()

# Reset unpicklable params
pstate.set('downloadconfig', 'mode', DLMODE_NORMAL)

# Add state stuff
Expand Down Expand Up @@ -1173,23 +1187,20 @@ def get_magnet_link(self):
#
# External addresses
#
@waitForHandleAndSynchronize()
def add_peer(self, addr):
""" Add a peer address from 3rd source (not tracker, not DHT) to this download.
@param (hostname_ip,port) tuple
"""
self.handle.connect_peer(addr, 0)
self.get_handle().addCallback(lambda handle: handle.connect_peer(addr, 0))

@waitForHandleAndSynchronize()
def set_priority(self, prio):
self.handle.set_priority(prio)
self.get_handle().addCallback(lambda handle: handle.set_priority(prio))

@waitForHandleAndSynchronize(True)
def dlconfig_changed_callback(self, section, name, new_value, old_value):
if section == 'downloadconfig' and name == 'max_upload_rate':
self.handle.set_upload_limit(int(new_value * 1024))
self.get_handle().addCallback(lambda handle: handle.set_upload_limit(int(new_value * 1024)))
elif section == 'downloadconfig' and name == 'max_download_rate':
self.handle.set_download_limit(int(new_value * 1024))
self.get_handle().addCallback(lambda handle: handle.set_download_limit(int(new_value * 1024)))
elif section == 'downloadconfig' and name in ['correctedfilename', 'super_seeder']:
return False
return True
Expand All @@ -1198,9 +1209,8 @@ def dlconfig_changed_callback(self, section, name, new_value, old_value):
def get_share_mode(self):
return self.handle.status().share_mode

@waitForHandleAndSynchronize(True)
def set_share_mode(self, share_mode):
self.handle.set_share_mode(share_mode)
self.get_handle().addCallback(lambda handle: handle.set_share_mode(share_mode))


class LibtorrentStatisticsResponse:
Expand Down
Loading

0 comments on commit 71b5242

Please sign in to comment.