Skip to content

Commit

Permalink
feat: channel and multi-concurrent task join support for the plugin s…
Browse files Browse the repository at this point in the history
…ystem (#2210)
  • Loading branch information
sxyazi authored Jan 16, 2025
1 parent 63eb82a commit 6c94227
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 3 deletions.
84 changes: 84 additions & 0 deletions yazi-plugin/src/bindings/chan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use mlua::{ExternalError, IntoLuaMulti, UserData, Value};

use crate::Error;

pub struct MpscTx(pub tokio::sync::mpsc::Sender<Value>);
pub struct MpscRx(pub tokio::sync::mpsc::Receiver<Value>);

impl UserData for MpscTx {
fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
methods.add_async_method("send", |lua, me, value: Value| async move {
match me.0.send(value).await {
Ok(()) => (true, Value::Nil).into_lua_multi(&lua),
Err(e) => (false, Error::Custom(e.to_string())).into_lua_multi(&lua),
}
});
}
}

impl UserData for MpscRx {
fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
methods.add_async_method_mut("recv", |lua, mut me, ()| async move {
match me.0.recv().await {
Some(value) => (value, true).into_lua_multi(&lua),
None => (Value::Nil, false).into_lua_multi(&lua),
}
});
}
}

pub struct MpscUnboundedTx(pub tokio::sync::mpsc::UnboundedSender<Value>);
pub struct MpscUnboundedRx(pub tokio::sync::mpsc::UnboundedReceiver<Value>);

impl UserData for MpscUnboundedTx {
fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
methods.add_method("send", |lua, me, value: Value| match me.0.send(value) {
Ok(()) => (true, Value::Nil).into_lua_multi(&lua),
Err(e) => (false, Error::Custom(e.to_string())).into_lua_multi(&lua),
});
}
}

impl UserData for MpscUnboundedRx {
fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
methods.add_async_method_mut("recv", |lua, mut me, ()| async move {
match me.0.recv().await {
Some(value) => (value, true).into_lua_multi(&lua),
None => (Value::Nil, false).into_lua_multi(&lua),
}
});
}
}

pub struct OneshotTx(pub Option<tokio::sync::oneshot::Sender<Value>>);
pub struct OneshotRx(pub Option<tokio::sync::oneshot::Receiver<Value>>);

impl UserData for OneshotTx {
fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
methods.add_method_mut("send", |lua, me, value: Value| {
let Some(tx) = me.0.take() else {
return Err("Oneshot sender already used".into_lua_err());
};
match tx.send(value) {
Ok(()) => (true, Value::Nil).into_lua_multi(&lua),
Err(_) => {
(false, Error::Custom("Oneshot receiver closed".to_string())).into_lua_multi(&lua)
}
}
});
}
}

impl UserData for OneshotRx {
fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
methods.add_async_method_mut("recv", |lua, mut me, ()| async move {
let Some(rx) = me.0.take() else {
return Err("Oneshot receiver already used".into_lua_err());
};
match rx.await {
Ok(value) => (value, Value::Nil).into_lua_multi(&lua),
Err(e) => (Value::Nil, Error::Custom(e.to_string())).into_lua_multi(&lua),
}
});
}
}
2 changes: 1 addition & 1 deletion yazi-plugin/src/bindings/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#![allow(clippy::module_inception)]

yazi_macro::mod_flat!(bindings cha icon image input mouse permit range window);
yazi_macro::mod_flat!(bindings cha chan icon image input mouse permit range window);
43 changes: 41 additions & 2 deletions yazi-plugin/src/utils/sync.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use mlua::{ExternalError, ExternalResult, Function, Lua, MultiValue, Value};
use futures::future::join_all;
use mlua::{ExternalError, ExternalResult, Function, IntoLuaMulti, Lua, MultiValue, Value, Variadic};
use tokio::sync::oneshot;
use yazi_dds::Sendable;
use yazi_proxy::{AppProxy, options::{PluginCallback, PluginOpt}};
use yazi_shared::event::Data;

use super::Utils;
use crate::{loader::LOADER, runtime::RtRef};
use crate::{bindings::{MpscRx, MpscTx, MpscUnboundedRx, MpscUnboundedTx, OneshotRx, OneshotTx}, loader::LOADER, runtime::RtRef};

impl Utils {
pub(super) fn sync(lua: &Lua, isolate: bool) -> mlua::Result<Function> {
Expand Down Expand Up @@ -39,6 +40,44 @@ impl Utils {
}
}

pub(super) fn chan(lua: &Lua) -> mlua::Result<Function> {
lua.create_function(|lua, (type_, buffer): (mlua::String, Option<usize>)| {
match (&*type_.as_bytes(), buffer) {
(b"mpsc", Some(buffer)) if buffer < 1 => {
Err("Buffer size must be greater than 0".into_lua_err())
}
(b"mpsc", Some(buffer)) => {
let (tx, rx) = tokio::sync::mpsc::channel::<Value>(buffer);
(MpscTx(tx), MpscRx(rx)).into_lua_multi(lua)
}
(b"mpsc", None) => {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
(MpscUnboundedTx(tx), MpscUnboundedRx(rx)).into_lua_multi(lua)
}
(b"oneshot", _) => {
let (tx, rx) = tokio::sync::oneshot::channel::<Value>();
(OneshotTx(Some(tx)), OneshotRx(Some(rx))).into_lua_multi(lua)
}
_ => Err("Channel type must be `mpsc` or `oneshot`".into_lua_err()),
}
})
}

pub(super) fn join(lua: &Lua) -> mlua::Result<Function> {
lua.create_async_function(|_, fns: Variadic<Function>| async move {
let mut results = MultiValue::with_capacity(fns.len());
for r in join_all(fns.into_iter().map(|f| f.call_async::<MultiValue>(()))).await {
results.extend(r?);
}
Ok(results)
})
}

// TODO
pub(super) fn select(lua: &Lua) -> mlua::Result<Function> {
lua.create_async_function(|_lua, _futs: MultiValue| async move { Ok(()) })
}

async fn retrieve(id: &str, calls: usize, args: MultiValue) -> mlua::Result<Vec<Data>> {
let args = Sendable::values_to_list(args)?;
let (tx, rx) = oneshot::channel::<Vec<Data>>();
Expand Down
3 changes: 3 additions & 0 deletions yazi-plugin/src/utils/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub fn compose(lua: &Lua, isolate: bool) -> mlua::Result<Table> {

// Sync
b"sync" => Utils::sync(lua, isolate)?,
b"chan" => Utils::chan(lua)?,
b"join" => Utils::join(lua)?,
b"select" => Utils::select(lua)?,

// Target
b"target_os" => Utils::target_os(lua)?,
Expand Down

0 comments on commit 6c94227

Please sign in to comment.