mirror of
https://github.com/taosdata/TDengine
synced 2026-05-24 10:09:01 +00:00
Merge pull request #21 from taosdata/fix/unsigned-tinyint-null
chore(ws): add debug log for stmt
This commit is contained in:
commit
29424d58c8
6 changed files with 168 additions and 52 deletions
|
|
@ -412,7 +412,16 @@ unsafe fn connect_with_dsn(dsn: *const c_char) -> WsTaos {
|
|||
/// ```
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn ws_enable_log() {
|
||||
pretty_env_logger::init();
|
||||
static ONCE_INIT: std::sync::Once = std::sync::Once::new();
|
||||
ONCE_INIT.call_once(|| {
|
||||
let mut builder = pretty_env_logger::formatted_timed_builder();
|
||||
builder.format_timestamp_nanos();
|
||||
if let Ok(s) = ::std::env::var("RUST_LOG") {
|
||||
builder.parse_filters(&s);
|
||||
}
|
||||
builder.init();
|
||||
});
|
||||
log::debug!("enable logger to stdout");
|
||||
}
|
||||
|
||||
/// Connect via dsn string, returns NULL if failed.
|
||||
|
|
@ -744,11 +753,8 @@ pub unsafe fn ws_print_row(rs: *mut WS_RES, row: i32) {
|
|||
|
||||
#[cfg(test)]
|
||||
pub fn init_env() {
|
||||
static ONCE_INIT: std::sync::Once = std::sync::Once::new();
|
||||
ONCE_INIT.call_once(|| {
|
||||
pretty_env_logger::init();
|
||||
std::env::set_var("RUST_DEBUG", "debug");
|
||||
});
|
||||
std::env::set_var("RUST_LOG", "debug");
|
||||
unsafe { ws_enable_log() };
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
|||
|
|
@ -156,27 +156,46 @@ impl TaosMultiBind {
|
|||
pub fn to_json(&self) -> serde_json::Value {
|
||||
use serde_json::json;
|
||||
use serde_json::Value;
|
||||
assert!(self.num > 0, "invalid bind value");
|
||||
let len = self.num as usize;
|
||||
|
||||
macro_rules! _nulls {
|
||||
() => {
|
||||
json!(std::iter::repeat(Value::Null).take(len).collect::<Vec<_>>())
|
||||
};
|
||||
}
|
||||
if self.buffer.is_null() {
|
||||
return _nulls!();
|
||||
}
|
||||
|
||||
macro_rules! _impl_primitive {
|
||||
($t:ty) => {{
|
||||
let len = self.num as usize;
|
||||
let slice = std::slice::from_raw_parts(self.buffer as *const $t, len);
|
||||
if self.is_null.is_null() {
|
||||
return json!(slice);
|
||||
match self.is_null.is_null() {
|
||||
true => json!(slice),
|
||||
false => {
|
||||
let nulls = std::slice::from_raw_parts(self.is_null as *const bool, len);
|
||||
let column: Vec<_> = slice
|
||||
.iter()
|
||||
.zip(nulls)
|
||||
.map(
|
||||
|(value, is_null)| {
|
||||
if *is_null {
|
||||
None
|
||||
} else {
|
||||
Some(*value)
|
||||
}
|
||||
},
|
||||
)
|
||||
.collect();
|
||||
json!(column)
|
||||
}
|
||||
}
|
||||
let nulls = std::slice::from_raw_parts(self.is_null as *const bool, len);
|
||||
let column: Vec<_> = slice
|
||||
.iter()
|
||||
.zip(nulls)
|
||||
.map(|(value, is_null)| if *is_null { None } else { Some(*value) })
|
||||
.collect();
|
||||
json!(column)
|
||||
}};
|
||||
}
|
||||
unsafe {
|
||||
match Ty::from(self.buffer_type as u8) {
|
||||
Ty::Null => {
|
||||
json!(Vec::from_iter(std::iter::repeat(Value::Null)))
|
||||
}
|
||||
Ty::Null => _nulls!(),
|
||||
Ty::Bool => _impl_primitive!(bool),
|
||||
Ty::TinyInt => _impl_primitive!(i8),
|
||||
Ty::SmallInt => _impl_primitive!(i16),
|
||||
|
|
@ -278,9 +297,7 @@ impl TaosMultiBind {
|
|||
.offset(self.buffer_length as isize * i as isize);
|
||||
let len = *self.length.offset(i as isize) as usize;
|
||||
let bytes = std::slice::from_raw_parts(ptr, len);
|
||||
Some(
|
||||
serde_json::from_slice::<serde_json::Value>(bytes).unwrap()
|
||||
)
|
||||
Some(serde_json::from_slice::<serde_json::Value>(bytes).unwrap())
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
|
@ -652,6 +669,112 @@ mod tests {
|
|||
// query!(b"drop database ws_stmt_i\0");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stmt_tiny_int_null() {
|
||||
use crate::*;
|
||||
init_env();
|
||||
unsafe {
|
||||
let taos = ws_connect_with_dsn(b"ws://localhost:6041\0" as *const u8 as _);
|
||||
if taos.is_null() {
|
||||
let code = ws_errno(taos);
|
||||
assert!(code != 0);
|
||||
let str = ws_errstr(taos);
|
||||
dbg!(CStr::from_ptr(str));
|
||||
}
|
||||
assert!(!taos.is_null());
|
||||
|
||||
macro_rules! query {
|
||||
($sql:expr) => {
|
||||
let sql = $sql as *const u8 as _;
|
||||
let rs = ws_query(taos, sql);
|
||||
let code = ws_errno(rs);
|
||||
assert!(code == 0, "{:?}", CStr::from_ptr(ws_errstr(rs)));
|
||||
ws_free_result(rs);
|
||||
};
|
||||
}
|
||||
|
||||
query!(b"drop database if exists ws_stmt_i\0");
|
||||
query!(b"create database ws_stmt_i keep 36500\0");
|
||||
query!(b"use ws_stmt_i\0");
|
||||
query!(b"create table st(ts timestamp, c1 TINYINT UNSIGNED) tags(utntag TINYINT UNSIGNED)\0");
|
||||
query!(b"create table t1 using st tags(0)\0");
|
||||
query!(b"create table t2 using st tags(255)\0");
|
||||
query!(b"create table t3 using st tags(NULL)\0");
|
||||
|
||||
let stmt = ws_stmt_init(taos);
|
||||
|
||||
let sql = "insert into ? values(?,?)";
|
||||
let code = ws_stmt_prepare(stmt, sql.as_ptr() as _, sql.len() as _);
|
||||
if code != 0 {
|
||||
dbg!(CStr::from_ptr(ws_errstr(stmt)).to_str().unwrap());
|
||||
panic!()
|
||||
}
|
||||
|
||||
for tbname in ["t1", "t2", "t3"] {
|
||||
let name = format!("ws_stmt_i.`{}`\0", tbname);
|
||||
let code = ws_stmt_set_tbname(stmt, name.as_ptr() as _);
|
||||
|
||||
if code != 0 {
|
||||
dbg!(CStr::from_ptr(ws_errstr(stmt)).to_str().unwrap());
|
||||
panic!()
|
||||
}
|
||||
let params = vec![
|
||||
TaosMultiBind::from_raw_timestamps(vec![false], &[0]),
|
||||
TaosMultiBind::from_primitives(vec![true], &[0u8]),
|
||||
];
|
||||
let code = ws_stmt_bind_param_batch(stmt, params.as_ptr(), params.len() as _);
|
||||
if code != 0 {
|
||||
dbg!(CStr::from_ptr(ws_errstr(stmt)).to_str().unwrap());
|
||||
panic!()
|
||||
}
|
||||
|
||||
ws_stmt_add_batch(stmt);
|
||||
let mut rows = 0;
|
||||
ws_stmt_execute(stmt, &mut rows);
|
||||
assert_eq!(rows, 1);
|
||||
|
||||
let sql = format!("select * from st where tbname = '{tbname}'\0");
|
||||
let rs = ws_query(taos, sql.as_bytes().as_ptr() as _);
|
||||
let code = ws_errno(rs);
|
||||
loop {
|
||||
let mut ptr = std::ptr::null();
|
||||
let mut rows = 0;
|
||||
ws_fetch_block(rs, &mut ptr, &mut rows);
|
||||
if rows == 0 {
|
||||
break;
|
||||
}
|
||||
for row in 0..rows {
|
||||
print!("{tbname} row {row}: ");
|
||||
for col in 0..3 {
|
||||
let mut ty = Ty::Null;
|
||||
let mut len = 0;
|
||||
let v = ws_get_value_in_block(
|
||||
rs,
|
||||
row,
|
||||
col,
|
||||
&mut ty as *mut _ as _,
|
||||
&mut len,
|
||||
);
|
||||
if v.is_null() {
|
||||
print!(",NULL");
|
||||
} else {
|
||||
match ty {
|
||||
Ty::Timestamp => print!("ts: {}", *(v as *const i64)),
|
||||
_ => print!(",{}", *(v as *const u8)),
|
||||
}
|
||||
}
|
||||
}
|
||||
println!("");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ws_stmt_close(stmt);
|
||||
// query!(b"drop database ws_stmt_i\0");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stmt_with_tags() {
|
||||
use crate::*;
|
||||
|
|
@ -717,3 +840,11 @@ mod tests {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_test() {
|
||||
use serde_json::json;
|
||||
|
||||
let s = json!(vec![Option::<u8>::None]);
|
||||
assert_eq!(dbg!(serde_json::to_string(&s).unwrap()), "[null]");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -787,7 +787,6 @@ async fn test_client_cloud() -> anyhow::Result<()> {
|
|||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
|
||||
async fn ws_show_databases() -> anyhow::Result<()> {
|
||||
use taos_query::Queryable;
|
||||
let dsn = std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
|
||||
let client = WsTaos::from_dsn(dsn).await?;
|
||||
let mut rs = client.query("show databases").await?;
|
||||
|
|
|
|||
|
|
@ -1,7 +1,3 @@
|
|||
|
||||
|
||||
|
||||
|
||||
use bytes::Bytes;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
|
@ -10,7 +6,6 @@ use serde_repr::{Deserialize_repr, Serialize_repr};
|
|||
use serde_with::serde_as;
|
||||
use serde_with::NoneAsEmptyString;
|
||||
|
||||
|
||||
use taos_query::common::Field;
|
||||
use taos_query::common::Precision;
|
||||
use taos_query::common::Ty;
|
||||
|
|
|
|||
|
|
@ -1,7 +1,3 @@
|
|||
|
||||
|
||||
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use serde_with::serde_as;
|
||||
|
|
@ -228,8 +224,6 @@ impl StmtRecv {
|
|||
mod tests {
|
||||
use anyhow::Ok;
|
||||
|
||||
|
||||
|
||||
#[test]
|
||||
fn stmt() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -169,18 +169,11 @@ impl WsStmtClient {
|
|||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {
|
||||
//
|
||||
log::trace!("Check websocket message sender alive");
|
||||
}
|
||||
Some(msg) = msg_recv.recv() => {
|
||||
// dbg!(&msg);
|
||||
// log::info!("send message: {}", msg.to_string());
|
||||
sender.send(msg).await.unwrap();
|
||||
log::info!("send done");
|
||||
}
|
||||
_ = rx.changed() => {
|
||||
log::info!("close sender task");
|
||||
log::debug!("close sender task");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -195,14 +188,14 @@ impl WsStmtClient {
|
|||
match message {
|
||||
Ok(message) => match message {
|
||||
Message::Text(text) => {
|
||||
log::info!("json response: {}", text);
|
||||
log::debug!("json response: {}", text);
|
||||
let v: StmtRecv = serde_json::from_str(&text).unwrap();
|
||||
match v.ok() {
|
||||
StmtOk::Conn(_) => {
|
||||
log::warn!("[{req_id}] received connected response in message loop");
|
||||
},
|
||||
StmtOk::Init(req_id, stmt_id) => {
|
||||
log::info!("stmt init done: {{ req_id: {}, stmt_id: {:?}}}", req_id, stmt_id);
|
||||
log::debug!("stmt init done: {{ req_id: {}, stmt_id: {:?}}}", req_id, stmt_id);
|
||||
if let Some((_, sender)) = queries_sender.remove(&req_id)
|
||||
{
|
||||
sender.send(stmt_id).unwrap();
|
||||
|
|
@ -212,7 +205,7 @@ impl WsStmtClient {
|
|||
}
|
||||
StmtOk::Stmt(stmt_id, res) => {
|
||||
if let Some(sender) = fetches_sender.read(&stmt_id, |_, sender| sender.clone()) {
|
||||
log::info!("send data to fetches with id {}", stmt_id);
|
||||
log::debug!("send data to fetches with id {}", stmt_id);
|
||||
// let res = res.clone();
|
||||
sender.send(res).unwrap();
|
||||
// }) {
|
||||
|
|
@ -251,7 +244,7 @@ impl WsStmtClient {
|
|||
}
|
||||
}
|
||||
_ = close_listener.changed() => {
|
||||
log::info!("close reader task");
|
||||
log::debug!("close reader task");
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
@ -328,7 +321,7 @@ impl WsAsyncStmt {
|
|||
Ok(())
|
||||
}
|
||||
pub async fn add_batch(&self) -> Result<()> {
|
||||
log::info!("add batch");
|
||||
log::debug!("add batch");
|
||||
let message = StmtSend::AddBatch(self.args);
|
||||
self.ws.send(message.to_msg()).await?;
|
||||
let _ = self.receiver.recv_timeout(self.timeout)??;
|
||||
|
|
@ -340,10 +333,10 @@ impl WsAsyncStmt {
|
|||
columns: columns,
|
||||
};
|
||||
{
|
||||
log::info!("bind");
|
||||
log::debug!("bind with: {message:?}");
|
||||
self.ws.send(message.to_msg()).await?;
|
||||
}
|
||||
log::info!("begin receive");
|
||||
log::debug!("begin receive");
|
||||
let _ = self.receiver.recv_timeout(self.timeout)??;
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -375,7 +368,7 @@ impl WsAsyncStmt {
|
|||
}
|
||||
|
||||
pub async fn exec(&self) -> Result<usize> {
|
||||
log::info!("exec");
|
||||
log::debug!("exec");
|
||||
let message = StmtSend::Exec(self.args);
|
||||
self.ws.send_timeout(message.to_msg(), self.timeout).await?;
|
||||
if let Some(affected) = self.receiver.recv_timeout(self.timeout)?? {
|
||||
|
|
@ -396,7 +389,6 @@ mod tests {
|
|||
// !Websocket tests should always use `multi_thread`
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
|
||||
async fn test_client() -> anyhow::Result<()> {
|
||||
|
||||
use taos_query::AsyncQueryable;
|
||||
|
||||
let taos = TaosBuilder::from_dsn("taos://localhost:6041")?.build()?;
|
||||
|
|
@ -405,7 +397,6 @@ mod tests {
|
|||
taos.exec("create table stmt.ctb (ts timestamp, v int)")
|
||||
.await?;
|
||||
|
||||
|
||||
std::env::set_var("RUST_LOG", "debug");
|
||||
pretty_env_logger::init();
|
||||
let client = WsStmtClient::from_dsn("taos+ws://localhost:6041/stmt").await?;
|
||||
|
|
|
|||
Loading…
Reference in a new issue