From 34ed5a6be237e125b9983731cd1afe7305b34ac9 Mon Sep 17 00:00:00 2001 From: appflowy Date: Thu, 16 Sep 2021 18:31:25 +0800 Subject: [PATCH] add flowy-ws crate --- .../lib/protobuf/flowy-ws/errors.pb.dart | 76 +++++ .../lib/protobuf/flowy-ws/errors.pbenum.dart | 24 ++ .../lib/protobuf/flowy-ws/errors.pbjson.dart | 31 ++ .../protobuf/flowy-ws/errors.pbserver.dart | 9 + .../lib/protobuf/flowy-ws/protobuf.dart | 2 + backend/src/service/log/mod.rs | 2 +- rust-lib/Cargo.toml | 1 + rust-lib/dart-ffi/src/protobuf/mod.rs | 2 + rust-lib/dart-ffi/src/protobuf/model/mod.rs | 10 +- .../src/derive_cache/derive_cache.rs | 7 +- rust-lib/flowy-document/src/protobuf/mod.rs | 2 + .../flowy-document/src/protobuf/model/mod.rs | 18 +- rust-lib/flowy-infra/src/protobuf/mod.rs | 2 + .../flowy-infra/src/protobuf/model/mod.rs | 6 +- rust-lib/flowy-observable/src/protobuf/mod.rs | 2 + .../src/protobuf/model/mod.rs | 6 +- rust-lib/flowy-sdk/src/lib.rs | 1 + rust-lib/flowy-user/src/protobuf/mod.rs | 2 + rust-lib/flowy-user/src/protobuf/model/mod.rs | 26 +- rust-lib/flowy-workspace/src/protobuf/mod.rs | 2 + .../flowy-workspace/src/protobuf/model/mod.rs | 66 ++-- rust-lib/flowy-ws/Cargo.toml | 25 ++ rust-lib/flowy-ws/Flowy.toml | 2 + rust-lib/flowy-ws/src/errors.rs | 55 ++++ rust-lib/flowy-ws/src/lib.rs | 3 + rust-lib/flowy-ws/src/protobuf/mod.rs | 4 + .../flowy-ws/src/protobuf/model/errors.rs | 290 ++++++++++++++++++ rust-lib/flowy-ws/src/protobuf/model/mod.rs | 4 + .../flowy-ws/src/protobuf/proto/errors.proto | 9 + rust-lib/flowy-ws/src/ws.rs | 173 +++++++++++ 30 files changed, 793 insertions(+), 69 deletions(-) create mode 100644 app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pb.dart create mode 100644 app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbenum.dart create mode 100644 app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbjson.dart create mode 100644 app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbserver.dart create mode 100644 app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/protobuf.dart create mode 100644 rust-lib/flowy-ws/Cargo.toml create mode 100644 rust-lib/flowy-ws/Flowy.toml create mode 100644 rust-lib/flowy-ws/src/errors.rs create mode 100644 rust-lib/flowy-ws/src/lib.rs create mode 100644 rust-lib/flowy-ws/src/protobuf/mod.rs create mode 100644 rust-lib/flowy-ws/src/protobuf/model/errors.rs create mode 100644 rust-lib/flowy-ws/src/protobuf/model/mod.rs create mode 100644 rust-lib/flowy-ws/src/protobuf/proto/errors.proto create mode 100644 rust-lib/flowy-ws/src/ws.rs diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pb.dart new file mode 100644 index 0000000000..63c73c28f3 --- /dev/null +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pb.dart @@ -0,0 +1,76 @@ +/// +// Generated code. Do not modify. +// source: errors.proto +// +// @dart = 2.12 +// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields + +import 'dart:core' as $core; + +import 'package:protobuf/protobuf.dart' as $pb; + +import 'errors.pbenum.dart'; + +export 'errors.pbenum.dart'; + +class WsError extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsError', createEmptyInstance: create) + ..e(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'code', $pb.PbFieldType.OE, defaultOrMaker: ErrorCode.InternalError, valueOf: ErrorCode.valueOf, enumValues: ErrorCode.values) + ..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'msg') + ..hasRequiredFields = false + ; + + WsError._() : super(); + factory WsError({ + ErrorCode? code, + $core.String? msg, + }) { + final _result = create(); + if (code != null) { + _result.code = code; + } + if (msg != null) { + _result.msg = msg; + } + return _result; + } + factory WsError.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory WsError.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + WsError clone() => WsError()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + WsError copyWith(void Function(WsError) updates) => super.copyWith((message) => updates(message as WsError)) as WsError; // ignore: deprecated_member_use + $pb.BuilderInfo get info_ => _i; + @$core.pragma('dart2js:noInline') + static WsError create() => WsError._(); + WsError createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static WsError getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static WsError? _defaultInstance; + + @$pb.TagNumber(1) + ErrorCode get code => $_getN(0); + @$pb.TagNumber(1) + set code(ErrorCode v) { setField(1, v); } + @$pb.TagNumber(1) + $core.bool hasCode() => $_has(0); + @$pb.TagNumber(1) + void clearCode() => clearField(1); + + @$pb.TagNumber(2) + $core.String get msg => $_getSZ(1); + @$pb.TagNumber(2) + set msg($core.String v) { $_setString(1, v); } + @$pb.TagNumber(2) + $core.bool hasMsg() => $_has(1); + @$pb.TagNumber(2) + void clearMsg() => clearField(2); +} + diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbenum.dart new file mode 100644 index 0000000000..d16d79e397 --- /dev/null +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbenum.dart @@ -0,0 +1,24 @@ +/// +// Generated code. Do not modify. +// source: errors.proto +// +// @dart = 2.12 +// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields + +// ignore_for_file: UNDEFINED_SHOWN_NAME +import 'dart:core' as $core; +import 'package:protobuf/protobuf.dart' as $pb; + +class ErrorCode extends $pb.ProtobufEnum { + static const ErrorCode InternalError = ErrorCode._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'InternalError'); + + static const $core.List values = [ + InternalError, + ]; + + static final $core.Map<$core.int, ErrorCode> _byValue = $pb.ProtobufEnum.initByValue(values); + static ErrorCode? valueOf($core.int value) => _byValue[value]; + + const ErrorCode._($core.int v, $core.String n) : super(v, n); +} + diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbjson.dart new file mode 100644 index 0000000000..629328d718 --- /dev/null +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbjson.dart @@ -0,0 +1,31 @@ +/// +// Generated code. Do not modify. +// source: errors.proto +// +// @dart = 2.12 +// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package + +import 'dart:core' as $core; +import 'dart:convert' as $convert; +import 'dart:typed_data' as $typed_data; +@$core.Deprecated('Use errorCodeDescriptor instead') +const ErrorCode$json = const { + '1': 'ErrorCode', + '2': const [ + const {'1': 'InternalError', '2': 0}, + ], +}; + +/// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`. +final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEQoNSW50ZXJuYWxFcnJvchAA'); +@$core.Deprecated('Use wsErrorDescriptor instead') +const WsError$json = const { + '1': 'WsError', + '2': const [ + const {'1': 'code', '3': 1, '4': 1, '5': 14, '6': '.ErrorCode', '10': 'code'}, + const {'1': 'msg', '3': 2, '4': 1, '5': 9, '10': 'msg'}, + ], +}; + +/// Descriptor for `WsError`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List wsErrorDescriptor = $convert.base64Decode('CgdXc0Vycm9yEh4KBGNvZGUYASABKA4yCi5FcnJvckNvZGVSBGNvZGUSEAoDbXNnGAIgASgJUgNtc2c='); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbserver.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbserver.dart new file mode 100644 index 0000000000..18b02b9216 --- /dev/null +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbserver.dart @@ -0,0 +1,9 @@ +/// +// Generated code. Do not modify. +// source: errors.proto +// +// @dart = 2.12 +// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package + +export 'errors.pb.dart'; + diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/protobuf.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/protobuf.dart new file mode 100644 index 0000000000..92eb134641 --- /dev/null +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/protobuf.dart @@ -0,0 +1,2 @@ +// Auto-generated, do not edit +export './errors.pb.dart'; diff --git a/backend/src/service/log/mod.rs b/backend/src/service/log/mod.rs index d6f00fed04..7e821832da 100644 --- a/backend/src/service/log/mod.rs +++ b/backend/src/service/log/mod.rs @@ -1,5 +1,5 @@ use log::LevelFilter; -use std::path::Path; + use tracing::subscriber::set_global_default; use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; diff --git a/rust-lib/Cargo.toml b/rust-lib/Cargo.toml index 75b7f6b5f1..be571a7f68 100644 --- a/rust-lib/Cargo.toml +++ b/rust-lib/Cargo.toml @@ -17,6 +17,7 @@ members = [ "flowy-editor", "flowy-ot", "flowy-net", + "flowy-ws", ] exclude = ["../backend"] diff --git a/rust-lib/dart-ffi/src/protobuf/mod.rs b/rust-lib/dart-ffi/src/protobuf/mod.rs index 8177ff8614..2480f62fbe 100644 --- a/rust-lib/dart-ffi/src/protobuf/mod.rs +++ b/rust-lib/dart-ffi/src/protobuf/mod.rs @@ -1,2 +1,4 @@ + mod model; pub use model::*; + \ No newline at end of file diff --git a/rust-lib/dart-ffi/src/protobuf/model/mod.rs b/rust-lib/dart-ffi/src/protobuf/model/mod.rs index faa39cb5d8..8220f60f66 100644 --- a/rust-lib/dart-ffi/src/protobuf/model/mod.rs +++ b/rust-lib/dart-ffi/src/protobuf/model/mod.rs @@ -1,7 +1,7 @@ -// Auto-generated, do not edit +// Auto-generated, do not edit -mod ffi_response; -pub use ffi_response::*; +mod ffi_response; +pub use ffi_response::*; -mod ffi_request; -pub use ffi_request::*; +mod ffi_request; +pub use ffi_request::*; diff --git a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs index 289e7c70b8..4ec763441f 100644 --- a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -53,6 +53,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "View" | "RepeatedView" | "WorkspaceError" + | "WsError" | "CreateDocParams" | "Doc" | "SaveDocParams" @@ -71,7 +72,8 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "UserProfile" | "UpdateUserRequest" | "UpdateUserParams" - | "UserError" => TypeCategory::Protobuf, + | "UserError" + => TypeCategory::Protobuf, "ViewType" | "WorkspaceEvent" | "ErrorCode" @@ -80,7 +82,8 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "FFIStatusCode" | "UserStatus" | "UserEvent" - | "UserObservable" => TypeCategory::Enum, + | "UserObservable" + => TypeCategory::Enum, "Option" => TypeCategory::Opt, _ => TypeCategory::Primitive, diff --git a/rust-lib/flowy-document/src/protobuf/mod.rs b/rust-lib/flowy-document/src/protobuf/mod.rs index 8177ff8614..2480f62fbe 100644 --- a/rust-lib/flowy-document/src/protobuf/mod.rs +++ b/rust-lib/flowy-document/src/protobuf/mod.rs @@ -1,2 +1,4 @@ + mod model; pub use model::*; + \ No newline at end of file diff --git a/rust-lib/flowy-document/src/protobuf/model/mod.rs b/rust-lib/flowy-document/src/protobuf/model/mod.rs index 68d74ca94b..4a18e82c46 100644 --- a/rust-lib/flowy-document/src/protobuf/model/mod.rs +++ b/rust-lib/flowy-document/src/protobuf/model/mod.rs @@ -1,13 +1,13 @@ -// Auto-generated, do not edit +// Auto-generated, do not edit -mod observable; -pub use observable::*; +mod observable; +pub use observable::*; -mod errors; -pub use errors::*; +mod errors; +pub use errors::*; -mod event; -pub use event::*; +mod event; +pub use event::*; -mod doc; -pub use doc::*; +mod doc; +pub use doc::*; diff --git a/rust-lib/flowy-infra/src/protobuf/mod.rs b/rust-lib/flowy-infra/src/protobuf/mod.rs index 8177ff8614..2480f62fbe 100644 --- a/rust-lib/flowy-infra/src/protobuf/mod.rs +++ b/rust-lib/flowy-infra/src/protobuf/mod.rs @@ -1,2 +1,4 @@ + mod model; pub use model::*; + \ No newline at end of file diff --git a/rust-lib/flowy-infra/src/protobuf/model/mod.rs b/rust-lib/flowy-infra/src/protobuf/model/mod.rs index fac22cac7e..b481acecd1 100644 --- a/rust-lib/flowy-infra/src/protobuf/model/mod.rs +++ b/rust-lib/flowy-infra/src/protobuf/model/mod.rs @@ -1,4 +1,4 @@ -// Auto-generated, do not edit +// Auto-generated, do not edit -mod kv; -pub use kv::*; +mod kv; +pub use kv::*; diff --git a/rust-lib/flowy-observable/src/protobuf/mod.rs b/rust-lib/flowy-observable/src/protobuf/mod.rs index 8177ff8614..2480f62fbe 100644 --- a/rust-lib/flowy-observable/src/protobuf/mod.rs +++ b/rust-lib/flowy-observable/src/protobuf/mod.rs @@ -1,2 +1,4 @@ + mod model; pub use model::*; + \ No newline at end of file diff --git a/rust-lib/flowy-observable/src/protobuf/model/mod.rs b/rust-lib/flowy-observable/src/protobuf/model/mod.rs index 41671a628b..ce50dd797d 100644 --- a/rust-lib/flowy-observable/src/protobuf/model/mod.rs +++ b/rust-lib/flowy-observable/src/protobuf/model/mod.rs @@ -1,4 +1,4 @@ -// Auto-generated, do not edit +// Auto-generated, do not edit -mod subject; -pub use subject::*; +mod subject; +pub use subject::*; diff --git a/rust-lib/flowy-sdk/src/lib.rs b/rust-lib/flowy-sdk/src/lib.rs index 04cdfc6101..f2517eaf71 100644 --- a/rust-lib/flowy-sdk/src/lib.rs +++ b/rust-lib/flowy-sdk/src/lib.rs @@ -41,6 +41,7 @@ fn crate_log_filter(level: Option) -> String { filters.push(format!("flowy_document={}", level)); filters.push(format!("flowy_observable={}", level)); filters.push(format!("flowy_ot={}", level)); + filters.push(format!("flowy_ws={}", level)); filters.push(format!("info")); filters.join(",") } diff --git a/rust-lib/flowy-user/src/protobuf/mod.rs b/rust-lib/flowy-user/src/protobuf/mod.rs index 8177ff8614..2480f62fbe 100644 --- a/rust-lib/flowy-user/src/protobuf/mod.rs +++ b/rust-lib/flowy-user/src/protobuf/mod.rs @@ -1,2 +1,4 @@ + mod model; pub use model::*; + \ No newline at end of file diff --git a/rust-lib/flowy-user/src/protobuf/model/mod.rs b/rust-lib/flowy-user/src/protobuf/model/mod.rs index 7c09f0b489..ca6e0867af 100644 --- a/rust-lib/flowy-user/src/protobuf/model/mod.rs +++ b/rust-lib/flowy-user/src/protobuf/model/mod.rs @@ -1,19 +1,19 @@ -// Auto-generated, do not edit +// Auto-generated, do not edit -mod observable; -pub use observable::*; +mod observable; +pub use observable::*; -mod user_table; -pub use user_table::*; +mod user_table; +pub use user_table::*; -mod errors; -pub use errors::*; +mod errors; +pub use errors::*; -mod user_profile; -pub use user_profile::*; +mod user_profile; +pub use user_profile::*; -mod event; -pub use event::*; +mod event; +pub use event::*; -mod auth; -pub use auth::*; +mod auth; +pub use auth::*; diff --git a/rust-lib/flowy-workspace/src/protobuf/mod.rs b/rust-lib/flowy-workspace/src/protobuf/mod.rs index 8177ff8614..2480f62fbe 100644 --- a/rust-lib/flowy-workspace/src/protobuf/mod.rs +++ b/rust-lib/flowy-workspace/src/protobuf/mod.rs @@ -1,2 +1,4 @@ + mod model; pub use model::*; + \ No newline at end of file diff --git a/rust-lib/flowy-workspace/src/protobuf/model/mod.rs b/rust-lib/flowy-workspace/src/protobuf/model/mod.rs index d1466cf304..3aaff47db2 100644 --- a/rust-lib/flowy-workspace/src/protobuf/model/mod.rs +++ b/rust-lib/flowy-workspace/src/protobuf/model/mod.rs @@ -1,49 +1,49 @@ -// Auto-generated, do not edit +// Auto-generated, do not edit -mod view_update; -pub use view_update::*; +mod view_update; +pub use view_update::*; -mod view_delete; -pub use view_delete::*; +mod view_delete; +pub use view_delete::*; -mod app_query; -pub use app_query::*; +mod app_query; +pub use app_query::*; -mod workspace_delete; -pub use workspace_delete::*; +mod workspace_delete; +pub use workspace_delete::*; -mod observable; -pub use observable::*; +mod observable; +pub use observable::*; -mod errors; -pub use errors::*; +mod errors; +pub use errors::*; -mod workspace_update; -pub use workspace_update::*; +mod workspace_update; +pub use workspace_update::*; -mod app_create; -pub use app_create::*; +mod app_create; +pub use app_create::*; -mod workspace_query; -pub use workspace_query::*; +mod workspace_query; +pub use workspace_query::*; -mod event; -pub use event::*; +mod event; +pub use event::*; -mod view_create; -pub use view_create::*; +mod view_create; +pub use view_create::*; -mod workspace_user_detail; -pub use workspace_user_detail::*; +mod workspace_user_detail; +pub use workspace_user_detail::*; -mod workspace_create; -pub use workspace_create::*; +mod workspace_create; +pub use workspace_create::*; -mod app_update; -pub use app_update::*; +mod app_update; +pub use app_update::*; -mod view_query; -pub use view_query::*; +mod view_query; +pub use view_query::*; -mod app_delete; -pub use app_delete::*; +mod app_delete; +pub use app_delete::*; diff --git a/rust-lib/flowy-ws/Cargo.toml b/rust-lib/flowy-ws/Cargo.toml new file mode 100644 index 0000000000..d9c245d33d --- /dev/null +++ b/rust-lib/flowy-ws/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "flowy-ws" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +flowy-derive = { path = "../flowy-derive" } + +tokio-tungstenite = "0.15" +futures-util = "0.3.15" +futures-channel = "0.3.15" +tokio = {version = "1.6.0", features = ["full"]} +futures = "0.3.15" +lazy_static = "1.4" +bytes = "0.5" +pin-project = "1.0.0" +futures-core = { version = "0.3", default-features = false } +paste = "1" +url = "2.2.2" +log = "0.4" +protobuf = {version = "2.18.0"} +strum = "0.21" +strum_macros = "0.21" \ No newline at end of file diff --git a/rust-lib/flowy-ws/Flowy.toml b/rust-lib/flowy-ws/Flowy.toml new file mode 100644 index 0000000000..642bd7427c --- /dev/null +++ b/rust-lib/flowy-ws/Flowy.toml @@ -0,0 +1,2 @@ +proto_crates = ["src/errors.rs"] +event_files = [] \ No newline at end of file diff --git a/rust-lib/flowy-ws/src/errors.rs b/rust-lib/flowy-ws/src/errors.rs new file mode 100644 index 0000000000..9e55d3417c --- /dev/null +++ b/rust-lib/flowy-ws/src/errors.rs @@ -0,0 +1,55 @@ +use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; +use futures_channel::mpsc::TrySendError; +use std::fmt::Debug; +use strum_macros::Display; +use tokio_tungstenite::tungstenite::Message; +use url::ParseError; + +#[derive(Debug, Default, Clone, ProtoBuf)] +pub struct WsError { + #[pb(index = 1)] + code: ErrorCode, + + #[pb(index = 2)] + msg: String, +} + +macro_rules! static_user_error { + ($name:ident, $status:expr) => { + #[allow(non_snake_case, missing_docs)] + pub fn $name() -> WsError { + WsError { + code: $status, + msg: format!("{}", $status), + } + } + }; +} + +impl WsError { + pub(crate) fn new(code: ErrorCode) -> WsError { WsError { code, msg: "".to_string() } } + + pub fn context(mut self, error: T) -> Self { + self.msg = format!("{:?}", error); + self + } + + static_user_error!(internal, ErrorCode::InternalError); +} + +#[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)] +pub enum ErrorCode { + InternalError = 0, +} + +impl std::default::Default for ErrorCode { + fn default() -> Self { ErrorCode::InternalError } +} + +impl std::convert::From for WsError { + fn from(error: ParseError) -> Self { WsError::internal().context(error) } +} + +impl std::convert::From> for WsError { + fn from(error: TrySendError) -> Self { WsError::internal().context(error) } +} diff --git a/rust-lib/flowy-ws/src/lib.rs b/rust-lib/flowy-ws/src/lib.rs new file mode 100644 index 0000000000..15ba12e1a6 --- /dev/null +++ b/rust-lib/flowy-ws/src/lib.rs @@ -0,0 +1,3 @@ +pub mod errors; +pub mod protobuf; +pub mod ws; diff --git a/rust-lib/flowy-ws/src/protobuf/mod.rs b/rust-lib/flowy-ws/src/protobuf/mod.rs new file mode 100644 index 0000000000..2480f62fbe --- /dev/null +++ b/rust-lib/flowy-ws/src/protobuf/mod.rs @@ -0,0 +1,4 @@ + +mod model; +pub use model::*; + \ No newline at end of file diff --git a/rust-lib/flowy-ws/src/protobuf/model/errors.rs b/rust-lib/flowy-ws/src/protobuf/model/errors.rs new file mode 100644 index 0000000000..b8130290f8 --- /dev/null +++ b/rust-lib/flowy-ws/src/protobuf/model/errors.rs @@ -0,0 +1,290 @@ +// This file is generated by rust-protobuf 2.22.1. Do not edit +// @generated + +// https://github.com/rust-lang/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] + +#![allow(unused_attributes)] +#![cfg_attr(rustfmt, rustfmt::skip)] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unused_imports)] +#![allow(unused_results)] +//! Generated file from `errors.proto` + +/// Generated files are compatible only with the same version +/// of protobuf runtime. +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1; + +#[derive(PartialEq,Clone,Default)] +pub struct WsError { + // message fields + pub code: ErrorCode, + pub msg: ::std::string::String, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a WsError { + fn default() -> &'a WsError { + ::default_instance() + } +} + +impl WsError { + pub fn new() -> WsError { + ::std::default::Default::default() + } + + // .ErrorCode code = 1; + + + pub fn get_code(&self) -> ErrorCode { + self.code + } + pub fn clear_code(&mut self) { + self.code = ErrorCode::InternalError; + } + + // Param is passed by value, moved + pub fn set_code(&mut self, v: ErrorCode) { + self.code = v; + } + + // string msg = 2; + + + pub fn get_msg(&self) -> &str { + &self.msg + } + pub fn clear_msg(&mut self) { + self.msg.clear(); + } + + // Param is passed by value, moved + pub fn set_msg(&mut self, v: ::std::string::String) { + self.msg = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_msg(&mut self) -> &mut ::std::string::String { + &mut self.msg + } + + // Take field + pub fn take_msg(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.msg, ::std::string::String::new()) + } +} + +impl ::protobuf::Message for WsError { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.code, 1, &mut self.unknown_fields)? + }, + 2 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.msg)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if self.code != ErrorCode::InternalError { + my_size += ::protobuf::rt::enum_size(1, self.code); + } + if !self.msg.is_empty() { + my_size += ::protobuf::rt::string_size(2, &self.msg); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if self.code != ErrorCode::InternalError { + os.write_enum(1, ::protobuf::ProtobufEnum::value(&self.code))?; + } + if !self.msg.is_empty() { + os.write_string(2, &self.msg)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> WsError { + WsError::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + "code", + |m: &WsError| { &m.code }, + |m: &mut WsError| { &mut m.code }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "msg", + |m: &WsError| { &m.msg }, + |m: &mut WsError| { &mut m.msg }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "WsError", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static WsError { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(WsError::new) + } +} + +impl ::protobuf::Clear for WsError { + fn clear(&mut self) { + self.code = ErrorCode::InternalError; + self.msg.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for WsError { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for WsError { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + +#[derive(Clone,PartialEq,Eq,Debug,Hash)] +pub enum ErrorCode { + InternalError = 0, +} + +impl ::protobuf::ProtobufEnum for ErrorCode { + fn value(&self) -> i32 { + *self as i32 + } + + fn from_i32(value: i32) -> ::std::option::Option { + match value { + 0 => ::std::option::Option::Some(ErrorCode::InternalError), + _ => ::std::option::Option::None + } + } + + fn values() -> &'static [Self] { + static values: &'static [ErrorCode] = &[ + ErrorCode::InternalError, + ]; + values + } + + fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + ::protobuf::reflect::EnumDescriptor::new_pb_name::("ErrorCode", file_descriptor_proto()) + }) + } +} + +impl ::std::marker::Copy for ErrorCode { +} + +impl ::std::default::Default for ErrorCode { + fn default() -> Self { + ErrorCode::InternalError + } +} + +impl ::protobuf::reflect::ProtobufValue for ErrorCode { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self)) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\x0cerrors.proto\";\n\x07WsError\x12\x1e\n\x04code\x18\x01\x20\x01(\ + \x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*\ + \x1e\n\tErrorCode\x12\x11\n\rInternalError\x10\0J\xd9\x01\n\x06\x12\x04\ + \0\0\x08\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\ + \0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x0f\n\x0b\n\x04\x04\0\x02\ + \0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\r\n\ + \x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\ + \x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\ + \x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\ + \x01\x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\ + \x12\n\n\n\x02\x05\0\x12\x04\x06\0\x08\x01\n\n\n\x03\x05\0\x01\x12\x03\ + \x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x16\n\x0c\n\x05\x05\ + \0\x02\0\x01\x12\x03\x07\x04\x11\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\ + \x14\x15b\x06proto3\ +"; + +static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) +} diff --git a/rust-lib/flowy-ws/src/protobuf/model/mod.rs b/rust-lib/flowy-ws/src/protobuf/model/mod.rs new file mode 100644 index 0000000000..00f047d293 --- /dev/null +++ b/rust-lib/flowy-ws/src/protobuf/model/mod.rs @@ -0,0 +1,4 @@ +// Auto-generated, do not edit + +mod errors; +pub use errors::*; diff --git a/rust-lib/flowy-ws/src/protobuf/proto/errors.proto b/rust-lib/flowy-ws/src/protobuf/proto/errors.proto new file mode 100644 index 0000000000..14a5c85098 --- /dev/null +++ b/rust-lib/flowy-ws/src/protobuf/proto/errors.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +message WsError { + ErrorCode code = 1; + string msg = 2; +} +enum ErrorCode { + InternalError = 0; +} diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs new file mode 100644 index 0000000000..e10d015541 --- /dev/null +++ b/rust-lib/flowy-ws/src/ws.rs @@ -0,0 +1,173 @@ +use crate::errors::WsError; +use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use futures_core::{ready, Stream}; +use futures_util::StreamExt; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::net::TcpStream; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +pub type MsgReceiver = UnboundedReceiver; +pub type MsgSender = UnboundedSender; + +pub trait WsMessageHandler: Sync + Send + 'static { + fn handler_message(&self, msg: &Message); +} + +pub struct WsController { + connection: Option>, + handlers: Vec>, +} + +impl WsController { + pub fn new() -> Self { + Self { + connection: None, + handlers: vec![], + } + } + + pub fn add_handlers(&mut self, handler: Arc) { self.handlers.push(handler); } + + pub async fn connect(&mut self, addr: &str) -> Result<(), WsError> { + // Stream User + // ┌───────────────┐ ┌──────────────┐ + // ┌──────┐ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │ + // │Server│──────┼─▶│ ws_read │──┼───▶│ msg_tx │───┼─▶│ msg_rx │ │ + // └──────┘ │ └─────────┘ │ └────────┘ │ └────────┘ │ + // ▲ │ │ │ │ + // │ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │ + // └─────────┼──│ws_write │◀─┼────│ ws_rx │◀──┼──│ ws_tx │ │ + // │ └─────────┘ │ └────────┘ │ └────────┘ │ + // └───────────────┘ └──────────────┘ + + let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded(); + let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded(); + let mut ws_raw = WsRaw::new(msg_tx, ws_rx); + let connection = Arc::new(WsConnect::new(ws_tx)); + + self.connection = Some(connection.clone()); + + let start_connect = { ws_raw.connect(&addr) }; + let spawn_handlers = SpawnHandlers::new(self.handlers.clone(), msg_rx); + // let spawn_handlers = { + // msg_rx.for_each(|message| async move { + // let handlers: Arc>> = Arc::new(vec![]); + // handlers.iter().for_each(|handler| { + // handler.handler_message(&message); + // }); + // }) + // }; + tokio::select! { + _ = spawn_handlers => { + log::debug!("Websocket read completed") + } + _ = start_connect => { + log::debug!("Connection completed") + } + }; + + Ok(()) + } + + pub fn send_message(&self, msg: Message) -> Result<(), WsError> { + match &self.connection { + None => panic!(), + Some(conn) => conn.send(msg), + } + } +} + +#[pin_project] +struct SpawnHandlers { + #[pin] + msg_rx: MsgReceiver, + handlers: Vec>, +} + +impl SpawnHandlers { + fn new(handlers: Vec>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } } +} + +impl Future for SpawnHandlers { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match ready!(self.as_mut().project().msg_rx.poll_next(cx)) { + None => return Poll::Ready(()), + Some(message) => self.handlers.iter().for_each(|handler| { + handler.handler_message(&message); + }), + } + } + } +} + +pub struct WsConnect { + ws_tx: MsgSender, +} + +impl WsConnect { + pub fn new(ws_tx: MsgSender) -> Self { Self { ws_tx } } + pub fn send(&self, msg: Message) -> Result<(), WsError> { + let _ = self.ws_tx.unbounded_send(msg)?; + Ok(()) + } +} + +pub struct WsRaw { + msg_tx: Option, + ws_rx: Option, +} + +impl WsRaw { + pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver) -> Self { + WsRaw { + msg_tx: Some(msg_tx), + ws_rx: Some(ws_rx), + } + } + + pub async fn connect(&mut self, addr: &str) -> Result<(), WsError> { + let url = url::Url::parse(addr)?; + match connect_async(url).await { + Ok((stream, _)) => self.bind_stream(stream).await, + Err(e) => Err(WsError::internal().context(e)), + } + } + + async fn bind_stream(&mut self, stream: WebSocketStream>) -> Result<(), WsError> { + let (ws_write, ws_read) = stream.split(); + let (tx, rx) = self.take_mpsc(); + let to_ws = rx.map(Ok).forward(ws_write); + let from_ws = { + ws_read.for_each(|message| async { + match message { + Ok(message) => { + match tx.unbounded_send(message) { + Ok(_) => {}, + Err(e) => log::error!("tx send error: {:?}", e), + }; + }, + Err(e) => log::error!("ws read error: {:?}", e), + } + }) + }; + tokio::select! { + _ = to_ws => { + log::debug!("ws write completed") + } + _ = from_ws => { + log::debug!("ws read completed") + } + }; + Ok(()) + } + + fn take_mpsc(&mut self) -> (MsgSender, MsgReceiver) { (self.msg_tx.take().unwrap(), self.ws_rx.take().unwrap()) } +}