From 08e05c425b119ae665c977e91095a671e99e3079 Mon Sep 17 00:00:00 2001 From: appflowy Date: Mon, 27 Jun 2022 23:15:43 +0800 Subject: [PATCH 1/2] chore: remove unused code --- frontend/rust-lib/flowy-sdk/src/lib.rs | 2 +- .../rust-lib/lib-dispatch/src/dispatcher.rs | 6 +- frontend/rust-lib/lib-dispatch/src/lib.rs | 2 +- frontend/rust-lib/lib-dispatch/src/runtime.rs | 26 +++ frontend/rust-lib/lib-dispatch/src/system.rs | 165 ------------------ .../rust-lib/lib-dispatch/src/util/mod.rs | 26 --- 6 files changed, 32 insertions(+), 195 deletions(-) create mode 100644 frontend/rust-lib/lib-dispatch/src/runtime.rs delete mode 100644 frontend/rust-lib/lib-dispatch/src/system.rs diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 60e277babd..4a37faefef 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -14,7 +14,7 @@ use flowy_net::{ use flowy_text_block::TextBlockManager; use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig}; use lib_dispatch::prelude::*; -use lib_dispatch::util::tokio_default_runtime; +use lib_dispatch::runtime::tokio_default_runtime; use module::mk_modules; pub use module::*; use std::{ diff --git a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs index ae2b15e0a0..fd7296a70d 100644 --- a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs +++ b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs @@ -1,3 +1,4 @@ +use crate::runtime::FlowyRuntime; use crate::{ errors::{DispatchError, Error, InternalError}, module::{as_module_map, Module, ModuleMap, ModuleRequest}, @@ -10,13 +11,14 @@ use futures_util::task::Context; use pin_project::pin_project; use std::{future::Future, sync::Arc}; use tokio::macros::support::{Pin, Poll}; + pub struct EventDispatcher { module_map: ModuleMap, - runtime: tokio::runtime::Runtime, + runtime: FlowyRuntime, } impl EventDispatcher { - pub fn construct(runtime: tokio::runtime::Runtime, module_factory: F) -> EventDispatcher + pub fn construct(runtime: FlowyRuntime, module_factory: F) -> EventDispatcher where F: FnOnce() -> Vec, { diff --git a/frontend/rust-lib/lib-dispatch/src/lib.rs b/frontend/rust-lib/lib-dispatch/src/lib.rs index 2d76fbd090..d35d17e3a5 100644 --- a/frontend/rust-lib/lib-dispatch/src/lib.rs +++ b/frontend/rust-lib/lib-dispatch/src/lib.rs @@ -8,10 +8,10 @@ pub mod util; mod byte_trait; mod data; mod dispatcher; -mod system; #[macro_use] pub mod macros; +pub mod runtime; pub use errors::Error; diff --git a/frontend/rust-lib/lib-dispatch/src/runtime.rs b/frontend/rust-lib/lib-dispatch/src/runtime.rs new file mode 100644 index 0000000000..de18d29963 --- /dev/null +++ b/frontend/rust-lib/lib-dispatch/src/runtime.rs @@ -0,0 +1,26 @@ +use std::{io, thread}; +use tokio::runtime; + +pub type FlowyRuntime = tokio::runtime::Runtime; + +pub fn tokio_default_runtime() -> io::Result { + runtime::Builder::new_multi_thread() + .thread_name("dispatch-rt") + .enable_io() + .enable_time() + .on_thread_start(move || { + tracing::trace!( + "{:?} thread started: thread_id= {}", + thread::current(), + thread_id::get() + ); + }) + .on_thread_stop(move || { + tracing::trace!( + "{:?} thread stopping: thread_id= {}", + thread::current(), + thread_id::get(), + ); + }) + .build() +} diff --git a/frontend/rust-lib/lib-dispatch/src/system.rs b/frontend/rust-lib/lib-dispatch/src/system.rs deleted file mode 100644 index 081f4fa05d..0000000000 --- a/frontend/rust-lib/lib-dispatch/src/system.rs +++ /dev/null @@ -1,165 +0,0 @@ -use crate::module::{as_module_map, Module, ModuleMap}; -use futures_core::{ready, task::Context}; -use std::{cell::RefCell, fmt::Debug, future::Future, io, sync::Arc}; -use tokio::{ - macros::support::{Pin, Poll}, - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot, - }, -}; - -thread_local!( - static CURRENT: RefCell>> = RefCell::new(None); -); - -#[derive(Debug)] -#[allow(dead_code)] -pub enum SystemCommand { - Exit(i8), -} - -pub struct FlowySystem { - sys_cmd_tx: UnboundedSender, -} - -impl FlowySystem { - #[allow(dead_code)] - pub fn construct(module_factory: F, sender_factory: S) -> SystemRunner - where - F: FnOnce() -> Vec, - S: FnOnce(ModuleMap, &Runtime), - { - let runtime = Arc::new(Runtime::new().unwrap()); - let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::(); - let (stop_tx, stop_rx) = oneshot::channel(); - - runtime.spawn(SystemController { - stop_tx: Some(stop_tx), - sys_cmd_rx, - }); - - let module_map = as_module_map(module_factory()); - sender_factory(module_map, &runtime); - - let system = Self { sys_cmd_tx }; - FlowySystem::set_current(system); - SystemRunner { rt: runtime, stop_rx } - } - - #[allow(dead_code)] - pub fn stop(&self) { - match self.sys_cmd_tx.send(SystemCommand::Exit(0)) { - Ok(_) => {} - Err(e) => { - log::error!("Stop system error: {}", e); - } - } - } - - #[allow(dead_code)] - pub fn set_current(sys: FlowySystem) { - CURRENT.with(|cell| { - *cell.borrow_mut() = Some(Arc::new(sys)); - }) - } - - #[allow(dead_code)] - pub fn current() -> Arc { - CURRENT.with(|cell| match *cell.borrow() { - Some(ref sys) => sys.clone(), - None => panic!("System is not running"), - }) - } -} - -struct SystemController { - stop_tx: Option>, - sys_cmd_rx: UnboundedReceiver, -} - -impl Future for SystemController { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match ready!(Pin::new(&mut self.sys_cmd_rx).poll_recv(cx)) { - None => return Poll::Ready(()), - Some(cmd) => match cmd { - SystemCommand::Exit(code) => { - if let Some(tx) = self.stop_tx.take() { - let _ = tx.send(code); - } - } - }, - } - } - } -} - -pub struct SystemRunner { - rt: Arc, - stop_rx: oneshot::Receiver, -} - -impl SystemRunner { - #[allow(dead_code)] - pub fn run(self) -> io::Result<()> { - let SystemRunner { rt, stop_rx } = self; - match rt.block_on(stop_rx) { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - } - } - - #[allow(dead_code)] - pub fn spawn + 'static>(self, future: F) -> Self { - self.rt.spawn(future); - self - } -} - -use crate::util::tokio_default_runtime; -use tokio::{runtime, task::LocalSet}; - -#[derive(Debug)] -pub struct Runtime { - local: LocalSet, - rt: runtime::Runtime, -} - -impl Runtime { - #[allow(dead_code)] - pub fn new() -> io::Result { - let rt = tokio_default_runtime()?; - Ok(Runtime { - rt, - local: LocalSet::new(), - }) - } - - #[allow(dead_code)] - pub fn spawn(&self, future: F) -> &Self - where - F: Future + 'static, - { - self.local.spawn_local(future); - self - } - - #[allow(dead_code)] - pub fn block_on(&self, f: F) -> F::Output - where - F: Future + 'static, - { - self.local.block_on(&self.rt, f) - } -} diff --git a/frontend/rust-lib/lib-dispatch/src/util/mod.rs b/frontend/rust-lib/lib-dispatch/src/util/mod.rs index a1b02a3795..849debcc97 100644 --- a/frontend/rust-lib/lib-dispatch/src/util/mod.rs +++ b/frontend/rust-lib/lib-dispatch/src/util/mod.rs @@ -1,27 +1 @@ -use std::{io, thread}; - -use tokio::runtime; - pub mod ready; - -pub fn tokio_default_runtime() -> io::Result { - runtime::Builder::new_multi_thread() - .thread_name("flowy-rt") - .enable_io() - .enable_time() - .on_thread_start(move || { - tracing::trace!( - "{:?} thread started: thread_id= {}", - thread::current(), - thread_id::get() - ); - }) - .on_thread_stop(move || { - tracing::trace!( - "{:?} thread stopping: thread_id= {}", - thread::current(), - thread_id::get(), - ); - }) - .build() -} From 4cafef6c8d98d809e7943f275034a936355403ba Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 28 Jun 2022 08:56:15 +0800 Subject: [PATCH 2/2] chore: fix test --- frontend/rust-lib/lib-dispatch/tests/api/module.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/frontend/rust-lib/lib-dispatch/tests/api/module.rs b/frontend/rust-lib/lib-dispatch/tests/api/module.rs index 33b5e315f0..dbdf5b88dc 100644 --- a/frontend/rust-lib/lib-dispatch/tests/api/module.rs +++ b/frontend/rust-lib/lib-dispatch/tests/api/module.rs @@ -1,4 +1,5 @@ -use lib_dispatch::{prelude::*, util::tokio_default_runtime}; +use lib_dispatch::prelude::*; +use lib_dispatch::runtime::tokio_default_runtime; use std::sync::Arc; pub async fn hello() -> String {