From 97dde2d71161d266971d623cd038684b7e3ee2d8 Mon Sep 17 00:00:00 2001 From: appflowy Date: Mon, 28 Jun 2021 14:27:16 +0800 Subject: [PATCH] add stream & prepare dart ffi --- rust-lib/flowy-sys/src/lib.rs | 4 +- rust-lib/flowy-sys/src/module/module.rs | 88 ++++++------- rust-lib/flowy-sys/src/request/request.rs | 10 +- rust-lib/flowy-sys/src/rt/mod.rs | 8 +- rust-lib/flowy-sys/src/stream.rs | 131 +++++++++++++++++++ rust-lib/flowy-sys/src/{rt => }/system.rs | 76 ++++------- rust-lib/flowy-sys/tests/api/module_event.rs | 46 ++++--- 7 files changed, 243 insertions(+), 120 deletions(-) create mode 100644 rust-lib/flowy-sys/src/stream.rs rename rust-lib/flowy-sys/src/{rt => }/system.rs (59%) diff --git a/rust-lib/flowy-sys/src/lib.rs b/rust-lib/flowy-sys/src/lib.rs index 9f6852bbbe..375785d38c 100644 --- a/rust-lib/flowy-sys/src/lib.rs +++ b/rust-lib/flowy-sys/src/lib.rs @@ -11,7 +11,9 @@ mod util; #[cfg(feature = "dart_ffi")] mod dart_ffi; +mod stream; +mod system; pub mod prelude { - pub use crate::{error::*, module::*, request::*, response::*, rt::*}; + pub use crate::{error::*, module::*, request::*, response::*, rt::*, stream::*}; } diff --git a/rust-lib/flowy-sys/src/module/module.rs b/rust-lib/flowy-sys/src/module/module.rs index fc4d5baf2a..19b72cabcc 100644 --- a/rust-lib/flowy-sys/src/module/module.rs +++ b/rust-lib/flowy-sys/src/module/module.rs @@ -10,7 +10,7 @@ use crate::{ use crate::{ error::InternalError, request::{payload::Payload, EventRequest}, - response::{EventResponse, EventResponseBuilder}, + response::EventResponse, rt::SystemCommand, service::{factory, BoxServiceFactory, HandlerService}, }; @@ -34,11 +34,10 @@ pub struct Module { service_map: Rc>, req_tx: UnboundedSender, req_rx: UnboundedReceiver, - sys_tx: UnboundedSender, } impl Module { - pub fn new(sys_tx: UnboundedSender) -> Self { + pub fn new() -> Self { let (req_tx, req_rx) = unbounded_channel::(); Self { name: "".to_owned(), @@ -46,7 +45,6 @@ impl Module { service_map: Rc::new(HashMap::new()), req_tx, req_rx, - sys_tx, } } @@ -95,6 +93,8 @@ impl Module { .map(|key| (key.clone(), self.req_tx())) .collect::>() } + + pub fn events(&self) -> Vec { self.service_map.keys().map(|key| key.clone()).collect::>() } } impl Future for Module { @@ -118,7 +118,7 @@ impl Future for Module { } impl ServiceFactory for Module { - type Response = (); + type Response = EventResponse; type Error = SystemError; type Service = BoxService; type Config = String; @@ -126,10 +126,9 @@ impl ServiceFactory for Module { fn new_service(&self, cfg: Self::Config) -> Self::Future { log::trace!("Create module service for request {}", cfg); - let sys_tx = self.sys_tx.clone(); let service_map = self.service_map.clone(); Box::pin(async move { - let service = ModuleService { service_map, sys_tx }; + let service = ModuleService { service_map }; let module_service = Box::new(service) as Self::Service; Ok(module_service) }) @@ -138,11 +137,10 @@ impl ServiceFactory for Module { pub struct ModuleService { service_map: Rc>, - sys_tx: UnboundedSender, } impl Service for ModuleService { - type Response = (); + type Response = EventResponse; type Error = SystemError; type Future = LocalBoxFuture<'static, Result>; @@ -154,13 +152,7 @@ impl Service for ModuleService { request, fut: factory.new_service(()), }; - - let sys_tx = self.sys_tx.clone(); - Box::pin(async move { - let resp = fut.await.unwrap_or_else(|e| e.into()); - sys_tx.send(SystemCommand::EventResponse(resp)); - Ok(()) - }) + Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) }) }, None => Box::pin(async { Err(InternalError::new("".to_string()).into()) }), } @@ -190,35 +182,35 @@ impl Future for ModuleServiceFuture { } } -#[cfg(test)] -mod tests { - use super::*; - use crate::rt::Runtime; - use futures_util::{future, pin_mut}; - use tokio::sync::mpsc::unbounded_channel; - pub async fn hello_service() -> String { "hello".to_string() } - #[test] - fn test() { - let mut runtime = Runtime::new().unwrap(); - runtime.block_on(async { - let (sys_tx, mut sys_rx) = unbounded_channel::(); - let event = "hello".to_string(); - let mut module = Module::new(sys_tx).event(event.clone(), hello_service); - let req_tx = module.req_tx(); - let mut event = async move { - let request = EventRequest::new(event.clone()); - req_tx.send(request).unwrap(); - - match sys_rx.recv().await { - Some(cmd) => { - log::info!("{:?}", cmd); - }, - None => panic!(""), - } - }; - - pin_mut!(module, event); - future::select(module, event).await; - }); - } -} +// #[cfg(test)] +// mod tests { +// use super::*; +// use crate::rt::Runtime; +// use futures_util::{future, pin_mut}; +// use tokio::sync::mpsc::unbounded_channel; +// pub async fn hello_service() -> String { "hello".to_string() } +// #[test] +// fn test() { +// let runtime = Runtime::new().unwrap(); +// runtime.block_on(async { +// let (sys_tx, mut sys_rx) = unbounded_channel::(); +// let event = "hello".to_string(); +// let module = Module::new(sys_tx).event(event.clone(), +// hello_service); let req_tx = module.req_tx(); +// let event = async move { +// let request = EventRequest::new(event.clone()); +// req_tx.send(request).unwrap(); +// +// match sys_rx.recv().await { +// Some(cmd) => { +// log::info!("{:?}", cmd); +// }, +// None => panic!(""), +// } +// }; +// +// pin_mut!(module, event); +// future::select(module, event).await; +// }); +// } +// } diff --git a/rust-lib/flowy-sys/src/request/request.rs b/rust-lib/flowy-sys/src/request/request.rs index 62d6646ff9..de343170f0 100644 --- a/rust-lib/flowy-sys/src/request/request.rs +++ b/rust-lib/flowy-sys/src/request/request.rs @@ -10,6 +10,7 @@ use crate::{ pub struct EventRequest { id: String, event: String, + data: Option>, } impl EventRequest { @@ -17,12 +18,17 @@ impl EventRequest { Self { id: uuid::Uuid::new_v4().to_string(), event, + data: None, } } -} -impl EventRequest { + pub fn data(mut self, data: Vec) -> Self { + self.data = Some(data); + self + } + pub fn get_event(&self) -> &str { &self.event } + pub fn get_id(&self) -> &str { &self.id } } diff --git a/rust-lib/flowy-sys/src/rt/mod.rs b/rust-lib/flowy-sys/src/rt/mod.rs index 228f72de2b..b26dccd35e 100644 --- a/rust-lib/flowy-sys/src/rt/mod.rs +++ b/rust-lib/flowy-sys/src/rt/mod.rs @@ -1,5 +1,5 @@ -mod runtime; -mod system; - pub use runtime::*; -pub use system::*; + +pub use crate::system::*; + +mod runtime; diff --git a/rust-lib/flowy-sys/src/stream.rs b/rust-lib/flowy-sys/src/stream.rs new file mode 100644 index 0000000000..63dc9ae06d --- /dev/null +++ b/rust-lib/flowy-sys/src/stream.rs @@ -0,0 +1,131 @@ +use crate::{ + error::{InternalError, SystemError}, + module::{Event, Module}, + request::EventRequest, + response::EventResponse, + service::{BoxService, Service, ServiceFactory}, + system::ModuleMap, +}; +use futures_core::{future::LocalBoxFuture, ready, task::Context}; +use std::{collections::HashMap, future::Future, rc::Rc}; +use tokio::{ + macros::support::{Pin, Poll}, + sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, +}; + +pub type BoxStreamCallback = Box; +pub struct StreamData +where + T: 'static, +{ + config: T, + request: Option, + callback: BoxStreamCallback, +} + +impl StreamData { + pub fn new(config: T, request: Option, callback: BoxStreamCallback) -> Self { + Self { + config, + request, + callback, + } + } +} + +pub struct CommandStream +where + T: 'static, +{ + module_map: ModuleMap, + pub data_tx: UnboundedSender>, + data_rx: UnboundedReceiver>, +} + +impl CommandStream { + pub fn new(module_map: ModuleMap) -> Self { + let (data_tx, data_rx) = unbounded_channel::>(); + Self { + module_map, + data_tx, + data_rx, + } + } + + pub fn send(&self, data: StreamData) { let _ = self.data_tx.send(data); } +} + +impl Future for CommandStream +where + T: 'static, +{ + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match ready!(Pin::new(&mut self.data_rx).poll_recv(cx)) { + None => return Poll::Ready(()), + Some(ctx) => { + let factory = self.new_service(()); + tokio::task::spawn_local(async move { + let service = factory.await.unwrap(); + let _ = service.call(ctx).await; + }); + }, + } + } + } +} + +impl ServiceFactory> for CommandStream +where + T: 'static, +{ + type Response = (); + type Error = SystemError; + type Service = BoxService, Self::Response, Self::Error>; + type Config = (); + type Future = LocalBoxFuture<'static, Result>; + + fn new_service(&self, _cfg: Self::Config) -> Self::Future { + let module_map = self.module_map.clone(); + let service = Box::new(CommandStreamService { module_map }); + Box::pin(async move { Ok(service as Self::Service) }) + } +} + +pub struct CommandStreamService { + module_map: ModuleMap, +} + +impl Service> for CommandStreamService +where + T: 'static, +{ + type Response = (); + type Error = SystemError; + type Future = LocalBoxFuture<'static, Result>; + + fn call(&self, mut data: StreamData) -> Self::Future { + let module_map = self.module_map.clone(); + + let fut = async move { + let request = data.request.take().unwrap(); + let result = || async { + match module_map.get(request.get_event()) { + Some(module) => { + let config = request.get_id().to_owned(); + let fut = module.new_service(config); + let service_fut = fut.await?.call(request); + service_fut.await + }, + None => Err(InternalError::new("".to_owned()).into()), + } + }; + + let resp = result().await.unwrap(); + (data.callback)(data.config, resp); + Ok(()) + }; + Box::pin(fut) + } +} diff --git a/rust-lib/flowy-sys/src/rt/system.rs b/rust-lib/flowy-sys/src/system.rs similarity index 59% rename from rust-lib/flowy-sys/src/rt/system.rs rename to rust-lib/flowy-sys/src/system.rs index 8402c8be06..3e961cf055 100644 --- a/rust-lib/flowy-sys/src/rt/system.rs +++ b/rust-lib/flowy-sys/src/system.rs @@ -1,13 +1,13 @@ use crate::{ + error::SystemError, module::{Event, Module}, request::EventRequest, response::EventResponse, - rt::runtime::Runtime, + rt::Runtime, + stream::CommandStreamService, }; use futures_core::{ready, task::Context}; - -use crate::error::{InternalError, SystemError}; -use std::{cell::RefCell, collections::HashMap, future::Future, io, sync::Arc}; +use std::{cell::RefCell, collections::HashMap, future::Future, io, rc::Rc, sync::Arc}; use tokio::{ macros::support::{Pin, Poll}, sync::{ @@ -23,60 +23,52 @@ thread_local!( #[derive(Debug)] pub enum SystemCommand { Exit(i8), - EventResponse(EventResponse), + Response(EventResponse), } +pub type ModuleMap = Rc>>; pub struct FlowySystem { - sys_tx: UnboundedSender, - forward_map: HashMap>, + sys_cmd_tx: UnboundedSender, + module_map: ModuleMap, } impl FlowySystem { - pub fn construct(module_factory: F, response_tx: Option>) -> SystemRunner + pub fn construct(module_factory: F) -> SystemRunner where - F: FnOnce(UnboundedSender) -> Vec, + F: FnOnce() -> Vec, { let runtime = Runtime::new().unwrap(); - let (sys_tx, sys_rx) = unbounded_channel::(); + let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::(); let (stop_tx, stop_rx) = oneshot::channel(); runtime.spawn(SystemController { stop_tx: Some(stop_tx), - sys_rx, - response_tx, + sys_cmd_rx, }); let mut system = Self { - sys_tx: sys_tx.clone(), - forward_map: HashMap::default(), + sys_cmd_tx: sys_cmd_tx.clone(), + module_map: Rc::new(HashMap::default()), }; - let factory = module_factory(sys_tx.clone()); + let factory = module_factory(); + let mut module_map = HashMap::new(); factory.into_iter().for_each(|m| { - system.forward_map.extend(m.forward_map()); - runtime.spawn(m); + let events = m.events(); + let rc_module = Rc::new(m); + events.into_iter().for_each(|e| { + module_map.insert(e, rc_module.clone()); + }); }); + system.module_map = Rc::new(module_map); FlowySystem::set_current(system); let runner = SystemRunner { rt: runtime, stop_rx }; runner } - pub fn sink(&self, event: Event, request: EventRequest) -> Result<(), SystemError> { - log::debug!("Sink event: {}", event); - let _ = self.forward_map.get(&event)?.send(request)?; - Ok(()) - } - - pub fn request_tx(&self, event: Event) -> Option> { - match self.forward_map.get(&event) { - Some(tx) => Some(tx.clone()), - None => None, - } - } - pub fn stop(&self) { - match self.sys_tx.send(SystemCommand::Exit(0)) { + match self.sys_cmd_tx.send(SystemCommand::Exit(0)) { Ok(_) => {}, Err(e) => { log::error!("Stop system error: {}", e); @@ -84,6 +76,8 @@ impl FlowySystem { } } + pub fn module_map(&self) -> ModuleMap { self.module_map.clone() } + #[doc(hidden)] pub fn set_current(sys: FlowySystem) { CURRENT.with(|cell| { @@ -101,15 +95,14 @@ impl FlowySystem { struct SystemController { stop_tx: Option>, - sys_rx: UnboundedReceiver, - response_tx: Option>, + sys_cmd_rx: UnboundedReceiver, } impl Future for SystemController { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match ready!(Pin::new(&mut self.sys_rx).poll_recv(cx)) { + match ready!(Pin::new(&mut self.sys_cmd_rx).poll_recv(cx)) { None => return Poll::Ready(()), Some(cmd) => match cmd { SystemCommand::Exit(code) => { @@ -117,16 +110,8 @@ impl Future for SystemController { let _ = tx.send(code); } }, - SystemCommand::EventResponse(resp) => { + SystemCommand::Response(resp) => { log::debug!("Response: {:?}", resp); - if let Some(tx) = &self.response_tx { - match tx.send(resp) { - Ok(_) => {}, - Err(e) => { - log::error!("Response tx send fail: {:?}", e); - }, - } - } }, }, } @@ -157,10 +142,7 @@ impl SystemRunner { } } - pub fn spawn(self, future: F) -> Self - where - F: Future + 'static, - { + pub fn spawn + 'static>(self, future: F) -> Self { self.rt.spawn(future); self } diff --git a/rust-lib/flowy-sys/tests/api/module_event.rs b/rust-lib/flowy-sys/tests/api/module_event.rs index a49b3b83db..14a5a49532 100644 --- a/rust-lib/flowy-sys/tests/api/module_event.rs +++ b/rust-lib/flowy-sys/tests/api/module_event.rs @@ -2,8 +2,8 @@ use crate::helper::*; use flowy_sys::prelude::*; pub async fn no_params() -> String { "no params function call".to_string() } -pub async fn one_params(s: String) -> String { "one params function call".to_string() } -pub async fn two_params(s1: String, s2: String) -> String { "two params function call".to_string() } +pub async fn one_params(_s: String) -> String { "one params function call".to_string() } +pub async fn two_params(_s1: String, _s2: String) -> String { "two params function call".to_string() } #[test] fn test() { @@ -12,21 +12,31 @@ fn test() { let no_params_command = "no params".to_string(); let one_params_command = "one params".to_string(); let two_params_command = "two params".to_string(); - FlowySystem::construct( - |tx| { - vec![Module::new(tx.clone()) - .event(no_params_command.clone(), no_params) - .event(one_params_command.clone(), one_params) - .event(two_params_command.clone(), two_params)] - }, - None, - ) - .spawn(async { - let request = EventRequest::new(no_params_command.clone()); - FlowySystem::current().sink(no_params_command, request); - FlowySystem::current().stop(); - }) - .run() - .unwrap(); + let runner = FlowySystem::construct(|| { + vec![Module::new() + .event(no_params_command.clone(), no_params) + .event(one_params_command.clone(), one_params) + .event(two_params_command.clone(), two_params)] + }); + + let stream = CommandStream::new(FlowySystem::current().module_map()); + let tx = stream.data_tx.clone(); + + runner + .spawn(stream) + .spawn(async move { + let request = EventRequest::new(no_params_command.clone()); + let stream_data = StreamData::new( + 1, + Some(request), + Box::new(|config, response| { + log::info!("{:?}", response); + }), + ); + tx.send(stream_data); + FlowySystem::current().stop(); + }) + .run() + .unwrap(); }