mirror of
https://github.com/taosdata/TDengine
synced 2026-05-24 10:09:01 +00:00
Merge pull request #17 from taosdata/feat/ws-stop-query
feat(ws): add ws_stop_query, support write raw block
This commit is contained in:
commit
24b199e895
29 changed files with 514 additions and 318 deletions
|
|
@ -167,10 +167,15 @@ impl Layout {
|
|||
self
|
||||
}
|
||||
|
||||
pub fn with_schema_changed(&mut self) -> &mut Self {
|
||||
pub fn with_schema_changed(mut self) -> Self {
|
||||
self.set(Self::SCHEMA_CHANGED, true);
|
||||
self
|
||||
}
|
||||
pub fn set_schema_changed(&mut self, value: bool) -> &mut Self {
|
||||
self.set(Self::SCHEMA_CHANGED, value);
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
pub fn schema_changed(&self) -> bool {
|
||||
self.contains(Self::SCHEMA_CHANGED)
|
||||
|
|
|
|||
|
|
@ -6,11 +6,10 @@ use crate::{
|
|||
use bytes::Bytes;
|
||||
use itertools::Itertools;
|
||||
|
||||
use nom::AsBytes;
|
||||
use serde::Deserialize;
|
||||
|
||||
use std::{
|
||||
cell::{RefCell, UnsafeCell},
|
||||
cell::{Cell, RefCell, UnsafeCell},
|
||||
ffi::c_void,
|
||||
ops::{Deref, DerefMut},
|
||||
ptr::NonNull,
|
||||
|
|
@ -30,8 +29,8 @@ pub mod views;
|
|||
pub use views::ColumnView;
|
||||
use views::*;
|
||||
|
||||
pub use meta::*;
|
||||
pub use data::*;
|
||||
pub use meta::*;
|
||||
|
||||
mod de;
|
||||
mod rows;
|
||||
|
|
@ -55,7 +54,7 @@ pub struct RawBlock {
|
|||
/// Raw bytes version, may be v2 or v3.
|
||||
version: Version,
|
||||
/// Data is required, which could be v2 websocket block or a v3 raw block.
|
||||
data: Bytes,
|
||||
data: Cell<Bytes>,
|
||||
/// Number of rows in current data block.
|
||||
rows: usize,
|
||||
/// Number of columns (or fields) in current data block.
|
||||
|
|
@ -181,7 +180,9 @@ impl RawBlock {
|
|||
// const U_INT_NULL: u32 = u32::MAX;
|
||||
// const U_BIG_INT_NULL: u64 = u64::MAX;
|
||||
|
||||
let layout = Arc::new(RefCell::new(Layout::INLINE_DEFAULT.into()));
|
||||
let layout = Arc::new(RefCell::new(
|
||||
Layout::INLINE_DEFAULT.with_schema_changed().into(),
|
||||
));
|
||||
|
||||
let bytes = bytes.into();
|
||||
let cols = fields.len();
|
||||
|
|
@ -375,7 +376,7 @@ impl RawBlock {
|
|||
Self {
|
||||
layout,
|
||||
version: Version::V2,
|
||||
data: bytes,
|
||||
data: Cell::new(bytes),
|
||||
rows,
|
||||
cols,
|
||||
schemas,
|
||||
|
|
@ -474,13 +475,13 @@ impl RawBlock {
|
|||
let data = bytes.slice(o2..data_offset);
|
||||
// dbg!()
|
||||
|
||||
ColumnView::NChar(dbg!(NCharView {
|
||||
ColumnView::NChar(NCharView {
|
||||
offsets,
|
||||
data,
|
||||
is_chars: UnsafeCell::new(true),
|
||||
version: Version::V3,
|
||||
layout: layout.clone(),
|
||||
}))
|
||||
})
|
||||
}
|
||||
Ty::UTinyInt => _primitive_value!(UTinyInt, u8),
|
||||
Ty::USmallInt => _primitive_value!(USmallInt, u16),
|
||||
|
|
@ -507,7 +508,7 @@ impl RawBlock {
|
|||
RawBlock {
|
||||
layout,
|
||||
version: Version::V3,
|
||||
data: bytes,
|
||||
data: Cell::new(bytes),
|
||||
rows,
|
||||
cols,
|
||||
precision,
|
||||
|
|
@ -650,7 +651,35 @@ impl RawBlock {
|
|||
}
|
||||
|
||||
pub fn as_raw_bytes(&self) -> &[u8] {
|
||||
&self.data
|
||||
if self.layout.borrow().schema_changed() {
|
||||
let mut bytes = Vec::new();
|
||||
// 4 bytes total length placeholder.
|
||||
bytes.extend(0u32.to_le_bytes());
|
||||
// 8 bytes group id.
|
||||
bytes.extend(self.group_id.to_le_bytes());
|
||||
// `ncols * std::mem::size_of::<ColSchema>()` bytes schemas.
|
||||
bytes.extend(self.schemas.as_bytes());
|
||||
// `ncols * std::mem::size_of::<u32>()` bytes lengths.
|
||||
bytes.extend(self.lengths.as_bytes());
|
||||
// data for each column
|
||||
for col in self.columns() {
|
||||
col.write_raw_into(&mut bytes).unwrap();
|
||||
}
|
||||
bytes.len();
|
||||
unsafe {
|
||||
*(bytes.as_mut_ptr() as *mut u32) = bytes.len() as u32;
|
||||
}
|
||||
debug_assert_eq!(
|
||||
unsafe { *(bytes.as_ptr() as *const u32) },
|
||||
bytes.len() as u32
|
||||
);
|
||||
let bytes = Bytes::from(bytes);
|
||||
self.data.replace(bytes);
|
||||
self.layout.borrow_mut().set_schema_changed(false);
|
||||
unsafe { &*self.data.as_ptr() }
|
||||
} else {
|
||||
unsafe { &*self.data.as_ptr() }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_null(&self, row: usize, col: usize) -> bool {
|
||||
|
|
@ -949,7 +978,6 @@ fn test_raw_from_v2() {
|
|||
pretty_env_logger::formatted_builder()
|
||||
.filter_level(log::LevelFilter::Trace)
|
||||
.init();
|
||||
use serde::Deserialize;
|
||||
let bytes = b"\x10\x86\x1aA \xcc)AB\xc2\x14AZ],A\xa2\x8d$A\x87\xb9%A\xf5~\x0fA\x96\xf7,AY\xee\x17A1|\x15As\x00\x00\x00q\x00\x00\x00s\x00\x00\x00t\x00\x00\x00u\x00\x00\x00t\x00\x00\x00n\x00\x00\x00n\x00\x00\x00n\x00\x00\x00r\x00\x00\x00";
|
||||
|
||||
let block = RawBlock::parse_from_raw_block_v2(
|
||||
|
|
@ -1095,7 +1123,10 @@ fn test_from_v2() {
|
|||
1,
|
||||
Precision::Millisecond,
|
||||
);
|
||||
dbg!(raw);
|
||||
let bytes = raw.as_raw_bytes();
|
||||
let bytes = Bytes::copy_from_slice(bytes);
|
||||
let raw2 = RawBlock::parse_from_raw_block(bytes, raw.nrows(), raw.ncols(), raw.precision());
|
||||
dbg!(&raw, raw2);
|
||||
// dbg!(raw.as_bytes());
|
||||
// let v = unsafe { raw.get_ref_unchecked(0, 0) };
|
||||
// dbg!(v);
|
||||
|
|
|
|||
|
|
@ -110,6 +110,14 @@ impl UBigIntView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UBigIntViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -110,6 +110,14 @@ impl BigIntView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BigIntViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use std::ffi::c_void;
|
||||
use std::{ffi::c_void, io::Write};
|
||||
|
||||
use crate::common::{BorrowedValue, Ty};
|
||||
|
||||
|
|
@ -104,6 +104,14 @@ impl BoolView {
|
|||
pub fn to_vec(&self) -> Vec<Option<bool>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BoolViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -109,6 +109,15 @@ impl DoubleView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct DoubleViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -110,6 +110,14 @@ impl FloatView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FloatViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -110,6 +110,14 @@ impl UIntView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UIntViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -110,6 +110,14 @@ impl IntView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IntViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -68,6 +68,14 @@ impl JsonView {
|
|||
.map(|row| unsafe { self.get_unchecked(row) }.map(|s| s.to_string()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let offsets = self.offsets.as_bytes();
|
||||
wtr.write_all(offsets)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(offsets.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VarCharIter<'a> {
|
||||
|
|
|
|||
|
|
@ -27,6 +27,11 @@ impl Lengths {
|
|||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// As a [u8] slice.
|
||||
pub fn as_bytes(&self) -> &[u8] {
|
||||
&self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Lengths {
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ pub(crate) use lengths::*;
|
|||
|
||||
use crate::common::{BorrowedValue, Column, Ty};
|
||||
|
||||
use std::{ffi::c_void, fmt::Debug, iter::FusedIterator};
|
||||
use std::{ffi::c_void, fmt::Debug, iter::FusedIterator, io::Write};
|
||||
|
||||
/// Compatible version for var char.
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
|
|
@ -313,4 +313,24 @@ impl ColumnView {
|
|||
ColumnView::Json(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn write_raw_into<W: Write>(&self, wtr: W) -> std::io::Result<usize> {
|
||||
match self {
|
||||
ColumnView::Bool(view) => view.write_raw_into(wtr),
|
||||
ColumnView::TinyInt(view) => view.write_raw_into(wtr),
|
||||
ColumnView::SmallInt(view) => view.write_raw_into(wtr),
|
||||
ColumnView::Int(view) => view.write_raw_into(wtr),
|
||||
ColumnView::BigInt(view) => view.write_raw_into(wtr),
|
||||
ColumnView::Float(view) => view.write_raw_into(wtr),
|
||||
ColumnView::Double(view) => view.write_raw_into(wtr),
|
||||
ColumnView::VarChar(view) => view.write_raw_into(wtr),
|
||||
ColumnView::Timestamp(view) => view.write_raw_into(wtr),
|
||||
ColumnView::NChar(view) => view.write_raw_into(wtr),
|
||||
ColumnView::UTinyInt(view) => view.write_raw_into(wtr),
|
||||
ColumnView::USmallInt(view) => view.write_raw_into(wtr),
|
||||
ColumnView::UInt(view) => view.write_raw_into(wtr),
|
||||
ColumnView::UBigInt(view) => view.write_raw_into(wtr),
|
||||
ColumnView::Json(view) => view.write_raw_into(wtr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -126,6 +126,14 @@ impl NCharView {
|
|||
pub fn to_vec(&self) -> Vec<Option<&str>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let offsets = self.offsets.as_bytes();
|
||||
wtr.write_all(offsets)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(offsets.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NCharViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -25,13 +25,14 @@ impl Offsets {
|
|||
}
|
||||
/// As a i32 slice.
|
||||
pub fn as_slice(&self) -> &[i32] {
|
||||
unsafe {
|
||||
std::slice::from_raw_parts(
|
||||
self.0.as_ptr() as *const i32,
|
||||
self.len(),
|
||||
)
|
||||
}
|
||||
unsafe { std::slice::from_raw_parts(self.0.as_ptr() as *const i32, self.len()) }
|
||||
}
|
||||
|
||||
/// As a [u8] slice.
|
||||
pub fn as_bytes(&self) -> &[u8] {
|
||||
self.0.as_ref()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.0.len() / std::mem::size_of::<i32>()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -97,6 +97,10 @@ impl Schemas {
|
|||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_bytes(&self) -> &[u8] {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Schemas {
|
||||
|
|
|
|||
|
|
@ -110,6 +110,14 @@ impl USmallIntView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct USmallIntViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -110,6 +110,15 @@ impl SmallIntView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct SmallIntViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -120,6 +120,14 @@ impl TimestampView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Timestamp>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TimestampViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -110,6 +110,14 @@ impl UTinyIntView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UTinyIntViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -110,6 +110,14 @@ impl TinyIntView {
|
|||
pub fn to_vec(&self) -> Vec<Option<Target>> {
|
||||
self.iter().collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let nulls = self.nulls.0.as_ref();
|
||||
wtr.write_all(nulls)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(nulls.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TinyIntViewIter<'a> {
|
||||
|
|
|
|||
|
|
@ -82,6 +82,14 @@ impl VarCharView {
|
|||
.map(|row| unsafe { self.get_unchecked(row) }.map(|s| s.to_string()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Write column data as raw bytes.
|
||||
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
|
||||
let offsets = self.offsets.as_bytes();
|
||||
wtr.write_all(offsets)?;
|
||||
wtr.write_all(&self.data)?;
|
||||
Ok(offsets.len() + self.data.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VarCharIter<'a> {
|
||||
|
|
|
|||
|
|
@ -77,13 +77,13 @@ impl<T> Deref for WsMaybeError<T> {
|
|||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
unsafe { self.data.as_ref().unwrap() }
|
||||
unsafe { self.data.as_ref().expect("data pointer should not be null") }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for WsMaybeError<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
unsafe { self.data.as_mut().unwrap() }
|
||||
unsafe { self.data.as_mut().expect("data pointer should not be null") }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -386,6 +386,10 @@ impl WsResultSet {
|
|||
fn take_timing(&mut self) -> Duration {
|
||||
self.rs.take_timing()
|
||||
}
|
||||
|
||||
fn stop_query(&mut self) {
|
||||
self.rs.stop_query()
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn connect_with_dsn(dsn: *const c_char) -> WsTaos {
|
||||
|
|
@ -506,6 +510,16 @@ pub unsafe extern "C" fn ws_query(taos: *mut WS_TAOS, sql: *const c_char) -> *mu
|
|||
Box::into_raw(Box::new(res)) as _
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn ws_stop_query(rs: *mut WS_RES) {
|
||||
match (rs as *mut WsMaybeError<WsResultSet>).as_mut() {
|
||||
Some(rs) => {
|
||||
rs.stop_query();
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
/// Query a sql with timeout.
|
||||
///
|
||||
|
|
|
|||
|
|
@ -3,8 +3,7 @@ use bytes::Bytes;
|
|||
use futures::{FutureExt, SinkExt, StreamExt};
|
||||
use scc::HashMap;
|
||||
// use std::sync::Mutex;
|
||||
use taos_query::common::{Field, Precision, RawBlock, RawMeta};
|
||||
use taos_query::prelude::AsyncInlinable;
|
||||
use taos_query::common::{Field, Precision, RawBlock, RawMeta, Ty};
|
||||
use taos_query::util::InlinableWrite;
|
||||
use taos_query::{AsyncFetchable, AsyncQueryable, DeError, DsnError, IntoDsn};
|
||||
use thiserror::Error;
|
||||
|
|
@ -27,11 +26,10 @@ use std::time::Duration;
|
|||
type WsFetchResult = std::result::Result<WsFetchData, taos_error::Error>;
|
||||
type FetchSender = std::sync::mpsc::SyncSender<WsFetchResult>;
|
||||
type FetchReceiver = std::sync::mpsc::Receiver<WsFetchResult>;
|
||||
// type WsSenderStream = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
|
||||
|
||||
type WsSender = tokio::sync::mpsc::Sender<Message>;
|
||||
|
||||
pub struct WsAsyncClient {
|
||||
pub struct WsTaos {
|
||||
timeout: Duration,
|
||||
req_id: Arc<AtomicU64>,
|
||||
ws: WsSender,
|
||||
|
|
@ -72,17 +70,6 @@ impl Debug for ResultSet {
|
|||
.finish()
|
||||
}
|
||||
}
|
||||
pub struct ResultSetRef {
|
||||
ws: WsSender,
|
||||
timeout: Duration,
|
||||
fetches: Arc<HashMap<ResId, FetchSender>>,
|
||||
receiver: Option<FetchReceiver>,
|
||||
args: WsResArgs,
|
||||
fields: Option<Vec<Field>>,
|
||||
fields_count: usize,
|
||||
affected_rows: usize,
|
||||
precision: Precision,
|
||||
}
|
||||
|
||||
impl Drop for ResultSet {
|
||||
fn drop(&mut self) {
|
||||
|
|
@ -97,7 +84,7 @@ impl Drop for ResultSet {
|
|||
}
|
||||
}
|
||||
|
||||
impl Debug for WsAsyncClient {
|
||||
impl Debug for WsTaos {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("WsClient")
|
||||
.field("req_id", &self.req_id)
|
||||
|
|
@ -151,14 +138,14 @@ impl Error {
|
|||
|
||||
type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl Drop for WsAsyncClient {
|
||||
impl Drop for WsTaos {
|
||||
fn drop(&mut self) {
|
||||
// send close signal to reader/writer spawned tasks.
|
||||
let _ = self.close_signal.send(true);
|
||||
}
|
||||
}
|
||||
|
||||
impl WsAsyncClient {
|
||||
impl WsTaos {
|
||||
/// Build TDengine websocket client from dsn.
|
||||
///
|
||||
/// ```text
|
||||
|
|
@ -303,12 +290,19 @@ impl WsAsyncClient {
|
|||
sender.send(ok.map(|_| WsQueryResp::default())).unwrap();
|
||||
}
|
||||
}
|
||||
WsRecvData::WriteRaw => {
|
||||
if let Some((_, sender)) = queries_sender.remove(&req_id)
|
||||
{
|
||||
sender.send(ok.map(|_| WsQueryResp::default())).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Block type is for binary.
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Message::Binary(block) => {
|
||||
dbg!(block.len());
|
||||
// dbg!(block.len());
|
||||
let mut slice = block.as_slice();
|
||||
use taos_query::util::InlinableRead;
|
||||
let timing = slice.read_u64().unwrap();
|
||||
|
|
@ -356,7 +350,8 @@ impl WsAsyncClient {
|
|||
}
|
||||
},
|
||||
Err(err) => {
|
||||
dbg!(err);
|
||||
log::error!("{}", err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -422,6 +417,46 @@ impl WsAsyncClient {
|
|||
};
|
||||
Ok(())
|
||||
}
|
||||
async fn s_write_raw_block(&self, raw: &RawBlock) -> Result<()> {
|
||||
let req_id = self.req_id();
|
||||
let message_id = req_id;
|
||||
let raw_block_message = 4; // action number from `taosAdapter/controller/rest/const.go:L56`.
|
||||
|
||||
let mut meta = Vec::new();
|
||||
meta.write_u64_le(req_id)?;
|
||||
meta.write_u64_le(message_id)?;
|
||||
meta.write_u64_le(raw_block_message as u64)?;
|
||||
meta.write_u32_le(raw.nrows() as u32)?;
|
||||
meta.write_inlined_str::<2>(raw.table_name().unwrap())?;
|
||||
meta.write(&raw.as_raw_bytes())?;
|
||||
|
||||
log::debug!(
|
||||
"write meta with req_id: {}, message_id: {}, raw data: {:?}",
|
||||
req_id,
|
||||
message_id,
|
||||
Bytes::copy_from_slice(&meta)
|
||||
);
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
{
|
||||
self.queries.insert(req_id, tx).unwrap();
|
||||
self.ws
|
||||
.send_timeout(Message::Binary(meta), self.timeout)
|
||||
.await?;
|
||||
}
|
||||
let sleep = tokio::time::sleep(self.timeout);
|
||||
tokio::pin!(sleep);
|
||||
let _resp = tokio::select! {
|
||||
_ = &mut sleep, if !sleep.is_elapsed() => {
|
||||
log::debug!("get server version timed out");
|
||||
Err(Error::QueryTimeout("write meta".to_string()))?
|
||||
}
|
||||
message = rx => {
|
||||
message??
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn s_query(&self, sql: &str) -> Result<ResultSet> {
|
||||
let req_id = self.req_id();
|
||||
|
|
@ -586,90 +621,6 @@ impl ResultSet {
|
|||
}
|
||||
}
|
||||
}
|
||||
impl ResultSetRef {
|
||||
async fn fetch(&mut self) -> Result<Option<RawBlock>> {
|
||||
let fetch = WsSend::Fetch(self.args);
|
||||
{
|
||||
log::info!("send fetch message: {fetch:?}");
|
||||
self.ws.send(fetch.to_msg()).await?;
|
||||
log::info!("send done");
|
||||
// unlock mutex when out of scope.
|
||||
}
|
||||
println!("wait for fetch message");
|
||||
let fetch_resp = match self.receiver.as_mut().unwrap().recv()?? {
|
||||
WsFetchData::Fetch(fetch) => fetch,
|
||||
data => panic!("unexpected result {data:?}"),
|
||||
};
|
||||
|
||||
if fetch_resp.completed {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
log::info!("fetch with: {fetch_resp:?}");
|
||||
|
||||
let fetch_block = WsSend::FetchBlock(self.args);
|
||||
{
|
||||
// prepare for receiving.
|
||||
log::info!("send fetch message: {fetch_block:?}");
|
||||
self.ws.send(fetch_block.to_msg()).await?;
|
||||
log::info!("send done");
|
||||
// unlock mutex when out of scope.
|
||||
}
|
||||
|
||||
log::info!("receiving block...");
|
||||
match self.receiver.as_mut().unwrap().recv()?? {
|
||||
WsFetchData::Block(timing, raw) => {
|
||||
let mut raw = RawBlock::parse_from_raw_block(
|
||||
raw,
|
||||
fetch_resp.rows,
|
||||
self.fields_count,
|
||||
self.precision,
|
||||
);
|
||||
|
||||
for row in 0..raw.nrows() {
|
||||
for col in 0..raw.ncols() {
|
||||
log::debug!("at ({}, {})", row, col);
|
||||
let v = unsafe { raw.get_ref_unchecked(row, col) };
|
||||
println!("({}, {}): {:?}", row, col, v);
|
||||
}
|
||||
}
|
||||
raw.with_field_names(self.fields.as_ref().unwrap().iter().map(Field::name));
|
||||
Ok(Some(raw))
|
||||
}
|
||||
WsFetchData::BlockV2(timing, raw) => {
|
||||
let mut raw = RawBlock::parse_from_raw_block_v2(
|
||||
raw,
|
||||
self.fields.as_ref().unwrap(),
|
||||
dbg!(fetch_resp.lengths.as_ref().unwrap()),
|
||||
fetch_resp.rows,
|
||||
self.precision,
|
||||
);
|
||||
|
||||
for row in 0..raw.nrows() {
|
||||
for col in 0..raw.ncols() {
|
||||
log::debug!("at ({}, {})", row, col);
|
||||
let v = unsafe { raw.get_ref_unchecked(row, col) };
|
||||
println!("({}, {}): {:?}", row, col, v);
|
||||
}
|
||||
}
|
||||
raw.with_field_names(self.fields.as_ref().unwrap().iter().map(Field::name));
|
||||
Ok(Some(raw))
|
||||
}
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl futures::Stream for ResultSetRef {
|
||||
type Item = Result<RawBlock>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.fetch().map(|v| v.transpose()).boxed().poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncFetchable for ResultSet {
|
||||
fn affected_rows(&self) -> i32 {
|
||||
|
|
@ -704,7 +655,7 @@ impl AsyncFetchable for ResultSet {
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl AsyncQueryable for WsAsyncClient {
|
||||
impl AsyncQueryable for WsTaos {
|
||||
type Error = Error;
|
||||
|
||||
type AsyncResultSet = ResultSet;
|
||||
|
|
@ -720,7 +671,7 @@ impl AsyncQueryable for WsAsyncClient {
|
|||
}
|
||||
|
||||
async fn write_raw_block(&self, block: &RawBlock) -> StdResult<(), Self::Error> {
|
||||
todo!()
|
||||
self.s_write_raw_block(block).await
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -733,7 +684,7 @@ async fn test_client() -> anyhow::Result<()> {
|
|||
let dsn = std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
|
||||
// pretty_env_logger::init();
|
||||
|
||||
let client = WsAsyncClient::from_dsn(dsn).await?;
|
||||
let client = WsTaos::from_dsn(dsn).await?;
|
||||
|
||||
let _version = client.version();
|
||||
assert_eq!(client.exec("drop database if exists abc_a").await?, 0);
|
||||
|
|
@ -779,7 +730,7 @@ async fn test_client_cloud() -> anyhow::Result<()> {
|
|||
return Ok(());
|
||||
}
|
||||
let dsn = dsn.unwrap();
|
||||
let client = WsAsyncClient::from_dsn(dsn).await?;
|
||||
let client = WsTaos::from_dsn(dsn).await?;
|
||||
let mut rs = client.query("select * from test.meters limit 10").await?;
|
||||
|
||||
let values = rs.to_records();
|
||||
|
|
@ -799,10 +750,62 @@ async fn test_client_cloud() -> anyhow::Result<()> {
|
|||
async fn ws_show_databases() -> anyhow::Result<()> {
|
||||
use taos_query::Queryable;
|
||||
let dsn = std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
|
||||
let client = WsAsyncClient::from_dsn(dsn).await?;
|
||||
let client = WsTaos::from_dsn(dsn).await?;
|
||||
let mut rs = client.query("show databases").await?;
|
||||
let values = rs.to_records();
|
||||
let values = rs.to_records()?;
|
||||
|
||||
dbg!(values);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn ws_write_raw_block() -> anyhow::Result<()> {
|
||||
let mut raw = RawBlock::parse_from_raw_block_v2(
|
||||
&[0, 0, 0, 0, 0, 0, 0, 0, 2][..],
|
||||
&[
|
||||
Field::new("ts", Ty::Timestamp, 8),
|
||||
Field::new("v", Ty::Bool, 1),
|
||||
],
|
||||
&[8, 1],
|
||||
1,
|
||||
Precision::Millisecond,
|
||||
);
|
||||
raw.with_table_name("tb1");
|
||||
dbg!(&raw);
|
||||
|
||||
use futures::TryStreamExt;
|
||||
std::env::set_var("RUST_LOG", "debug");
|
||||
let dsn = std::env::var("TDENGINE_ClOUD_DSN")
|
||||
.unwrap_or("http://localhost:6041".to_string());
|
||||
// pretty_env_logger::init();
|
||||
|
||||
let client = WsTaos::from_dsn(dsn).await?;
|
||||
|
||||
let _version = client.version();
|
||||
|
||||
client.exec_many([
|
||||
"create database write_raw_block_test keep 36500",
|
||||
"use write_raw_block_test",
|
||||
"create table if not exists tb1(ts timestamp, v bool)"
|
||||
|
||||
]).await?;
|
||||
|
||||
client.write_raw_block(&raw).await?;
|
||||
|
||||
// // let mut rs = client.s_query("select * from abc_a.tb1").unwrap().unwrap();
|
||||
let mut rs = client.query("select * from tb1").await?;
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[allow(dead_code)]
|
||||
struct A {
|
||||
ts: String,
|
||||
v: Option<bool>,
|
||||
}
|
||||
|
||||
let values: Vec<A> = rs.deserialize_stream().try_collect().await?;
|
||||
|
||||
dbg!(values);
|
||||
|
||||
assert_eq!(client.exec("drop database write_raw_block_test").await?, 0);
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ pub enum TmqSend {
|
|||
blocking_time: i64,
|
||||
},
|
||||
FetchJsonMeta(MessageArgs),
|
||||
FetchRawMeta(MessageArgs),
|
||||
FetchRaw(MessageArgs),
|
||||
Fetch(MessageArgs),
|
||||
FetchBlock(MessageArgs),
|
||||
Commit(MessageArgs),
|
||||
|
|
@ -104,7 +104,7 @@ impl TmqSend {
|
|||
blocking_time: _,
|
||||
} => *req_id,
|
||||
TmqSend::FetchJsonMeta(args) => args.req_id,
|
||||
TmqSend::FetchRawMeta(args) => args.req_id,
|
||||
TmqSend::FetchRaw(args) => args.req_id,
|
||||
TmqSend::Fetch(args) => args.req_id,
|
||||
TmqSend::FetchBlock(args) => args.req_id,
|
||||
TmqSend::Commit(args) => args.req_id,
|
||||
|
|
@ -176,7 +176,7 @@ pub enum TmqRecvData {
|
|||
},
|
||||
#[serde(skip)]
|
||||
Bytes(Bytes),
|
||||
FetchRawMeta {
|
||||
FetchRaw {
|
||||
#[serde(skip)]
|
||||
meta: Bytes,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ struct WsMessageBase {
|
|||
}
|
||||
|
||||
impl WsMessageBase {
|
||||
async fn fetch_raw_data(&self) -> Result<Option<RawBlock>> {
|
||||
async fn fetch_raw_block(&self) -> Result<Option<RawBlock>> {
|
||||
let req_id = self.sender.req_id();
|
||||
|
||||
let msg = TmqSend::Fetch(MessageArgs {
|
||||
|
|
@ -183,7 +183,7 @@ impl WsMessageBase {
|
|||
}
|
||||
async fn fetch_raw_meta(&self) -> Result<RawMeta> {
|
||||
let req_id = self.sender.req_id();
|
||||
let msg = TmqSend::FetchRawMeta(MessageArgs {
|
||||
let msg = TmqSend::FetchRaw(MessageArgs {
|
||||
req_id,
|
||||
message_id: self.message_id,
|
||||
});
|
||||
|
|
@ -228,7 +228,7 @@ pub struct Data(WsMessageBase);
|
|||
|
||||
impl Data {
|
||||
pub async fn fetch_block(&self) -> Result<Option<RawBlock>> {
|
||||
self.0.fetch_raw_data().await
|
||||
self.0.fetch_raw_block().await
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -268,29 +268,14 @@ impl WsMessageSet {
|
|||
}
|
||||
|
||||
impl Consumer {
|
||||
// pub async fn subscribe<Item: Into<String>, Iter: IntoIterator<Item = Item>>(
|
||||
// &mut self,
|
||||
// topics: Iter,
|
||||
// ) -> Result<()> {
|
||||
// let req_id = self.sender.req_id();
|
||||
// let action = TmqSend::Subscribe {
|
||||
// req_id,
|
||||
// req: self.tmq_conf.clone(),
|
||||
// topics: topics.into_iter().map(Into::into).collect_vec(),
|
||||
// conn: self.conn.clone(),
|
||||
// };
|
||||
// self.sender.send_recv(action).await?;
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
pub(crate) async fn poll_timeout(
|
||||
&mut self,
|
||||
&self,
|
||||
timeout: Duration,
|
||||
) -> Result<Option<(Offset, WsMessageSet)>> {
|
||||
) -> Result<Option<(Offset, MessageSet<Meta, Data>)>> {
|
||||
let req_id = self.sender.req_id();
|
||||
let action = TmqSend::Poll {
|
||||
req_id,
|
||||
blocking_time: -1,
|
||||
blocking_time: timeout.as_millis() as _,
|
||||
};
|
||||
|
||||
let data = self.sender.send_recv_timeout(action, timeout).await;
|
||||
|
|
@ -303,111 +288,6 @@ impl Consumer {
|
|||
}
|
||||
let data = data.unwrap();
|
||||
|
||||
match data {
|
||||
TmqRecvData::Poll(TmqPoll {
|
||||
message_id,
|
||||
database,
|
||||
have_message,
|
||||
topic,
|
||||
vgroup_id,
|
||||
message_type,
|
||||
}) => {
|
||||
if have_message {
|
||||
let offset = Offset {
|
||||
message_id,
|
||||
database,
|
||||
topic,
|
||||
vgroup_id,
|
||||
};
|
||||
let message = WsMessageBase {
|
||||
sender: self.sender.clone(),
|
||||
message_id,
|
||||
};
|
||||
match message_type {
|
||||
MessageType::Meta => Ok(Some((offset, WsMessageSet::Meta(Meta(message))))),
|
||||
MessageType::Data => Ok(Some((offset, WsMessageSet::Data(Data(message))))),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
// pub async fn commit(&self, offset: Offset) -> Result<()> {
|
||||
// let req_id = self.sender.req_id();
|
||||
// let action = TmqSend::Commit(MessageArgs {
|
||||
// req_id,
|
||||
// message_id: offset.message_id,
|
||||
// });
|
||||
|
||||
// let _ = self.sender.send_recv(action).await?;
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
// pub async fn unsubscribe(&mut self) -> Result<()> {
|
||||
// let action = TmqSend::Close;
|
||||
// self.sender.send_recv(action).await?;
|
||||
// Ok(())
|
||||
// }
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl AsAsyncConsumer for Consumer {
|
||||
type Error = Error;
|
||||
|
||||
type Offset = Offset;
|
||||
|
||||
type Meta = Meta;
|
||||
|
||||
type Data = Data;
|
||||
|
||||
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
|
||||
&mut self,
|
||||
topics: I,
|
||||
) -> Result<()> {
|
||||
let req_id = self.sender.req_id();
|
||||
let action = TmqSend::Subscribe {
|
||||
req_id,
|
||||
req: self.tmq_conf.clone(),
|
||||
topics: topics.into_iter().map(Into::into).collect_vec(),
|
||||
conn: self.conn.clone(),
|
||||
};
|
||||
self.sender.send_recv(action).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_timeout(
|
||||
&self,
|
||||
timeout: taos_query::tmq::Timeout,
|
||||
) -> StdResult<
|
||||
Option<(
|
||||
Self::Offset,
|
||||
taos_query::tmq::MessageSet<Self::Meta, Self::Data>,
|
||||
)>,
|
||||
Self::Error,
|
||||
> {
|
||||
let req_id = self.sender.req_id();
|
||||
let action = TmqSend::Poll {
|
||||
req_id,
|
||||
blocking_time: -1,
|
||||
};
|
||||
|
||||
let data = self
|
||||
.sender
|
||||
.send_recv_timeout(action, timeout.as_duration())
|
||||
.await;
|
||||
if data.is_err() {
|
||||
let err = data.unwrap_err();
|
||||
match err {
|
||||
Error::QueryTimeout(_) => return Ok(None),
|
||||
_ => return Err(err),
|
||||
}
|
||||
}
|
||||
let data = data.unwrap();
|
||||
|
||||
match data {
|
||||
TmqRecvData::Poll(TmqPoll {
|
||||
message_id,
|
||||
|
|
@ -440,6 +320,98 @@ impl AsAsyncConsumer for Consumer {
|
|||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl AsAsyncConsumer for Consumer {
|
||||
type Error = Error;
|
||||
|
||||
type Offset = Offset;
|
||||
|
||||
type Meta = Meta;
|
||||
|
||||
type Data = Data;
|
||||
|
||||
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
|
||||
&mut self,
|
||||
topics: I,
|
||||
) -> Result<()> {
|
||||
let req_id = self.sender.req_id();
|
||||
let action = TmqSend::Subscribe {
|
||||
req_id,
|
||||
req: self.tmq_conf.clone(),
|
||||
topics: topics.into_iter().map(Into::into).collect_vec(),
|
||||
conn: self.conn.clone(),
|
||||
};
|
||||
self.sender.send_recv(action).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_timeout(
|
||||
&self,
|
||||
timeout: taos_query::tmq::Timeout,
|
||||
) -> StdResult<
|
||||
Option<(
|
||||
Self::Offset,
|
||||
taos_query::tmq::MessageSet<Self::Meta, Self::Data>,
|
||||
)>,
|
||||
Self::Error,
|
||||
> {
|
||||
match timeout {
|
||||
Timeout::Never => loop {
|
||||
if let Some(msg) = self.poll_timeout(Duration::MAX).await? {
|
||||
return Ok(Some(msg));
|
||||
}
|
||||
},
|
||||
Timeout::None => self.poll_timeout(Duration::MAX).await,
|
||||
Timeout::Duration(timeout) => self.poll_timeout(timeout).await,
|
||||
}
|
||||
|
||||
// let data = self
|
||||
// .sender
|
||||
// .send_recv_timeout(action, timeout.as_duration())
|
||||
// .await;
|
||||
// if data.is_err() {
|
||||
// let err = data.unwrap_err();
|
||||
// match err {
|
||||
// Error::QueryTimeout(_) => return Ok(None),
|
||||
// _ => return Err(err),
|
||||
// }
|
||||
// }
|
||||
// let data = data.unwrap();
|
||||
|
||||
// match data {
|
||||
// TmqRecvData::Poll(TmqPoll {
|
||||
// message_id,
|
||||
// database,
|
||||
// have_message,
|
||||
// topic,
|
||||
// vgroup_id,
|
||||
// message_type,
|
||||
// }) => {
|
||||
// if have_message {
|
||||
// let offset = Offset {
|
||||
// message_id,
|
||||
// database,
|
||||
// topic,
|
||||
// vgroup_id,
|
||||
// };
|
||||
// let message = WsMessageBase {
|
||||
// sender: self.sender.clone(),
|
||||
// message_id,
|
||||
// };
|
||||
// match message_type {
|
||||
// MessageType::Meta => Ok(Some((offset, MessageSet::Meta(Meta(message))))),
|
||||
// MessageType::Data => Ok(Some((offset, MessageSet::Data(Data(message))))),
|
||||
// _ => unreachable!(),
|
||||
// }
|
||||
// } else {
|
||||
// Ok(None)
|
||||
// }
|
||||
// }
|
||||
// _ => unreachable!(),
|
||||
// }
|
||||
}
|
||||
|
||||
async fn commit(&self, offset: Self::Offset) -> StdResult<(), Self::Error> {
|
||||
let req_id = self.sender.req_id();
|
||||
|
|
@ -544,7 +516,9 @@ impl TmqBuilder {
|
|||
log::info!("send done");
|
||||
}
|
||||
_ = rx.changed() => {
|
||||
log::info!("close sender task");
|
||||
let _= sender.send(Message::Close(None)).await;
|
||||
sender.close().await.unwrap();
|
||||
log::info!("close tmq sender");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -590,7 +564,7 @@ impl TmqBuilder {
|
|||
log::warn!("poll message received but no receiver alive");
|
||||
}
|
||||
}
|
||||
TmqRecvData::FetchRawMeta { meta: _ }=> {
|
||||
TmqRecvData::FetchRaw { meta: _ }=> {
|
||||
if let Some((_, sender)) = queries_sender.remove(&req_id)
|
||||
{
|
||||
sender.send(ok.map(|_|recv)).unwrap();
|
||||
|
|
@ -626,14 +600,16 @@ impl TmqBuilder {
|
|||
// writeUint32(message.buffer, length)
|
||||
// writeUint16(message.buffer, metaType)
|
||||
let mut bytes = Bytes::from(data);
|
||||
let part = bytes.slice(16..);
|
||||
let part = bytes.slice(24..);
|
||||
// dbg!(&bytes);
|
||||
use bytes::Buf;
|
||||
let timing = bytes.get_u64_le();
|
||||
let req_id = bytes.get_u64_le();
|
||||
let message_id = bytes.get_u64_le();
|
||||
|
||||
|
||||
log::debug!("receive binary message with req_id {} message_id {}",
|
||||
log::debug!("[{:.2}ms] receive binary message with req_id {} message_id {}",
|
||||
Duration::from_nanos(timing).as_secs_f64() / 1000.,
|
||||
req_id, message_id);
|
||||
|
||||
if let Some((_, sender)) = queries_sender.remove(&req_id)
|
||||
|
|
@ -674,6 +650,7 @@ impl TmqBuilder {
|
|||
}
|
||||
}
|
||||
}
|
||||
log::info!("end consumer loop");
|
||||
});
|
||||
Ok(Consumer {
|
||||
conn: self.info.to_conn_request(),
|
||||
|
|
@ -699,6 +676,12 @@ pub struct Consumer {
|
|||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl Drop for Consumer {
|
||||
fn drop(&mut self) {
|
||||
self.close_signal.send(true).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Offset {
|
||||
message_id: MessageId,
|
||||
database: String,
|
||||
|
|
@ -764,13 +747,6 @@ impl Error {
|
|||
|
||||
type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl Drop for Consumer {
|
||||
fn drop(&mut self) {
|
||||
// send close signal to reader/writer spawned tasks.
|
||||
let _ = self.close_signal.send(true);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
|
@ -863,54 +839,57 @@ mod tests {
|
|||
let mut consumer = builder.build_consumer().await?;
|
||||
consumer.subscribe(["ws_abc1"]).await?;
|
||||
|
||||
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(5));
|
||||
{
|
||||
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(5));
|
||||
|
||||
while let Some((offset, message)) = stream.try_next().await? {
|
||||
// Offset contains information for topic name, database name and vgroup id,
|
||||
// similar to kafka topic/partition/offset.
|
||||
let _ = offset.topic();
|
||||
let _ = offset.database();
|
||||
let _ = offset.vgroup_id();
|
||||
while let Some((offset, message)) = stream.try_next().await? {
|
||||
// Offset contains information for topic name, database name and vgroup id,
|
||||
// similar to kafka topic/partition/offset.
|
||||
let _ = offset.topic();
|
||||
let _ = offset.database();
|
||||
let _ = offset.vgroup_id();
|
||||
|
||||
// Different to kafka message, TDengine consumer would consume two kind of messages.
|
||||
//
|
||||
// 1. meta
|
||||
// 2. data
|
||||
match message {
|
||||
MessageSet::Meta(meta) => {
|
||||
let _raw = meta.as_raw_meta().await?;
|
||||
// taos.write_meta(raw).await?;
|
||||
// Different to kafka message, TDengine consumer would consume two kind of messages.
|
||||
//
|
||||
// 1. meta
|
||||
// 2. data
|
||||
match message {
|
||||
MessageSet::Meta(meta) => {
|
||||
let _raw = meta.as_raw_meta().await?;
|
||||
// taos.write_meta(raw).await?;
|
||||
|
||||
// meta data can be write to an database seamlessly by raw or json (to sql).
|
||||
let json = meta.as_json_meta().await?;
|
||||
let sql = dbg!(json.to_string());
|
||||
if let Err(err) = taos.exec(sql).await {
|
||||
match err.errno() {
|
||||
Code::TAG_ALREADY_EXIST => log::info!("tag already exists"),
|
||||
Code::TAG_NOT_EXIST => log::debug!("tag not exist"),
|
||||
Code::COLUMN_EXISTS => log::info!("column already exists"),
|
||||
Code::COLUMN_NOT_EXIST => log::debug!("column not exists"),
|
||||
Code::INVALID_COLUMN_NAME => log::info!("invalid column name"),
|
||||
Code::MODIFIED_ALREADY => log::debug!("modified already done"),
|
||||
Code::TABLE_NOT_EXIST => log::debug!("table does not exists"),
|
||||
Code::STABLE_NOT_EXIST => log::debug!("stable does not exists"),
|
||||
_ => {
|
||||
log::error!("{:?}", err);
|
||||
panic!("{}", err);
|
||||
// meta data can be write to an database seamlessly by raw or json (to sql).
|
||||
let json = meta.as_json_meta().await?;
|
||||
let sql = dbg!(json.to_string());
|
||||
if let Err(err) = taos.exec(sql).await {
|
||||
match err.errno() {
|
||||
Code::TAG_ALREADY_EXIST => log::info!("tag already exists"),
|
||||
Code::TAG_NOT_EXIST => log::debug!("tag not exist"),
|
||||
Code::COLUMN_EXISTS => log::info!("column already exists"),
|
||||
Code::COLUMN_NOT_EXIST => log::debug!("column not exists"),
|
||||
Code::INVALID_COLUMN_NAME => log::info!("invalid column name"),
|
||||
Code::MODIFIED_ALREADY => log::debug!("modified already done"),
|
||||
Code::TABLE_NOT_EXIST => log::debug!("table does not exists"),
|
||||
Code::STABLE_NOT_EXIST => log::debug!("stable does not exists"),
|
||||
_ => {
|
||||
log::error!("{:?}", err);
|
||||
panic!("{}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
MessageSet::Data(data) => {
|
||||
// data message may have more than one data block for various tables.
|
||||
while let Some(data) = data.fetch_block().await? {
|
||||
dbg!(data.table_name());
|
||||
dbg!(data);
|
||||
MessageSet::Data(data) => {
|
||||
// data message may have more than one data block for various tables.
|
||||
while let Some(data) = data.fetch_block().await? {
|
||||
dbg!(data.table_name());
|
||||
dbg!(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
consumer.commit(offset).await?;
|
||||
}
|
||||
consumer.commit(offset).await?;
|
||||
}
|
||||
consumer.unsubscribe().await;
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
|
||||
|
|
|
|||
|
|
@ -139,6 +139,7 @@ pub enum WsRecvData {
|
|||
block: Vec<u32>,
|
||||
},
|
||||
WriteMeta,
|
||||
WriteRaw,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use infra::WsConnReq;
|
|||
use once_cell::sync::OnceCell;
|
||||
|
||||
#[cfg(feature = "async")]
|
||||
use asyn::WsAsyncClient;
|
||||
use asyn::WsTaos;
|
||||
use sync::WsClient;
|
||||
|
||||
use taos_query::{common::RawMeta, DsnError, IntoDsn, Queryable, TBuilder};
|
||||
|
|
@ -170,7 +170,7 @@ impl TaosBuilder {
|
|||
pub struct Taos {
|
||||
dsn: TaosBuilder,
|
||||
#[cfg(feature = "async")]
|
||||
async_client: OnceCell<WsAsyncClient>,
|
||||
async_client: OnceCell<WsTaos>,
|
||||
sync_client: OnceCell<WsClient>,
|
||||
}
|
||||
|
||||
|
|
@ -229,7 +229,7 @@ impl taos_query::AsyncQueryable for Taos {
|
|||
if let Some(ws) = self.async_client.get() {
|
||||
ws.s_query(sql.as_ref()).await
|
||||
} else {
|
||||
let async_client = WsAsyncClient::from_wsinfo(&self.dsn).await?;
|
||||
let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
|
||||
self.async_client
|
||||
.get_or_init(|| async_client)
|
||||
.s_query(sql.as_ref())
|
||||
|
|
@ -241,7 +241,7 @@ impl taos_query::AsyncQueryable for Taos {
|
|||
if let Some(ws) = self.async_client.get() {
|
||||
ws.write_meta(raw).await
|
||||
} else {
|
||||
let async_client = WsAsyncClient::from_wsinfo(&self.dsn).await?;
|
||||
let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
|
||||
self.async_client
|
||||
.get_or_init(|| async_client)
|
||||
.write_meta(raw)
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ impl Drop for WsAsyncStmt {
|
|||
self.fetches.remove(&self.args.stmt_id);
|
||||
let args = self.args;
|
||||
let ws = self.ws.clone();
|
||||
let _ = ws.blocking_send(StmtSend::Close(args).to_msg());
|
||||
let _ = taos_query::block_in_place_or_global(ws.send(StmtSend::Close(args).to_msg()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -396,7 +396,7 @@ mod tests {
|
|||
// !Websocket tests should always use `multi_thread`
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
|
||||
async fn test_client() -> anyhow::Result<()> {
|
||||
|
||||
|
||||
use taos_query::AsyncQueryable;
|
||||
|
||||
let taos = TaosBuilder::from_dsn("taos://localhost:6041")?.build()?;
|
||||
|
|
@ -405,7 +405,7 @@ mod tests {
|
|||
taos.exec("create table stmt.ctb (ts timestamp, v int)")
|
||||
.await?;
|
||||
|
||||
|
||||
|
||||
std::env::set_var("RUST_LOG", "debug");
|
||||
pretty_env_logger::init();
|
||||
let client = WsStmtClient::from_dsn("taos+ws://localhost:6041/stmt").await?;
|
||||
|
|
|
|||
|
|
@ -64,6 +64,7 @@ impl Drop for WsClient {
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct ResultSet {
|
||||
id: ResId,
|
||||
rt: Arc<tokio::runtime::Runtime>,
|
||||
timeout: Duration,
|
||||
sender: MsgSender,
|
||||
|
|
@ -464,6 +465,7 @@ impl WsClient {
|
|||
self.fetches.insert(resp.id, tx).unwrap();
|
||||
}
|
||||
Ok(ResultSet {
|
||||
id: resp.id,
|
||||
rt: self.rt.clone(),
|
||||
timeout: self.timeout.clone(),
|
||||
sender: self.sender.clone(),
|
||||
|
|
@ -483,6 +485,7 @@ impl WsClient {
|
|||
})
|
||||
} else {
|
||||
Ok(ResultSet {
|
||||
id: resp.id,
|
||||
rt: self.rt.clone(),
|
||||
timeout: self.timeout.clone(),
|
||||
affected_rows: resp.affected_rows,
|
||||
|
|
@ -658,6 +661,14 @@ impl ResultSet {
|
|||
self.timing = UnsafeCell::new(Duration::ZERO);
|
||||
timing
|
||||
}
|
||||
|
||||
pub fn stop_query(&mut self) {
|
||||
if let Some((_, sender)) = self.fetches.remove(&self.id) {
|
||||
sender
|
||||
.send(Err(taos_error::Error::from_string("").into()))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Iterator for ResultSet {
|
||||
type Item = Result<RawBlock>;
|
||||
|
|
|
|||
Loading…
Reference in a new issue