// AppFlowy/shared-lib/flowy-server-sync/src/server_document/document_manager.rs
// (296 lines, 10 KiB, Rust)

2023-01-30 03:11:19 +00:00
use crate::server_document::document_pad::ServerDocument;
use async_stream::stream;
use dashmap::DashMap;
2023-01-30 03:11:19 +00:00
use document_model::document::DocumentInfo;
use flowy_sync::errors::{internal_sync_error, SyncError, SyncResult};
use flowy_sync::ext::DocumentCloudPersistence;
use flowy_sync::{RevisionSyncResponse, RevisionSynchronizer, RevisionUser};
use futures::stream::StreamExt;
use lib_ot::core::AttributeHashMap;
2022-10-22 13:57:44 +00:00
use lib_ot::text_delta::DeltaTextOperations;
2023-01-30 03:11:19 +00:00
use revision_model::Revision;
use std::{collections::HashMap, sync::Arc};
use tokio::{
2022-01-03 04:20:06 +00:00
sync::{mpsc, oneshot, RwLock},
task::spawn_blocking,
};
2023-01-30 03:11:19 +00:00
use ws_model::ws_revision::{ClientRevisionWSData, ServerRevisionWSDataBuilder};
2022-01-21 13:41:24 +00:00
2021-12-22 10:53:52 +00:00
pub struct ServerDocumentManager {
2022-01-21 13:41:24 +00:00
document_handlers: Arc<RwLock<HashMap<String, Arc<OpenDocumentHandler>>>>,
2022-10-13 15:29:37 +00:00
persistence: Arc<dyn DocumentCloudPersistence>,
}
2021-12-22 10:53:52 +00:00
impl ServerDocumentManager {
2022-10-13 15:29:37 +00:00
pub fn new(persistence: Arc<dyn DocumentCloudPersistence>) -> Self {
Self {
2022-01-21 13:41:24 +00:00
document_handlers: Arc::new(RwLock::new(HashMap::new())),
2021-12-12 08:02:58 +00:00
persistence,
}
}
2022-01-01 15:09:13 +00:00
pub async fn handle_client_revisions(
2021-12-25 13:44:45 +00:00
&self,
user: Arc<dyn RevisionUser>,
client_data: ClientRevisionWSData,
2023-01-30 03:11:19 +00:00
) -> Result<(), SyncError> {
2021-12-26 15:59:45 +00:00
let cloned_user = user.clone();
let ack_id = client_data.rev_id;
2022-01-14 07:23:21 +00:00
let object_id = client_data.object_id;
2021-12-26 15:59:45 +00:00
2022-01-14 07:23:21 +00:00
let result = match self.get_document_handler(&object_id).await {
2021-12-25 13:44:45 +00:00
None => {
2022-01-23 14:33:47 +00:00
tracing::trace!("Can't find the document. Creating the document {}", object_id);
let _ = self
.create_document(&object_id, client_data.revisions)
.await
2023-01-30 03:11:19 +00:00
.map_err(|e| SyncError::internal().context(format!("Server create document failed: {}", e)))?;
2021-12-26 15:59:45 +00:00
Ok(())
2022-01-24 09:35:58 +00:00
}
2021-12-26 15:59:45 +00:00
Some(handler) => {
handler.apply_revisions(user, client_data.revisions).await?;
2021-12-26 15:59:45 +00:00
Ok(())
2022-01-24 09:35:58 +00:00
}
2021-12-25 13:44:45 +00:00
};
2021-12-26 15:59:45 +00:00
if result.is_ok() {
2022-01-14 12:52:03 +00:00
cloned_user.receive(RevisionSyncResponse::Ack(
ServerRevisionWSDataBuilder::build_ack_message(&object_id, ack_id),
));
2021-12-26 15:59:45 +00:00
}
result
2021-12-25 13:44:45 +00:00
}
2022-01-01 15:09:13 +00:00
pub async fn handle_client_ping(
&self,
user: Arc<dyn RevisionUser>,
2022-01-14 07:23:21 +00:00
client_data: ClientRevisionWSData,
2023-01-30 03:11:19 +00:00
) -> Result<(), SyncError> {
let rev_id = client_data.rev_id;
2022-01-14 07:23:21 +00:00
let doc_id = client_data.object_id.clone();
2022-01-01 15:09:13 +00:00
match self.get_document_handler(&doc_id).await {
2022-01-02 02:34:42 +00:00
None => {
2022-01-11 14:23:19 +00:00
tracing::trace!("Document:{} doesn't exist, ignore client ping", doc_id);
2022-01-02 02:34:42 +00:00
Ok(())
2022-01-24 09:35:58 +00:00
}
2022-01-01 15:09:13 +00:00
Some(handler) => {
handler.apply_ping(rev_id, user).await?;
2022-01-02 14:23:33 +00:00
Ok(())
2022-01-24 09:35:58 +00:00
}
2022-01-02 14:23:33 +00:00
}
}
2023-01-30 03:11:19 +00:00
pub async fn handle_document_reset(&self, doc_id: &str, mut revisions: Vec<Revision>) -> Result<(), SyncError> {
revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
2022-07-19 05:07:30 +00:00
2022-01-02 14:23:33 +00:00
match self.get_document_handler(doc_id).await {
None => {
tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id);
Ok(())
2022-01-24 09:35:58 +00:00
}
2022-01-02 14:23:33 +00:00
Some(handler) => {
handler.apply_document_reset(revisions).await?;
2022-01-01 15:09:13 +00:00
Ok(())
2022-01-24 09:35:58 +00:00
}
2022-01-01 15:09:13 +00:00
}
}
2022-01-21 13:41:24 +00:00
async fn get_document_handler(&self, doc_id: &str) -> Option<Arc<OpenDocumentHandler>> {
if let Some(handler) = self.document_handlers.read().await.get(doc_id).cloned() {
2022-01-03 04:20:06 +00:00
return Some(handler);
2021-12-13 05:55:44 +00:00
}
2022-01-03 04:20:06 +00:00
2022-01-21 13:41:24 +00:00
let mut write_guard = self.document_handlers.write().await;
2022-10-13 15:29:37 +00:00
match self.persistence.read_document(doc_id).await {
2022-01-07 09:37:11 +00:00
Ok(doc) => {
2023-01-30 03:11:19 +00:00
let handler = self.create_document_handler(doc).await.unwrap();
2022-01-07 09:37:11 +00:00
write_guard.insert(doc_id.to_owned(), handler.clone());
drop(write_guard);
Some(handler)
2022-01-24 09:35:58 +00:00
}
2022-01-07 09:37:11 +00:00
Err(_) => None,
}
}
2021-12-26 15:59:45 +00:00
async fn create_document(
&self,
doc_id: &str,
revisions: Vec<Revision>,
2023-01-30 03:11:19 +00:00
) -> Result<Arc<OpenDocumentHandler>, SyncError> {
match self.persistence.create_document(doc_id, revisions).await? {
2023-01-30 03:11:19 +00:00
None => Err(SyncError::internal().context("Create document info from revisions failed")),
2022-01-21 13:41:24 +00:00
Some(doc) => {
let handler = self.create_document_handler(doc).await?;
self.document_handlers
.write()
.await
.insert(doc_id.to_owned(), handler.clone());
Ok(handler)
2022-01-24 09:35:58 +00:00
}
2022-01-21 13:41:24 +00:00
}
2021-12-13 14:46:35 +00:00
}
2022-04-10 08:29:45 +00:00
#[tracing::instrument(level = "debug", skip(self, doc), err)]
2023-01-30 03:11:19 +00:00
async fn create_document_handler(&self, doc: DocumentInfo) -> Result<Arc<OpenDocumentHandler>, SyncError> {
2021-12-25 13:44:45 +00:00
let persistence = self.persistence.clone();
2022-01-21 13:41:24 +00:00
let handle = spawn_blocking(|| OpenDocumentHandler::new(doc, persistence))
.await
2023-01-30 03:11:19 +00:00
.map_err(|e| SyncError::internal().context(format!("Create document handler failed: {}", e)))?;
2022-01-03 04:20:06 +00:00
Ok(Arc::new(handle?))
}
}
impl std::ops::Drop for ServerDocumentManager {
fn drop(&mut self) {
2022-01-12 04:40:41 +00:00
log::trace!("ServerDocumentManager was dropped");
}
}
/// `RevisionSynchronizer` instantiated with `AttributeHashMap`; this is the
/// synchronizer type used for server documents in this file.
type DocumentRevisionSynchronizer = RevisionSynchronizer<AttributeHashMap>;
2022-01-21 13:41:24 +00:00
struct OpenDocumentHandler {
2021-12-28 16:34:00 +00:00
doc_id: String,
sender: mpsc::Sender<DocumentCommand>,
users: DashMap<String, Arc<dyn RevisionUser>>,
}
2022-01-21 13:41:24 +00:00
impl OpenDocumentHandler {
2023-01-30 03:11:19 +00:00
fn new(doc: DocumentInfo, persistence: Arc<dyn DocumentCloudPersistence>) -> Result<Self, SyncError> {
2022-10-13 15:29:37 +00:00
let doc_id = doc.doc_id.clone();
2022-01-21 13:41:24 +00:00
let (sender, receiver) = mpsc::channel(1000);
let users = DashMap::new();
2022-01-11 14:23:19 +00:00
let operations = DeltaTextOperations::from_bytes(&doc.data)?;
let sync_object = ServerDocument::from_operations(&doc_id, operations);
2022-01-21 13:41:24 +00:00
let synchronizer = Arc::new(DocumentRevisionSynchronizer::new(doc.rev_id, sync_object, persistence));
2022-01-11 14:23:19 +00:00
2022-10-13 15:29:37 +00:00
let queue = DocumentCommandRunner::new(&doc.doc_id, receiver, synchronizer);
2022-01-11 14:23:19 +00:00
tokio::task::spawn(queue.run());
Ok(Self { doc_id, sender, users })
}
2022-01-21 13:41:24 +00:00
#[tracing::instrument(
name = "server_document_apply_revision",
level = "trace",
skip(self, user, revisions),
2022-01-21 13:41:24 +00:00
err
)]
2023-01-30 03:11:19 +00:00
async fn apply_revisions(&self, user: Arc<dyn RevisionUser>, revisions: Vec<Revision>) -> Result<(), SyncError> {
let (ret, rx) = oneshot::channel();
self.users.insert(user.user_id(), user.clone());
let msg = DocumentCommand::ApplyRevisions { user, revisions, ret };
2021-12-28 16:34:00 +00:00
self.send(msg, rx).await?
}
2023-01-30 03:11:19 +00:00
async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), SyncError> {
2022-01-01 15:09:13 +00:00
let (ret, rx) = oneshot::channel();
self.users.insert(user.user_id(), user.clone());
2022-01-11 14:23:19 +00:00
let msg = DocumentCommand::Ping { user, rev_id, ret };
self.send(msg, rx).await?
2022-01-01 15:09:13 +00:00
}
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
2023-01-30 03:11:19 +00:00
async fn apply_document_reset(&self, revisions: Vec<Revision>) -> Result<(), SyncError> {
2022-01-02 14:23:33 +00:00
let (ret, rx) = oneshot::channel();
let msg = DocumentCommand::Reset { revisions, ret };
self.send(msg, rx).await?
2022-01-02 14:23:33 +00:00
}
2023-01-30 03:11:19 +00:00
async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> SyncResult<T> {
self.sender
2021-12-28 16:34:00 +00:00
.send(msg)
.await
2023-01-30 03:11:19 +00:00
.map_err(|e| SyncError::internal().context(format!("Send document command failed: {}", e)))?;
rx.await.map_err(internal_sync_error)
}
}
2022-01-21 13:41:24 +00:00
impl std::ops::Drop for OpenDocumentHandler {
2021-12-28 16:34:00 +00:00
fn drop(&mut self) {
2022-01-12 04:40:41 +00:00
tracing::trace!("{} OpenDocHandle was dropped", self.doc_id);
2021-12-28 16:34:00 +00:00
}
}
// #[derive(Debug)]
enum DocumentCommand {
2021-12-25 13:44:45 +00:00
ApplyRevisions {
user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>,
2023-01-30 03:11:19 +00:00
ret: oneshot::Sender<SyncResult<()>>,
},
2022-01-01 15:09:13 +00:00
Ping {
user: Arc<dyn RevisionUser>,
rev_id: i64,
2023-01-30 03:11:19 +00:00
ret: oneshot::Sender<SyncResult<()>>,
2022-01-01 15:09:13 +00:00
},
2022-01-02 14:23:33 +00:00
Reset {
revisions: Vec<Revision>,
2023-01-30 03:11:19 +00:00
ret: oneshot::Sender<SyncResult<()>>,
2022-01-02 14:23:33 +00:00
},
}
2022-01-21 13:41:24 +00:00
struct DocumentCommandRunner {
2021-12-25 13:44:45 +00:00
pub doc_id: String,
receiver: Option<mpsc::Receiver<DocumentCommand>>,
2022-01-21 13:41:24 +00:00
synchronizer: Arc<DocumentRevisionSynchronizer>,
}
2022-01-21 13:41:24 +00:00
impl DocumentCommandRunner {
2022-01-11 14:23:19 +00:00
fn new(
doc_id: &str,
receiver: mpsc::Receiver<DocumentCommand>,
2022-01-21 13:41:24 +00:00
synchronizer: Arc<DocumentRevisionSynchronizer>,
) -> Self {
Self {
2022-01-11 14:23:19 +00:00
doc_id: doc_id.to_owned(),
receiver: Some(receiver),
2021-12-25 13:44:45 +00:00
synchronizer,
2022-01-21 13:41:24 +00:00
}
}
async fn run(mut self) {
let mut receiver = self
.receiver
.take()
2022-01-21 13:41:24 +00:00
.expect("DocumentCommandRunner's receiver should only take one time");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream.for_each(|msg| self.handle_message(msg)).await;
}
async fn handle_message(&self, msg: DocumentCommand) {
match msg {
DocumentCommand::ApplyRevisions { user, revisions, ret } => {
2021-12-28 16:34:00 +00:00
let result = self
.synchronizer
.sync_revisions(user, revisions)
2021-12-25 13:44:45 +00:00
.await
2023-01-30 03:11:19 +00:00
.map_err(internal_sync_error);
2022-01-01 15:09:13 +00:00
let _ = ret.send(result);
2022-01-24 09:35:58 +00:00
}
2022-01-11 14:23:19 +00:00
DocumentCommand::Ping { user, rev_id, ret } => {
2023-01-30 03:11:19 +00:00
let result = self.synchronizer.pong(user, rev_id).await.map_err(internal_sync_error);
2022-01-02 14:23:33 +00:00
let _ = ret.send(result);
2022-01-24 09:35:58 +00:00
}
DocumentCommand::Reset { revisions, ret } => {
2023-01-30 03:11:19 +00:00
let result = self.synchronizer.reset(revisions).await.map_err(internal_sync_error);
2021-12-28 16:34:00 +00:00
let _ = ret.send(result);
2022-01-24 09:35:58 +00:00
}
}
}
}
2021-12-28 16:34:00 +00:00
2022-01-21 13:41:24 +00:00
impl std::ops::Drop for DocumentCommandRunner {
2021-12-28 16:34:00 +00:00
fn drop(&mut self) {
2022-01-12 04:40:41 +00:00
tracing::trace!("{} DocumentCommandQueue was dropped", self.doc_id);
2021-12-28 16:34:00 +00:00
}
}