diff --git a/binding.cc b/binding.cc new file mode 100644 index 00000000..2ba2e2c6 --- /dev/null +++ b/binding.cc @@ -0,0 +1,1852 @@ +#define NAPI_VERSION 3 + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +/** + * Forward declarations. + */ +struct Database; +struct Iterator; +struct EndWorker; +static void iterator_end_do (napi_env env, Iterator* iterator, napi_value cb); + +/** + * Macros. + */ + +#define NAPI_DB_CONTEXT() \ + Database* database = NULL; \ + NAPI_STATUS_THROWS(napi_get_value_external(env, argv[0], (void**)&database)); + +#define NAPI_ITERATOR_CONTEXT() \ + Iterator* iterator = NULL; \ + NAPI_STATUS_THROWS(napi_get_value_external(env, argv[0], (void**)&iterator)); + +#define NAPI_BATCH_CONTEXT() \ + Batch* batch = NULL; \ + NAPI_STATUS_THROWS(napi_get_value_external(env, argv[0], (void**)&batch)); + +#define NAPI_RETURN_UNDEFINED() \ + return 0; + +#define NAPI_UTF8_NEW(name, val) \ + size_t name##_size = 0; \ + NAPI_STATUS_THROWS(napi_get_value_string_utf8(env, val, NULL, 0, &name##_size)) \ + char* name = new char[name##_size + 1]; \ + NAPI_STATUS_THROWS(napi_get_value_string_utf8(env, val, name, name##_size + 1, &name##_size)) \ + name[name##_size] = '\0'; + +#define NAPI_ARGV_UTF8_NEW(name, i) \ + NAPI_UTF8_NEW(name, argv[i]) + +#define LD_STRING_OR_BUFFER_TO_COPY(env, from, to) \ + char* to##Ch_ = 0; \ + size_t to##Sz_ = 0; \ + if (IsString(env, from)) { \ + napi_get_value_string_utf8(env, from, NULL, 0, &to##Sz_); \ + to##Ch_ = new char[to##Sz_ + 1]; \ + napi_get_value_string_utf8(env, from, to##Ch_, to##Sz_ + 1, &to##Sz_); \ + to##Ch_[to##Sz_] = '\0'; \ + } else if (IsBuffer(env, from)) { \ + char* buf = 0; \ + napi_get_buffer_info(env, from, (void **)&buf, &to##Sz_); \ + to##Ch_ = new char[to##Sz_]; \ + memcpy(to##Ch_, buf, to##Sz_); \ + } + +/********************************************************************* + * Helpers. 
+ ********************************************************************/ + +/** + * Returns true if 'value' is a string. + */ +static bool IsString (napi_env env, napi_value value) { + napi_valuetype type; + napi_typeof(env, value, &type); + return type == napi_string; +} + +/** + * Returns true if 'value' is a buffer. + */ +static bool IsBuffer (napi_env env, napi_value value) { + bool isBuffer; + napi_is_buffer(env, value, &isBuffer); + return isBuffer; +} + +/** + * Returns true if 'value' is an object. + */ +static bool IsObject (napi_env env, napi_value value) { + napi_valuetype type; + napi_typeof(env, value, &type); + return type == napi_object; +} + +/** + * Create an error object. + */ +static napi_value CreateError (napi_env env, const char* str) { + napi_value msg; + napi_create_string_utf8(env, str, strlen(str), &msg); + napi_value error; + napi_create_error(env, NULL, msg, &error); + return error; +} + +/** + * Returns true if 'obj' has a property 'key'. + */ +static bool HasProperty (napi_env env, napi_value obj, const char* key) { + bool has = false; + napi_has_named_property(env, obj, key, &has); + return has; +} + +/** + * Returns a property in napi_value form. + */ +static napi_value GetProperty (napi_env env, napi_value obj, const char* key) { + napi_value value; + napi_get_named_property(env, obj, key, &value); + return value; +} + +/** + * Returns a boolean property 'key' from 'obj'. + * Returns 'DEFAULT' if the property doesn't exist. + */ +static bool BooleanProperty (napi_env env, napi_value obj, const char* key, + bool DEFAULT) { + if (HasProperty(env, obj, key)) { + napi_value value = GetProperty(env, obj, key); + bool result; + napi_get_value_bool(env, value, &result); + return result; + } + + return DEFAULT; +} + +/** + * Returns a uint32 property 'key' from 'obj'. + * Returns 'DEFAULT' if the property doesn't exist. 
+ */ +static uint32_t Uint32Property (napi_env env, napi_value obj, const char* key, + uint32_t DEFAULT) { + if (HasProperty(env, obj, key)) { + napi_value value = GetProperty(env, obj, key); + uint32_t result; + napi_get_value_uint32(env, value, &result); + return result; + } + + return DEFAULT; +} + +/** + * Returns a uint32 property 'key' from 'obj'. + * Returns 'DEFAULT' if the property doesn't exist. + */ +static int Int32Property (napi_env env, napi_value obj, const char* key, + int DEFAULT) { + if (HasProperty(env, obj, key)) { + napi_value value = GetProperty(env, obj, key); + int result; + napi_get_value_int32(env, value, &result); + return result; + } + + return DEFAULT; +} + +/** + * Returns a string property 'key' from 'obj'. + * Returns empty string if the property doesn't exist. + */ +static std::string StringProperty (napi_env env, napi_value obj, const char* key) { + if (HasProperty(env, obj, key)) { + napi_value value = GetProperty(env, obj, key); + if (IsString(env, value)) { + size_t size = 0; + napi_get_value_string_utf8(env, value, NULL, 0, &size); + + char* buf = new char[size + 1]; + napi_get_value_string_utf8(env, value, buf, size + 1, &size); + buf[size] = '\0'; + + std::string result = buf; + delete [] buf; + return result; + } + } + + return ""; +} + +static void DisposeSliceBuffer (leveldb::Slice slice) { + if (!slice.empty()) delete [] slice.data(); +} + +/** + * Convert a napi_value to a leveldb::Slice. 
 */
static leveldb::Slice ToSlice (napi_env env, napi_value from) {
  // Copies the string/buffer contents; the slice owns a new[]-ed buffer
  // (or NULL data when `from` is neither) — release via DisposeSliceBuffer.
  LD_STRING_OR_BUFFER_TO_COPY(env, from, to);
  return leveldb::Slice(toCh_, toSz_);
}

/**
 * Returns length of string or buffer; 0 for any other value type.
 */
static size_t StringOrBufferLength (napi_env env, napi_value value) {
  size_t size = 0;

  if (IsString(env, value)) {
    napi_get_value_string_utf8(env, value, NULL, 0, &size);
  } else if (IsBuffer(env, value)) {
    char* buf;
    napi_get_buffer_info(env, value, (void **)&buf, &size);
  }

  return size;
}

/**
 * Calls a JS function with `global` as the receiver, discarding the result.
 */
static napi_status CallFunction (napi_env env,
                                 napi_value callback,
                                 const int argc,
                                 napi_value* argv) {
  napi_value global;
  napi_get_global(env, &global);
  return napi_call_function(env, global, callback, argc, argv, NULL);
}

/**
 * Base worker class. Handles the async work. Derived classes can override the
 * following virtual methods (listed in the order in which they're called):
 *
 * - DoExecute (abstract, worker pool thread): main work
 * - HandleOKCallback (main thread): call JS callback on success
 * - DoFinally (main thread): do cleanup regardless of success
 *
 * Ownership: a worker is heap-allocated by its caller and deletes itself in
 * Complete() after the callback has run — callers must not delete it.
 */
struct BaseWorker {
  BaseWorker (napi_env env,
              Database* database,
              napi_value callback,
              const char* resourceName)
    : env_(env), database_(database), errMsg_(NULL) {
    // Keep the JS callback alive for the duration of the async work.
    NAPI_STATUS_THROWS(napi_create_reference(env_, callback, 1, &callbackRef_));
    napi_value asyncResourceName;
    NAPI_STATUS_THROWS(napi_create_string_utf8(env_, resourceName,
                                               NAPI_AUTO_LENGTH,
                                               &asyncResourceName));
    // NOTE(review): `callback` is passed as the async resource object —
    // presumably intentional so async_hooks can associate the work with the
    // callback; confirm against the N-API async-resource contract.
    NAPI_STATUS_THROWS(napi_create_async_work(env_, callback,
                                              asyncResourceName,
                                              BaseWorker::Execute,
                                              BaseWorker::Complete,
                                              this, &asyncWork_));
  }

  virtual ~BaseWorker () {
    delete [] errMsg_;
    napi_delete_reference(env_, callbackRef_);
    napi_delete_async_work(env_, asyncWork_);
  }

  // Runs on a worker-pool thread; must not touch JS values.
  static void Execute (napi_env env, void* data) {
    BaseWorker* self = (BaseWorker*)data;
    self->DoExecute();
  }

  // Records the leveldb status; a non-OK status also captures its message
  // so it can be turned into a JS Error on the main thread.
  void SetStatus (leveldb::Status status) {
    status_ = status;
    if (!status.ok()) {
      SetErrorMessage(status.ToString().c_str());
    }
  }

  void SetErrorMessage(const char *msg) {
    delete [] errMsg_;
    size_t size = strlen(msg) + 1;
    errMsg_ = new char[size];
    memcpy(errMsg_, msg, size);
  }

  virtual void DoExecute () = 0;
  virtual void DoFinally () {};

  // Main-thread completion: callback dispatch, cleanup, then self-delete.
  static void Complete (napi_env env, napi_status status, void* data) {
    BaseWorker* self = (BaseWorker*)data;
    self->DoComplete();
    self->DoFinally();
    delete self;
  }

  void DoComplete () {
    if (status_.ok()) {
      return HandleOKCallback();
    }

    // Failure path: invoke callback(error).
    napi_value argv = CreateError(env_, errMsg_);
    napi_value callback;
    napi_get_reference_value(env_, callbackRef_, &callback);
    CallFunction(env_, callback, 1, &argv);
  }

  // Default success path: invoke callback(null). Overridden by workers that
  // return a value (e.g. GetWorker, NextWorker).
  virtual void HandleOKCallback () {
    napi_value argv;
    napi_get_null(env_, &argv);
    napi_value callback;
    napi_get_reference_value(env_, callbackRef_, &callback);
    CallFunction(env_, callback, 1, &argv);
  }

  void Queue () {
    napi_queue_async_work(env_, asyncWork_);
  }

  napi_env env_;
  napi_ref callbackRef_;
  napi_async_work asyncWork_;
  Database* database_;

private:
  leveldb::Status status_;
  char *errMsg_;
};

/**
 * Owns the LevelDB storage, cache, filter policy and iterators.
 */
struct Database {
  Database (napi_env env)
    : env_(env),
      db_(NULL),
      blockCache_(NULL),
      filterPolicy_(leveldb::NewBloomFilterPolicy(10)),
      currentIteratorId_(0),
      pendingCloseWorker_(NULL),
      priorityWork_(0) {}

  ~Database () {
    if (db_ != NULL) {
      delete db_;
      db_ = NULL;
    }
  }

  leveldb::Status Open (const leveldb::Options& options,
                        const char* location) {
    return leveldb::DB::Open(options, location, &db_);
  }

  // Deletes the DB handle and the block cache. Safe to call when already
  // closed (delete NULL is a no-op).
  void CloseDatabase () {
    delete db_;
    db_ = NULL;
    if (blockCache_) {
      delete blockCache_;
      blockCache_ = NULL;
    }
  }

  leveldb::Status Put (const leveldb::WriteOptions& options,
                       leveldb::Slice key,
                       leveldb::Slice value) {
    return db_->Put(options, key, value);
  }

  leveldb::Status Get (const leveldb::ReadOptions& options,
                       leveldb::Slice key,
                       std::string& value) {
    return db_->Get(options, key, &value);
  }

  leveldb::Status Del (const leveldb::WriteOptions& options,
                       leveldb::Slice key) {
    return db_->Delete(options, key);
  }

  leveldb::Status WriteBatch (const leveldb::WriteOptions& options,
                              leveldb::WriteBatch* batch) {
    return db_->Write(options, batch);
  }

  uint64_t ApproximateSize (const leveldb::Range* range) {
    uint64_t size = 0;
    db_->GetApproximateSizes(range, 1, &size);
    return size;
  }

  void CompactRange (const leveldb::Slice* start,
                     const leveldb::Slice* end) {
    db_->CompactRange(start, end);
  }

  void GetProperty (const leveldb::Slice& property, std::string* value) {
    db_->GetProperty(property, value);
  }

  const leveldb::Snapshot* NewSnapshot () {
    return db_->GetSnapshot();
  }

  leveldb::Iterator* NewIterator (leveldb::ReadOptions* options) {
    return db_->NewIterator(*options);
  }

  void ReleaseSnapshot (const leveldb::Snapshot* snapshot) {
    return db_->ReleaseSnapshot(snapshot);
  }

  // Registers a live iterator; counted as priority work so that db_close
  // waits for it (see DecrementPriorityWork).
  void AttachIterator (uint32_t id, Iterator* iterator) {
    iterators_[id] = iterator;
    IncrementPriorityWork();
  }

  void DetachIterator (uint32_t id) {
    iterators_.erase(id);
    DecrementPriorityWork();
  }

  void IncrementPriorityWork () {
    ++priorityWork_;
  }

  // When the last piece of priority work finishes, queue the deferred
  // close worker that db_close parked here.
  void DecrementPriorityWork () {
    if (--priorityWork_ == 0 && pendingCloseWorker_ != NULL) {
      pendingCloseWorker_->Queue();
      pendingCloseWorker_ = NULL;
    }
  }

  bool HasPriorityWork () {
    return priorityWork_ > 0;
  }

  napi_env env_;
  leveldb::DB* db_;
  leveldb::Cache* blockCache_;
  const leveldb::FilterPolicy* filterPolicy_;
  uint32_t currentIteratorId_;
  BaseWorker *pendingCloseWorker_;
  std::map< uint32_t, Iterator * > iterators_;

private:
  uint32_t priorityWork_;
};

/**
 * Runs when a Database is garbage collected.
 */
static void FinalizeDatabase (napi_env env, void* data, void* hint) {
  if (data) {
    delete (Database*)data;
  }
}

/**
 * Base worker class for doing async work that defers closing the database.
 */
struct PriorityWorker : public BaseWorker {
  PriorityWorker (napi_env env, Database* database, napi_value callback, const char* resourceName)
    : BaseWorker(env, database, callback, resourceName) {
    database_->IncrementPriorityWork();
  }

  ~PriorityWorker () {}

  void DoFinally () override {
    database_->DecrementPriorityWork();
  }
};

/**
 * Owns a leveldb iterator.
+ */ +struct Iterator { + Iterator (Database* database, + uint32_t id, + leveldb::Slice* start, + std::string* end, + bool reverse, + bool keys, + bool values, + int limit, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte, + bool fillCache, + bool keyAsBuffer, + bool valueAsBuffer, + uint32_t highWaterMark) + : database_(database), + id_(id), + start_(start), + end_(end), + reverse_(reverse), + keys_(keys), + values_(values), + limit_(limit), + lt_(lt), + lte_(lte), + gt_(gt), + gte_(gte), + keyAsBuffer_(keyAsBuffer), + valueAsBuffer_(valueAsBuffer), + highWaterMark_(highWaterMark), + dbIterator_(NULL), + count_(0), + target_(NULL), + seeking_(false), + landed_(false), + nexting_(false), + ended_(false), + endWorker_(NULL), + ref_(NULL) { + options_ = new leveldb::ReadOptions(); + options_->fill_cache = fillCache; + options_->snapshot = database->NewSnapshot(); + } + + ~Iterator () { + assert(ended_); + ReleaseTarget(); + if (start_ != NULL) { + // Special case for `start` option: it won't be + // freed up by any of the delete calls below. 
+ if (!((lt_ != NULL && reverse_) + || (lte_ != NULL && reverse_) + || (gt_ != NULL && !reverse_) + || (gte_ != NULL && !reverse_))) { + delete [] start_->data(); + } + delete start_; + } + if (end_ != NULL) { + delete end_; + } + if (lt_ != NULL) { + delete lt_; + } + if (gt_ != NULL) { + delete gt_; + } + if (lte_ != NULL) { + delete lte_; + } + if (gte_ != NULL) { + delete gte_; + } + delete options_; + } + + void ReleaseTarget () { + if (target_ != NULL) { + if (!target_->empty()) { + delete [] target_->data(); + } + delete target_; + target_ = NULL; + } + } + + void Attach (napi_ref ref) { + ref_ = ref; + database_->AttachIterator(id_, this); + } + + napi_ref Detach () { + database_->DetachIterator(id_); + return ref_; + } + + leveldb::Status IteratorStatus () { + return dbIterator_->status(); + } + + void IteratorEnd () { + delete dbIterator_; + dbIterator_ = NULL; + database_->ReleaseSnapshot(options_->snapshot); + } + + bool GetIterator () { + if (dbIterator_ != NULL) return false; + + dbIterator_ = database_->NewIterator(options_); + + if (start_ != NULL) { + dbIterator_->Seek(*start_); + + if (reverse_) { + if (!dbIterator_->Valid()) { + dbIterator_->SeekToLast(); + } else { + std::string keyStr = dbIterator_->key().ToString(); + + if (lt_ != NULL) { + if (lt_->compare(keyStr) <= 0) + dbIterator_->Prev(); + } else if (lte_ != NULL) { + if (lte_->compare(keyStr) < 0) + dbIterator_->Prev(); + } else if (start_ != NULL) { + if (start_->compare(keyStr)) + dbIterator_->Prev(); + } + } + + if (dbIterator_->Valid() && lt_ != NULL) { + if (lt_->compare(dbIterator_->key().ToString()) <= 0) + dbIterator_->Prev(); + } + } else { + if (dbIterator_->Valid() && gt_ != NULL + && gt_->compare(dbIterator_->key().ToString()) == 0) + dbIterator_->Next(); + } + } else if (reverse_) { + dbIterator_->SeekToLast(); + } else { + dbIterator_->SeekToFirst(); + } + + return true; + } + + bool Read (std::string& key, std::string& value) { + if (!GetIterator() && !seeking_) { + if 
(reverse_) { + dbIterator_->Prev(); + } + else { + dbIterator_->Next(); + } + } + + seeking_ = false; + + if (dbIterator_->Valid()) { + std::string keyStr = dbIterator_->key().ToString(); + const int isEnd = end_ == NULL ? 1 : end_->compare(keyStr); + + if ((limit_ < 0 || ++count_ <= limit_) + && (end_ == NULL + || (reverse_ && (isEnd <= 0)) + || (!reverse_ && (isEnd >= 0))) + && ( lt_ != NULL ? (lt_->compare(keyStr) > 0) + : lte_ != NULL ? (lte_->compare(keyStr) >= 0) + : true ) + && ( gt_ != NULL ? (gt_->compare(keyStr) < 0) + : gte_ != NULL ? (gte_->compare(keyStr) <= 0) + : true ) + ) { + if (keys_) { + key.assign(dbIterator_->key().data(), dbIterator_->key().size()); + } + if (values_) { + value.assign(dbIterator_->value().data(), dbIterator_->value().size()); + } + return true; + } + } + + return false; + } + + bool OutOfRange (leveldb::Slice* target) { + if ((lt_ != NULL && target->compare(*lt_) >= 0) || + (lte_ != NULL && target->compare(*lte_) > 0) || + (start_ != NULL && reverse_ && target->compare(*start_) > 0)) { + return true; + } + + if (end_ != NULL) { + int d = target->compare(*end_); + if (reverse_ ? 
d < 0 : d > 0) return true; + } + + return ((gt_ != NULL && target->compare(*gt_) <= 0) || + (gte_ != NULL && target->compare(*gte_) < 0) || + (start_ != NULL && !reverse_ && target->compare(*start_) < 0)); + } + + bool IteratorNext (std::vector >& result) { + size_t size = 0; + while (true) { + std::string key, value; + bool ok = Read(key, value); + + if (ok) { + result.push_back(std::make_pair(key, value)); + + if (!landed_) { + landed_ = true; + return true; + } + + size = size + key.size() + value.size(); + if (size > highWaterMark_) return true; + + } else { + return false; + } + } + } + + Database* database_; + uint32_t id_; + leveldb::Slice* start_; + std::string* end_; + bool reverse_; + bool keys_; + bool values_; + int limit_; + std::string* lt_; + std::string* lte_; + std::string* gt_; + std::string* gte_; + bool keyAsBuffer_; + bool valueAsBuffer_; + uint32_t highWaterMark_; + leveldb::Iterator* dbIterator_; + int count_; + leveldb::Slice* target_; + bool seeking_; + bool landed_; + bool nexting_; + bool ended_; + + leveldb::ReadOptions* options_; + EndWorker* endWorker_; + +private: + napi_ref ref_; +}; + +/** + * Returns a context object for a database. + */ +NAPI_METHOD(db_init) { + Database* database = new Database(env); + + napi_value result; + NAPI_STATUS_THROWS(napi_create_external(env, database, + FinalizeDatabase, + NULL, &result)); + return result; +} + +/** + * Worker class for opening a database. 
 */
