Merge pull request #17 from taosdata/feat/ws-stop-query

feat(ws): add ws_stop_query, support write raw block
This commit is contained in:
Shuduo Sang 2022-08-01 18:04:17 +08:00 committed by GitHub
commit 24b199e895
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 514 additions and 318 deletions

View file

@ -167,10 +167,15 @@ impl Layout {
self
}
pub fn with_schema_changed(&mut self) -> &mut Self {
pub fn with_schema_changed(mut self) -> Self {
self.set(Self::SCHEMA_CHANGED, true);
self
}
pub fn set_schema_changed(&mut self, value: bool) -> &mut Self {
self.set(Self::SCHEMA_CHANGED, value);
self
}
pub fn schema_changed(&self) -> bool {
self.contains(Self::SCHEMA_CHANGED)

View file

@ -6,11 +6,10 @@ use crate::{
use bytes::Bytes;
use itertools::Itertools;
use nom::AsBytes;
use serde::Deserialize;
use std::{
cell::{RefCell, UnsafeCell},
cell::{Cell, RefCell, UnsafeCell},
ffi::c_void,
ops::{Deref, DerefMut},
ptr::NonNull,
@ -30,8 +29,8 @@ pub mod views;
pub use views::ColumnView;
use views::*;
pub use meta::*;
pub use data::*;
pub use meta::*;
mod de;
mod rows;
@ -55,7 +54,7 @@ pub struct RawBlock {
/// Raw bytes version, may be v2 or v3.
version: Version,
/// Data is required, which could be v2 websocket block or a v3 raw block.
data: Bytes,
data: Cell<Bytes>,
/// Number of rows in current data block.
rows: usize,
/// Number of columns (or fields) in current data block.
@ -181,7 +180,9 @@ impl RawBlock {
// const U_INT_NULL: u32 = u32::MAX;
// const U_BIG_INT_NULL: u64 = u64::MAX;
let layout = Arc::new(RefCell::new(Layout::INLINE_DEFAULT.into()));
let layout = Arc::new(RefCell::new(
Layout::INLINE_DEFAULT.with_schema_changed().into(),
));
let bytes = bytes.into();
let cols = fields.len();
@ -375,7 +376,7 @@ impl RawBlock {
Self {
layout,
version: Version::V2,
data: bytes,
data: Cell::new(bytes),
rows,
cols,
schemas,
@ -474,13 +475,13 @@ impl RawBlock {
let data = bytes.slice(o2..data_offset);
// dbg!()
ColumnView::NChar(dbg!(NCharView {
ColumnView::NChar(NCharView {
offsets,
data,
is_chars: UnsafeCell::new(true),
version: Version::V3,
layout: layout.clone(),
}))
})
}
Ty::UTinyInt => _primitive_value!(UTinyInt, u8),
Ty::USmallInt => _primitive_value!(USmallInt, u16),
@ -507,7 +508,7 @@ impl RawBlock {
RawBlock {
layout,
version: Version::V3,
data: bytes,
data: Cell::new(bytes),
rows,
cols,
precision,
@ -650,7 +651,35 @@ impl RawBlock {
}
pub fn as_raw_bytes(&self) -> &[u8] {
&self.data
if self.layout.borrow().schema_changed() {
let mut bytes = Vec::new();
// 4 bytes total length placeholder.
bytes.extend(0u32.to_le_bytes());
// 8 bytes group id.
bytes.extend(self.group_id.to_le_bytes());
// `ncols * std::mem::size_of::<ColSchema>()` bytes schemas.
bytes.extend(self.schemas.as_bytes());
// `ncols * std::mem::size_of::<u32>()` bytes lengths.
bytes.extend(self.lengths.as_bytes());
// data for each column
for col in self.columns() {
col.write_raw_into(&mut bytes).unwrap();
}
bytes.len();
unsafe {
*(bytes.as_mut_ptr() as *mut u32) = bytes.len() as u32;
}
debug_assert_eq!(
unsafe { *(bytes.as_ptr() as *const u32) },
bytes.len() as u32
);
let bytes = Bytes::from(bytes);
self.data.replace(bytes);
self.layout.borrow_mut().set_schema_changed(false);
unsafe { &*self.data.as_ptr() }
} else {
unsafe { &*self.data.as_ptr() }
}
}
pub fn is_null(&self, row: usize, col: usize) -> bool {
@ -949,7 +978,6 @@ fn test_raw_from_v2() {
pretty_env_logger::formatted_builder()
.filter_level(log::LevelFilter::Trace)
.init();
use serde::Deserialize;
let bytes = b"\x10\x86\x1aA \xcc)AB\xc2\x14AZ],A\xa2\x8d$A\x87\xb9%A\xf5~\x0fA\x96\xf7,AY\xee\x17A1|\x15As\x00\x00\x00q\x00\x00\x00s\x00\x00\x00t\x00\x00\x00u\x00\x00\x00t\x00\x00\x00n\x00\x00\x00n\x00\x00\x00n\x00\x00\x00r\x00\x00\x00";
let block = RawBlock::parse_from_raw_block_v2(
@ -1095,7 +1123,10 @@ fn test_from_v2() {
1,
Precision::Millisecond,
);
dbg!(raw);
let bytes = raw.as_raw_bytes();
let bytes = Bytes::copy_from_slice(bytes);
let raw2 = RawBlock::parse_from_raw_block(bytes, raw.nrows(), raw.ncols(), raw.precision());
dbg!(&raw, raw2);
// dbg!(raw.as_bytes());
// let v = unsafe { raw.get_ref_unchecked(0, 0) };
// dbg!(v);

View file

@ -110,6 +110,14 @@ impl UBigIntView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct UBigIntViewIter<'a> {

View file

@ -110,6 +110,14 @@ impl BigIntView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct BigIntViewIter<'a> {

View file

@ -1,4 +1,4 @@
use std::ffi::c_void;
use std::{ffi::c_void, io::Write};
use crate::common::{BorrowedValue, Ty};
@ -104,6 +104,14 @@ impl BoolView {
pub fn to_vec(&self) -> Vec<Option<bool>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct BoolViewIter<'a> {

View file

@ -109,6 +109,15 @@ impl DoubleView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct DoubleViewIter<'a> {

View file

@ -110,6 +110,14 @@ impl FloatView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct FloatViewIter<'a> {

View file

@ -110,6 +110,14 @@ impl UIntView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct UIntViewIter<'a> {

View file

@ -110,6 +110,14 @@ impl IntView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct IntViewIter<'a> {

View file

@ -68,6 +68,14 @@ impl JsonView {
.map(|row| unsafe { self.get_unchecked(row) }.map(|s| s.to_string()))
.collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let offsets = self.offsets.as_bytes();
wtr.write_all(offsets)?;
wtr.write_all(&self.data)?;
Ok(offsets.len() + self.data.len())
}
}
pub struct VarCharIter<'a> {

View file

@ -27,6 +27,11 @@ impl Lengths {
)
}
}
/// As a [u8] slice.
pub fn as_bytes(&self) -> &[u8] {
&self.0.as_ref()
}
}
impl Deref for Lengths {

View file

@ -59,7 +59,7 @@ pub(crate) use lengths::*;
use crate::common::{BorrowedValue, Column, Ty};
use std::{ffi::c_void, fmt::Debug, iter::FusedIterator};
use std::{ffi::c_void, fmt::Debug, iter::FusedIterator, io::Write};
/// Compatible version for var char.
#[derive(Debug, Clone, Copy, PartialEq)]
@ -313,4 +313,24 @@ impl ColumnView {
ColumnView::Json(_) => todo!(),
}
}
pub(super) fn write_raw_into<W: Write>(&self, wtr: W) -> std::io::Result<usize> {
match self {
ColumnView::Bool(view) => view.write_raw_into(wtr),
ColumnView::TinyInt(view) => view.write_raw_into(wtr),
ColumnView::SmallInt(view) => view.write_raw_into(wtr),
ColumnView::Int(view) => view.write_raw_into(wtr),
ColumnView::BigInt(view) => view.write_raw_into(wtr),
ColumnView::Float(view) => view.write_raw_into(wtr),
ColumnView::Double(view) => view.write_raw_into(wtr),
ColumnView::VarChar(view) => view.write_raw_into(wtr),
ColumnView::Timestamp(view) => view.write_raw_into(wtr),
ColumnView::NChar(view) => view.write_raw_into(wtr),
ColumnView::UTinyInt(view) => view.write_raw_into(wtr),
ColumnView::USmallInt(view) => view.write_raw_into(wtr),
ColumnView::UInt(view) => view.write_raw_into(wtr),
ColumnView::UBigInt(view) => view.write_raw_into(wtr),
ColumnView::Json(view) => view.write_raw_into(wtr),
}
}
}

View file

@ -126,6 +126,14 @@ impl NCharView {
pub fn to_vec(&self) -> Vec<Option<&str>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let offsets = self.offsets.as_bytes();
wtr.write_all(offsets)?;
wtr.write_all(&self.data)?;
Ok(offsets.len() + self.data.len())
}
}
pub struct NCharViewIter<'a> {

View file

@ -25,13 +25,14 @@ impl Offsets {
}
/// As a i32 slice.
pub fn as_slice(&self) -> &[i32] {
unsafe {
std::slice::from_raw_parts(
self.0.as_ptr() as *const i32,
self.len(),
)
}
unsafe { std::slice::from_raw_parts(self.0.as_ptr() as *const i32, self.len()) }
}
/// As a [u8] slice.
pub fn as_bytes(&self) -> &[u8] {
self.0.as_ref()
}
pub fn len(&self) -> usize {
self.0.len() / std::mem::size_of::<i32>()
}

View file

@ -97,6 +97,10 @@ impl Schemas {
)
}
}
pub fn as_bytes(&self) -> &[u8] {
self.0.as_ref()
}
}
impl Deref for Schemas {

View file

@ -110,6 +110,14 @@ impl USmallIntView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct USmallIntViewIter<'a> {

View file

@ -110,6 +110,15 @@ impl SmallIntView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct SmallIntViewIter<'a> {

View file

@ -120,6 +120,14 @@ impl TimestampView {
pub fn to_vec(&self) -> Vec<Option<Timestamp>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct TimestampViewIter<'a> {

View file

@ -110,6 +110,14 @@ impl UTinyIntView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct UTinyIntViewIter<'a> {

View file

@ -110,6 +110,14 @@ impl TinyIntView {
pub fn to_vec(&self) -> Vec<Option<Target>> {
self.iter().collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let nulls = self.nulls.0.as_ref();
wtr.write_all(nulls)?;
wtr.write_all(&self.data)?;
Ok(nulls.len() + self.data.len())
}
}
pub struct TinyIntViewIter<'a> {

View file

@ -82,6 +82,14 @@ impl VarCharView {
.map(|row| unsafe { self.get_unchecked(row) }.map(|s| s.to_string()))
.collect()
}
/// Write column data as raw bytes.
pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
let offsets = self.offsets.as_bytes();
wtr.write_all(offsets)?;
wtr.write_all(&self.data)?;
Ok(offsets.len() + self.data.len())
}
}
pub struct VarCharIter<'a> {

View file

@ -77,13 +77,13 @@ impl<T> Deref for WsMaybeError<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { self.data.as_ref().unwrap() }
unsafe { self.data.as_ref().expect("data pointer should not be null") }
}
}
impl<T> DerefMut for WsMaybeError<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { self.data.as_mut().unwrap() }
unsafe { self.data.as_mut().expect("data pointer should not be null") }
}
}
@ -386,6 +386,10 @@ impl WsResultSet {
fn take_timing(&mut self) -> Duration {
self.rs.take_timing()
}
fn stop_query(&mut self) {
self.rs.stop_query()
}
}
unsafe fn connect_with_dsn(dsn: *const c_char) -> WsTaos {
@ -506,6 +510,16 @@ pub unsafe extern "C" fn ws_query(taos: *mut WS_TAOS, sql: *const c_char) -> *mu
Box::into_raw(Box::new(res)) as _
}
#[no_mangle]
pub unsafe extern "C" fn ws_stop_query(rs: *mut WS_RES) {
match (rs as *mut WsMaybeError<WsResultSet>).as_mut() {
Some(rs) => {
rs.stop_query();
}
_ => {}
}
}
#[no_mangle]
/// Query a sql with timeout.
///

View file

@ -3,8 +3,7 @@ use bytes::Bytes;
use futures::{FutureExt, SinkExt, StreamExt};
use scc::HashMap;
// use std::sync::Mutex;
use taos_query::common::{Field, Precision, RawBlock, RawMeta};
use taos_query::prelude::AsyncInlinable;
use taos_query::common::{Field, Precision, RawBlock, RawMeta, Ty};
use taos_query::util::InlinableWrite;
use taos_query::{AsyncFetchable, AsyncQueryable, DeError, DsnError, IntoDsn};
use thiserror::Error;
@ -27,11 +26,10 @@ use std::time::Duration;
type WsFetchResult = std::result::Result<WsFetchData, taos_error::Error>;
type FetchSender = std::sync::mpsc::SyncSender<WsFetchResult>;
type FetchReceiver = std::sync::mpsc::Receiver<WsFetchResult>;
// type WsSenderStream = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
type WsSender = tokio::sync::mpsc::Sender<Message>;
pub struct WsAsyncClient {
pub struct WsTaos {
timeout: Duration,
req_id: Arc<AtomicU64>,
ws: WsSender,
@ -72,17 +70,6 @@ impl Debug for ResultSet {
.finish()
}
}
pub struct ResultSetRef {
ws: WsSender,
timeout: Duration,
fetches: Arc<HashMap<ResId, FetchSender>>,
receiver: Option<FetchReceiver>,
args: WsResArgs,
fields: Option<Vec<Field>>,
fields_count: usize,
affected_rows: usize,
precision: Precision,
}
impl Drop for ResultSet {
fn drop(&mut self) {
@ -97,7 +84,7 @@ impl Drop for ResultSet {
}
}
impl Debug for WsAsyncClient {
impl Debug for WsTaos {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WsClient")
.field("req_id", &self.req_id)
@ -151,14 +138,14 @@ impl Error {
type Result<T> = std::result::Result<T, Error>;
impl Drop for WsAsyncClient {
impl Drop for WsTaos {
fn drop(&mut self) {
// send close signal to reader/writer spawned tasks.
let _ = self.close_signal.send(true);
}
}
impl WsAsyncClient {
impl WsTaos {
/// Build TDengine websocket client from dsn.
///
/// ```text
@ -303,12 +290,19 @@ impl WsAsyncClient {
sender.send(ok.map(|_| WsQueryResp::default())).unwrap();
}
}
WsRecvData::WriteRaw => {
if let Some((_, sender)) = queries_sender.remove(&req_id)
{
sender.send(ok.map(|_| WsQueryResp::default())).unwrap();
}
}
// Block type is for binary.
_ => unreachable!(),
}
}
Message::Binary(block) => {
dbg!(block.len());
// dbg!(block.len());
let mut slice = block.as_slice();
use taos_query::util::InlinableRead;
let timing = slice.read_u64().unwrap();
@ -356,7 +350,8 @@ impl WsAsyncClient {
}
},
Err(err) => {
dbg!(err);
log::error!("{}", err);
break;
}
}
}
@ -422,6 +417,46 @@ impl WsAsyncClient {
};
Ok(())
}
async fn s_write_raw_block(&self, raw: &RawBlock) -> Result<()> {
let req_id = self.req_id();
let message_id = req_id;
let raw_block_message = 4; // action number from `taosAdapter/controller/rest/const.go:L56`.
let mut meta = Vec::new();
meta.write_u64_le(req_id)?;
meta.write_u64_le(message_id)?;
meta.write_u64_le(raw_block_message as u64)?;
meta.write_u32_le(raw.nrows() as u32)?;
meta.write_inlined_str::<2>(raw.table_name().unwrap())?;
meta.write(&raw.as_raw_bytes())?;
log::debug!(
"write meta with req_id: {}, message_id: {}, raw data: {:?}",
req_id,
message_id,
Bytes::copy_from_slice(&meta)
);
let (tx, rx) = oneshot::channel();
{
self.queries.insert(req_id, tx).unwrap();
self.ws
.send_timeout(Message::Binary(meta), self.timeout)
.await?;
}
let sleep = tokio::time::sleep(self.timeout);
tokio::pin!(sleep);
let _resp = tokio::select! {
_ = &mut sleep, if !sleep.is_elapsed() => {
log::debug!("get server version timed out");
Err(Error::QueryTimeout("write meta".to_string()))?
}
message = rx => {
message??
}
};
Ok(())
}
pub async fn s_query(&self, sql: &str) -> Result<ResultSet> {
let req_id = self.req_id();
@ -586,90 +621,6 @@ impl ResultSet {
}
}
}
impl ResultSetRef {
async fn fetch(&mut self) -> Result<Option<RawBlock>> {
let fetch = WsSend::Fetch(self.args);
{
log::info!("send fetch message: {fetch:?}");
self.ws.send(fetch.to_msg()).await?;
log::info!("send done");
// unlock mutex when out of scope.
}
println!("wait for fetch message");
let fetch_resp = match self.receiver.as_mut().unwrap().recv()?? {
WsFetchData::Fetch(fetch) => fetch,
data => panic!("unexpected result {data:?}"),
};
if fetch_resp.completed {
return Ok(None);
}
log::info!("fetch with: {fetch_resp:?}");
let fetch_block = WsSend::FetchBlock(self.args);
{
// prepare for receiving.
log::info!("send fetch message: {fetch_block:?}");
self.ws.send(fetch_block.to_msg()).await?;
log::info!("send done");
// unlock mutex when out of scope.
}
log::info!("receiving block...");
match self.receiver.as_mut().unwrap().recv()?? {
WsFetchData::Block(timing, raw) => {
let mut raw = RawBlock::parse_from_raw_block(
raw,
fetch_resp.rows,
self.fields_count,
self.precision,
);
for row in 0..raw.nrows() {
for col in 0..raw.ncols() {
log::debug!("at ({}, {})", row, col);
let v = unsafe { raw.get_ref_unchecked(row, col) };
println!("({}, {}): {:?}", row, col, v);
}
}
raw.with_field_names(self.fields.as_ref().unwrap().iter().map(Field::name));
Ok(Some(raw))
}
WsFetchData::BlockV2(timing, raw) => {
let mut raw = RawBlock::parse_from_raw_block_v2(
raw,
self.fields.as_ref().unwrap(),
dbg!(fetch_resp.lengths.as_ref().unwrap()),
fetch_resp.rows,
self.precision,
);
for row in 0..raw.nrows() {
for col in 0..raw.ncols() {
log::debug!("at ({}, {})", row, col);
let v = unsafe { raw.get_ref_unchecked(row, col) };
println!("({}, {}): {:?}", row, col, v);
}
}
raw.with_field_names(self.fields.as_ref().unwrap().iter().map(Field::name));
Ok(Some(raw))
}
_ => Ok(None),
}
}
}
impl futures::Stream for ResultSetRef {
type Item = Result<RawBlock>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.fetch().map(|v| v.transpose()).boxed().poll_unpin(cx)
}
}
impl AsyncFetchable for ResultSet {
fn affected_rows(&self) -> i32 {
@ -704,7 +655,7 @@ impl AsyncFetchable for ResultSet {
}
#[async_trait::async_trait]
impl AsyncQueryable for WsAsyncClient {
impl AsyncQueryable for WsTaos {
type Error = Error;
type AsyncResultSet = ResultSet;
@ -720,7 +671,7 @@ impl AsyncQueryable for WsAsyncClient {
}
async fn write_raw_block(&self, block: &RawBlock) -> StdResult<(), Self::Error> {
todo!()
self.s_write_raw_block(block).await
}
}
@ -733,7 +684,7 @@ async fn test_client() -> anyhow::Result<()> {
let dsn = std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
// pretty_env_logger::init();
let client = WsAsyncClient::from_dsn(dsn).await?;
let client = WsTaos::from_dsn(dsn).await?;
let _version = client.version();
assert_eq!(client.exec("drop database if exists abc_a").await?, 0);
@ -779,7 +730,7 @@ async fn test_client_cloud() -> anyhow::Result<()> {
return Ok(());
}
let dsn = dsn.unwrap();
let client = WsAsyncClient::from_dsn(dsn).await?;
let client = WsTaos::from_dsn(dsn).await?;
let mut rs = client.query("select * from test.meters limit 10").await?;
let values = rs.to_records();
@ -799,10 +750,62 @@ async fn test_client_cloud() -> anyhow::Result<()> {
async fn ws_show_databases() -> anyhow::Result<()> {
use taos_query::Queryable;
let dsn = std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
let client = WsAsyncClient::from_dsn(dsn).await?;
let client = WsTaos::from_dsn(dsn).await?;
let mut rs = client.query("show databases").await?;
let values = rs.to_records();
let values = rs.to_records()?;
dbg!(values);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn ws_write_raw_block() -> anyhow::Result<()> {
let mut raw = RawBlock::parse_from_raw_block_v2(
&[0, 0, 0, 0, 0, 0, 0, 0, 2][..],
&[
Field::new("ts", Ty::Timestamp, 8),
Field::new("v", Ty::Bool, 1),
],
&[8, 1],
1,
Precision::Millisecond,
);
raw.with_table_name("tb1");
dbg!(&raw);
use futures::TryStreamExt;
std::env::set_var("RUST_LOG", "debug");
let dsn = std::env::var("TDENGINE_ClOUD_DSN")
.unwrap_or("http://localhost:6041".to_string());
// pretty_env_logger::init();
let client = WsTaos::from_dsn(dsn).await?;
let _version = client.version();
client.exec_many([
"create database write_raw_block_test keep 36500",
"use write_raw_block_test",
"create table if not exists tb1(ts timestamp, v bool)"
]).await?;
client.write_raw_block(&raw).await?;
// // let mut rs = client.s_query("select * from abc_a.tb1").unwrap().unwrap();
let mut rs = client.query("select * from tb1").await?;
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct A {
ts: String,
v: Option<bool>,
}
let values: Vec<A> = rs.deserialize_stream().try_collect().await?;
dbg!(values);
assert_eq!(client.exec("drop database write_raw_block_test").await?, 0);
Ok(())
}

View file

@ -80,7 +80,7 @@ pub enum TmqSend {
blocking_time: i64,
},
FetchJsonMeta(MessageArgs),
FetchRawMeta(MessageArgs),
FetchRaw(MessageArgs),
Fetch(MessageArgs),
FetchBlock(MessageArgs),
Commit(MessageArgs),
@ -104,7 +104,7 @@ impl TmqSend {
blocking_time: _,
} => *req_id,
TmqSend::FetchJsonMeta(args) => args.req_id,
TmqSend::FetchRawMeta(args) => args.req_id,
TmqSend::FetchRaw(args) => args.req_id,
TmqSend::Fetch(args) => args.req_id,
TmqSend::FetchBlock(args) => args.req_id,
TmqSend::Commit(args) => args.req_id,
@ -176,7 +176,7 @@ pub enum TmqRecvData {
},
#[serde(skip)]
Bytes(Bytes),
FetchRawMeta {
FetchRaw {
#[serde(skip)]
meta: Bytes,
},

View file

@ -119,7 +119,7 @@ struct WsMessageBase {
}
impl WsMessageBase {
async fn fetch_raw_data(&self) -> Result<Option<RawBlock>> {
async fn fetch_raw_block(&self) -> Result<Option<RawBlock>> {
let req_id = self.sender.req_id();
let msg = TmqSend::Fetch(MessageArgs {
@ -183,7 +183,7 @@ impl WsMessageBase {
}
async fn fetch_raw_meta(&self) -> Result<RawMeta> {
let req_id = self.sender.req_id();
let msg = TmqSend::FetchRawMeta(MessageArgs {
let msg = TmqSend::FetchRaw(MessageArgs {
req_id,
message_id: self.message_id,
});
@ -228,7 +228,7 @@ pub struct Data(WsMessageBase);
impl Data {
pub async fn fetch_block(&self) -> Result<Option<RawBlock>> {
self.0.fetch_raw_data().await
self.0.fetch_raw_block().await
}
}
@ -268,29 +268,14 @@ impl WsMessageSet {
}
impl Consumer {
// pub async fn subscribe<Item: Into<String>, Iter: IntoIterator<Item = Item>>(
// &mut self,
// topics: Iter,
// ) -> Result<()> {
// let req_id = self.sender.req_id();
// let action = TmqSend::Subscribe {
// req_id,
// req: self.tmq_conf.clone(),
// topics: topics.into_iter().map(Into::into).collect_vec(),
// conn: self.conn.clone(),
// };
// self.sender.send_recv(action).await?;
// Ok(())
// }
pub(crate) async fn poll_timeout(
&mut self,
&self,
timeout: Duration,
) -> Result<Option<(Offset, WsMessageSet)>> {
) -> Result<Option<(Offset, MessageSet<Meta, Data>)>> {
let req_id = self.sender.req_id();
let action = TmqSend::Poll {
req_id,
blocking_time: -1,
blocking_time: timeout.as_millis() as _,
};
let data = self.sender.send_recv_timeout(action, timeout).await;
@ -303,111 +288,6 @@ impl Consumer {
}
let data = data.unwrap();
match data {
TmqRecvData::Poll(TmqPoll {
message_id,
database,
have_message,
topic,
vgroup_id,
message_type,
}) => {
if have_message {
let offset = Offset {
message_id,
database,
topic,
vgroup_id,
};
let message = WsMessageBase {
sender: self.sender.clone(),
message_id,
};
match message_type {
MessageType::Meta => Ok(Some((offset, WsMessageSet::Meta(Meta(message))))),
MessageType::Data => Ok(Some((offset, WsMessageSet::Data(Data(message))))),
_ => unreachable!(),
}
} else {
Ok(None)
}
}
_ => unreachable!(),
}
}
// pub async fn commit(&self, offset: Offset) -> Result<()> {
// let req_id = self.sender.req_id();
// let action = TmqSend::Commit(MessageArgs {
// req_id,
// message_id: offset.message_id,
// });
// let _ = self.sender.send_recv(action).await?;
// Ok(())
// }
// pub async fn unsubscribe(&mut self) -> Result<()> {
// let action = TmqSend::Close;
// self.sender.send_recv(action).await?;
// Ok(())
// }
}
#[async_trait::async_trait]
impl AsAsyncConsumer for Consumer {
type Error = Error;
type Offset = Offset;
type Meta = Meta;
type Data = Data;
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self,
topics: I,
) -> Result<()> {
let req_id = self.sender.req_id();
let action = TmqSend::Subscribe {
req_id,
req: self.tmq_conf.clone(),
topics: topics.into_iter().map(Into::into).collect_vec(),
conn: self.conn.clone(),
};
self.sender.send_recv(action).await?;
Ok(())
}
async fn recv_timeout(
&self,
timeout: taos_query::tmq::Timeout,
) -> StdResult<
Option<(
Self::Offset,
taos_query::tmq::MessageSet<Self::Meta, Self::Data>,
)>,
Self::Error,
> {
let req_id = self.sender.req_id();
let action = TmqSend::Poll {
req_id,
blocking_time: -1,
};
let data = self
.sender
.send_recv_timeout(action, timeout.as_duration())
.await;
if data.is_err() {
let err = data.unwrap_err();
match err {
Error::QueryTimeout(_) => return Ok(None),
_ => return Err(err),
}
}
let data = data.unwrap();
match data {
TmqRecvData::Poll(TmqPoll {
message_id,
@ -440,6 +320,98 @@ impl AsAsyncConsumer for Consumer {
_ => unreachable!(),
}
}
}
#[async_trait::async_trait]
impl AsAsyncConsumer for Consumer {
type Error = Error;
type Offset = Offset;
type Meta = Meta;
type Data = Data;
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self,
topics: I,
) -> Result<()> {
let req_id = self.sender.req_id();
let action = TmqSend::Subscribe {
req_id,
req: self.tmq_conf.clone(),
topics: topics.into_iter().map(Into::into).collect_vec(),
conn: self.conn.clone(),
};
self.sender.send_recv(action).await?;
Ok(())
}
async fn recv_timeout(
&self,
timeout: taos_query::tmq::Timeout,
) -> StdResult<
Option<(
Self::Offset,
taos_query::tmq::MessageSet<Self::Meta, Self::Data>,
)>,
Self::Error,
> {
match timeout {
Timeout::Never => loop {
if let Some(msg) = self.poll_timeout(Duration::MAX).await? {
return Ok(Some(msg));
}
},
Timeout::None => self.poll_timeout(Duration::MAX).await,
Timeout::Duration(timeout) => self.poll_timeout(timeout).await,
}
// let data = self
// .sender
// .send_recv_timeout(action, timeout.as_duration())
// .await;
// if data.is_err() {
// let err = data.unwrap_err();
// match err {
// Error::QueryTimeout(_) => return Ok(None),
// _ => return Err(err),
// }
// }
// let data = data.unwrap();
// match data {
// TmqRecvData::Poll(TmqPoll {
// message_id,
// database,
// have_message,
// topic,
// vgroup_id,
// message_type,
// }) => {
// if have_message {
// let offset = Offset {
// message_id,
// database,
// topic,
// vgroup_id,
// };
// let message = WsMessageBase {
// sender: self.sender.clone(),
// message_id,
// };
// match message_type {
// MessageType::Meta => Ok(Some((offset, MessageSet::Meta(Meta(message))))),
// MessageType::Data => Ok(Some((offset, MessageSet::Data(Data(message))))),
// _ => unreachable!(),
// }
// } else {
// Ok(None)
// }
// }
// _ => unreachable!(),
// }
}
async fn commit(&self, offset: Self::Offset) -> StdResult<(), Self::Error> {
let req_id = self.sender.req_id();
@ -544,7 +516,9 @@ impl TmqBuilder {
log::info!("send done");
}
_ = rx.changed() => {
log::info!("close sender task");
let _= sender.send(Message::Close(None)).await;
sender.close().await.unwrap();
log::info!("close tmq sender");
break;
}
}
@ -590,7 +564,7 @@ impl TmqBuilder {
log::warn!("poll message received but no receiver alive");
}
}
TmqRecvData::FetchRawMeta { meta: _ }=> {
TmqRecvData::FetchRaw { meta: _ }=> {
if let Some((_, sender)) = queries_sender.remove(&req_id)
{
sender.send(ok.map(|_|recv)).unwrap();
@ -626,14 +600,16 @@ impl TmqBuilder {
// writeUint32(message.buffer, length)
// writeUint16(message.buffer, metaType)
let mut bytes = Bytes::from(data);
let part = bytes.slice(16..);
let part = bytes.slice(24..);
// dbg!(&bytes);
use bytes::Buf;
let timing = bytes.get_u64_le();
let req_id = bytes.get_u64_le();
let message_id = bytes.get_u64_le();
log::debug!("receive binary message with req_id {} message_id {}",
log::debug!("[{:.2}ms] receive binary message with req_id {} message_id {}",
Duration::from_nanos(timing).as_secs_f64() / 1000.,
req_id, message_id);
if let Some((_, sender)) = queries_sender.remove(&req_id)
@ -674,6 +650,7 @@ impl TmqBuilder {
}
}
}
log::info!("end consumer loop");
});
Ok(Consumer {
conn: self.info.to_conn_request(),
@ -699,6 +676,12 @@ pub struct Consumer {
timeout: Duration,
}
impl Drop for Consumer {
fn drop(&mut self) {
self.close_signal.send(true).unwrap();
}
}
pub struct Offset {
message_id: MessageId,
database: String,
@ -764,13 +747,6 @@ impl Error {
type Result<T> = std::result::Result<T, Error>;
impl Drop for Consumer {
fn drop(&mut self) {
// send close signal to reader/writer spawned tasks.
let _ = self.close_signal.send(true);
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
@ -863,54 +839,57 @@ mod tests {
let mut consumer = builder.build_consumer().await?;
consumer.subscribe(["ws_abc1"]).await?;
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(5));
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(5));
while let Some((offset, message)) = stream.try_next().await? {
// Offset contains information for topic name, database name and vgroup id,
// similar to kafka topic/partition/offset.
let _ = offset.topic();
let _ = offset.database();
let _ = offset.vgroup_id();
while let Some((offset, message)) = stream.try_next().await? {
// Offset contains information for topic name, database name and vgroup id,
// similar to kafka topic/partition/offset.
let _ = offset.topic();
let _ = offset.database();
let _ = offset.vgroup_id();
// Different to kafka message, TDengine consumer would consume two kind of messages.
//
// 1. meta
// 2. data
match message {
MessageSet::Meta(meta) => {
let _raw = meta.as_raw_meta().await?;
// taos.write_meta(raw).await?;
// Different to kafka message, TDengine consumer would consume two kind of messages.
//
// 1. meta
// 2. data
match message {
MessageSet::Meta(meta) => {
let _raw = meta.as_raw_meta().await?;
// taos.write_meta(raw).await?;
// meta data can be write to an database seamlessly by raw or json (to sql).
let json = meta.as_json_meta().await?;
let sql = dbg!(json.to_string());
if let Err(err) = taos.exec(sql).await {
match err.errno() {
Code::TAG_ALREADY_EXIST => log::info!("tag already exists"),
Code::TAG_NOT_EXIST => log::debug!("tag not exist"),
Code::COLUMN_EXISTS => log::info!("column already exists"),
Code::COLUMN_NOT_EXIST => log::debug!("column not exists"),
Code::INVALID_COLUMN_NAME => log::info!("invalid column name"),
Code::MODIFIED_ALREADY => log::debug!("modified already done"),
Code::TABLE_NOT_EXIST => log::debug!("table does not exists"),
Code::STABLE_NOT_EXIST => log::debug!("stable does not exists"),
_ => {
log::error!("{:?}", err);
panic!("{}", err);
// meta data can be write to an database seamlessly by raw or json (to sql).
let json = meta.as_json_meta().await?;
let sql = dbg!(json.to_string());
if let Err(err) = taos.exec(sql).await {
match err.errno() {
Code::TAG_ALREADY_EXIST => log::info!("tag already exists"),
Code::TAG_NOT_EXIST => log::debug!("tag not exist"),
Code::COLUMN_EXISTS => log::info!("column already exists"),
Code::COLUMN_NOT_EXIST => log::debug!("column not exists"),
Code::INVALID_COLUMN_NAME => log::info!("invalid column name"),
Code::MODIFIED_ALREADY => log::debug!("modified already done"),
Code::TABLE_NOT_EXIST => log::debug!("table does not exists"),
Code::STABLE_NOT_EXIST => log::debug!("stable does not exists"),
_ => {
log::error!("{:?}", err);
panic!("{}", err);
}
}
}
}
}
MessageSet::Data(data) => {
// data message may have more than one data block for various tables.
while let Some(data) = data.fetch_block().await? {
dbg!(data.table_name());
dbg!(data);
MessageSet::Data(data) => {
// data message may have more than one data block for various tables.
while let Some(data) = data.fetch_block().await? {
dbg!(data.table_name());
dbg!(data);
}
}
}
consumer.commit(offset).await?;
}
consumer.commit(offset).await?;
}
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(2)).await;

View file

@ -139,6 +139,7 @@ pub enum WsRecvData {
block: Vec<u32>,
},
WriteMeta,
WriteRaw,
}
#[serde_as]

View file

@ -5,7 +5,7 @@ use infra::WsConnReq;
use once_cell::sync::OnceCell;
#[cfg(feature = "async")]
use asyn::WsAsyncClient;
use asyn::WsTaos;
use sync::WsClient;
use taos_query::{common::RawMeta, DsnError, IntoDsn, Queryable, TBuilder};
@ -170,7 +170,7 @@ impl TaosBuilder {
pub struct Taos {
dsn: TaosBuilder,
#[cfg(feature = "async")]
async_client: OnceCell<WsAsyncClient>,
async_client: OnceCell<WsTaos>,
sync_client: OnceCell<WsClient>,
}
@ -229,7 +229,7 @@ impl taos_query::AsyncQueryable for Taos {
if let Some(ws) = self.async_client.get() {
ws.s_query(sql.as_ref()).await
} else {
let async_client = WsAsyncClient::from_wsinfo(&self.dsn).await?;
let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
self.async_client
.get_or_init(|| async_client)
.s_query(sql.as_ref())
@ -241,7 +241,7 @@ impl taos_query::AsyncQueryable for Taos {
if let Some(ws) = self.async_client.get() {
ws.write_meta(raw).await
} else {
let async_client = WsAsyncClient::from_wsinfo(&self.dsn).await?;
let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
self.async_client
.get_or_init(|| async_client)
.write_meta(raw)

View file

@ -62,7 +62,7 @@ impl Drop for WsAsyncStmt {
self.fetches.remove(&self.args.stmt_id);
let args = self.args;
let ws = self.ws.clone();
let _ = ws.blocking_send(StmtSend::Close(args).to_msg());
let _ = taos_query::block_in_place_or_global(ws.send(StmtSend::Close(args).to_msg()));
}
}
@ -396,7 +396,7 @@ mod tests {
// !Websocket tests should always use `multi_thread`
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn test_client() -> anyhow::Result<()> {
use taos_query::AsyncQueryable;
let taos = TaosBuilder::from_dsn("taos://localhost:6041")?.build()?;
@ -405,7 +405,7 @@ mod tests {
taos.exec("create table stmt.ctb (ts timestamp, v int)")
.await?;
std::env::set_var("RUST_LOG", "debug");
pretty_env_logger::init();
let client = WsStmtClient::from_dsn("taos+ws://localhost:6041/stmt").await?;

View file

@ -64,6 +64,7 @@ impl Drop for WsClient {
#[derive(Debug)]
pub struct ResultSet {
id: ResId,
rt: Arc<tokio::runtime::Runtime>,
timeout: Duration,
sender: MsgSender,
@ -464,6 +465,7 @@ impl WsClient {
self.fetches.insert(resp.id, tx).unwrap();
}
Ok(ResultSet {
id: resp.id,
rt: self.rt.clone(),
timeout: self.timeout.clone(),
sender: self.sender.clone(),
@ -483,6 +485,7 @@ impl WsClient {
})
} else {
Ok(ResultSet {
id: resp.id,
rt: self.rt.clone(),
timeout: self.timeout.clone(),
affected_rows: resp.affected_rows,
@ -658,6 +661,14 @@ impl ResultSet {
self.timing = UnsafeCell::new(Duration::ZERO);
timing
}
pub fn stop_query(&mut self) {
if let Some((_, sender)) = self.fetches.remove(&self.id) {
sender
.send(Err(taos_error::Error::from_string("").into()))
.unwrap();
}
}
}
impl Iterator for ResultSet {
type Item = Result<RawBlock>;