from eth_hash.auto import keccak
from eth_typing import (
Address,
    Hash32,
)
from eth_utils import (
ValidationError,
get_extended_debug_logger,
int_to_big_endian,
)
import rlp
from trie import (
HexaryTrie,
exceptions as trie_exceptions,
)

from eth._utils.padding import (
pad32,
)
from eth.abc import (
AccountStorageDatabaseAPI,
AtomicDatabaseAPI,
DatabaseAPI,
)
from eth.db.backends.base import (
BaseDB,
)
from eth.db.batch import (
BatchDB,
)
from eth.db.cache import (
CacheDB,
)
from eth.db.journal import (
JournalDB,
)
from eth.vm.interrupt import (
MissingStorageTrieNode,
)
from eth.typing import (
JournalDBCheckpoint,
)


class StorageLookup(BaseDB):
    """
    This lookup converts storage slot integers into the appropriate trie lookups,
    and similarly persists changes to the appropriate trie at write time.

    StorageLookup also tracks the storage roots changed since the last persist.
    """

    logger = get_extended_debug_logger("eth.db.storage.StorageLookup")

    def __init__(self, db: DatabaseAPI, storage_root: Hash32, address: Address) -> None:
        self._db = db
        self._starting_root_hash = storage_root
        self._address = address
        self._write_trie: HexaryTrie = None
        self._trie_nodes_batch: BatchDB = None

    def _get_write_trie(self) -> HexaryTrie:
        if self._trie_nodes_batch is None:
            self._trie_nodes_batch = BatchDB(self._db, read_through_deletes=True)
        if self._write_trie is None:
            batch_db = self._trie_nodes_batch
            self._write_trie = HexaryTrie(
                batch_db, root_hash=self._starting_root_hash, prune=True
            )
        return self._write_trie

    def _get_read_trie(self) -> HexaryTrie:
        if self._write_trie is not None:
            return self._write_trie
        else:
            # Creating a HexaryTrie is a fairly light operation, so it is not a
            # huge cost to create a new one on every read; we could cache the
            # read trie if this becomes a bottleneck.
            return HexaryTrie(self._db, root_hash=self._starting_root_hash)

    def _decode_key(self, key: bytes) -> bytes:
        # Convert a big-endian slot key into its secure trie key: the keccak
        # of the 32-byte left-padded slot.
        padded_slot = pad32(key)
        return keccak(padded_slot)
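
    # For example, _decode_key maps slot 0 (key b"\x00") to keccak(b"\x00" * 32),
    # the well-known trie key
    # 0x290decd9548b62a8d60345a988386fc84ba6bc95484008f6362f93160ef3e563.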

    def __getitem__(self, key: bytes) -> bytes:
        hashed_slot = self._decode_key(key)
        read_trie = self._get_read_trie()
        try:
            return read_trie[hashed_slot]
        except trie_exceptions.MissingTrieNode as exc:
            raise MissingStorageTrieNode(
                exc.missing_node_hash,
                self._starting_root_hash,
                exc.requested_key,
                self._address,
            ) from exc

    def __setitem__(self, key: bytes, value: bytes) -> None:
        hashed_slot = self._decode_key(key)
        write_trie = self._get_write_trie()
        write_trie[hashed_slot] = value

    def _exists(self, key: bytes) -> bool:
        # used by BaseDB for __contains__ checks
        hashed_slot = self._decode_key(key)
        read_trie = self._get_read_trie()
        return hashed_slot in read_trie

    def __delitem__(self, key: bytes) -> None:
        hashed_slot = self._decode_key(key)
        write_trie = self._get_write_trie()
        try:
            del write_trie[hashed_slot]
        except trie_exceptions.MissingTrieNode as exc:
            raise MissingStorageTrieNode(
                exc.missing_node_hash,
                self._starting_root_hash,
                exc.requested_key,
                self._address,
            ) from exc

    @property
    def has_changed_root(self) -> bool:
        # The explicit `is not None` check keeps the return value a real bool,
        # matching the annotation, instead of leaking None.
        return (
            self._write_trie is not None
            and self._write_trie.root_hash != self._starting_root_hash
        )

    def get_changed_root(self) -> Hash32:
        if self._write_trie is not None:
            return self._write_trie.root_hash
        else:
            raise ValidationError("Asked for changed root when no writes have been made")

    def _clear_changed_root(self) -> None:
        self._write_trie = None
        self._trie_nodes_batch = None
        self._starting_root_hash = None

    def commit_to(self, db: DatabaseAPI) -> None:
        """
        Trying to commit changes when nothing has been written will raise a
        ValidationError.
        """
        self.logger.debug2('persist storage root to data store')
        if self._trie_nodes_batch is None:
            raise ValidationError(
                "It is invalid to commit an account's storage if it has no pending changes. "
                "Always check storage_lookup.has_changed_root before attempting to commit."
            )
        self._trie_nodes_batch.commit_to(db, apply_deletes=False)
        self._clear_changed_root()
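

# A minimal usage sketch (an illustration, not part of py-evm): exercising
# StorageLookup directly against an in-memory database. The MemoryDB and
# BLANK_ROOT_HASH import paths below are assumptions; adjust them to your
# py-evm version.
def _example_storage_lookup() -> None:
    from eth.constants import BLANK_ROOT_HASH
    from eth.db.backends.memory import MemoryDB

    raw_db = MemoryDB()
    address = Address(b"\x00" * 20)
    lookup = StorageLookup(raw_db, BLANK_ROOT_HASH, address)

    # Keys are big-endian slot integers and values are rlp-encoded, matching
    # what AccountStorageDB feeds this class.
    lookup[int_to_big_endian(1)] = rlp.encode(42)
    assert lookup.has_changed_root

    new_root = lookup.get_changed_root()  # root including the pending write
    lookup.commit_to(raw_db)              # persist the generated trie nodes

    # A fresh lookup at the new root reads the slot back from raw_db.
    fresh = StorageLookup(raw_db, new_root, address)
    assert fresh[int_to_big_endian(1)] == rlp.encode(42)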


class AccountStorageDB(AccountStorageDatabaseAPI):
    logger = get_extended_debug_logger("eth.db.storage.AccountStorageDB")

    def __init__(self, db: AtomicDatabaseAPI, storage_root: Hash32, address: Address) -> None:
        """
        Database entries go through several pipes, like so...

        .. code::

            db -> _storage_lookup -> _storage_cache -> _locked_changes -> _journal_storage

        db is the raw database; we can assume it hits disk when written to.
        Keys are stored as node hashes and rlp-encoded node values.

        _storage_lookup is itself a pair of databases: (BatchDB -> HexaryTrie).
        Writes to the storage lookup *are* immediately applied to the trie,
        generating the appropriate trie nodes and root hash (via the HexaryTrie).
        The writes are *not* persisted to db until _storage_lookup is explicitly
        instructed to do so, via :meth:`StorageLookup.commit_to`.

        _storage_cache is a cache tied to the storage root of the trie. It is
        important that this cache is checked *after* looking for the key in
        _journal_storage, because the cache is only invalidated after a storage
        root change. Otherwise, you would see stale data from before the last
        storage root was calculated.

        _locked_changes is a batch database that includes only those values
        that are un-revertable in the EVM. Currently, that means changes that
        completed in a previous transaction.

        Journaling batches writes at the _journal_storage layer, until persist
        is called. It manages all the checkpointing and rollbacks that happen
        during EVM execution.

        In both _storage_cache and _journal_storage, keys are set/retrieved as
        the big-endian encoding of the slot integer, and values as the
        rlp-encoded value.
        """
        self._address = address
        self._storage_lookup = StorageLookup(db, storage_root, address)
        self._storage_cache = CacheDB(self._storage_lookup)
        self._locked_changes = BatchDB(self._storage_cache)
        self._journal_storage = JournalDB(self._locked_changes)

    def get(self, slot: int, from_journal: bool = True) -> int:
key = int_to_big_endian(slot)
lookup_db = self._journal_storage if from_journal else self._locked_changes
try:
encoded_value = lookup_db[key]
except MissingStorageTrieNode:
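            # Don't let a missing trie node be misread as an empty slot:
            # re-raise it so it surfaces as an EVM interrupt, instead of being
            # caught by the generic KeyError fallback below.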
raise
except KeyError:
return 0
if encoded_value == b'':
return 0
else:
return rlp.decode(encoded_value, sedes=rlp.sedes.big_endian_int)

    def set(self, slot: int, value: int) -> None:
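        # Per EVM storage rules, a zero value is represented by the absence of
        # the slot, so storing zero means deleting any existing entry rather
        # than writing an encoded zero.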
key = int_to_big_endian(slot)
if value:
self._journal_storage[key] = rlp.encode(value)
else:
try:
current_val = self._journal_storage[key]
except KeyError:
# deleting an empty key has no effect
return
else:
if current_val != b'':
# only try to delete the value if it's present
del self._journal_storage[key]

    def delete(self) -> None:
self.logger.debug2(
"Deleting all storage in account 0x%s, hashed 0x%s",
self._address.hex(),
keccak(self._address).hex(),
)
self._journal_storage.clear()
self._storage_cache.reset_cache()

    def record(self, checkpoint: JournalDBCheckpoint) -> None:
self._journal_storage.record(checkpoint)

    def discard(self, checkpoint: JournalDBCheckpoint) -> None:
self.logger.debug2('discard checkpoint %r', checkpoint)
if self._journal_storage.has_checkpoint(checkpoint):
self._journal_storage.discard(checkpoint)
else:
# if the checkpoint comes before this account started tracking,
# then simply reset to the beginning
self._journal_storage.reset()
self._storage_cache.reset_cache()

    def commit(self, checkpoint: JournalDBCheckpoint) -> None:
if self._journal_storage.has_checkpoint(checkpoint):
self._journal_storage.commit(checkpoint)
else:
# if the checkpoint comes before this account started tracking,
# then flatten all changes, without persisting
self._journal_storage.flatten()

    def lock_changes(self) -> None:
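        # Flatten all journaled writes into _locked_changes, so that a later
        # discard() cannot roll them back.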
self._journal_storage.persist()

    def make_storage_root(self) -> None:
        """
        Force calculation of the storage root for this account, by locking all
        pending changes and committing them through to the storage trie.
        """
        self.lock_changes()
        self._locked_changes.commit(apply_deletes=True)

    def _validate_flushed(self) -> None:
        """
        Raise a ValidationError if any changes have been made since the last persist.
        """
        journal_diff = self._journal_storage.diff()
        if len(journal_diff) > 0:
            raise ValidationError(
                f"StorageDB had a dirty journal when it needed to be clean: {journal_diff!r}"
            )

    @property
def has_changed_root(self) -> bool:
return self._storage_lookup.has_changed_root

    def get_changed_root(self) -> Hash32:
return self._storage_lookup.get_changed_root()

    def persist(self, db: DatabaseAPI) -> None:
self._validate_flushed()
if self._storage_lookup.has_changed_root:
self._storage_lookup.commit_to(db)
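

# A minimal end-to-end sketch (an illustration, not part of py-evm) of the
# pipeline described in AccountStorageDB.__init__: journaled writes, locking,
# storage-root calculation, and persistence. The AtomicDB and BLANK_ROOT_HASH
# import paths below are assumptions; adjust them to your py-evm version.
def _example_account_storage() -> None:
    from eth.constants import BLANK_ROOT_HASH
    from eth.db.atomic import AtomicDB

    db = AtomicDB()  # backed by an in-memory database by default
    address = Address(b"\x11" * 20)
    storage = AccountStorageDB(db, BLANK_ROOT_HASH, address)

    storage.set(1, 42)
    assert storage.get(1) == 42  # read back through the journal
    assert storage.get(2) == 0   # unset slots read as zero

    storage.lock_changes()       # e.g. at the end of a transaction
    storage.make_storage_root()  # push locked changes through to the trie
    new_root = storage.get_changed_root()

    storage.persist(db)          # write the new trie nodes to the database

    # A fresh view at the new root sees the stored value.
    assert AccountStorageDB(db, new_root, address).get(1) == 42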