Skip to content

Commit

Permalink
Ms.mempool locking (#9050)
Browse files Browse the repository at this point in the history
* Prority locking to consensus

* Remove pstats

* Linting

* Do some stuff outside of lock

* Fix startup

* Add log timings

* Try some different locking

* Add limit

* catch excp

* CLVM inside lock

* Try using a semaphore instead

* use events for lock queue

* test

* Add logging for message types

* type

* remove seed

* check new peak waiters

* correct FullNodeAPI self.full_node.new_peak._waiters typo

* correct logging string typos

* only warn about new_peak Waiters if there is at least 1

* remove no-longer-accepted parameter to FullNode.peak_post_processing()

* only warn about respond_transaction Waiters if there is at least 1

* lint

* Change some constants

* Small fix and logging changes

* Put message types outside

* Change some log levels so we can test with info

* More logging

* Increase rate limits but decrease paralelism

* tweaks

* Log dropped tx

* Fix pool rpc test

* Test fixes

* Mempool optimization

* Remove from seen if fails

* Increase queue sizes

* Message types info

* More test and logging

* Small changes to networking just in case

* Decrease logging

* Decrease logging even further

* Decrease logging even further even further

* Decrease logging 3

* Transaction queue

* Don't cancel tasks or close connection

* Cancel tasks on disconnect (for shutdown purposed)

* Fix typo

* Catch cancelled

* Do multiple at a time

* More accurate farmer response time

* More efficiently create tasks

* Increase queue size and priority by fee

* Revert priority

* Don't re-request too many times for dropped TX

* Handle cancelled error so we don't go into a bad state

* Catch cancelled in syncing tasks

* Reduce new_peak_sem to improve performance

* Less bytes conversion

* Missing file, and 2 workers for CLVM

* Validate BLS in a new thread

* tests

* Change semaphore constants

* correct a cancellation triggered exception and assertion

* Fix send_transaction, dont use BaseException, fix tests

* Fix more tests

* only log transaction handler cancellation in debug

* typing in log

* move unfinished validation to diff proc

* it is asyncio.CancelledError

* Add a test for bad signature

* Fix more tests, reduce logging, lint

* One more lint

* blockchain tests, pass bytes directly, single call

* Try to fix rl_wallet failures

* Fix mempool test

* catch everything

* Don't test RL wallet

* Fix more tests and return error code

* Improve error handling in multiprocess

* Add pre-validation time

* Add pre-validation time in logs, and revert pytest.ini changes

* Add log correctly

* Ms.bls cache experiment (#9115)

* Logging for cache

* Less logging

* Return to original plan

* Clean up

* Remove coment

* Remove log

* formalize LockQueue shutdown

* Comments

* Fix blockchain test

* Improve cache

* Remove logs

* Fix sign_coin_spends

* Fix pool wallet

Co-authored-by: Kyle Altendorf <[email protected]>
Co-authored-by: Yostra <[email protected]>
  • Loading branch information
3 people authored Nov 4, 2021
1 parent c80208c commit 8a028c3
Show file tree
Hide file tree
Showing 34 changed files with 899 additions and 264 deletions.
14 changes: 9 additions & 5 deletions chia/clvm/spend_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from chia.types.blockchain_format.program import Program, SerializedProgram
from chia.util.ints import uint64, uint32
from chia.util.hash import std_hash
from chia.util.errors import Err
from chia.util.errors import Err, ValidationError
from chia.util.db_wrapper import DBWrapper
from chia.types.coin_record import CoinRecord
from chia.types.spend_bundle import SpendBundle
Expand Down Expand Up @@ -49,6 +49,7 @@ def __init__(self, rci: List[Coin], height: uint32, timestamp: uint64):
self.timestamp = timestamp
self.is_transaction_block = True
self.header_hash = std_hash(bytes(height))
self.prev_transaction_block_hash = std_hash(std_hash(height))


class SpendSim:
Expand Down Expand Up @@ -78,7 +79,7 @@ async def close(self):
await self.connection.close()

async def new_peak(self):
await self.mempool_manager.new_peak(self.block_records[-1])
await self.mempool_manager.new_peak(self.block_records[-1], [])

def new_coin_record(self, coin: Coin, coinbase=False) -> CoinRecord:
return CoinRecord(
Expand Down Expand Up @@ -203,9 +204,12 @@ def __init__(self, service):
self.service = service

async def push_tx(self, spend_bundle: SpendBundle) -> Tuple[MempoolInclusionStatus, Optional[Err]]:
cost_result: NPCResult = await self.service.mempool_manager.pre_validate_spendbundle(
spend_bundle, spend_bundle.name()
)
try:
cost_result: NPCResult = await self.service.mempool_manager.pre_validate_spendbundle(
spend_bundle, None, spend_bundle.name()
)
except ValidationError as e:
return MempoolInclusionStatus.FAILED, e.code
cost, status, error = await self.service.mempool_manager.add_spendbundle(
spend_bundle, cost_result, spend_bundle.name()
)
Expand Down
12 changes: 7 additions & 5 deletions chia/consensus/block_body_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async def validate_block_body(
npc_result: Optional[NPCResult],
fork_point_with_peak: Optional[uint32],
get_block_generator: Callable,
validate_signature=True,
) -> Tuple[Optional[Err], Optional[NPCResult]]:
"""
This assumes the header block has been completely validated.
Expand Down Expand Up @@ -457,10 +458,11 @@ async def validate_block_body(
# However, we force caching of pairings just for unfinished blocks
# as the cache is likely to be useful when validating the corresponding
# finished blocks later.
force_cache: bool = isinstance(block, UnfinishedBlock)
if not cached_bls.aggregate_verify(
pairs_pks, pairs_msgs, block.transactions_info.aggregated_signature, force_cache
):
return Err.BAD_AGGREGATE_SIGNATURE, None
if validate_signature:
force_cache: bool = isinstance(block, UnfinishedBlock)
if not cached_bls.aggregate_verify(
pairs_pks, pairs_msgs, block.transactions_info.aggregated_signature, force_cache
):
return Err.BAD_AGGREGATE_SIGNATURE, None

return None, npc_result
41 changes: 23 additions & 18 deletions chia/consensus/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty
from chia.consensus.find_fork_point import find_fork_point_in_chain
from chia.consensus.full_block_to_block_record import block_to_block_record
from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_blocks_multiprocessing
from chia.consensus.multiprocess_validation import (
PreValidationResult,
pre_validate_blocks_multiprocessing,
_run_generator,
)
from chia.full_node.block_store import BlockStore
from chia.full_node.coin_store import CoinStore
from chia.full_node.hint_store import HintStore
Expand All @@ -35,7 +39,7 @@
from chia.types.unfinished_block import UnfinishedBlock
from chia.types.unfinished_header_block import UnfinishedHeaderBlock
from chia.types.weight_proof import SubEpochChallengeSegment
from chia.util.errors import Err
from chia.util.errors import Err, ConsensusError
from chia.util.generator_tools import get_block_header, tx_removals_and_additions
from chia.util.ints import uint16, uint32, uint64, uint128
from chia.util.streamable import recurse_jsonify
Expand Down Expand Up @@ -561,7 +565,7 @@ def get_recent_reward_challenges(self) -> List[Tuple[bytes32, uint128]]:
return list(reversed(recent_rc))

async def validate_unfinished_block(
self, block: UnfinishedBlock, skip_overflow_ss_validation=True
self, block: UnfinishedBlock, npc_result: Optional[NPCResult], skip_overflow_ss_validation=True
) -> PreValidationResult:
if (
not self.contains_block(block.prev_header_hash)
Expand Down Expand Up @@ -601,21 +605,6 @@ async def validate_unfinished_block(
else self.block_record(block.prev_header_hash).height
)

npc_result = None
if block.transactions_generator is not None:
assert block.transactions_info is not None
try:
block_generator: Optional[BlockGenerator] = await self.get_block_generator(block)
except ValueError:
return PreValidationResult(uint16(Err.GENERATOR_REF_HAS_NO_GENERATOR.value), None, None)
if block_generator is None:
return PreValidationResult(uint16(Err.GENERATOR_REF_HAS_NO_GENERATOR.value), None, None)
npc_result = get_name_puzzle_conditions(
block_generator,
min(self.constants.MAX_BLOCK_COST_CLVM, block.transactions_info.cost),
cost_per_byte=self.constants.COST_PER_BYTE,
safe_mode=False,
)
error_code, cost_result = await validate_block_body(
self.constants,
self,
Expand All @@ -627,6 +616,7 @@ async def validate_unfinished_block(
npc_result,
None,
self.get_block_generator,
False,
)

if error_code is not None:
Expand Down Expand Up @@ -654,6 +644,21 @@ async def pre_validate_blocks_multiprocessing(
wp_summaries,
)

async def run_generator(self, unfinished_block: bytes, generator: BlockGenerator) -> NPCResult:
task = asyncio.get_running_loop().run_in_executor(
self.pool,
_run_generator,
self.constants_json,
unfinished_block,
bytes(generator),
)
error, npc_result_bytes = await task
if error is not None:
raise ConsensusError(error)
if npc_result_bytes is None:
raise ConsensusError(Err.UNKNOWN)
return NPCResult.from_bytes(npc_result_bytes)

def contains_block(self, header_hash: bytes32) -> bool:
"""
True if we have already added this block to the chain. This may return false for orphan blocks
Expand Down
34 changes: 33 additions & 1 deletion chia/consensus/multiprocess_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
from chia.types.full_block import FullBlock
from chia.types.generator_types import BlockGenerator
from chia.types.header_block import HeaderBlock
from chia.types.unfinished_block import UnfinishedBlock
from chia.util.block_cache import BlockCache
from chia.util.errors import Err
from chia.util.errors import Err, ValidationError
from chia.util.generator_tools import get_block_header, tx_removals_and_additions
from chia.util.ints import uint16, uint64, uint32
from chia.util.streamable import Streamable, dataclass_from_dict, streamable
Expand Down Expand Up @@ -316,3 +317,34 @@ async def pre_validate_blocks_multiprocessing(
for batch_result in (await asyncio.gather(*futures))
for result in batch_result
]


def _run_generator(
constants_dict: bytes,
unfinished_block_bytes: bytes,
block_generator_bytes: bytes,
) -> Tuple[Optional[Err], Optional[bytes]]:
"""
Runs the CLVM generator from bytes inputs. This is meant to be called under a ProcessPoolExecutor, in order to
validate the heavy parts of a block (clvm program) in a different process.
"""
try:
constants: ConsensusConstants = dataclass_from_dict(ConsensusConstants, constants_dict)
unfinished_block: UnfinishedBlock = UnfinishedBlock.from_bytes(unfinished_block_bytes)
assert unfinished_block.transactions_info is not None
block_generator: BlockGenerator = BlockGenerator.from_bytes(block_generator_bytes)
assert block_generator.program == unfinished_block.transactions_generator
npc_result: NPCResult = get_name_puzzle_conditions(
block_generator,
min(constants.MAX_BLOCK_COST_CLVM, unfinished_block.transactions_info.cost),
cost_per_byte=constants.COST_PER_BYTE,
safe_mode=False,
)
if npc_result.error is not None:
return Err(npc_result.error), None
except ValidationError as e:
return e.code, None
except Exception:
return Err.UNKNOWN, None

return None, bytes(npc_result)
Loading

0 comments on commit 8a028c3

Please sign in to comment.