diff --git a/raiden/raiden_service.py b/raiden/raiden_service.py index fe695e298b2..9d6dfcc978e 100644 --- a/raiden/raiden_service.py +++ b/raiden/raiden_service.py @@ -358,7 +358,7 @@ def start(self): # - The alarm must complete its first run before the transport is started, # to reject messages for closed/settled channels. self.alarm.register_callback(self._callback_new_block) - self.alarm.first_run() + self.alarm.first_run(last_log_block_number) # The transport must not ever be started before the alarm task's first # run, because it's this method which synchronizes the node with the diff --git a/raiden/tasks.py b/raiden/tasks.py index 1af03eee82b..0ff253a208d 100644 --- a/raiden/tasks.py +++ b/raiden/tasks.py @@ -86,7 +86,7 @@ def __init__(self, chain): self.callbacks = list() self.chain = chain self.chain_id = None - self.last_block_number = None + self.known_block_number = None self._stop_event = AsyncResult() # TODO: Start with a larger sleep_time and decrease it as the @@ -128,15 +128,14 @@ def loop_until_stop(self): # This is required because the first run will synchronize the node with # the blockchain since the last run. assert self.chain_id, 'chain_id not set' - assert self.last_block_number, 'last_block_number not set' + assert self.known_block_number is not None, 'known_block_number not set' chain_id = self.chain_id sleep_time = self.sleep_time while self._stop_event.wait(sleep_time) is not True: - last_block_number = self.last_block_number latest_block = self.chain.get_block(block_identifier='latest') - latest_block_number = latest_block['number'] + self._maybe_run_callbacks(latest_block) if chain_id != self.chain.network_id: raise RuntimeError( @@ -144,52 +143,72 @@ def loop_until_stop(self): 'is not supported.', ) - if latest_block_number != last_block_number: - log_details = dict( - block_number=latest_block_number, - block_hash=to_hex(latest_block['hash']), - block_gas_limit=latest_block['gasLimit'], - ) - - if latest_block_number > last_block_number + 1: - missed_blocks = latest_block_number - last_block_number - 1 - log_details['num_missed_blocks'] = missed_blocks - - log.debug( - 'Received new block', - **log_details, - ) - - self._run_callbacks(latest_block) - - def first_run(self): - # callbacks must be executed during the first run to update the node state + def first_run(self, known_block_number): + """ Blocking call to update the local state, if necessary. """ assert self.callbacks, 'callbacks not set' chain_id = self.chain.network_id latest_block = self.chain.get_block(block_identifier='latest') log.debug( - 'Starting at block number', - block_number=latest_block['number'], - gas_limit=latest_block['gasLimit'], - block_hash=to_hex(latest_block['hash']), + 'Alarm task first run', + known_block_number=known_block_number, + latest_block_number=latest_block['number'], + latest_gas_limit=latest_block['gasLimit'], + latest_block_hash=to_hex(latest_block['hash']), ) - self._run_callbacks(latest_block) + self.known_block_number = known_block_number self.chain_id = chain_id + self._maybe_run_callbacks(latest_block) - def _run_callbacks(self, latest_block): - remove = list() - for callback in self.callbacks: - result = callback(latest_block) - if result is REMOVE_CALLBACK: - remove.append(callback) + def _maybe_run_callbacks(self, latest_block): + """ Run the callbacks if there is at least one new block. - for callback in remove: - self.callbacks.remove(callback) + The callbacks are executed only if there is a new block, otherwise the + filters may try to poll for an inexisting block number and the Ethereum + client can return an JSON-RPC error. + """ + assert self.known_block_number is not None, 'known_block_number not set' + + latest_block_number = latest_block['number'] + missed_blocks = latest_block_number - self.known_block_number + + if missed_blocks < 0: + log.critical( + 'Block number decreased', + chain_id=self.chain_id, + known_block_number=self.known_block_number, + old_block_number=latest_block['number'], + old_gas_limit=latest_block['gasLimit'], + old_block_hash=to_hex(latest_block['hash']), + ) + elif missed_blocks > 1: + log.debug( + 'Received new block', + known_block_number=self.known_block_number, + latest_block_number=latest_block['number'], + latest_gas_limit=latest_block['gasLimit'], + latest_block_hash=to_hex(latest_block['hash']), + ) + + if missed_blocks > 2: + log.info( + 'Missed block(s)', + missed_blocks=missed_blocks, + latest_block=latest_block, + ) + + remove = list() + for callback in self.callbacks: + result = callback(latest_block) + if result is REMOVE_CALLBACK: + remove.append(callback) + + for callback in remove: + self.callbacks.remove(callback) - self.last_block_number = latest_block['number'] + self.known_block_number = latest_block['number'] def stop(self): self._stop_event.set(True)