From e0db7bd4f932322cec005673fcc9112800a54085 Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 20 Jul 2022 18:27:12 +0800 Subject: [PATCH 1/2] chore: remove feature flag: filter --- .../src/services/persistence/mod.rs | 12 +- frontend/rust-lib/flowy-grid/Cargo.toml | 3 +- frontend/rust-lib/flowy-grid/src/manager.rs | 15 ++- .../flowy-grid/src/services/grid_editor.rs | 4 +- .../src/services/persistence/block_index.rs | 1 + .../src/services/persistence/migration.rs | 113 ++++++++++++++++++ .../src/services/persistence/mod.rs | 1 + .../flowy-revision/src/cache/disk/mod.rs | 10 ++ .../flowy-revision/src/rev_persistence.rs | 11 +- .../flowy-sdk/src/deps_resolve/grid_deps.rs | 17 ++- frontend/rust-lib/flowy-sdk/src/lib.rs | 21 +++- .../src/revision/grid_rev.rs | 5 - .../src/client_grid/grid_revision_pad.rs | 12 +- 13 files changed, 196 insertions(+), 29 deletions(-) create mode 100644 frontend/rust-lib/flowy-grid/src/services/persistence/migration.rs diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs index 54fcc3b522..5676db54da 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs @@ -11,9 +11,9 @@ use flowy_database::ConnectionPool; use flowy_error::{FlowyError, FlowyResult}; use flowy_folder_data_model::revision::{AppRevision, TrashRevision, ViewRevision, WorkspaceRevision}; use flowy_revision::disk::{RevisionRecord, RevisionState}; -use flowy_revision::mk_revision_disk_cache; -use flowy_sync::client_folder::initial_folder_delta; +use flowy_revision::mk_text_block_revision_disk_cache; use flowy_sync::{client_folder::FolderPad, entities::revision::Revision}; +use lib_ot::core::PlainTextDeltaBuilder; use std::sync::Arc; use tokio::sync::RwLock; pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*}; @@ -109,16 +109,16 @@ impl FolderPersistence { pub async fn save_folder(&self, user_id: &str, folder_id: &FolderId, folder: FolderPad) -> FlowyResult<()> { let pool = self.database.db_pool()?; - let delta_data = initial_folder_delta(&folder)?.to_delta_bytes(); - let md5 = folder.md5(); - let revision = Revision::new(folder_id.as_ref(), 0, 0, delta_data, user_id, md5); + let json = folder.to_json()?; + let delta_data = PlainTextDeltaBuilder::new().insert(&json).build().to_delta_bytes(); + let revision = Revision::initial_revision(user_id, folder_id.as_ref(), delta_data); let record = RevisionRecord { revision, state: RevisionState::Sync, write_to_disk: true, }; - let disk_cache = mk_revision_disk_cache(user_id, pool); + let disk_cache = mk_text_block_revision_disk_cache(user_id, pool); disk_cache.delete_and_insert_records(folder_id.as_ref(), None, vec![record]) } } diff --git a/frontend/rust-lib/flowy-grid/Cargo.toml b/frontend/rust-lib/flowy-grid/Cargo.toml index e09e156772..ba3702038e 100644 --- a/frontend/rust-lib/flowy-grid/Cargo.toml +++ b/frontend/rust-lib/flowy-grid/Cargo.toml @@ -50,7 +50,6 @@ lib-infra = { path = "../../../shared-lib/lib-infra", features = ["protobuf_file [features] -default = ["filter"] +default = [] dart = ["lib-infra/dart"] -filter = [] flowy_unit_test = ["flowy-revision/flowy_unit_test"] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index 77f019293f..320f18b560 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -2,6 +2,7 @@ use crate::services::block_revision_editor::GridBlockRevisionCompactor; use crate::services::grid_editor::{GridRevisionCompactor, GridRevisionEditor}; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::kv::GridKVPersistence; +use crate::services::persistence::migration::GridMigration; use crate::services::persistence::GridDatabase; use crate::services::tasks::GridTaskScheduler; use bytes::Bytes; @@ -31,6 +32,7 @@ pub struct GridManager { #[allow(dead_code)] kv_persistence: Arc, task_scheduler: GridTaskSchedulerRwLock, + migration: GridMigration, } impl GridManager { @@ -41,17 +43,27 @@ impl GridManager { ) -> Self { let grid_editors = Arc::new(DashMap::new()); let kv_persistence = Arc::new(GridKVPersistence::new(database.clone())); - let block_index_cache = Arc::new(BlockIndexCache::new(database)); + let block_index_cache = Arc::new(BlockIndexCache::new(database.clone())); let task_scheduler = GridTaskScheduler::new(); + let migration = GridMigration::new(grid_user.clone(), database); Self { grid_editors, grid_user, kv_persistence, block_index_cache, task_scheduler, + migration, } } + pub async fn initialize_with_new_user(&self, _user_id: &str, _token: &str) -> FlowyResult<()> { + Ok(()) + } + + pub async fn initialize(&self, _user_id: &str, _token: &str) -> FlowyResult<()> { + Ok(()) + } + #[tracing::instrument(level = "debug", skip_all, err)] pub async fn create_grid>(&self, grid_id: T, revisions: RepeatedRevision) -> FlowyResult<()> { let grid_id = grid_id.as_ref(); @@ -74,6 +86,7 @@ impl GridManager { pub async fn open_grid>(&self, grid_id: T) -> FlowyResult> { let grid_id = grid_id.as_ref(); tracing::Span::current().record("grid_id", &grid_id); + let _ = self.migration.migration_grid_if_need(grid_id).await; self.get_or_create_grid_editor(grid_id).await } diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs index bcb4a7d288..572665bd0c 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs @@ -643,8 +643,8 @@ pub struct GridPadBuilder(); impl RevisionObjectBuilder for GridPadBuilder { type Output = GridRevisionPad; - fn build_object(object_id: &str, revisions: Vec) -> FlowyResult { - let pad = GridRevisionPad::from_revisions(object_id, revisions)?; + fn build_object(_object_id: &str, revisions: Vec) -> FlowyResult { + let pad = GridRevisionPad::from_revisions(revisions)?; Ok(pad) } } diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/block_index.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/block_index.rs index c62dc502ad..798819fb98 100644 --- a/frontend/rust-lib/flowy-grid/src/services/persistence/block_index.rs +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/block_index.rs @@ -7,6 +7,7 @@ use flowy_database::{ use flowy_error::FlowyResult; use std::sync::Arc; +/// Allow getting the block id from row id. pub struct BlockIndexCache { database: Arc, } diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/migration.rs new file mode 100644 index 0000000000..026ba3bc20 --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/migration.rs @@ -0,0 +1,113 @@ +use crate::manager::GridUser; + +use crate::services::persistence::GridDatabase; +use flowy_database::kv::KV; +use flowy_error::FlowyResult; +use flowy_grid_data_model::revision::GridRevision; +use flowy_revision::disk::{RevisionRecord, SQLiteGridRevisionPersistence}; +use flowy_revision::{mk_grid_block_revision_disk_cache, RevisionLoader, RevisionPersistence}; +use flowy_sync::client_grid::{make_grid_rev_json_str, GridRevisionPad}; +use flowy_sync::entities::revision::Revision; + +use lib_ot::core::PlainTextDeltaBuilder; +use serde::{Deserialize, Serialize}; +use std::str::FromStr; +use std::sync::Arc; + +pub(crate) struct GridMigration { + user: Arc, + database: Arc, +} + +impl GridMigration { + pub fn new(user: Arc, database: Arc) -> Self { + Self { user, database } + } + + pub async fn migration_grid_if_need(&self, grid_id: &str) -> FlowyResult<()> { + match KV::get_str(grid_id) { + None => { + let _ = self.reset_grid_rev(grid_id).await?; + let _ = self.save_migrate_record(grid_id)?; + } + Some(s) => { + let mut record = MigrationGridRecord::from_str(&s)?; + let empty_json = self.empty_grid_rev_json()?; + if record.len < empty_json.len() { + let _ = self.reset_grid_rev(grid_id).await?; + record.len = empty_json.len(); + KV::set_str(grid_id, record.to_string()); + } + } + } + Ok(()) + } + + async fn reset_grid_rev(&self, grid_id: &str) -> FlowyResult<()> { + let user_id = self.user.user_id()?; + let pool = self.database.db_pool()?; + let grid_rev_pad = self.get_grid_revision_pad(grid_id).await?; + let json = grid_rev_pad.json_str()?; + let delta_data = PlainTextDeltaBuilder::new().insert(&json).build().to_delta_bytes(); + let revision = Revision::initial_revision(&user_id, grid_id, delta_data); + let record = RevisionRecord::new(revision); + // + let disk_cache = mk_grid_block_revision_disk_cache(&user_id, pool); + let _ = disk_cache.delete_and_insert_records(grid_id, None, vec![record]); + Ok(()) + } + + fn save_migrate_record(&self, grid_id: &str) -> FlowyResult<()> { + let empty_json_str = self.empty_grid_rev_json()?; + let record = MigrationGridRecord { + grid_id: grid_id.to_owned(), + len: empty_json_str.len(), + }; + KV::set_str(grid_id, record.to_string()); + Ok(()) + } + + fn empty_grid_rev_json(&self) -> FlowyResult { + let empty_grid_rev = GridRevision::default(); + let empty_json = make_grid_rev_json_str(&empty_grid_rev)?; + Ok(empty_json) + } + + async fn get_grid_revision_pad(&self, grid_id: &str) -> FlowyResult { + let pool = self.database.db_pool()?; + let user_id = self.user.user_id()?; + let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool); + let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache)); + let (revisions, _) = RevisionLoader { + object_id: grid_id.to_owned(), + user_id, + cloud: None, + rev_persistence, + } + .load() + .await?; + + let pad = GridRevisionPad::from_revisions(revisions)?; + Ok(pad) + } +} + +#[derive(Serialize, Deserialize)] +struct MigrationGridRecord { + grid_id: String, + len: usize, +} + +impl FromStr for MigrationGridRecord { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result { + serde_json::from_str::(s) + } +} + +impl ToString for MigrationGridRecord { + fn to_string(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| "".to_string()) + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/mod.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/mod.rs index d6167cf8e6..7bd196acc7 100644 --- a/frontend/rust-lib/flowy-grid/src/services/persistence/mod.rs +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; pub mod block_index; pub mod kv; +pub mod migration; pub trait GridDatabase: Send + Sync { fn db_pool(&self) -> Result, FlowyError>; diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs index 814b591f15..991d8f9b9f 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs @@ -53,6 +53,14 @@ pub struct RevisionRecord { } impl RevisionRecord { + pub fn new(revision: Revision) -> Self { + Self { + revision, + state: RevisionState::Sync, + write_to_disk: true, + } + } + pub fn ack(&mut self) { self.state = RevisionState::Ack; } @@ -64,6 +72,8 @@ pub struct RevisionChangeset { pub(crate) state: RevisionState, } +/// Sync: revision is not synced to the server +/// Ack: revision is synced to the server #[derive(Debug, Clone, Eq, PartialEq)] pub enum RevisionState { Sync = 0, diff --git a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs index 98bc89ba24..eb3da339b7 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs @@ -2,7 +2,7 @@ use crate::cache::{ disk::{RevisionChangeset, RevisionDiskCache, SQLiteTextBlockRevisionPersistence}, memory::RevisionMemoryCacheDelegate, }; -use crate::disk::{RevisionRecord, RevisionState}; +use crate::disk::{RevisionRecord, RevisionState, SQLiteGridBlockRevisionPersistence}; use crate::memory::RevisionMemoryCache; use crate::RevisionCompactor; use flowy_database::ConnectionPool; @@ -214,13 +214,20 @@ impl RevisionPersistence { } } -pub fn mk_revision_disk_cache( +pub fn mk_text_block_revision_disk_cache( user_id: &str, pool: Arc, ) -> Arc> { Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool)) } +pub fn mk_grid_block_revision_disk_cache( + user_id: &str, + pool: Arc, +) -> Arc> { + Arc::new(SQLiteGridBlockRevisionPersistence::new(user_id, pool)) +} + impl RevisionMemoryCacheDelegate for Arc> { fn checkpoint_tick(&self, mut records: Vec) -> FlowyResult<()> { records.retain(|record| record.write_to_disk); diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs index 88073a3107..f2b862c53a 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs @@ -16,14 +16,23 @@ use std::sync::Arc; pub struct GridDepsResolver(); impl GridDepsResolver { - pub fn resolve(ws_conn: Arc, user_session: Arc) -> Arc { + pub async fn resolve(ws_conn: Arc, user_session: Arc) -> Arc { let user = Arc::new(GridUserImpl(user_session.clone())); let rev_web_socket = Arc::new(GridWebSocket(ws_conn)); - Arc::new(GridManager::new( - user, + let grid_manager = Arc::new(GridManager::new( + user.clone(), rev_web_socket, Arc::new(GridDatabaseImpl(user_session)), - )) + )); + + if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) { + match grid_manager.initialize(&user_id, &token).await { + Ok(_) => {} + Err(e) => tracing::error!("Initialize grid manager failed: {}", e), + } + } + + grid_manager } } diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 4a37faefef..00a3785122 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -112,7 +112,7 @@ impl FlowySDK { &config.server_config, ); - let grid_manager = GridDepsResolver::resolve(ws_conn.clone(), user_session.clone()); + let grid_manager = GridDepsResolver::resolve(ws_conn.clone(), user_session.clone()).await; let folder_manager = FolderDepsResolver::resolve( local_server.clone(), @@ -147,7 +147,7 @@ impl FlowySDK { ) })); - _start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager); + _start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager, &grid_manager); Self { config, @@ -171,10 +171,12 @@ fn _start_listening( ws_conn: &Arc, user_session: &Arc, folder_manager: &Arc, + grid_manager: &Arc, ) { let subscribe_user_status = user_session.notifier.subscribe_user_status(); let subscribe_network_type = ws_conn.subscribe_network_ty(); let folder_manager = folder_manager.clone(); + let grid_manager = grid_manager.clone(); let cloned_folder_manager = folder_manager.clone(); let ws_conn = ws_conn.clone(); let user_session = user_session.clone(); @@ -182,7 +184,13 @@ fn _start_listening( dispatch.spawn(async move { user_session.init(); listen_on_websocket(ws_conn.clone()); - _listen_user_status(ws_conn.clone(), subscribe_user_status, folder_manager.clone()).await; + _listen_user_status( + ws_conn.clone(), + subscribe_user_status, + folder_manager.clone(), + grid_manager.clone(), + ) + .await; }); dispatch.spawn(async move { @@ -209,6 +217,7 @@ async fn _listen_user_status( ws_conn: Arc, mut subscribe: broadcast::Receiver, folder_manager: Arc, + grid_manager: Arc, ) { while let Ok(status) = subscribe.recv().await { let result = || async { @@ -216,6 +225,7 @@ async fn _listen_user_status( UserStatus::Login { token, user_id } => { tracing::trace!("User did login"); let _ = folder_manager.initialize(&user_id, &token).await?; + let _ = grid_manager.initialize(&user_id, &token).await?; let _ = ws_conn.start(token, user_id).await?; } UserStatus::Logout { .. } => { @@ -233,6 +243,11 @@ async fn _listen_user_status( let _ = folder_manager .initialize_with_new_user(&profile.id, &profile.token) .await?; + + let _ = grid_manager + .initialize_with_new_user(&profile.id, &profile.token) + .await?; + let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?; let _ = ret.send(()); } diff --git a/shared-lib/flowy-grid-data-model/src/revision/grid_rev.rs b/shared-lib/flowy-grid-data-model/src/revision/grid_rev.rs index 671f71563f..2298eb485f 100644 --- a/shared-lib/flowy-grid-data-model/src/revision/grid_rev.rs +++ b/shared-lib/flowy-grid-data-model/src/revision/grid_rev.rs @@ -31,13 +31,8 @@ pub struct GridRevision { pub fields: Vec>, pub blocks: Vec>, - #[cfg(feature = "filter")] #[serde(default)] pub setting: GridSettingRevision, - - #[cfg(not(feature = "filter"))] - #[serde(default, skip)] - pub setting: GridSettingRevision, } impl GridRevision { diff --git a/shared-lib/flowy-sync/src/client_grid/grid_revision_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_revision_pad.rs index e47305f7fa..eaa6f1823a 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_revision_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_revision_pad.rs @@ -62,7 +62,7 @@ impl GridRevisionPad { }) } - pub fn from_revisions(_grid_id: &str, revisions: Vec) -> CollaborateResult { + pub fn from_revisions(revisions: Vec) -> CollaborateResult { let grid_delta: GridRevisionDelta = make_delta_from_revisions::(revisions)?; Self::from_delta(grid_delta) } @@ -480,8 +480,8 @@ impl GridRevisionPad { match f(Arc::make_mut(&mut self.grid_rev))? { None => Ok(None), Some(_) => { - let old = json_from_grid(&cloned_grid)?; - let new = json_from_grid(&self.grid_rev)?; + let old = make_grid_rev_json_str(&cloned_grid)?; + let new = self.json_str()?; match cal_diff::(old, new) { None => Ok(None), Some(delta) => { @@ -528,9 +528,13 @@ impl GridRevisionPad { }, ) } + + pub fn json_str(&self) -> CollaborateResult { + make_grid_rev_json_str(&self.grid_rev) + } } -fn json_from_grid(grid: &Arc) -> CollaborateResult { +pub fn make_grid_rev_json_str(grid: &GridRevision) -> CollaborateResult { let json = serde_json::to_string(grid) .map_err(|err| internal_error(format!("Serialize grid to json str failed. {:?}", err)))?; Ok(json) From c97e1ccb953fe1260d6cc9d5078317ebe4bcadcb Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 20 Jul 2022 18:47:05 +0800 Subject: [PATCH 2/2] chore: fix tests --- shared-lib/flowy-grid-data-model/tests/serde_test.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/shared-lib/flowy-grid-data-model/tests/serde_test.rs b/shared-lib/flowy-grid-data-model/tests/serde_test.rs index b544e10588..1dff913547 100644 --- a/shared-lib/flowy-grid-data-model/tests/serde_test.rs +++ b/shared-lib/flowy-grid-data-model/tests/serde_test.rs @@ -6,5 +6,8 @@ fn grid_default_serde_test() { let grid = GridRevision::new(&grid_id); let json = serde_json::to_string(&grid).unwrap(); - assert_eq!(json, r#"{"grid_id":"1","fields":[],"blocks":[]}"#) + assert_eq!( + json, + r#"{"grid_id":"1","fields":[],"blocks":[],"setting":{"layout":0,"filters":[]}}"# + ) }