struct OpenWorker final : public BaseWorker {
  // Copies `location` and snapshots all open options at construction time,
  // so the caller may free its argument buffers immediately after Queue().
  OpenWorker (napi_env env,
              Database* database,
              napi_value callback,
              const std::string& location,
              bool createIfMissing,
              bool errorIfExists,
              bool compression,
              uint32_t writeBufferSize,
              uint32_t blockSize,
              uint32_t maxOpenFiles,
              uint32_t blockRestartInterval,
              uint32_t maxFileSize)
    : BaseWorker(env, database, callback, "leveldown.db.open"),
      location_(location) {
    // blockCache_ / filterPolicy_ are owned by Database, not by options_.
    options_.block_cache = database->blockCache_;
    options_.filter_policy = database->filterPolicy_;
    options_.create_if_missing = createIfMissing;
    options_.error_if_exists = errorIfExists;
    options_.compression = compression
      ? leveldb::kSnappyCompression
      : leveldb::kNoCompression;
    options_.write_buffer_size = writeBufferSize;
    options_.block_size = blockSize;
    options_.max_open_files = maxOpenFiles;
    options_.block_restart_interval = blockRestartInterval;
    options_.max_file_size = maxFileSize;
  }

  ~OpenWorker () {}

  void DoExecute () override {
    SetStatus(database_->Open(options_, location_.c_str()));
  }

  leveldb::Options options_;
  std::string location_;
};

/**
 * Open a database.
 */
NAPI_METHOD(db_open) {
  NAPI_ARGV(4);
  NAPI_DB_CONTEXT();
  NAPI_ARGV_UTF8_NEW(location, 1);

  napi_value options = argv[2];
  bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true);
  bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false);
  bool compression = BooleanProperty(env, options, "compression", true);

  uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20);
  uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20);
  uint32_t blockSize = Uint32Property(env, options, "blockSize", 4096);
  uint32_t maxOpenFiles = Uint32Property(env, options, "maxOpenFiles", 1000);
  uint32_t blockRestartInterval = Uint32Property(env, options,
                                                 "blockRestartInterval", 16);
  uint32_t maxFileSize = Uint32Property(env, options, "maxFileSize", 2 << 20);

  database->blockCache_ = leveldb::NewLRUCache(cacheSize);

  napi_value callback = argv[3];
  OpenWorker* worker = new OpenWorker(env, database, callback, location,
                                      createIfMissing, errorIfExists,
                                      compression, writeBufferSize, blockSize,
                                      maxOpenFiles, blockRestartInterval,
                                      maxFileSize);
  worker->Queue();
  // Safe to free here: OpenWorker copied `location` into a std::string.
  delete [] location;

  NAPI_RETURN_UNDEFINED();
}

/**
 * Worker class for closing a database
 */
struct CloseWorker final : public BaseWorker {
  CloseWorker (napi_env env,
               Database* database,
               napi_value callback)
    : BaseWorker(env, database, callback, "leveldown.db.close") {}

  ~CloseWorker () {}

  void DoExecute () override {
    database_->CloseDatabase();
  }
};

// Placeholder JS callback used when ending iterators during db_close.
napi_value noop_callback (napi_env env, napi_callback_info info) {
  return 0;
}

/**
 * Close a database.
+ */ +NAPI_METHOD(db_close) { + NAPI_ARGV(2); + NAPI_DB_CONTEXT(); + + napi_value callback = argv[1]; + CloseWorker* worker = new CloseWorker(env, database, callback); + + if (!database->HasPriorityWork()) { + worker->Queue(); + NAPI_RETURN_UNDEFINED(); + } + + database->pendingCloseWorker_ = worker; + + napi_value noop; + napi_create_function(env, NULL, 0, noop_callback, NULL, &noop); + + std::map iterators = database->iterators_; + std::map::iterator it; + + for (it = iterators.begin(); it != iterators.end(); ++it) { + iterator_end_do(env, it->second, noop); + } + + NAPI_RETURN_UNDEFINED(); +} + +/** + * Worker class for putting key/value to the database + */ +struct PutWorker final : public PriorityWorker { + PutWorker (napi_env env, + Database* database, + napi_value callback, + leveldb::Slice key, + leveldb::Slice value, + bool sync) + : PriorityWorker(env, database, callback, "leveldown.db.put"), + key_(key), value_(value) { + options_.sync = sync; + } + + ~PutWorker () { + DisposeSliceBuffer(key_); + DisposeSliceBuffer(value_); + } + + void DoExecute () override { + SetStatus(database_->Put(options_, key_, value_)); + } + + leveldb::WriteOptions options_; + leveldb::Slice key_; + leveldb::Slice value_; +}; + +/** + * Puts a key and a value to a database. + */ +NAPI_METHOD(db_put) { + NAPI_ARGV(5); + NAPI_DB_CONTEXT(); + + leveldb::Slice key = ToSlice(env, argv[1]); + leveldb::Slice value = ToSlice(env, argv[2]); + bool sync = BooleanProperty(env, argv[3], "sync", false); + napi_value callback = argv[4]; + + PutWorker* worker = new PutWorker(env, database, callback, key, value, sync); + worker->Queue(); + + NAPI_RETURN_UNDEFINED(); +} + +/** + * Worker class for getting a value from a database. 
 */
struct GetWorker final : public PriorityWorker {
  // Takes ownership of the `key` buffer (freed in the destructor).
  GetWorker (napi_env env,
             Database* database,
             napi_value callback,
             leveldb::Slice key,
             bool asBuffer,
             bool fillCache)
    : PriorityWorker(env, database, callback, "leveldown.db.get"),
      key_(key),
      asBuffer_(asBuffer) {
    options_.fill_cache = fillCache;
  }

  ~GetWorker () {
    DisposeSliceBuffer(key_);
  }

  void DoExecute () override {
    SetStatus(database_->Get(options_, key_, value_));
  }

  // Success: callback(null, value) with the value as Buffer or utf8 string.
  void HandleOKCallback () override {
    napi_value argv[2];
    napi_get_null(env_, &argv[0]);

    if (asBuffer_) {
      napi_create_buffer_copy(env_, value_.size(), value_.data(), NULL, &argv[1]);
    } else {
      napi_create_string_utf8(env_, value_.data(), value_.size(), &argv[1]);
    }

    napi_value callback;
    napi_get_reference_value(env_, callbackRef_, &callback);
    CallFunction(env_, callback, 2, argv);
  }

  leveldb::ReadOptions options_;
  leveldb::Slice key_;
  std::string value_;
  bool asBuffer_;
};

/**
 * Gets a value from a database.
 */
NAPI_METHOD(db_get) {
  NAPI_ARGV(4);
  NAPI_DB_CONTEXT();

  leveldb::Slice key = ToSlice(env, argv[1]);
  napi_value options = argv[2];
  bool asBuffer = BooleanProperty(env, options, "asBuffer", true);
  bool fillCache = BooleanProperty(env, options, "fillCache", true);
  napi_value callback = argv[3];

  GetWorker* worker = new GetWorker(env, database, callback, key, asBuffer,
                                    fillCache);
  worker->Queue();

  NAPI_RETURN_UNDEFINED();
}

/**
 * Worker class for deleting a value from a database.
 */
struct DelWorker final : public PriorityWorker {
  // Takes ownership of the `key` buffer (freed in the destructor).
  DelWorker (napi_env env,
             Database* database,
             napi_value callback,
             leveldb::Slice key,
             bool sync)
    : PriorityWorker(env, database, callback, "leveldown.db.del"),
      key_(key) {
    options_.sync = sync;
  }

  ~DelWorker () {
    DisposeSliceBuffer(key_);
  }

  void DoExecute () override {
    SetStatus(database_->Del(options_, key_));
  }

  leveldb::WriteOptions options_;
  leveldb::Slice key_;
};

/**
 * Delete a value from a database.
 */
NAPI_METHOD(db_del) {
  NAPI_ARGV(4);
  NAPI_DB_CONTEXT();

  leveldb::Slice key = ToSlice(env, argv[1]);
  bool sync = BooleanProperty(env, argv[2], "sync", false);
  napi_value callback = argv[3];

  DelWorker* worker = new DelWorker(env, database, callback, key, sync);
  worker->Queue();

  NAPI_RETURN_UNDEFINED();
}

/**
 * Worker class for calculating the size of a range.
 */
struct ApproximateSizeWorker final : public PriorityWorker {
  // Takes ownership of the `start` and `end` buffers.
  ApproximateSizeWorker (napi_env env,
                         Database* database,
                         napi_value callback,
                         leveldb::Slice start,
                         leveldb::Slice end)
    : PriorityWorker(env, database, callback, "leveldown.db.approximate_size"),
      start_(start), end_(end) {}

  ~ApproximateSizeWorker () {
    DisposeSliceBuffer(start_);
    DisposeSliceBuffer(end_);
  }

  void DoExecute () override {
    leveldb::Range range(start_, end_);
    size_ = database_->ApproximateSize(&range);
  }

  void HandleOKCallback () override {
    napi_value argv[2];
    napi_get_null(env_, &argv[0]);
    // NOTE(review): the uint64 size is truncated to uint32 here — sizes
    // beyond 4 GiB wrap; confirm whether this matches the JS API contract.
    napi_create_uint32(env_, (uint32_t)size_, &argv[1]);
    napi_value callback;
    napi_get_reference_value(env_, callbackRef_, &callback);
    CallFunction(env_, callback, 2, argv);
  }

  leveldb::Slice start_;
  leveldb::Slice end_;
  uint64_t size_;
};

/**
 * Calculates the approximate size of a range in a database.
 */
NAPI_METHOD(db_approximate_size) {
  NAPI_ARGV(4);
  NAPI_DB_CONTEXT();

  leveldb::Slice start = ToSlice(env, argv[1]);
  leveldb::Slice end = ToSlice(env, argv[2]);

  napi_value callback = argv[3];

  ApproximateSizeWorker* worker = new ApproximateSizeWorker(env, database,
                                                            callback, start,
                                                            end);
  worker->Queue();

  NAPI_RETURN_UNDEFINED();
}

/**
 * Worker class for compacting a range in a database.
 */
struct CompactRangeWorker final : public PriorityWorker {
  // Takes ownership of the `start` and `end` buffers.
  CompactRangeWorker (napi_env env,
                      Database* database,
                      napi_value callback,
                      leveldb::Slice start,
                      leveldb::Slice end)
    : PriorityWorker(env, database, callback, "leveldown.db.compact_range"),
      start_(start), end_(end) {}

  ~CompactRangeWorker () {
    DisposeSliceBuffer(start_);
    DisposeSliceBuffer(end_);
  }

  void DoExecute () override {
    database_->CompactRange(&start_, &end_);
  }

  leveldb::Slice start_;
  leveldb::Slice end_;
};

/**
 * Compacts a range in a database.
 */
NAPI_METHOD(db_compact_range) {
  NAPI_ARGV(4);
  NAPI_DB_CONTEXT();

  leveldb::Slice start = ToSlice(env, argv[1]);
  leveldb::Slice end = ToSlice(env, argv[2]);
  napi_value callback = argv[3];

  CompactRangeWorker* worker = new CompactRangeWorker(env, database, callback,
                                                      start, end);
  worker->Queue();

  NAPI_RETURN_UNDEFINED();
}

/**
 * Get a property from a database. Synchronous; returns the property value
 * as a utf8 string (empty when the property is unknown).
 */
NAPI_METHOD(db_get_property) {
  NAPI_ARGV(2);
  NAPI_DB_CONTEXT();

  leveldb::Slice property = ToSlice(env, argv[1]);

  std::string value;
  database->GetProperty(property, &value);

  napi_value result;
  napi_create_string_utf8(env, value.data(), value.size(), &result);

  DisposeSliceBuffer(property);

  return result;
}

/**
 * Worker class for destroying a database.
 */
struct DestroyWorker final : public BaseWorker {
  // No Database* — operates on the on-disk location directly.
  DestroyWorker (napi_env env,
                 const std::string& location,
                 napi_value callback)
    : BaseWorker(env, NULL, callback, "leveldown.destroy_db"),
      location_(location) {}

  ~DestroyWorker () {}

  void DoExecute () override {
    leveldb::Options options;
    SetStatus(leveldb::DestroyDB(location_, options));
  }

  std::string location_;
};

/**
 * Destroys a database.
 */
NAPI_METHOD(destroy_db) {
  NAPI_ARGV(2);
  NAPI_ARGV_UTF8_NEW(location, 0);
  napi_value callback = argv[1];

  DestroyWorker* worker = new DestroyWorker(env, location, callback);
  worker->Queue();

  // Safe to free here: the worker copied `location` into a std::string.
  delete [] location;

  NAPI_RETURN_UNDEFINED();
}

/**
 * Worker class for repairing a database.
 */
struct RepairWorker final : public BaseWorker {
  RepairWorker (napi_env env,
                const std::string& location,
                napi_value callback)
    : BaseWorker(env, NULL, callback, "leveldown.repair_db"),
      location_(location) {}

  ~RepairWorker () {}

  void DoExecute () override {
    leveldb::Options options;
    SetStatus(leveldb::RepairDB(location_, options));
  }

  std::string location_;
};

/**
 * Repairs a database.
 */
NAPI_METHOD(repair_db) {
  NAPI_ARGV(2);
  NAPI_ARGV_UTF8_NEW(location, 0);
  napi_value callback = argv[1];

  RepairWorker* worker = new RepairWorker(env, location, callback);
  worker->Queue();

  delete [] location;

  NAPI_RETURN_UNDEFINED();
}

/**
 * Runs when an Iterator is garbage collected.
 */
static void FinalizeIterator (napi_env env, void* data, void* hint) {
  if (data) {
    delete (Iterator*)data;
  }
}

