diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pb.dart index 8761104933..8316ff1b14 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pb.dart @@ -16,7 +16,7 @@ export 'ws.pbenum.dart'; class WsDocumentData extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsDocumentData', createEmptyInstance: create) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') - ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: WsDataType.Command, valueOf: WsDataType.valueOf, enumValues: WsDataType.values) + ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: WsDataType.Acked, valueOf: WsDataType.valueOf, enumValues: WsDataType.values) ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) ..hasRequiredFields = false ; diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart index b862c5a613..b9dc902dee 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart @@ -10,11 +10,11 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; class WsDataType extends $pb.ProtobufEnum { - static const WsDataType Command = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Command'); + static const WsDataType Acked = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked'); static const WsDataType Delta = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Delta'); static const $core.List values = [ - Command, + Acked, Delta, ]; diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart index a63f7a0ed2..d43216893c 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart @@ -12,13 +12,13 @@ import 'dart:typed_data' as $typed_data; const WsDataType$json = const { '1': 'WsDataType', '2': const [ - const {'1': 'Command', '2': 0}, + const {'1': 'Acked', '2': 0}, const {'1': 'Delta', '2': 1}, ], }; /// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgsKB0NvbW1hbmQQABIJCgVEZWx0YRAB'); +final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCQoFRGVsdGEQAQ=='); @$core.Deprecated('Use wsDocumentDataDescriptor instead') const WsDocumentData$json = const { '1': 'WsDocumentData', diff --git a/backend/Cargo.toml b/backend/Cargo.toml index ee2fd47d17..bf1e13913d 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -57,6 +57,7 @@ parking_lot = "0.11" md5 = "0.7.0" futures-core = { version = "0.3", default-features = false } pin-project = "1.0.0" +byteorder = {version = "1.3.4"} flowy-user = { path = "../rust-lib/flowy-user" } flowy-workspace = { path = "../rust-lib/flowy-workspace" } diff --git a/backend/src/application.rs b/backend/src/application.rs index ddd763cedf..e0551ffd13 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -21,7 +21,7 @@ use crate::{ view::router as view, workspace::router as workspace, ws, - ws::WSServer, + ws::WsServer, }, }; @@ -142,7 +142,7 @@ async fn init_app_context(configuration: &Settings) -> AppContext { configuration.database )); - let ws_server = WSServer::new().start(); + let ws_server = WsServer::new().start(); AppContext::new(ws_server, pg_pool) } diff --git a/backend/src/context.rs b/backend/src/context.rs index 5f6a48f44f..34ed0351a3 100644 --- a/backend/src/context.rs +++ b/backend/src/context.rs @@ -1,15 +1,15 @@ -use crate::service::ws::WSServer; +use crate::service::ws::WsServer; use actix::Addr; use sqlx::PgPool; pub struct AppContext { - pub ws_server: Addr, + pub ws_server: Addr, pub pg_pool: PgPool, } impl AppContext { - pub fn new(ws_server: Addr, db_pool: PgPool) -> Self { + pub fn new(ws_server: Addr, db_pool: PgPool) -> Self { AppContext { ws_server, pg_pool: db_pool, diff --git a/backend/src/service/doc/edit_doc.rs b/backend/src/service/doc/edit_doc.rs index 897e90a080..c042b3ddcc 100644 --- a/backend/src/service/doc/edit_doc.rs +++ b/backend/src/service/doc/edit_doc.rs @@ -1,14 +1,22 @@ -use crate::service::doc::update_doc; +use crate::service::{ + doc::update_doc, + ws::{entities::Socket, WsClientData, WsMessageAdaptor}, +}; use actix_web::web::Data; +use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use bytes::Bytes; use flowy_document::{ + entities::ws::{WsDataType, WsDocumentData}, protobuf::{Doc, Revision, UpdateDocParams}, services::doc::Document, }; use flowy_net::errors::{internal_error, ServerError}; use flowy_ot::core::Delta; +use flowy_ws::{protobuf::WsModule, WsMessage}; use parking_lot::RwLock; +use protobuf::Message; use sqlx::PgPool; -use std::{sync::Arc, time::Duration}; +use std::{convert::TryInto, sync::Arc, time::Duration}; pub(crate) struct EditDoc { doc_id: String, @@ -27,15 +35,31 @@ impl EditDoc { }) } - #[tracing::instrument(level = "debug", skip(self, revision))] - pub(crate) async fn apply_revision(&self, revision: Revision) -> Result<(), ServerError> { + #[tracing::instrument(level = "debug", skip(self, socket, revision))] + pub(crate) async fn apply_revision( + &self, + socket: Socket, + revision: Revision, + ) -> Result<(), ServerError> { let delta = Delta::from_bytes(revision.delta).map_err(internal_error)?; match self.document.try_write_for(Duration::from_millis(300)) { None => { log::error!("Failed to acquire write lock of document"); }, - Some(mut w) => { - let _ = w.apply_delta(delta).map_err(internal_error)?; + Some(mut write_guard) => { + let _ = write_guard.apply_delta(delta).map_err(internal_error)?; + let mut wtr = vec![]; + let _ = wtr.write_i64::(revision.rev_id); + + let data = WsDocumentData { + id: self.doc_id.clone(), + ty: WsDataType::Acked, + data: wtr, + }; + + let msg: WsMessage = data.into(); + let bytes: Bytes = msg.try_into().unwrap(); + socket.do_send(WsMessageAdaptor(bytes)); }, } diff --git a/backend/src/service/doc/ws_handler.rs b/backend/src/service/doc/ws_handler.rs index 3c91456174..f1b670775c 100644 --- a/backend/src/service/doc/ws_handler.rs +++ b/backend/src/service/doc/ws_handler.rs @@ -1,5 +1,9 @@ use super::edit_doc::EditDoc; -use crate::service::{doc::read_doc, util::parse_from_bytes, ws::WsBizHandler}; +use crate::service::{ + doc::read_doc, + util::parse_from_bytes, + ws::{WsBizHandler, WsClientData}, +}; use actix_web::web::Data; use bytes::Bytes; use flowy_document::{ @@ -13,22 +17,22 @@ use sqlx::PgPool; use std::{collections::HashMap, sync::Arc}; pub struct DocWsBizHandler { - inner: Arc, + doc_manager: Arc, } impl DocWsBizHandler { pub fn new(pg_pool: Data) -> Self { Self { - inner: Arc::new(Inner::new(pg_pool)), + doc_manager: Arc::new(EditDocManager::new(pg_pool)), } } } impl WsBizHandler for DocWsBizHandler { - fn receive_data(&self, data: Bytes) { - let inner = self.inner.clone(); + fn receive_data(&self, client_data: WsClientData) { + let doc_manager = self.doc_manager.clone(); actix_rt::spawn(async move { - let result = inner.handle(data).await; + let result = doc_manager.handle(client_data).await; match result { Ok(_) => {}, Err(e) => log::error!("WsBizHandler handle data error: {:?}", e), @@ -37,12 +41,12 @@ impl WsBizHandler for DocWsBizHandler { } } -struct Inner { +struct EditDocManager { pg_pool: Data, edit_docs: RwLock>>, } -impl Inner { +impl EditDocManager { fn new(pg_pool: Data) -> Self { Self { pg_pool, @@ -50,16 +54,22 @@ impl Inner { } } - async fn handle(&self, data: Bytes) -> Result<(), ServerError> { - let document_data: WsDocumentData = parse_from_bytes(&data)?; + async fn handle(&self, client_data: WsClientData) -> Result<(), ServerError> { + let document_data: WsDocumentData = parse_from_bytes(&client_data.data)?; match document_data.ty { - WsDataType::Command => {}, + WsDataType::Acked => {}, WsDataType::Delta => { let revision: Revision = parse_from_bytes(&document_data.data)?; let edited_doc = self.get_edit_doc(&revision.doc_id).await?; tokio::spawn(async move { - edited_doc.apply_revision(revision).await.unwrap(); + match edited_doc + .apply_revision(client_data.socket, revision) + .await + { + Ok(_) => {}, + Err(e) => log::error!("Doc apply revision failed: {:?}", e), + } }); }, } diff --git a/backend/src/service/ws/biz_handler.rs b/backend/src/service/ws/biz_handler.rs index 16400f9fe9..709eb0f7ce 100644 --- a/backend/src/service/ws/biz_handler.rs +++ b/backend/src/service/ws/biz_handler.rs @@ -1,9 +1,10 @@ +use crate::service::ws::WsClientData; use bytes::Bytes; use flowy_ws::WsModule; use std::{collections::HashMap, sync::Arc}; pub trait WsBizHandler: Send + Sync { - fn receive_data(&self, data: Bytes); + fn receive_data(&self, client_data: WsClientData); } pub type BizHandler = Arc; diff --git a/backend/src/service/ws/entities/connect.rs b/backend/src/service/ws/entities/connect.rs index 06eef3e9a6..c4c55b6cec 100644 --- a/backend/src/service/ws/entities/connect.rs +++ b/backend/src/service/ws/entities/connect.rs @@ -1,10 +1,10 @@ -use crate::service::ws::ClientMessage; +use crate::service::ws::WsMessageAdaptor; use actix::{Message, Recipient}; use flowy_net::errors::ServerError; use serde::{Deserialize, Serialize}; use std::fmt::Formatter; -pub type Socket = Recipient; +pub type Socket = Recipient; #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)] pub struct SessionId(pub String); diff --git a/backend/src/service/ws/entities/message.rs b/backend/src/service/ws/entities/message.rs index 406ca12df4..fafe66913b 100644 --- a/backend/src/service/ws/entities/message.rs +++ b/backend/src/service/ws/entities/message.rs @@ -3,38 +3,12 @@ use actix::Message; use bytes::Bytes; use std::fmt::Formatter; -#[derive(Debug, Clone)] -pub enum MessageData { - Binary(Bytes), - Connect(SessionId), - Disconnect(SessionId), -} - #[derive(Debug, Message, Clone)] #[rtype(result = "()")] -pub struct ClientMessage { - pub session_id: SessionId, - pub data: MessageData, -} +pub struct WsMessageAdaptor(pub Bytes); -impl ClientMessage { - pub fn new>(session_id: T, data: MessageData) -> Self { - ClientMessage { - session_id: session_id.into(), - data, - } - } -} +impl std::ops::Deref for WsMessageAdaptor { + type Target = Bytes; -impl std::fmt::Display for ClientMessage { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let content = match &self.data { - MessageData::Binary(_) => "[Binary]".to_owned(), - MessageData::Connect(_) => "[Connect]".to_owned(), - MessageData::Disconnect(_) => "[Disconnect]".to_owned(), - }; - - let desc = format!("{}:{}", &self.session_id, content); - f.write_str(&desc) - } + fn deref(&self) -> &Self::Target { &self.0 } } diff --git a/backend/src/service/ws/router.rs b/backend/src/service/ws/router.rs index 4f3522927a..bd88dee75a 100644 --- a/backend/src/service/ws/router.rs +++ b/backend/src/service/ws/router.rs @@ -1,4 +1,4 @@ -use crate::service::ws::{WSClient, WSServer, WsBizHandlers}; +use crate::service::ws::{WsBizHandlers, WsClient, WsServer}; use actix::Addr; use crate::service::user::LoggedUser; @@ -16,12 +16,12 @@ pub async fn establish_ws_connection( request: HttpRequest, payload: Payload, token: Path, - server: Data>, + server: Data>, biz_handlers: Data, ) -> Result { match LoggedUser::from_token(token.clone()) { Ok(user) => { - let client = WSClient::new(&user.user_id, server.get_ref().clone(), biz_handlers); + let client = WsClient::new(&user.user_id, server.get_ref().clone(), biz_handlers); let result = ws::start(client, &request, payload); match result { Ok(response) => Ok(response.into()), diff --git a/backend/src/service/ws/ws_client.rs b/backend/src/service/ws/ws_client.rs index 64538c538e..35cd713240 100644 --- a/backend/src/service/ws/ws_client.rs +++ b/backend/src/service/ws/ws_client.rs @@ -1,12 +1,11 @@ use crate::{ config::{HEARTBEAT_INTERVAL, PING_TIMEOUT}, service::ws::{ - entities::{Connect, Disconnect, SessionId}, - ClientMessage, - MessageData, - WSServer, + entities::{Connect, Disconnect, SessionId, Socket}, WsBizHandler, WsBizHandlers, + WsMessageAdaptor, + WsServer, }, }; use actix::*; @@ -16,17 +15,22 @@ use bytes::Bytes; use flowy_ws::WsMessage; use std::{convert::TryFrom, time::Instant}; -pub struct WSClient { +pub struct WsClientData { + pub(crate) socket: Socket, + pub(crate) data: Bytes, +} + +pub struct WsClient { session_id: SessionId, - server: Addr, + server: Addr, biz_handlers: Data, hb: Instant, } -impl WSClient { +impl WsClient { pub fn new>( session_id: T, - server: Addr, + server: Addr, biz_handlers: Data, ) -> Self { Self { @@ -50,24 +54,25 @@ impl WSClient { }); } - fn send(&self, data: MessageData) { - let msg = ClientMessage::new(self.session_id.clone(), data); - self.server.do_send(msg); - } - - fn handle_binary_message(&self, bytes: Bytes) { + fn handle_binary_message(&self, bytes: Bytes, socket: Socket) { // TODO: ok to unwrap? let message: WsMessage = WsMessage::try_from(bytes).unwrap(); match self.biz_handlers.get(&message.module) { None => { log::error!("Can't find the handler for {:?}", message.module); }, - Some(handler) => handler.receive_data(Bytes::from(message.data)), + Some(handler) => { + let client_data = WsClientData { + socket, + data: Bytes::from(message.data), + }; + handler.receive_data(client_data) + }, } } } -impl StreamHandler> for WSClient { +impl StreamHandler> for WsClient { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => { @@ -80,13 +85,13 @@ impl StreamHandler> for WSClient { }, Ok(ws::Message::Binary(bytes)) => { log::debug!(" Receive {} binary", &self.session_id); - self.handle_binary_message(bytes); + let socket = ctx.address().recipient(); + self.handle_binary_message(bytes, socket); }, Ok(Text(_)) => { log::warn!("Receive unexpected text message"); }, Ok(ws::Message::Close(reason)) => { - self.send(MessageData::Disconnect(self.session_id.clone())); ctx.close(reason); ctx.stop(); }, @@ -104,21 +109,13 @@ impl StreamHandler> for WSClient { } } -impl Handler for WSClient { +impl Handler for WsClient { type Result = (); - fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) { - match msg.data { - MessageData::Binary(binary) => { - ctx.binary(binary); - }, - MessageData::Connect(_) => {}, - MessageData::Disconnect(_) => {}, - } - } + fn handle(&mut self, msg: WsMessageAdaptor, ctx: &mut Self::Context) { ctx.binary(msg.0); } } -impl Actor for WSClient { +impl Actor for WsClient { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { diff --git a/backend/src/service/ws/ws_server.rs b/backend/src/service/ws/ws_server.rs index 318d1256a2..1cfe587760 100644 --- a/backend/src/service/ws/ws_server.rs +++ b/backend/src/service/ws/ws_server.rs @@ -1,32 +1,31 @@ use crate::service::ws::{ entities::{Connect, Disconnect, Session, SessionId}, - ClientMessage, - MessageData, + WsMessageAdaptor, }; use actix::{Actor, Context, Handler}; use dashmap::DashMap; use flowy_net::errors::ServerError; -pub struct WSServer { +pub struct WsServer { sessions: DashMap, } -impl WSServer { +impl WsServer { pub fn new() -> Self { Self { sessions: DashMap::new(), } } - pub fn send(&self, _msg: ClientMessage) { unimplemented!() } + pub fn send(&self, _msg: WsMessageAdaptor) { unimplemented!() } } -impl Actor for WSServer { +impl Actor for WsServer { type Context = Context; fn started(&mut self, _ctx: &mut Self::Context) {} } -impl Handler for WSServer { +impl Handler for WsServer { type Result = Result<(), ServerError>; fn handle(&mut self, msg: Connect, _ctx: &mut Context) -> Self::Result { let session: Session = msg.into(); @@ -36,7 +35,7 @@ impl Handler for WSServer { } } -impl Handler for WSServer { +impl Handler for WsServer { type Result = Result<(), ServerError>; fn handle(&mut self, msg: Disconnect, _: &mut Context) -> Self::Result { self.sessions.remove(&msg.sid); @@ -44,20 +43,16 @@ impl Handler for WSServer { } } -impl Handler for WSServer { +impl Handler for WsServer { type Result = (); - fn handle(&mut self, msg: ClientMessage, _ctx: &mut Context) -> Self::Result { - match msg.data { - MessageData::Binary(_) => {}, - MessageData::Connect(_) => {}, - MessageData::Disconnect(_) => {}, - } + fn handle(&mut self, _msg: WsMessageAdaptor, _ctx: &mut Context) -> Self::Result { + unimplemented!() } } -impl actix::Supervised for WSServer { - fn restarting(&mut self, _ctx: &mut Context) { +impl actix::Supervised for WsServer { + fn restarting(&mut self, _ctx: &mut Context) { log::warn!("restarting"); } } diff --git a/rust-lib/flowy-document/Cargo.toml b/rust-lib/flowy-document/Cargo.toml index 9e0e056af1..dbb327d015 100644 --- a/rust-lib/flowy-document/Cargo.toml +++ b/rust-lib/flowy-document/Cargo.toml @@ -14,6 +14,7 @@ flowy-database = { path = "../flowy-database" } flowy-infra = { path = "../flowy-infra" } flowy-observable = { path = "../flowy-observable" } flowy-ot = { path = "../flowy-ot" } +flowy-ws = { path = "../flowy-ws" } flowy-net = { path = "../flowy-net", features = ["flowy_request"] } @@ -37,6 +38,7 @@ serde_json = {version = "1.0"} chrono = "0.4.19" futures-core = { version = "0.3", default-features = false } md5 = "0.7.0" +byteorder = {version = "1.3.4"} [dev-dependencies] flowy-test = { path = "../flowy-test" } diff --git a/rust-lib/flowy-document/src/entities/ws/ws.rs b/rust-lib/flowy-document/src/entities/ws/ws.rs index 3c9ee911ed..0f2f409023 100644 --- a/rust-lib/flowy-document/src/entities/ws/ws.rs +++ b/rust-lib/flowy-document/src/entities/ws/ws.rs @@ -1,16 +1,17 @@ use crate::entities::doc::Revision; use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; +use flowy_ws::{WsMessage, WsModule}; use std::convert::TryInto; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] pub enum WsDataType { - Command = 0, - Delta = 1, + Acked = 0, + Delta = 1, } impl std::default::Default for WsDataType { - fn default() -> Self { WsDataType::Command } + fn default() -> Self { WsDataType::Acked } } #[derive(ProtoBuf, Default, Debug, Clone)] @@ -37,3 +38,14 @@ impl std::convert::From for WsDocumentData { } } } + +impl std::convert::Into for WsDocumentData { + fn into(self) -> WsMessage { + let bytes: Bytes = self.try_into().unwrap(); + let msg = WsMessage { + module: WsModule::Doc, + data: bytes.to_vec(), + }; + msg + } +} diff --git a/rust-lib/flowy-document/src/protobuf/model/ws.rs b/rust-lib/flowy-document/src/protobuf/model/ws.rs index 3b83649d53..5a6fced126 100644 --- a/rust-lib/flowy-document/src/protobuf/model/ws.rs +++ b/rust-lib/flowy-document/src/protobuf/model/ws.rs @@ -78,7 +78,7 @@ impl WsDocumentData { self.ty } pub fn clear_ty(&mut self) { - self.ty = WsDataType::Command; + self.ty = WsDataType::Acked; } // Param is passed by value, moved @@ -146,7 +146,7 @@ impl ::protobuf::Message for WsDocumentData { if !self.id.is_empty() { my_size += ::protobuf::rt::string_size(1, &self.id); } - if self.ty != WsDataType::Command { + if self.ty != WsDataType::Acked { my_size += ::protobuf::rt::enum_size(2, self.ty); } if !self.data.is_empty() { @@ -161,7 +161,7 @@ impl ::protobuf::Message for WsDocumentData { if !self.id.is_empty() { os.write_string(1, &self.id)?; } - if self.ty != WsDataType::Command { + if self.ty != WsDataType::Acked { os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?; } if !self.data.is_empty() { @@ -237,7 +237,7 @@ impl ::protobuf::Message for WsDocumentData { impl ::protobuf::Clear for WsDocumentData { fn clear(&mut self) { self.id.clear(); - self.ty = WsDataType::Command; + self.ty = WsDataType::Acked; self.data.clear(); self.unknown_fields.clear(); } @@ -257,7 +257,7 @@ impl ::protobuf::reflect::ProtobufValue for WsDocumentData { #[derive(Clone,PartialEq,Eq,Debug,Hash)] pub enum WsDataType { - Command = 0, + Acked = 0, Delta = 1, } @@ -268,7 +268,7 @@ impl ::protobuf::ProtobufEnum for WsDataType { fn from_i32(value: i32) -> ::std::option::Option { match value { - 0 => ::std::option::Option::Some(WsDataType::Command), + 0 => ::std::option::Option::Some(WsDataType::Acked), 1 => ::std::option::Option::Some(WsDataType::Delta), _ => ::std::option::Option::None } @@ -276,7 +276,7 @@ impl ::protobuf::ProtobufEnum for WsDataType { fn values() -> &'static [Self] { static values: &'static [WsDataType] = &[ - WsDataType::Command, + WsDataType::Acked, WsDataType::Delta, ]; values @@ -295,7 +295,7 @@ impl ::std::marker::Copy for WsDataType { impl ::std::default::Default for WsDataType { fn default() -> Self { - WsDataType::Command + WsDataType::Acked } } @@ -308,8 +308,8 @@ impl ::protobuf::reflect::ProtobufValue for WsDataType { static file_descriptor_proto_data: &'static [u8] = b"\ \n\x08ws.proto\"Q\n\x0eWsDocumentData\x12\x0e\n\x02id\x18\x01\x20\x01(\t\ R\x02id\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\x02ty\x12\ - \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*$\n\nWsDataType\x12\x0b\n\ - \x07Command\x10\0\x12\t\n\x05Delta\x10\x01J\xb9\x02\n\x06\x12\x04\0\0\n\ + \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*\"\n\nWsDataType\x12\t\n\ + \x05Acked\x10\0\x12\t\n\x05Delta\x10\x01J\xb9\x02\n\x06\x12\x04\0\0\n\ \x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\ \x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\ \x03\x03\x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\ @@ -321,10 +321,10 @@ static file_descriptor_proto_data: &'static [u8] = b"\ \x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\ \x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\ \x07\0\n\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\ - \x02\0\x12\x03\x08\x04\x10\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\ - \x0b\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0e\x0f\n\x0b\n\x04\x05\0\ - \x02\x01\x12\x03\t\x04\x0e\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\t\ - \n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\t\x0c\rb\x06proto3\ + \x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\ + \n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\x0b\n\x04\x05\0\x02\x01\ + \x12\x03\t\x04\x0e\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\t\n\x0c\n\ + \x05\x05\0\x02\x01\x02\x12\x03\t\x0c\rb\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/proto/ws.proto b/rust-lib/flowy-document/src/protobuf/proto/ws.proto index d50b40cdd4..9496f06212 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/ws.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/ws.proto @@ -6,6 +6,6 @@ message WsDocumentData { bytes data = 3; } enum WsDataType { - Command = 0; + Acked = 0; Delta = 1; } diff --git a/rust-lib/flowy-document/src/services/doc/edit_context.rs b/rust-lib/flowy-document/src/services/doc/edit_context.rs index 4796582ae5..0498c23a46 100644 --- a/rust-lib/flowy-document/src/services/doc/edit_context.rs +++ b/rust-lib/flowy-document/src/services/doc/edit_context.rs @@ -67,7 +67,7 @@ impl EditDocContext { // Opti: it is necessary to save the rev if send success? let md5 = format!("{:x}", md5::compute(json)); let revision = Revision::new(base_rev_id, rev_id, data.to_vec(), md5, self.id.clone().into()); - self.save_revision(revision.clone(), pool.clone()); + let _ = self.save_revision(revision.clone(), pool.clone())?; match self.ws.send(revision.into()) { Ok(_) => { // TODO: remove the rev if send success @@ -84,11 +84,11 @@ impl EditDocContext { impl EditDocContext { fn save_revision(&self, revision: Revision, pool: Arc) -> Result<(), DocError> { let conn = &*pool.get().map_err(internal_error)?; - conn.immediate_transaction::<_, DocError, _>(|| { - let op_table: OpTable = revision.into(); - let _ = self.op_sql.create_op_table(op_table, conn)?; - Ok(()) - })?; + // conn.immediate_transaction::<_, DocError, _>(|| { + // let op_table: OpTable = revision.into(); + // let _ = self.op_sql.create_op_table(op_table, conn)?; + // Ok(()) + // })?; Ok(()) } @@ -103,11 +103,16 @@ impl EditDocContext { } } +use byteorder::{BigEndian, ReadBytesExt}; +use std::io::Cursor; impl WsDocumentHandler for EditDocContext { - fn receive(&self, data: WsDocumentData) { - match data.ty { + fn receive(&self, doc_data: WsDocumentData) { + match doc_data.ty { WsDataType::Delta => {}, - WsDataType::Command => {}, + WsDataType::Acked => { + let mut rdr = Cursor::new(doc_data.data); + let rev = rdr.read_i64::().unwrap(); + }, } } } diff --git a/rust-lib/flowy-infra/src/macros.rs b/rust-lib/flowy-infra/src/macros.rs index 49bf43e54c..87b1bee097 100644 --- a/rust-lib/flowy-infra/src/macros.rs +++ b/rust-lib/flowy-infra/src/macros.rs @@ -1,5 +1,5 @@ #[macro_export] -macro_rules! dispatch_future { +macro_rules! wrap_future_fn { ($fut:expr) => { ClosureFuture { fut: Box::pin(async move { $fut.await }), diff --git a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 7061879005..c9ec1f1288 100644 --- a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -73,11 +73,7 @@ struct WsSenderImpl { impl WsDocumentSender for WsSenderImpl { fn send(&self, data: WsDocumentData) -> Result<(), DocError> { - let bytes: Bytes = data.try_into().unwrap(); - let msg = WsMessage { - module: WsModule::Doc, - data: bytes.to_vec(), - }; + let msg: WsMessage = data.into(); let _ = self.user.send_ws_msg(msg).map_err(|e| DocError::internal().context(e))?; Ok(()) } diff --git a/rust-lib/flowy-ws/src/connect.rs b/rust-lib/flowy-ws/src/connect.rs index 4b2ff15aed..ad7a78113b 100644 --- a/rust-lib/flowy-ws/src/connect.rs +++ b/rust-lib/flowy-ws/src/connect.rs @@ -130,7 +130,9 @@ fn post_message(tx: MsgSender, message: Result) { Err(e) => log::error!("tx send error: {:?}", e), }, Ok(_) => {}, - Err(e) => log::error!("ws read error: {:?}", e), + Err(e) => { + log::error!("ws read error: {:?}", e) + }, } } diff --git a/rust-lib/flowy-ws/src/msg.rs b/rust-lib/flowy-ws/src/msg.rs index bc6fb1a212..72b45373c2 100644 --- a/rust-lib/flowy-ws/src/msg.rs +++ b/rust-lib/flowy-ws/src/msg.rs @@ -1,7 +1,8 @@ +use crate::errors::WsError; use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use std::convert::{TryFrom, TryInto}; -use tokio_tungstenite::tungstenite::Message as TokioMessage; +use tokio_tungstenite::tungstenite::{Message as TokioMessage, Message}; // Opti: using four bytes of the data to represent the source #[derive(ProtoBuf, Debug, Clone, Default)] @@ -42,15 +43,3 @@ impl std::convert::Into for WsMessage { } } } - -impl std::convert::From for WsMessage { - fn from(value: TokioMessage) -> Self { - match value { - TokioMessage::Binary(bytes) => WsMessage::try_from(Bytes::from(bytes)).unwrap(), - _ => { - log::error!("WsMessage deserialize failed. Unsupported message"); - WsMessage::default() - }, - } - } -} diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index f2274cc3a2..c2187e2f04 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -4,12 +4,14 @@ use crate::{ WsMessage, WsModule, }; +use bytes::Bytes; use flowy_net::errors::ServerError; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_core::{future::BoxFuture, ready, Stream}; use pin_project::pin_project; use std::{ collections::HashMap, + convert::{Infallible, TryFrom}, future::Future, pin::Pin, sync::Arc, @@ -169,6 +171,26 @@ pub struct WsHandlerFuture { impl WsHandlerFuture { fn new(handlers: HashMap>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } } + + fn handler_ws_message(&self, message: Message) { + match message { + Message::Binary(bytes) => self.handle_binary_message(bytes), + _ => {}, + } + } + + fn handle_binary_message(&self, bytes: Vec) { + let bytes = Bytes::from(bytes); + match WsMessage::try_from(bytes) { + Ok(message) => match self.handlers.get(&message.module) { + None => log::error!("Can't find any handler for message: {:?}", message), + Some(handler) => handler.receive_message(message.clone()), + }, + Err(e) => { + log::error!("Deserialize binary ws message failed: {:?}", e); + }, + } + } } impl Future for WsHandlerFuture { @@ -179,13 +201,7 @@ impl Future for WsHandlerFuture { None => { return Poll::Ready(()); }, - Some(message) => { - let message = WsMessage::from(message); - match self.handlers.get(&message.module) { - None => log::error!("Can't find any handler for message: {:?}", message), - Some(handler) => handler.receive_message(message.clone()), - } - }, + Some(message) => self.handler_ws_message(message), } } }