From 07b4113dc1d52f6d00953bbec613db45814702c9 Mon Sep 17 00:00:00 2001 From: appflowy Date: Sun, 19 Sep 2021 23:21:10 +0800 Subject: [PATCH] refactor test module --- backend/Cargo.toml | 1 + .../src/service/user_service/logged_user.rs | 4 +- .../service/ws_service/entities/connect.rs | 24 ++-- .../service/ws_service/entities/message.rs | 21 ++-- backend/src/service/ws_service/router.rs | 5 +- backend/src/service/ws_service/ws_client.rs | 117 ++++++++---------- backend/src/service/ws_service/ws_server.rs | 9 +- backend/tests/api/{main.rs => mod.rs} | 2 - backend/tests/{api => }/helper.rs | 14 +-- backend/tests/main.rs | 3 + backend/tests/{api/ws.rs => ws/helper.rs} | 12 -- backend/tests/ws/mod.rs | 2 + backend/tests/ws/ws.rs | 13 ++ .../src/services/user/user_session.rs | 4 +- rust-lib/flowy-ws/src/connect.rs | 8 +- rust-lib/flowy-ws/src/ws.rs | 35 ++---- 16 files changed, 124 insertions(+), 150 deletions(-) rename backend/tests/api/{main.rs => mod.rs} (62%) rename backend/tests/{api => }/helper.rs (95%) create mode 100644 backend/tests/main.rs rename backend/tests/{api/ws.rs => ws/helper.rs} (87%) create mode 100644 backend/tests/ws/mod.rs create mode 100644 backend/tests/ws/ws.rs diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 93942dce4e..03d8069fb9 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -57,6 +57,7 @@ tokio = { version = "1", features = ["full"] } flowy-user = { path = "../rust-lib/flowy-user" } flowy-workspace = { path = "../rust-lib/flowy-workspace" } flowy-document = { path = "../rust-lib/flowy-document" } +flowy-ws = { path = "../rust-lib/flowy-ws" } flowy-net = { path = "../rust-lib/flowy-net", features = ["http_server"] } ormx = { version = "0.7", features = ["postgres"]} diff --git a/backend/src/service/user_service/logged_user.rs b/backend/src/service/user_service/logged_user.rs index 973cef419a..40c06f3141 100644 --- a/backend/src/service/user_service/logged_user.rs +++ b/backend/src/service/user_service/logged_user.rs @@ -1,10 +1,8 @@ use crate::entities::token::{Claim, Token}; - +use actix_web::http::HeaderValue; use chrono::{DateTime, Utc}; use dashmap::DashMap; use flowy_net::errors::ServerError; - -use actix_web::http::HeaderValue; use lazy_static::lazy_static; lazy_static! { diff --git a/backend/src/service/ws_service/entities/connect.rs b/backend/src/service/ws_service/entities/connect.rs index 086441598b..3e2b998171 100644 --- a/backend/src/service/ws_service/entities/connect.rs +++ b/backend/src/service/ws_service/entities/connect.rs @@ -7,8 +7,17 @@ use std::fmt::Formatter; pub type Socket = Recipient; #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)] -pub struct SessionId { - pub id: String, +pub struct SessionId(pub String); + +impl> std::convert::From for SessionId { + fn from(s: T) -> Self { SessionId(s.as_ref().to_owned()) } +} + +impl std::fmt::Display for SessionId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let desc = format!("{}", &self.0); + f.write_str(&desc) + } } pub struct Session { @@ -25,17 +34,6 @@ impl std::convert::From for Session { } } -impl SessionId { - pub fn new(id: String) -> Self { SessionId { id } } -} - -impl std::fmt::Display for SessionId { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let desc = format!("{}", &self.id); - f.write_str(&desc) - } -} - #[derive(Debug, Message, Clone)] #[rtype(result = "Result<(), ServerError>")] pub struct Connect { diff --git a/backend/src/service/ws_service/entities/message.rs b/backend/src/service/ws_service/entities/message.rs index fa641b626b..486f854209 100644 --- a/backend/src/service/ws_service/entities/message.rs +++ b/backend/src/service/ws_service/entities/message.rs @@ -5,33 +5,36 @@ use std::fmt::Formatter; #[derive(Debug, Clone)] pub enum MessageData { - Text(String), Binary(Bytes), Connect(SessionId), - Disconnect(String), + Disconnect(SessionId), } #[derive(Debug, Message, Clone)] #[rtype(result = "()")] pub struct ClientMessage { - pub sid: SessionId, + pub session_id: SessionId, pub data: MessageData, } impl ClientMessage { - pub fn new(sid: SessionId, data: MessageData) -> Self { ClientMessage { sid, data } } + pub fn new>(session_id: T, data: MessageData) -> Self { + ClientMessage { + session_id: session_id.into(), + data, + } + } } impl std::fmt::Display for ClientMessage { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let content = match &self.data { - MessageData::Text(t) => format!("[Text]: {}", t), - MessageData::Binary(_) => "[Binary message]".to_owned(), - MessageData::Connect(_) => "Connect".to_owned(), - MessageData::Disconnect(_) => "Disconnect".to_owned(), + MessageData::Binary(_) => "[Binary]".to_owned(), + MessageData::Connect(_) => "[Connect]".to_owned(), + MessageData::Disconnect(_) => "[Disconnect]".to_owned(), }; - let desc = format!("{}:{}", &self.sid, content); + let desc = format!("{}:{}", &self.session_id, content); f.write_str(&desc) } } diff --git a/backend/src/service/ws_service/router.rs b/backend/src/service/ws_service/router.rs index e61617d87a..f97d059c33 100644 --- a/backend/src/service/ws_service/router.rs +++ b/backend/src/service/ws_service/router.rs @@ -20,10 +20,7 @@ pub async fn establish_ws_connection( ) -> Result { match LoggedUser::from_token(token.clone()) { Ok(user) => { - let client = WSClient::new( - SessionId::new(user.user_id.clone()), - server.get_ref().clone(), - ); + let client = WSClient::new(&user.user_id, server.get_ref().clone()); let result = ws::start(client, &request, payload); match result { Ok(response) => Ok(response.into()), diff --git a/backend/src/service/ws_service/ws_client.rs b/backend/src/service/ws_service/ws_client.rs index c6ef3bc7cf..0d6279d883 100644 --- a/backend/src/service/ws_service/ws_client.rs +++ b/backend/src/service/ws_service/ws_client.rs @@ -1,20 +1,3 @@ -use std::time::Instant; - -use actix::{ - fut, - Actor, - ActorContext, - ActorFutureExt, - Addr, - AsyncContext, - ContextFutureSpawner, - Handler, - Running, - StreamHandler, - WrapFuture, -}; -use actix_web_actors::{ws, ws::Message::Text}; - use crate::{ config::{HEARTBEAT_INTERVAL, PING_TIMEOUT}, service::ws_service::{ @@ -24,37 +7,57 @@ use crate::{ WSServer, }, }; +use actix::*; +use actix_web_actors::{ws, ws::Message::Text}; +use std::time::Instant; +// Frontend │ Backend +// +// │ +// ┌──────────┐ WsMessage ┌───────────┐ ClientMessage ┌──────────┐ +// │ user 1 │─────────┼────▶│ws_client_1│──────────────────▶│ws_server │ +// └──────────┘ └───────────┘ └──────────┘ +// │ │ +// WsMessage ▼ +// ┌──────────┐ │ ┌───────────┐ ClientMessage Group +// │ user 2 │◀──────────────│ws_client_2│◀───────┐ ┌───────────────┐ +// └──────────┘ │ └───────────┘ │ │ ws_user_1 │ +// │ │ │ +// │ └────────│ ws_user_2 │ +// ┌──────────┐ ┌───────────┐ │ │ +// │ user 3 │─────────┼────▶│ws_client_3│ └───────────────┘ +// └──────────┘ └───────────┘ +// │ pub struct WSClient { - sid: SessionId, + session_id: SessionId, server: Addr, hb: Instant, } impl WSClient { - pub fn new(sid: SessionId, server: Addr) -> Self { + pub fn new>(session_id: T, server: Addr) -> Self { Self { - sid, + session_id: session_id.into(), hb: Instant::now(), server, } } fn hb(&self, ctx: &mut ws::WebsocketContext) { - ctx.run_interval(HEARTBEAT_INTERVAL, |ws_session, ctx| { - if Instant::now().duration_since(ws_session.hb) > PING_TIMEOUT { - ws_session.server.do_send(Disconnect { - sid: ws_session.sid.clone(), + ctx.run_interval(HEARTBEAT_INTERVAL, |client, ctx| { + if Instant::now().duration_since(client.hb) > PING_TIMEOUT { + client.server.do_send(Disconnect { + sid: client.session_id.clone(), }); ctx.stop(); - return; + } else { + ctx.ping(b""); } - ctx.ping(b""); }); } fn send(&self, data: MessageData) { - let msg = ClientMessage::new(self.sid.clone(), data); + let msg = ClientMessage::new(self.session_id.clone(), data); self.server.do_send(msg); } } @@ -67,18 +70,16 @@ impl Actor for WSClient { let socket = ctx.address().recipient(); let connect = Connect { socket, - sid: self.sid.clone(), + sid: self.session_id.clone(), }; self.server .send(connect) .into_actor(self) - .then(|res, _ws_session, _ctx| { + .then(|res, client, _ctx| { match res { - Ok(Ok(_)) => {}, - Ok(Err(_e)) => { - unimplemented!() - }, - Err(_e) => unimplemented!(), + Ok(Ok(_)) => log::trace!("Send connect message to server success"), + Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e), + Err(e) => log::error!("Send connect message to server failed: {:?}", e), } fut::ready(()) }) @@ -87,7 +88,7 @@ impl Actor for WSClient { fn stopping(&mut self, _: &mut Self::Context) -> Running { self.server.do_send(Disconnect { - sid: self.sid.clone(), + sid: self.session_id.clone(), }); Running::Stop @@ -98,39 +99,33 @@ impl StreamHandler> for WSClient { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => { - log::debug!("Receive {} ping {:?}", &self.sid, &msg); self.hb = Instant::now(); ctx.pong(&msg); }, Ok(ws::Message::Pong(msg)) => { - log::debug!("Receive {} pong {:?}", &self.sid, &msg); - self.send(MessageData::Connect(self.sid.clone())); + log::debug!("Receive {} pong {:?}", &self.session_id, &msg); self.hb = Instant::now(); }, Ok(ws::Message::Binary(bin)) => { - log::debug!(" Receive {} binary", &self.sid); + log::debug!(" Receive {} binary", &self.session_id); self.send(MessageData::Binary(bin)); }, + Ok(Text(_)) => { + log::warn!("Receive unexpected text message"); + }, Ok(ws::Message::Close(reason)) => { - log::debug!("Receive {} close {:?}", &self.sid, &reason); + self.send(MessageData::Disconnect(self.session_id.clone())); ctx.close(reason); ctx.stop(); }, - Ok(ws::Message::Continuation(c)) => { - log::debug!("Receive {} continues message {:?}", &self.sid, &c); - }, - Ok(ws::Message::Nop) => { - log::debug!("Receive Nop message"); - }, - Ok(Text(s)) => { - log::debug!("Receive {} text {:?}", &self.sid, &s); - self.send(MessageData::Text(s.to_string())); - }, - + Ok(ws::Message::Continuation(_)) => {}, + Ok(ws::Message::Nop) => {}, Err(e) => { - let msg = format!("{} error: {:?}", &self.sid, e); - log::error!("stream {}", msg); - ctx.text(msg); + log::error!( + "[{}]: WebSocketStream protocol error {:?}", + self.session_id, + e + ); ctx.stop(); }, } @@ -142,21 +137,11 @@ impl Handler for WSClient { fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) { match msg.data { - MessageData::Text(text) => { - ctx.text(text); - }, MessageData::Binary(binary) => { ctx.binary(binary); }, - MessageData::Connect(sid) => { - let connect_msg = format!("{} connect", &sid); - ctx.text(connect_msg); - }, - MessageData::Disconnect(text) => { - log::debug!("Session start disconnecting {}", self.sid); - ctx.text(text); - ctx.stop(); - }, + MessageData::Connect(_) => {}, + MessageData::Disconnect(_) => {}, } } } diff --git a/backend/src/service/ws_service/ws_server.rs b/backend/src/service/ws_service/ws_server.rs index 4e0052d267..5d9e80e2f7 100644 --- a/backend/src/service/ws_service/ws_server.rs +++ b/backend/src/service/ws_service/ws_server.rs @@ -1,6 +1,7 @@ use crate::service::ws_service::{ entities::{Connect, Disconnect, Session, SessionId}, ClientMessage, + MessageData, }; use actix::{Actor, Context, Handler}; use dashmap::DashMap; @@ -46,7 +47,13 @@ impl Handler for WSServer { impl Handler for WSServer { type Result = (); - fn handle(&mut self, _msg: ClientMessage, _ctx: &mut Context) -> Self::Result {} + fn handle(&mut self, msg: ClientMessage, _ctx: &mut Context) -> Self::Result { + match msg.data { + MessageData::Binary(_) => {}, + MessageData::Connect(_) => {}, + MessageData::Disconnect(_) => {}, + } + } } impl actix::Supervised for WSServer { diff --git a/backend/tests/api/main.rs b/backend/tests/api/mod.rs similarity index 62% rename from backend/tests/api/main.rs rename to backend/tests/api/mod.rs index 5f27c3ca92..d5903daf2c 100644 --- a/backend/tests/api/main.rs +++ b/backend/tests/api/mod.rs @@ -1,5 +1,3 @@ mod auth; mod doc; -mod helper; mod workspace; -mod ws; diff --git a/backend/tests/api/helper.rs b/backend/tests/helper.rs similarity index 95% rename from backend/tests/api/helper.rs rename to backend/tests/helper.rs index 297980203e..a151e03e4d 100644 --- a/backend/tests/api/helper.rs +++ b/backend/tests/helper.rs @@ -164,7 +164,7 @@ impl TestServer { doc } - pub(crate) async fn register_user(&self) -> SignUpResponse { + pub async fn register_user(&self) -> SignUpResponse { let params = SignUpParams { email: "annie@appflowy.io".to_string(), name: "annie".to_string(), @@ -174,15 +174,15 @@ impl TestServer { self.register(params).await } - pub(crate) async fn register(&self, params: SignUpParams) -> SignUpResponse { + pub async fn register(&self, params: SignUpParams) -> SignUpResponse { let url = format!("{}/api/register", self.http_addr()); let response = user_sign_up_request(params, &url).await.unwrap(); response } - pub(crate) fn http_addr(&self) -> String { format!("http://{}", self.host) } + pub fn http_addr(&self) -> String { format!("http://{}", self.host) } - pub(crate) fn ws_addr(&self) -> String { + pub fn ws_addr(&self) -> String { format!( "ws://{}/ws/{}", self.host, @@ -265,7 +265,7 @@ async fn drop_test_database(database_name: String) { .expect("Failed to drop database."); } -pub(crate) async fn create_test_workspace(server: &TestServer) -> Workspace { +pub async fn create_test_workspace(server: &TestServer) -> Workspace { let params = CreateWorkspaceParams { name: "My first workspace".to_string(), desc: "This is my first workspace".to_string(), @@ -275,7 +275,7 @@ pub(crate) async fn create_test_workspace(server: &TestServer) -> Workspace { workspace } -pub(crate) async fn create_test_app(server: &TestServer, workspace_id: &str) -> App { +pub async fn create_test_app(server: &TestServer, workspace_id: &str) -> App { let params = CreateAppParams { workspace_id: workspace_id.to_owned(), name: "My first app".to_string(), @@ -287,7 +287,7 @@ pub(crate) async fn create_test_app(server: &TestServer, workspace_id: &str) -> app } -pub(crate) async fn create_test_view(application: &TestServer, app_id: &str) -> View { +pub async fn create_test_view(application: &TestServer, app_id: &str) -> View { let name = "My first view".to_string(); let desc = "This is my first view".to_string(); let thumbnail = "http://1.png".to_string(); diff --git a/backend/tests/main.rs b/backend/tests/main.rs new file mode 100644 index 0000000000..b0bc0cbca8 --- /dev/null +++ b/backend/tests/main.rs @@ -0,0 +1,3 @@ +mod api; +pub mod helper; +mod ws; diff --git a/backend/tests/api/ws.rs b/backend/tests/ws/helper.rs similarity index 87% rename from backend/tests/api/ws.rs rename to backend/tests/ws/helper.rs index 2a02aeceef..9d5c02647c 100644 --- a/backend/tests/api/ws.rs +++ b/backend/tests/ws/helper.rs @@ -69,15 +69,3 @@ impl WsScriptRunner { } } } - -#[actix_rt::test] -async fn ws_connect() { - let mut ws = WsTest::new(vec![ - WsScript::SendText("abc"), - WsScript::SendText("abc"), - WsScript::SendText("abc"), - WsScript::Disconnect("abc"), - ]) - .await; - ws.run_scripts().await -} diff --git a/backend/tests/ws/mod.rs b/backend/tests/ws/mod.rs new file mode 100644 index 0000000000..969e6e79b7 --- /dev/null +++ b/backend/tests/ws/mod.rs @@ -0,0 +1,2 @@ +mod helper; +mod ws; diff --git a/backend/tests/ws/ws.rs b/backend/tests/ws/ws.rs new file mode 100644 index 0000000000..aa71e1d126 --- /dev/null +++ b/backend/tests/ws/ws.rs @@ -0,0 +1,13 @@ +use crate::ws::helper::{WsScript, WsTest}; + +#[actix_rt::test] +async fn ws_connect() { + let mut ws = WsTest::new(vec![ + WsScript::SendText("abc"), + WsScript::SendText("abc"), + WsScript::SendText("abc"), + WsScript::Disconnect("close by user"), + ]) + .await; + ws.run_scripts().await +} diff --git a/rust-lib/flowy-user/src/services/user/user_session.rs b/rust-lib/flowy-user/src/services/user/user_session.rs index 0176c45d35..69f407fd33 100644 --- a/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/rust-lib/flowy-user/src/services/user/user_session.rs @@ -18,10 +18,10 @@ use flowy_database::{ }; use flowy_infra::kv::KV; use flowy_sqlite::ConnectionPool; -use flowy_ws::{connect::Retry, WsController, WsMessage, WsMessageHandler}; +use flowy_ws::{connect::Retry, WsController, WsMessageHandler}; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; pub struct UserSessionConfig { root_dir: String, diff --git a/rust-lib/flowy-ws/src/connect.rs b/rust-lib/flowy-ws/src/connect.rs index 7a4b384d2c..ebd644170b 100644 --- a/rust-lib/flowy-ws/src/connect.rs +++ b/rust-lib/flowy-ws/src/connect.rs @@ -1,19 +1,17 @@ use crate::{errors::WsError, MsgReceiver, MsgSender}; -use flowy_net::errors::ServerError; use futures_core::{future::BoxFuture, ready}; -use futures_util::{FutureExt, StreamExt, TryStreamExt}; +use futures_util::{FutureExt, StreamExt}; use pin_project::pin_project; use std::{ fmt, future::Future, pin::Pin, - sync::Arc, task::{Context, Poll}, }; use tokio::net::TcpStream; use tokio_tungstenite::{ connect_async, - tungstenite::{handshake::client::Response, http::StatusCode, Error, Message}, + tungstenite::{handshake::client::Response, Error, Message}, MaybeTlsStream, WebSocketStream, }; @@ -160,7 +158,7 @@ where { type Output = (); - fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { (self.f)(&self.addr); Poll::Ready(()) diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index 12a43480e0..3794cea199 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -1,28 +1,23 @@ -use crate::{connect::WsConnection, errors::WsError, WsMessage}; +use crate::{ + connect::{Retry, WsConnection}, + errors::WsError, + WsMessage, +}; use flowy_net::errors::ServerError; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; -use futures_core::{ready, Stream}; - -use crate::connect::Retry; -use bytes::Buf; -use futures_core::future::BoxFuture; +use futures_core::{future::BoxFuture, ready, Stream}; use pin_project::pin_project; use std::{ collections::HashMap, future::Future, - marker::PhantomData, pin::Pin, sync::Arc, task::{Context, Poll}, }; use tokio::{sync::RwLock, task::JoinHandle}; -use tokio_tungstenite::{ - tungstenite::{ - protocol::{frame::coding::CloseCode, CloseFrame}, - Message, - }, - MaybeTlsStream, - WebSocketStream, +use tokio_tungstenite::tungstenite::{ + protocol::{frame::coding::CloseCode, CloseFrame}, + Message, }; pub type MsgReceiver = UnboundedReceiver; @@ -188,18 +183,6 @@ impl Future for WsHandlers { } } -// impl WsSender for WsController { -// fn send_msg(&self, msg: WsMessage) -> Result<(), WsError> { -// match self.ws_tx.as_ref() { -// None => Err(WsError::internal().context("Should call make_connect -// first")), Some(sender) => { -// let _ = sender.unbounded_send(msg.into()).map_err(|e| -// WsError::internal().context(e))?; Ok(()) -// }, -// } -// } -// } - #[derive(Debug, Clone)] pub struct WsSender { ws_tx: MsgSender,