From 8bb35a6822f1b6fc5fb32445eee2b8fe1303c464 Mon Sep 17 00:00:00 2001 From: Nodari Chkuaselidze Date: Mon, 1 May 2023 19:22:05 +0400 Subject: [PATCH 1/2] src: Update to leveldown v6.1.1 Update thread-safety-patch. --- ...h => binding.cc.batch-thread-safety.patch} | 56 +- src/binding.cc | 1281 ++++++++++------- 2 files changed, 760 insertions(+), 577 deletions(-) rename patches/{binding.cc.thread-saftey.patch => binding.cc.batch-thread-safety.patch} (64%) diff --git a/patches/binding.cc.thread-saftey.patch b/patches/binding.cc.batch-thread-safety.patch similarity index 64% rename from patches/binding.cc.thread-saftey.patch rename to patches/binding.cc.batch-thread-safety.patch index 05c31fd..87a7774 100644 --- a/patches/binding.cc.thread-saftey.patch +++ b/patches/binding.cc.batch-thread-safety.patch @@ -1,8 +1,8 @@ diff --git a/src/binding.cc b/src/binding.cc -index b42e570..26c8fed 100644 +index c30ac76..e933861 100644 --- a/src/binding.cc +++ b/src/binding.cc -@@ -1677,7 +1677,8 @@ struct Batch { +@@ -1874,7 +1874,8 @@ struct Batch { Batch (Database* database) : database_(database), batch_(new leveldb::WriteBatch()), @@ -12,7 +12,7 @@ index b42e570..26c8fed 100644 ~Batch () { delete batch_; -@@ -1704,9 +1705,22 @@ struct Batch { +@@ -1901,9 +1902,22 @@ struct Batch { return database_->WriteBatch(options, batch_); } @@ -35,7 +35,7 @@ index b42e570..26c8fed 100644 }; /** -@@ -1741,11 +1755,15 @@ NAPI_METHOD(batch_put) { +@@ -1938,11 +1952,15 @@ NAPI_METHOD(batch_put) { NAPI_ARGV(3); NAPI_BATCH_CONTEXT(); @@ -56,7 +56,7 @@ index b42e570..26c8fed 100644 NAPI_RETURN_UNDEFINED(); } -@@ -1757,9 +1775,13 @@ NAPI_METHOD(batch_del) { +@@ -1954,9 +1972,13 @@ NAPI_METHOD(batch_del) { NAPI_ARGV(2); NAPI_BATCH_CONTEXT(); @@ -73,7 +73,7 @@ index b42e570..26c8fed 100644 NAPI_RETURN_UNDEFINED(); } -@@ -1771,7 +1793,11 @@ NAPI_METHOD(batch_clear) { +@@ -1968,7 +1990,11 @@ NAPI_METHOD(batch_clear) { NAPI_ARGV(1); NAPI_BATCH_CONTEXT(); @@ -86,48 +86,34 @@ index b42e570..26c8fed 100644 NAPI_RETURN_UNDEFINED(); } -@@ -1788,6 +1814,9 @@ struct BatchWriteWorker final : public PriorityWorker { +@@ -1985,6 +2011,9 @@ struct BatchWriteWorker final : public PriorityWorker { : PriorityWorker(env, batch->database_, callback, "leveldown.batch.write"), batch_(batch), sync_(sync) { -+ // For thread saftey, consider BatchWrite as shared. ++ // For thread safety, consider BatchWrite as shared. + batch->Share(); + // Prevent GC of batch object before we execute - NAPI_STATUS_THROWS_VOID(napi_create_reference(env_, context, 1, &contextRef_)); + NAPI_STATUS_THROWS_VOID(napi_create_reference(env, context, 1, &contextRef_)); } -@@ -1802,6 +1831,11 @@ struct BatchWriteWorker final : public PriorityWorker { - } - } +@@ -1999,6 +2028,7 @@ struct BatchWriteWorker final : public PriorityWorker { -+ void DoFinally () override { -+ database_->DecrementPriorityWork(); + void DoFinally (napi_env env) override { + napi_delete_reference(env, contextRef_); + batch_->Unshare(); -+ } -+ - Batch* batch_; - bool sync_; - -@@ -1816,12 +1850,18 @@ NAPI_METHOD(batch_write) { - NAPI_ARGV(3); - NAPI_BATCH_CONTEXT(); + PriorityWorker::DoFinally(env); + } -- napi_value options = argv[1]; -- bool sync = BooleanProperty(env, options, "sync", false); +@@ -2019,6 +2049,12 @@ NAPI_METHOD(batch_write) { + const bool sync = BooleanProperty(env, options, "sync", false); napi_value callback = argv[2]; -- BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync); -- worker->Queue(); -+ if (!batch->IsShared()) { -+ napi_value options = argv[1]; -+ bool sync = BooleanProperty(env, options, "sync", false); -+ -+ BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync); -+ worker->Queue(); -+ } else { ++ if (batch->IsShared()) { + napi_value argv = CreateError(env, "Unsafe batch write."); + CallFunction(env, callback, 1, &argv); ++ NAPI_RETURN_UNDEFINED(); + } ++ + BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync); + worker->Queue(env); - NAPI_RETURN_UNDEFINED(); - } diff --git a/src/binding.cc b/src/binding.cc index 26c8fed..cae05eb 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -17,7 +17,6 @@ */ struct Database; struct Iterator; -struct EndWorker; static void iterator_end_do (napi_env env, Iterator* iterator, napi_value cb); /** @@ -157,7 +156,7 @@ static uint32_t Uint32Property (napi_env env, napi_value obj, const char* key, } /** - * Returns a uint32 property 'key' from 'obj'. + * Returns a int32 property 'key' from 'obj'. * Returns 'DEFAULT' if the property doesn't exist. */ static int Int32Property (napi_env env, napi_value obj, const char* key, @@ -224,6 +223,51 @@ static size_t StringOrBufferLength (napi_env env, napi_value value) { return size; } +/** + * Takes a Buffer or string property 'name' from 'opts'. + * Returns null if the property does not exist or is zero-length. + */ +static std::string* RangeOption (napi_env env, napi_value opts, const char* name) { + if (HasProperty(env, opts, name)) { + napi_value value = GetProperty(env, opts, name); + + if (StringOrBufferLength(env, value) > 0) { + LD_STRING_OR_BUFFER_TO_COPY(env, value, to); + std::string* result = new std::string(toCh_, toSz_); + delete [] toCh_; + return result; + } + } + + return NULL; +} + +/** + * Converts an array containing Buffer or string keys to a vector. + * Empty elements are skipped. + */ +static std::vector* KeyArray (napi_env env, napi_value arr) { + uint32_t length; + std::vector* result = new std::vector(); + + if (napi_get_array_length(env, arr, &length) == napi_ok) { + result->reserve(length); + + for (uint32_t i = 0; i < length; i++) { + napi_value element; + + if (napi_get_element(env, arr, i, &element) == napi_ok && + StringOrBufferLength(env, element) > 0) { + LD_STRING_OR_BUFFER_TO_COPY(env, element, to); + result->emplace_back(toCh_, toSz_); + delete [] toCh_; + } + } + } + + return result; +} + /** * Calls a function. */ @@ -236,26 +280,86 @@ static napi_status CallFunction (napi_env env, return napi_call_function(env, global, callback, argc, argv, NULL); } +/** + * Whether to yield entries, keys or values. + */ +enum Mode { + entries, + keys, + values +}; + +/** + * Helper struct for caching and converting a key-value pair to napi_values. + */ +struct Entry { + Entry (const leveldb::Slice* key, const leveldb::Slice* value) { + key_ = key != NULL ? new std::string(key->data(), key->size()) : NULL; + value_ = value != NULL ? new std::string(value->data(), value->size()) : NULL; + } + + ~Entry () { + if (key_ != NULL) delete key_; + if (value_ != NULL) delete value_; + } + + // Not used yet. + void ConvertXX (napi_env env, Mode mode, bool keyAsBuffer, bool valueAsBuffer, napi_value* result) { + if (mode == Mode::entries) { + napi_create_array_with_length(env, 2, result); + + napi_value valueElement; + napi_value keyElement; + + Convert(env, key_, keyAsBuffer, &keyElement); + Convert(env, value_, valueAsBuffer, &valueElement); + + napi_set_element(env, *result, 0, keyElement); + napi_set_element(env, *result, 1, valueElement); + } else if (mode == Mode::keys) { + Convert(env, key_, keyAsBuffer, result); + } else { + Convert(env, value_, valueAsBuffer, result); + } + } + + static void Convert (napi_env env, const std::string* s, bool asBuffer, napi_value* result) { + if (s == NULL) { + napi_get_undefined(env, result); + } else if (asBuffer) { + napi_create_buffer_copy(env, s->size(), s->data(), NULL, result); + } else { + napi_create_string_utf8(env, s->data(), s->size(), result); + } + } + +private: + std::string* key_; + std::string* value_; +}; + /** * Base worker class. Handles the async work. Derived classes can override the * following virtual methods (listed in the order in which they're called): * * - DoExecute (abstract, worker pool thread): main work * - HandleOKCallback (main thread): call JS callback on success + * - HandleErrorCallback (main thread): call JS callback on error * - DoFinally (main thread): do cleanup regardless of success */ struct BaseWorker { + // Note: storing env is discouraged as we'd end up using it in unsafe places. BaseWorker (napi_env env, Database* database, napi_value callback, const char* resourceName) - : env_(env), database_(database), errMsg_(NULL) { - NAPI_STATUS_THROWS_VOID(napi_create_reference(env_, callback, 1, &callbackRef_)); + : database_(database), errMsg_(NULL) { + NAPI_STATUS_THROWS_VOID(napi_create_reference(env, callback, 1, &callbackRef_)); napi_value asyncResourceName; - NAPI_STATUS_THROWS_VOID(napi_create_string_utf8(env_, resourceName, + NAPI_STATUS_THROWS_VOID(napi_create_string_utf8(env, resourceName, NAPI_AUTO_LENGTH, &asyncResourceName)); - NAPI_STATUS_THROWS_VOID(napi_create_async_work(env_, callback, + NAPI_STATUS_THROWS_VOID(napi_create_async_work(env, callback, asyncResourceName, BaseWorker::Execute, BaseWorker::Complete, @@ -264,20 +368,23 @@ struct BaseWorker { virtual ~BaseWorker () { delete [] errMsg_; - napi_delete_reference(env_, callbackRef_); - napi_delete_async_work(env_, asyncWork_); } static void Execute (napi_env env, void* data) { BaseWorker* self = (BaseWorker*)data; + + // Don't pass env to DoExecute() because use of Node-API + // methods should generally be avoided in async work. self->DoExecute(); } - void SetStatus (leveldb::Status status) { + bool SetStatus (leveldb::Status status) { status_ = status; if (!status.ok()) { SetErrorMessage(status.ToString().c_str()); + return false; } + return true; } void SetErrorMessage(const char *msg) { @@ -288,44 +395,52 @@ struct BaseWorker { } virtual void DoExecute () = 0; - virtual void DoFinally () {}; static void Complete (napi_env env, napi_status status, void* data) { BaseWorker* self = (BaseWorker*)data; - self->DoComplete(); - self->DoFinally(); - delete self; + + self->DoComplete(env); + self->DoFinally(env); } - void DoComplete () { + void DoComplete (napi_env env) { + napi_value callback; + napi_get_reference_value(env, callbackRef_, &callback); + if (status_.ok()) { - return HandleOKCallback(); + HandleOKCallback(env, callback); + } else { + HandleErrorCallback(env, callback); } - - napi_value argv = CreateError(env_, errMsg_); - napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 1, &argv); } - virtual void HandleOKCallback () { + virtual void HandleOKCallback (napi_env env, napi_value callback) { napi_value argv; - napi_get_null(env_, &argv); - napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 1, &argv); + napi_get_null(env, &argv); + CallFunction(env, callback, 1, &argv); } - void Queue () { - napi_queue_async_work(env_, asyncWork_); + virtual void HandleErrorCallback (napi_env env, napi_value callback) { + napi_value argv = CreateError(env, errMsg_); + CallFunction(env, callback, 1, &argv); + } + + virtual void DoFinally (napi_env env) { + napi_delete_reference(env, callbackRef_); + napi_delete_async_work(env, asyncWork_); + + delete this; + } + + void Queue (napi_env env) { + napi_queue_async_work(env, asyncWork_); } - napi_env env_; - napi_ref callbackRef_; - napi_async_work asyncWork_; Database* database_; private: + napi_ref callbackRef_; + napi_async_work asyncWork_; leveldb::Status status_; char *errMsg_; }; @@ -334,13 +449,13 @@ struct BaseWorker { * Owns the LevelDB storage, cache, filter policy and iterators. */ struct Database { - Database (napi_env env) - : env_(env), - db_(NULL), + Database () + : db_(NULL), blockCache_(NULL), filterPolicy_(leveldb::NewBloomFilterPolicy(10)), currentIteratorId_(0), pendingCloseWorker_(NULL), + ref_(NULL), priorityWork_(0) {} ~Database () { @@ -413,367 +528,420 @@ struct Database { return db_->ReleaseSnapshot(snapshot); } - void AttachIterator (uint32_t id, Iterator* iterator) { + void AttachIterator (napi_env env, uint32_t id, Iterator* iterator) { iterators_[id] = iterator; - IncrementPriorityWork(); + IncrementPriorityWork(env); } - void DetachIterator (uint32_t id) { + void DetachIterator (napi_env env, uint32_t id) { iterators_.erase(id); - DecrementPriorityWork(); + DecrementPriorityWork(env); } - void IncrementPriorityWork () { - ++priorityWork_; + void IncrementPriorityWork (napi_env env) { + napi_reference_ref(env, ref_, &priorityWork_); } - void DecrementPriorityWork () { - if (--priorityWork_ == 0 && pendingCloseWorker_ != NULL) { - pendingCloseWorker_->Queue(); + void DecrementPriorityWork (napi_env env) { + napi_reference_unref(env, ref_, &priorityWork_); + + if (priorityWork_ == 0 && pendingCloseWorker_ != NULL) { + pendingCloseWorker_->Queue(env); pendingCloseWorker_ = NULL; } } - bool HasPriorityWork () { + bool HasPriorityWork () const { return priorityWork_ > 0; } - napi_env env_; leveldb::DB* db_; leveldb::Cache* blockCache_; const leveldb::FilterPolicy* filterPolicy_; uint32_t currentIteratorId_; BaseWorker *pendingCloseWorker_; std::map< uint32_t, Iterator * > iterators_; + napi_ref ref_; private: uint32_t priorityWork_; }; -/** - * Runs when a Database is garbage collected. - */ -static void FinalizeDatabase (napi_env env, void* data, void* hint) { - if (data) { - delete (Database*)data; - } -} - /** * Base worker class for doing async work that defers closing the database. */ struct PriorityWorker : public BaseWorker { PriorityWorker (napi_env env, Database* database, napi_value callback, const char* resourceName) : BaseWorker(env, database, callback, resourceName) { - database_->IncrementPriorityWork(); + database_->IncrementPriorityWork(env); } - ~PriorityWorker () {} + virtual ~PriorityWorker () {} - void DoFinally () override { - database_->DecrementPriorityWork(); + void DoFinally (napi_env env) override { + database_->DecrementPriorityWork(env); + BaseWorker::DoFinally(env); } }; /** * Owns a leveldb iterator. */ -struct Iterator { - Iterator (Database* database, - uint32_t id, - leveldb::Slice* start, - std::string* end, - bool reverse, - bool keys, - bool values, - int limit, - std::string* lt, - std::string* lte, - std::string* gt, - std::string* gte, - bool fillCache, - bool keyAsBuffer, - bool valueAsBuffer, - uint32_t highWaterMark) +struct BaseIterator { + BaseIterator(Database* database, + const bool reverse, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte, + const int limit, + const bool fillCache) : database_(database), - id_(id), - start_(start), - end_(end), + hasEnded_(false), + didSeek_(false), reverse_(reverse), - keys_(keys), - values_(values), - limit_(limit), lt_(lt), lte_(lte), gt_(gt), gte_(gte), - keyAsBuffer_(keyAsBuffer), - valueAsBuffer_(valueAsBuffer), - highWaterMark_(highWaterMark), - dbIterator_(NULL), - count_(0), - target_(NULL), - seeking_(false), - landed_(false), - nexting_(false), - ended_(false), - endWorker_(NULL), - ref_(NULL) { + limit_(limit), + count_(0) { options_ = new leveldb::ReadOptions(); options_->fill_cache = fillCache; options_->snapshot = database->NewSnapshot(); + dbIterator_ = database_->NewIterator(options_); } - ~Iterator () { - assert(ended_); - ReleaseTarget(); - if (start_ != NULL) { - // Special case for `start` option: it won't be - // freed up by any of the delete calls below. - if (!((lt_ != NULL && reverse_) - || (lte_ != NULL && reverse_) - || (gt_ != NULL && !reverse_) - || (gte_ != NULL && !reverse_))) { - delete [] start_->data(); - } - delete start_; - } - if (end_ != NULL) { - delete end_; - } - if (lt_ != NULL) { - delete lt_; - } - if (gt_ != NULL) { - delete gt_; - } - if (lte_ != NULL) { - delete lte_; - } - if (gte_ != NULL) { - delete gte_; - } - delete options_; - } - - void ReleaseTarget () { - if (target_ != NULL) { - if (!target_->empty()) { - delete [] target_->data(); - } - delete target_; - target_ = NULL; - } - } + virtual ~BaseIterator () { + assert(hasEnded_); - void Attach (napi_ref ref) { - ref_ = ref; - database_->AttachIterator(id_, this); - } + if (lt_ != NULL) delete lt_; + if (gt_ != NULL) delete gt_; + if (lte_ != NULL) delete lte_; + if (gte_ != NULL) delete gte_; - napi_ref Detach () { - database_->DetachIterator(id_); - return ref_; + delete options_; } - leveldb::Status IteratorStatus () { - return dbIterator_->status(); + bool DidSeek () const { + return didSeek_; } - void IteratorEnd () { - delete dbIterator_; - dbIterator_ = NULL; - database_->ReleaseSnapshot(options_->snapshot); - } + /** + * Seek to the first relevant key based on range options. + */ + void SeekToRange () { + didSeek_ = true; - bool GetIterator () { - if (dbIterator_ != NULL) return false; + if (!reverse_ && gte_ != NULL) { + dbIterator_->Seek(*gte_); + } else if (!reverse_ && gt_ != NULL) { + dbIterator_->Seek(*gt_); - dbIterator_ = database_->NewIterator(options_); + if (dbIterator_->Valid() && dbIterator_->key().compare(*gt_) == 0) { + dbIterator_->Next(); + } + } else if (reverse_ && lte_ != NULL) { + dbIterator_->Seek(*lte_); - if (start_ != NULL) { - dbIterator_->Seek(*start_); - - if (reverse_) { - if (!dbIterator_->Valid()) { - dbIterator_->SeekToLast(); - } else { - std::string keyStr = dbIterator_->key().ToString(); - - if (lt_ != NULL) { - if (lt_->compare(keyStr) <= 0) - dbIterator_->Prev(); - } else if (lte_ != NULL) { - if (lte_->compare(keyStr) < 0) - dbIterator_->Prev(); - } else if (start_ != NULL) { - if (start_->compare(keyStr)) - dbIterator_->Prev(); - } - } + if (!dbIterator_->Valid()) { + dbIterator_->SeekToLast(); + } else if (dbIterator_->key().compare(*lte_) > 0) { + dbIterator_->Prev(); + } + } else if (reverse_ && lt_ != NULL) { + dbIterator_->Seek(*lt_); - if (dbIterator_->Valid() && lt_ != NULL) { - if (lt_->compare(dbIterator_->key().ToString()) <= 0) - dbIterator_->Prev(); - } - } else { - if (dbIterator_->Valid() && gt_ != NULL - && gt_->compare(dbIterator_->key().ToString()) == 0) - dbIterator_->Next(); + if (!dbIterator_->Valid()) { + dbIterator_->SeekToLast(); + } else if (dbIterator_->key().compare(*lt_) >= 0) { + dbIterator_->Prev(); } } else if (reverse_) { dbIterator_->SeekToLast(); } else { dbIterator_->SeekToFirst(); } - - return true; } - bool Read (std::string& key, std::string& value) { - if (!GetIterator() && !seeking_) { - if (reverse_) { - dbIterator_->Prev(); - } - else { - dbIterator_->Next(); - } + /** + * Seek manually (during iteration). + */ + void Seek (leveldb::Slice& target) { + didSeek_ = true; + + if (OutOfRange(target)) { + return SeekToEnd(); } - seeking_ = false; + dbIterator_->Seek(target); if (dbIterator_->Valid()) { - std::string keyStr = dbIterator_->key().ToString(); - const int isEnd = end_ == NULL ? 1 : end_->compare(keyStr); - - if ((limit_ < 0 || ++count_ <= limit_) - && (end_ == NULL - || (reverse_ && (isEnd <= 0)) - || (!reverse_ && (isEnd >= 0))) - && ( lt_ != NULL ? (lt_->compare(keyStr) > 0) - : lte_ != NULL ? (lte_->compare(keyStr) >= 0) - : true ) - && ( gt_ != NULL ? (gt_->compare(keyStr) < 0) - : gte_ != NULL ? (gte_->compare(keyStr) <= 0) - : true ) - ) { - if (keys_) { - key.assign(dbIterator_->key().data(), dbIterator_->key().size()); - } - if (values_) { - value.assign(dbIterator_->value().data(), dbIterator_->value().size()); + int cmp = dbIterator_->key().compare(target); + if (reverse_ ? cmp > 0 : cmp < 0) { + Next(); + } + } else { + SeekToFirst(); + if (dbIterator_->Valid()) { + int cmp = dbIterator_->key().compare(target); + if (reverse_ ? cmp > 0 : cmp < 0) { + SeekToEnd(); } - return true; } } - - return false; } - bool OutOfRange (leveldb::Slice* target) { - if ((lt_ != NULL && target->compare(*lt_) >= 0) || - (lte_ != NULL && target->compare(*lte_) > 0) || - (start_ != NULL && reverse_ && target->compare(*start_) > 0)) { - return true; + void End () { + if (!hasEnded_) { + hasEnded_ = true; + delete dbIterator_; + dbIterator_ = NULL; + database_->ReleaseSnapshot(options_->snapshot); } + } - if (end_ != NULL) { - int d = target->compare(*end_); - if (reverse_ ? d < 0 : d > 0) return true; - } + bool Valid () const { + return dbIterator_->Valid() && !OutOfRange(dbIterator_->key()); + } - return ((gt_ != NULL && target->compare(*gt_) <= 0) || - (gte_ != NULL && target->compare(*gte_) < 0) || - (start_ != NULL && !reverse_ && target->compare(*start_) < 0)); + bool Increment () { + return limit_ < 0 || ++count_ <= limit_; } - bool IteratorNext (std::vector >& result) { - size_t size = 0; - uint32_t cacheSize = 0; + void Next () { + if (reverse_) dbIterator_->Prev(); + else dbIterator_->Next(); + } - while (true) { - std::string key, value; - bool ok = Read(key, value); + void SeekToFirst () { + if (reverse_) dbIterator_->SeekToLast(); + else dbIterator_->SeekToFirst(); + } - if (ok) { - result.push_back(std::make_pair(key, value)); + void SeekToLast () { + if (reverse_) dbIterator_->SeekToFirst(); + else dbIterator_->SeekToLast(); + } - if (!landed_) { - landed_ = true; - return true; - } + void SeekToEnd () { + SeekToLast(); + Next(); + } - size = size + key.size() + value.size(); - if (size > highWaterMark_) return true; + leveldb::Slice CurrentKey () const { + return dbIterator_->key(); + } - // Limit the size of the cache to prevent starving the event loop - // in JS-land while we're recursively calling process.nextTick(). - if (++cacheSize >= 1000) return true; - } else { - return false; - } - } + leveldb::Slice CurrentValue () const { + return dbIterator_->value(); + } + + leveldb::Status Status () const { + return dbIterator_->status(); + } + + bool OutOfRange (const leveldb::Slice& target) const { + // TODO: benchmark to see if this is worth it + // if (upperBoundOnly && !reverse_) { + // return ((lt_ != NULL && target.compare(*lt_) >= 0) || + // (lte_ != NULL && target.compare(*lte_) > 0)); + // } + + return ((lt_ != NULL && target.compare(*lt_) >= 0) || + (lte_ != NULL && target.compare(*lte_) > 0) || + (gt_ != NULL && target.compare(*gt_) <= 0) || + (gte_ != NULL && target.compare(*gte_) < 0)); } Database* database_; - uint32_t id_; - leveldb::Slice* start_; - std::string* end_; - bool reverse_; - bool keys_; - bool values_; - int limit_; + bool hasEnded_; + +private: + leveldb::Iterator* dbIterator_; + bool didSeek_; + const bool reverse_; std::string* lt_; std::string* lte_; std::string* gt_; std::string* gte_; - bool keyAsBuffer_; - bool valueAsBuffer_; - uint32_t highWaterMark_; - leveldb::Iterator* dbIterator_; + const int limit_; int count_; - leveldb::Slice* target_; - bool seeking_; + leveldb::ReadOptions* options_; +}; + +/** + * Extends BaseIterator for reading it from JS land. + */ +struct Iterator final : public BaseIterator { + Iterator (Database* database, + const uint32_t id, + const bool reverse, + const bool keys, + const bool values, + const int limit, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte, + const bool fillCache, + const bool keyAsBuffer, + const bool valueAsBuffer, + const uint32_t highWaterMark) + : BaseIterator(database, reverse, lt, lte, gt, gte, limit, fillCache), + id_(id), + keys_(keys), + values_(values), + keyAsBuffer_(keyAsBuffer), + valueAsBuffer_(valueAsBuffer), + highWaterMark_(highWaterMark), + landed_(false), + nexting_(false), + isEnding_(false), + endWorker_(NULL), + ref_(NULL) { + } + + ~Iterator () {} + + void Attach (napi_env env, napi_value context) { + napi_create_reference(env, context, 1, &ref_); + database_->AttachIterator(env, id_, this); + } + + void Detach (napi_env env) { + database_->DetachIterator(env, id_); + if (ref_ != NULL) napi_delete_reference(env, ref_); + } + + bool ReadMany (uint32_t size) { + cache_.clear(); + size_t bytesRead = 0; + + while (true) { + if (landed_) Next(); + if (!Valid() || !Increment()) break; + + if (keys_) { + leveldb::Slice slice = CurrentKey(); + cache_.emplace_back(slice.data(), slice.size()); + bytesRead += slice.size(); + } else { + cache_.emplace_back(""); + } + + if (values_) { + leveldb::Slice slice = CurrentValue(); + cache_.emplace_back(slice.data(), slice.size()); + bytesRead += slice.size(); + } else { + cache_.emplace_back(""); + } + + if (!landed_) { + landed_ = true; + return true; + } + + if (bytesRead > highWaterMark_ || cache_.size() >= size * 2) { + return true; + } + } + + return false; + } + + const uint32_t id_; + const bool keys_; + const bool values_; + const bool keyAsBuffer_; + const bool valueAsBuffer_; + const uint32_t highWaterMark_; bool landed_; bool nexting_; - bool ended_; - - leveldb::ReadOptions* options_; - EndWorker* endWorker_; + bool isEnding_; + BaseWorker* endWorker_; + std::vector cache_; private: napi_ref ref_; }; +/** + * Hook for when the environment exits. This hook will be called after + * already-scheduled napi_async_work items have finished, which gives us + * the guarantee that no db operations will be in-flight at this time. + */ +static void env_cleanup_hook (void* arg) { + Database* database = (Database*)arg; + + // Do everything that db_close() does but synchronously. We're expecting that GC + // did not (yet) collect the database because that would be a user mistake (not + // closing their db) made during the lifetime of the environment. That's different + // from an environment being torn down (like the main process or a worker thread) + // where it's our responsibility to clean up. Note also, the following code must + // be a safe noop if called before db_open() or after db_close(). + if (database && database->db_ != NULL) { + std::map iterators = database->iterators_; + std::map::iterator it; + + // TODO: does not do `napi_delete_reference(env, iterator->ref_)`. Problem? + for (it = iterators.begin(); it != iterators.end(); ++it) { + it->second->End(); + } + + // Having ended the iterators (and released snapshots) we can safely close. + database->CloseDatabase(); + } +} + +/** + * Runs when a Database is garbage collected. + */ +static void FinalizeDatabase (napi_env env, void* data, void* hint) { + if (data) { + Database* database = (Database*)data; + napi_remove_env_cleanup_hook(env, env_cleanup_hook, database); + if (database->ref_ != NULL) napi_delete_reference(env, database->ref_); + delete database; + } +} + /** * Returns a context object for a database. */ NAPI_METHOD(db_init) { - Database* database = new Database(env); + Database* database = new Database(); + napi_add_env_cleanup_hook(env, env_cleanup_hook, database); napi_value result; NAPI_STATUS_THROWS(napi_create_external(env, database, FinalizeDatabase, NULL, &result)); + + // Reference counter to prevent GC of database while priority workers are active + NAPI_STATUS_THROWS(napi_create_reference(env, result, 0, &database->ref_)); + return result; } /** * Worker class for opening a database. + * TODO: shouldn't this be a PriorityWorker? */ struct OpenWorker final : public BaseWorker { OpenWorker (napi_env env, Database* database, napi_value callback, const std::string& location, - bool createIfMissing, - bool errorIfExists, - bool compression, - uint32_t writeBufferSize, - uint32_t blockSize, - uint32_t maxOpenFiles, - uint32_t blockRestartInterval, - uint32_t maxFileSize) + const bool createIfMissing, + const bool errorIfExists, + const bool compression, + const uint32_t writeBufferSize, + const uint32_t blockSize, + const uint32_t maxOpenFiles, + const uint32_t blockRestartInterval, + const uint32_t maxFileSize) : BaseWorker(env, database, callback, "leveldown.db.open"), location_(location) { options_.block_cache = database->blockCache_; @@ -809,17 +977,17 @@ NAPI_METHOD(db_open) { NAPI_ARGV_UTF8_NEW(location, 1); napi_value options = argv[2]; - bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true); - bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false); - bool compression = BooleanProperty(env, options, "compression", true); - - uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20); - uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20); - uint32_t blockSize = Uint32Property(env, options, "blockSize", 4096); - uint32_t maxOpenFiles = Uint32Property(env, options, "maxOpenFiles", 1000); - uint32_t blockRestartInterval = Uint32Property(env, options, + const bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true); + const bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false); + const bool compression = BooleanProperty(env, options, "compression", true); + + const uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20); + const uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20); + const uint32_t blockSize = Uint32Property(env, options, "blockSize", 4096); + const uint32_t maxOpenFiles = Uint32Property(env, options, "maxOpenFiles", 1000); + const uint32_t blockRestartInterval = Uint32Property(env, options, "blockRestartInterval", 16); - uint32_t maxFileSize = Uint32Property(env, options, "maxFileSize", 2 << 20); + const uint32_t maxFileSize = Uint32Property(env, options, "maxFileSize", 2 << 20); database->blockCache_ = leveldb::NewLRUCache(cacheSize); @@ -829,7 +997,7 @@ NAPI_METHOD(db_open) { compression, writeBufferSize, blockSize, maxOpenFiles, blockRestartInterval, maxFileSize); - worker->Queue(); + worker->Queue(env); delete [] location; NAPI_RETURN_UNDEFINED(); @@ -866,7 +1034,7 @@ NAPI_METHOD(db_close) { CloseWorker* worker = new CloseWorker(env, database, callback); if (!database->HasPriorityWork()) { - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -927,7 +1095,7 @@ NAPI_METHOD(db_put) { napi_value callback = argv[4]; PutWorker* worker = new PutWorker(env, database, callback, key, value, sync); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -940,8 +1108,8 @@ struct GetWorker final : public PriorityWorker { Database* database, napi_value callback, leveldb::Slice key, - bool asBuffer, - bool fillCache) + const bool asBuffer, + const bool fillCache) : PriorityWorker(env, database, callback, "leveldown.db.get"), key_(key), asBuffer_(asBuffer) { @@ -956,25 +1124,18 @@ struct GetWorker final : public PriorityWorker { SetStatus(database_->Get(options_, key_, value_)); } - void HandleOKCallback () override { + void HandleOKCallback (napi_env env, napi_value callback) override { napi_value argv[2]; - napi_get_null(env_, &argv[0]); - - if (asBuffer_) { - napi_create_buffer_copy(env_, value_.size(), value_.data(), NULL, &argv[1]); - } else { - napi_create_string_utf8(env_, value_.data(), value_.size(), &argv[1]); - } - - napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 2, argv); + napi_get_null(env, &argv[0]); + Entry::Convert(env, &value_, asBuffer_, &argv[1]); + CallFunction(env, callback, 2, argv); } +private: leveldb::ReadOptions options_; leveldb::Slice key_; std::string value_; - bool asBuffer_; + const bool asBuffer_; }; /** @@ -986,17 +1147,109 @@ NAPI_METHOD(db_get) { leveldb::Slice key = ToSlice(env, argv[1]); napi_value options = argv[2]; - bool asBuffer = BooleanProperty(env, options, "asBuffer", true); - bool fillCache = BooleanProperty(env, options, "fillCache", true); + const bool asBuffer = BooleanProperty(env, options, "asBuffer", true); + const bool fillCache = BooleanProperty(env, options, "fillCache", true); napi_value callback = argv[3]; GetWorker* worker = new GetWorker(env, database, callback, key, asBuffer, fillCache); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } +/** + * Worker class for getting many values. + */ +struct GetManyWorker final : public PriorityWorker { + GetManyWorker (napi_env env, + Database* database, + const std::vector* keys, + napi_value callback, + const bool valueAsBuffer, + const bool fillCache) + : PriorityWorker(env, database, callback, "leveldown.get.many"), + keys_(keys), valueAsBuffer_(valueAsBuffer) { + options_.fill_cache = fillCache; + options_.snapshot = database->NewSnapshot(); + } + + ~GetManyWorker() { + delete keys_; + } + + void DoExecute () override { + cache_.reserve(keys_->size()); + + for (const std::string& key: *keys_) { + std::string* value = new std::string(); + leveldb::Status status = database_->Get(options_, key, *value); + + if (status.ok()) { + cache_.push_back(value); + } else if (status.IsNotFound()) { + delete value; + cache_.push_back(NULL); + } else { + delete value; + for (const std::string* value: cache_) { + if (value != NULL) delete value; + } + SetStatus(status); + break; + } + } + + database_->ReleaseSnapshot(options_.snapshot); + } + + void HandleOKCallback (napi_env env, napi_value callback) override { + size_t size = cache_.size(); + napi_value array; + napi_create_array_with_length(env, size, &array); + + for (size_t idx = 0; idx < size; idx++) { + std::string* value = cache_[idx]; + napi_value element; + Entry::Convert(env, value, valueAsBuffer_, &element); + napi_set_element(env, array, static_cast(idx), element); + if (value != NULL) delete value; + } + + napi_value argv[2]; + napi_get_null(env, &argv[0]); + argv[1] = array; + CallFunction(env, callback, 2, argv); + } + +private: + leveldb::ReadOptions options_; + const std::vector* keys_; + const bool valueAsBuffer_; + std::vector cache_; +}; + +/** + * Gets many values from a database. + */ +NAPI_METHOD(db_get_many) { + NAPI_ARGV(4); + NAPI_DB_CONTEXT(); + + const std::vector* keys = KeyArray(env, argv[1]); + napi_value options = argv[2]; + const bool asBuffer = BooleanProperty(env, options, "asBuffer", true); + const bool fillCache = BooleanProperty(env, options, "fillCache", true); + napi_value callback = argv[3]; + + GetManyWorker* worker = new GetManyWorker( + env, database, keys, callback, asBuffer, fillCache + ); + + worker->Queue(env); + NAPI_RETURN_UNDEFINED(); +} + /** * Worker class for deleting a value from a database. */ @@ -1035,7 +1288,91 @@ NAPI_METHOD(db_del) { napi_value callback = argv[3]; DelWorker* worker = new DelWorker(env, database, callback, key, sync); - worker->Queue(); + worker->Queue(env); + + NAPI_RETURN_UNDEFINED(); +} + +/** + * Worker class for deleting a range from a database. + */ +struct ClearWorker final : public PriorityWorker { + ClearWorker (napi_env env, + Database* database, + napi_value callback, + const bool reverse, + const int limit, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte) + : PriorityWorker(env, database, callback, "leveldown.db.clear") { + iterator_ = new BaseIterator(database, reverse, lt, lte, gt, gte, limit, false); + writeOptions_ = new leveldb::WriteOptions(); + writeOptions_->sync = false; + } + + ~ClearWorker () { + delete iterator_; + delete writeOptions_; + } + + void DoExecute () override { + iterator_->SeekToRange(); + + // TODO: add option + uint32_t hwm = 16 * 1024; + leveldb::WriteBatch batch; + + while (true) { + size_t bytesRead = 0; + + while (bytesRead <= hwm && iterator_->Valid() && iterator_->Increment()) { + leveldb::Slice key = iterator_->CurrentKey(); + batch.Delete(key); + bytesRead += key.size(); + iterator_->Next(); + } + + if (!SetStatus(iterator_->Status()) || bytesRead == 0) { + break; + } + + if (!SetStatus(database_->WriteBatch(*writeOptions_, &batch))) { + break; + } + + batch.Clear(); + } + + iterator_->End(); + } + +private: + BaseIterator* iterator_; + leveldb::WriteOptions* writeOptions_; +}; + +/** + * Delete a range from a database. + */ +NAPI_METHOD(db_clear) { + NAPI_ARGV(3); + NAPI_DB_CONTEXT(); + + napi_value options = argv[1]; + napi_value callback = argv[2]; + + const bool reverse = BooleanProperty(env, options, "reverse", false); + const int limit = Int32Property(env, options, "limit", -1); + + std::string* lt = RangeOption(env, options, "lt"); + std::string* lte = RangeOption(env, options, "lte"); + std::string* gt = RangeOption(env, options, "gt"); + std::string* gte = RangeOption(env, options, "gte"); + + ClearWorker* worker = new ClearWorker(env, database, callback, reverse, limit, lt, lte, gt, gte); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1062,13 +1399,11 @@ struct ApproximateSizeWorker final : public PriorityWorker { size_ = database_->ApproximateSize(&range); } - void HandleOKCallback () override { + void HandleOKCallback (napi_env env, napi_value callback) override { napi_value argv[2]; - napi_get_null(env_, &argv[0]); - napi_create_uint32(env_, (uint32_t)size_, &argv[1]); - napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 2, argv); + napi_get_null(env, &argv[0]); + napi_create_int64(env, (uint64_t)size_, &argv[1]); + CallFunction(env, callback, 2, argv); } leveldb::Slice start_; @@ -1091,7 +1426,7 @@ NAPI_METHOD(db_approximate_size) { ApproximateSizeWorker* worker = new ApproximateSizeWorker(env, database, callback, start, end); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1134,7 +1469,7 @@ NAPI_METHOD(db_compact_range) { CompactRangeWorker* worker = new CompactRangeWorker(env, database, callback, start, end); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1188,7 +1523,7 @@ NAPI_METHOD(destroy_db) { napi_value callback = argv[1]; DestroyWorker* worker = new DestroyWorker(env, location, callback); - worker->Queue(); + worker->Queue(env); delete [] location; @@ -1224,7 +1559,7 @@ NAPI_METHOD(repair_db) { napi_value callback = argv[1]; RepairWorker* worker = new RepairWorker(env, location, callback); - worker->Queue(); + worker->Queue(env); delete [] location; @@ -1240,17 +1575,6 @@ static void FinalizeIterator (napi_env env, void* data, void* hint) { } } -#define CHECK_PROPERTY(name, code) \ - if (HasProperty(env, options, #name)) { \ - napi_value value = GetProperty(env, options, #name); \ - if (IsString(env, value) || IsBuffer(env, value)) { \ - if (StringOrBufferLength(env, value) > 0) { \ - LD_STRING_OR_BUFFER_TO_COPY(env, value, _##name); \ - code; \ - } \ - } \ - } \ - /** * Create an iterator. */ @@ -1259,101 +1583,26 @@ NAPI_METHOD(iterator_init) { NAPI_DB_CONTEXT(); napi_value options = argv[1]; - bool reverse = BooleanProperty(env, options, "reverse", false); - bool keys = BooleanProperty(env, options, "keys", true); - bool values = BooleanProperty(env, options, "values", true); - bool fillCache = BooleanProperty(env, options, "fillCache", false); - bool keyAsBuffer = BooleanProperty(env, options, "keyAsBuffer", true); - bool valueAsBuffer = BooleanProperty(env, options, "valueAsBuffer", true); - int limit = Int32Property(env, options, "limit", -1); - uint32_t highWaterMark = Uint32Property(env, options, "highWaterMark", + const bool reverse = BooleanProperty(env, options, "reverse", false); + const bool keys = BooleanProperty(env, options, "keys", true); + const bool values = BooleanProperty(env, options, "values", true); + const bool fillCache = BooleanProperty(env, options, "fillCache", false); + const bool keyAsBuffer = BooleanProperty(env, options, "keyAsBuffer", true); + const bool valueAsBuffer = BooleanProperty(env, options, "valueAsBuffer", true); + const int limit = Int32Property(env, options, "limit", -1); + const uint32_t highWaterMark = Uint32Property(env, options, "highWaterMark", 16 * 1024); - // TODO simplify and refactor the hideous code below - - leveldb::Slice* start = NULL; - char *startStr = NULL; - CHECK_PROPERTY(start, { - start = new leveldb::Slice(_startCh_, _startSz_); - startStr = _startCh_; - }); - - std::string* end = NULL; - CHECK_PROPERTY(end, { - end = new std::string(_endCh_, _endSz_); - delete [] _endCh_; - }); - - std::string* lt = NULL; - CHECK_PROPERTY(lt, { - lt = new std::string(_ltCh_, _ltSz_); - delete [] _ltCh_; - if (reverse) { - if (startStr != NULL) { - delete [] startStr; - startStr = NULL; - } - if (start != NULL) { - delete start; - } - start = new leveldb::Slice(lt->data(), lt->size()); - } - }); - - std::string* lte = NULL; - CHECK_PROPERTY(lte, { - lte = new std::string(_lteCh_, _lteSz_); - delete [] _lteCh_; - if (reverse) { - if (startStr != NULL) { - delete [] startStr; - startStr = NULL; - } - if (start != NULL) { - delete start; - } - start = new leveldb::Slice(lte->data(), lte->size()); - } - }); - - std::string* gt = NULL; - CHECK_PROPERTY(gt, { - gt = new std::string(_gtCh_, _gtSz_); - delete [] _gtCh_; - if (!reverse) { - if (startStr != NULL) { - delete [] startStr; - startStr = NULL; - } - if (start != NULL) { - delete start; - } - start = new leveldb::Slice(gt->data(), gt->size()); - } - }); - - std::string* gte = NULL; - CHECK_PROPERTY(gte, { - gte = new std::string(_gteCh_, _gteSz_); - delete [] _gteCh_; - if (!reverse) { - if (startStr != NULL) { - delete [] startStr; - startStr = NULL; - } - if (start != NULL) { - delete start; - } - start = new leveldb::Slice(gte->data(), gte->size()); - } - }); + std::string* lt = RangeOption(env, options, "lt"); + std::string* lte = RangeOption(env, options, "lte"); + std::string* gt = RangeOption(env, options, "gt"); + std::string* gte = RangeOption(env, options, "gte"); - uint32_t id = database->currentIteratorId_++; - Iterator* iterator = new Iterator(database, id, start, end, reverse, keys, + const uint32_t id = database->currentIteratorId_++; + Iterator* iterator = new Iterator(database, id, reverse, keys, values, limit, lt, lte, gt, gte, fillCache, keyAsBuffer, valueAsBuffer, highWaterMark); napi_value result; - napi_ref ref; NAPI_STATUS_THROWS(napi_create_external(env, iterator, FinalizeIterator, @@ -1361,8 +1610,7 @@ NAPI_METHOD(iterator_init) { // Prevent GC of JS object before the iterator is ended (explicitly or on // db close) and keep track of non-ended iterators to end them on db close. - NAPI_STATUS_THROWS(napi_create_reference(env, result, 1, &ref)); - iterator->Attach(ref); + iterator->Attach(env, result); return result; } @@ -1374,54 +1622,15 @@ NAPI_METHOD(iterator_seek) { NAPI_ARGV(2); NAPI_ITERATOR_CONTEXT(); - if (iterator->ended_) { + if (iterator->isEnding_ || iterator->hasEnded_) { napi_throw_error(env, NULL, "iterator has ended"); } - iterator->ReleaseTarget(); - iterator->target_ = new leveldb::Slice(ToSlice(env, argv[1])); - iterator->GetIterator(); - - leveldb::Iterator* dbIterator = iterator->dbIterator_; - dbIterator->Seek(*iterator->target_); - - iterator->seeking_ = true; + leveldb::Slice target = ToSlice(env, argv[1]); iterator->landed_ = false; + iterator->Seek(target); - if (iterator->OutOfRange(iterator->target_)) { - if (iterator->reverse_) { - dbIterator->SeekToFirst(); - dbIterator->Prev(); - } else { - dbIterator->SeekToLast(); - dbIterator->Next(); - } - } - else if (dbIterator->Valid()) { - int cmp = dbIterator->key().compare(*iterator->target_); - if (cmp > 0 && iterator->reverse_) { - dbIterator->Prev(); - } else if (cmp < 0 && !iterator->reverse_) { - dbIterator->Next(); - } - } else { - if (iterator->reverse_) { - dbIterator->SeekToLast(); - } else { - dbIterator->SeekToFirst(); - } - if (dbIterator->Valid()) { - int cmp = dbIterator->key().compare(*iterator->target_); - if (cmp > 0 && iterator->reverse_) { - dbIterator->SeekToFirst(); - dbIterator->Prev(); - } else if (cmp < 0 && !iterator->reverse_) { - dbIterator->SeekToLast(); - dbIterator->Next(); - } - } - } - + DisposeSliceBuffer(target); NAPI_RETURN_UNDEFINED(); } @@ -1438,14 +1647,15 @@ struct EndWorker final : public BaseWorker { ~EndWorker () {} void DoExecute () override { - iterator_->IteratorEnd(); + iterator_->End(); } - void HandleOKCallback () override { - napi_delete_reference(env_, iterator_->Detach()); - BaseWorker::HandleOKCallback(); + void DoFinally (napi_env env) override { + iterator_->Detach(env); + BaseWorker::DoFinally(env); } +private: Iterator* iterator_; }; @@ -1454,14 +1664,14 @@ struct EndWorker final : public BaseWorker { * open iterators during NAPI_METHOD(db_close). */ static void iterator_end_do (napi_env env, Iterator* iterator, napi_value cb) { - if (!iterator->ended_) { + if (!iterator->isEnding_ && !iterator->hasEnded_) { EndWorker* worker = new EndWorker(env, iterator, cb); - iterator->ended_ = true; + iterator->isEnding_ = true; if (iterator->nexting_) { iterator->endWorker_ = worker; } else { - worker->Queue(); + worker->Queue(env); } } } @@ -1478,87 +1688,74 @@ NAPI_METHOD(iterator_end) { NAPI_RETURN_UNDEFINED(); } -/** - * TODO Move this to Iterator. There isn't any reason - * for this function being a separate function pointer. - */ -void CheckEndCallback (Iterator* iterator) { - iterator->ReleaseTarget(); - iterator->nexting_ = false; - if (iterator->endWorker_ != NULL) { - iterator->endWorker_->Queue(); - iterator->endWorker_ = NULL; - } -} - /** * Worker class for nexting an iterator. */ struct NextWorker final : public BaseWorker { NextWorker (napi_env env, Iterator* iterator, - napi_value callback, - void (*localCallback)(Iterator*)) + napi_value callback) : BaseWorker(env, iterator->database_, callback, "leveldown.iterator.next"), - iterator_(iterator), - localCallback_(localCallback) {} + iterator_(iterator), ok_() {} ~NextWorker () {} void DoExecute () override { - ok_ = iterator_->IteratorNext(result_); + if (!iterator_->DidSeek()) { + iterator_->SeekToRange(); + } + + // Limit the size of the cache to prevent starving the event loop + // in JS-land while we're recursively calling process.nextTick(). + ok_ = iterator_->ReadMany(1000); + if (!ok_) { - SetStatus(iterator_->IteratorStatus()); + SetStatus(iterator_->Status()); } } - void HandleOKCallback () override { - size_t arraySize = result_.size() * 2; + void HandleOKCallback (napi_env env, napi_value callback) override { + size_t arraySize = iterator_->cache_.size(); napi_value jsArray; - napi_create_array_with_length(env_, arraySize, &jsArray); + napi_create_array_with_length(env, arraySize, &jsArray); - for (size_t idx = 0; idx < result_.size(); ++idx) { - std::pair row = result_[idx]; - std::string key = row.first; - std::string value = row.second; + for (size_t idx = 0; idx < iterator_->cache_.size(); idx += 2) { + std::string key = iterator_->cache_[idx]; + std::string value = iterator_->cache_[idx + 1]; napi_value returnKey; - if (iterator_->keyAsBuffer_) { - napi_create_buffer_copy(env_, key.size(), key.data(), NULL, &returnKey); - } else { - napi_create_string_utf8(env_, key.data(), key.size(), &returnKey); - } - napi_value returnValue; - if (iterator_->valueAsBuffer_) { - napi_create_buffer_copy(env_, value.size(), value.data(), NULL, &returnValue); - } else { - napi_create_string_utf8(env_, value.data(), value.size(), &returnValue); - } + + Entry::Convert(env, &key, iterator_->keyAsBuffer_, &returnKey); + Entry::Convert(env, &value, iterator_->valueAsBuffer_, &returnValue); // put the key & value in a descending order, so that they can be .pop:ed in javascript-land - napi_set_element(env_, jsArray, static_cast(arraySize - idx * 2 - 1), returnKey); - napi_set_element(env_, jsArray, static_cast(arraySize - idx * 2 - 2), returnValue); + napi_set_element(env, jsArray, static_cast(arraySize - idx - 1), returnKey); + napi_set_element(env, jsArray, static_cast(arraySize - idx - 2), returnValue); } - // clean up & handle the next/end state - // TODO this should just do iterator_->CheckEndCallback(); - localCallback_(iterator_); - napi_value argv[3]; - napi_get_null(env_, &argv[0]); + napi_get_null(env, &argv[0]); argv[1] = jsArray; - napi_get_boolean(env_, !ok_, &argv[2]); - napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 3, argv); + napi_get_boolean(env, !ok_, &argv[2]); + CallFunction(env, callback, 3, argv); + } + + void DoFinally (napi_env env) override { + // clean up & handle the next/end state + iterator_->nexting_ = false; + + if (iterator_->endWorker_ != NULL) { + iterator_->endWorker_->Queue(env); + iterator_->endWorker_ = NULL; + } + + BaseWorker::DoFinally(env); } +private: Iterator* iterator_; - // TODO why do we need a function pointer for this? - void (*localCallback_)(Iterator*); - std::vector > result_; bool ok_; }; @@ -1571,17 +1768,16 @@ NAPI_METHOD(iterator_next) { napi_value callback = argv[1]; - if (iterator->ended_) { + if (iterator->isEnding_ || iterator->hasEnded_) { napi_value argv = CreateError(env, "iterator has ended"); CallFunction(env, callback, 1, &argv); NAPI_RETURN_UNDEFINED(); } - NextWorker* worker = new NextWorker(env, iterator, callback, - CheckEndCallback); + NextWorker* worker = new NextWorker(env, iterator, callback); iterator->nexting_ = true; - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1594,8 +1790,8 @@ struct BatchWorker final : public PriorityWorker { Database* database, napi_value callback, leveldb::WriteBatch* batch, - bool sync, - bool hasData) + const bool sync, + const bool hasData) : PriorityWorker(env, database, callback, "leveldown.batch.do"), batch_(batch), hasData_(hasData) { options_.sync = sync; @@ -1611,9 +1807,10 @@ struct BatchWorker final : public PriorityWorker { } } +private: leveldb::WriteOptions options_; leveldb::WriteBatch* batch_; - bool hasData_; + const bool hasData_; }; /** @@ -1624,7 +1821,7 @@ NAPI_METHOD(batch_do) { NAPI_DB_CONTEXT(); napi_value array = argv[1]; - bool sync = BooleanProperty(env, argv[2], "sync", false); + const bool sync = BooleanProperty(env, argv[2], "sync", false); napi_value callback = argv[3]; uint32_t length; @@ -1665,7 +1862,7 @@ NAPI_METHOD(batch_do) { } BatchWorker* worker = new BatchWorker(env, database, callback, batch, sync, hasData); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1810,20 +2007,18 @@ struct BatchWriteWorker final : public PriorityWorker { napi_value context, Batch* batch, napi_value callback, - bool sync) + const bool sync) : PriorityWorker(env, batch->database_, callback, "leveldown.batch.write"), batch_(batch), sync_(sync) { - // For thread saftey, consider BatchWrite as shared. + // For thread safety, consider BatchWrite as shared. batch->Share(); // Prevent GC of batch object before we execute - NAPI_STATUS_THROWS_VOID(napi_create_reference(env_, context, 1, &contextRef_)); + NAPI_STATUS_THROWS_VOID(napi_create_reference(env, context, 1, &contextRef_)); } - ~BatchWriteWorker () { - napi_delete_reference(env_, contextRef_); - } + ~BatchWriteWorker () {} void DoExecute () override { if (batch_->hasData_) { @@ -1831,15 +2026,15 @@ struct BatchWriteWorker final : public PriorityWorker { } } - void DoFinally () override { - database_->DecrementPriorityWork(); + void DoFinally (napi_env env) override { + napi_delete_reference(env, contextRef_); batch_->Unshare(); + PriorityWorker::DoFinally(env); } - Batch* batch_; - bool sync_; - private: + Batch* batch_; + const bool sync_; napi_ref contextRef_; }; @@ -1850,19 +2045,19 @@ NAPI_METHOD(batch_write) { NAPI_ARGV(3); NAPI_BATCH_CONTEXT(); + napi_value options = argv[1]; + const bool sync = BooleanProperty(env, options, "sync", false); napi_value callback = argv[2]; - if (!batch->IsShared()) { - napi_value options = argv[1]; - bool sync = BooleanProperty(env, options, "sync", false); - - BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync); - worker->Queue(); - } else { + if (batch->IsShared()) { napi_value argv = CreateError(env, "Unsafe batch write."); CallFunction(env, callback, 1, &argv); + NAPI_RETURN_UNDEFINED(); } + BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync); + worker->Queue(env); + NAPI_RETURN_UNDEFINED(); } @@ -1875,7 +2070,9 @@ NAPI_INIT() { NAPI_EXPORT_FUNCTION(db_close); NAPI_EXPORT_FUNCTION(db_put); NAPI_EXPORT_FUNCTION(db_get); + NAPI_EXPORT_FUNCTION(db_get_many); NAPI_EXPORT_FUNCTION(db_del); + NAPI_EXPORT_FUNCTION(db_clear); NAPI_EXPORT_FUNCTION(db_approximate_size); NAPI_EXPORT_FUNCTION(db_compact_range); NAPI_EXPORT_FUNCTION(db_get_property); From 265493ed6ec57e4ee82b67cef4de649ba109eb31 Mon Sep 17 00:00:00 2001 From: Nodari Chkuaselidze Date: Tue, 2 May 2023 15:22:24 +0400 Subject: [PATCH 2/2] memdb/browserdb: deprecate start/end iterator options. --- lib/level-browser.js | 10 ++-------- lib/memdb.js | 10 ++-------- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/lib/level-browser.js b/lib/level-browser.js index 506df2d..53b0f43 100644 --- a/lib/level-browser.js +++ b/lib/level-browser.js @@ -608,12 +608,6 @@ class IteratorOptions { this.values = options.values; } - if (options.start != null) - this.start = options.start; - - if (options.end != null) - this.end = options.end; - if (options.gte != null) this.start = options.gte; @@ -633,13 +627,13 @@ class IteratorOptions { if (this.start != null) { if (typeof this.start === 'string') this.start = Buffer.from(this.start, 'utf8'); - assert(Buffer.isBuffer(this.start), '`start` must be a Buffer.'); + assert(Buffer.isBuffer(this.start), '`gt(e)` must be a Buffer.'); } if (this.end != null) { if (typeof this.end === 'string') this.end = Buffer.from(this.end, 'utf8'); - assert(Buffer.isBuffer(this.end), '`end` must be a Buffer.'); + assert(Buffer.isBuffer(this.end), '`lt(e)` must be a Buffer.'); } if (options.keyAsBuffer != null) { diff --git a/lib/memdb.js b/lib/memdb.js index b7a657e..87cc845 100644 --- a/lib/memdb.js +++ b/lib/memdb.js @@ -616,12 +616,6 @@ class IteratorOptions { this.values = options.values; } - if (options.start != null) - this.start = options.start; - - if (options.end != null) - this.end = options.end; - if (options.gte != null) this.start = options.gte; @@ -641,13 +635,13 @@ class IteratorOptions { if (this.start != null) { if (typeof this.start === 'string') this.start = Buffer.from(this.start, 'utf8'); - assert(Buffer.isBuffer(this.start), '`start` must be a Buffer.'); + assert(Buffer.isBuffer(this.start), '`gt(e)` must be a Buffer.'); } if (this.end != null) { if (typeof this.end === 'string') this.end = Buffer.from(this.end, 'utf8'); - assert(Buffer.isBuffer(this.end), '`end` must be a Buffer.'); + assert(Buffer.isBuffer(this.end), '`lt(e)` must be a Buffer.'); } if (options.keyAsBuffer != null) {