Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate service dashmap #2225

Merged
merged 8 commits into from
May 8, 2023
Merged
100 changes: 98 additions & 2 deletions core/src/raw/adapters/typed_kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;

use crate::*;
use crate::EntryMode;
use crate::Error;
use crate::ErrorKind;
use crate::Metadata;
use crate::Result;
use crate::Scheme;

/// Adapter is the typed adapter to underlying kv services.
///
Expand All @@ -39,7 +44,7 @@ use crate::*;
#[async_trait]
pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
/// Get the scheme and name of current adapter.
fn metadata(&self) -> (Scheme, String);
fn info(&self) -> Info;

/// Get a value from adapter.
async fn get(&self, path: &str) -> Result<Option<Value>>;
Expand All @@ -58,6 +63,29 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static {

/// Delete a value from adapter.
fn blocking_delete(&self, path: &str) -> Result<()>;

/// Scan a key prefix to get all keys that start with this key.
async fn scan(&self, path: &str) -> Result<Vec<String>> {
let _ = path;

Err(Error::new(
ErrorKind::Unsupported,
"typed_kv adapter doesn't support this operation",
)
.with_operation("typed_kv::Adapter::scan"))
}

/// Scan a key prefix to get all keys that start with this key
/// in blocking way.
fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
let _ = path;

Err(Error::new(
ErrorKind::Unsupported,
"typed_kv adapter doesn't support this operation",
)
.with_operation("typed_kv::Adapter::blocking_scan"))
}
}

/// Value is the typed value stored in adapter.
Expand Down Expand Up @@ -87,3 +115,71 @@ impl Value {
size_of::<Metadata>() + self.value.len()
}
}

/// Capability is used to describe what operations are supported
/// by Typed KV Operator.
#[derive(Copy, Clone, Default)]
pub struct Capability {
/// If typed_kv operator supports get natively, it will be true.
pub get: bool,
/// If typed_kv operator supports set natively, it will be true.
pub set: bool,
/// If typed_kv operator supports delete natively, it will be true.
pub delete: bool,
/// If typed_kv operator supports scan natively, it will be true.
pub scan: bool,
}

impl Debug for Capability {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = vec![];

if self.get {
s.push("Get")
}
if self.set {
s.push("Set");
}
if self.delete {
s.push("Delete");
}
if self.scan {
s.push("Scan");
}

write!(f, "{{ {} }}", s.join(" | "))
}
}

/// Info for this key value accessor.
pub struct Info {
scheme: Scheme,
name: String,
capabilities: Capability,
PsiACE marked this conversation as resolved.
Show resolved Hide resolved
}

impl Info {
/// Create a new KeyValueAccessorInfo.
pub fn new(scheme: Scheme, name: &str, capabilities: Capability) -> Self {
Self {
scheme,
name: name.to_string(),
capabilities,
}
}

/// Get the scheme.
pub fn scheme(&self) -> Scheme {
self.scheme
}

/// Get the name.
pub fn name(&self) -> &str {
&self.name
}

/// Get the capabilities.
pub fn capabilities(&self) -> Capability {
self.capabilities
}
}
110 changes: 89 additions & 21 deletions core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,37 @@ impl<S: Adapter> Accessor for Backend<S> {
type BlockingReader = oio::Cursor;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
type Pager = ();
type BlockingPager = ();
type Pager = KvPager;
type BlockingPager = KvPager;

fn info(&self) -> AccessorInfo {
let (scheme, name) = self.kv.metadata();

let mut am = AccessorInfo::default();
am.set_scheme(scheme);
am.set_name(&name);
let kv_info = self.kv.info();
let mut am: AccessorInfo = AccessorInfo::default();
am.set_root(&self.root);

am.set_scheme(kv_info.scheme());
am.set_name(kv_info.name());
let kv_cap = kv_info.capabilities();
let cap = am.capability_mut();
cap.read = true;
cap.read_can_seek = true;
cap.read_can_next = true;
cap.read_with_range = true;
cap.stat = true;

cap.write = true;
cap.write_with_cache_control = true;
cap.write_with_content_disposition = true;
cap.write_with_content_type = true;
cap.write_without_content_length = true;
cap.create_dir = true;
cap.delete = true;
if kv_cap.get {
cap.read = true;
cap.read_can_seek = true;
cap.read_can_next = true;
cap.read_with_range = true;
cap.stat = true;
}

if kv_cap.set {
cap.write = true;
cap.create_dir = true;
}

if kv_cap.delete {
cap.delete = true;
}

if kv_cap.scan {
cap.scan = true;
}

am
}
Expand Down Expand Up @@ -182,6 +188,22 @@ impl<S: Adapter> Accessor for Backend<S> {
self.kv.blocking_delete(&p)?;
Ok(RpDelete::default())
}

async fn scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::Pager)> {
let p = build_abs_path(&self.root, path);
let res = self.kv.scan(&p).await?;
let pager = KvPager::new(&self.root, res);

Ok((RpScan::default(), pager))
}

