From 1358ec8f2486aee3239c3caa79fa9e3e1ae86d85 Mon Sep 17 00:00:00 2001 From: tom8zds <2407164659@qq.com> Date: Sun, 14 Jul 2024 15:09:44 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8feature:=20implement=20receive=20progr?= =?UTF-8?q?ess?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/core/rust/bridge.dart | 3 ++ lib/core/rust/frb_generated.dart | 83 ++++++++++++++++++++++++++-- lib/core/rust/frb_generated.io.dart | 20 +++++++ lib/i18n/strings.g.dart | 2 +- lib/view/pages/mission_page.dart | 5 +- lib/view/widget/mission_widget.dart | 23 ++++++++ rust/src/actor/mission/transfer.rs | 19 ++----- rust/src/bridge.rs | 14 +++++ rust/src/frb_generated.rs | 84 +++++++++++++++++++++++++++-- rust/src/util.rs | 11 +++- 10 files changed, 235 insertions(+), 29 deletions(-) diff --git a/lib/core/rust/bridge.dart b/lib/core/rust/bridge.dart index d437f15..75024aa 100644 --- a/lib/core/rust/bridge.dart +++ b/lib/core/rust/bridge.dart @@ -40,6 +40,9 @@ Stream> listenDevice() => Stream listenMission() => RustLib.instance.api.crateBridgeListenMission(); +Stream listenTaskProgress() => + RustLib.instance.api.crateBridgeListenTaskProgress(); + Future clearMission() => RustLib.instance.api.crateBridgeClearMission(); Future cancelPending({required String id}) => diff --git a/lib/core/rust/frb_generated.dart b/lib/core/rust/frb_generated.dart index eed64a9..54ad075 100644 --- a/lib/core/rust/frb_generated.dart +++ b/lib/core/rust/frb_generated.dart @@ -61,7 +61,7 @@ class RustLib extends BaseEntrypoint { String get codegenVersion => '2.1.0'; @override - int get rustContentHash => -1293166539; + int get rustContentHash => 1509709400; static const kDefaultExternalLibraryLoaderConfig = ExternalLibraryLoaderConfig( @@ -92,6 +92,8 @@ abstract class RustLibApi extends BaseApi { Stream crateBridgeListenServerState(); + Stream crateBridgeListenTaskProgress(); + Future crateBridgeRestartServer(); Future crateBridgeSetup( @@ -357,13 +359,40 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { argNames: ["s"], ); + @override + Stream crateBridgeListenTaskProgress() { + final s = RustStreamSink(); + unawaited(handler.executeNormal(NormalTask( + callFfi: (port_) { + final serializer = SseSerializer(generalizedFrbRustBinding); + sse_encode_StreamSink_usize_Sse(s, serializer); + pdeCallFfi(generalizedFrbRustBinding, serializer, + funcId: 11, port: port_); + }, + codec: SseCodec( + decodeSuccessData: sse_decode_unit, + decodeErrorData: null, + ), + constMeta: kCrateBridgeListenTaskProgressConstMeta, + argValues: [s], + apiImpl: this, + ))); + return s.stream; + } + + TaskConstMeta get kCrateBridgeListenTaskProgressConstMeta => + const TaskConstMeta( + debugName: "listen_task_progress", + argNames: ["s"], + ); + @override Future crateBridgeRestartServer() { return handler.executeNormal(NormalTask( callFfi: (port_) { final serializer = SseSerializer(generalizedFrbRustBinding); pdeCallFfi(generalizedFrbRustBinding, serializer, - funcId: 11, port: port_); + funcId: 12, port: port_); }, codec: SseCodec( decodeSuccessData: sse_decode_unit, @@ -389,7 +418,7 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { sse_encode_box_autoadd_node_device(device, serializer); sse_encode_box_autoadd_core_config(config, serializer); pdeCallFfi(generalizedFrbRustBinding, serializer, - funcId: 12, port: port_); + funcId: 13, port: port_); }, codec: SseCodec( decodeSuccessData: sse_decode_unit, @@ -412,7 +441,7 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { callFfi: (port_) { final serializer = SseSerializer(generalizedFrbRustBinding); pdeCallFfi(generalizedFrbRustBinding, serializer, - funcId: 13, port: port_); + funcId: 14, port: port_); }, codec: SseCodec( decodeSuccessData: sse_decode_unit, @@ -435,7 +464,7 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { callFfi: (port_) { final serializer = SseSerializer(generalizedFrbRustBinding); pdeCallFfi(generalizedFrbRustBinding, serializer, - funcId: 14, port: port_); + funcId: 15, port: port_); }, codec: SseCodec( decodeSuccessData: sse_decode_unit, @@ -484,6 +513,12 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { throw UnimplementedError(); } + @protected + RustStreamSink dco_decode_StreamSink_usize_Sse(dynamic raw) { + // Codec=Dco (DartCObject based), see doc to use other codecs + throw UnimplementedError(); + } + @protected String dco_decode_String(dynamic raw) { // Codec=Dco (DartCObject based), see doc to use other codecs @@ -699,6 +734,12 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { return; } + @protected + BigInt dco_decode_usize(dynamic raw) { + // Codec=Dco (DartCObject based), see doc to use other codecs + return dcoDecodeU64(raw); + } + @protected AnyhowException sse_decode_AnyhowException(SseDeserializer deserializer) { // Codec=Sse (Serialization based), see doc to use other codecs @@ -735,6 +776,13 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { throw UnimplementedError('Unreachable ()'); } + @protected + RustStreamSink sse_decode_StreamSink_usize_Sse( + SseDeserializer deserializer) { + // Codec=Sse (Serialization based), see doc to use other codecs + throw UnimplementedError('Unreachable ()'); + } + @protected String sse_decode_String(SseDeserializer deserializer) { // Codec=Sse (Serialization based), see doc to use other codecs @@ -986,6 +1034,12 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { // Codec=Sse (Serialization based), see doc to use other codecs } + @protected + BigInt sse_decode_usize(SseDeserializer deserializer) { + // Codec=Sse (Serialization based), see doc to use other codecs + return deserializer.buffer.getBigUint64(); + } + @protected void sse_encode_AnyhowException( AnyhowException self, SseSerializer serializer) { @@ -1045,6 +1099,19 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { serializer); } + @protected + void sse_encode_StreamSink_usize_Sse( + RustStreamSink self, SseSerializer serializer) { + // Codec=Sse (Serialization based), see doc to use other codecs + sse_encode_String( + self.setupAndSerialize( + codec: SseCodec( + decodeSuccessData: sse_decode_usize, + decodeErrorData: sse_decode_AnyhowException, + )), + serializer); + } + @protected void sse_encode_String(String self, SseSerializer serializer) { // Codec=Sse (Serialization based), see doc to use other codecs @@ -1255,4 +1322,10 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { void sse_encode_unit(void self, SseSerializer serializer) { // Codec=Sse (Serialization based), see doc to use other codecs } + + @protected + void sse_encode_usize(BigInt self, SseSerializer serializer) { + // Codec=Sse (Serialization based), see doc to use other codecs + serializer.buffer.putBigUint64(self); + } } diff --git a/lib/core/rust/frb_generated.io.dart b/lib/core/rust/frb_generated.io.dart index e4fa3c5..ddaa079 100644 --- a/lib/core/rust/frb_generated.io.dart +++ b/lib/core/rust/frb_generated.io.dart @@ -40,6 +40,9 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl { RustStreamSink dco_decode_StreamSink_opt_box_autoadd_mission_info_Sse(dynamic raw); + @protected + RustStreamSink dco_decode_StreamSink_usize_Sse(dynamic raw); + @protected String dco_decode_String(dynamic raw); @@ -112,6 +115,9 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl { @protected void dco_decode_unit(dynamic raw); + @protected + BigInt dco_decode_usize(dynamic raw); + @protected AnyhowException sse_decode_AnyhowException(SseDeserializer deserializer); @@ -132,6 +138,10 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl { sse_decode_StreamSink_opt_box_autoadd_mission_info_Sse( SseDeserializer deserializer); + @protected + RustStreamSink sse_decode_StreamSink_usize_Sse( + SseDeserializer deserializer); + @protected String sse_decode_String(SseDeserializer deserializer); @@ -206,6 +216,9 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl { @protected void sse_decode_unit(SseDeserializer deserializer); + @protected + BigInt sse_decode_usize(SseDeserializer deserializer); + @protected void sse_encode_AnyhowException( AnyhowException self, SseSerializer serializer); @@ -226,6 +239,10 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl { void sse_encode_StreamSink_opt_box_autoadd_mission_info_Sse( RustStreamSink self, SseSerializer serializer); + @protected + void sse_encode_StreamSink_usize_Sse( + RustStreamSink self, SseSerializer serializer); + @protected void sse_encode_String(String self, SseSerializer serializer); @@ -306,6 +323,9 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl { @protected void sse_encode_unit(void self, SseSerializer serializer); + + @protected + void sse_encode_usize(BigInt self, SseSerializer serializer); } // Section: wire_class diff --git a/lib/i18n/strings.g.dart b/lib/i18n/strings.g.dart index d25f24b..cc67c5e 100644 --- a/lib/i18n/strings.g.dart +++ b/lib/i18n/strings.g.dart @@ -6,7 +6,7 @@ /// Locales: 2 /// Strings: 60 (30 per locale) /// -/// Built on 2024-07-07 at 14:19 UTC +/// Built on 2024-07-14 at 06:18 UTC // coverage:ignore-file // ignore_for_file: type=lint diff --git a/lib/view/pages/mission_page.dart b/lib/view/pages/mission_page.dart index e7064d5..7bb8443 100644 --- a/lib/view/pages/mission_page.dart +++ b/lib/view/pages/mission_page.dart @@ -10,6 +10,7 @@ import '../../core/rust/bridge.dart'; import '../../i18n/strings.g.dart'; import '../widget/common_widget.dart'; import '../widget/device_widget.dart'; +import '../widget/mission_widget.dart'; class IdlePage extends StatelessWidget { @override @@ -119,7 +120,9 @@ class _TransferPageState extends State { ), ), if (file.state == const FileState.transfer()) - LinearProgressIndicator(value: 0.3), + TaskProgress( + total: file.info.size, + ), ], ), ), diff --git a/lib/view/widget/mission_widget.dart b/lib/view/widget/mission_widget.dart index 4d72bba..faf7135 100644 --- a/lib/view/widget/mission_widget.dart +++ b/lib/view/widget/mission_widget.dart @@ -1,5 +1,28 @@ import 'package:flutter/material.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; +import 'package:localsend_rs/core/rust/bridge.dart'; + +class TaskProgress extends StatelessWidget { + final progressStream = listenTaskProgress(); + final int total; + + TaskProgress({super.key, required this.total}); + + @override + Widget build(BuildContext context) { + return StreamBuilder( + stream: progressStream, + builder: (context, snapshot) { + if (snapshot.hasData) { + print("${snapshot.data} / ${total}"); + return LinearProgressIndicator( + value: (snapshot.data?.toDouble() ?? 0) / total, + ); + } + return Container(); + }); + } +} class MissionWidget extends ConsumerWidget { const MissionWidget({super.key}); diff --git a/rust/src/actor/mission/transfer.rs b/rust/src/actor/mission/transfer.rs index 6ca9292..90e9bf7 100644 --- a/rust/src/actor/mission/transfer.rs +++ b/rust/src/actor/mission/transfer.rs @@ -16,7 +16,6 @@ enum Message { respond_to: oneshot::Sender>, }, ListenTask { - token: String, respond_to: oneshot::Sender, String>>, }, StartTask { @@ -225,16 +224,12 @@ impl Actor { let _ = respond_to.send(()); } - Message::ListenTask { token, respond_to } => match &self.store.mission { + Message::ListenTask { respond_to } => match &self.store.mission { Some(_) => { let task = self.store.task.clone(); match task { Some(task) => { - if task.token == token { - let _ = respond_to.send(Ok(task.progress)); - return; - } - let _ = respond_to.send(Err("task token not match".to_string())); + let _ = respond_to.send(Ok(task.progress)); } None => { let _ = respond_to.send(Err("task not found".to_string())); @@ -283,15 +278,9 @@ impl Handle { recv.await.expect("Actor task has been killed") } - pub async fn listen_task_progress( - &self, - token: String, - ) -> Result, String> { + pub async fn listen_task_progress(&self) -> Result, String> { let (send, recv) = oneshot::channel(); - let msg = Message::ListenTask { - token, - respond_to: send, - }; + let msg = Message::ListenTask { respond_to: send }; let _ = self.sender.send(msg).await; diff --git a/rust/src/bridge.rs b/rust/src/bridge.rs index 5624cf1..220dc6a 100644 --- a/rust/src/bridge.rs +++ b/rust/src/bridge.rs @@ -81,6 +81,20 @@ pub async fn listen_mission(s: StreamSink>) { } } +pub async fn listen_task_progress(s: StreamSink) { + let mut rx = _get_core() + .mission + .transfer + .listen_task_progress() + .await + .unwrap(); + loop { + let _ = rx.changed().await; + let data = rx.borrow().clone(); + let _ = s.add(data); + } +} + pub async fn clear_mission() { MISSION_NOTIFY.clear().await; } diff --git a/rust/src/frb_generated.rs b/rust/src/frb_generated.rs index 792e690..cf6d789 100644 --- a/rust/src/frb_generated.rs +++ b/rust/src/frb_generated.rs @@ -37,7 +37,7 @@ flutter_rust_bridge::frb_generated_boilerplate!( default_rust_auto_opaque = RustAutoOpaqueMoi, ); pub(crate) const FLUTTER_RUST_BRIDGE_CODEGEN_VERSION: &str = "2.1.0"; -pub(crate) const FLUTTER_RUST_BRIDGE_CODEGEN_CONTENT_HASH: i32 = -1293166539; +pub(crate) const FLUTTER_RUST_BRIDGE_CODEGEN_CONTENT_HASH: i32 = 1509709400; // Section: executor @@ -432,6 +432,47 @@ fn wire__crate__bridge__listen_server_state_impl( }, ) } +fn wire__crate__bridge__listen_task_progress_impl( + port_: flutter_rust_bridge::for_generated::MessagePort, + ptr_: flutter_rust_bridge::for_generated::PlatformGeneralizedUint8ListPtr, + rust_vec_len_: i32, + data_len_: i32, +) { + FLUTTER_RUST_BRIDGE_HANDLER.wrap_async::( + flutter_rust_bridge::for_generated::TaskInfo { + debug_name: "listen_task_progress", + port: Some(port_), + mode: flutter_rust_bridge::for_generated::FfiCallMode::Normal, + }, + move || { + let message = unsafe { + flutter_rust_bridge::for_generated::Dart2RustMessageSse::from_wire( + ptr_, + rust_vec_len_, + data_len_, + ) + }; + let mut deserializer = + flutter_rust_bridge::for_generated::SseDeserializer::new(message); + let api_s = + >::sse_decode( + &mut deserializer, + ); + deserializer.end(); + move |context| async move { + transform_result_sse::<_, ()>( + (move || async move { + let output_ok = Result::<_, ()>::Ok({ + crate::bridge::listen_task_progress(api_s).await; + })?; + Ok(output_ok) + })() + .await, + ) + } + }, + ) +} fn wire__crate__bridge__restart_server_impl( port_: flutter_rust_bridge::for_generated::MessagePort, ptr_: flutter_rust_bridge::for_generated::PlatformGeneralizedUint8ListPtr, @@ -637,6 +678,14 @@ impl SseDecode } } +impl SseDecode for StreamSink { + // Codec=Sse (Serialization based), see doc to use other codecs + fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self { + let mut inner = ::sse_decode(deserializer); + return StreamSink::deserialize(inner); + } +} + impl SseDecode for String { // Codec=Sse (Serialization based), see doc to use other codecs fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self { @@ -915,6 +964,13 @@ impl SseDecode for () { fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self {} } +impl SseDecode for usize { + // Codec=Sse (Serialization based), see doc to use other codecs + fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self { + deserializer.cursor.read_u64::().unwrap() as _ + } +} + fn pde_ffi_dispatcher_primary_impl( func_id: i32, port: flutter_rust_bridge::for_generated::MessagePort, @@ -934,10 +990,11 @@ fn pde_ffi_dispatcher_primary_impl( 8 => wire__crate__bridge__listen_device_impl(port, ptr, rust_vec_len, data_len), 9 => wire__crate__bridge__listen_mission_impl(port, ptr, rust_vec_len, data_len), 10 => wire__crate__bridge__listen_server_state_impl(port, ptr, rust_vec_len, data_len), - 11 => wire__crate__bridge__restart_server_impl(port, ptr, rust_vec_len, data_len), - 12 => wire__crate__bridge__setup_impl(port, ptr, rust_vec_len, data_len), - 13 => wire__crate__bridge__shutdown_server_impl(port, ptr, rust_vec_len, data_len), - 14 => wire__crate__bridge__start_server_impl(port, ptr, rust_vec_len, data_len), + 11 => wire__crate__bridge__listen_task_progress_impl(port, ptr, rust_vec_len, data_len), + 12 => wire__crate__bridge__restart_server_impl(port, ptr, rust_vec_len, data_len), + 13 => wire__crate__bridge__setup_impl(port, ptr, rust_vec_len, data_len), + 14 => wire__crate__bridge__shutdown_server_impl(port, ptr, rust_vec_len, data_len), + 15 => wire__crate__bridge__start_server_impl(port, ptr, rust_vec_len, data_len), _ => unreachable!(), } } @@ -1196,6 +1253,13 @@ impl SseEncode } } +impl SseEncode for StreamSink { + // Codec=Sse (Serialization based), see doc to use other codecs + fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) { + unimplemented!("") + } +} + impl SseEncode for String { // Codec=Sse (Serialization based), see doc to use other codecs fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) { @@ -1419,6 +1483,16 @@ impl SseEncode for () { fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) {} } +impl SseEncode for usize { + // Codec=Sse (Serialization based), see doc to use other codecs + fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) { + serializer + .cursor + .write_u64::(self as _) + .unwrap(); + } +} + #[cfg(not(target_family = "wasm"))] mod io { // This file is automatically generated, so please do not edit it. diff --git a/rust/src/util.rs b/rust/src/util.rs index 02b0ac0..c47fd8f 100644 --- a/rust/src/util.rs +++ b/rust/src/util.rs @@ -34,12 +34,19 @@ impl AsyncWrite for ProgressWriteAdapter { let this = self.project(); let result = this.inner.poll_write(cx, buf); - let after = buf.len(); + match result { + Poll::Ready(ref res) => { + if res.is_ok() { + *this.interval_bytes = *this.interval_bytes + res.as_ref().clone().unwrap(); + } + } + _ => {} + } match this.interval.poll_tick(cx) { Poll::Pending => {} Poll::Ready(_) => { - let _ = this.tx.send(after); + let _ = this.tx.send(*this.interval_bytes); } };