diff --git a/yazi-plugin/src/bindings/chan.rs b/yazi-plugin/src/bindings/chan.rs new file mode 100644 index 000000000..9eef85ccc --- /dev/null +++ b/yazi-plugin/src/bindings/chan.rs @@ -0,0 +1,84 @@ +use mlua::{ExternalError, IntoLuaMulti, UserData, Value}; + +use crate::Error; + +pub struct MpscTx(pub tokio::sync::mpsc::Sender); +pub struct MpscRx(pub tokio::sync::mpsc::Receiver); + +impl UserData for MpscTx { + fn add_methods>(methods: &mut M) { + methods.add_async_method("send", |lua, me, value: Value| async move { + match me.0.send(value).await { + Ok(()) => (true, Value::Nil).into_lua_multi(&lua), + Err(e) => (false, Error::Custom(e.to_string())).into_lua_multi(&lua), + } + }); + } +} + +impl UserData for MpscRx { + fn add_methods>(methods: &mut M) { + methods.add_async_method_mut("recv", |lua, mut me, ()| async move { + match me.0.recv().await { + Some(value) => (value, true).into_lua_multi(&lua), + None => (Value::Nil, false).into_lua_multi(&lua), + } + }); + } +} + +pub struct MpscUnboundedTx(pub tokio::sync::mpsc::UnboundedSender); +pub struct MpscUnboundedRx(pub tokio::sync::mpsc::UnboundedReceiver); + +impl UserData for MpscUnboundedTx { + fn add_methods>(methods: &mut M) { + methods.add_method("send", |lua, me, value: Value| match me.0.send(value) { + Ok(()) => (true, Value::Nil).into_lua_multi(&lua), + Err(e) => (false, Error::Custom(e.to_string())).into_lua_multi(&lua), + }); + } +} + +impl UserData for MpscUnboundedRx { + fn add_methods>(methods: &mut M) { + methods.add_async_method_mut("recv", |lua, mut me, ()| async move { + match me.0.recv().await { + Some(value) => (value, true).into_lua_multi(&lua), + None => (Value::Nil, false).into_lua_multi(&lua), + } + }); + } +} + +pub struct OneshotTx(pub Option>); +pub struct OneshotRx(pub Option>); + +impl UserData for OneshotTx { + fn add_methods>(methods: &mut M) { + methods.add_method_mut("send", |lua, me, value: Value| { + let Some(tx) = me.0.take() else { + return Err("Oneshot sender already used".into_lua_err()); + }; + match tx.send(value) { + Ok(()) => (true, Value::Nil).into_lua_multi(&lua), + Err(_) => { + (false, Error::Custom("Oneshot receiver closed".to_string())).into_lua_multi(&lua) + } + } + }); + } +} + +impl UserData for OneshotRx { + fn add_methods>(methods: &mut M) { + methods.add_async_method_mut("recv", |lua, mut me, ()| async move { + let Some(rx) = me.0.take() else { + return Err("Oneshot receiver already used".into_lua_err()); + }; + match rx.await { + Ok(value) => (value, Value::Nil).into_lua_multi(&lua), + Err(e) => (Value::Nil, Error::Custom(e.to_string())).into_lua_multi(&lua), + } + }); + } +} diff --git a/yazi-plugin/src/bindings/mod.rs b/yazi-plugin/src/bindings/mod.rs index 7667ebe1a..913a52ff9 100644 --- a/yazi-plugin/src/bindings/mod.rs +++ b/yazi-plugin/src/bindings/mod.rs @@ -1,3 +1,3 @@ #![allow(clippy::module_inception)] -yazi_macro::mod_flat!(bindings cha icon image input mouse permit range window); +yazi_macro::mod_flat!(bindings cha chan icon image input mouse permit range window); diff --git a/yazi-plugin/src/utils/sync.rs b/yazi-plugin/src/utils/sync.rs index 58b3bb851..625352c3f 100644 --- a/yazi-plugin/src/utils/sync.rs +++ b/yazi-plugin/src/utils/sync.rs @@ -1,11 +1,12 @@ -use mlua::{ExternalError, ExternalResult, Function, Lua, MultiValue, Value}; +use futures::future::join_all; +use mlua::{ExternalError, ExternalResult, Function, IntoLuaMulti, Lua, MultiValue, Value, Variadic}; use tokio::sync::oneshot; use yazi_dds::Sendable; use yazi_proxy::{AppProxy, options::{PluginCallback, PluginOpt}}; use yazi_shared::event::Data; use super::Utils; -use crate::{loader::LOADER, runtime::RtRef}; +use crate::{bindings::{MpscRx, MpscTx, MpscUnboundedRx, MpscUnboundedTx, OneshotRx, OneshotTx}, loader::LOADER, runtime::RtRef}; impl Utils { pub(super) fn sync(lua: &Lua, isolate: bool) -> mlua::Result { @@ -39,6 +40,44 @@ impl Utils { } } + pub(super) fn chan(lua: &Lua) -> mlua::Result { + lua.create_function(|lua, (type_, buffer): (mlua::String, Option)| { + match (&*type_.as_bytes(), buffer) { + (b"mpsc", Some(buffer)) if buffer < 1 => { + Err("Buffer size must be greater than 0".into_lua_err()) + } + (b"mpsc", Some(buffer)) => { + let (tx, rx) = tokio::sync::mpsc::channel::(buffer); + (MpscTx(tx), MpscRx(rx)).into_lua_multi(lua) + } + (b"mpsc", None) => { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); + (MpscUnboundedTx(tx), MpscUnboundedRx(rx)).into_lua_multi(lua) + } + (b"oneshot", _) => { + let (tx, rx) = tokio::sync::oneshot::channel::(); + (OneshotTx(Some(tx)), OneshotRx(Some(rx))).into_lua_multi(lua) + } + _ => Err("Channel type must be `mpsc` or `oneshot`".into_lua_err()), + } + }) + } + + pub(super) fn join(lua: &Lua) -> mlua::Result { + lua.create_async_function(|_, fns: Variadic| async move { + let mut results = MultiValue::with_capacity(fns.len()); + for r in join_all(fns.into_iter().map(|f| f.call_async::(()))).await { + results.extend(r?); + } + Ok(results) + }) + } + + // TODO + pub(super) fn select(lua: &Lua) -> mlua::Result { + lua.create_async_function(|_lua, _futs: MultiValue| async move { Ok(()) }) + } + async fn retrieve(id: &str, calls: usize, args: MultiValue) -> mlua::Result> { let args = Sendable::values_to_list(args)?; let (tx, rx) = oneshot::channel::>(); diff --git a/yazi-plugin/src/utils/utils.rs b/yazi-plugin/src/utils/utils.rs index 2033acb39..af7d699f8 100644 --- a/yazi-plugin/src/utils/utils.rs +++ b/yazi-plugin/src/utils/utils.rs @@ -47,6 +47,9 @@ pub fn compose(lua: &Lua, isolate: bool) -> mlua::Result { // Sync b"sync" => Utils::sync(lua, isolate)?, + b"chan" => Utils::chan(lua)?, + b"join" => Utils::join(lua)?, + b"select" => Utils::select(lua)?, // Target b"target_os" => Utils::target_os(lua)?,