diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart index b2c9812de0..a40dafffca 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart @@ -12,10 +12,12 @@ import 'package:protobuf/protobuf.dart' as $pb; class WsDataType extends $pb.ProtobufEnum { static const WsDataType Acked = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked'); static const WsDataType Rev = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Rev'); + static const WsDataType Conflict = WsDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Conflict'); static const $core.List values = [ Acked, Rev, + Conflict, ]; static final $core.Map<$core.int, WsDataType> _byValue = $pb.ProtobufEnum.initByValue(values); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart index 41a08e287a..047cc5f579 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart @@ -14,11 +14,12 @@ const WsDataType$json = const { '2': const [ const {'1': 'Acked', '2': 0}, const {'1': 'Rev', '2': 1}, + const {'1': 'Conflict', '2': 2}, ], }; /// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASBwoDUmV2EAE='); +final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASBwoDUmV2EAESDAoIQ29uZmxpY3QQAg=='); @$core.Deprecated('Use wsDocumentDataDescriptor instead') const WsDocumentData$json = const { '1': 'WsDocumentData', diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/view_update.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/view_update.pb.dart index 23c5cb8fc4..6ca0ce8790 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/view_update.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/view_update.pb.dart @@ -319,64 +319,3 @@ class UpdateViewParams extends $pb.GeneratedMessage { void clearIsTrash() => clearField(5); } -class DocDeltaRequest extends $pb.GeneratedMessage { - static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocDeltaRequest', createEmptyInstance: create) - ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'viewId') - ..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) - ..hasRequiredFields = false - ; - - DocDeltaRequest._() : super(); - factory DocDeltaRequest({ - $core.String? viewId, - $core.List<$core.int>? data, - }) { - final _result = create(); - if (viewId != null) { - _result.viewId = viewId; - } - if (data != null) { - _result.data = data; - } - return _result; - } - factory DocDeltaRequest.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); - factory DocDeltaRequest.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); - @$core.Deprecated( - 'Using this can add significant overhead to your binary. ' - 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' - 'Will be removed in next major version') - DocDeltaRequest clone() => DocDeltaRequest()..mergeFromMessage(this); - @$core.Deprecated( - 'Using this can add significant overhead to your binary. ' - 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' - 'Will be removed in next major version') - DocDeltaRequest copyWith(void Function(DocDeltaRequest) updates) => super.copyWith((message) => updates(message as DocDeltaRequest)) as DocDeltaRequest; // ignore: deprecated_member_use - $pb.BuilderInfo get info_ => _i; - @$core.pragma('dart2js:noInline') - static DocDeltaRequest create() => DocDeltaRequest._(); - DocDeltaRequest createEmptyInstance() => create(); - static $pb.PbList createRepeated() => $pb.PbList(); - @$core.pragma('dart2js:noInline') - static DocDeltaRequest getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); - static DocDeltaRequest? _defaultInstance; - - @$pb.TagNumber(1) - $core.String get viewId => $_getSZ(0); - @$pb.TagNumber(1) - set viewId($core.String v) { $_setString(0, v); } - @$pb.TagNumber(1) - $core.bool hasViewId() => $_has(0); - @$pb.TagNumber(1) - void clearViewId() => clearField(1); - - @$pb.TagNumber(2) - $core.List<$core.int> get data => $_getN(1); - @$pb.TagNumber(2) - set data($core.List<$core.int> v) { $_setBytes(1, v); } - @$pb.TagNumber(2) - $core.bool hasData() => $_has(1); - @$pb.TagNumber(2) - void clearData() => clearField(2); -} - diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/view_update.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/view_update.pbjson.dart index 375680fc3a..860b0f05eb 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/view_update.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/view_update.pbjson.dart @@ -48,14 +48,3 @@ const UpdateViewParams$json = const { /// Descriptor for `UpdateViewParams`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List updateViewParamsDescriptor = $convert.base64Decode('ChBVcGRhdGVWaWV3UGFyYW1zEhcKB3ZpZXdfaWQYASABKAlSBnZpZXdJZBIUCgRuYW1lGAIgASgJSABSBG5hbWUSFAoEZGVzYxgDIAEoCUgBUgRkZXNjEh4KCXRodW1ibmFpbBgEIAEoCUgCUgl0aHVtYm5haWwSGwoIaXNfdHJhc2gYBSABKAhIA1IHaXNUcmFzaEINCgtvbmVfb2ZfbmFtZUINCgtvbmVfb2ZfZGVzY0ISChBvbmVfb2ZfdGh1bWJuYWlsQhEKD29uZV9vZl9pc190cmFzaA=='); -@$core.Deprecated('Use docDeltaRequestDescriptor instead') -const DocDeltaRequest$json = const { - '1': 'DocDeltaRequest', - '2': const [ - const {'1': 'view_id', '3': 1, '4': 1, '5': 9, '10': 'viewId'}, - const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'}, - ], -}; - -/// Descriptor for `DocDeltaRequest`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List docDeltaRequestDescriptor = $convert.base64Decode('Cg9Eb2NEZWx0YVJlcXVlc3QSFwoHdmlld19pZBgBIAEoCVIGdmlld0lkEhIKBGRhdGEYAiABKAxSBGRhdGE='); diff --git a/backend/src/service/doc/edit_doc_context.rs b/backend/src/service/doc/edit_doc_context.rs index 9b324c6b29..7e922e5190 100644 --- a/backend/src/service/doc/edit_doc_context.rs +++ b/backend/src/service/doc/edit_doc_context.rs @@ -1,11 +1,12 @@ use crate::service::{ doc::update_doc, util::md5, - ws::{entities::Socket, WsMessageAdaptor}, + ws::{entities::Socket, WsClientData, WsMessageAdaptor, WsUser}, }; use actix_web::web::Data; use byteorder::{BigEndian, WriteBytesExt}; use bytes::Bytes; +use dashmap::DashMap; use flowy_document::{ entities::ws::{WsDataType, WsDocumentData}, protobuf::{Doc, RevType, Revision, UpdateDocParams}, @@ -20,48 +21,93 @@ use flowy_ws::WsMessage; use parking_lot::RwLock; use protobuf::Message; use sqlx::PgPool; -use std::{convert::TryInto, sync::Arc, time::Duration}; +use std::{ + convert::TryInto, + sync::{ + atomic::{AtomicI64, Ordering::SeqCst}, + Arc, + }, + time::Duration, +}; + +struct EditUser { + user: Arc, + socket: Socket, +} pub(crate) struct EditDocContext { doc_id: String, - rev_id: i64, + rev_id: AtomicI64, document: Arc>, pg_pool: Data, + users: DashMap, } impl EditDocContext { pub(crate) fn new(doc: Doc, pg_pool: Data) -> Result { let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?; let document = Arc::new(RwLock::new(Document::from_delta(delta))); + let users = DashMap::new(); Ok(Self { doc_id: doc.id.clone(), - rev_id: doc.rev_id, + rev_id: AtomicI64::new(doc.rev_id), document, pg_pool, + users, }) } - #[tracing::instrument(level = "debug", skip(self, socket, revision))] - pub(crate) async fn apply_revision(&self, socket: Socket, revision: Revision) -> Result<(), ServerError> { + #[tracing::instrument(level = "debug", skip(self, client_data, revision))] + pub(crate) async fn apply_revision( + &self, + client_data: WsClientData, + revision: Revision, + ) -> Result<(), ServerError> { let _ = self.verify_md5(&revision)?; + // Rest EditUser for each client websocket message to keep the socket available. + let user = EditUser { + user: client_data.user.clone(), + socket: client_data.socket.clone(), + }; + self.users.insert(client_data.user.id().to_owned(), user); - if self.rev_id > revision.rev_id { - let (cli_prime, server_prime) = self.compose(&revision.delta).map_err(internal_error)?; + log::debug!( + "cur_base_rev_id: {}, expect_base_rev_id: {} rev_id: {}", + self.rev_id.load(SeqCst), + revision.base_rev_id, + revision.rev_id + ); + + let cli_socket = client_data.socket; + let cur_rev_id = self.rev_id.load(SeqCst); + // Transform the revision if client rev_id less than server rev_id. Sending the + // prime delta to client. + if cur_rev_id > revision.rev_id { + let (cli_prime, server_prime) = self.transform(&revision.delta).map_err(internal_error)?; let _ = self.update_document_delta(server_prime)?; log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json()); let cli_revision = self.mk_revision(revision.rev_id, cli_prime); let ws_cli_revision = mk_rev_ws_message(&self.doc_id, cli_revision); - socket.do_send(ws_cli_revision).map_err(internal_error)?; + cli_socket.do_send(ws_cli_revision).map_err(internal_error)?; + Ok(()) + } else if cur_rev_id < revision.rev_id { + if cur_rev_id != revision.base_rev_id { + let missing_rev_range = revision.rev_id - cur_rev_id; + // TODO: pull the missing revs from client + } else { + let delta = Delta::from_bytes(&revision.delta).map_err(internal_error)?; + let _ = self.update_document_delta(delta)?; + cli_socket.do_send(mk_acked_ws_message(&revision)); + self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); + + // Opti: save with multiple revisions + let _ = self.save_revision(&revision).await?; + } + Ok(()) } else { - let delta = Delta::from_bytes(&revision.delta).map_err(internal_error)?; - let _ = self.update_document_delta(delta)?; - socket.do_send(mk_acked_ws_message(&revision)); - - // Opti: save with multiple revisions - let _ = self.save_revision(&revision).await?; - Ok(()) + log::error!("Client rev_id should not equal to server rev_id"); } } @@ -70,7 +116,7 @@ impl EditDocContext { let md5 = md5(&delta_data); let revision = Revision { base_rev_id, - rev_id: self.rev_id, + rev_id: self.rev_id.load(SeqCst), delta: delta_data, md5, doc_id: self.doc_id.to_string(), @@ -81,10 +127,12 @@ impl EditDocContext { } #[tracing::instrument(level = "debug", skip(self, delta_data))] - fn compose(&self, delta_data: &Vec) -> Result<(Delta, Delta), OTError> { - log::debug!("{} document data: {}", self.doc_id, self.document.read().to_json()); + fn transform(&self, delta_data: &Vec) -> Result<(Delta, Delta), OTError> { + log::debug!("Document: {}", self.document.read().to_json()); let doc_delta = self.document.read().delta().clone(); let cli_delta = Delta::from_bytes(delta_data)?; + + log::debug!("Compose delta: {}", cli_delta); let (cli_prime, server_prime) = doc_delta.transform(&cli_delta)?; Ok((cli_prime, server_prime)) @@ -99,8 +147,7 @@ impl EditDocContext { }, Some(mut write_guard) => { let _ = write_guard.compose_delta(&delta).map_err(internal_error)?; - - log::debug!("Document: {}", write_guard.to_plain_string()); + log::debug!("Document: {}", write_guard.to_json()); }, } Ok(()) diff --git a/backend/src/service/doc/ws_handler.rs b/backend/src/service/doc/ws_handler.rs index 18f9f18642..b4f8d521bd 100644 --- a/backend/src/service/doc/ws_handler.rs +++ b/backend/src/service/doc/ws_handler.rs @@ -6,6 +6,7 @@ use crate::service::{ }; use actix_web::web::Data; +use crate::service::ws::WsUser; use flowy_document::protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData}; use flowy_net::errors::ServerError; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; @@ -53,22 +54,19 @@ impl EditDocManager { async fn handle(&self, client_data: WsClientData) -> Result<(), ServerError> { let document_data: WsDocumentData = parse_from_bytes(&client_data.data)?; - match document_data.ty { WsDataType::Acked => {}, WsDataType::Rev => { let revision: Revision = parse_from_bytes(&document_data.data)?; let edited_doc = self.get_edit_doc(&revision.doc_id).await?; tokio::spawn(async move { - match edited_doc - .apply_revision(client_data.socket, revision) - .await - { + match edited_doc.apply_revision(client_data, revision).await { Ok(_) => {}, Err(e) => log::error!("Doc apply revision failed: {:?}", e), } }); }, + _ => {}, } Ok(()) diff --git a/backend/src/service/ws/router.rs b/backend/src/service/ws/router.rs index bd88dee75a..170b52a375 100644 --- a/backend/src/service/ws/router.rs +++ b/backend/src/service/ws/router.rs @@ -1,4 +1,4 @@ -use crate::service::ws::{WsBizHandlers, WsClient, WsServer}; +use crate::service::ws::{WsBizHandlers, WsClient, WsServer, WsUser}; use actix::Addr; use crate::service::user::LoggedUser; @@ -21,7 +21,8 @@ pub async fn establish_ws_connection( ) -> Result { match LoggedUser::from_token(token.clone()) { Ok(user) => { - let client = WsClient::new(&user.user_id, server.get_ref().clone(), biz_handlers); + let ws_user = WsUser::new(user.clone()); + let client = WsClient::new(ws_user, server.get_ref().clone(), biz_handlers); let result = ws::start(client, &request, payload); match result { Ok(response) => Ok(response.into()), diff --git a/backend/src/service/ws/ws_client.rs b/backend/src/service/ws/ws_client.rs index 35cd713240..b173026eb4 100644 --- a/backend/src/service/ws/ws_client.rs +++ b/backend/src/service/ws/ws_client.rs @@ -1,11 +1,14 @@ use crate::{ config::{HEARTBEAT_INTERVAL, PING_TIMEOUT}, - service::ws::{ - entities::{Connect, Disconnect, SessionId, Socket}, - WsBizHandler, - WsBizHandlers, - WsMessageAdaptor, - WsServer, + service::{ + user::LoggedUser, + ws::{ + entities::{Connect, Disconnect, SessionId, Socket}, + WsBizHandler, + WsBizHandlers, + WsMessageAdaptor, + WsServer, + }, }, }; use actix::*; @@ -13,28 +16,35 @@ use actix_web::web::Data; use actix_web_actors::{ws, ws::Message::Text}; use bytes::Bytes; use flowy_ws::WsMessage; -use std::{convert::TryFrom, time::Instant}; +use std::{convert::TryFrom, sync::Arc, time::Instant}; + +pub struct WsUser { + inner: LoggedUser, +} + +impl WsUser { + pub fn new(inner: LoggedUser) -> Self { Self { inner } } + + pub fn id(&self) -> &str { &self.inner.user_id } +} pub struct WsClientData { + pub(crate) user: Arc, pub(crate) socket: Socket, pub(crate) data: Bytes, } pub struct WsClient { - session_id: SessionId, + user: Arc, server: Addr, biz_handlers: Data, hb: Instant, } impl WsClient { - pub fn new>( - session_id: T, - server: Addr, - biz_handlers: Data, - ) -> Self { + pub fn new(user: WsUser, server: Addr, biz_handlers: Data) -> Self { Self { - session_id: session_id.into(), + user: Arc::new(user), server, biz_handlers, hb: Instant::now(), @@ -45,7 +55,7 @@ impl WsClient { ctx.run_interval(HEARTBEAT_INTERVAL, |client, ctx| { if Instant::now().duration_since(client.hb) > PING_TIMEOUT { client.server.do_send(Disconnect { - sid: client.session_id.clone(), + sid: client.user.id().into(), }); ctx.stop(); } else { @@ -63,6 +73,7 @@ impl WsClient { }, Some(handler) => { let client_data = WsClientData { + user: self.user.clone(), socket, data: Bytes::from(message.data), }; @@ -84,7 +95,6 @@ impl StreamHandler> for WsClient { self.hb = Instant::now(); }, Ok(ws::Message::Binary(bytes)) => { - log::debug!(" Receive {} binary", &self.session_id); let socket = ctx.address().recipient(); self.handle_binary_message(bytes, socket); }, @@ -98,11 +108,7 @@ impl StreamHandler> for WsClient { Ok(ws::Message::Continuation(_)) => {}, Ok(ws::Message::Nop) => {}, Err(e) => { - log::error!( - "[{}]: WebSocketStream protocol error {:?}", - self.session_id, - e - ); + log::error!("[{}]: WebSocketStream protocol error {:?}", self.user.id(), e); ctx.stop(); }, } @@ -123,7 +129,7 @@ impl Actor for WsClient { let socket = ctx.address().recipient(); let connect = Connect { socket, - sid: self.session_id.clone(), + sid: self.user.id().into(), }; self.server .send(connect) @@ -141,7 +147,7 @@ impl Actor for WsClient { fn stopping(&mut self, _: &mut Self::Context) -> Running { self.server.do_send(Disconnect { - sid: self.session_id.clone(), + sid: self.user.id().into(), }); Running::Stop diff --git a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs index f5d522689d..f07dd625bf 100644 --- a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -41,7 +41,6 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "CurrentWorkspace" | "UpdateViewRequest" | "UpdateViewParams" - | "DocDeltaRequest" | "DeleteViewRequest" | "DeleteViewParams" | "QueryViewRequest" diff --git a/rust-lib/flowy-document/src/entities/ws/ws.rs b/rust-lib/flowy-document/src/entities/ws/ws.rs index ab553cabf5..55b03e6d41 100644 --- a/rust-lib/flowy-document/src/entities/ws/ws.rs +++ b/rust-lib/flowy-document/src/entities/ws/ws.rs @@ -6,8 +6,9 @@ use std::convert::TryInto; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] pub enum WsDataType { - Acked = 0, - Rev = 1, + Acked = 0, + Rev = 1, + Conflict = 2, } impl std::default::Default for WsDataType { diff --git a/rust-lib/flowy-document/src/protobuf/model/ws.rs b/rust-lib/flowy-document/src/protobuf/model/ws.rs index 8273341aac..396c89d978 100644 --- a/rust-lib/flowy-document/src/protobuf/model/ws.rs +++ b/rust-lib/flowy-document/src/protobuf/model/ws.rs @@ -259,6 +259,7 @@ impl ::protobuf::reflect::ProtobufValue for WsDocumentData { pub enum WsDataType { Acked = 0, Rev = 1, + Conflict = 2, } impl ::protobuf::ProtobufEnum for WsDataType { @@ -270,6 +271,7 @@ impl ::protobuf::ProtobufEnum for WsDataType { match value { 0 => ::std::option::Option::Some(WsDataType::Acked), 1 => ::std::option::Option::Some(WsDataType::Rev), + 2 => ::std::option::Option::Some(WsDataType::Conflict), _ => ::std::option::Option::None } } @@ -278,6 +280,7 @@ impl ::protobuf::ProtobufEnum for WsDataType { static values: &'static [WsDataType] = &[ WsDataType::Acked, WsDataType::Rev, + WsDataType::Conflict, ]; values } @@ -308,23 +311,26 @@ impl ::protobuf::reflect::ProtobufValue for WsDataType { static file_descriptor_proto_data: &'static [u8] = b"\ \n\x08ws.proto\"Q\n\x0eWsDocumentData\x12\x0e\n\x02id\x18\x01\x20\x01(\t\ R\x02id\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\x02ty\x12\ - \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*\x20\n\nWsDataType\x12\t\n\ - \x05Acked\x10\0\x12\x07\n\x03Rev\x10\x01J\xb9\x02\n\x06\x12\x04\0\0\n\ - \x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\ - \x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\ - \x03\x03\x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\ - \x05\x04\0\x02\0\x01\x12\x03\x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\ - \x03\x03\x10\x11\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x16\n\x0c\n\ - \x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\n\x05\x04\0\x02\x01\x01\ - \x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x14\x15\n\ - \x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\ - \x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\ - \x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\ - \x07\0\n\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\ - \x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\ - \n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\x0b\n\x04\x05\0\x02\x01\ - \x12\x03\t\x04\x0c\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\x07\n\x0c\ - \n\x05\x05\0\x02\x01\x02\x12\x03\t\n\x0bb\x06proto3\ + \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*.\n\nWsDataType\x12\t\n\x05\ + Acked\x10\0\x12\x07\n\x03Rev\x10\x01\x12\x0c\n\x08Conflict\x10\x02J\xe2\ + \x02\n\x06\x12\x04\0\0\x0b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\ + \x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\ + \x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\ + \x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\r\n\x0c\n\ + \x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\n\x04\x04\0\x02\x01\x12\ + \x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\ + \n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\ + \x03\x12\x03\x04\x14\x15\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\ + \x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\ + \x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\ + \n\n\n\x02\x05\0\x12\x04\x07\0\x0b\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\ + \x05\x0f\n\x0b\n\x04\x05\0\x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\ + \x02\0\x01\x12\x03\x08\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\ + \r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\t\x04\x0c\n\x0c\n\x05\x05\0\x02\x01\ + \x01\x12\x03\t\x04\x07\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\t\n\x0b\n\ + \x0b\n\x04\x05\0\x02\x02\x12\x03\n\x04\x11\n\x0c\n\x05\x05\0\x02\x02\x01\ + \x12\x03\n\x04\x0c\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\n\x0f\x10b\x06p\ + roto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/proto/ws.proto b/rust-lib/flowy-document/src/protobuf/proto/ws.proto index a80f4ddac9..36a7b051b6 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/ws.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/ws.proto @@ -8,4 +8,5 @@ message WsDocumentData { enum WsDataType { Acked = 0; Rev = 1; + Conflict = 2; } diff --git a/rust-lib/flowy-document/src/services/cache.rs b/rust-lib/flowy-document/src/services/cache.rs index 2740ad2af0..361db88550 100644 --- a/rust-lib/flowy-document/src/services/cache.rs +++ b/rust-lib/flowy-document/src/services/cache.rs @@ -22,7 +22,7 @@ impl DocCache { } pub(crate) fn set(&self, doc: Arc) { - let doc_id = doc.id.clone(); + let doc_id = doc.doc_id.clone(); if self.inner.contains_key(&doc_id) { log::warn!("Doc:{} already exists in cache", &doc_id); } diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index 9fe2f061e2..a734e43509 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -1,5 +1,5 @@ use crate::{ - entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams, UpdateDocParams}, + entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams}, errors::{internal_error, DocError}, module::DocumentUser, services::{ @@ -21,7 +21,6 @@ use tokio::time::{interval, Duration}; pub(crate) struct DocController { server: Server, doc_sql: Arc, - op_sql: Arc, ws: Arc>, cache: Arc, user: Arc, @@ -30,24 +29,14 @@ pub(crate) struct DocController { impl DocController { pub(crate) fn new(server: Server, user: Arc, ws: Arc>) -> Self { let doc_sql = Arc::new(DocTableSql {}); - let op_sql = Arc::new(OpTableSql {}); let cache = Arc::new(DocCache::new()); - let controller = Self { server, doc_sql, - op_sql, user, ws, cache: cache.clone(), }; - - // tokio::spawn(async move { - // tokio::select! { - // _ = event_loop(cache.clone()) => {}, - // } - // }); - controller } @@ -69,10 +58,8 @@ impl DocController { pool: Arc, ) -> Result, DocError> { if self.cache.is_opened(¶ms.doc_id) == false { - return match self._open(params, pool).await { - Ok(doc) => Ok(doc), - Err(error) => Err(error), - }; + let edit_ctx = self.make_edit_context(¶ms.doc_id, pool.clone()).await?; + return Ok(edit_ctx); } let edit_doc_ctx = self.cache.get(¶ms.doc_id)?; @@ -105,40 +92,6 @@ impl DocController { } impl DocController { - #[tracing::instrument(level = "debug", skip(self, params), err)] - fn update_doc_on_server(&self, params: UpdateDocParams) -> Result<(), DocError> { - let token = self.user.token()?; - let server = self.server.clone(); - tokio::spawn(async move { - match server.update_doc(&token, params).await { - Ok(_) => {}, - Err(e) => { - // TODO: retry? - log::error!("Update doc failed: {}", e); - }, - } - }); - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, pool), err)] - async fn read_doc_from_server( - &self, - params: QueryDocParams, - pool: Arc, - ) -> Result, DocError> { - let token = self.user.token()?; - match self.server.read_doc(&token, params).await? { - None => Err(DocError::not_found()), - Some(doc) => { - let edit = self.make_edit_context(doc.clone(), pool.clone())?; - let conn = &*(pool.get().map_err(internal_error)?); - let _ = self.doc_sql.create_doc_table(doc.into(), conn)?; - Ok(edit) - }, - } - } - #[tracing::instrument(level = "debug", skip(self), err)] fn delete_doc_on_server(&self, params: QueryDocParams) -> Result<(), DocError> { let token = self.user.token()?; @@ -155,32 +108,45 @@ impl DocController { Ok(()) } - async fn _open(&self, params: QueryDocParams, pool: Arc) -> Result, DocError> { - match self.doc_sql.read_doc_table(¶ms.doc_id, pool.clone()) { - Ok(doc_table) => Ok(self.make_edit_context(doc_table.into(), pool.clone())?), + async fn make_edit_context( + &self, + doc_id: &str, + pool: Arc, + ) -> Result, DocError> { + // Opti: require upgradable_read lock and then upgrade to write lock using + // RwLockUpgradableReadGuard::upgrade(xx) of ws + let delta = self.read_doc(doc_id, pool.clone()).await?; + let ws_sender = self.ws.read().sender(); + let edit_ctx = Arc::new(EditDocContext::new(&doc_id, delta, pool, ws_sender).await?); + self.ws.write().register_handler(&doc_id, edit_ctx.clone()); + self.cache.set(edit_ctx.clone()); + Ok(edit_ctx) + } + + #[tracing::instrument(level = "debug", skip(self, pool), err)] + async fn read_doc(&self, doc_id: &str, pool: Arc) -> Result { + match self.doc_sql.read_doc_table(doc_id, pool.clone()) { + Ok(doc_table) => Ok(Delta::from_bytes(doc_table.data)?), Err(error) => { if error.is_record_not_found() { - log::debug!("Doc:{} don't exist, reading from server", params.doc_id); - Ok(self.read_doc_from_server(params, pool.clone()).await?) + let token = self.user.token()?; + let params = QueryDocParams { + doc_id: doc_id.to_string(), + }; + match self.server.read_doc(&token, params).await? { + None => Err(DocError::not_found()), + Some(doc) => { + let conn = &*pool.get().map_err(internal_error)?; + let _ = self.doc_sql.create_doc_table(doc.clone().into(), conn)?; + Ok(Delta::from_bytes(doc.data)?) + }, + } } else { return Err(error); } }, } } - - fn make_edit_context(&self, doc: Doc, pool: Arc) -> Result, DocError> { - // Opti: require upgradable_read lock and then upgrade to write lock using - // RwLockUpgradableReadGuard::upgrade(xx) of ws - let doc_id = doc.id.clone(); - let delta = Delta::from_bytes(&doc.data)?; - let ws_sender = self.ws.read().sender(); - let rev_manager = RevisionManager::new(&doc_id, doc.rev_id, self.op_sql.clone(), pool, ws_sender); - let edit_ctx = Arc::new(EditDocContext::new(&doc_id, delta, rev_manager)?); - self.ws.write().register_handler(&doc_id, edit_ctx.clone()); - self.cache.set(edit_ctx.clone()); - Ok(edit_ctx) - } } #[allow(dead_code)] diff --git a/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs b/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs index 10a2d473ed..162bd6df2d 100644 --- a/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs +++ b/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs @@ -6,13 +6,18 @@ use crate::{ errors::*, services::{ doc::{rev_manager::RevisionManager, Document}, - util::bytes_to_rev_id, + util::{bytes_to_rev_id, md5}, ws::WsDocumentHandler, }, - sql_tables::{OpTableSql, RevTable}, }; use bytes::Bytes; +use crate::{ + entities::doc::RevType, + services::ws::WsDocumentSender, + sql_tables::{doc::DocTableSql, DocTableChangeset}, +}; +use flowy_database::ConnectionPool; use flowy_ot::core::Delta; use parking_lot::RwLock; use std::{convert::TryFrom, sync::Arc}; @@ -20,27 +25,34 @@ use std::{convert::TryFrom, sync::Arc}; pub type DocId = String; pub(crate) struct EditDocContext { - pub(crate) id: DocId, + pub(crate) doc_id: DocId, document: Arc>, rev_manager: Arc, + pool: Arc, } impl EditDocContext { - pub(crate) fn new(doc_id: &str, delta: Delta, rev_manager: RevisionManager) -> Result { - let id = doc_id.to_owned(); - let rev_manager = Arc::new(rev_manager); + pub(crate) async fn new( + doc_id: &str, + delta: Delta, + pool: Arc, + ws_sender: Arc, + ) -> Result { + let doc_id = doc_id.to_owned(); + let rev_manager = Arc::new(RevisionManager::new(&doc_id, 1, pool.clone(), ws_sender)); let document = Arc::new(RwLock::new(Document::from_delta(delta))); let edit_context = Self { - id, + doc_id, document, rev_manager, + pool, }; Ok(edit_context) } pub(crate) fn doc(&self) -> Doc { Doc { - id: self.id.clone(), + id: self.doc_id.clone(), data: self.document.read().to_json(), rev_id: self.rev_manager.rev_id(), } @@ -48,12 +60,63 @@ impl EditDocContext { #[tracing::instrument(level = "debug", skip(self, data), err)] pub(crate) fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> { - let delta = Delta::from_bytes(&data)?; - self.document.write().compose_delta(&delta)?; - self.rev_manager.add_delta(data); + let (base_rev_id, rev_id) = self.rev_manager.next_rev_id(); + let revision = Revision::new( + base_rev_id, + rev_id, + data.to_vec(), + md5(&data), + self.doc_id.clone(), + RevType::Local, + ); + let _ = self.update_document(&revision)?; + self.rev_manager.add_revision(revision); Ok(()) } + + #[tracing::instrument(level = "debug", skip(self, revision), err)] + pub fn update_document(&self, revision: &Revision) -> Result<(), DocError> { + let delta = Delta::from_bytes(&revision.delta)?; + self.document.write().compose_delta(&delta)?; + let data = self.document.read().to_json(); + let changeset = DocTableChangeset { + id: self.doc_id.clone(), + data, + revision: revision.rev_id, + }; + + let sql = DocTableSql {}; + let conn = self.pool.get().map_err(internal_error)?; + sql.update_doc_table(changeset, &*conn); + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self), err)] + fn compose_remote_delta(&self) -> Result<(), DocError> { + self.rev_manager.next_compose_revision(|revision| { + let _ = self.update_document(revision)?; + log::debug!("😁Document: {:?}", self.document.read().to_plain_string()); + Ok(()) + }); + Ok(()) + } + + // #[tracing::instrument(level = "debug", skip(self, params), err)] + // fn update_doc_on_server(&self, params: UpdateDocParams) -> Result<(), + // DocError> { let token = self.user.token()?; + // let server = self.server.clone(); + // tokio::spawn(async move { + // match server.update_doc(&token, params).await { + // Ok(_) => {}, + // Err(e) => { + // // TODO: retry? + // log::error!("Update doc failed: {}", e); + // }, + // } + // }); + // Ok(()) + // } } impl WsDocumentHandler for EditDocContext { @@ -64,16 +127,13 @@ impl WsDocumentHandler for EditDocContext { let bytes = Bytes::from(doc_data.data); let revision = Revision::try_from(bytes)?; self.rev_manager.add_revision(revision); - self.rev_manager.next_compose_delta(|delta| { - let _ = self.document.write().compose_delta(delta)?; - log::debug!("😁Document: {:?}", self.document.read().to_plain_string()); - Ok(()) - }); + let _ = self.compose_remote_delta()?; }, WsDataType::Acked => { let rev_id = bytes_to_rev_id(doc_data.data)?; self.rev_manager.remove(rev_id); }, + _ => {}, } Result::<(), DocError>::Ok(()) }; diff --git a/rust-lib/flowy-document/src/services/doc/rev_manager.rs b/rust-lib/flowy-document/src/services/doc/rev_manager.rs index e1c1c0d2e8..1c42fc03c2 100644 --- a/rust-lib/flowy-document/src/services/doc/rev_manager.rs +++ b/rust-lib/flowy-document/src/services/doc/rev_manager.rs @@ -2,14 +2,14 @@ use crate::{ entities::doc::{RevType, Revision}, errors::{internal_error, DocError}, services::{ - util::{md5, RevIdCounter}, + util::RevIdCounter, ws::{WsDocumentHandler, WsDocumentSender}, }, sql_tables::{OpTableSql, RevTable}, }; -use bytes::Bytes; + use flowy_database::ConnectionPool; -use flowy_ot::core::Delta; + use parking_lot::RwLock; use std::{ collections::{BTreeMap, VecDeque}, @@ -29,13 +29,8 @@ pub struct RevisionManager { } impl RevisionManager { - pub fn new( - doc_id: &str, - rev_id: i64, - op_sql: Arc, - pool: Arc, - ws_sender: Arc, - ) -> Self { + pub fn new(doc_id: &str, rev_id: i64, pool: Arc, ws_sender: Arc) -> Self { + let op_sql = Arc::new(OpTableSql {}); let rev_id_counter = RevIdCounter::new(rev_id); let local_rev_cache = Arc::new(RwLock::new(BTreeMap::new())); let remote_rev_cache = RwLock::new(VecDeque::new()); @@ -51,39 +46,21 @@ impl RevisionManager { } } - pub fn next_compose_delta(&self, mut f: F) + pub fn next_compose_revision(&self, mut f: F) where - F: FnMut(&Delta) -> Result<(), DocError>, + F: FnMut(&Revision) -> Result<(), DocError>, { if let Some(rev) = self.remote_rev_cache.write().pop_front() { - match Delta::from_bytes(&rev.delta) { - Ok(delta) => match f(&delta) { - Ok(_) => {}, - Err(e) => { - log::error!("{}", e); - self.remote_rev_cache.write().push_front(rev); - }, + match f(&rev) { + Ok(_) => {}, + Err(e) => { + log::error!("{}", e); + self.remote_rev_cache.write().push_front(rev); }, - Err(_) => {}, } } } - #[tracing::instrument(level = "debug", skip(self, delta_data))] - pub fn add_delta(&self, delta_data: Bytes) -> Result<(), DocError> { - let (base_rev_id, rev_id) = self.next_rev_id(); - let revision = Revision::new( - base_rev_id, - rev_id, - delta_data.to_vec(), - md5(&delta_data), - self.doc_id.clone(), - RevType::Local, - ); - let _ = self.add_revision(revision)?; - Ok(()) - } - #[tracing::instrument(level = "debug", skip(self, revision))] pub fn add_revision(&self, revision: Revision) -> Result<(), DocError> { match revision.ty { diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs index a0d5077c24..f434c42273 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs @@ -18,21 +18,29 @@ impl DocTableSql { Ok(()) } - pub(crate) fn update_doc_table(&self, changeset: DocTableChangeset, conn: &SqliteConnection) -> Result<(), DocError> { + pub(crate) fn update_doc_table( + &self, + changeset: DocTableChangeset, + conn: &SqliteConnection, + ) -> Result<(), DocError> { diesel_update_table!(doc_table, changeset, conn); Ok(()) } pub(crate) fn read_doc_table(&self, doc_id: &str, pool: Arc) -> Result { let conn = &*pool.get().map_err(internal_error)?; - let doc_table = dsl::doc_table.filter(doc_table::id.eq(doc_id)).first::(conn)?; + let doc_table = dsl::doc_table + .filter(doc_table::id.eq(doc_id)) + .first::(conn)?; Ok(doc_table) } #[allow(dead_code)] pub(crate) fn delete_doc(&self, doc_id: &str, conn: &SqliteConnection) -> Result { - let doc_table = dsl::doc_table.filter(doc_table::id.eq(doc_id)).first::(conn)?; + let doc_table = dsl::doc_table + .filter(doc_table::id.eq(doc_id)) + .first::(conn)?; diesel_delete_table!(doc_table, doc_id, conn); Ok(doc_table) } diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs index 0f1cffd3f8..b443fda96b 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs @@ -1,4 +1,4 @@ -use crate::entities::doc::{Doc, UpdateDocParams}; +use crate::entities::doc::Doc; use flowy_database::schema::doc_table; #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] @@ -24,15 +24,7 @@ impl DocTable { pub(crate) struct DocTableChangeset { pub id: String, pub data: String, -} - -impl DocTableChangeset { - pub(crate) fn new(params: UpdateDocParams) -> Self { - Self { - id: params.doc_id, - data: params.data, - } - } + pub revision: i64, } impl std::convert::Into for DocTable { diff --git a/rust-lib/flowy-document/tests/editor/op_test.rs b/rust-lib/flowy-document/tests/editor/op_test.rs index de7bb3e8d7..b0d42b5b7d 100644 --- a/rust-lib/flowy-document/tests/editor/op_test.rs +++ b/rust-lib/flowy-document/tests/editor/op_test.rs @@ -1,6 +1,6 @@ use crate::editor::{Rng, TestBuilder, TestOp::*}; use bytecount::num_chars; -use flowy_document::services::doc::PlainDoc; +use flowy_document::services::doc::{FlowyDoc, PlainDoc}; use flowy_ot::core::*; #[test] @@ -715,3 +715,16 @@ fn delta_invert_attribute_delta_with_attribute_delta() { ]; TestBuilder::new().run_script::(ops); } + +#[test] +#[should_panic] +fn delta_compose_with_missing_delta() { + let ops = vec![ + Insert(0, "123", 0), + Insert(0, "4", 3), + DocComposeDelta(1, 0), + AssertDocJson(0, r#"[{"insert":"1234\n"}]"#), + AssertStr(1, r#"4\n"#), + ]; + TestBuilder::new().run_script::(ops); +} diff --git a/rust-lib/flowy-ot/src/core/delta/delta.rs b/rust-lib/flowy-ot/src/core/delta/delta.rs index 1a6e6a53fd..61604103c7 100644 --- a/rust-lib/flowy-ot/src/core/delta/delta.rs +++ b/rust-lib/flowy-ot/src/core/delta/delta.rs @@ -307,7 +307,12 @@ impl OperationTransformable for Delta { Self: Sized, { if self.base_len != other.base_len { - return Err(ErrorBuilder::new(OTErrorCode::IncompatibleLength).build()); + return Err(ErrorBuilder::new(OTErrorCode::IncompatibleLength) + .msg(format!( + "cur base length: {}, other base length: {}", + self.base_len, other.base_len + )) + .build()); } let mut a_prime = Delta::default(); diff --git a/rust-lib/flowy-workspace/src/protobuf/model/view_update.rs b/rust-lib/flowy-workspace/src/protobuf/model/view_update.rs index 2bc92362e7..77ac106c2d 100644 --- a/rust-lib/flowy-workspace/src/protobuf/model/view_update.rs +++ b/rust-lib/flowy-workspace/src/protobuf/model/view_update.rs @@ -943,207 +943,6 @@ impl ::protobuf::reflect::ProtobufValue for UpdateViewParams { } } -#[derive(PartialEq,Clone,Default)] -pub struct DocDeltaRequest { - // message fields - pub view_id: ::std::string::String, - pub data: ::std::vec::Vec, - // special fields - pub unknown_fields: ::protobuf::UnknownFields, - pub cached_size: ::protobuf::CachedSize, -} - -impl<'a> ::std::default::Default for &'a DocDeltaRequest { - fn default() -> &'a DocDeltaRequest { - ::default_instance() - } -} - -impl DocDeltaRequest { - pub fn new() -> DocDeltaRequest { - ::std::default::Default::default() - } - - // string view_id = 1; - - - pub fn get_view_id(&self) -> &str { - &self.view_id - } - pub fn clear_view_id(&mut self) { - self.view_id.clear(); - } - - // Param is passed by value, moved - pub fn set_view_id(&mut self, v: ::std::string::String) { - self.view_id = v; - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_view_id(&mut self) -> &mut ::std::string::String { - &mut self.view_id - } - - // Take field - pub fn take_view_id(&mut self) -> ::std::string::String { - ::std::mem::replace(&mut self.view_id, ::std::string::String::new()) - } - - // bytes data = 2; - - - pub fn get_data(&self) -> &[u8] { - &self.data - } - pub fn clear_data(&mut self) { - self.data.clear(); - } - - // Param is passed by value, moved - pub fn set_data(&mut self, v: ::std::vec::Vec) { - self.data = v; - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_data(&mut self) -> &mut ::std::vec::Vec { - &mut self.data - } - - // Take field - pub fn take_data(&mut self) -> ::std::vec::Vec { - ::std::mem::replace(&mut self.data, ::std::vec::Vec::new()) - } -} - -impl ::protobuf::Message for DocDeltaRequest { - fn is_initialized(&self) -> bool { - true - } - - fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { - while !is.eof()? { - let (field_number, wire_type) = is.read_tag_unpack()?; - match field_number { - 1 => { - ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.view_id)?; - }, - 2 => { - ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; - }, - _ => { - ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; - }, - }; - } - ::std::result::Result::Ok(()) - } - - // Compute sizes of nested messages - #[allow(unused_variables)] - fn compute_size(&self) -> u32 { - let mut my_size = 0; - if !self.view_id.is_empty() { - my_size += ::protobuf::rt::string_size(1, &self.view_id); - } - if !self.data.is_empty() { - my_size += ::protobuf::rt::bytes_size(2, &self.data); - } - my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); - self.cached_size.set(my_size); - my_size - } - - fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if !self.view_id.is_empty() { - os.write_string(1, &self.view_id)?; - } - if !self.data.is_empty() { - os.write_bytes(2, &self.data)?; - } - os.write_unknown_fields(self.get_unknown_fields())?; - ::std::result::Result::Ok(()) - } - - fn get_cached_size(&self) -> u32 { - self.cached_size.get() - } - - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - &self.unknown_fields - } - - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - &mut self.unknown_fields - } - - fn as_any(&self) -> &dyn (::std::any::Any) { - self as &dyn (::std::any::Any) - } - fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { - self as &mut dyn (::std::any::Any) - } - fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { - self - } - - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - - fn new() -> DocDeltaRequest { - DocDeltaRequest::new() - } - - fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { - static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; - descriptor.get(|| { - let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( - "view_id", - |m: &DocDeltaRequest| { &m.view_id }, - |m: &mut DocDeltaRequest| { &mut m.view_id }, - )); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( - "data", - |m: &DocDeltaRequest| { &m.data }, - |m: &mut DocDeltaRequest| { &mut m.data }, - )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "DocDeltaRequest", - fields, - file_descriptor_proto() - ) - }) - } - - fn default_instance() -> &'static DocDeltaRequest { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(DocDeltaRequest::new) - } -} - -impl ::protobuf::Clear for DocDeltaRequest { - fn clear(&mut self) { - self.view_id.clear(); - self.data.clear(); - self.unknown_fields.clear(); - } -} - -impl ::std::fmt::Debug for DocDeltaRequest { - fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - ::protobuf::text_format::fmt(self, f) - } -} - -impl ::protobuf::reflect::ProtobufValue for DocDeltaRequest { - fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { - ::protobuf::reflect::ReflectValueRef::Message(self) - } -} - static file_descriptor_proto_data: &'static [u8] = b"\ \n\x11view_update.proto\"\xda\x01\n\x11UpdateViewRequest\x12\x17\n\x07vi\ ew_id\x18\x01\x20\x01(\tR\x06viewId\x12\x14\n\x04name\x18\x02\x20\x01(\t\ @@ -1156,55 +955,47 @@ static file_descriptor_proto_data: &'static [u8] = b"\ \x03\x20\x01(\tH\x01R\x04desc\x12\x1e\n\tthumbnail\x18\x04\x20\x01(\tH\ \x02R\tthumbnail\x12\x1b\n\x08is_trash\x18\x05\x20\x01(\x08H\x03R\x07isT\ rashB\r\n\x0bone_of_nameB\r\n\x0bone_of_descB\x12\n\x10one_of_thumbnailB\ - \x11\n\x0fone_of_is_trash\">\n\x0fDocDeltaRequest\x12\x17\n\x07view_id\ - \x18\x01\x20\x01(\tR\x06viewId\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\ - \x04dataJ\xc6\x07\n\x06\x12\x04\0\0\x13\x01\n\x08\n\x01\x0c\x12\x03\0\0\ - \x12\n\n\n\x02\x04\0\x12\x04\x02\0\x08\x01\n\n\n\x03\x04\0\x01\x12\x03\ - \x02\x08\x19\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\ - \0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\ - \x0b\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\ - \0\x08\0\x12\x03\x04\x04*\n\x0c\n\x05\x04\0\x08\0\x01\x12\x03\x04\n\x15\ - \n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x18(\n\x0c\n\x05\x04\0\x02\x01\ - \x05\x12\x03\x04\x18\x1e\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x1f#\ - \n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04&'\n\x0b\n\x04\x04\0\x08\x01\ - \x12\x03\x05\x04*\n\x0c\n\x05\x04\0\x08\x01\x01\x12\x03\x05\n\x15\n\x0b\ - \n\x04\x04\0\x02\x02\x12\x03\x05\x18(\n\x0c\n\x05\x04\0\x02\x02\x05\x12\ - \x03\x05\x18\x1e\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\x1f#\n\x0c\n\ - \x05\x04\0\x02\x02\x03\x12\x03\x05&'\n\x0b\n\x04\x04\0\x08\x02\x12\x03\ - \x06\x044\n\x0c\n\x05\x04\0\x08\x02\x01\x12\x03\x06\n\x1a\n\x0b\n\x04\ - \x04\0\x02\x03\x12\x03\x06\x1d2\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\ - \x06\x1d#\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06$-\n\x0c\n\x05\x04\0\ - \x02\x03\x03\x12\x03\x0601\n\x0b\n\x04\x04\0\x08\x03\x12\x03\x07\x040\n\ - \x0c\n\x05\x04\0\x08\x03\x01\x12\x03\x07\n\x19\n\x0b\n\x04\x04\0\x02\x04\ - \x12\x03\x07\x1c.\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x1c\x20\n\ - \x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07!)\n\x0c\n\x05\x04\0\x02\x04\x03\ - \x12\x03\x07,-\n\n\n\x02\x04\x01\x12\x04\t\0\x0f\x01\n\n\n\x03\x04\x01\ - \x01\x12\x03\t\x08\x18\n\x0b\n\x04\x04\x01\x02\0\x12\x03\n\x04\x17\n\x0c\ - \n\x05\x04\x01\x02\0\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\ - \x12\x03\n\x0b\x12\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\n\x15\x16\n\x0b\ - \n\x04\x04\x01\x08\0\x12\x03\x0b\x04*\n\x0c\n\x05\x04\x01\x08\0\x01\x12\ - \x03\x0b\n\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\x0b\x18(\n\x0c\n\x05\ - \x04\x01\x02\x01\x05\x12\x03\x0b\x18\x1e\n\x0c\n\x05\x04\x01\x02\x01\x01\ - \x12\x03\x0b\x1f#\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\x0b&'\n\x0b\n\ - \x04\x04\x01\x08\x01\x12\x03\x0c\x04*\n\x0c\n\x05\x04\x01\x08\x01\x01\ - \x12\x03\x0c\n\x15\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x0c\x18(\n\x0c\n\ - \x05\x04\x01\x02\x02\x05\x12\x03\x0c\x18\x1e\n\x0c\n\x05\x04\x01\x02\x02\ - \x01\x12\x03\x0c\x1f#\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\x0c&'\n\ - \x0b\n\x04\x04\x01\x08\x02\x12\x03\r\x044\n\x0c\n\x05\x04\x01\x08\x02\ - \x01\x12\x03\r\n\x1a\n\x0b\n\x04\x04\x01\x02\x03\x12\x03\r\x1d2\n\x0c\n\ - \x05\x04\x01\x02\x03\x05\x12\x03\r\x1d#\n\x0c\n\x05\x04\x01\x02\x03\x01\ - \x12\x03\r$-\n\x0c\n\x05\x04\x01\x02\x03\x03\x12\x03\r01\n\x0b\n\x04\x04\ - \x01\x08\x03\x12\x03\x0e\x040\n\x0c\n\x05\x04\x01\x08\x03\x01\x12\x03\ - \x0e\n\x19\n\x0b\n\x04\x04\x01\x02\x04\x12\x03\x0e\x1c.\n\x0c\n\x05\x04\ - \x01\x02\x04\x05\x12\x03\x0e\x1c\x20\n\x0c\n\x05\x04\x01\x02\x04\x01\x12\ - \x03\x0e!)\n\x0c\n\x05\x04\x01\x02\x04\x03\x12\x03\x0e,-\n\n\n\x02\x04\ - \x02\x12\x04\x10\0\x13\x01\n\n\n\x03\x04\x02\x01\x12\x03\x10\x08\x17\n\ - \x0b\n\x04\x04\x02\x02\0\x12\x03\x11\x04\x17\n\x0c\n\x05\x04\x02\x02\0\ - \x05\x12\x03\x11\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x11\x0b\x12\ - \n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x11\x15\x16\n\x0b\n\x04\x04\x02\ - \x02\x01\x12\x03\x12\x04\x13\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x12\ - \x04\t\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\x12\n\x0e\n\x0c\n\x05\x04\ - \x02\x02\x01\x03\x12\x03\x12\x11\x12b\x06proto3\ + \x11\n\x0fone_of_is_trashJ\xc0\x06\n\x06\x12\x04\0\0\x0f\x01\n\x08\n\x01\ + \x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x08\x01\n\n\n\x03\x04\ + \0\x01\x12\x03\x02\x08\x19\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\ + \x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\ + \x12\x03\x03\x0b\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\ + \x0b\n\x04\x04\0\x08\0\x12\x03\x04\x04*\n\x0c\n\x05\x04\0\x08\0\x01\x12\ + \x03\x04\n\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x18(\n\x0c\n\x05\ + \x04\0\x02\x01\x05\x12\x03\x04\x18\x1e\n\x0c\n\x05\x04\0\x02\x01\x01\x12\ + \x03\x04\x1f#\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04&'\n\x0b\n\x04\ + \x04\0\x08\x01\x12\x03\x05\x04*\n\x0c\n\x05\x04\0\x08\x01\x01\x12\x03\ + \x05\n\x15\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x18(\n\x0c\n\x05\x04\0\ + \x02\x02\x05\x12\x03\x05\x18\x1e\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\ + \x05\x1f#\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05&'\n\x0b\n\x04\x04\0\ + \x08\x02\x12\x03\x06\x044\n\x0c\n\x05\x04\0\x08\x02\x01\x12\x03\x06\n\ + \x1a\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x1d2\n\x0c\n\x05\x04\0\x02\ + \x03\x05\x12\x03\x06\x1d#\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06$-\n\ + \x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x0601\n\x0b\n\x04\x04\0\x08\x03\x12\ + \x03\x07\x040\n\x0c\n\x05\x04\0\x08\x03\x01\x12\x03\x07\n\x19\n\x0b\n\ + \x04\x04\0\x02\x04\x12\x03\x07\x1c.\n\x0c\n\x05\x04\0\x02\x04\x05\x12\ + \x03\x07\x1c\x20\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07!)\n\x0c\n\x05\ + \x04\0\x02\x04\x03\x12\x03\x07,-\n\n\n\x02\x04\x01\x12\x04\t\0\x0f\x01\n\ + \n\n\x03\x04\x01\x01\x12\x03\t\x08\x18\n\x0b\n\x04\x04\x01\x02\0\x12\x03\ + \n\x04\x17\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\ + \x01\x02\0\x01\x12\x03\n\x0b\x12\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\n\ + \x15\x16\n\x0b\n\x04\x04\x01\x08\0\x12\x03\x0b\x04*\n\x0c\n\x05\x04\x01\ + \x08\0\x01\x12\x03\x0b\n\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\x0b\x18\ + (\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x0b\x18\x1e\n\x0c\n\x05\x04\ + \x01\x02\x01\x01\x12\x03\x0b\x1f#\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\ + \x03\x0b&'\n\x0b\n\x04\x04\x01\x08\x01\x12\x03\x0c\x04*\n\x0c\n\x05\x04\ + \x01\x08\x01\x01\x12\x03\x0c\n\x15\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\ + \x0c\x18(\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0c\x18\x1e\n\x0c\n\ + \x05\x04\x01\x02\x02\x01\x12\x03\x0c\x1f#\n\x0c\n\x05\x04\x01\x02\x02\ + \x03\x12\x03\x0c&'\n\x0b\n\x04\x04\x01\x08\x02\x12\x03\r\x044\n\x0c\n\ + \x05\x04\x01\x08\x02\x01\x12\x03\r\n\x1a\n\x0b\n\x04\x04\x01\x02\x03\x12\ + \x03\r\x1d2\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\x03\r\x1d#\n\x0c\n\x05\ + \x04\x01\x02\x03\x01\x12\x03\r$-\n\x0c\n\x05\x04\x01\x02\x03\x03\x12\x03\ + \r01\n\x0b\n\x04\x04\x01\x08\x03\x12\x03\x0e\x040\n\x0c\n\x05\x04\x01\ + \x08\x03\x01\x12\x03\x0e\n\x19\n\x0b\n\x04\x04\x01\x02\x04\x12\x03\x0e\ + \x1c.\n\x0c\n\x05\x04\x01\x02\x04\x05\x12\x03\x0e\x1c\x20\n\x0c\n\x05\ + \x04\x01\x02\x04\x01\x12\x03\x0e!)\n\x0c\n\x05\x04\x01\x02\x04\x03\x12\ + \x03\x0e,-b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-workspace/src/protobuf/proto/view_update.proto b/rust-lib/flowy-workspace/src/protobuf/proto/view_update.proto index 37d575e072..7daacbb927 100644 --- a/rust-lib/flowy-workspace/src/protobuf/proto/view_update.proto +++ b/rust-lib/flowy-workspace/src/protobuf/proto/view_update.proto @@ -14,7 +14,3 @@ message UpdateViewParams { oneof one_of_thumbnail { string thumbnail = 4; }; oneof one_of_is_trash { bool is_trash = 5; }; } -message DocDeltaRequest { - string view_id = 1; - bytes data = 2; -}