From 71806393afe0da3705c0265791ef714a8c6d5fe2 Mon Sep 17 00:00:00 2001 From: feathercyc Date: Thu, 12 Sep 2024 15:09:59 +0000 Subject: [PATCH] feat: Support NebulaGraph Signed-off-by: feathercyc --- core/Cargo.lock | 261 ++++++++++++-- core/Cargo.toml | 6 + core/src/services/mod.rs | 3 + core/src/services/nebula_graph/backend.rs | 395 ++++++++++++++++++++++ core/src/services/nebula_graph/config.rs | 64 ++++ core/src/services/nebula_graph/docs.md | 53 +++ core/src/services/nebula_graph/mod.rs | 24 ++ core/src/types/operator/builder.rs | 2 + core/src/types/scheme.rs | 6 + 9 files changed, 793 insertions(+), 21 deletions(-) create mode 100644 core/src/services/nebula_graph/backend.rs create mode 100644 core/src/services/nebula_graph/config.rs create mode 100644 core/src/services/nebula_graph/docs.md create mode 100644 core/src/services/nebula_graph/mod.rs diff --git a/core/Cargo.lock b/core/Cargo.lock index eacc8122a373..938e26169327 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -226,7 +226,7 @@ checksum = "4dcb391558246d27a13f195c1e3a53eda422270fdd452bd57a5aa9c1da1bb198" dependencies = [ "async-backtrace-attributes", "dashmap 5.5.3", - "futures", + "futures 0.3.30", "loom", "once_cell", "pin-project-lite", @@ -268,6 +268,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-compat" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bab94bde396a3f7b4962e396fdad640e241ed797d4d8d77fc8c237d14c58fc0" +dependencies = [ + "futures-core", + "futures-io", + "once_cell", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-executor" version = "1.13.1" @@ -348,6 +361,16 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "async-sleep" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c327a532ed3acb8ad885b50bb2ea5fc7c132a396dd990cf855d2825fbdc16c6c" +dependencies = [ + "futures-util", + "tokio", +] + [[package]] name = "async-std" version = 
"1.13.0" @@ -432,7 +455,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" dependencies = [ - "futures", + "futures 0.3.30", "pharos", "rustc_version", ] @@ -1031,6 +1054,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" +[[package]] +name = "base64" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" + [[package]] name = "base64" version = "0.13.1" @@ -1280,6 +1309,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "bufsize" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7864afba28009cd99a4d973c3de89cc766b800cdf1bd909966d454906f3bce5d" +dependencies = [ + "bytes", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -1331,6 +1369,9 @@ name = "bytes" version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +dependencies = [ + "serde", +] [[package]] name = "bytes-utils" @@ -1361,7 +1402,7 @@ checksum = "a61ff12b19d89c752c213316b87fdb4a587f073d219b893cc56974b8c9f39bf7" dependencies = [ "digest", "either", - "futures", + "futures 0.3.30", "hex", "libc", "memmap2", @@ -1819,6 +1860,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "const-cstr" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3d0b5ff30645a68f35ece8cea4556ca14ef8a1651455f789a099a0513532a6" + [[package]] name = "const-oid" version = "0.9.6" @@ -1926,7 +1973,7 @@ dependencies = [ "ciborium", "clap", "criterion-plot", - "futures", + "futures 0.3.30", "is-terminal", "itertools 0.10.5", "num-traits", @@ -2377,7 +2424,7 @@ dependencies = [ name = 
"edge_test_file_write_on_full_disk" version = "0.50.0" dependencies = [ - "futures", + "futures 0.3.30", "opendal", "rand 0.8.5", "tokio", @@ -2607,6 +2654,49 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +[[package]] +name = "fbthrift-git" +version = "0.0.7+b377f46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e19f309ed5608a8fe140931ffd5a247ceee2e4a5d81d61d5648965197c8a481" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.11.0", + "bufsize", + "bytes", + "futures 0.3.30", + "ghost", + "num-derive", + "num-traits", + "ordered-float", + "panic-message", + "serde_json", + "thiserror", +] + +[[package]] +name = "fbthrift-transport" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7ab5cd4fdcb4337457ebdc6ed136b45ad20e2d8198f1e23a01cf4ea8516dd5c" +dependencies = [ + "anyhow", + "async-compat", + "async-sleep", + "bytes", + "fbthrift-git", + "fbthrift-transport-response-handler", + "futures-util", + "tokio", +] + +[[package]] +name = "fbthrift-transport-response-handler" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aed56a6ebf5f73302af76a9d4713c9d1de7924b81aa02d29df481cf69e3dfad1" + [[package]] name = "ff" version = "0.12.1" @@ -2707,7 +2797,7 @@ dependencies = [ "foundationdb-gen", "foundationdb-macros", "foundationdb-sys", - "futures", + "futures 0.3.30", "memchr", "rand 0.8.5", "serde", @@ -2770,6 +2860,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.30" 
@@ -2918,6 +3014,7 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures 0.1.31", "futures-channel", "futures-core", "futures-io", @@ -3080,6 +3177,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "ghost" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0e085ded9f1267c32176b40921b9754c474f7dd96f7e808d4a982e48aa1e854" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "gimli" version = "0.31.0" @@ -3112,7 +3220,7 @@ checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" dependencies = [ "cfg-if", "dashmap 5.5.3", - "futures", + "futures 0.3.30", "futures-timer", "no-std-compat", "nonzero_ext", @@ -3236,7 +3344,7 @@ dependencies = [ "crc", "ctr", "des", - "futures", + "futures 0.3.30", "g2p", "hex", "hmac", @@ -3277,7 +3385,7 @@ checksum = "f7c42a693bfe5dc8fcad1f24044c5ec355c5f157b8ce63c7d62f51cecbc7878d" dependencies = [ "blocking", "errno", - "futures", + "futures 0.3.30", "hdfs-sys", "libc", "log", @@ -4427,6 +4535,73 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nebula-fbthrift-common-v3" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33209e11130d0e445be030b35d07919cfff76c83214757552ef7912b40ccb447" +dependencies = [ + "anyhow", + "fbthrift-git", + "nebula-fbthrift-double", +] + +[[package]] +name = "nebula-fbthrift-double" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15562daf67d2f2b17d42ad028d10c2400a718e9f925b9374fc1f81a41fdcdb66" +dependencies = [ + "anyhow", + "fbthrift-git", +] + +[[package]] +name = "nebula-fbthrift-graph-v3" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5c6eb24c1693b6b6185566bf3ba6fe5d4fcc4efe96e04aabad6d35bddae37a4" 
+dependencies = [ + "anyhow", + "async-trait", + "const-cstr", + "fbthrift-git", + "futures 0.3.30", + "nebula-fbthrift-common-v3", + "tracing", +] + +[[package]] +name = "nebula-fbthrift-meta-v3" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e46657e74bc5273cbc711693783735bc99f872295ba163a5f8dbafc4deda616c" +dependencies = [ + "anyhow", + "async-trait", + "const-cstr", + "fbthrift-git", + "futures 0.3.30", + "nebula-fbthrift-common-v3", + "tracing", +] + +[[package]] +name = "nebula-fbthrift-storage-v3" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc9f7e8f0ee39a98e0b063f4331052951dafa2603d4ffe8b3aff97da96e503b4" +dependencies = [ + "anyhow", + "async-trait", + "const-cstr", + "fbthrift-git", + "futures 0.3.30", + "nebula-fbthrift-common-v3", + "nebula-fbthrift-meta-v3", + "tracing", +] + [[package]] name = "new_debug_unreachable" version = "1.0.6" @@ -4589,7 +4764,7 @@ dependencies = [ "async-trait", "bytes", "chrono", - "futures", + "futures 0.3.30", "humantime", "itertools 0.11.0", "parking_lot 0.12.3", @@ -4640,7 +4815,7 @@ dependencies = [ "flagset", "flume", "foundationdb", - "futures", + "futures 0.3.30", "getrandom 0.2.15", "governor", "hdfs-native", @@ -4677,12 +4852,14 @@ dependencies = [ "reqsign", "reqwest 0.12.7", "rocksdb", + "rust-nebula", "serde", "serde_json", "sha1", "sha2", "size", "sled", + "snowflaked", "sqlx", "suppaftp", "surrealdb", @@ -4725,7 +4902,7 @@ dependencies = [ name = "opendal-examples-basic" version = "0.50.0" dependencies = [ - "futures", + "futures 0.3.30", "opendal", "tokio", ] @@ -4734,7 +4911,7 @@ dependencies = [ name = "opendal-examples-concurrent-upload" version = "0.50.0" dependencies = [ - "futures", + "futures 0.3.30", "opendal", "tokio", ] @@ -4743,7 +4920,7 @@ dependencies = [ name = "opendal-examples-multipart-upload" version = "0.50.0" dependencies = [ - "futures", + "futures 0.3.30", "opendal", "tokio", ] @@ 
-4963,6 +5140,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "ordered-float" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" +dependencies = [ + "num-traits", + "serde", +] + [[package]] name = "ordered-multimap" version = "0.7.3" @@ -5006,6 +5193,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "panic-message" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" + [[package]] name = "parking" version = "2.2.1" @@ -5177,7 +5370,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" dependencies = [ - "futures", + "futures 0.3.30", "rustc_version", ] @@ -5878,7 +6071,7 @@ dependencies = [ "bytes", "combine", "crc16", - "futures", + "futures 0.3.30", "futures-util", "itoa", "log", @@ -6336,6 +6529,23 @@ dependencies = [ "trim-in-place", ] +[[package]] +name = "rust-nebula" +version = "0.1.0" +source = "git+https://github.com/nebula-contrib/rust-nebula#2c78e2783a8d9c566632a23114a88c795560e32a" +dependencies = [ + "async-trait", + "bb8", + "bytes", + "fbthrift-git", + "fbthrift-transport", + "fbthrift-transport-response-handler", + "nebula-fbthrift-graph-v3", + "nebula-fbthrift-meta-v3", + "nebula-fbthrift-storage-v3", + "serde", +] + [[package]] name = "rust-stemmers" version = "1.2.0" @@ -6956,6 +7166,15 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "snowflaked" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "398d462c4c454399be452039b24b0aa0ecb4c7a57f6ae615f5d25de2b032f850" +dependencies = [ + "loom", +] + [[package]] name = 
"socket2" version = "0.5.7" @@ -7390,7 +7609,7 @@ dependencies = [ "chrono", "dmp", "flume", - "futures", + "futures 0.3.30", "futures-concurrency", "geo 0.27.0", "indexmap 2.5.0", @@ -7440,7 +7659,7 @@ dependencies = [ "deunicode", "dmp", "fst", - "futures", + "futures 0.3.30", "fuzzy-matcher", "geo 0.27.0", "geo-types", @@ -7703,7 +7922,7 @@ dependencies = [ "derive-new", "either", "fail", - "futures", + "futures 0.3.30", "lazy_static", "log", "pin-project", @@ -8576,7 +8795,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f656cd8858a5164932d8a90f936700860976ec21eb00e0fe2aa8cab13f6b4cf" dependencies = [ - "futures", + "futures 0.3.30", "js-sys", "parking_lot 0.12.3", "pin-utils", @@ -8970,7 +9189,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5" dependencies = [ "async_io_stream", - "futures", + "futures 0.3.30", "js-sys", "log", "pharos", diff --git a/core/Cargo.toml b/core/Cargo.toml index accbee1cfb12..bdf292699404 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -163,6 +163,7 @@ services-moka = ["dep:moka"] services-mongodb = ["dep:mongodb"] services-monoiofs = ["dep:monoio", "dep:flume"] services-mysql = ["dep:sqlx", "sqlx?/mysql"] +services-nebulagraph = ["dep:rust-nebula", "dep:bb8", "dep:snowflaked"] services-obs = [ "dep:reqsign", "reqsign?/services-huaweicloud", @@ -342,6 +343,11 @@ compio = { version = "0.11.0", optional = true, features = [ ] } # for services-s3 crc32c = { version = "0.6.6", optional = true } +# for services-nebula-graph +rust-nebula = { version = "0.1", optional = true, git = "https://github.com/nebula-contrib/rust-nebula", features = [ + "graph", +] } +snowflaked = { version = "1", optional = true, features = ["sync"] } # for services-monoiofs flume = { version = "0.11", optional = true } monoio = { version = "0.2.4", optional = true, features = [ diff --git 
a/core/src/services/mod.rs b/core/src/services/mod.rs index 753838b31314..0437dff4a759 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -145,6 +145,9 @@ pub use monoiofs::*; mod mysql; pub use self::mysql::*; +mod nebula_graph; +pub use nebula_graph::*; + mod obs; pub use obs::*; diff --git a/core/src/services/nebula_graph/backend.rs b/core/src/services/nebula_graph/backend.rs new file mode 100644 index 000000000000..66c0ce3f633c --- /dev/null +++ b/core/src/services/nebula_graph/backend.rs @@ -0,0 +1,395 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::fmt::Debug; +#[cfg(feature = "tests")] +use std::time::Duration; + +use base64::engine::general_purpose::STANDARD as BASE64; +use base64::engine::Engine as _; +use bb8::{PooledConnection, RunError}; +use rust_nebula::{ + graph::GraphQuery, HostAddress, SingleConnSessionConf, SingleConnSessionManager, +}; +use snowflaked::sync::Generator; +use tokio::sync::OnceCell; + +use crate::raw::adapters::kv; +use crate::raw::*; +use crate::services::NebulaGraphConfig; +use crate::*; + +static GENERATOR: Generator = Generator::new(0); + +impl Configurator for NebulaGraphConfig { + type Builder = NebulaGraphBuilder; + fn into_builder(self) -> Self::Builder { + NebulaGraphBuilder { config: self } + } +} + +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct NebulaGraphBuilder { + config: NebulaGraphConfig, +} + +impl Debug for NebulaGraphBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("MysqlBuilder"); + + d.field("config", &self.config).finish() + } +} + +impl NebulaGraphBuilder { + /// Set the host addr of nebulagraph's graphd server + pub fn host(&mut self, host: &str) -> &mut Self { + if !host.is_empty() { + self.config.host = Some(host.to_string()); + } + self + } + + /// Set the host port of nebulagraph's graphd server + pub fn port(&mut self, port: u16) -> &mut Self { + self.config.port = Some(port); + self + } + + /// Set the username of nebulagraph's graphd server + pub fn username(&mut self, username: &str) -> &mut Self { + if !username.is_empty() { + self.config.username = Some(username.to_string()); + } + self + } + + /// Set the password of nebulagraph's graphd server + pub fn password(&mut self, password: &str) -> &mut Self { + if !password.is_empty() { + self.config.password = Some(password.to_string()); + } + self + } + + /// Set the space name of nebulagraph's graphd server + pub fn space(&mut self, space: &str) -> &mut Self { + if !space.is_empty() { + self.config.space = 
Some(space.to_string()); + } + self + } + + /// Set the tag name of nebulagraph's graphd server + pub fn tag(&mut self, tag: &str) -> &mut Self { + if !tag.is_empty() { + self.config.tag = Some(tag.to_string()); + } + self + } + + /// Set the key field name of the NebulaGraph service to read/write. + /// + /// Default to `key` if not specified. + pub fn key_field(&mut self, key_field: &str) -> &mut Self { + if !key_field.is_empty() { + self.config.key_field = Some(key_field.to_string()); + } + self + } + + /// Set the value field name of the NebulaGraph service to read/write. + /// + /// Default to `value` if not specified. + pub fn value_field(&mut self, value_field: &str) -> &mut Self { + if !value_field.is_empty() { + self.config.value_field = Some(value_field.to_string()); + } + self + } + + /// set the working directory, all operations will be performed under it. + /// + /// default: "/" + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.config.root = Some(root.to_string()); + } + self + } +} + +impl Builder for NebulaGraphBuilder { + const SCHEME: Scheme = Scheme::NebulaGraph; + type Config = NebulaGraphConfig; + + fn build(self) -> Result { + let host = match self.config.host.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "host is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let port = match self.config.port { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "port is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let username = match self.config.username.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "username is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let password = match self.config.password.clone() { + Some(v) => v, + None => "".to_string(), + }; + let space = match self.config.space.clone() { + Some(v) => v, + None => { + return 
Err(Error::new(ErrorKind::ConfigInvalid, "space is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let tag = match self.config.tag.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "tag is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let key_field = match self.config.key_field.clone() { + Some(v) => v, + None => "key".to_string(), + }; + let value_field = match self.config.value_field.clone() { + Some(v) => v, + None => "value".to_string(), + }; + let root = normalize_root( + self.config + .root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + + let mut session_config = SingleConnSessionConf::new( + vec![HostAddress::new(&host, port)], + username, + password, + Some(space), + ); + // NebulaGraph uses fbthrift for communication. fbthrift's max_buffer_size is default 4 KB, + // which is too small to store something. + // So we raise max_buffer_size to 64 MB so that NebulaGraph can store files with filesize < 1 MB at least.
+ session_config.set_buf_size(1024 * 1024); + session_config.set_max_buf_size(64 * 1024 * 1024); + session_config.set_max_parse_response_bytes_count(254); + + Ok(NebulaGraphBackend::new(Adapter { + session_pool: OnceCell::new(), + session_config, + + tag, + key_field, + value_field, + }) + .with_root(root.as_str())) + } +} + +/// Backend for NebulaGraph service +pub type NebulaGraphBackend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + session_pool: OnceCell>, + session_config: SingleConnSessionConf, + + tag: String, + key_field: String, + value_field: String, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Adapter") + .field("session_config", &self.session_config) + .field("tag", &self.tag) + .field("key_field", &self.key_field) + .field("value_field", &self.value_field) + .finish() + } +} + +impl Adapter { + async fn get_session(&self) -> Result> { + let session_pool = self + .session_pool + .get_or_try_init(|| async { + bb8::Pool::builder() + .max_size(64) + .build(SingleConnSessionManager::new(self.session_config.clone())) + .await + }) + .await + .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{}", err)).set_temporary())?; + + session_pool.get().await.map_err(|err| match err { + RunError::User(err) => { + Error::new(ErrorKind::Unexpected, format!("{}", err)).set_temporary() + } + RunError::TimedOut => { + Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary() + } + }) + } +} + +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::NebulaGraph, + &self.session_config.space.clone().unwrap(), + Capability { + read: true, + write: true, + write_total_max_size: Some(1024 * 1024), + write_can_empty: true, + delete: true, + list: true, + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result> { + let path = path.replace("'", "\\'").replace('"', "\\\""); + let query = format!( + 
"LOOKUP ON {} WHERE {}.{} == '{}' YIELD properties(vertex).{} AS {};", + self.tag, self.tag, self.key_field, path, self.value_field, self.value_field + ); + let mut sess = self.get_session().await?; + let result = sess + .query(&query) + .await + .map_err(parse_nebulagraph_session_error)?; + if result.is_empty() { + Ok(None) + } else { + let row = result + .get_row_values_by_index(0) + .map_err(parse_nebulagraph_dataset_error)?; + let value = row + .get_value_by_col_name(&self.value_field) + .map_err(parse_nebulagraph_dataset_error)?; + let base64_str = value.as_string().map_err(parse_nebulagraph_dataset_error)?; + let value_str = BASE64.decode(base64_str).map_err(|err| { + Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph") + .set_source(err) + })?; + let buf = Buffer::from(value_str); + Ok(Some(buf)) + } + } + + async fn set(&self, path: &str, value: Buffer) -> Result<()> { + #[cfg(feature = "tests")] + let path_copy = path; + + self.delete(path).await?; + let path = path.replace("'", "\\'").replace('"', "\\\""); + let file = value.to_vec(); + let file = BASE64.encode(&file); + let snowflake_id: u64 = GENERATOR.generate(); + let query = format!( + "INSERT VERTEX {} VALUES {}:('{}', '{}');", + self.tag, snowflake_id, path, file + ); + let mut sess = self.get_session().await?; + sess.execute(&query) + .await + .map_err(parse_nebulagraph_session_error)?; + + // To pass tests, we should confirm NebulaGraph has inserted data successfully + #[cfg(feature = "tests")] + loop { + let v = self.get(path_copy).await.unwrap(); + if v.is_none() { + std::thread::sleep(Duration::from_millis(1000)); + } else { + break; + } + } + Ok(()) + } + + async fn delete(&self, path: &str) -> Result<()> { + let path = path.replace("'", "\\'").replace('"', "\\\""); + let query = format!( + "LOOKUP ON {} WHERE {}.{} == '{}' YIELD id(vertex) AS id | DELETE VERTEX $-.id;", + self.tag, self.tag, self.key_field, path + ); + let mut sess = self.get_session().await?; + 
sess.execute(&query) + .await + .map_err(parse_nebulagraph_session_error)?; + Ok(()) + } + + async fn scan(&self, path: &str) -> Result> { + let path = path.replace("'", "\\'").replace('"', "\\\""); + let query = format!( + "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};", + self.tag, self.tag, self.key_field, path, self.key_field, self.key_field + ); + + let mut sess = self.get_session().await?; + let result = sess + .query(&query) + .await + .map_err(parse_nebulagraph_session_error)?; + let mut res_vec = vec![]; + for row_i in 0..result.get_row_size() { + let row = result + .get_row_values_by_index(row_i) + .map_err(parse_nebulagraph_dataset_error)?; + let value = row + .get_value_by_col_name(&self.key_field) + .map_err(parse_nebulagraph_dataset_error)?; + let sub_path = value.as_string().map_err(parse_nebulagraph_dataset_error)?; + + res_vec.push(sub_path); + } + Ok(res_vec) + } +} + +fn parse_nebulagraph_session_error(err: rust_nebula::SingleConnSessionError) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph").set_source(err) +} + +fn parse_nebulagraph_dataset_error(err: rust_nebula::DataSetError) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph").set_source(err) +} diff --git a/core/src/services/nebula_graph/config.rs b/core/src/services/nebula_graph/config.rs new file mode 100644 index 000000000000..6c3da6c7c388 --- /dev/null +++ b/core/src/services/nebula_graph/config.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; + +use serde::Deserialize; +use serde::Serialize; + +/// Config for NebulaGraph services support. +#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +#[non_exhaustive] +pub struct NebulaGraphConfig { + /// The host addr of nebulagraph's graphd server + pub host: Option, + /// The host port of nebulagraph's graphd server + pub port: Option, + /// The username of nebulagraph's graphd server + pub username: Option, + /// The password of nebulagraph's graphd server + pub password: Option, + + /// The space name of nebulagraph's graphd server + pub space: Option, + /// The tag name of nebulagraph's graphd server + pub tag: Option, + /// The key field name of the NebulaGraph service to read/write. + pub key_field: Option, + /// The value field name of the NebulaGraph service to read/write.
+ pub value_field: Option, + /// The root for NebulaGraph + pub root: Option, +} + +impl Debug for NebulaGraphConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("NebulaGraphConfig"); + + d.field("host", &self.host) + .field("port", &self.port) + .field("username", &self.username) + .field("password", &self.password) + .field("space", &self.space) + .field("tag", &self.tag) + .field("key_field", &self.key_field) + .field("value_field", &self.value_field) + .field("root", &self.root) + .finish() + } +} diff --git a/core/src/services/nebula_graph/docs.md b/core/src/services/nebula_graph/docs.md new file mode 100644 index 000000000000..bd219e717694 --- /dev/null +++ b/core/src/services/nebula_graph/docs.md @@ -0,0 +1,53 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [ ] rename +- [x] list +- [ ] ~~presign~~ +- [ ] blocking + +## Configuration + +- `root`: Set the working directory of `OpenDAL` +- `host`: Set the host address of NebulaGraph's graphd server +- `port`: Set the port of NebulaGraph's graphd server +- `username`: Set the username of NebulaGraph's graphd server +- `password`: Set the password of NebulaGraph's graphd server +- `space`: Set the space name of NebulaGraph +- `tag`: Set the tag of NebulaGraph +- `key_field`: Set the key_field of NebulaGraph +- `value_field`: Set the value_field of NebulaGraph + +## Example + +### Via Builder + +```rust,no_run +use anyhow::Result; +use opendal::services::NebulaGraph; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + let mut builder = NebulaGraph::default(); + builder.root("/"); + builder.host("127.0.0.1"); + builder.port(9669); + builder.space("your_space"); + builder.tag("your_tag"); + // key field type in the table should be compatible with Rust's &str like text + builder.key_field("key"); + // value field type in the table should be
compatible with Rust's Vec like bytea + builder.value_field("value"); + + let op = Operator::new(builder)?.finish(); + Ok(()) +} +``` diff --git a/core/src/services/nebula_graph/mod.rs b/core/src/services/nebula_graph/mod.rs new file mode 100644 index 000000000000..312bee917b97 --- /dev/null +++ b/core/src/services/nebula_graph/mod.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#[cfg(feature = "services-nebulagraph")] +mod backend; +#[cfg(feature = "services-nebulagraph")] +pub use backend::NebulaGraphBuilder as NebulaGraph; + +mod config; +pub use config::NebulaGraphConfig; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 4691eda6b249..f494ca9fba14 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -292,6 +292,8 @@ impl Operator { Scheme::HdfsNative => Self::from_iter::(iter)?.finish(), #[cfg(feature = "services-lakefs")] Scheme::Lakefs => Self::from_iter::(iter)?.finish(), + #[cfg(feature = "services-nebulagraph")] + Scheme::NebulaGraph => Self::from_iter::(iter)?.finish(), v => { return Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 4dd8acc69fb4..f7487f109239 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -165,6 +165,8 @@ pub enum Scheme { Surrealdb, /// [lakefs](crate::services::Lakefs): LakeFS Services Lakefs, + /// [NebulaGraph](crate::services::NebulaGraph): NebulaGraph Services + NebulaGraph, /// Custom that allow users to implement services outside of OpenDAL. /// /// # NOTE @@ -315,6 +317,8 @@ impl Scheme { Scheme::Surrealdb, #[cfg(feature = "services-lakefs")] Scheme::Lakefs, + #[cfg(feature = "services-nebulagraph")] + Scheme::NebulaGraph, ]) } } @@ -406,6 +410,7 @@ impl FromStr for Scheme { "hdfs_native" => Ok(Scheme::HdfsNative), "surrealdb" => Ok(Scheme::Surrealdb), "lakefs" => Ok(Scheme::Lakefs), + "nebulagraph" => Ok(Scheme::NebulaGraph), _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))), } } @@ -480,6 +485,7 @@ impl From for &'static str { Scheme::HdfsNative => "hdfs_native", Scheme::Surrealdb => "surrealdb", Scheme::Lakefs => "lakefs", + Scheme::NebulaGraph => "nebulagraph", Scheme::Custom(v) => v, } }