fn blocking_scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
let p = build_abs_path(&self.root, path);
let res = self.kv.blocking_scan(&p)?;
let pager = KvPager::new(&self.root, res);

Ok((RpScan::default(), pager))
}
}

impl<S> Backend<S>
Expand All @@ -204,6 +226,52 @@ where
}
}

pub struct KvPager {
root: String,
inner: Option<Vec<String>>,
}

impl KvPager {
fn new(root: &str, inner: Vec<String>) -> Self {
Self {
root: root.to_string(),
inner: Some(inner),
}
}

fn inner_next_page(&mut self) -> Option<Vec<oio::Entry>> {
let res = self
.inner
.take()?
.into_iter()
.map(|v| {
let mode = if v.ends_with('/') {
EntryMode::DIR
} else {
EntryMode::FILE
};

oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode))
})
.collect();

Some(res)
}
}

#[async_trait]
impl oio::Page for KvPager {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Ok(self.inner_next_page())
}
}

impl oio::BlockingPage for KvPager {
fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Ok(self.inner_next_page())
}
}

pub struct KvWriter<S> {
kv: Arc<S>,
path: String,
Expand Down
2 changes: 2 additions & 0 deletions core/src/raw/adapters/typed_kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

mod api;
pub use api::Adapter;
pub use api::Capability;
pub use api::Info;
pub use api::Value;

mod backend;
Expand Down
32 changes: 16 additions & 16 deletions core/src/services/dashmap/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::fmt::Debug;
use async_trait::async_trait;
use dashmap::DashMap;

use crate::raw::adapters::kv;
use crate::raw::adapters::typed_kv;
use crate::*;

/// [dashmap](https://github.com/xacrimon/dashmap) backend support.
Expand Down Expand Up @@ -70,45 +70,45 @@ impl Builder for DashmapBuilder {
}

/// Backend is used to serve `Accessor` support in dashmap.
pub type DashmapBackend = kv::Backend<Adapter>;
pub type DashmapBackend = typed_kv::Backend<Adapter>;

#[derive(Debug, Clone)]
pub struct Adapter {
inner: DashMap<String, Vec<u8>>,
inner: DashMap<String, typed_kv::Value>,
}

#[async_trait]
impl kv::Adapter for Adapter {
fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
impl typed_kv::Adapter for Adapter {
fn info(&self) -> typed_kv::Info {
typed_kv::Info::new(
Scheme::Dashmap,
&format!("{:?}", &self.inner as *const _),
Capability {
read: true,
write: true,
typed_kv::Capability {
get: true,
set: true,
scan: true,
..Default::default()
delete: true,
},
)
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
async fn get(&self, path: &str) -> Result<Option<typed_kv::Value>> {
self.blocking_get(path)
}

fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
fn blocking_get(&self, path: &str) -> Result<Option<typed_kv::Value>> {
match self.inner.get(path) {
None => Ok(None),
Some(bs) => Ok(Some(bs.to_vec())),
Some(bs) => Ok(Some(bs.value().to_owned())),
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: typed_kv::Value) -> Result<()> {
self.blocking_set(path, value)
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
self.inner.insert(path.to_string(), value.to_vec());
fn blocking_set(&self, path: &str, value: typed_kv::Value) -> Result<()> {
self.inner.insert(path.to_string(), value);

Ok(())
}
Expand Down
12 changes: 9 additions & 3 deletions core/src/services/moka/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,16 @@ impl Debug for Adapter {

#[async_trait]
impl typed_kv::Adapter for Adapter {
fn metadata(&self) -> (Scheme, String) {
(
fn info(&self) -> typed_kv::Info {
typed_kv::Info::new(
Scheme::Moka,
self.inner.name().unwrap_or("moka").to_string(),
self.inner.name().unwrap_or("moka"),
typed_kv::Capability {
get: true,
set: true,
delete: true,
..Default::default()
},
)
}

Expand Down