// Runs `code` with _##name##Ch_/_##name##Sz_ bound to a copy of the string
// or buffer option `name`, but only when it is present and non-empty.
// `code` is responsible for freeing (or taking ownership of) the copy.
#define CHECK_PROPERTY(name, code) \
  if (HasProperty(env, options, #name)) { \
    napi_value value = GetProperty(env, options, #name); \
    if (IsString(env, value) || IsBuffer(env, value)) { \
      if (StringOrBufferLength(env, value) > 0) { \
        LD_STRING_OR_BUFFER_TO_COPY(env, value, _##name); \
        code; \
      } \
    } \
  } \

/**
 * Create an iterator.
+ */ +NAPI_METHOD(iterator_init) { + NAPI_ARGV(2); + NAPI_DB_CONTEXT(); + + napi_value options = argv[1]; + bool reverse = BooleanProperty(env, options, "reverse", false); + bool keys = BooleanProperty(env, options, "keys", true); + bool values = BooleanProperty(env, options, "values", true); + bool fillCache = BooleanProperty(env, options, "fillCache", false); + bool keyAsBuffer = BooleanProperty(env, options, "keyAsBuffer", true); + bool valueAsBuffer = BooleanProperty(env, options, "valueAsBuffer", true); + int limit = Int32Property(env, options, "limit", -1); + uint32_t highWaterMark = Uint32Property(env, options, "highWaterMark", + 16 * 1024); + + // TODO simplify and refactor the hideous code below + + leveldb::Slice* start = NULL; + char *startStr = NULL; + CHECK_PROPERTY(start, { + start = new leveldb::Slice(_startCh_, _startSz_); + startStr = _startCh_; + }); + + std::string* end = NULL; + CHECK_PROPERTY(end, { + end = new std::string(_endCh_, _endSz_); + delete [] _endCh_; + }); + + std::string* lt = NULL; + CHECK_PROPERTY(lt, { + lt = new std::string(_ltCh_, _ltSz_); + delete [] _ltCh_; + if (reverse) { + if (startStr != NULL) { + delete [] startStr; + startStr = NULL; + } + if (start != NULL) { + delete start; + } + start = new leveldb::Slice(lt->data(), lt->size()); + } + }); + + std::string* lte = NULL; + CHECK_PROPERTY(lte, { + lte = new std::string(_lteCh_, _lteSz_); + delete [] _lteCh_; + if (reverse) { + if (startStr != NULL) { + delete [] startStr; + startStr = NULL; + } + if (start != NULL) { + delete start; + } + start = new leveldb::Slice(lte->data(), lte->size()); + } + }); + + std::string* gt = NULL; + CHECK_PROPERTY(gt, { + gt = new std::string(_gtCh_, _gtSz_); + delete [] _gtCh_; + if (!reverse) { + if (startStr != NULL) { + delete [] startStr; + startStr = NULL; + } + if (start != NULL) { + delete start; + } + start = new leveldb::Slice(gt->data(), gt->size()); + } + }); + + std::string* gte = NULL; + CHECK_PROPERTY(gte, { + gte = new 
std::string(_gteCh_, _gteSz_); + delete [] _gteCh_; + if (!reverse) { + if (startStr != NULL) { + delete [] startStr; + startStr = NULL; + } + if (start != NULL) { + delete start; + } + start = new leveldb::Slice(gte->data(), gte->size()); + } + }); + + uint32_t id = database->currentIteratorId_++; + Iterator* iterator = new Iterator(database, id, start, end, reverse, keys, + values, limit, lt, lte, gt, gte, fillCache, + keyAsBuffer, valueAsBuffer, highWaterMark); + napi_value result; + napi_ref ref; + + NAPI_STATUS_THROWS(napi_create_external(env, iterator, + FinalizeIterator, + NULL, &result)); + + // Prevent GC of JS object before the iterator is ended (explicitly or on + // db close) and keep track of non-ended iterators to end them on db close. + NAPI_STATUS_THROWS(napi_create_reference(env, result, 1, &ref)); + iterator->Attach(ref); + + return result; +} + +/** + * Seeks an iterator. + */ +NAPI_METHOD(iterator_seek) { + NAPI_ARGV(2); + NAPI_ITERATOR_CONTEXT(); + + if (iterator->ended_) { + napi_throw_error(env, NULL, "iterator has ended"); + } + + iterator->ReleaseTarget(); + iterator->target_ = new leveldb::Slice(ToSlice(env, argv[1])); + iterator->GetIterator(); + + leveldb::Iterator* dbIterator = iterator->dbIterator_; + dbIterator->Seek(*iterator->target_); + + iterator->seeking_ = true; + iterator->landed_ = false; + + if (iterator->OutOfRange(iterator->target_)) { + if (iterator->reverse_) { + dbIterator->SeekToFirst(); + dbIterator->Prev(); + } else { + dbIterator->SeekToLast(); + dbIterator->Next(); + } + } + else if (dbIterator->Valid()) { + int cmp = dbIterator->key().compare(*iterator->target_); + if (cmp > 0 && iterator->reverse_) { + dbIterator->Prev(); + } else if (cmp < 0 && !iterator->reverse_) { + dbIterator->Next(); + } + } else { + if (iterator->reverse_) { + dbIterator->SeekToLast(); + } else { + dbIterator->SeekToFirst(); + } + if (dbIterator->Valid()) { + int cmp = dbIterator->key().compare(*iterator->target_); + if (cmp > 0 && 
iterator->reverse_) { + dbIterator->SeekToFirst(); + dbIterator->Prev(); + } else if (cmp < 0 && !iterator->reverse_) { + dbIterator->SeekToLast(); + dbIterator->Next(); + } + } + } + + NAPI_RETURN_UNDEFINED(); +} + +/** + * Worker class for ending an iterator + */ +struct EndWorker final : public BaseWorker { + EndWorker (napi_env env, + Iterator* iterator, + napi_value callback) + : BaseWorker(env, iterator->database_, callback, "leveldown.iterator.end"), + iterator_(iterator) {} + + ~EndWorker () {} + + void DoExecute () override { + iterator_->IteratorEnd(); + } + + void HandleOKCallback () override { + napi_delete_reference(env_, iterator_->Detach()); + BaseWorker::HandleOKCallback(); + } + + Iterator* iterator_; +}; + +/** + * Called by NAPI_METHOD(iterator_end) and also when closing + * open iterators during NAPI_METHOD(db_close). + */ +static void iterator_end_do (napi_env env, Iterator* iterator, napi_value cb) { + if (!iterator->ended_) { + EndWorker* worker = new EndWorker(env, iterator, cb); + iterator->ended_ = true; + + if (iterator->nexting_) { + iterator->endWorker_ = worker; + } else { + worker->Queue(); + } + } +} + +/** + * Ends an iterator. + */ +NAPI_METHOD(iterator_end) { + NAPI_ARGV(2); + NAPI_ITERATOR_CONTEXT(); + + iterator_end_do(env, iterator, argv[1]); + + NAPI_RETURN_UNDEFINED(); +} + +/** + * TODO Move this to Iterator. There isn't any reason + * for this function being a separate function pointer. + */ +void CheckEndCallback (Iterator* iterator) { + iterator->ReleaseTarget(); + iterator->nexting_ = false; + if (iterator->endWorker_ != NULL) { + iterator->endWorker_->Queue(); + iterator->endWorker_ = NULL; + } +} + +/** + * Worker class for nexting an iterator. 
+ */ +struct NextWorker final : public BaseWorker { + NextWorker (napi_env env, + Iterator* iterator, + napi_value callback, + void (*localCallback)(Iterator*)) + : BaseWorker(env, iterator->database_, callback, + "leveldown.iterator.next"), + iterator_(iterator), + localCallback_(localCallback) {} + + ~NextWorker () {} + + void DoExecute () override { + ok_ = iterator_->IteratorNext(result_); + if (!ok_) { + SetStatus(iterator_->IteratorStatus()); + } + } + + void HandleOKCallback () override { + size_t arraySize = result_.size() * 2; + napi_value jsArray; + napi_create_array_with_length(env_, arraySize, &jsArray); + + for (size_t idx = 0; idx < result_.size(); ++idx) { + std::pair row = result_[idx]; + std::string key = row.first; + std::string value = row.second; + + napi_value returnKey; + if (iterator_->keyAsBuffer_) { + napi_create_buffer_copy(env_, key.size(), key.data(), NULL, &returnKey); + } else { + napi_create_string_utf8(env_, key.data(), key.size(), &returnKey); + } + + napi_value returnValue; + if (iterator_->valueAsBuffer_) { + napi_create_buffer_copy(env_, value.size(), value.data(), NULL, &returnValue); + } else { + napi_create_string_utf8(env_, value.data(), value.size(), &returnValue); + } + + // put the key & value in a descending order, so that they can be .pop:ed in javascript-land + napi_set_element(env_, jsArray, static_cast(arraySize - idx * 2 - 1), returnKey); + napi_set_element(env_, jsArray, static_cast(arraySize - idx * 2 - 2), returnValue); + } + + // clean up & handle the next/end state + // TODO this should just do iterator_->CheckEndCallback(); + localCallback_(iterator_); + + napi_value argv[3]; + napi_get_null(env_, &argv[0]); + argv[1] = jsArray; + napi_get_boolean(env_, !ok_, &argv[2]); + napi_value callback; + napi_get_reference_value(env_, callbackRef_, &callback); + CallFunction(env_, callback, 3, argv); + } + + Iterator* iterator_; + // TODO why do we need a function pointer for this? 
+ void (*localCallback_)(Iterator*); + std::vector > result_; + bool ok_; +}; + +/** + * Moves an iterator to next element. + */ +NAPI_METHOD(iterator_next) { + NAPI_ARGV(2); + NAPI_ITERATOR_CONTEXT(); + + napi_value callback = argv[1]; + + if (iterator->ended_) { + napi_value argv = CreateError(env, "iterator has ended"); + CallFunction(env, callback, 1, &argv); + + NAPI_RETURN_UNDEFINED(); + } + + NextWorker* worker = new NextWorker(env, iterator, callback, + CheckEndCallback); + iterator->nexting_ = true; + worker->Queue(); + + NAPI_RETURN_UNDEFINED(); +} + +/** + * Worker class for batch write operation. + */ +struct BatchWorker final : public PriorityWorker { + BatchWorker (napi_env env, + Database* database, + napi_value callback, + leveldb::WriteBatch* batch, + bool sync, + bool hasData) + : PriorityWorker(env, database, callback, "leveldown.batch.do"), + batch_(batch), hasData_(hasData) { + options_.sync = sync; + } + + ~BatchWorker () { + delete batch_; + } + + void DoExecute () override { + if (hasData_) { + SetStatus(database_->WriteBatch(options_, batch_)); + } + } + + leveldb::WriteOptions options_; + leveldb::WriteBatch* batch_; + bool hasData_; +}; + +/** + * Does a batch write operation on a database. 
+ */ +NAPI_METHOD(batch_do) { + NAPI_ARGV(4); + NAPI_DB_CONTEXT(); + + napi_value array = argv[1]; + bool sync = BooleanProperty(env, argv[2], "sync", false); + napi_value callback = argv[3]; + + uint32_t length; + napi_get_array_length(env, array, &length); + + leveldb::WriteBatch* batch = new leveldb::WriteBatch(); + bool hasData = false; + + for (uint32_t i = 0; i < length; i++) { + napi_value element; + napi_get_element(env, array, i, &element); + + if (!IsObject(env, element)) continue; + + std::string type = StringProperty(env, element, "type"); + + if (type == "del") { + if (!HasProperty(env, element, "key")) continue; + leveldb::Slice key = ToSlice(env, GetProperty(env, element, "key")); + + batch->Delete(key); + if (!hasData) hasData = true; + + DisposeSliceBuffer(key); + } else if (type == "put") { + if (!HasProperty(env, element, "key")) continue; + if (!HasProperty(env, element, "value")) continue; + + leveldb::Slice key = ToSlice(env, GetProperty(env, element, "key")); + leveldb::Slice value = ToSlice(env, GetProperty(env, element, "value")); + + batch->Put(key, value); + if (!hasData) hasData = true; + + DisposeSliceBuffer(key); + DisposeSliceBuffer(value); + } + } + + BatchWorker* worker = new BatchWorker(env, database, callback, batch, sync, hasData); + worker->Queue(); + + NAPI_RETURN_UNDEFINED(); +} + +/** + * Owns a WriteBatch. 
+ */ +struct Batch { + Batch (Database* database) + : database_(database), + batch_(new leveldb::WriteBatch()), + hasData_(false) {} + + ~Batch () { + delete batch_; + } + + void Put (leveldb::Slice key, leveldb::Slice value) { + batch_->Put(key, value); + hasData_ = true; + } + + void Del (leveldb::Slice key) { + batch_->Delete(key); + hasData_ = true; + } + + void Clear () { + batch_->Clear(); + hasData_ = false; + } + + leveldb::Status Write (bool sync) { + leveldb::WriteOptions options; + options.sync = sync; + return database_->WriteBatch(options, batch_); + } + + Database* database_; + leveldb::WriteBatch* batch_; + bool hasData_; +}; + +/** + * Runs when a Batch is garbage collected. + */ +static void FinalizeBatch (napi_env env, void* data, void* hint) { + if (data) { + delete (Batch*)data; + } +} + +/** + * Return a batch object. + */ +NAPI_METHOD(batch_init) { + NAPI_ARGV(1); + NAPI_DB_CONTEXT(); + + Batch* batch = new Batch(database); + + napi_value result; + NAPI_STATUS_THROWS(napi_create_external(env, batch, + FinalizeBatch, + NULL, &result)); + return result; +} + +/** + * Adds a put instruction to a batch object. + */ +NAPI_METHOD(batch_put) { + NAPI_ARGV(3); + NAPI_BATCH_CONTEXT(); + + leveldb::Slice key = ToSlice(env, argv[1]); + leveldb::Slice value = ToSlice(env, argv[2]); + batch->Put(key, value); + DisposeSliceBuffer(key); + DisposeSliceBuffer(value); + + NAPI_RETURN_UNDEFINED(); +} + +/** + * Adds a delete instruction to a batch object. + */ +NAPI_METHOD(batch_del) { + NAPI_ARGV(2); + NAPI_BATCH_CONTEXT(); + + leveldb::Slice key = ToSlice(env, argv[1]); + batch->Del(key); + DisposeSliceBuffer(key); + + NAPI_RETURN_UNDEFINED(); +} + +/** + * Clears a batch object. + */ +NAPI_METHOD(batch_clear) { + NAPI_ARGV(1); + NAPI_BATCH_CONTEXT(); + + batch->Clear(); + + NAPI_RETURN_UNDEFINED(); +} + +/** + * Worker class for batch write operation. 
+ */ +struct BatchWriteWorker final : public PriorityWorker { + BatchWriteWorker (napi_env env, + napi_value context, + Batch* batch, + napi_value callback, + bool sync) + : PriorityWorker(env, batch->database_, callback, "leveldown.batch.write"), + batch_(batch), + sync_(sync) { + // Prevent GC of batch object before we execute + NAPI_STATUS_THROWS(napi_create_reference(env_, context, 1, &contextRef_)); + } + + ~BatchWriteWorker () { + napi_delete_reference(env_, contextRef_); + } + + void DoExecute () override { + if (batch_->hasData_) { + SetStatus(batch_->Write(sync_)); + } + } + + Batch* batch_; + bool sync_; + +private: + napi_ref contextRef_; +}; + +/** + * Writes a batch object. + */ +NAPI_METHOD(batch_write) { + NAPI_ARGV(3); + NAPI_BATCH_CONTEXT(); + + napi_value options = argv[1]; + bool sync = BooleanProperty(env, options, "sync", false); + napi_value callback = argv[2]; + + BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync); + worker->Queue(); + + NAPI_RETURN_UNDEFINED(); +} + +/** + * All exported functions. 
+ */ +NAPI_INIT() { + NAPI_EXPORT_FUNCTION(db_init); + NAPI_EXPORT_FUNCTION(db_open); + NAPI_EXPORT_FUNCTION(db_close); + NAPI_EXPORT_FUNCTION(db_put); + NAPI_EXPORT_FUNCTION(db_get); + NAPI_EXPORT_FUNCTION(db_del); + NAPI_EXPORT_FUNCTION(db_approximate_size); + NAPI_EXPORT_FUNCTION(db_compact_range); + NAPI_EXPORT_FUNCTION(db_get_property); + + NAPI_EXPORT_FUNCTION(destroy_db); + NAPI_EXPORT_FUNCTION(repair_db); + + NAPI_EXPORT_FUNCTION(iterator_init); + NAPI_EXPORT_FUNCTION(iterator_seek); + NAPI_EXPORT_FUNCTION(iterator_end); + NAPI_EXPORT_FUNCTION(iterator_next); + + NAPI_EXPORT_FUNCTION(batch_do); + NAPI_EXPORT_FUNCTION(batch_init); + NAPI_EXPORT_FUNCTION(batch_put); + NAPI_EXPORT_FUNCTION(batch_del); + NAPI_EXPORT_FUNCTION(batch_clear); + NAPI_EXPORT_FUNCTION(batch_write); +} diff --git a/binding.gyp b/binding.gyp index 98b48d9a..686724d7 100644 --- a/binding.gyp +++ b/binding.gyp @@ -55,17 +55,10 @@ "<(module_root_dir)/deps/leveldb/leveldb.gyp:leveldb" ] , "include_dirs" : [ - " -#include -#include "database.h" - -namespace leveldown { - -class Database; - -/* abstract */ class AsyncWorker : public Nan::AsyncWorker { -public: - AsyncWorker ( - leveldown::Database* database - , Nan::Callback *callback - , const char *resource_name - ) : Nan::AsyncWorker(callback, resource_name), database(database) { } - -protected: - void SetStatus(rocksdb::Status status) { - this->status = status; - if (!status.ok()) - SetErrorMessage(status.ToString().c_str()); - } - Database* database; -private: - rocksdb::Status status; -}; - -} // namespace leveldown - -#endif diff --git a/src/batch.cc b/src/batch.cc deleted file mode 100644 index 1472c3fc..00000000 --- a/src/batch.cc +++ /dev/null @@ -1,142 +0,0 @@ -#include -#include -#include - -#include "database.h" -#include "batch_async.h" -#include "batch.h" -#include "common.h" - -namespace leveldown { - -static Nan::Persistent batch_constructor; - -Batch::Batch (leveldown::Database* database, bool sync) : database(database) { - 
options = new rocksdb::WriteOptions(); - options->sync = sync; - batch = new rocksdb::WriteBatch(); - hasData = false; -} - -Batch::~Batch () { - delete options; - delete batch; -} - -rocksdb::Status Batch::Write () { - return database->WriteBatchToDatabase(options, batch); -} - -void Batch::Init () { - v8::Local tpl = Nan::New(Batch::New); - batch_constructor.Reset(tpl); - tpl->SetClassName(Nan::New("Batch").ToLocalChecked()); - tpl->InstanceTemplate()->SetInternalFieldCount(1); - Nan::SetPrototypeMethod(tpl, "put", Batch::Put); - Nan::SetPrototypeMethod(tpl, "del", Batch::Del); - Nan::SetPrototypeMethod(tpl, "clear", Batch::Clear); - Nan::SetPrototypeMethod(tpl, "write", Batch::Write); -} - -NAN_METHOD(Batch::New) { - Database* database = Nan::ObjectWrap::Unwrap(info[0]->ToObject()); - v8::Local optionsObj; - - if (info.Length() > 1 && info[1]->IsObject()) { - optionsObj = v8::Local::Cast(info[1]); - } - - bool sync = BooleanOptionValue(optionsObj, "sync"); - - Batch* batch = new Batch(database, sync); - batch->Wrap(info.This()); - - info.GetReturnValue().Set(info.This()); -} - -v8::Local Batch::NewInstance ( - v8::Local database - , v8::Local optionsObj - ) { - - Nan::EscapableHandleScope scope; - - Nan::MaybeLocal maybeInstance; - v8::Local instance; - - v8::Local constructorHandle = - Nan::New(batch_constructor); - - if (optionsObj.IsEmpty()) { - v8::Local argv[1] = { database }; - maybeInstance = Nan::NewInstance(constructorHandle->GetFunction(), 1, argv); - } else { - v8::Local argv[2] = { database, optionsObj }; - maybeInstance = Nan::NewInstance(constructorHandle->GetFunction(), 2, argv); - } - - if (maybeInstance.IsEmpty()) - Nan::ThrowError("Could not create new Batch instance"); - else - instance = maybeInstance.ToLocalChecked(); - return scope.Escape(instance); -} - -NAN_METHOD(Batch::Put) { - Batch* batch = ObjectWrap::Unwrap(info.Holder()); - v8::Local callback; // purely for the error macros - - v8::Local keyBuffer = info[0]; - v8::Local valueBuffer 
= info[1]; - LD_STRING_OR_BUFFER_TO_SLICE(key, keyBuffer, key) - LD_STRING_OR_BUFFER_TO_SLICE(value, valueBuffer, value) - - batch->batch->Put(key, value); - if (!batch->hasData) - batch->hasData = true; - - DisposeStringOrBufferFromSlice(keyBuffer, key); - DisposeStringOrBufferFromSlice(valueBuffer, value); - - info.GetReturnValue().Set(info.Holder()); -} - -NAN_METHOD(Batch::Del) { - Batch* batch = ObjectWrap::Unwrap(info.Holder()); - - v8::Local callback; // purely for the error macros - - v8::Local keyBuffer = info[0]; - LD_STRING_OR_BUFFER_TO_SLICE(key, keyBuffer, key) - - batch->batch->Delete(key); - if (!batch->hasData) - batch->hasData = true; - - DisposeStringOrBufferFromSlice(keyBuffer, key); - - info.GetReturnValue().Set(info.Holder()); -} - -NAN_METHOD(Batch::Clear) { - Batch* batch = ObjectWrap::Unwrap(info.Holder()); - - batch->batch->Clear(); - batch->hasData = false; - - info.GetReturnValue().Set(info.Holder()); -} - -NAN_METHOD(Batch::Write) { - Batch* batch = ObjectWrap::Unwrap(info.Holder()); - - Nan::Callback *callback = - new Nan::Callback(v8::Local::Cast(info[0])); - BatchWriteWorker* worker = new BatchWriteWorker(batch, callback); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("batch", _this); - Nan::AsyncQueueWorker(worker); -} - -} // namespace leveldown diff --git a/src/batch.h b/src/batch.h deleted file mode 100644 index d6bcbc17..00000000 --- a/src/batch.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef LD_BATCH_H -#define LD_BATCH_H - -#include -#include - -#include - -#include "database.h" - -namespace leveldown { - -class Batch : public Nan::ObjectWrap { -public: - static void Init(); - static v8::Local NewInstance ( - v8::Local database - , v8::Local optionsObj - ); - - Batch (leveldown::Database* database, bool sync); - ~Batch (); - rocksdb::Status Write (); - - bool hasData; // keep track of whether we're writing data or not - -private: - leveldown::Database* database; - 
rocksdb::WriteOptions* options; - rocksdb::WriteBatch* batch; - - static NAN_METHOD(New); - static NAN_METHOD(Put); - static NAN_METHOD(Del); - static NAN_METHOD(Clear); - static NAN_METHOD(Write); -}; - -} // namespace leveldown - -#endif diff --git a/src/batch_async.cc b/src/batch_async.cc deleted file mode 100644 index db3846f7..00000000 --- a/src/batch_async.cc +++ /dev/null @@ -1,24 +0,0 @@ -#include -#include "batch.h" -#include "batch_async.h" - -namespace leveldown { - -/** NEXT WORKER **/ - -BatchWriteWorker::BatchWriteWorker ( - Batch* batch - , Nan::Callback *callback -) : AsyncWorker(NULL, callback, "rocksdb:batch.write") - , batch(batch) -{}; - -BatchWriteWorker::~BatchWriteWorker () {} - -void BatchWriteWorker::Execute () { - if (batch->hasData) { - SetStatus(batch->Write()); - } -} - -} // namespace leveldown diff --git a/src/batch_async.h b/src/batch_async.h deleted file mode 100644 index d254a7e1..00000000 --- a/src/batch_async.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef LD_BATCH_ASYNC_H -#define LD_BATCH_ASYNC_H - -#include -#include - -#include "async.h" -#include "batch.h" -#include "database.h" - -namespace leveldown { - -class BatchWriteWorker : public AsyncWorker { -public: - BatchWriteWorker ( - Batch* batch - , Nan::Callback *callback - ); - - virtual ~BatchWriteWorker (); - virtual void Execute (); - -private: - Batch* batch; -}; - -} // namespace leveldown - -#endif diff --git a/src/common.h b/src/common.h deleted file mode 100644 index c86a79b4..00000000 --- a/src/common.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef LD_COMMON_H -#define LD_COMMON_H - -#include - -namespace leveldown { - -NAN_INLINE bool BooleanOptionValue(v8::Local options, - const char* _key, - bool def = false) { - Nan::HandleScope scope; - v8::Local key = Nan::New(_key).ToLocalChecked(); - return !options.IsEmpty() - && options->Has(key) - ? 
options->Get(key)->BooleanValue() - : def; -} - -NAN_INLINE uint32_t UInt32OptionValue(v8::Local options, - const char* _key, - uint32_t def) { - Nan::HandleScope scope; - v8::Local key = Nan::New(_key).ToLocalChecked(); - return !options.IsEmpty() - && options->Has(key) - && options->Get(key)->IsNumber() - ? options->Get(key)->Uint32Value() - : def; -} - -} // namespace leveldown - -#endif diff --git a/src/database.cc b/src/database.cc deleted file mode 100644 index 5c11c5fc..00000000 --- a/src/database.cc +++ /dev/null @@ -1,526 +0,0 @@ -#include -#include - -#include -#include - -#include "leveldown.h" -#include "database.h" -#include "async.h" -#include "database_async.h" -#include "batch.h" -#include "iterator.h" -#include "common.h" - -namespace leveldown { - -static Nan::Persistent database_constructor; - -Database::Database (const v8::Local& from) - : location(new Nan::Utf8String(from)) - , db(NULL) - , currentIteratorId(0) - , pendingCloseWorker(NULL) - , blockCache(NULL) - , filterPolicy(NULL) {}; - -Database::~Database () { - if (db != NULL) - delete db; - delete location; -}; - -/* Calls from worker threads, NO V8 HERE *****************************/ - -rocksdb::Status Database::OpenDatabase ( - rocksdb::Options* options, - bool readOnly - ) { - if (readOnly) { - return rocksdb::DB::OpenForReadOnly(*options, **location, &db); - } else { - return rocksdb::DB::Open(*options, **location, &db); - } -} - -rocksdb::Status Database::PutToDatabase ( - rocksdb::WriteOptions* options - , rocksdb::Slice key - , rocksdb::Slice value - ) { - return db->Put(*options, key, value); -} - -rocksdb::Status Database::GetFromDatabase ( - rocksdb::ReadOptions* options - , rocksdb::Slice key - , std::string& value - ) { - return db->Get(*options, key, &value); -} - -rocksdb::Status Database::DeleteFromDatabase ( - rocksdb::WriteOptions* options - , rocksdb::Slice key - ) { - return db->Delete(*options, key); -} - -rocksdb::Status Database::WriteBatchToDatabase ( - 
rocksdb::WriteOptions* options - , rocksdb::WriteBatch* batch - ) { - return db->Write(*options, batch); -} - -uint64_t Database::ApproximateSizeFromDatabase (const rocksdb::Range* range) { - uint64_t size; - db->GetApproximateSizes(range, 1, &size); - return size; -} - -void Database::CompactRangeFromDatabase (const rocksdb::Slice* start, - const rocksdb::Slice* end) { - rocksdb::CompactRangeOptions options; - db->CompactRange(options, start, end); -} - -void Database::GetPropertyFromDatabase ( - const rocksdb::Slice& property - , std::string* value) { - - db->GetProperty(property, value); -} - -rocksdb::Iterator* Database::NewIterator (rocksdb::ReadOptions* options) { - return db->NewIterator(*options); -} - -const rocksdb::Snapshot* Database::NewSnapshot () { - return db->GetSnapshot(); -} - -void Database::ReleaseSnapshot (const rocksdb::Snapshot* snapshot) { - return db->ReleaseSnapshot(snapshot); -} - -void Database::ReleaseIterator (uint32_t id) { - // called each time an Iterator is End()ed, in the main thread - // we have to remove our reference to it and if it's the last iterator - // we have to invoke a pending CloseWorker if there is one - // if there is a pending CloseWorker it means that we're waiting for - // iterators to end before we can close them - iterators.erase(id); - if (iterators.empty() && pendingCloseWorker != NULL) { - Nan::AsyncQueueWorker((AsyncWorker*)pendingCloseWorker); - pendingCloseWorker = NULL; - } -} - -void Database::CloseDatabase () { - delete db; - db = NULL; - if (blockCache) { - // According to - // https://github.com/facebook/rocksdb/wiki/basic-operations#cache - // it doesn't look like this needs to be deleted by hand anymore. 
- // delete blockCache; - blockCache = NULL; - } - if (filterPolicy) { - delete filterPolicy; - filterPolicy = NULL; - } -} - -/* V8 exposed functions *****************************/ - -NAN_METHOD(LevelDOWN) { - v8::Local location = info[0].As(); - info.GetReturnValue().Set(Database::NewInstance(location)); -} - -void Database::Init () { - v8::Local tpl = Nan::New(Database::New); - database_constructor.Reset(tpl); - tpl->SetClassName(Nan::New("Database").ToLocalChecked()); - tpl->InstanceTemplate()->SetInternalFieldCount(1); - Nan::SetPrototypeMethod(tpl, "open", Database::Open); - Nan::SetPrototypeMethod(tpl, "close", Database::Close); - Nan::SetPrototypeMethod(tpl, "put", Database::Put); - Nan::SetPrototypeMethod(tpl, "get", Database::Get); - Nan::SetPrototypeMethod(tpl, "del", Database::Delete); - Nan::SetPrototypeMethod(tpl, "batch", Database::Batch); - Nan::SetPrototypeMethod(tpl, "approximateSize", Database::ApproximateSize); - Nan::SetPrototypeMethod(tpl, "compactRange", Database::CompactRange); - Nan::SetPrototypeMethod(tpl, "getProperty", Database::GetProperty); - Nan::SetPrototypeMethod(tpl, "iterator", Database::Iterator); -} - -NAN_METHOD(Database::New) { - Database* obj = new Database(info[0]); - obj->Wrap(info.This()); - - info.GetReturnValue().Set(info.This()); -} - -v8::Local Database::NewInstance (v8::Local &location) { - Nan::EscapableHandleScope scope; - - Nan::MaybeLocal maybeInstance; - v8::Local instance; - - v8::Local constructorHandle = - Nan::New(database_constructor); - - v8::Local argv[] = { location }; - maybeInstance = Nan::NewInstance(constructorHandle->GetFunction(), 1, argv); - - if (maybeInstance.IsEmpty()) - Nan::ThrowError("Could not create new Database instance"); - else - instance = maybeInstance.ToLocalChecked(); - return scope.Escape(instance); -} - -NAN_METHOD(Database::Open) { - LD_METHOD_SETUP_COMMON(open, 0, 1) - - bool readOnly = BooleanOptionValue(optionsObj, "readOnly", false); - bool createIfMissing = 
BooleanOptionValue(optionsObj, "createIfMissing", true); - bool errorIfExists = BooleanOptionValue(optionsObj, "errorIfExists"); - bool compression = BooleanOptionValue(optionsObj, "compression", true); - - uint32_t cacheSize = UInt32OptionValue(optionsObj, "cacheSize", 8 << 20); - uint32_t writeBufferSize = UInt32OptionValue( - optionsObj - , "writeBufferSize" - , 4 << 20 - ); - uint32_t blockSize = UInt32OptionValue(optionsObj, "blockSize", 4096); - uint32_t maxOpenFiles = UInt32OptionValue(optionsObj, "maxOpenFiles", 1000); - uint32_t blockRestartInterval = UInt32OptionValue( - optionsObj - , "blockRestartInterval" - , 16 - ); - uint32_t maxFileSize = UInt32OptionValue(optionsObj, "maxFileSize", 2 << 20); - - database->blockCache = cacheSize ? rocksdb::NewLRUCache(cacheSize) : - NULL; - database->filterPolicy = rocksdb::NewBloomFilterPolicy(10); - - OpenWorker* worker = new OpenWorker( - database - , new Nan::Callback(callback) - , database->blockCache - , database->filterPolicy - , createIfMissing - , errorIfExists - , compression - , writeBufferSize - , blockSize - , maxOpenFiles - , blockRestartInterval - , maxFileSize - , readOnly - ); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("database", _this); - Nan::AsyncQueueWorker(worker); -} - -// for an empty callback to iterator.end() -NAN_METHOD(EmptyMethod) { -} - -NAN_METHOD(Database::Close) { - LD_METHOD_SETUP_COMMON_ONEARG(close) - - CloseWorker* worker = new CloseWorker( - database - , new Nan::Callback(callback) - ); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("database", _this); - - if (!database->iterators.empty()) { - // yikes, we still have iterators open! naughty naughty. - // we have to queue up a CloseWorker and manually close each of them. 
- // the CloseWorker will be invoked once they are all cleaned up - database->pendingCloseWorker = worker; - - for ( - std::map< uint32_t, leveldown::Iterator * >::iterator it - = database->iterators.begin() - ; it != database->iterators.end() - ; ++it) { - - // for each iterator still open, first check if it's already in - // the process of ending (ended==true means an async End() is - // in progress), if not, then we call End() with an empty callback - // function and wait for it to hit ReleaseIterator() where our - // CloseWorker will be invoked - - leveldown::Iterator *iterator = it->second; - - if (!iterator->ended) { - v8::Local end = - v8::Local::Cast(iterator->handle()->Get( - Nan::New("end").ToLocalChecked())); - v8::Local argv[] = { - Nan::New(EmptyMethod)->GetFunction() // empty callback - }; - Nan::AsyncResource ar("rocksdb:iterator.end"); - ar.runInAsyncScope(iterator->handle(), end, 1, argv); - } - } - } else { - Nan::AsyncQueueWorker(worker); - } -} - -NAN_METHOD(Database::Put) { - LD_METHOD_SETUP_COMMON(put, 2, 3) - - v8::Local keyHandle = info[0].As(); - v8::Local valueHandle = info[1].As(); - LD_STRING_OR_BUFFER_TO_SLICE(key, keyHandle, key); - LD_STRING_OR_BUFFER_TO_SLICE(value, valueHandle, value); - - bool sync = BooleanOptionValue(optionsObj, "sync"); - - WriteWorker* worker = new WriteWorker( - database - , new Nan::Callback(callback) - , key - , value - , sync - , keyHandle - , valueHandle - ); - - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("database", _this); - Nan::AsyncQueueWorker(worker); -} - -NAN_METHOD(Database::Get) { - LD_METHOD_SETUP_COMMON(get, 1, 2) - - v8::Local keyHandle = info[0].As(); - LD_STRING_OR_BUFFER_TO_SLICE(key, keyHandle, key); - - bool asBuffer = BooleanOptionValue(optionsObj, "asBuffer", true); - bool fillCache = BooleanOptionValue(optionsObj, "fillCache", true); - - ReadWorker* worker = new ReadWorker( - database - , new Nan::Callback(callback) - , key - , 
asBuffer - , fillCache - , keyHandle - ); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("database", _this); - Nan::AsyncQueueWorker(worker); -} - -NAN_METHOD(Database::Delete) { - LD_METHOD_SETUP_COMMON(del, 1, 2) - - v8::Local keyHandle = info[0].As(); - LD_STRING_OR_BUFFER_TO_SLICE(key, keyHandle, key); - - bool sync = BooleanOptionValue(optionsObj, "sync"); - - DeleteWorker* worker = new DeleteWorker( - database - , new Nan::Callback(callback) - , key - , sync - , keyHandle - ); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("database", _this); - Nan::AsyncQueueWorker(worker); -} - -NAN_METHOD(Database::Batch) { - if ((info.Length() == 0 || info.Length() == 1) && !info[0]->IsArray()) { - v8::Local optionsObj; - if (info.Length() > 0 && info[0]->IsObject()) { - optionsObj = info[0].As(); - } - info.GetReturnValue().Set(Batch::NewInstance(info.This(), optionsObj)); - return; - } - - LD_METHOD_SETUP_COMMON(batch, 1, 2); - - bool sync = BooleanOptionValue(optionsObj, "sync"); - - v8::Local array = v8::Local::Cast(info[0]); - - rocksdb::WriteBatch* batch = new rocksdb::WriteBatch(); - bool hasData = false; - - for (unsigned int i = 0; i < array->Length(); i++) { - if (!array->Get(i)->IsObject()) - continue; - - v8::Local obj = v8::Local::Cast(array->Get(i)); - v8::Local keyBuffer = obj->Get(Nan::New("key").ToLocalChecked()); - v8::Local type = obj->Get(Nan::New("type").ToLocalChecked()); - - if (type->StrictEquals(Nan::New("del").ToLocalChecked())) { - LD_STRING_OR_BUFFER_TO_SLICE(key, keyBuffer, key) - - batch->Delete(key); - if (!hasData) - hasData = true; - - DisposeStringOrBufferFromSlice(keyBuffer, key); - } else if (type->StrictEquals(Nan::New("put").ToLocalChecked())) { - v8::Local valueBuffer = obj->Get(Nan::New("value").ToLocalChecked()); - - LD_STRING_OR_BUFFER_TO_SLICE(key, keyBuffer, key) - LD_STRING_OR_BUFFER_TO_SLICE(value, valueBuffer, value) - 
batch->Put(key, value); - if (!hasData) - hasData = true; - - DisposeStringOrBufferFromSlice(keyBuffer, key); - DisposeStringOrBufferFromSlice(valueBuffer, value); - } - } - - // don't allow an empty batch through - if (hasData) { - BatchWorker* worker = new BatchWorker( - database - , new Nan::Callback(callback) - , batch - , sync - ); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("database", _this); - Nan::AsyncQueueWorker(worker); - } else { - LD_RUN_CALLBACK("rocksdb:db.batch", callback, 0, NULL); - } -} - -NAN_METHOD(Database::ApproximateSize) { - v8::Local startHandle = info[0].As(); - v8::Local endHandle = info[1].As(); - - LD_METHOD_SETUP_COMMON(approximateSize, -1, 2) - - LD_STRING_OR_BUFFER_TO_SLICE(start, startHandle, start) - LD_STRING_OR_BUFFER_TO_SLICE(end, endHandle, end) - - ApproximateSizeWorker* worker = new ApproximateSizeWorker( - database - , new Nan::Callback(callback) - , start - , end - , startHandle - , endHandle - ); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("database", _this); - Nan::AsyncQueueWorker(worker); -} - -NAN_METHOD(Database::CompactRange) { - v8::Local startHandle = info[0].As(); - v8::Local endHandle = info[1].As(); - - LD_METHOD_SETUP_COMMON(compactRange, -1, 2) - LD_STRING_OR_BUFFER_TO_SLICE(start, startHandle, start) - LD_STRING_OR_BUFFER_TO_SLICE(end, endHandle, end) - - CompactRangeWorker* worker = new CompactRangeWorker( - database - , new Nan::Callback(callback) - , start - , end - , startHandle - , endHandle - ); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("database", _this); - Nan::AsyncQueueWorker(worker); -} - -NAN_METHOD(Database::GetProperty) { - v8::Local propertyHandle = info[0].As(); - v8::Local callback; // for LD_STRING_OR_BUFFER_TO_SLICE - - LD_STRING_OR_BUFFER_TO_SLICE(property, propertyHandle, property) - - leveldown::Database* database = - 
Nan::ObjectWrap::Unwrap(info.This()); - - std::string* value = new std::string(); - database->GetPropertyFromDatabase(property, value); - v8::Local returnValue - = Nan::New(value->c_str(), value->length()).ToLocalChecked(); - delete value; - delete[] property.data(); - - info.GetReturnValue().Set(returnValue); -} - -NAN_METHOD(Database::Iterator) { - Database* database = Nan::ObjectWrap::Unwrap(info.This()); - - v8::Local optionsObj; - if (info.Length() > 0 && info[0]->IsObject()) { - optionsObj = v8::Local::Cast(info[0]); - } - - // each iterator gets a unique id for this Database, so we can - // easily store & lookup on our `iterators` map - uint32_t id = database->currentIteratorId++; - Nan::TryCatch try_catch; - v8::Local iteratorHandle = Iterator::NewInstance( - info.This() - , Nan::New(id) - , optionsObj - ); - if (try_catch.HasCaught()) { - // NB: node::FatalException can segfault here if there is no room on stack. - return Nan::ThrowError("Fatal Error in Database::Iterator!"); - } - - leveldown::Iterator *iterator = - Nan::ObjectWrap::Unwrap(iteratorHandle); - - database->iterators[id] = iterator; - - // register our iterator - /* - v8::Local obj = Nan::New(); - obj->Set(Nan::New("iterator"), iteratorHandle); - Nan::Persistent persistent; - persistent.Reset(nan_isolate, obj); - database->iterators.insert(std::pair< uint32_t, Nan::Persistent & > - (id, persistent)); - */ - - info.GetReturnValue().Set(iteratorHandle); -} - - -} // namespace leveldown diff --git a/src/database.h b/src/database.h deleted file mode 100644 index 1beaa03a..00000000 --- a/src/database.h +++ /dev/null @@ -1,106 +0,0 @@ -#ifndef LD_DATABASE_H -#define LD_DATABASE_H - -#include -#include -#include - -#include -#include -#include -#include - -#include "leveldown.h" -#include "iterator.h" - -namespace leveldown { - -NAN_METHOD(LevelDOWN); - -struct Reference { - Nan::Persistent handle; - rocksdb::Slice slice; - - Reference(v8::Local obj, rocksdb::Slice slice) : slice(slice) { - 
v8::Local _obj = Nan::New(); - _obj->Set(Nan::New("obj").ToLocalChecked(), obj); - handle.Reset(_obj); - }; -}; - -static inline void ClearReferences (std::vector *references) { - for (std::vector::iterator it = references->begin() - ; it != references->end() - ; ) { - DisposeStringOrBufferFromSlice((*it)->handle, (*it)->slice); - it = references->erase(it); - } - delete references; -} - -class Database : public Nan::ObjectWrap { -public: - static void Init (); - static v8::Local NewInstance (v8::Local &location); - - rocksdb::Status OpenDatabase (rocksdb::Options* options, bool readOnly); - rocksdb::Status PutToDatabase ( - rocksdb::WriteOptions* options - , rocksdb::Slice key - , rocksdb::Slice value - ); - rocksdb::Status GetFromDatabase ( - rocksdb::ReadOptions* options - , rocksdb::Slice key - , std::string& value - ); - rocksdb::Status DeleteFromDatabase ( - rocksdb::WriteOptions* options - , rocksdb::Slice key - ); - rocksdb::Status WriteBatchToDatabase ( - rocksdb::WriteOptions* options - , rocksdb::WriteBatch* batch - ); - uint64_t ApproximateSizeFromDatabase (const rocksdb::Range* range); - void CompactRangeFromDatabase (const rocksdb::Slice* start, const rocksdb::Slice* end); - void GetPropertyFromDatabase (const rocksdb::Slice& property, std::string* value); - rocksdb::Iterator* NewIterator (rocksdb::ReadOptions* options); - const rocksdb::Snapshot* NewSnapshot (); - void ReleaseSnapshot (const rocksdb::Snapshot* snapshot); - void CloseDatabase (); - void ReleaseIterator (uint32_t id); - - Database (const v8::Local& from); - ~Database (); - -private: - Nan::Utf8String* location; - rocksdb::DB* db; - uint32_t currentIteratorId; - void(*pendingCloseWorker); - std::shared_ptr blockCache; - const rocksdb::FilterPolicy* filterPolicy; - - std::map< uint32_t, leveldown::Iterator * > iterators; - - static void WriteDoing(uv_work_t *req); - static void WriteAfter(uv_work_t *req); - - static NAN_METHOD(New); - static NAN_METHOD(Open); - static NAN_METHOD(Close); 
- static NAN_METHOD(Put); - static NAN_METHOD(Delete); - static NAN_METHOD(Get); - static NAN_METHOD(Batch); - static NAN_METHOD(Write); - static NAN_METHOD(Iterator); - static NAN_METHOD(ApproximateSize); - static NAN_METHOD(CompactRange); - static NAN_METHOD(GetProperty); -}; - -} // namespace leveldown - -#endif diff --git a/src/database_async.cc b/src/database_async.cc deleted file mode 100644 index c24c6c17..00000000 --- a/src/database_async.cc +++ /dev/null @@ -1,351 +0,0 @@ -#include -#include - -#include -#include - -#include -#include -#include -#include -#include -#include - -// #include -// #include - -#include "database.h" -#include "leveldown.h" -#include "async.h" -#include "database_async.h" - -namespace leveldown { - -/** OPEN WORKER **/ - -OpenWorker::OpenWorker ( - Database *database - , Nan::Callback *callback - , std::shared_ptr blockCache - , const rocksdb::FilterPolicy* filterPolicy - , bool createIfMissing - , bool errorIfExists - , bool compression - , uint32_t writeBufferSize - , uint32_t blockSize - , uint32_t maxOpenFiles - , uint32_t blockRestartInterval - , uint32_t maxFileSize - , bool readOnly -) : AsyncWorker(database, callback, "rocksdb:db.open") - , readOnly_(readOnly) -{ - rocksdb::LevelDBOptions levelOptions; - - if (blockCache != NULL) { - levelOptions.block_cache = blockCache.get(); - } - - levelOptions.filter_policy = filterPolicy; - levelOptions.create_if_missing = createIfMissing; - levelOptions.error_if_exists = errorIfExists; - levelOptions.compression = compression - ? 
rocksdb::kSnappyCompression - : rocksdb::kNoCompression; - levelOptions.write_buffer_size = writeBufferSize; - levelOptions.block_size = blockSize; - levelOptions.max_open_files = maxOpenFiles; - levelOptions.block_restart_interval = blockRestartInterval; - - // rocksdb::Options opts = ConvertOptions(levelOptions); - // options = &opts; - - options = new rocksdb::Options(rocksdb::ConvertOptions(levelOptions)); - options->max_log_file_size = maxFileSize; -/* - options = new rocksdb::Options(); - options->create_if_missing = levelOptions.create_if_missing; - options->error_if_exists = levelOptions.error_if_exists; - options->paranoid_checks = levelOptions.paranoid_checks; - options->env = levelOptions.env; - options->info_log.reset(levelOptions.info_log); - options->write_buffer_size = levelOptions.write_buffer_size; - options->max_open_files = levelOptions.max_open_files; - options->compression = levelOptions.compression; - - rocksdb::BlockBasedTableOptions table_options; - table_options.block_cache.reset(blockCache.get()); - table_options.block_size = levelOptions.block_size; - table_options.block_restart_interval = levelOptions.block_restart_interval; - table_options.filter_policy.reset(levelOptions.filter_policy); - options->table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); -*/ -}; - -OpenWorker::~OpenWorker () { - // delete options; -} - -void OpenWorker::Execute () { - SetStatus(database->OpenDatabase(options, readOnly_)); -} - -/** CLOSE WORKER **/ - -CloseWorker::CloseWorker ( - Database *database - , Nan::Callback *callback -) : AsyncWorker(database, callback, "rocksdb:db.close") -{} - -CloseWorker::~CloseWorker () {} - -void CloseWorker::Execute () { - database->CloseDatabase(); -} - -void CloseWorker::WorkComplete () { - Nan::HandleScope scope; - HandleOKCallback(); - delete callback; - callback = NULL; -} - -/** IO WORKER (abstract) **/ - -IOWorker::IOWorker ( - Database *database - , Nan::Callback *callback - , const char 
*resource_name - , rocksdb::Slice key - , v8::Local &keyHandle -) : AsyncWorker(database, callback, resource_name) - , key(key) -{ - Nan::HandleScope scope; - - SaveToPersistent("key", keyHandle); -}; - -IOWorker::~IOWorker () {} - -void IOWorker::WorkComplete () { - Nan::HandleScope scope; - - DisposeStringOrBufferFromSlice(GetFromPersistent("key"), key); - AsyncWorker::WorkComplete(); -} - -/** READ WORKER **/ - -ReadWorker::ReadWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice key - , bool asBuffer - , bool fillCache - , v8::Local &keyHandle -) : IOWorker(database, callback, "rocksdb:db.get", key, keyHandle) - , asBuffer(asBuffer) -{ - Nan::HandleScope scope; - - options = new rocksdb::ReadOptions(); - options->fill_cache = fillCache; - SaveToPersistent("key", keyHandle); -}; - -ReadWorker::~ReadWorker () { - delete options; -} - -void ReadWorker::Execute () { - SetStatus(database->GetFromDatabase(options, key, value)); -} - -void ReadWorker::HandleOKCallback () { - Nan::HandleScope scope; - - v8::Local returnValue; - if (asBuffer) { - //TODO: could use NewBuffer if we carefully manage the lifecycle of `value` - //and avoid an an extra allocation. 
We'd have to clean up properly when not OK - //and let the new Buffer manage the data when OK - returnValue = Nan::CopyBuffer((char*)value.data(), value.size()).ToLocalChecked(); - } else { - returnValue = Nan::New((char*)value.data(), value.size()).ToLocalChecked(); - } - v8::Local argv[] = { - Nan::Null() - , returnValue - }; - callback->Call(2, argv, async_resource); -} - -/** DELETE WORKER **/ - -DeleteWorker::DeleteWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice key - , bool sync - , v8::Local &keyHandle - , const char *resource_name -) : IOWorker(database, callback, resource_name, key, keyHandle) -{ - Nan::HandleScope scope; - - options = new rocksdb::WriteOptions(); - options->sync = sync; - SaveToPersistent("key", keyHandle); -}; - -DeleteWorker::~DeleteWorker () { - delete options; -} - -void DeleteWorker::Execute () { - SetStatus(database->DeleteFromDatabase(options, key)); -} - -/** WRITE WORKER **/ - -WriteWorker::WriteWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice key - , rocksdb::Slice value - , bool sync - , v8::Local &keyHandle - , v8::Local &valueHandle -) : DeleteWorker(database, callback, key, sync, keyHandle, "rocksdb:db.put") - , value(value) -{ - Nan::HandleScope scope; - - SaveToPersistent("value", valueHandle); -}; - -WriteWorker::~WriteWorker () { } - -void WriteWorker::Execute () { - SetStatus(database->PutToDatabase(options, key, value)); -} - -void WriteWorker::WorkComplete () { - Nan::HandleScope scope; - - DisposeStringOrBufferFromSlice(GetFromPersistent("value"), value); - IOWorker::WorkComplete(); -} - -/** BATCH WORKER **/ - -BatchWorker::BatchWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::WriteBatch* batch - , bool sync -) : AsyncWorker(database, callback, "rocksdb:db.batch") - , batch(batch) -{ - options = new rocksdb::WriteOptions(); - options->sync = sync; -}; - -BatchWorker::~BatchWorker () { - delete batch; - delete options; -} - -void 
BatchWorker::Execute () { - SetStatus(database->WriteBatchToDatabase(options, batch)); -} - -/** APPROXIMATE SIZE WORKER **/ - -ApproximateSizeWorker::ApproximateSizeWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice start - , rocksdb::Slice end - , v8::Local &startHandle - , v8::Local &endHandle -) : AsyncWorker(database, callback, "rocksdb:db.approximateSize") - , range(start, end) -{ - Nan::HandleScope scope; - - SaveToPersistent("start", startHandle); - SaveToPersistent("end", endHandle); -}; - -ApproximateSizeWorker::~ApproximateSizeWorker () {} - -void ApproximateSizeWorker::Execute () { - size = database->ApproximateSizeFromDatabase(&range); -} - -void ApproximateSizeWorker::WorkComplete() { - Nan::HandleScope scope; - - DisposeStringOrBufferFromSlice(GetFromPersistent("start"), range.start); - DisposeStringOrBufferFromSlice(GetFromPersistent("end"), range.limit); - AsyncWorker::WorkComplete(); -} - -void ApproximateSizeWorker::HandleOKCallback () { - Nan::HandleScope scope; - - v8::Local returnValue = Nan::New((double) size); - v8::Local argv[] = { - Nan::Null() - , returnValue - }; - callback->Call(2, argv, async_resource); -} - -/** COMPACT RANGE WORKER **/ - -CompactRangeWorker::CompactRangeWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice start - , rocksdb::Slice end - , v8::Local &startHandle - , v8::Local &endHandle -) : AsyncWorker(database, callback, "rocksdb:db.compactRange") -{ - Nan::HandleScope scope; - - rangeStart = start; - rangeEnd = end; - - SaveToPersistent("compactStart", startHandle); - SaveToPersistent("compactEnd", endHandle); -}; - -CompactRangeWorker::~CompactRangeWorker () {} - -void CompactRangeWorker::Execute () { - database->CompactRangeFromDatabase(&rangeStart, &rangeEnd); -} - -void CompactRangeWorker::WorkComplete() { - Nan::HandleScope scope; - - DisposeStringOrBufferFromSlice(GetFromPersistent("compactStart"), rangeStart); - 
DisposeStringOrBufferFromSlice(GetFromPersistent("compactEnd"), rangeEnd); - AsyncWorker::WorkComplete(); -} - -void CompactRangeWorker::HandleOKCallback () { - Nan::HandleScope scope; - - v8::Local argv[] = { - Nan::Null() - }; - callback->Call(1, argv, async_resource); -} - -} // namespace leveldown diff --git a/src/database_async.h b/src/database_async.h deleted file mode 100644 index 61e6c438..00000000 --- a/src/database_async.h +++ /dev/null @@ -1,189 +0,0 @@ -#ifndef LD_DATABASE_ASYNC_H -#define LD_DATABASE_ASYNC_H - -#include -#include - -#include - -#include "async.h" - -namespace leveldown { - -class OpenWorker : public AsyncWorker { -public: - OpenWorker ( - Database *database - , Nan::Callback *callback - , std::shared_ptr blockCache - , const rocksdb::FilterPolicy* filterPolicy - , bool createIfMissing - , bool errorIfExists - , bool compression - , uint32_t writeBufferSize - , uint32_t blockSize - , uint32_t maxOpenFiles - , uint32_t blockRestartInterval - , uint32_t maxFileSize - , bool readOnly - ); - - virtual ~OpenWorker (); - virtual void Execute (); - -private: - rocksdb::Options* options; - bool readOnly_; -}; - -class CloseWorker : public AsyncWorker { -public: - CloseWorker ( - Database *database - , Nan::Callback *callback - ); - - virtual ~CloseWorker (); - virtual void Execute (); - virtual void WorkComplete (); -}; - -class IOWorker : public AsyncWorker { -public: - IOWorker ( - Database *database - , Nan::Callback *callback - , const char *resource_name - , rocksdb::Slice key - , v8::Local &keyHandle - ); - - virtual ~IOWorker (); - virtual void WorkComplete (); - -protected: - rocksdb::Slice key; -}; - -class ReadWorker : public IOWorker { -public: - ReadWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice key - , bool asBuffer - , bool fillCache - , v8::Local &keyHandle - ); - - virtual ~ReadWorker (); - virtual void Execute (); - virtual void HandleOKCallback (); - -private: - bool asBuffer; - 
rocksdb::ReadOptions* options; - std::string value; -}; - -class DeleteWorker : public IOWorker { -public: - DeleteWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice key - , bool sync - , v8::Local &keyHandle - , const char *resource_name = "rocksdb:db.del" - ); - - virtual ~DeleteWorker (); - virtual void Execute (); - -protected: - rocksdb::WriteOptions* options; -}; - -class WriteWorker : public DeleteWorker { -public: - WriteWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice key - , rocksdb::Slice value - , bool sync - , v8::Local &keyHandle - , v8::Local &valueHandle - ); - - virtual ~WriteWorker (); - virtual void Execute (); - virtual void WorkComplete (); - -private: - rocksdb::Slice value; -}; - -class BatchWorker : public AsyncWorker { -public: - BatchWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::WriteBatch* batch - , bool sync - ); - - virtual ~BatchWorker (); - virtual void Execute (); - -private: - rocksdb::WriteOptions* options; - rocksdb::WriteBatch* batch; -}; - -class ApproximateSizeWorker : public AsyncWorker { -public: - ApproximateSizeWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice start - , rocksdb::Slice end - , v8::Local &startHandle - , v8::Local &endHandle - ); - - virtual ~ApproximateSizeWorker (); - virtual void Execute (); - virtual void HandleOKCallback (); - virtual void WorkComplete (); - - private: - rocksdb::Range range; - uint64_t size; -}; - -class CompactRangeWorker : public AsyncWorker { -public: - CompactRangeWorker ( - Database *database - , Nan::Callback *callback - , rocksdb::Slice start - , rocksdb::Slice end - , v8::Local &startHandle - , v8::Local &endHandle - ); - - virtual ~CompactRangeWorker (); - virtual void Execute (); - virtual void HandleOKCallback (); - virtual void WorkComplete (); - - private: - rocksdb::Slice rangeStart; - rocksdb::Slice rangeEnd; -}; - - -} // namespace leveldown - -#endif diff --git 
a/src/iterator.cc b/src/iterator.cc deleted file mode 100644 index b2381c01..00000000 --- a/src/iterator.cc +++ /dev/null @@ -1,613 +0,0 @@ -#include -#include - -#include "database.h" -#include "iterator.h" -#include "iterator_async.h" -#include "common.h" - -namespace leveldown { - -static Nan::Persistent iterator_constructor; - -Iterator::Iterator ( - Database* database - , uint32_t id - , rocksdb::Slice* start - , std::string* end - , bool reverse - , bool keys - , bool values - , int limit - , std::string* lt - , std::string* lte - , std::string* gt - , std::string* gte - , bool fillCache - , bool keyAsBuffer - , bool valueAsBuffer - , size_t highWaterMark -) : database(database) - , id(id) - , start(start) - , end(end) - , reverse(reverse) - , keys(keys) - , values(values) - , limit(limit) - , lt(lt) - , lte(lte) - , gt(gt) - , gte(gte) - , highWaterMark(highWaterMark) - , keyAsBuffer(keyAsBuffer) - , valueAsBuffer(valueAsBuffer) -{ - Nan::HandleScope scope; - - options = new rocksdb::ReadOptions(); - options->fill_cache = fillCache; - // get a snapshot of the current state - options->snapshot = database->NewSnapshot(); - dbIterator = NULL; - count = 0; - target = NULL; - seeking = false; - landed = false; - nexting = false; - ended = false; - endWorker = NULL; -}; - -Iterator::~Iterator () { - delete options; - ReleaseTarget(); - if (start != NULL) { - // Special case for `start` option: it won't be - // freed up by any of the delete calls below. 
- if (!((lt != NULL && reverse) - || (lte != NULL && reverse) - || (gt != NULL && !reverse) - || (gte != NULL && !reverse))) { - delete[] start->data(); - } - delete start; - } - if (end != NULL) - delete end; - if (lt != NULL) - delete lt; - if (gt != NULL) - delete gt; - if (lte != NULL) - delete lte; - if (gte != NULL) - delete gte; -}; - -bool Iterator::GetIterator () { - if (dbIterator == NULL) { - dbIterator = database->NewIterator(options); - - if (start != NULL) { - dbIterator->Seek(*start); - - if (reverse) { - if (!dbIterator->Valid()) { - // if it's past the last key, step back - dbIterator->SeekToLast(); - } else { - std::string key_ = dbIterator->key().ToString(); - - if (lt != NULL) { - if (lt->compare(key_) <= 0) - dbIterator->Prev(); - } else if (lte != NULL) { - if (lte->compare(key_) < 0) - dbIterator->Prev(); - } else if (start != NULL) { - if (start->compare(key_)) - dbIterator->Prev(); - } - } - - if (dbIterator->Valid() && lt != NULL) { - if (lt->compare(dbIterator->key().ToString()) <= 0) - dbIterator->Prev(); - } - } else { - if (dbIterator->Valid() && gt != NULL - && gt->compare(dbIterator->key().ToString()) == 0) - dbIterator->Next(); - } - } else if (reverse) { - dbIterator->SeekToLast(); - } else { - dbIterator->SeekToFirst(); - } - - return true; - } - return false; -} - -bool Iterator::Read (std::string& key, std::string& value) { - // if it's not the first call, move to next item. - if (!GetIterator() && !seeking) { - if (reverse) - dbIterator->Prev(); - else - dbIterator->Next(); - } - - seeking = false; - - // now check if this is the end or not, if not then return the key & value - if (dbIterator->Valid()) { - std::string key_ = dbIterator->key().ToString(); - int isEnd = end == NULL ? 1 : end->compare(key_); - - if ((limit < 0 || ++count <= limit) - && (end == NULL - || (reverse && (isEnd <= 0)) - || (!reverse && (isEnd >= 0))) - && ( lt != NULL ? (lt->compare(key_) > 0) - : lte != NULL ? 
(lte->compare(key_) >= 0) - : true ) - && ( gt != NULL ? (gt->compare(key_) < 0) - : gte != NULL ? (gte->compare(key_) <= 0) - : true ) - ) { - if (keys) - key.assign(dbIterator->key().data(), dbIterator->key().size()); - if (values) - value.assign(dbIterator->value().data(), dbIterator->value().size()); - return true; - } - } - - return false; -} - -bool Iterator::OutOfRange (rocksdb::Slice* target) { - if (lt != NULL) { - if (target->compare(*lt) >= 0) - return true; - } else if (lte != NULL) { - if (target->compare(*lte) > 0) - return true; - } else if (start != NULL && reverse) { - if (target->compare(*start) > 0) - return true; - } - - if (end != NULL) { - int d = target->compare(*end); - if (reverse ? d < 0 : d > 0) - return true; - } - - if (gt != NULL) { - if (target->compare(*gt) <= 0) - return true; - } else if (gte != NULL) { - if (target->compare(*gte) < 0) - return true; - } else if (start != NULL && !reverse) { - if (target->compare(*start) < 0) - return true; - } - - return false; -} - -bool Iterator::IteratorNext (std::vector >& result) { - size_t size = 0; - while(true) { - std::string key, value; - bool ok = Read(key, value); - - if (ok) { - result.push_back(std::make_pair(key, value)); - - if (!landed) { - landed = true; - return true; - } - - size = size + key.size() + value.size(); - if (size > highWaterMark) - return true; - - } else { - return false; - } - } -} - -rocksdb::Status Iterator::IteratorStatus () { - return dbIterator->status(); -} - -void Iterator::IteratorEnd () { - //TODO: could return it->status() - delete dbIterator; - dbIterator = NULL; - database->ReleaseSnapshot(options->snapshot); -} - -void Iterator::Release () { - database->ReleaseIterator(id); -} - -void Iterator::ReleaseTarget () { - if (target != NULL) { - - if (!target->empty()) - delete[] target->data(); - - delete target; - target = NULL; - } -} - -void checkEndCallback (Iterator* iterator) { - iterator->ReleaseTarget(); - iterator->nexting = false; - if 
(iterator->endWorker != NULL) { - Nan::AsyncQueueWorker(iterator->endWorker); - iterator->endWorker = NULL; - } -} - -NAN_METHOD(Iterator::Seek) { - Iterator* iterator = Nan::ObjectWrap::Unwrap(info.This()); - - iterator->ReleaseTarget(); - - v8::Local targetBuffer = info[0].As(); - LD_STRING_OR_BUFFER_TO_COPY(_target, targetBuffer, target); - iterator->target = new rocksdb::Slice(_targetCh_, _targetSz_); - - iterator->GetIterator(); - rocksdb::Iterator* dbIterator = iterator->dbIterator; - - dbIterator->Seek(*iterator->target); - iterator->seeking = true; - iterator->landed = false; - - if (iterator->OutOfRange(iterator->target)) { - if (iterator->reverse) { - dbIterator->SeekToFirst(); - dbIterator->Prev(); - } else { - dbIterator->SeekToLast(); - dbIterator->Next(); - } - } - else { - if (dbIterator->Valid()) { - int cmp = dbIterator->key().compare(*iterator->target); - if (cmp > 0 && iterator->reverse) { - dbIterator->Prev(); - } else if (cmp < 0 && !iterator->reverse) { - dbIterator->Next(); - } - } else { - if (iterator->reverse) { - dbIterator->SeekToLast(); - } else { - dbIterator->SeekToFirst(); - } - if (dbIterator->Valid()) { - int cmp = dbIterator->key().compare(*iterator->target); - if (cmp > 0 && iterator->reverse) { - dbIterator->SeekToFirst(); - dbIterator->Prev(); - } else if (cmp < 0 && !iterator->reverse) { - dbIterator->SeekToLast(); - dbIterator->Next(); - } - } - } - } - - info.GetReturnValue().Set(info.Holder()); -} - -NAN_METHOD(Iterator::Next) { - Iterator* iterator = Nan::ObjectWrap::Unwrap(info.This()); - - if (!info[0]->IsFunction()) { - return Nan::ThrowError("next() requires a callback argument"); - } - - v8::Local callback = info[0].As(); - - if (iterator->ended) { - if (!callback.IsEmpty() && callback->IsFunction()) { - v8::Local argv[] = { Nan::Error("iterator has ended") }; - LD_RUN_CALLBACK("rocksdb:iterator.next", callback, 1, argv); - info.GetReturnValue().SetUndefined(); - return; - } - return Nan::ThrowError("iterator has 
ended"); - } - - NextWorker* worker = new NextWorker( - iterator - , new Nan::Callback(callback) - , checkEndCallback - ); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("iterator", _this); - iterator->nexting = true; - Nan::AsyncQueueWorker(worker); - - info.GetReturnValue().Set(info.Holder()); -} - -NAN_METHOD(Iterator::End) { - Iterator* iterator = Nan::ObjectWrap::Unwrap(info.This()); - - if (!info[0]->IsFunction()) { - return Nan::ThrowError("end() requires a callback argument"); - } - - if (!iterator->ended) { - v8::Local callback = v8::Local::Cast(info[0]); - - EndWorker* worker = new EndWorker( - iterator - , new Nan::Callback(callback) - ); - // persist to prevent accidental GC - v8::Local _this = info.This(); - worker->SaveToPersistent("iterator", _this); - iterator->ended = true; - - if (iterator->nexting) { - // waiting for a next() to return, queue the end - iterator->endWorker = worker; - } else { - Nan::AsyncQueueWorker(worker); - } - } - - info.GetReturnValue().Set(info.Holder()); -} - -void Iterator::Init () { - v8::Local tpl = - Nan::New(Iterator::New); - iterator_constructor.Reset(tpl); - tpl->SetClassName(Nan::New("Iterator").ToLocalChecked()); - tpl->InstanceTemplate()->SetInternalFieldCount(1); - Nan::SetPrototypeMethod(tpl, "seek", Iterator::Seek); - Nan::SetPrototypeMethod(tpl, "next", Iterator::Next); - Nan::SetPrototypeMethod(tpl, "end", Iterator::End); -} - -v8::Local Iterator::NewInstance ( - v8::Local database - , v8::Local id - , v8::Local optionsObj - ) { - - Nan::EscapableHandleScope scope; - - Nan::MaybeLocal maybeInstance; - v8::Local instance; - v8::Local constructorHandle = - Nan::New(iterator_constructor); - - if (optionsObj.IsEmpty()) { - v8::Local argv[2] = { database, id }; - maybeInstance = Nan::NewInstance(constructorHandle->GetFunction(), 2, argv); - } else { - v8::Local argv[3] = { database, id, optionsObj }; - maybeInstance = 
Nan::NewInstance(constructorHandle->GetFunction(), 3, argv); - } - - if (maybeInstance.IsEmpty()) - Nan::ThrowError("Could not create new Iterator instance"); - else - instance = maybeInstance.ToLocalChecked(); - return scope.Escape(instance); -} - -NAN_METHOD(Iterator::New) { - Database* database = Nan::ObjectWrap::Unwrap(info[0]->ToObject()); - - rocksdb::Slice* start = NULL; - std::string* end = NULL; - int limit = -1; - // default highWaterMark from Readble-streams - size_t highWaterMark = 16 * 1024; - - v8::Local id = info[1]; - - v8::Local optionsObj; - - v8::Local ltHandle; - v8::Local lteHandle; - v8::Local gtHandle; - v8::Local gteHandle; - - char *startStr = NULL; - std::string* lt = NULL; - std::string* lte = NULL; - std::string* gt = NULL; - std::string* gte = NULL; - - //default to forward. - bool reverse = false; - - if (info.Length() > 1 && info[2]->IsObject()) { - optionsObj = v8::Local::Cast(info[2]); - - reverse = BooleanOptionValue(optionsObj, "reverse"); - - if (optionsObj->Has(Nan::New("start").ToLocalChecked()) - && (node::Buffer::HasInstance(optionsObj->Get(Nan::New("start").ToLocalChecked())) - || optionsObj->Get(Nan::New("start").ToLocalChecked())->IsString())) { - - v8::Local startBuffer = optionsObj->Get(Nan::New("start").ToLocalChecked()); - - // ignore start if it has size 0 since a Slice can't have length 0 - if (StringOrBufferLength(startBuffer) > 0) { - LD_STRING_OR_BUFFER_TO_COPY(_start, startBuffer, start) - start = new rocksdb::Slice(_startCh_, _startSz_); - startStr = _startCh_; - } - } - - if (optionsObj->Has(Nan::New("end").ToLocalChecked()) - && (node::Buffer::HasInstance(optionsObj->Get(Nan::New("end").ToLocalChecked())) - || optionsObj->Get(Nan::New("end").ToLocalChecked())->IsString())) { - - v8::Local endBuffer = optionsObj->Get(Nan::New("end").ToLocalChecked()); - - // ignore end if it has size 0 since a Slice can't have length 0 - if (StringOrBufferLength(endBuffer) > 0) { - LD_STRING_OR_BUFFER_TO_COPY(_end, endBuffer, 
end) - end = new std::string(_endCh_, _endSz_); - delete[] _endCh_; - } - } - - if (!optionsObj.IsEmpty() && optionsObj->Has(Nan::New("limit").ToLocalChecked())) { - limit = v8::Local::Cast(optionsObj->Get( - Nan::New("limit").ToLocalChecked()))->Value(); - } - - if (optionsObj->Has(Nan::New("highWaterMark").ToLocalChecked())) { - highWaterMark = v8::Local::Cast(optionsObj->Get( - Nan::New("highWaterMark").ToLocalChecked()))->Value(); - } - - if (optionsObj->Has(Nan::New("lt").ToLocalChecked()) - && (node::Buffer::HasInstance(optionsObj->Get(Nan::New("lt").ToLocalChecked())) - || optionsObj->Get(Nan::New("lt").ToLocalChecked())->IsString())) { - - v8::Local ltBuffer = optionsObj->Get(Nan::New("lt").ToLocalChecked()); - - // ignore end if it has size 0 since a Slice can't have length 0 - if (StringOrBufferLength(ltBuffer) > 0) { - LD_STRING_OR_BUFFER_TO_COPY(_lt, ltBuffer, lt) - lt = new std::string(_ltCh_, _ltSz_); - delete[] _ltCh_; - if (reverse) { - if (startStr != NULL) { - delete[] startStr; - startStr = NULL; - } - if (start != NULL) - delete start; - start = new rocksdb::Slice(lt->data(), lt->size()); - } - } - } - - if (optionsObj->Has(Nan::New("lte").ToLocalChecked()) - && (node::Buffer::HasInstance(optionsObj->Get(Nan::New("lte").ToLocalChecked())) - || optionsObj->Get(Nan::New("lte").ToLocalChecked())->IsString())) { - - v8::Local lteBuffer = optionsObj->Get(Nan::New("lte").ToLocalChecked()); - - // ignore end if it has size 0 since a Slice can't have length 0 - if (StringOrBufferLength(lteBuffer) > 0) { - LD_STRING_OR_BUFFER_TO_COPY(_lte, lteBuffer, lte) - lte = new std::string(_lteCh_, _lteSz_); - delete[] _lteCh_; - if (reverse) { - if (startStr != NULL) { - delete[] startStr; - startStr = NULL; - } - if (start != NULL) - delete start; - start = new rocksdb::Slice(lte->data(), lte->size()); - } - } - } - - if (optionsObj->Has(Nan::New("gt").ToLocalChecked()) - && (node::Buffer::HasInstance(optionsObj->Get(Nan::New("gt").ToLocalChecked())) - || 
optionsObj->Get(Nan::New("gt").ToLocalChecked())->IsString())) { - - v8::Local gtBuffer = optionsObj->Get(Nan::New("gt").ToLocalChecked()); - - // ignore end if it has size 0 since a Slice can't have length 0 - if (StringOrBufferLength(gtBuffer) > 0) { - LD_STRING_OR_BUFFER_TO_COPY(_gt, gtBuffer, gt) - gt = new std::string(_gtCh_, _gtSz_); - delete[] _gtCh_; - if (!reverse) { - if (startStr != NULL) { - delete[] startStr; - startStr = NULL; - } - if (start != NULL) - delete start; - start = new rocksdb::Slice(gt->data(), gt->size()); - } - } - } - - if (optionsObj->Has(Nan::New("gte").ToLocalChecked()) - && (node::Buffer::HasInstance(optionsObj->Get(Nan::New("gte").ToLocalChecked())) - || optionsObj->Get(Nan::New("gte").ToLocalChecked())->IsString())) { - - v8::Local gteBuffer = optionsObj->Get(Nan::New("gte").ToLocalChecked()); - - // ignore end if it has size 0 since a Slice can't have length 0 - if (StringOrBufferLength(gteBuffer) > 0) { - LD_STRING_OR_BUFFER_TO_COPY(_gte, gteBuffer, gte) - gte = new std::string(_gteCh_, _gteSz_); - delete[] _gteCh_; - if (!reverse) { - if (startStr != NULL) { - delete[] startStr; - startStr = NULL; - } - if (start != NULL) - delete start; - start = new rocksdb::Slice(gte->data(), gte->size()); - } - } - } - - } - - bool keys = BooleanOptionValue(optionsObj, "keys", true); - bool values = BooleanOptionValue(optionsObj, "values", true); - bool keyAsBuffer = BooleanOptionValue(optionsObj, "keyAsBuffer", true); - bool valueAsBuffer = BooleanOptionValue(optionsObj, "valueAsBuffer", true); - bool fillCache = BooleanOptionValue(optionsObj, "fillCache"); - - Iterator* iterator = new Iterator( - database - , (uint32_t)id->Int32Value() - , start - , end - , reverse - , keys - , values - , limit - , lt - , lte - , gt - , gte - , fillCache - , keyAsBuffer - , valueAsBuffer - , highWaterMark - ); - iterator->Wrap(info.This()); - - info.GetReturnValue().Set(info.This()); -} - -} // namespace leveldown diff --git a/src/iterator.h 
b/src/iterator.h deleted file mode 100644 index 201775cf..00000000 --- a/src/iterator.h +++ /dev/null @@ -1,94 +0,0 @@ -#ifndef LD_ITERATOR_H -#define LD_ITERATOR_H - -#include -#include -#include - -#include "leveldown.h" -#include "database.h" -#include "async.h" - -namespace leveldown { - -class Database; -class AsyncWorker; - -class Iterator : public Nan::ObjectWrap { -public: - static void Init (); - static v8::Local NewInstance ( - v8::Local database - , v8::Local id - , v8::Local optionsObj - ); - - Iterator ( - Database* database - , uint32_t id - , rocksdb::Slice* start - , std::string* end - , bool reverse - , bool keys - , bool values - , int limit - , std::string* lt - , std::string* lte - , std::string* gt - , std::string* gte - , bool fillCache - , bool keyAsBuffer - , bool valueAsBuffer - , size_t highWaterMark - ); - - ~Iterator (); - - bool IteratorNext (std::vector >& result); - rocksdb::Status IteratorStatus (); - void IteratorEnd (); - void Release (); - void ReleaseTarget (); - -private: - Database* database; - uint32_t id; - rocksdb::Iterator* dbIterator; - rocksdb::ReadOptions* options; - rocksdb::Slice* start; - rocksdb::Slice* target; - std::string* end; - bool seeking; - bool landed; - bool reverse; - bool keys; - bool values; - int limit; - std::string* lt; - std::string* lte; - std::string* gt; - std::string* gte; - int count; - size_t highWaterMark; - -public: - bool keyAsBuffer; - bool valueAsBuffer; - bool nexting; - bool ended; - AsyncWorker* endWorker; - -private: - bool Read (std::string& key, std::string& value); - bool GetIterator (); - bool OutOfRange (rocksdb::Slice* target); - - static NAN_METHOD(New); - static NAN_METHOD(Seek); - static NAN_METHOD(Next); - static NAN_METHOD(End); -}; - -} // namespace leveldown - -#endif diff --git a/src/iterator_async.cc b/src/iterator_async.cc deleted file mode 100644 index 7dea9740..00000000 --- a/src/iterator_async.cc +++ /dev/null @@ -1,95 +0,0 @@ -#include -#include - -#include 
"database.h" -#include "leveldown.h" -#include "async.h" -#include "iterator_async.h" - -namespace leveldown { - -/** NEXT-MULTI WORKER **/ - -NextWorker::NextWorker ( - Iterator* iterator - , Nan::Callback *callback - , void (*localCallback)(Iterator*) -) : AsyncWorker(NULL, callback, "rocksdb:iterator.next") - , iterator(iterator) - , localCallback(localCallback) -{}; - -NextWorker::~NextWorker () {} - -void NextWorker::Execute () { - ok = iterator->IteratorNext(result); - if (!ok) - SetStatus(iterator->IteratorStatus()); -} - -void NextWorker::HandleOKCallback () { - Nan::HandleScope scope; - size_t idx = 0; - - size_t arraySize = result.size() * 2; - v8::Local returnArray = Nan::New(arraySize); - - for(idx = 0; idx < result.size(); ++idx) { - std::pair row = result[idx]; - std::string key = row.first; - std::string value = row.second; - - v8::Local returnKey; - if (iterator->keyAsBuffer) { - //TODO: use NewBuffer, see database_async.cc - returnKey = Nan::CopyBuffer((char*)key.data(), key.size()).ToLocalChecked(); - } else { - returnKey = Nan::New((char*)key.data(), key.size()).ToLocalChecked(); - } - - v8::Local returnValue; - if (iterator->valueAsBuffer) { - //TODO: use NewBuffer, see database_async.cc - returnValue = Nan::CopyBuffer((char*)value.data(), value.size()).ToLocalChecked(); - } else { - returnValue = Nan::New((char*)value.data(), value.size()).ToLocalChecked(); - } - - // put the key & value in a descending order, so that they can be .pop:ed in javascript-land - returnArray->Set(Nan::New(static_cast(arraySize - idx * 2 - 1)), returnKey); - returnArray->Set(Nan::New(static_cast(arraySize - idx * 2 - 2)), returnValue); - } - - // clean up & handle the next/end state see iterator.cc/checkEndCallback - localCallback(iterator); - - v8::Local argv[] = { - Nan::Null() - , returnArray - // when ok === false all data has been read, so it's then finished - , Nan::New(!ok) - }; - callback->Call(3, argv, async_resource); -} - -/** END WORKER **/ - 
-EndWorker::EndWorker ( - Iterator* iterator - , Nan::Callback *callback -) : AsyncWorker(NULL, callback, "rocksdb:iterator.end") - , iterator(iterator) -{}; - -EndWorker::~EndWorker () { } - -void EndWorker::Execute () { - iterator->IteratorEnd(); -} - -void EndWorker::HandleOKCallback () { - iterator->Release(); - callback->Call(0, NULL, async_resource); -} - -} // namespace leveldown diff --git a/src/iterator_async.h b/src/iterator_async.h deleted file mode 100644 index 0906ebf2..00000000 --- a/src/iterator_async.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef LD_ITERATOR_ASYNC_H -#define LD_ITERATOR_ASYNC_H - -#include -#include - -#include "async.h" -#include "iterator.h" - -namespace leveldown { - -class NextWorker : public AsyncWorker { -public: - NextWorker ( - Iterator* iterator - , Nan::Callback *callback - , void (*localCallback)(Iterator*) - ); - - virtual ~NextWorker (); - virtual void Execute (); - virtual void HandleOKCallback (); - -private: - Iterator* iterator; - void (*localCallback)(Iterator*); - std::vector > result; - bool ok; -}; - -class EndWorker : public AsyncWorker { -public: - EndWorker ( - Iterator* iterator - , Nan::Callback *callback - ); - - virtual ~EndWorker (); - virtual void Execute (); - virtual void HandleOKCallback (); - -private: - Iterator* iterator; -}; - -} // namespace leveldown - -#endif diff --git a/src/leveldown.cc b/src/leveldown.cc deleted file mode 100644 index 2e7f6c1d..00000000 --- a/src/leveldown.cc +++ /dev/null @@ -1,70 +0,0 @@ -#include - -#include "leveldown.h" -#include "database.h" -#include "iterator.h" -#include "batch.h" -#include "leveldown_async.h" - -namespace leveldown { - -NAN_METHOD(DestroyDB) { - Nan::HandleScope scope; - - Nan::Utf8String* location = new Nan::Utf8String(info[0]); - - Nan::Callback* callback = new Nan::Callback( - v8::Local::Cast(info[1])); - - DestroyWorker* worker = new DestroyWorker( - location - , callback - ); - - Nan::AsyncQueueWorker(worker); - - 
info.GetReturnValue().SetUndefined(); -} - -NAN_METHOD(RepairDB) { - Nan::HandleScope scope; - - Nan::Utf8String* location = new Nan::Utf8String(info[0]); - - Nan::Callback* callback = new Nan::Callback( - v8::Local::Cast(info[1])); - - RepairWorker* worker = new RepairWorker( - location - , callback - ); - - Nan::AsyncQueueWorker(worker); - - info.GetReturnValue().SetUndefined(); -} - -void Init (v8::Local target) { - Database::Init(); - leveldown::Iterator::Init(); - leveldown::Batch::Init(); - - v8::Local leveldown = - Nan::New(LevelDOWN)->GetFunction(); - - leveldown->Set( - Nan::New("destroy").ToLocalChecked() - , Nan::New(DestroyDB)->GetFunction() - ); - - leveldown->Set( - Nan::New("repair").ToLocalChecked() - , Nan::New(RepairDB)->GetFunction() - ); - - target->Set(Nan::New("leveldown").ToLocalChecked(), leveldown); -} - -NODE_MODULE(leveldown, Init) - -} // namespace leveldown diff --git a/src/leveldown.h b/src/leveldown.h deleted file mode 100644 index f86209bf..00000000 --- a/src/leveldown.h +++ /dev/null @@ -1,117 +0,0 @@ -#ifndef LD_LEVELDOWN_H -#define LD_LEVELDOWN_H - -#include -#include -#include -#include - -static inline size_t StringOrBufferLength(v8::Local obj) { - Nan::HandleScope scope; - - return (!obj->ToObject().IsEmpty() - && node::Buffer::HasInstance(obj->ToObject())) - ? 
node::Buffer::Length(obj->ToObject()) - : obj->ToString()->Utf8Length(); -} - -// NOTE: this MUST be called on objects created by -// LD_STRING_OR_BUFFER_TO_SLICE -static inline void DisposeStringOrBufferFromSlice( - Nan::Persistent &handle - , rocksdb::Slice slice) { - Nan::HandleScope scope; - - if (!slice.empty()) { - v8::Local obj = Nan::New(handle)->Get(Nan::New("obj").ToLocalChecked()); - if (!node::Buffer::HasInstance(obj)) - delete[] slice.data(); - } - - handle.Reset(); -} - -static inline void DisposeStringOrBufferFromSlice( - v8::Local handle - , rocksdb::Slice slice) { - - if (!slice.empty() && !node::Buffer::HasInstance(handle)) - delete[] slice.data(); -} - -// NOTE: must call DisposeStringOrBufferFromSlice() on objects created here -#define LD_STRING_OR_BUFFER_TO_SLICE(to, from, name) \ - size_t to ## Sz_; \ - char* to ## Ch_; \ - if (from->IsNull() || from->IsUndefined()) { \ - to ## Sz_ = 0; \ - to ## Ch_ = 0; \ - } else if (!from->ToObject().IsEmpty() \ - && node::Buffer::HasInstance(from->ToObject())) { \ - to ## Sz_ = node::Buffer::Length(from->ToObject()); \ - to ## Ch_ = node::Buffer::Data(from->ToObject()); \ - } else { \ - v8::Local to ## Str = from->ToString(); \ - to ## Sz_ = to ## Str->Utf8Length(); \ - to ## Ch_ = new char[to ## Sz_]; \ - to ## Str->WriteUtf8( \ - to ## Ch_ \ - , -1 \ - , NULL, v8::String::NO_NULL_TERMINATION \ - ); \ - } \ - rocksdb::Slice to(to ## Ch_, to ## Sz_); - -#define LD_STRING_OR_BUFFER_TO_COPY(to, from, name) \ - size_t to ## Sz_; \ - char* to ## Ch_; \ - if (!from->ToObject().IsEmpty() \ - && node::Buffer::HasInstance(from->ToObject())) { \ - to ## Sz_ = node::Buffer::Length(from->ToObject()); \ - to ## Ch_ = new char[to ## Sz_]; \ - memcpy(to ## Ch_, node::Buffer::Data(from->ToObject()), to ## Sz_); \ - } else { \ - v8::Local to ## Str = from->ToString(); \ - to ## Sz_ = to ## Str->Utf8Length(); \ - to ## Ch_ = new char[to ## Sz_]; \ - to ## Str->WriteUtf8( \ - to ## Ch_ \ - , -1 \ - , NULL, 
v8::String::NO_NULL_TERMINATION \ - ); \ - } - -#define LD_RUN_CALLBACK(resource, callback, argc, argv) \ - Nan::AsyncResource ar(resource); \ - ar.runInAsyncScope(Nan::GetCurrentContext()->Global(), \ - callback, argc, argv); - -/* LD_METHOD_SETUP_COMMON setup the following objects: - * - Database* database - * - v8::Local optionsObj (may be empty) - * - Nan::Persistent callback (won't be empty) - * Will throw/return if there isn't a callback in arg 0 or 1 - */ -#define LD_METHOD_SETUP_COMMON(name, optionPos, callbackPos) \ - if (info.Length() == 0) \ - return Nan::ThrowError(#name "() requires a callback argument"); \ - leveldown::Database* database = \ - Nan::ObjectWrap::Unwrap(info.This()); \ - v8::Local optionsObj; \ - v8::Local callback; \ - if (optionPos == -1 && info[callbackPos]->IsFunction()) { \ - callback = info[callbackPos].As(); \ - } else if (optionPos != -1 && info[callbackPos - 1]->IsFunction()) { \ - callback = info[callbackPos - 1].As(); \ - } else if (optionPos != -1 \ - && info[optionPos]->IsObject() \ - && info[callbackPos]->IsFunction()) { \ - optionsObj = info[optionPos].As(); \ - callback = info[callbackPos].As(); \ - } else { \ - return Nan::ThrowError(#name "() requires a callback argument"); \ - } - -#define LD_METHOD_SETUP_COMMON_ONEARG(name) LD_METHOD_SETUP_COMMON(name, -1, 0) - -#endif diff --git a/src/leveldown_async.cc b/src/leveldown_async.cc deleted file mode 100644 index 884845a8..00000000 --- a/src/leveldown_async.cc +++ /dev/null @@ -1,44 +0,0 @@ -#include - -#include "leveldown.h" -#include "leveldown_async.h" - -namespace leveldown { - -/** DESTROY WORKER **/ - -DestroyWorker::DestroyWorker ( - Nan::Utf8String* location - , Nan::Callback *callback -) : AsyncWorker(NULL, callback, "rocksdb:destroy") - , location(location) -{}; - -DestroyWorker::~DestroyWorker () { - delete location; -} - -void DestroyWorker::Execute () { - rocksdb::Options options; - SetStatus(rocksdb::DestroyDB(**location, options)); -} - -/** REPAIR WORKER 
**/ - -RepairWorker::RepairWorker ( - Nan::Utf8String* location - , Nan::Callback *callback -) : AsyncWorker(NULL, callback, "rocksdb:repair") - , location(location) -{}; - -RepairWorker::~RepairWorker () { - delete location; -} - -void RepairWorker::Execute () { - rocksdb::Options options; - SetStatus(rocksdb::RepairDB(**location, options)); -} - -} // namespace leveldown diff --git a/src/leveldown_async.h b/src/leveldown_async.h deleted file mode 100644 index 82facfa3..00000000 --- a/src/leveldown_async.h +++ /dev/null @@ -1,40 +0,0 @@ -#ifndef LD_LEVELDOWN_ASYNC_H -#define LD_LEVELDOWN_ASYNC_H - -#include - -#include "async.h" - -namespace leveldown { - -class DestroyWorker : public AsyncWorker { -public: - DestroyWorker ( - Nan::Utf8String* location - , Nan::Callback *callback - ); - - virtual ~DestroyWorker (); - virtual void Execute (); - -private: - Nan::Utf8String* location; -}; - -class RepairWorker : public AsyncWorker { -public: - RepairWorker ( - Nan::Utf8String* location - , Nan::Callback *callback - ); - - virtual ~RepairWorker (); - virtual void Execute (); - -private: - Nan::Utf8String* location; -}; - -} // namespace leveldown - -#endif diff --git a/test/chained-batch-gc-test.js b/test/chained-batch-gc-test.js new file mode 100644 index 00000000..1981efcc --- /dev/null +++ b/test/chained-batch-gc-test.js @@ -0,0 +1,37 @@ +'use strict' + +const test = require('tape') +const testCommon = require('./common') + +// When we have a chained batch object without a reference, V8 might GC it +// before we get a chance to (asynchronously) write the batch. 
+test('chained batch without ref does not get GCed before write', function (t) { + t.plan(2) + + const db = testCommon.factory() + + db.open(function (err) { + t.ifError(err, 'no open error') + + let batch = db.batch() + + for (let i = 0; i < 1e3; i++) { + batch.put(String(i), 'value') + } + + // The sync option makes the operation slower and thus more likely to + // cause a segfault (if the batch were to be GC-ed before it is written). + batch.write({ sync: true }, function (err) { + t.ifError(err, 'no error from write()') + }) + + // Remove reference + batch = null + + if (global.gc) { + // This is the reliable way to trigger GC (and the bug if it exists). + // Useful for manual testing with "node --expose-gc". + global.gc() + } + }) +}) diff --git a/test/cleanup-hanging-iterators-test.js b/test/cleanup-hanging-iterators-test.js index e1e54092..5ff54103 100644 --- a/test/cleanup-hanging-iterators-test.js +++ b/test/cleanup-hanging-iterators-test.js @@ -1,9 +1,10 @@ const makeTest = require('./make') +const repeats = 200 makeTest('test ended iterator', function (db, t, done) { - // standard iterator with an end() properly called, easy - + // First test normal and proper usage: calling it.end() before db.close() var it = db.iterator({ keyAsBuffer: false, valueAsBuffer: false }) + it.next(function (err, key, value) { t.ifError(err, 'no error from next()') t.equal(key, 'one', 'correct key') @@ -15,9 +16,11 @@ makeTest('test ended iterator', function (db, t, done) { }) }) -makeTest('test non-ended iterator', function (db, t, done) { - // no end() call on our iterator, cleanup should crash Node if not handled properly +makeTest('test likely-ended iterator', function (db, t, done) { + // Test improper usage: not calling it.end() before db.close(). Cleanup of the + // database will crash Node if not handled properly. 
var it = db.iterator({ keyAsBuffer: false, valueAsBuffer: false }) + it.next(function (err, key, value) { t.ifError(err, 'no error from next()') t.equal(key, 'one', 'correct key') @@ -26,17 +29,61 @@ makeTest('test non-ended iterator', function (db, t, done) { }) }) +makeTest('test non-ended iterator', function (db, t, done) { + // Same as the test above but with a highWaterMark of 0 so that we don't + // preemptively fetch all records, to ensure that the iterator is still + // active when we (attempt to) close the database. + var it = db.iterator({ + highWaterMark: 0, + keyAsBuffer: false, + valueAsBuffer: false + }) + + it.next(function (err, key, value) { + t.ifError(err, 'no error from next()') + t.equal(key, 'one', 'correct key') + t.equal(value, '1', 'correct value') + done() + }) +}) + +makeTest('test multiple likely-ended iterators', function (db, t, done) { + // Same as the test above but repeated and with an extra iterator that is not + // nexting, which means its EndWorker will be executed almost immediately. + for (let i = 0; i < repeats; i++) { + db.iterator() + db.iterator().next(function () {}) + } + + setTimeout(done, Math.floor(Math.random() * 50)) +}) + makeTest('test multiple non-ended iterators', function (db, t, done) { - // no end() call on our iterator, cleanup should crash Node if not handled properly - db.iterator() - db.iterator().next(function () {}) - db.iterator().next(function () {}) - db.iterator().next(function () {}) - setTimeout(done, 50) + // Same as the test above but with a highWaterMark of 0. 
+ for (let i = 0; i < repeats; i++) { + db.iterator({ highWaterMark: 0 }) + db.iterator({ highWaterMark: 0 }).next(function () {}) + } + + setTimeout(done, Math.floor(Math.random() * 50)) +}) + +global.gc && makeTest('test multiple non-ended iterators with forced gc', function (db, t, done) { + // Same as the test above but with forced GC, to test that the lifespan of an + // iterator is tied to *both* its JS object and whether the iterator was ended. + for (let i = 0; i < repeats; i++) { + db.iterator({ highWaterMark: 0 }) + db.iterator({ highWaterMark: 0 }).next(function () {}) + } + + setTimeout(function () { + global.gc() + done() + }, Math.floor(Math.random() * 50)) }) makeTest('test ending iterators', function (db, t, done) { - // at least one end() should be in progress when we try to close the db + // At least one end() should be in progress when we try to close the db. var it1 = db.iterator().next(function () { it1.end(function () {}) }) diff --git a/test/compact-range-test.js b/test/compact-range-test.js index d83263f2..f4fe4d57 100644 --- a/test/compact-range-test.js +++ b/test/compact-range-test.js @@ -1,7 +1,7 @@ const test = require('tape') const testCommon = require('./common') -var db +let db test('setUp common', testCommon.setUp) diff --git a/test/compression-test.js b/test/compression-test.js index a3b831c2..515296ab 100644 --- a/test/compression-test.js +++ b/test/compression-test.js @@ -5,11 +5,14 @@ const testCommon = require('./common') const leveldown = require('..') const test = require('tape') -var compressableData = Buffer.from(Array.apply(null, Array(1024 * 100)).map(function () { return 'aaaaaaaaaa' }).join('')) -var multiples = 10 -var dataSize = compressableData.length * multiples +const compressableData = Buffer.from(Array.apply(null, Array(1024 * 100)).map(function () { + return 'aaaaaaaaaa' +}).join('')) -var verify = function (location, compression, t) { +const multiples = 10 +const dataSize = compressableData.length * multiples + 
+const verify = function (location, compression, t) { du(location, function (err, size) { t.error(err) if (compression) { @@ -22,7 +25,7 @@ var verify = function (location, compression, t) { } // close, open, close again.. 'compaction' is also performed on open()s -var cycle = function (db, compression, t, callback) { +const cycle = function (db, compression, t, callback) { var location = db.location db.close(function (err) { t.error(err) @@ -45,11 +48,13 @@ test('compression', function (t) { var db = testCommon.factory() db.open(function (err) { t.error(err) - async.forEach(Array.apply(null, Array(multiples)).map(function (e, i) { - return [ i, compressableData ] - }), function (args, callback) { - db.put.apply(db, args.concat([callback])) - }, cycle.bind(null, db, true, t, delayed.delayed(verify.bind(null, db.location, true, t), 0.01))) + async.forEach( + Array.apply(null, Array(multiples)).map(function (e, i) { + return [ i, compressableData ] + }), function (args, callback) { + db.put.apply(db, args.concat([callback])) + }, cycle.bind(null, db, true, t, delayed.delayed(verify.bind(null, db.location, true, t), 0.01)) + ) }) }) @@ -57,11 +62,13 @@ test('compression', function (t) { var db = testCommon.factory() db.open({ compression: false }, function (err) { t.error(err) - async.forEach(Array.apply(null, Array(multiples)).map(function (e, i) { - return [ i, compressableData ] - }), function (args, callback) { - db.put.apply(db, args.concat([callback])) - }, cycle.bind(null, db, false, t, delayed.delayed(verify.bind(null, db.location, false, t), 0.01))) + async.forEach( + Array.apply(null, Array(multiples)).map(function (e, i) { + return [ i, compressableData ] + }), function (args, callback) { + db.put.apply(db, args.concat([callback])) + }, cycle.bind(null, db, false, t, delayed.delayed(verify.bind(null, db.location, false, t), 0.01)) + ) }) }) @@ -69,9 +76,11 @@ test('compression', function (t) { var db = testCommon.factory() db.open(function (err) { 
t.error(err) - db.batch(Array.apply(null, Array(multiples)).map(function (e, i) { - return { type: 'put', key: i, value: compressableData } - }), cycle.bind(null, db, false, t, delayed.delayed(verify.bind(null, db.location, false, t), 0.01))) + db.batch( + Array.apply(null, Array(multiples)).map(function (e, i) { + return { type: 'put', key: i, value: compressableData } + }), cycle.bind(null, db, false, t, delayed.delayed(verify.bind(null, db.location, false, t), 0.01)) + ) }) }) }) diff --git a/test/destroy-test.js b/test/destroy-test.js index 96b88748..32dfa976 100644 --- a/test/destroy-test.js +++ b/test/destroy-test.js @@ -5,7 +5,7 @@ const path = require('path') const mkfiletree = require('mkfiletree') const readfiletree = require('readfiletree') const rimraf = require('rimraf') -const leveldown = require('../') +const leveldown = require('..') const makeTest = require('./make') test('test argument-less destroy() throws', function (t) { @@ -18,7 +18,8 @@ test('test argument-less destroy() throws', function (t) { test('test callback-less, 1-arg, destroy() throws', function (t) { t.throws(leveldown.destroy.bind(null, 'foo'), { - name: 'Error', message: 'destroy() requires `location` and `callback` arguments' + name: 'Error', + message: 'destroy() requires `location` and `callback` arguments' }, 'callback-less, 1-arg destroy() throws') t.end() }) @@ -36,8 +37,8 @@ test('test destroy non-existent directory', function (t) { rimraf(location, { glob: false }, function (err) { t.ifError(err, 'no error from rimraf()') - leveldown.destroy(location, function () { - t.is(arguments.length, 0, 'no arguments returned on callback') + leveldown.destroy(location, function (err) { + t.error(err, 'no error') // Assert that destroy() didn't inadvertently create the directory. // Or if it did, that it was at least cleaned up afterwards. 
@@ -47,17 +48,15 @@ test('test destroy non-existent directory', function (t) { }) test('test destroy non-existent parent directory', function (t) { - t.plan(4) + t.plan(3) var location = '/1/2/3/4' var parent = path.dirname(location) t.notOk(fs.existsSync(parent), 'parent does not exist before') - leveldown.destroy(location, function () { - // This behavior differs from leveldown, which is silent. - t.is(arguments.length, 1, 'error object returned on callback') - t.ok(/.*IO error.*\/1\/2\/3\/4\/LOCK.*/.test(arguments[0]), 'got IO error') + leveldown.destroy(location, function (err) { + t.error(err, 'no error') t.notOk(fs.existsSync(location), 'directory does not exist after') }) }) diff --git a/test/electron.js b/test/electron.js new file mode 100644 index 00000000..086c5a94 --- /dev/null +++ b/test/electron.js @@ -0,0 +1,21 @@ +'use strict' + +const tape = require('tape') +const electron = require('electron') +const path = require('path') +const glob = require('glob') +const app = electron.app + +process.on('uncaughtException', function (err) { + console.error(err) + app.exit(1) +}) + +app.on('ready', function () { + tape.onFinish(() => app.quit()) + tape.onFailure(() => app.exit(1)) + + for (let file of glob.sync('test/*-test.js')) { + require(path.resolve('.', file)) + } +}) diff --git a/test/getproperty-test.js b/test/getproperty-test.js index e9b8ce49..fa072c63 100644 --- a/test/getproperty-test.js +++ b/test/getproperty-test.js @@ -1,7 +1,7 @@ const test = require('tape') const testCommon = require('./common') -var db +let db test('setUp common', testCommon.setUp) @@ -28,27 +28,28 @@ test('test non-string getProperty() throws', function (t) { test('test invalid getProperty() returns empty string', function (t) { t.equal(db.getProperty('foo'), '', 'invalid property') - t.equal(db.getProperty('rocksdb.foo'), '', 'invalid rocksdb.* property') + t.equal(db.getProperty('leveldb.foo'), '', 'invalid leveldb.* property') t.end() }) -test('test invalid 
getProperty("rocksdb.num-files-at-levelN") returns numbers', function (t) { +test('test invalid getProperty("leveldb.num-files-at-levelN") returns numbers', function (t) { for (var i = 0; i < 7; i++) { - t.equal(db.getProperty('rocksdb.num-files-at-level' + i), '0', '"rocksdb.num-files-at-levelN" === "0"') + t.equal(db.getProperty('leveldb.num-files-at-level' + i), + '0', '"leveldb.num-files-at-levelN" === "0"') } t.end() }) -test('test invalid getProperty("rocksdb.stats")', function (t) { - t.ok(db.getProperty('rocksdb.stats').split('\n').length > 3, 'rocksdb.stats has > 3 newlines') +test('test invalid getProperty("leveldb.stats")', function (t) { + t.ok(db.getProperty('leveldb.stats').split('\n').length > 3, 'leveldb.stats has > 3 newlines') t.end() }) -test('test invalid getProperty("rocksdb.sstables")', function (t) { +test('test invalid getProperty("leveldb.sstables")', function (t) { var expected = [0, 1, 2, 3, 4, 5, 6].map(function (l) { - return '--- level ' + l + ' --- version# 1 ---' + return '--- level ' + l + ' ---' }).join('\n') + '\n' - t.equal(db.getProperty('rocksdb.sstables'), expected, 'rocksdb.sstables') + t.equal(db.getProperty('leveldb.sstables'), expected, 'leveldb.sstables') t.end() }) diff --git a/test/iterator-recursion-test.js b/test/iterator-recursion-test.js index 71bd9edb..1c799fd2 100644 --- a/test/iterator-recursion-test.js +++ b/test/iterator-recursion-test.js @@ -3,8 +3,9 @@ const testCommon = require('./common') const fork = require('child_process').fork const path = require('path') -var db -var sourceData = (function () { +let db + +const sourceData = (function () { var d = [] var i = 0 var k diff --git a/test/iterator-test.js b/test/iterator-test.js index 40f59bfe..1a03b7cf 100644 --- a/test/iterator-test.js +++ b/test/iterator-test.js @@ -71,8 +71,10 @@ make('close db with open iterator', function (db, t, done) { ite.next(function loop (err, key, value) { if (cnt++ === 0) { + // The first call should succeed, because it was 
scheduled before close() t.ifError(err, 'no error from next()') } else { + // The second call should fail, because it was scheduled after close() t.equal(err.message, 'iterator has ended') hadError = true } diff --git a/test/leak-tester-batch.js b/test/leak-tester-batch.js index 03315119..5130ffe7 100644 --- a/test/leak-tester-batch.js +++ b/test/leak-tester-batch.js @@ -1,5 +1,3 @@ -/* global gc */ - const BUFFERS = false const CHAINED = false @@ -13,7 +11,7 @@ let db function print () { if (writeCount % 100 === 0) { - if (typeof gc !== 'undefined') gc() + if (typeof global.gc !== 'undefined') global.gc() console.log( 'writeCount =', writeCount, ', rss =', diff --git a/test/leak-tester-iterator.js b/test/leak-tester-iterator.js new file mode 100644 index 00000000..5e2cc593 --- /dev/null +++ b/test/leak-tester-iterator.js @@ -0,0 +1,45 @@ +const db = require('./common').factory() + +let count = 0 +let rssBase + +if (!global.gc) { + console.error('To force GC, run with "node --expose-gc"') +} + +function run () { + var it = db.iterator() + + it.next(function (err) { + if (err) throw err + + it.end(function (err) { + if (err) throw err + + if (!rssBase) { + rssBase = process.memoryUsage().rss + } + + if (++count % 1000 === 0) { + if (global.gc) global.gc() + + const rss = process.memoryUsage().rss + const percent = Math.round((rss / rssBase) * 100) + const mb = Math.round(rss / 1024 / 1024) + + console.log('count = %d, rss = %d% %dM', count, percent, mb) + } + + run() + }) + }) +} + +db.open(function (err) { + if (err) throw err + + db.put('key', 'value', function (err) { + if (err) throw err + run() + }) +}) diff --git a/test/leak-tester.js b/test/leak-tester.js index c745d3b0..6419bf6b 100644 --- a/test/leak-tester.js +++ b/test/leak-tester.js @@ -1,13 +1,12 @@ -/* global gc */ +const BUFFERS = false const testCommon = require('./common') const crypto = require('crypto') -var BUFFERS = false -var putCount = 0 -var getCount = 0 -var rssBase -var db +let putCount = 0 
+let getCount = 0 +let rssBase +let db function run () { var key = 'long key to test memory usage ' + String(Math.floor(Math.random() * 10000000)) @@ -31,14 +30,13 @@ function run () { }) if (getCount % 1000 === 0) { - if (typeof gc !== 'undefined') gc() + if (typeof global.gc !== 'undefined') global.gc() console.log('getCount =', getCount, ', putCount = ', putCount, ', rss =', Math.round(process.memoryUsage().rss / rssBase * 100) + '%', Math.round(process.memoryUsage().rss / 1024 / 1024) + 'M', JSON.stringify([0, 1, 2, 3, 4, 5, 6].map(function (l) { return db.getProperty('leveldb.num-files-at-level' + l) - })) - ) + }))) } } diff --git a/test/port-libuv-fix-test.js b/test/port-libuv-fix-test.js index cdc41f9c..496d76a7 100644 --- a/test/port-libuv-fix-test.js +++ b/test/port-libuv-fix-test.js @@ -1,13 +1,11 @@ -/* Not sure if this is needed/supported with RocksDB. It looks like it might be - a small floated patch on leveldown. const test = require('tape') - , path = require('path') - , fs = require('fs') +const path = require('path') +const fs = require('fs') test('test port-libuv is being used', function (t) { var version = fs.readFileSync(path.join(__dirname, '../deps/leveldb/leveldb.gyp'), 'utf8') - .match(/'ldbversion': '([^']+)'/)[1] - , porth + .match(/"ldbversion": "([^"]+)"/)[1] + var porth t.ok(version, 'matched current leveldb version') @@ -17,4 +15,3 @@ test('test port-libuv is being used', function (t) { t.end() }) -*/ diff --git a/test/repair-test.js b/test/repair-test.js index 6591b770..9dc03d48 100644 --- a/test/repair-test.js +++ b/test/repair-test.js @@ -1,6 +1,6 @@ const test = require('tape') const fs = require('fs') -const leveldown = require('../') +const leveldown = require('..') const makeTest = require('./make') test('test argument-less repair() throws', function (t) { @@ -21,7 +21,11 @@ test('test callback-less, 1-arg, repair() throws', function (t) { test('test repair non-existent directory returns error', function (t) { 
leveldown.repair('/1/2/3/4', function (err) { - t.ok(/^Error: NotFound:/i.test(err), 'error on callback') + if (process.platform !== 'win32') { + t.ok(/no such file or directory/i.test(err), 'error on callback') + } else { + t.ok(/IO error/i.test(err), 'error on callback') + } t.end() }) }) @@ -35,14 +39,14 @@ makeTest('test repair() compacts', function (db, t, done) { var files = fs.readdirSync(location) t.ok(files.some(function (f) { return (/\.log$/).test(f) }), 'directory contains log file(s)') - t.notOk(files.some(function (f) { return (/\.sst$/).test(f) }), 'directory does not contain sst file(s)') + t.notOk(files.some(function (f) { return (/\.ldb$/).test(f) }), 'directory does not contain ldb file(s)') leveldown.repair(location, function (err) { t.ifError(err, 'no error from repair()') files = fs.readdirSync(location) t.notOk(files.some(function (f) { return (/\.log$/).test(f) }), 'directory does not contain log file(s)') - t.ok(files.some(function (f) { return (/\.sst$/).test(f) }), 'directory contains sst file(s)') + t.ok(files.some(function (f) { return (/\.ldb$/).test(f) }), 'directory contains ldb file(s)') done(null, false) }) diff --git a/test/segfault-test.js b/test/segfault-test.js index e35210cc..e7941520 100644 --- a/test/segfault-test.js +++ b/test/segfault-test.js @@ -1,28 +1,89 @@ const test = require('tape') const testCommon = require('./common') +const operations = [] -// See https://github.com/Level/leveldown/issues/157, not yet ported to rocksdb. -test.skip('close() does not segfault if there is a pending write', function (t) { - t.plan(3) +// The db must wait for pending operations to finish before closing. This to +// prevent segfaults and in the case of compactRange() to prevent hanging. See +// https://github.com/Level/leveldown/issues/157 and 32. 
+function testPending (name, expectedCount, fn) { + operations.push(fn) - const db = testCommon.factory() + test(`close() waits for pending ${name}`, function (t) { + const db = testCommon.factory() + let count = 0 - db.open(function (err) { - t.ifError(err, 'no open error') + db.open(function (err) { + t.ifError(err, 'no error from open()') - // The "sync" option seems to be a reliable way to trigger a segfault, - // but is not necessarily the cause of that segfault. More likely, it - // exposes a race condition that's already there. - db.put('foo', 'bar', { sync: true }, function (err) { - // We never get here, due to segfault. - t.ifError(err, 'no put error') - }) + db.put('key', 'value', function (err) { + t.ifError(err, 'no error from put()') - db.close(function (err) { - // We never get here, due to segfault. - t.ifError(err, 'no close error') + fn(db, function (err) { + count++ + t.ifError(err, 'no error from operation') + }) + + db.close(function (err) { + t.ifError(err, 'no error from close()') + t.is(count, expectedCount, 'operation(s) finished before close') + t.end() + }) + }) }) }) +} + +testPending('get()', 1, function (db, next) { + db.get('key', next) +}) + +testPending('put()', 1, function (db, next) { + db.put('key2', 'value', next) +}) + +testPending('put() with { sync }', 1, function (db, next) { + // The sync option makes the operation slower and thus more likely to + // cause a segfault (if closing were to happen during the operation). 
+ db.put('key2', 'value', { sync: true }, next) +}) + +testPending('del()', 1, function (db, next) { + db.del('key', next) +}) + +testPending('del() with { sync }', 1, function (db, next) { + db.del('key', { sync: true }, next) +}) + +testPending('batch([])', 1, function (db, next) { + db.batch([{ type: 'del', key: 'key' }], next) +}) + +testPending('batch([]) with { sync }', 1, function (db, next) { + db.batch([{ type: 'del', key: 'key' }], { sync: true }, next) +}) + +testPending('batch()', 1, function (db, next) { + db.batch().del('key').write(next) +}) + +testPending('batch() with { sync }', 1, function (db, next) { + db.batch().del('key').write({ sync: true }, next) +}) + +testPending('approximateSize()', 1, function (db, next) { + db.approximateSize('a', 'z', next) +}) + +testPending('compactRange()', 1, function (db, next) { + db.compactRange('a', 'z', next) +}) + +// Test multiple pending operations, using all of the above. +testPending('operations', operations.length, function (db, next) { + for (let fn of operations.slice(0, -1)) { + fn(db, next) + } }) // See https://github.com/Level/leveldown/issues/134