diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart index 0a8704ecad..e05aa9a2bb 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart @@ -131,3 +131,78 @@ class Revision extends $pb.GeneratedMessage { void clearTy() => clearField(6); } +class RevisionRange extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevisionRange', createEmptyInstance: create) + ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') + ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'fromRevId') + ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'toRevId') + ..hasRequiredFields = false + ; + + RevisionRange._() : super(); + factory RevisionRange({ + $core.String? docId, + $fixnum.Int64? fromRevId, + $fixnum.Int64? toRevId, + }) { + final _result = create(); + if (docId != null) { + _result.docId = docId; + } + if (fromRevId != null) { + _result.fromRevId = fromRevId; + } + if (toRevId != null) { + _result.toRevId = toRevId; + } + return _result; + } + factory RevisionRange.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory RevisionRange.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + RevisionRange clone() => RevisionRange()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + RevisionRange copyWith(void Function(RevisionRange) updates) => super.copyWith((message) => updates(message as RevisionRange)) as RevisionRange; // ignore: deprecated_member_use + $pb.BuilderInfo get info_ => _i; + @$core.pragma('dart2js:noInline') + static RevisionRange create() => RevisionRange._(); + RevisionRange createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static RevisionRange getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static RevisionRange? _defaultInstance; + + @$pb.TagNumber(1) + $core.String get docId => $_getSZ(0); + @$pb.TagNumber(1) + set docId($core.String v) { $_setString(0, v); } + @$pb.TagNumber(1) + $core.bool hasDocId() => $_has(0); + @$pb.TagNumber(1) + void clearDocId() => clearField(1); + + @$pb.TagNumber(2) + $fixnum.Int64 get fromRevId => $_getI64(1); + @$pb.TagNumber(2) + set fromRevId($fixnum.Int64 v) { $_setInt64(1, v); } + @$pb.TagNumber(2) + $core.bool hasFromRevId() => $_has(1); + @$pb.TagNumber(2) + void clearFromRevId() => clearField(2); + + @$pb.TagNumber(3) + $fixnum.Int64 get toRevId => $_getI64(2); + @$pb.TagNumber(3) + set toRevId($fixnum.Int64 v) { $_setInt64(2, v); } + @$pb.TagNumber(3) + $core.bool hasToRevId() => $_has(2); + @$pb.TagNumber(3) + void clearToRevId() => clearField(3); +} + diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart index b66287322e..f46de3e8ac 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart @@ -34,3 +34,15 @@ const Revision$json = const { /// Descriptor for `Revision`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSFAoFZGVsdGEYAyABKAxSBWRlbHRhEhAKA21kNRgEIAEoCVIDbWQ1EhUKBmRvY19pZBgFIAEoCVIFZG9jSWQSGAoCdHkYBiABKA4yCC5SZXZUeXBlUgJ0eQ=='); +@$core.Deprecated('Use revisionRangeDescriptor instead') +const RevisionRange$json = const { + '1': 'RevisionRange', + '2': const [ + const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, + const {'1': 'from_rev_id', '3': 2, '4': 1, '5': 3, '10': 'fromRevId'}, + const {'1': 'to_rev_id', '3': 3, '4': 1, '5': 3, '10': 'toRevId'}, + ], +}; + +/// Descriptor for `RevisionRange`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhUKBmRvY19pZBgBIAEoCVIFZG9jSWQSHgoLZnJvbV9yZXZfaWQYAiABKANSCWZyb21SZXZJZBIaCgl0b19yZXZfaWQYAyABKANSB3RvUmV2SWQ='); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart index a40dafffca..31f54d22ea 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart @@ -11,12 +11,14 @@ import 'package:protobuf/protobuf.dart' as $pb; class WsDataType extends $pb.ProtobufEnum { static const WsDataType Acked = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked'); - static const WsDataType Rev = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Rev'); - static const WsDataType Conflict = WsDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Conflict'); + static const WsDataType PushRev = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev'); + static const WsDataType PullRev = WsDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev'); + static const WsDataType Conflict = WsDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Conflict'); static const $core.List values = [ Acked, - Rev, + PushRev, + PullRev, Conflict, ]; diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart index 047cc5f579..8fede24e0b 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart @@ -13,13 +13,14 @@ const WsDataType$json = const { '1': 'WsDataType', '2': const [ const {'1': 'Acked', '2': 0}, - const {'1': 'Rev', '2': 1}, - const {'1': 'Conflict', '2': 2}, + const {'1': 'PushRev', '2': 1}, + const {'1': 'PullRev', '2': 2}, + const {'1': 'Conflict', '2': 3}, ], }; /// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASBwoDUmV2EAESDAoIQ29uZmxpY3QQAg=='); +final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCwoHUHVzaFJldhABEgsKB1B1bGxSZXYQAhIMCghDb25mbGljdBAD'); @$core.Deprecated('Use wsDocumentDataDescriptor instead') const WsDocumentData$json = const { '1': 'WsDocumentData', diff --git a/backend/src/service/doc/edit_doc_context.rs b/backend/src/service/doc/edit_doc_context.rs index 7e922e5190..9a5cba68a5 100644 --- a/backend/src/service/doc/edit_doc_context.rs +++ b/backend/src/service/doc/edit_doc_context.rs @@ -9,7 +9,7 @@ use bytes::Bytes; use dashmap::DashMap; use flowy_document::{ entities::ws::{WsDataType, WsDocumentData}, - protobuf::{Doc, RevType, Revision, UpdateDocParams}, + protobuf::{Doc, RevType, Revision, RevisionRange, UpdateDocParams}, services::doc::Document, }; use flowy_net::errors::{internal_error, ServerError}; @@ -22,6 +22,7 @@ use parking_lot::RwLock; use protobuf::Message; use sqlx::PgPool; use std::{ + cmp::min, convert::TryInto, sync::{ atomic::{AtomicI64, Ordering::SeqCst}, @@ -64,7 +65,7 @@ impl EditDocContext { revision: Revision, ) -> Result<(), ServerError> { let _ = self.verify_md5(&revision)?; - // Rest EditUser for each client websocket message to keep the socket available. + // Opti: find out another way to keep the user socket available. let user = EditUser { user: client_data.user.clone(), socket: client_data.socket.clone(), @@ -80,34 +81,40 @@ impl EditDocContext { let cli_socket = client_data.socket; let cur_rev_id = self.rev_id.load(SeqCst); - // Transform the revision if client rev_id less than server rev_id. Sending the - // prime delta to client. if cur_rev_id > revision.rev_id { + // The client document is outdated. Transform the client revision delta and then + // send the prime delta to the client. Client should compose the this prime + // delta. + let (cli_prime, server_prime) = self.transform(&revision.delta).map_err(internal_error)?; let _ = self.update_document_delta(server_prime)?; log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json()); let cli_revision = self.mk_revision(revision.rev_id, cli_prime); - let ws_cli_revision = mk_rev_ws_message(&self.doc_id, cli_revision); + let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); cli_socket.do_send(ws_cli_revision).map_err(internal_error)?; Ok(()) } else if cur_rev_id < revision.rev_id { if cur_rev_id != revision.base_rev_id { - let missing_rev_range = revision.rev_id - cur_rev_id; - // TODO: pull the missing revs from client + // The server document is outdated, try to get the missing revision from the + // client. + cli_socket + .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id)) + .map_err(internal_error)?; } else { let delta = Delta::from_bytes(&revision.delta).map_err(internal_error)?; let _ = self.update_document_delta(delta)?; - cli_socket.do_send(mk_acked_ws_message(&revision)); + cli_socket + .do_send(mk_acked_ws_message(&revision)) + .map_err(internal_error)?; self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); - - // Opti: save with multiple revisions let _ = self.save_revision(&revision).await?; } Ok(()) } else { log::error!("Client rev_id should not equal to server rev_id"); + Ok(()) } } @@ -162,29 +169,41 @@ impl EditDocContext { #[tracing::instrument(level = "debug", skip(self, revision))] async fn save_revision(&self, revision: &Revision) -> Result<(), ServerError> { + // Opti: save with multiple revisions let mut params = UpdateDocParams::new(); params.set_doc_id(self.doc_id.clone()); params.set_data(self.document.read().to_json()); params.set_rev_id(revision.rev_id); - let _ = update_doc(self.pg_pool.get_ref(), params).await?; - Ok(()) } } -fn mk_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor { +fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor { let bytes = revision.write_to_bytes().unwrap(); - let data = WsDocumentData { id: doc_id.to_string(), - ty: WsDataType::Rev, + ty: WsDataType::PushRev, data: bytes, }; + mk_ws_message(data) +} - let msg: WsMessage = data.into(); - let bytes: Bytes = msg.try_into().unwrap(); - WsMessageAdaptor(bytes) +fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor { + let range = RevisionRange { + doc_id: doc_id.to_string(), + from_rev_id, + to_rev_id, + ..Default::default() + }; + + let bytes = range.write_to_bytes().unwrap(); + let data = WsDocumentData { + id: doc_id.to_string(), + ty: WsDataType::PullRev, + data: bytes, + }; + mk_ws_message(data) } fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor { @@ -197,6 +216,10 @@ fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor { data: wtr, }; + mk_ws_message(data) +} + +fn mk_ws_message>(data: T) -> WsMessageAdaptor { let msg: WsMessage = data.into(); let bytes: Bytes = msg.try_into().unwrap(); WsMessageAdaptor(bytes) diff --git a/backend/src/service/doc/ws_handler.rs b/backend/src/service/doc/ws_handler.rs index b4f8d521bd..f2273c318b 100644 --- a/backend/src/service/doc/ws_handler.rs +++ b/backend/src/service/doc/ws_handler.rs @@ -7,7 +7,7 @@ use crate::service::{ use actix_web::web::Data; use crate::service::ws::WsUser; -use flowy_document::protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData}; +use flowy_document::protobuf::{QueryDocParams, Revision, RevisionRange, WsDataType, WsDocumentData}; use flowy_net::errors::ServerError; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use protobuf::Message; @@ -55,8 +55,10 @@ impl EditDocManager { async fn handle(&self, client_data: WsClientData) -> Result<(), ServerError> { let document_data: WsDocumentData = parse_from_bytes(&client_data.data)?; match document_data.ty { - WsDataType::Acked => {}, - WsDataType::Rev => { + WsDataType::Acked => { + // Do nothing, + }, + WsDataType::PushRev => { let revision: Revision = parse_from_bytes(&document_data.data)?; let edited_doc = self.get_edit_doc(&revision.doc_id).await?; tokio::spawn(async move { @@ -66,7 +68,12 @@ impl EditDocManager { } }); }, - _ => {}, + WsDataType::PullRev => { + // Do nothing + }, + WsDataType::Conflict => { + unimplemented!() + }, } Ok(()) diff --git a/rust-lib/flowy-database/migrations/2021-07-22-234458_flowy-editor/up.sql b/rust-lib/flowy-database/migrations/2021-07-22-234458_flowy-editor/up.sql index b4f727ece9..c5b88ab96f 100644 --- a/rust-lib/flowy-database/migrations/2021-07-22-234458_flowy-editor/up.sql +++ b/rust-lib/flowy-database/migrations/2021-07-22-234458_flowy-editor/up.sql @@ -3,5 +3,5 @@ CREATE TABLE doc_table ( id TEXT NOT NULL PRIMARY KEY, -- data BLOB NOT NULL DEFAULT (x''), data TEXT NOT NULL DEFAULT '', - revision BIGINT NOT NULL DEFAULT 0 + rev_id BIGINT NOT NULL DEFAULT 0 ); \ No newline at end of file diff --git a/rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql b/rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql index 59373befd0..69a6284ed3 100644 --- a/rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql +++ b/rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql @@ -1,10 +1,10 @@ -- Your SQL goes here CREATE TABLE rev_table ( - doc_id TEXT NOT NULL PRIMARY KEY, + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + doc_id TEXT NOT NULL DEFAULT '', base_rev_id BIGINT NOT NULL DEFAULT 0, rev_id BIGINT NOT NULL DEFAULT 0, data BLOB NOT NULL DEFAULT (x''), - md5 TEXT NOT NULL DEFAULT '', state INTEGER NOT NULL DEFAULT 0, ty INTEGER NOT NULL DEFAULT 0 ); \ No newline at end of file diff --git a/rust-lib/flowy-database/src/schema.rs b/rust-lib/flowy-database/src/schema.rs index 5fa39e2f05..8c1e45491d 100644 --- a/rust-lib/flowy-database/src/schema.rs +++ b/rust-lib/flowy-database/src/schema.rs @@ -17,17 +17,17 @@ table! { doc_table (id) { id -> Text, data -> Text, - revision -> BigInt, + rev_id -> BigInt, } } table! { - rev_table (doc_id) { + rev_table (id) { + id -> Integer, doc_id -> Text, base_rev_id -> BigInt, rev_id -> BigInt, data -> Binary, - md5 -> Text, state -> Integer, ty -> Integer, } diff --git a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs index f07dd625bf..5f3a487f25 100644 --- a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -59,6 +59,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "DocDelta" | "QueryDocParams" | "Revision" + | "RevisionRange" | "WsDocumentData" | "DocError" | "FFIRequest" diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index 01a829885d..65d7256269 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -43,3 +43,15 @@ impl Revision { } } } + +#[derive(Debug, Clone, Default, ProtoBuf)] +pub struct RevisionRange { + #[pb(index = 1)] + pub doc_id: String, + + #[pb(index = 2)] + pub from_rev_id: i64, + + #[pb(index = 3)] + pub to_rev_id: i64, +} diff --git a/rust-lib/flowy-document/src/entities/ws/ws.rs b/rust-lib/flowy-document/src/entities/ws/ws.rs index 55b03e6d41..e8a119a4b7 100644 --- a/rust-lib/flowy-document/src/entities/ws/ws.rs +++ b/rust-lib/flowy-document/src/entities/ws/ws.rs @@ -7,8 +7,9 @@ use std::convert::TryInto; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] pub enum WsDataType { Acked = 0, - Rev = 1, - Conflict = 2, + PushRev = 1, + PullRev = 2, + Conflict = 3, } impl std::default::Default for WsDataType { @@ -34,7 +35,7 @@ impl std::convert::From for WsDocumentData { let data = bytes.to_vec(); Self { id, - ty: WsDataType::Rev, + ty: WsDataType::PushRev, data, } } diff --git a/rust-lib/flowy-document/src/protobuf/model/revision.rs b/rust-lib/flowy-document/src/protobuf/model/revision.rs index d2bc9856ae..55036e31cb 100644 --- a/rust-lib/flowy-document/src/protobuf/model/revision.rs +++ b/rust-lib/flowy-document/src/protobuf/model/revision.rs @@ -367,6 +367,235 @@ impl ::protobuf::reflect::ProtobufValue for Revision { } } +#[derive(PartialEq,Clone,Default)] +pub struct RevisionRange { + // message fields + pub doc_id: ::std::string::String, + pub from_rev_id: i64, + pub to_rev_id: i64, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a RevisionRange { + fn default() -> &'a RevisionRange { + ::default_instance() + } +} + +impl RevisionRange { + pub fn new() -> RevisionRange { + ::std::default::Default::default() + } + + // string doc_id = 1; + + + pub fn get_doc_id(&self) -> &str { + &self.doc_id + } + pub fn clear_doc_id(&mut self) { + self.doc_id.clear(); + } + + // Param is passed by value, moved + pub fn set_doc_id(&mut self, v: ::std::string::String) { + self.doc_id = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_doc_id(&mut self) -> &mut ::std::string::String { + &mut self.doc_id + } + + // Take field + pub fn take_doc_id(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.doc_id, ::std::string::String::new()) + } + + // int64 from_rev_id = 2; + + + pub fn get_from_rev_id(&self) -> i64 { + self.from_rev_id + } + pub fn clear_from_rev_id(&mut self) { + self.from_rev_id = 0; + } + + // Param is passed by value, moved + pub fn set_from_rev_id(&mut self, v: i64) { + self.from_rev_id = v; + } + + // int64 to_rev_id = 3; + + + pub fn get_to_rev_id(&self) -> i64 { + self.to_rev_id + } + pub fn clear_to_rev_id(&mut self) { + self.to_rev_id = 0; + } + + // Param is passed by value, moved + pub fn set_to_rev_id(&mut self, v: i64) { + self.to_rev_id = v; + } +} + +impl ::protobuf::Message for RevisionRange { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.doc_id)?; + }, + 2 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_int64()?; + self.from_rev_id = tmp; + }, + 3 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_int64()?; + self.to_rev_id = tmp; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if !self.doc_id.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.doc_id); + } + if self.from_rev_id != 0 { + my_size += ::protobuf::rt::value_size(2, self.from_rev_id, ::protobuf::wire_format::WireTypeVarint); + } + if self.to_rev_id != 0 { + my_size += ::protobuf::rt::value_size(3, self.to_rev_id, ::protobuf::wire_format::WireTypeVarint); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if !self.doc_id.is_empty() { + os.write_string(1, &self.doc_id)?; + } + if self.from_rev_id != 0 { + os.write_int64(2, self.from_rev_id)?; + } + if self.to_rev_id != 0 { + os.write_int64(3, self.to_rev_id)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> RevisionRange { + RevisionRange::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "doc_id", + |m: &RevisionRange| { &m.doc_id }, + |m: &mut RevisionRange| { &mut m.doc_id }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( + "from_rev_id", + |m: &RevisionRange| { &m.from_rev_id }, + |m: &mut RevisionRange| { &mut m.from_rev_id }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( + "to_rev_id", + |m: &RevisionRange| { &m.to_rev_id }, + |m: &mut RevisionRange| { &mut m.to_rev_id }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "RevisionRange", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static RevisionRange { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(RevisionRange::new) + } +} + +impl ::protobuf::Clear for RevisionRange { + fn clear(&mut self) { + self.doc_id.clear(); + self.from_rev_id = 0; + self.to_rev_id = 0; + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for RevisionRange { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for RevisionRange { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + #[derive(Clone,PartialEq,Eq,Debug,Hash)] pub enum RevType { Local = 0, @@ -422,32 +651,44 @@ static file_descriptor_proto_data: &'static [u8] = b"\ \x18\x01\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\ \x03R\x05revId\x12\x14\n\x05delta\x18\x03\x20\x01(\x0cR\x05delta\x12\x10\ \n\x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01\ - (\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty*\ - \x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\x06Remote\x10\x01J\xde\ - \x03\n\x06\x12\x04\0\0\r\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\ - \x04\0\x12\x04\x02\0\t\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\ - \n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\ - \x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\ - \x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\ - \x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\ - \x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\ - \x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x14\n\x0c\n\ - \x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\ - \x12\x03\x05\n\x0f\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x12\x13\n\ - \x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\x05\x04\0\x02\x03\ - \x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\x0e\ - \n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\x0b\n\x04\x04\0\x02\ - \x04\x12\x03\x07\x04\x16\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x04\n\ - \n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x11\n\x0c\n\x05\x04\0\x02\ - \x04\x03\x12\x03\x07\x14\x15\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x08\x04\ - \x13\n\x0c\n\x05\x04\0\x02\x05\x06\x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\ - \x02\x05\x01\x12\x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\ - \x08\x11\x12\n\n\n\x02\x05\0\x12\x04\n\0\r\x01\n\n\n\x03\x05\0\x01\x12\ - \x03\n\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x0b\x04\x0e\n\x0c\n\x05\ - \x05\0\x02\0\x01\x12\x03\x0b\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\ - \x0b\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x0c\x04\x0f\n\x0c\n\x05\x05\ - \0\x02\x01\x01\x12\x03\x0c\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\ - \x0c\r\x0eb\x06proto3\ + (\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"\ + b\n\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\ + \x1e\n\x0bfrom_rev_id\x18\x02\x20\x01(\x03R\tfromRevId\x12\x1a\n\tto_rev\ + _id\x18\x03\x20\x01(\x03R\x07toRevId*\x20\n\x07RevType\x12\t\n\x05Local\ + \x10\0\x12\n\n\x06Remote\x10\x01J\x9b\x05\n\x06\x12\x04\0\0\x12\x01\n\ + \x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\t\x01\n\n\n\ + \x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\ + \x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\ + \x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\ + \x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\ + \x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\ + \x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\ + \x02\x02\x12\x03\x05\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\ + \x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0f\n\x0c\n\x05\x04\0\ + \x02\x02\x03\x12\x03\x05\x12\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\ + \x04\x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\ + \0\x02\x03\x01\x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\ + \x06\x11\x12\n\x0b\n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\ + \x04\0\x02\x04\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\ + \x03\x07\x0b\x11\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15\n\x0b\ + \n\x04\x04\0\x02\x05\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\0\x02\x05\x06\ + \x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x08\x0c\x0e\n\ + \x0c\n\x05\x04\0\x02\x05\x03\x12\x03\x08\x11\x12\n\n\n\x02\x04\x01\x12\ + \x04\n\0\x0e\x01\n\n\n\x03\x04\x01\x01\x12\x03\n\x08\x15\n\x0b\n\x04\x04\ + \x01\x02\0\x12\x03\x0b\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x0b\ + \x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0b\x0b\x11\n\x0c\n\x05\x04\ + \x01\x02\0\x03\x12\x03\x0b\x14\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\ + \x0c\x04\x1a\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x0c\x04\t\n\x0c\n\ + \x05\x04\x01\x02\x01\x01\x12\x03\x0c\n\x15\n\x0c\n\x05\x04\x01\x02\x01\ + \x03\x12\x03\x0c\x18\x19\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\r\x04\x18\n\ + \x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\r\x04\t\n\x0c\n\x05\x04\x01\x02\ + \x02\x01\x12\x03\r\n\x13\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\r\x16\ + \x17\n\n\n\x02\x05\0\x12\x04\x0f\0\x12\x01\n\n\n\x03\x05\0\x01\x12\x03\ + \x0f\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x10\x04\x0e\n\x0c\n\x05\x05\ + \0\x02\0\x01\x12\x03\x10\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x10\ + \x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x11\x04\x0f\n\x0c\n\x05\x05\0\ + \x02\x01\x01\x12\x03\x11\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x11\ + \r\x0eb\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/model/ws.rs b/rust-lib/flowy-document/src/protobuf/model/ws.rs index 396c89d978..f31de92b14 100644 --- a/rust-lib/flowy-document/src/protobuf/model/ws.rs +++ b/rust-lib/flowy-document/src/protobuf/model/ws.rs @@ -258,8 +258,9 @@ impl ::protobuf::reflect::ProtobufValue for WsDocumentData { #[derive(Clone,PartialEq,Eq,Debug,Hash)] pub enum WsDataType { Acked = 0, - Rev = 1, - Conflict = 2, + PushRev = 1, + PullRev = 2, + Conflict = 3, } impl ::protobuf::ProtobufEnum for WsDataType { @@ -270,8 +271,9 @@ impl ::protobuf::ProtobufEnum for WsDataType { fn from_i32(value: i32) -> ::std::option::Option { match value { 0 => ::std::option::Option::Some(WsDataType::Acked), - 1 => ::std::option::Option::Some(WsDataType::Rev), - 2 => ::std::option::Option::Some(WsDataType::Conflict), + 1 => ::std::option::Option::Some(WsDataType::PushRev), + 2 => ::std::option::Option::Some(WsDataType::PullRev), + 3 => ::std::option::Option::Some(WsDataType::Conflict), _ => ::std::option::Option::None } } @@ -279,7 +281,8 @@ impl ::protobuf::ProtobufEnum for WsDataType { fn values() -> &'static [Self] { static values: &'static [WsDataType] = &[ WsDataType::Acked, - WsDataType::Rev, + WsDataType::PushRev, + WsDataType::PullRev, WsDataType::Conflict, ]; values @@ -311,26 +314,28 @@ impl ::protobuf::reflect::ProtobufValue for WsDataType { static file_descriptor_proto_data: &'static [u8] = b"\ \n\x08ws.proto\"Q\n\x0eWsDocumentData\x12\x0e\n\x02id\x18\x01\x20\x01(\t\ R\x02id\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\x02ty\x12\ - \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*.\n\nWsDataType\x12\t\n\x05\ - Acked\x10\0\x12\x07\n\x03Rev\x10\x01\x12\x0c\n\x08Conflict\x10\x02J\xe2\ - \x02\n\x06\x12\x04\0\0\x0b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\ - \x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\ - \x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\ - \x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\r\n\x0c\n\ - \x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\n\x04\x04\0\x02\x01\x12\ - \x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\ - \n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\ - \x03\x12\x03\x04\x14\x15\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\ - \x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\ - \x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\ - \n\n\n\x02\x05\0\x12\x04\x07\0\x0b\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\ - \x05\x0f\n\x0b\n\x04\x05\0\x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\ - \x02\0\x01\x12\x03\x08\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\ - \r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\t\x04\x0c\n\x0c\n\x05\x05\0\x02\x01\ - \x01\x12\x03\t\x04\x07\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\t\n\x0b\n\ - \x0b\n\x04\x05\0\x02\x02\x12\x03\n\x04\x11\n\x0c\n\x05\x05\0\x02\x02\x01\ - \x12\x03\n\x04\x0c\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\n\x0f\x10b\x06p\ - roto3\ + \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*?\n\nWsDataType\x12\t\n\x05\ + Acked\x10\0\x12\x0b\n\x07PushRev\x10\x01\x12\x0b\n\x07PullRev\x10\x02\ + \x12\x0c\n\x08Conflict\x10\x03J\x8b\x03\n\x06\x12\x04\0\0\x0c\x01\n\x08\ + \n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\x01\n\n\n\ + \x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\ + \x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\ + \x02\0\x01\x12\x03\x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x10\ + \x11\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\ + \x01\x06\x12\x03\x04\x04\x0e\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\ + \x0f\x11\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x14\x15\n\x0b\n\x04\ + \x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\ + \x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\ + \x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\x07\0\x0c\ + \x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\x02\0\x12\ + \x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\n\x0c\n\ + \x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\ + \t\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\x0b\n\x0c\n\x05\ + \x05\0\x02\x01\x02\x12\x03\t\x0e\x0f\n\x0b\n\x04\x05\0\x02\x02\x12\x03\n\ + \x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\n\x04\x0b\n\x0c\n\x05\x05\ + \0\x02\x02\x02\x12\x03\n\x0e\x0f\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x0b\ + \x04\x11\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x0b\x04\x0c\n\x0c\n\x05\ + \x05\0\x02\x03\x02\x12\x03\x0b\x0f\x10b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/proto/revision.proto b/rust-lib/flowy-document/src/protobuf/proto/revision.proto index 859f9c132a..85d1de9259 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/revision.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/revision.proto @@ -8,6 +8,11 @@ message Revision { string doc_id = 5; RevType ty = 6; } +message RevisionRange { + string doc_id = 1; + int64 from_rev_id = 2; + int64 to_rev_id = 3; +} enum RevType { Local = 0; Remote = 1; diff --git a/rust-lib/flowy-document/src/protobuf/proto/ws.proto b/rust-lib/flowy-document/src/protobuf/proto/ws.proto index 36a7b051b6..b62a27f614 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/ws.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/ws.proto @@ -7,6 +7,7 @@ message WsDocumentData { } enum WsDataType { Acked = 0; - Rev = 1; - Conflict = 2; + PushRev = 1; + PullRev = 2; + Conflict = 3; } diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index a734e43509..ef18bd740d 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -115,18 +115,18 @@ impl DocController { ) -> Result, DocError> { // Opti: require upgradable_read lock and then upgrade to write lock using // RwLockUpgradableReadGuard::upgrade(xx) of ws - let delta = self.read_doc(doc_id, pool.clone()).await?; + let doc = self.read_doc(doc_id, pool.clone()).await?; let ws_sender = self.ws.read().sender(); - let edit_ctx = Arc::new(EditDocContext::new(&doc_id, delta, pool, ws_sender).await?); - self.ws.write().register_handler(&doc_id, edit_ctx.clone()); + let edit_ctx = Arc::new(EditDocContext::new(doc, pool, ws_sender).await?); + self.ws.write().register_handler(doc_id, edit_ctx.clone()); self.cache.set(edit_ctx.clone()); Ok(edit_ctx) } #[tracing::instrument(level = "debug", skip(self, pool), err)] - async fn read_doc(&self, doc_id: &str, pool: Arc) -> Result { + async fn read_doc(&self, doc_id: &str, pool: Arc) -> Result { match self.doc_sql.read_doc_table(doc_id, pool.clone()) { - Ok(doc_table) => Ok(Delta::from_bytes(doc_table.data)?), + Ok(doc_table) => Ok(doc_table.into()), Err(error) => { if error.is_record_not_found() { let token = self.user.token()?; @@ -138,7 +138,7 @@ impl DocController { Some(doc) => { let conn = &*pool.get().map_err(internal_error)?; let _ = self.doc_sql.create_doc_table(doc.clone().into(), conn)?; - Ok(Delta::from_bytes(doc.data)?) + Ok(doc) }, } } else { diff --git a/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs b/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs index 162bd6df2d..5014bac8cd 100644 --- a/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs +++ b/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs @@ -13,7 +13,7 @@ use crate::{ use bytes::Bytes; use crate::{ - entities::doc::RevType, + entities::doc::{RevType, RevisionRange}, services::ws::WsDocumentSender, sql_tables::{doc::DocTableSql, DocTableChangeset}, }; @@ -33,16 +33,15 @@ pub(crate) struct EditDocContext { impl EditDocContext { pub(crate) async fn new( - doc_id: &str, - delta: Delta, + doc: Doc, pool: Arc, ws_sender: Arc, ) -> Result { - let doc_id = doc_id.to_owned(); - let rev_manager = Arc::new(RevisionManager::new(&doc_id, 1, pool.clone(), ws_sender)); + let delta = Delta::from_bytes(doc.data)?; + let rev_manager = Arc::new(RevisionManager::new(&doc.id, doc.rev_id, pool.clone(), ws_sender)); let document = Arc::new(RwLock::new(Document::from_delta(delta))); let edit_context = Self { - doc_id, + doc_id: doc.id, document, rev_manager, pool, @@ -71,7 +70,7 @@ impl EditDocContext { ); let _ = self.update_document(&revision)?; - self.rev_manager.add_revision(revision); + let _ = self.rev_manager.add_revision(revision)?; Ok(()) } @@ -83,12 +82,12 @@ impl EditDocContext { let changeset = DocTableChangeset { id: self.doc_id.clone(), data, - revision: revision.rev_id, + rev_id: revision.rev_id, }; let sql = DocTableSql {}; let conn = self.pool.get().map_err(internal_error)?; - sql.update_doc_table(changeset, &*conn); + let _ = sql.update_doc_table(changeset, &*conn)?; Ok(()) } @@ -122,18 +121,22 @@ impl EditDocContext { impl WsDocumentHandler for EditDocContext { fn receive(&self, doc_data: WsDocumentData) { let f = |doc_data: WsDocumentData| { + let bytes = Bytes::from(doc_data.data); match doc_data.ty { - WsDataType::Rev => { - let bytes = Bytes::from(doc_data.data); + WsDataType::PushRev => { let revision = Revision::try_from(bytes)?; - self.rev_manager.add_revision(revision); + let _ = self.rev_manager.add_revision(revision)?; let _ = self.compose_remote_delta()?; }, - WsDataType::Acked => { - let rev_id = bytes_to_rev_id(doc_data.data)?; - self.rev_manager.remove(rev_id); + WsDataType::PullRev => { + let range = RevisionRange::try_from(bytes)?; + let _ = self.rev_manager.send_rev_with_range(range)?; }, - _ => {}, + WsDataType::Acked => { + let rev_id = bytes_to_rev_id(bytes.to_vec())?; + let _ = self.rev_manager.ack(rev_id); + }, + WsDataType::Conflict => {}, } Result::<(), DocError>::Ok(()) }; diff --git a/rust-lib/flowy-document/src/services/doc/rev_manager.rs b/rust-lib/flowy-document/src/services/doc/rev_manager.rs index 1c42fc03c2..6a762da9ce 100644 --- a/rust-lib/flowy-document/src/services/doc/rev_manager.rs +++ b/rust-lib/flowy-document/src/services/doc/rev_manager.rs @@ -1,21 +1,20 @@ use crate::{ - entities::doc::{RevType, Revision}, + entities::doc::{RevType, Revision, RevisionRange}, errors::{internal_error, DocError}, services::{ util::RevIdCounter, ws::{WsDocumentHandler, WsDocumentSender}, }, - sql_tables::{OpTableSql, RevTable}, + sql_tables::{OpTableSql, RevChangeset, RevState, RevTable}, }; - +use dashmap::{DashMap, DashSet}; use flowy_database::ConnectionPool; - -use parking_lot::RwLock; +use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock}; use std::{ - collections::{BTreeMap, VecDeque}, + collections::{HashMap, VecDeque}, sync::Arc, }; -use tokio::sync::{futures::Notified, Notify}; +use tokio::{task::JoinHandle, time::Duration}; pub struct RevisionManager { doc_id: String, @@ -23,26 +22,29 @@ pub struct RevisionManager { pool: Arc, rev_id_counter: RevIdCounter, ws_sender: Arc, - local_rev_cache: Arc>>, + rev_cache: Arc>>, + ack_rev_cache: Arc>, remote_rev_cache: RwLock>, - notify: Notify, + save_operation: RwLock>>, } impl RevisionManager { pub fn new(doc_id: &str, rev_id: i64, pool: Arc, ws_sender: Arc) -> Self { let op_sql = Arc::new(OpTableSql {}); let rev_id_counter = RevIdCounter::new(rev_id); - let local_rev_cache = Arc::new(RwLock::new(BTreeMap::new())); + let rev_cache = Arc::new(RwLock::new(HashMap::new())); let remote_rev_cache = RwLock::new(VecDeque::new()); + let ack_rev_cache = Arc::new(DashSet::new()); Self { doc_id: doc_id.to_owned(), op_sql, pool, rev_id_counter, ws_sender, - local_rev_cache, + rev_cache, + ack_rev_cache, remote_rev_cache, - notify: Notify::new(), + save_operation: RwLock::new(None), } } @@ -63,34 +65,30 @@ impl RevisionManager { #[tracing::instrument(level = "debug", skip(self, revision))] pub fn add_revision(&self, revision: Revision) -> Result<(), DocError> { + self.rev_cache.write().insert(revision.rev_id, revision.clone()); + self.save_revisions(); match revision.ty { - RevType::Local => { - self.local_rev_cache.write().insert(revision.rev_id, revision.clone()); - // self.save_revision(revision.clone()); - match self.ws_sender.send(revision.into()) { - Ok(_) => {}, - Err(e) => { - log::error!("Send delta failed: {:?}", e); - }, - } + RevType::Local => match self.ws_sender.send(revision.into()) { + Ok(_) => {}, + Err(e) => { + log::error!("Send delta failed: {:?}", e); + }, }, RevType::Remote => { self.remote_rev_cache.write().push_back(revision); - self.notify.notify_waiters(); }, } Ok(()) } - pub fn remove(&self, rev_id: i64) -> Result<(), DocError> { - self.local_rev_cache.write().remove(&rev_id); - // self.delete_revision(rev_id); + pub fn ack(&self, rev_id: i64) -> Result<(), DocError> { + log::debug!("Receive {} acked", rev_id); + self.ack_rev_cache.insert(rev_id); + self.update_revisions(); Ok(()) } - pub fn rev_notified(&self) -> Notified { self.notify.notified() } - pub fn next_rev_id(&self) -> (i64, i64) { let cur = self.rev_id_counter.value(); let next = self.rev_id_counter.next(); @@ -99,31 +97,103 @@ impl RevisionManager { pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() } - fn save_revision(&self, revision: Revision) { + pub fn send_rev_with_range(&self, range: RevisionRange) -> Result<(), DocError> { + debug_assert!(&range.doc_id == &self.doc_id); + + unimplemented!() + } + + fn save_revisions(&self) { let op_sql = self.op_sql.clone(); let pool = self.pool.clone(); - tokio::spawn(async move { + let mut write_guard = self.save_operation.write(); + if let Some(handler) = write_guard.take() { + handler.abort(); + } + + let rev_cache = self.rev_cache.clone(); + let ack_rev_cache = self.ack_rev_cache.clone(); + let ids = self.rev_cache.read().keys().map(|v| v.clone()).collect::>(); + *write_guard = Some(tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(300)).await; + + let revisions = rev_cache + .read() + .values() + .map(|v| { + let state = match ack_rev_cache.contains(&v.rev_id) { + true => RevState::Acked, + false => RevState::Local, + }; + (v.clone(), state) + }) + .collect::>(); + + let mut rev_cache_write = rev_cache.write(); let conn = &*pool.get().map_err(internal_error).unwrap(); let result = conn.immediate_transaction::<_, DocError, _>(|| { - let op_table: RevTable = revision.into(); - let _ = op_sql.create_op_table(op_table, conn).unwrap(); + let _ = op_sql.create_rev_table(revisions, conn).unwrap(); Ok(()) }); match result { - Ok(_) => {}, + Ok(_) => rev_cache_write.retain(|k, _| !ids.contains(k)), Err(e) => log::error!("Save revision failed: {:?}", e), } - }); + })); + } + + fn update_revisions(&self) { + match self.rev_cache.try_read_for(Duration::from_millis(300)) { + None => log::warn!("try read rev_cache failed"), + Some(read_guard) => { + let rev_ids = self + .ack_rev_cache + .iter() + .flat_map(|k| match read_guard.contains_key(&k) { + true => None, + false => Some(k.clone()), + }) + .collect::>(); + + log::debug!("Try to update {:?} state", rev_ids); + if rev_ids.is_empty() { + return; + } + + let conn = &*self.pool.get().map_err(internal_error).unwrap(); + let result = conn.immediate_transaction::<_, DocError, _>(|| { + for rev_id in &rev_ids { + let changeset = RevChangeset { + doc_id: self.doc_id.clone(), + rev_id: rev_id.clone(), + state: RevState::Acked, + }; + let _ = self.op_sql.update_rev_table(changeset, conn)?; + } + Ok(()) + }); + + match result { + Ok(_) => { + rev_ids.iter().for_each(|rev_id| { + self.ack_rev_cache.remove(rev_id); + }); + }, + Err(e) => log::error!("Save revision failed: {:?}", e), + } + }, + } } fn delete_revision(&self, rev_id: i64) { let op_sql = self.op_sql.clone(); let pool = self.pool.clone(); + let doc_id = self.doc_id.clone(); tokio::spawn(async move { let conn = &*pool.get().map_err(internal_error).unwrap(); let result = conn.immediate_transaction::<_, DocError, _>(|| { - let _ = op_sql.delete_op_table(rev_id, conn)?; + let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?; Ok(()) }); diff --git a/rust-lib/flowy-document/src/services/mod.rs b/rust-lib/flowy-document/src/services/mod.rs index 459447b3c0..323a1c9fd6 100644 --- a/rust-lib/flowy-document/src/services/mod.rs +++ b/rust-lib/flowy-document/src/services/mod.rs @@ -1,5 +1,5 @@ mod cache; pub mod doc; pub mod server; -mod util; +pub(crate) mod util; pub mod ws; diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs index 66fe2f5b49..e27671b6a9 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs @@ -1,35 +1,101 @@ use crate::{ + entities::doc::{RevType, Revision}, errors::DocError, - sql_tables::doc::{RevChangeset, RevTable}, + sql_tables::{doc::RevTable, RevChangeset, RevState, RevTableType}, }; +use diesel::{insert_into, select, update}; use flowy_database::{ prelude::*, - schema::{rev_table, rev_table::dsl}, + schema::{ + rev_table, + rev_table::{columns::*, dsl, dsl::doc_id}, + }, SqliteConnection, }; pub struct OpTableSql {} impl OpTableSql { - pub(crate) fn create_op_table(&self, op_table: RevTable, conn: &SqliteConnection) -> Result<(), DocError> { - let _ = diesel::insert_into(rev_table::table).values(op_table).execute(conn)?; + pub(crate) fn create_rev_table( + &self, + revisions: Vec<(Revision, RevState)>, + conn: &SqliteConnection, + ) -> Result<(), DocError> { + // Batch insert: https://diesel.rs/guides/all-about-inserts.html + let records = revisions + .into_iter() + .map(|(revision, new_state)| { + log::debug!("Set {} to {:?}", revision.rev_id, new_state); + let rev_ty: RevTableType = revision.ty.into(); + ( + doc_id.eq(revision.doc_id), + base_rev_id.eq(revision.base_rev_id), + rev_id.eq(revision.rev_id), + data.eq(revision.delta), + state.eq(new_state), + ty.eq(rev_ty), + ) + }) + .collect::>(); + + let _ = insert_into(dsl::rev_table).values(&records).execute(conn)?; Ok(()) } - pub(crate) fn update_op_table(&self, changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), DocError> { - let filter = dsl::rev_table.filter(rev_table::dsl::rev_id.eq(changeset.rev_id)); - let affected_row = diesel::update(filter).set(changeset).execute(conn)?; - debug_assert_eq!(affected_row, 1); + pub(crate) fn update_rev_table(&self, changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), DocError> { + let filter = dsl::rev_table + .filter(rev_id.eq(changeset.rev_id)) + .filter(doc_id.eq(changeset.doc_id)); + let _ = update(filter).set(state.eq(changeset.state)).execute(conn)?; + log::debug!("Set {} to {:?}", changeset.rev_id, changeset.state); Ok(()) } - pub(crate) fn read_op_table(&self, conn: &SqliteConnection) -> Result, DocError> { - let ops = dsl::rev_table.load::(conn)?; - Ok(ops) + pub(crate) fn read_rev_table( + &self, + doc_id_s: &str, + rev_id_s: i64, + conn: &SqliteConnection, + ) -> Result, DocError> { + let rev_tables: Vec = dsl::rev_table + .filter(rev_id.eq(rev_id_s)) + .filter(doc_id.eq(doc_id_s)) + .load::(conn)?; + + let revisions = rev_tables + .into_iter() + .map(|table| table.into()) + .collect::>(); + Ok(revisions) } - pub(crate) fn delete_op_table(&self, rev_id: i64, conn: &SqliteConnection) -> Result<(), DocError> { - let filter = dsl::rev_table.filter(rev_table::dsl::rev_id.eq(rev_id)); + pub(crate) fn read_revs_table( + &self, + doc_id_s: &str, + from_rev_id: i64, + to_rev_id: i64, + conn: &SqliteConnection, + ) -> Result, DocError> { + let rev_tables = dsl::rev_table + .filter(rev_id.ge(from_rev_id)) + .filter(rev_id.lt(to_rev_id)) + .filter(doc_id.eq(doc_id_s)) + .load::(conn)?; + + let revisions = rev_tables + .into_iter() + .map(|table| table.into()) + .collect::>(); + Ok(revisions) + } + + pub(crate) fn delete_rev_table( + &self, + doc_id_s: &str, + rev_id_s: i64, + conn: &SqliteConnection, + ) -> Result<(), DocError> { + let filter = dsl::rev_table.filter(rev_id.eq(rev_id_s)).filter(doc_id.eq(doc_id_s)); let affected_row = diesel::delete(filter).execute(conn)?; debug_assert_eq!(affected_row, 1); Ok(()) diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs index 02fa47925f..9b1713d0d1 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs @@ -1,16 +1,18 @@ -use crate::entities::doc::{RevType, Revision}; +use crate::{ + entities::doc::{RevType, Revision}, + services::util::md5, +}; use diesel::sql_types::Integer; use flowy_database::schema::rev_table; #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] #[table_name = "rev_table"] -#[primary_key(doc_id)] pub(crate) struct RevTable { + id: i32, pub(crate) doc_id: String, pub(crate) base_rev_id: i64, pub(crate) rev_id: i64, pub(crate) data: Vec, - pub(crate) md5: String, pub(crate) state: RevState, pub(crate) ty: RevTableType, } @@ -44,6 +46,38 @@ impl RevState { } impl_sql_integer_expression!(RevState); +impl std::convert::Into for RevTable { + fn into(self) -> Revision { + let md5 = md5(&self.data); + Revision { + base_rev_id: self.base_rev_id, + rev_id: self.rev_id, + delta: self.data, + md5, + doc_id: self.doc_id, + ty: self.ty.into(), + } + } +} + +impl std::convert::Into for RevType { + fn into(self) -> RevTableType { + match self { + RevType::Local => RevTableType::Local, + RevType::Remote => RevTableType::Remote, + } + } +} + +impl std::convert::From for RevType { + fn from(ty: RevTableType) -> Self { + match ty { + RevTableType::Local => RevType::Local, + RevTableType::Remote => RevType::Remote, + } + } +} + #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)] #[repr(i32)] #[sql_type = "Integer"] @@ -73,32 +107,8 @@ impl RevTableType { } impl_sql_integer_expression!(RevTableType); -#[derive(AsChangeset, Identifiable, Default, Debug)] -#[table_name = "rev_table"] -#[primary_key(doc_id)] pub(crate) struct RevChangeset { pub(crate) doc_id: String, pub(crate) rev_id: i64, - pub(crate) state: Option, -} - -impl std::convert::Into for Revision { - fn into(self) -> RevTable { - RevTable { - doc_id: self.doc_id, - base_rev_id: self.base_rev_id, - rev_id: self.rev_id, - data: self.delta, - md5: self.md5, - state: RevState::Local, - ty: rev_ty_to_rev_state(self.ty), - } - } -} - -fn rev_ty_to_rev_state(ty: RevType) -> RevTableType { - match ty { - RevType::Local => RevTableType::Local, - RevType::Remote => RevTableType::Remote, - } + pub(crate) state: RevState, } diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs index b443fda96b..ee5fc7b498 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs @@ -6,7 +6,7 @@ use flowy_database::schema::doc_table; pub(crate) struct DocTable { pub(crate) id: String, pub(crate) data: String, - pub(crate) revision: i64, + pub(crate) rev_id: i64, } impl DocTable { @@ -14,7 +14,7 @@ impl DocTable { Self { id: doc.id, data: doc.data, - revision: 0, + rev_id: 0, } } } @@ -24,7 +24,7 @@ impl DocTable { pub(crate) struct DocTableChangeset { pub id: String, pub data: String, - pub revision: i64, + pub rev_id: i64, } impl std::convert::Into for DocTable { @@ -32,7 +32,7 @@ impl std::convert::Into for DocTable { Doc { id: self.id, data: self.data, - rev_id: self.revision, + rev_id: self.rev_id, } } } @@ -42,7 +42,7 @@ impl std::convert::From for DocTable { Self { id: doc.id, data: doc.data, - revision: doc.rev_id, + rev_id: doc.rev_id, } } }