Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate service dashmap #2225

Merged
merged 8 commits into from
May 8, 2023
78 changes: 76 additions & 2 deletions core/src/raw/adapters/typed_kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;

use crate::*;
use crate::raw::*;
use crate::Capability;
use crate::EntryMode;
use crate::Error;
use crate::ErrorKind;
use crate::Metadata;
use crate::Result;
use crate::Scheme;

/// Adapter is the typed adapter to underlying kv services.
///
Expand All @@ -39,7 +46,7 @@ use crate::*;
#[async_trait]
pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
/// Get the scheme and name of current adapter.
fn metadata(&self) -> (Scheme, String);
fn info(&self) -> Info;

/// Get a value from adapter.
async fn get(&self, path: &str) -> Result<Option<Value>>;
Expand All @@ -58,6 +65,29 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static {

/// Delete a value from adapter.
fn blocking_delete(&self, path: &str) -> Result<()>;

/// Scan a key prefix to get all keys that start with this key.
async fn scan(&self, path: &str) -> Result<Vec<String>> {
let _ = path;

Err(Error::new(
ErrorKind::Unsupported,
"typed_kv adapter doesn't support this operation",
)
.with_operation("typed_kv::Adapter::scan"))
}

/// Scan a key prefix to get all keys that start with this key
/// in blocking way.
fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
let _ = path;

Err(Error::new(
ErrorKind::Unsupported,
"typed_kv adapter doesn't support this operation",
)
.with_operation("typed_kv::Adapter::blocking_scan"))
}
}

/// Value is the typed value stored in adapter.
Expand Down Expand Up @@ -87,3 +117,47 @@ impl Value {
size_of::<Metadata>() + self.value.len()
}
}

/// Info for this key value accessor.
pub struct Info {
scheme: Scheme,
name: String,
capabilities: Capability,
PsiACE marked this conversation as resolved.
Show resolved Hide resolved
}

impl Info {
/// Create a new KeyValueAccessorInfo.
pub fn new(scheme: Scheme, name: &str, capabilities: Capability) -> Self {
Self {
scheme,
name: name.to_string(),
capabilities,
}
}

/// Get the scheme.
pub fn scheme(&self) -> Scheme {
self.scheme
}

/// Get the name.
pub fn name(&self) -> &str {
&self.name
}

/// Get the capabilities.
pub fn capabilities(&self) -> Capability {
self.capabilities
}
}

impl From<Info> for AccessorInfo {
PsiACE marked this conversation as resolved.
Show resolved Hide resolved
fn from(m: Info) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_name(m.name());
am.set_scheme(m.scheme());
am.set_capability(m.capabilities());

am
}
}
96 changes: 76 additions & 20 deletions core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,25 @@ impl<S: Adapter> Accessor for Backend<S> {
type BlockingReader = oio::Cursor;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
type Pager = ();
type BlockingPager = ();
type Pager = KvPager;
type BlockingPager = KvPager;

fn info(&self) -> AccessorInfo {
let (scheme, name) = self.kv.metadata();

let mut am = AccessorInfo::default();
am.set_scheme(scheme);
am.set_name(&name);
let mut am: AccessorInfo = self.kv.info().into();
am.set_root(&self.root);

let cap = am.capability_mut();
cap.read = true;
cap.read_can_seek = true;
cap.read_can_next = true;
cap.read_with_range = true;
cap.stat = true;

cap.write = true;
cap.write_with_cache_control = true;
cap.write_with_content_disposition = true;
cap.write_with_content_type = true;
cap.write_without_content_length = true;
cap.create_dir = true;
cap.delete = true;
if cap.read {
cap.read_can_seek = true;
cap.read_can_next = true;
cap.read_with_range = true;
cap.stat = true;
}

if cap.write {
cap.create_dir = true;
cap.delete = true;
}

am
}
Expand Down Expand Up @@ -182,6 +176,22 @@ impl<S: Adapter> Accessor for Backend<S> {
self.kv.blocking_delete(&p)?;
Ok(RpDelete::default())
}

async fn scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::Pager)> {
let p = build_abs_path(&self.root, path);
let res = self.kv.scan(&p).await?;
let pager = KvPager::new(&self.root, res);

Ok((RpScan::default(), pager))
}

fn blocking_scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
let p = build_abs_path(&self.root, path);
let res = self.kv.blocking_scan(&p)?;
let pager = KvPager::new(&self.root, res);

Ok((RpScan::default(), pager))
}
}

impl<S> Backend<S>
Expand All @@ -204,6 +214,52 @@ where
}
}

pub struct KvPager {
root: String,
inner: Option<Vec<String>>,
}

impl KvPager {
fn new(root: &str, inner: Vec<String>) -> Self {
Self {
root: root.to_string(),
inner: Some(inner),
}
}

fn inner_next_page(&mut self) -> Option<Vec<oio::Entry>> {
let res = self
.inner
.take()?
.into_iter()
.map(|v| {
let mode = if v.ends_with('/') {
EntryMode::DIR
} else {
EntryMode::FILE
};

oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode))
})
.collect();

Some(res)
}
}

#[async_trait]
impl oio::Page for KvPager {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Ok(self.inner_next_page())
}
}

impl oio::BlockingPage for KvPager {
fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Ok(self.inner_next_page())
}
}

pub struct KvWriter<S> {
kv: Arc<S>,
path: String,
Expand Down
1 change: 1 addition & 0 deletions core/src/raw/adapters/typed_kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

mod api;
pub use api::Adapter;
pub use api::Info;
pub use api::Value;

mod backend;
Expand Down
24 changes: 12 additions & 12 deletions core/src/services/dashmap/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::fmt::Debug;
use async_trait::async_trait;
use dashmap::DashMap;

use crate::raw::adapters::kv;
use crate::raw::adapters::typed_kv;
use crate::*;

/// [dashmap](https://github.com/xacrimon/dashmap) backend support.
Expand Down Expand Up @@ -70,17 +70,17 @@ impl Builder for DashmapBuilder {
}

/// Backend is used to serve `Accessor` support in dashmap.
pub type DashmapBackend = kv::Backend<Adapter>;
pub type DashmapBackend = typed_kv::Backend<Adapter>;

#[derive(Debug, Clone)]
pub struct Adapter {
inner: DashMap<String, Vec<u8>>,
inner: DashMap<String, typed_kv::Value>,
}

#[async_trait]
impl kv::Adapter for Adapter {
fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
impl typed_kv::Adapter for Adapter {
fn info(&self) -> typed_kv::Info {
typed_kv::Info::new(
Scheme::Dashmap,
&format!("{:?}", &self.inner as *const _),
Capability {
Expand All @@ -92,23 +92,23 @@ impl kv::Adapter for Adapter {
)
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
async fn get(&self, path: &str) -> Result<Option<typed_kv::Value>> {
self.blocking_get(path)
}

fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
fn blocking_get(&self, path: &str) -> Result<Option<typed_kv::Value>> {
match self.inner.get(path) {
None => Ok(None),
Some(bs) => Ok(Some(bs.to_vec())),
Some(bs) => Ok(Some(bs.value().to_owned())),
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: typed_kv::Value) -> Result<()> {
self.blocking_set(path, value)
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
self.inner.insert(path.to_string(), value.to_vec());
fn blocking_set(&self, path: &str, value: typed_kv::Value) -> Result<()> {
self.inner.insert(path.to_string(), value);

Ok(())
}
Expand Down
12 changes: 9 additions & 3 deletions core/src/services/moka/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,16 @@ impl Debug for Adapter {

#[async_trait]
impl typed_kv::Adapter for Adapter {
fn metadata(&self) -> (Scheme, String) {
(
fn info(&self) -> typed_kv::Info {
typed_kv::Info::new(
Scheme::Moka,
self.inner.name().unwrap_or("moka").to_string(),
self.inner.name().unwrap_or("moka"),
Capability {
read: true,
write: true,

..Default::default()
},
)
}

Expand Down