Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

let workspace own doc #26

Merged
merged 1 commit into from
Oct 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 33 additions & 39 deletions apps/keck/src/server/api/blocks/block.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::*;
use jwst::{BlockHistory, InsertChildren, RemoveChildren, Workspace};
use jwst::{BlockHistory, InsertChildren, RemoveChildren};

#[utoipa::path(
get,
Expand All @@ -22,11 +22,9 @@ pub async fn get_block(
) -> impl IntoResponse {
let (workspace, block) = params;
info!("get_block: {}, {}", workspace, block);
if let Some(doc) = context.doc.get(&workspace) {
let doc = doc.value().lock().await;
let mut trx = doc.transact();
let workspace = Workspace::new(&mut trx, workspace);
if let Some(block) = workspace.get(block, doc.client_id) {
if let Some(workspace) = context.workspace.get(&workspace) {
let workspace = workspace.value().lock().await;
if let Some(block) = workspace.get(block) {
Json(block).into_response()
} else {
StatusCode::NOT_FOUND.into_response()
Expand Down Expand Up @@ -64,19 +62,20 @@ pub async fn set_block(
) -> impl IntoResponse {
let (workspace, block) = params;
info!("set_block: {}, {}", workspace, block);
if let Some(doc) = context.doc.get(&workspace) {
if let Some(workspace) = context.workspace.get(&workspace) {
// init block instance
let doc = doc.value().lock().await;
let mut trx = doc.transact();
let workspace = Workspace::new(&mut trx, workspace);
let mut block = workspace.create(&mut trx, &block, "text", doc.client_id);

let workspace = workspace.lock().await;
// set block content
if let Some(block_content) = payload.as_object() {
for (key, value) in block_content.iter() {
block.set(&mut trx, key, value.clone());
let block = workspace.with_trx(|mut t| {
let mut block = t.create(&block, "text");
// set block content
if let Some(block_content) = payload.as_object() {
for (key, value) in block_content.iter() {
block.set(&mut t.trx, key, value.clone());
}
}
}
block
});

// response block content
Json(block.block().to_json()).into_response()
Expand Down Expand Up @@ -107,12 +106,10 @@ pub async fn get_block_history(
) -> impl IntoResponse {
let (workspace, block) = params;
info!("get_block_history: {}, {}", workspace, block);
if let Some(doc) = context.doc.get(&workspace) {
if let Some(workspace) = context.workspace.get(&workspace) {
// init block instance
let doc = doc.value().lock().await;
let mut trx = doc.transact();
let workspace = Workspace::new(&mut trx, workspace);
if let Some(block) = workspace.get(block, doc.client_id) {
let workspace = workspace.value().lock().await;
if let Some(block) = workspace.get(block) {
Json(&block.history()).into_response()
} else {
StatusCode::NOT_FOUND.into_response()
Expand Down Expand Up @@ -143,12 +140,9 @@ pub async fn delete_block(
) -> impl IntoResponse {
let (workspace, block) = params;
info!("delete_block: {}, {}", workspace, block);
if let Some(doc) = context.doc.get(&workspace) {
let doc = doc.value().lock().await;
let mut trx = doc.transact();
let workspace = Workspace::new(&mut trx, workspace);
if workspace.remove(&mut trx, &block) {
trx.commit();
if let Some(workspace) = context.workspace.get(&workspace) {
let workspace = workspace.value().lock().await;
if workspace.get_trx().remove(&block) {
StatusCode::NO_CONTENT
} else {
StatusCode::NOT_FOUND
Expand Down Expand Up @@ -186,13 +180,13 @@ pub async fn insert_block(
) -> impl IntoResponse {
let (workspace, block) = params;
info!("insert_block: {}, {}", workspace, block);
if let Some(doc) = context.doc.get(&workspace) {
if let Some(workspace) = context.workspace.get(&workspace) {
// init block instance
let doc = doc.value().lock().await;
let mut trx = doc.transact();
let workspace = Workspace::new(&mut trx, workspace);
if let Some(mut block) = workspace.get(block, doc.client_id) {
block.insert_children(&mut trx, payload);
let workspace = workspace.value().lock().await;
if let Some(mut block) = workspace.get(block) {
workspace.with_trx(|mut t| {
block.insert_children(&mut t.trx, payload);
});
// response block content
Json(block.block().to_json()).into_response()
} else {
Expand Down Expand Up @@ -231,13 +225,13 @@ pub async fn remove_block(
) -> impl IntoResponse {
let (workspace, block) = params;
info!("insert_block: {}, {}", workspace, block);
if let Some(doc) = context.doc.get(&workspace) {
if let Some(workspace) = context.workspace.get(&workspace) {
// init block instance
let doc = doc.value().lock().await;
let mut trx = doc.transact();
let workspace = Workspace::new(&mut trx, workspace);
if let Some(mut block) = workspace.get(&block, doc.client_id) {
block.remove_children(&mut trx, payload);
let workspace = workspace.value().lock().await;
if let Some(mut block) = workspace.get(&block) {
workspace.with_trx(|mut t| {
block.remove_children(&mut t.trx, payload);
});
// response block content
Json(block.block().to_json()).into_response()
} else {
Expand Down
47 changes: 19 additions & 28 deletions apps/keck/src/server/api/blocks/workspace.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::*;
use axum::{extract::Path, http::header};
use jwst::Workspace;

#[utoipa::path(
get,
Expand All @@ -22,11 +21,9 @@ pub async fn get_workspace(
info!("get_workspace: {}", workspace);
utils::init_doc(context.clone(), &workspace).await;

if let Some(doc) = context.doc.get(&workspace) {
let doc = doc.value().lock().await;
let mut trx = doc.transact();
let workspace = Workspace::new(&mut trx, workspace);
Json(workspace).into_response()
if let Some(workspace) = context.workspace.get(&workspace) {
let workspace = workspace.lock().await;
Json(&*workspace).into_response()
} else {
StatusCode::NOT_FOUND.into_response()
}
Expand All @@ -52,10 +49,10 @@ pub async fn set_workspace(

utils::init_doc(context.clone(), &workspace).await;

let doc = context.doc.get(&workspace).unwrap();
let doc = doc.lock().await;
let workspace = context.workspace.get(&workspace).unwrap();
let workspace = workspace.lock().await;

Json(doc.transact().get_map("blocks").to_json()).into_response()
Json(workspace.doc().transact().get_map("blocks").to_json()).into_response()
}

#[utoipa::path(
Expand All @@ -76,7 +73,7 @@ pub async fn delete_workspace(
Path(workspace): Path<String>,
) -> impl IntoResponse {
info!("delete_workspace: {}", workspace);
if context.doc.remove(&workspace).is_none() {
if context.workspace.remove(&workspace).is_none() {
return StatusCode::NOT_FOUND;
}
if let Err(_) = context.db.drop(&workspace).await {
Expand All @@ -103,13 +100,9 @@ pub async fn workspace_client(
Extension(context): Extension<Arc<Context>>,
Path(workspace): Path<String>,
) -> impl IntoResponse {
if let Some(doc) = context.doc.get(&workspace) {
let doc = doc.lock().await;
(
[(header::CONTENT_TYPE, "application/json")],
doc.client_id.to_string(),
)
.into_response()
if let Some(workspace) = context.workspace.get(&workspace) {
let workspace = workspace.lock().await;
Json(workspace.client_id()).into_response()
} else {
StatusCode::NOT_FOUND.into_response()
}
Expand All @@ -132,12 +125,10 @@ pub async fn history_workspace_clients(
Extension(context): Extension<Arc<Context>>,
Path(workspace): Path<String>,
) -> impl IntoResponse {
if let Some(doc) = context.doc.get(&workspace) {
let doc = doc.lock().await;
if let Some(json) =
parse_history_client(&doc).and_then(|clients| serde_json::to_string(&clients).ok())
{
([(header::CONTENT_TYPE, "application/json")], json).into_response()
if let Some(workspace) = context.workspace.get(&workspace) {
let workspace = workspace.lock().await;
if let Some(history) = parse_history_client(workspace.doc()) {
Json(history).into_response()
} else {
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
Expand Down Expand Up @@ -166,11 +157,11 @@ pub async fn history_workspace(
Path(params): Path<(String, String)>,
) -> impl IntoResponse {
let (workspace, client) = params;
if let Some(doc) = context.doc.get(&workspace) {
let doc = doc.lock().await;
if let Some(workspace) = context.workspace.get(&workspace) {
let workspace = workspace.lock().await;
if let Ok(client) = client.parse::<u64>() {
if let Some(json) =
parse_history(&doc, client).and_then(|history| serde_json::to_string(&history).ok())
if let Some(json) = parse_history(workspace.doc(), client)
.and_then(|history| serde_json::to_string(&history).ok())
{
([(header::CONTENT_TYPE, "application/json")], json).into_response()
} else {
Expand Down Expand Up @@ -206,7 +197,7 @@ mod test {

let resp = client.post("/block/test").send().await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.text().await, "{}");
assert_eq!(resp.text().await, "{\"content\":{},\"updated\":{}}");

let resp = client.get("/block/test").send().await;
assert_eq!(resp.status(), StatusCode::OK);
Expand Down
7 changes: 3 additions & 4 deletions apps/keck/src/server/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ use axum::{
Router,
};
use dashmap::DashMap;
use jwst::{parse_history, parse_history_client, RawHistory};
use jwst::{parse_history, parse_history_client, RawHistory, Workspace};
use serde_json::Value as JsonValue;
use tokio::sync::{mpsc::Sender, Mutex};
use yrs::Doc;

pub struct Context {
pub doc: DashMap<String, Mutex<Doc>>,
pub workspace: DashMap<String, Mutex<Workspace>>,
pub storage: DashMap<String, Sender<Migrate>>,
pub channel: DashMap<(String, String), Sender<Message>>,
pub db: DbPool,
Expand All @@ -26,7 +25,7 @@ pub struct Context {
impl Context {
pub async fn new() -> Self {
Context {
doc: DashMap::new(),
workspace: DashMap::new(),
storage: DashMap::new(),
channel: DashMap::new(),
db: DbPool::new(init_pool("jwst").await.expect("Cannot create database!")),
Expand Down
20 changes: 12 additions & 8 deletions apps/keck/src/server/collaboration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ async fn handle_socket(socket: WebSocket, workspace: String, context: Arc<Contex
sleep(Duration::from_secs(10)).await;

let update = {
if let Some(doc) = context.doc.get(&workspace) {
if let Some(workspace) = context.workspace.get(&workspace) {
Some(
doc.lock()
workspace
.lock()
.await
.doc()
.encode_state_as_update_v1(&StateVector::default()),
)
} else {
Expand Down Expand Up @@ -138,10 +140,11 @@ async fn handle_socket(socket: WebSocket, workspace: String, context: Arc<Contex
let init_data = {
utils::init_doc(context.clone(), &workspace).await;

let doc = context.doc.get(&workspace).unwrap();
let mut doc = doc.lock().await;
let ws = context.workspace.get(&workspace).unwrap();
let mut ws = ws.lock().await;
let doc = ws.doc_mut();

subscribe_handler(context.clone(), &mut doc, uuid.clone(), workspace.clone());
subscribe_handler(context.clone(), doc, uuid.clone(), workspace.clone());

encode_init_update(&doc)
};
Expand All @@ -155,11 +158,12 @@ async fn handle_socket(socket: WebSocket, workspace: String, context: Arc<Contex
while let Some(msg) = socket_rx.next().await {
if let Ok(Message::Binary(binary)) = msg {
let payload = {
let doc = context.doc.get(&workspace).unwrap();
let doc = doc.value().lock().await;
let workspace = context.workspace.get(&workspace).unwrap();
let workspace = workspace.value().lock().await;
let doc = workspace.doc();

use std::panic::{catch_unwind, AssertUnwindSafe};
catch_unwind(AssertUnwindSafe(|| decode_remote_message(&doc, binary)))
catch_unwind(AssertUnwindSafe(|| decode_remote_message(doc, binary)))
};
if let Ok((binary, update)) = payload {
if let Some(update) = update {
Expand Down
20 changes: 10 additions & 10 deletions apps/keck/src/server/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;
use dashmap::mapref::entry::Entry;
use jwst::Workspace;
use std::sync::Arc;
use tokio::sync::{mpsc::channel, Mutex};

Expand All @@ -9,7 +10,7 @@ pub enum Migrate {
}

pub async fn init_doc(context: Arc<Context>, workspace: &str) {
if let Entry::Vacant(entry) = context.doc.entry(workspace.to_owned()) {
if let Entry::Vacant(entry) = context.workspace.entry(workspace.to_owned()) {
let doc = context.db.create_doc(workspace).await.unwrap();
let (tx, mut rx) = channel::<Migrate>(100);

Expand All @@ -34,7 +35,7 @@ pub async fn init_doc(context: Arc<Context>, workspace: &str) {

context.storage.insert(workspace.to_owned(), tx);

entry.insert(Mutex::new(doc));
entry.insert(Mutex::new(Workspace::from_doc(doc, workspace)));
};
}

Expand All @@ -44,18 +45,17 @@ mod tests {

#[tokio::test]
async fn doc_load_test() -> anyhow::Result<()> {
use jwst::Workspace;
use yrs::{updates::decoder::Decode, Doc, StateVector, Update};
let doc = Doc::default();

{
let mut trx = doc.transact();
let workspace = Workspace::new("test");
workspace.with_trx(|mut t| {
let mut block = t.create("test", "text");

let workspace = jwst::Workspace::new(&mut trx, "test");
let mut block = workspace.create(&mut trx, "test", "text", doc.client_id);
block.set(&mut t.trx, "test", "test");
});

block.set(&mut trx, "test", "test");
trx.commit();
}
let doc = workspace.doc();

let new_doc = {
let update = doc.encode_state_as_update_v1(&StateVector::default());
Expand Down
8 changes: 4 additions & 4 deletions apps/keck/src/sync/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ impl DbPool {
}

pub async fn create_doc(&self, workspace: &str) -> Result<Doc, Error> {
let mut conn = self.get_conn(workspace).await?;

conn.create().await?;

let mut doc = Doc::with_options(Options {
skip_gc: true,
..Default::default()
});

let mut conn = self.get_conn(workspace).await?;

conn.create().await?;

let all_data = conn.all().await.unwrap();

if all_data.is_empty() {
Expand Down
Loading