Skip to content

Commit

Permalink
Fix race condition with new blocks and filters
Browse files Browse the repository at this point in the history
The race happened under this circumstance:

- A node learns about a new block, updates its state, then crashes
- On restart, the block number is recovered, the filters are
  installed with the latest known block.
- The race: The node finishes the above before a new block is mined
- The bug: The filter is polled during start of the RaidenService, by
  calling the AlarmTask.first_run, which always executes the
  callbacks, eventually using the StateFilters to poll for new events
  from a block in the future.

The fix was to give the latest known block number to the alarm task as
an argument for first_run, and only execute the callbacks if there is a
new block.

fixes raiden-network#2838
  • Loading branch information
hackaugusto committed Jan 25, 2019
1 parent 3bf51cc commit a24a41b
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 39 deletions.
2 changes: 1 addition & 1 deletion raiden/raiden_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def start(self):
# - The alarm must complete its first run before the transport is started,
# to reject messages for closed/settled channels.
self.alarm.register_callback(self._callback_new_block)
self.alarm.first_run()
self.alarm.first_run(last_log_block_number)

# The transport must not ever be started before the alarm task's first
# run, because it's this method which synchronizes the node with the
Expand Down
95 changes: 57 additions & 38 deletions raiden/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def __init__(self, chain):
self.callbacks = list()
self.chain = chain
self.chain_id = None
self.last_block_number = None
self.known_block_number = None
self._stop_event = AsyncResult()

# TODO: Start with a larger sleep_time and decrease it as the
Expand Down Expand Up @@ -128,68 +128,87 @@ def loop_until_stop(self):
# This is required because the first run will synchronize the node with
# the blockchain since the last run.
assert self.chain_id, 'chain_id not set'
assert self.last_block_number, 'last_block_number not set'
assert self.known_block_number is not None, 'known_block_number not set'

chain_id = self.chain_id

sleep_time = self.sleep_time
while self._stop_event.wait(sleep_time) is not True:
last_block_number = self.last_block_number
latest_block = self.chain.get_block(block_identifier='latest')
latest_block_number = latest_block['number']
self._maybe_run_callbacks(latest_block)

if chain_id != self.chain.network_id:
raise RuntimeError(
'Changing the underlying blockchain while the Raiden node is running '
'is not supported.',
)

if latest_block_number != last_block_number:
log_details = dict(
block_number=latest_block_number,
block_hash=to_hex(latest_block['hash']),
block_gas_limit=latest_block['gasLimit'],
)

if latest_block_number > last_block_number + 1:
missed_blocks = latest_block_number - last_block_number - 1
log_details['num_missed_blocks'] = missed_blocks

log.debug(
'Received new block',
**log_details,
)

self._run_callbacks(latest_block)

def first_run(self):
# callbacks must be executed during the first run to update the node state
def first_run(self, known_block_number):
""" Blocking call to update the local state, if necessary. """
assert self.callbacks, 'callbacks not set'

chain_id = self.chain.network_id
latest_block = self.chain.get_block(block_identifier='latest')

log.debug(
'Starting at block number',
block_number=latest_block['number'],
gas_limit=latest_block['gasLimit'],
block_hash=to_hex(latest_block['hash']),
'Alarm task first run',
known_block_number=known_block_number,
latest_block_number=latest_block['number'],
latest_gas_limit=latest_block['gasLimit'],
latest_block_hash=to_hex(latest_block['hash']),
)

self._run_callbacks(latest_block)
self.known_block_number = known_block_number
self.chain_id = chain_id
self._maybe_run_callbacks(latest_block)

def _run_callbacks(self, latest_block):
remove = list()
for callback in self.callbacks:
result = callback(latest_block)
if result is REMOVE_CALLBACK:
remove.append(callback)
def _maybe_run_callbacks(self, latest_block):
""" Run the callbacks if there is at least one new block.
for callback in remove:
self.callbacks.remove(callback)
The callbacks are executed only if there is a new block, otherwise the
filters may try to poll for an inexisting block number and the Ethereum
client can return an JSON-RPC error.
"""
assert self.known_block_number is not None, 'known_block_number not set'

latest_block_number = latest_block['number']
missed_blocks = latest_block_number - self.known_block_number

if missed_blocks < 0:
log.critical(
'Block number decreased',
chain_id=self.chain_id,
known_block_number=self.known_block_number,
old_block_number=latest_block['number'],
old_gas_limit=latest_block['gasLimit'],
old_block_hash=to_hex(latest_block['hash']),
)
elif missed_blocks > 1:
log.debug(
'Received new block',
known_block_number=self.known_block_number,
latest_block_number=latest_block['number'],
latest_gas_limit=latest_block['gasLimit'],
latest_block_hash=to_hex(latest_block['hash']),
)

if missed_blocks > 2:
log.info(
'Missed block(s)',
missed_blocks=missed_blocks,
latest_block=latest_block,
)

remove = list()
for callback in self.callbacks:
result = callback(latest_block)
if result is REMOVE_CALLBACK:
remove.append(callback)

for callback in remove:
self.callbacks.remove(callback)

self.last_block_number = latest_block['number']
self.known_block_number = latest_block['number']

def stop(self):
self._stop_event.set(True)
Expand Down

0 comments on commit a24a41b

Please sign in to comment.