-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Badger Batch] Insert approvals with badger batch update #6381
Conversation
680aa94
to
d3ec64f
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #6381 +/- ##
==========================================
- Coverage 41.50% 41.49% -0.01%
==========================================
Files 2013 2016 +3
Lines 143577 143718 +141
==========================================
+ Hits 59590 59636 +46
- Misses 77813 77906 +93
- Partials 6174 6176 +2
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
} | ||
|
||
func (b *ReaderBatchWriter) DeleteRange(start, end []byte) error { | ||
// TODO: implement |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a big challenging to implement concurrent-safe, will implement in a separate PR. As this module doesn't need it yet, will add along with modules that need it. For now I'm leaving it as a TODO item.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we even need to implement it? I don't see any usage of DeleteRange
in our code-base right now.
I assume we will want to add them later. But maybe we can:
- do the migration with the common writer interface omitting
DeleteRange
- when the migration is complete, re-add
DeleteRange
to the writer interface (then we don't need to implement it with Badger)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I can remove it for now.
We need the DeleteRange method for pebble to remove all events, transactions etc for a given block when rolling back executed height.
I used Pebble's DeleteRange method as it can iterate range of keys with certain prefix and delete them atomically.
816b288
to
2cc8232
Compare
batch *badger.WriteBatch | ||
|
||
addingCallback sync.Mutex // protect callbacks | ||
callbacks []func(error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once we're done prototyping and align on a structure we expect to be long-lived (maybe now?), we should document these callback functions, for example:
- The error input is the error returned from
batch.Commit
- The callback is called regardless of whether the batch was successfully committed (callbacks are responsible for verifying this by checking the error input)
- Callbacks must be non-blocking
|
||
var _ storage.BadgerReaderBatchWriter = (*ReaderBatchWriter)(nil) | ||
|
||
func (b *ReaderBatchWriter) ReaderWriter() (storage.Reader, storage.Writer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Realistically we will need to keep this structure where the ReaderBatchWriter
passed to lower-level storage methods can both read and write, even though it doesn't fit well conceptually with Pebble. I think it would be worthwhile to make the interface naming and documentation clearly communicate the fact that the reader is reading globally from committed database state.
I also feel it would be better to separate the method to access reader and writer:
- we want the design to discourage intermingling reads and writes, and communicate the fact that reads and writes are not being applied to the same snapshot of state like they were with badger.
- having one return value allows chaining (eg.
batch.Reader().Get(...)
) - separating the functions allows us to distinctly document each one (since the reader and writer are fairly conceptually different)
// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
// This reader may observe different values for the same key on subsequent reads.
func (b *ReaderBatchWriter) GlobalReader() storage.Reader { ... }
// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed.
// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
func (b *ReaderBatchWriter) Writer() storage.Writer { ... }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
I originally combined them because there is a few places need intermingling reads. But I think we probably are able to refactor those logic, such as bootstrapping a sealing segment.
return b.batch | ||
} | ||
|
||
func (b *ReaderBatchWriter) AddCallback(callback func(error)) { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented storage.OnCommitSucceed.
storage/badger/cache_b.go
Outdated
// injected. During normal operations, the following error returns are expected: | ||
// - `storage.ErrNotFound` if key is unknown. | ||
func (c *CacheB[K, V]) Get(key K) func(storage.Reader) (V, error) { | ||
return func(tx storage.Reader) (V, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return func(tx storage.Reader) (V, error) { | |
return func(reader storage.Reader) (V, error) { |
Far from the highest priority, but it would be great if we could replace "transaction" terminology with "reader" or "writer" terminology where we notice it in new code.
@@ -44,6 +44,22 @@ func batchWrite(key []byte, entity interface{}) func(writeBatch *badger.WriteBat | |||
} | |||
} | |||
|
|||
func insertW(key []byte, val interface{}) func(storage.Writer) error { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great as a starting point for implementing read-committed concurrency safety against Badger. Nice work.
Added a few final comments:
- Before merging this, please add documentation to exported methods and types.
- I added a suggestion for a pattern to communicate which low-level storage operations must be synchronized by a higher-level lock.
func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(*badger.Txn) error { | ||
return retrieve(makePrefix(codeResultApproval, approvalID), approval) | ||
func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(storage.Reader) error { | ||
return retrieveR(makePrefix(codeResultApproval, approvalID), approval) | ||
} | ||
|
||
// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID | ||
// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists | ||
// error is returned. This operation is only used by the ResultApprovals store, | ||
// which is only used within a Verification node, where it is assumed that there | ||
// is only one approval per chunk. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// is only one approval per chunk. | |
// is only one approval per chunk. | |
// CAUTION: Use of this function must be synchronized by storage.ResultApprovals. |
I'd like to communicate which low-level functions must be synchronized by a higher-level storage procedure. Adding documentation and using Unsafe...
naming seems like a relatively easy way to do that.
} | ||
|
||
// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID | ||
// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists | ||
// error is returned. This operation is only used by the ResultApprovals store, | ||
// which is only used within a Verification node, where it is assumed that there | ||
// is only one approval per chunk. | ||
func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error { | ||
return insert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) | ||
func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { | |
func UnsafeIndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { |
|
||
var _ storage.BadgerReaderBatchWriter = (*ReaderBatchWriter)(nil) | ||
|
||
func (b *ReaderBatchWriter) GlobalReader() storage.Reader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before merging, please add documentation for these and other public methods. I wrote a version in this comment for the reader/writer methods: #6381 (comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving to unblock as I'll be away next week and I don't think my outstanding comments require further discussion. I think this is broadly a good starting point for the plan to implement read-committed concurrency safety against Badger.
I'm happy to merge this once remaining comments have been addressed (mainly documentation), and we make the decision to go ahead with this plan to implement concurrency safety changes against Badger.
Co-authored-by: Jordan Schalm <[email protected]>
1e03a2d
to
ddd9404
Compare
Close for now. Found a better way to refactor. |
In favor of #6466 |
Before migrating to pebble based storage, we first refactor the existing badger based storage to use batch updates instead of transaction, so that the database operations are similar to how pebble stores data.
Referred to #6374 for ensuring concurrency-safety.
The following concurrency tests passed: