Merge pull request #21 from taosdata/fix/unsigned-tinyint-null

chore(ws): add debug log for stmt
This commit is contained in:
Linhe Huo 2022-08-06 15:36:31 +08:00 committed by GitHub
commit 29424d58c8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 168 additions and 52 deletions

View file

@ -412,7 +412,16 @@ unsafe fn connect_with_dsn(dsn: *const c_char) -> WsTaos {
/// ```
#[no_mangle]
pub unsafe extern "C" fn ws_enable_log() {
pretty_env_logger::init();
static ONCE_INIT: std::sync::Once = std::sync::Once::new();
ONCE_INIT.call_once(|| {
let mut builder = pretty_env_logger::formatted_timed_builder();
builder.format_timestamp_nanos();
if let Ok(s) = ::std::env::var("RUST_LOG") {
builder.parse_filters(&s);
}
builder.init();
});
log::debug!("enable logger to stdout");
}
/// Connect via dsn string, returns NULL if failed.
@ -744,11 +753,8 @@ pub unsafe fn ws_print_row(rs: *mut WS_RES, row: i32) {
#[cfg(test)]
pub fn init_env() {
static ONCE_INIT: std::sync::Once = std::sync::Once::new();
ONCE_INIT.call_once(|| {
pretty_env_logger::init();
std::env::set_var("RUST_DEBUG", "debug");
});
std::env::set_var("RUST_LOG", "debug");
unsafe { ws_enable_log() };
}
#[cfg(test)]

View file

@ -156,27 +156,46 @@ impl TaosMultiBind {
pub fn to_json(&self) -> serde_json::Value {
use serde_json::json;
use serde_json::Value;
assert!(self.num > 0, "invalid bind value");
let len = self.num as usize;
macro_rules! _nulls {
() => {
json!(std::iter::repeat(Value::Null).take(len).collect::<Vec<_>>())
};
}
if self.buffer.is_null() {
return _nulls!();
}
macro_rules! _impl_primitive {
($t:ty) => {{
let len = self.num as usize;
let slice = std::slice::from_raw_parts(self.buffer as *const $t, len);
if self.is_null.is_null() {
return json!(slice);
match self.is_null.is_null() {
true => json!(slice),
false => {
let nulls = std::slice::from_raw_parts(self.is_null as *const bool, len);
let column: Vec<_> = slice
.iter()
.zip(nulls)
.map(
|(value, is_null)| {
if *is_null {
None
} else {
Some(*value)
}
},
)
.collect();
json!(column)
}
}
let nulls = std::slice::from_raw_parts(self.is_null as *const bool, len);
let column: Vec<_> = slice
.iter()
.zip(nulls)
.map(|(value, is_null)| if *is_null { None } else { Some(*value) })
.collect();
json!(column)
}};
}
unsafe {
match Ty::from(self.buffer_type as u8) {
Ty::Null => {
json!(Vec::from_iter(std::iter::repeat(Value::Null)))
}
Ty::Null => _nulls!(),
Ty::Bool => _impl_primitive!(bool),
Ty::TinyInt => _impl_primitive!(i8),
Ty::SmallInt => _impl_primitive!(i16),
@ -278,9 +297,7 @@ impl TaosMultiBind {
.offset(self.buffer_length as isize * i as isize);
let len = *self.length.offset(i as isize) as usize;
let bytes = std::slice::from_raw_parts(ptr, len);
Some(
serde_json::from_slice::<serde_json::Value>(bytes).unwrap()
)
Some(serde_json::from_slice::<serde_json::Value>(bytes).unwrap())
}
})
.collect::<Vec<_>>();
@ -652,6 +669,112 @@ mod tests {
// query!(b"drop database ws_stmt_i\0");
}
}
#[test]
fn stmt_tiny_int_null() {
use crate::*;
init_env();
unsafe {
let taos = ws_connect_with_dsn(b"ws://localhost:6041\0" as *const u8 as _);
if taos.is_null() {
let code = ws_errno(taos);
assert!(code != 0);
let str = ws_errstr(taos);
dbg!(CStr::from_ptr(str));
}
assert!(!taos.is_null());
macro_rules! query {
($sql:expr) => {
let sql = $sql as *const u8 as _;
let rs = ws_query(taos, sql);
let code = ws_errno(rs);
assert!(code == 0, "{:?}", CStr::from_ptr(ws_errstr(rs)));
ws_free_result(rs);
};
}
query!(b"drop database if exists ws_stmt_i\0");
query!(b"create database ws_stmt_i keep 36500\0");
query!(b"use ws_stmt_i\0");
query!(b"create table st(ts timestamp, c1 TINYINT UNSIGNED) tags(utntag TINYINT UNSIGNED)\0");
query!(b"create table t1 using st tags(0)\0");
query!(b"create table t2 using st tags(255)\0");
query!(b"create table t3 using st tags(NULL)\0");
let stmt = ws_stmt_init(taos);
let sql = "insert into ? values(?,?)";
let code = ws_stmt_prepare(stmt, sql.as_ptr() as _, sql.len() as _);
if code != 0 {
dbg!(CStr::from_ptr(ws_errstr(stmt)).to_str().unwrap());
panic!()
}
for tbname in ["t1", "t2", "t3"] {
let name = format!("ws_stmt_i.`{}`\0", tbname);
let code = ws_stmt_set_tbname(stmt, name.as_ptr() as _);
if code != 0 {
dbg!(CStr::from_ptr(ws_errstr(stmt)).to_str().unwrap());
panic!()
}
let params = vec![
TaosMultiBind::from_raw_timestamps(vec![false], &[0]),
TaosMultiBind::from_primitives(vec![true], &[0u8]),
];
let code = ws_stmt_bind_param_batch(stmt, params.as_ptr(), params.len() as _);
if code != 0 {
dbg!(CStr::from_ptr(ws_errstr(stmt)).to_str().unwrap());
panic!()
}
ws_stmt_add_batch(stmt);
let mut rows = 0;
ws_stmt_execute(stmt, &mut rows);
assert_eq!(rows, 1);
let sql = format!("select * from st where tbname = '{tbname}'\0");
let rs = ws_query(taos, sql.as_bytes().as_ptr() as _);
let code = ws_errno(rs);
loop {
let mut ptr = std::ptr::null();
let mut rows = 0;
ws_fetch_block(rs, &mut ptr, &mut rows);
if rows == 0 {
break;
}
for row in 0..rows {
print!("{tbname} row {row}: ");
for col in 0..3 {
let mut ty = Ty::Null;
let mut len = 0;
let v = ws_get_value_in_block(
rs,
row,
col,
&mut ty as *mut _ as _,
&mut len,
);
if v.is_null() {
print!(",NULL");
} else {
match ty {
Ty::Timestamp => print!("ts: {}", *(v as *const i64)),
_ => print!(",{}", *(v as *const u8)),
}
}
}
println!("");
}
}
}
ws_stmt_close(stmt);
// query!(b"drop database ws_stmt_i\0");
}
}
#[test]
fn stmt_with_tags() {
use crate::*;
@ -717,3 +840,11 @@ mod tests {
}
}
}
#[test]
fn json_test() {
use serde_json::json;
let s = json!(vec![Option::<u8>::None]);
assert_eq!(dbg!(serde_json::to_string(&s).unwrap()), "[null]");
}

View file

@ -787,7 +787,6 @@ async fn test_client_cloud() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn ws_show_databases() -> anyhow::Result<()> {
use taos_query::Queryable;
let dsn = std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
let client = WsTaos::from_dsn(dsn).await?;
let mut rs = client.query("show databases").await?;

View file

@ -1,7 +1,3 @@
use bytes::Bytes;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
@ -10,7 +6,6 @@ use serde_repr::{Deserialize_repr, Serialize_repr};
use serde_with::serde_as;
use serde_with::NoneAsEmptyString;
use taos_query::common::Field;
use taos_query::common::Precision;
use taos_query::common::Ty;

View file

@ -1,7 +1,3 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_with::serde_as;
@ -228,8 +224,6 @@ impl StmtRecv {
mod tests {
use anyhow::Ok;
#[test]
fn stmt() -> anyhow::Result<()> {
Ok(())

View file

@ -169,18 +169,11 @@ impl WsStmtClient {
loop {
tokio::select! {
_ = interval.tick() => {
//
log::trace!("Check websocket message sender alive");
}
Some(msg) = msg_recv.recv() => {
// dbg!(&msg);
// log::info!("send message: {}", msg.to_string());
sender.send(msg).await.unwrap();
log::info!("send done");
}
_ = rx.changed() => {
log::info!("close sender task");
log::debug!("close sender task");
break;
}
}
@ -195,14 +188,14 @@ impl WsStmtClient {
match message {
Ok(message) => match message {
Message::Text(text) => {
log::info!("json response: {}", text);
log::debug!("json response: {}", text);
let v: StmtRecv = serde_json::from_str(&text).unwrap();
match v.ok() {
StmtOk::Conn(_) => {
log::warn!("[{req_id}] received connected response in message loop");
},
StmtOk::Init(req_id, stmt_id) => {
log::info!("stmt init done: {{ req_id: {}, stmt_id: {:?}}}", req_id, stmt_id);
log::debug!("stmt init done: {{ req_id: {}, stmt_id: {:?}}}", req_id, stmt_id);
if let Some((_, sender)) = queries_sender.remove(&req_id)
{
sender.send(stmt_id).unwrap();
@ -212,7 +205,7 @@ impl WsStmtClient {
}
StmtOk::Stmt(stmt_id, res) => {
if let Some(sender) = fetches_sender.read(&stmt_id, |_, sender| sender.clone()) {
log::info!("send data to fetches with id {}", stmt_id);
log::debug!("send data to fetches with id {}", stmt_id);
// let res = res.clone();
sender.send(res).unwrap();
// }) {
@ -251,7 +244,7 @@ impl WsStmtClient {
}
}
_ = close_listener.changed() => {
log::info!("close reader task");
log::debug!("close reader task");
break
}
}
@ -328,7 +321,7 @@ impl WsAsyncStmt {
Ok(())
}
pub async fn add_batch(&self) -> Result<()> {
log::info!("add batch");
log::debug!("add batch");
let message = StmtSend::AddBatch(self.args);
self.ws.send(message.to_msg()).await?;
let _ = self.receiver.recv_timeout(self.timeout)??;
@ -340,10 +333,10 @@ impl WsAsyncStmt {
columns: columns,
};
{
log::info!("bind");
log::debug!("bind with: {message:?}");
self.ws.send(message.to_msg()).await?;
}
log::info!("begin receive");
log::debug!("begin receive");
let _ = self.receiver.recv_timeout(self.timeout)??;
Ok(())
}
@ -375,7 +368,7 @@ impl WsAsyncStmt {
}
pub async fn exec(&self) -> Result<usize> {
log::info!("exec");
log::debug!("exec");
let message = StmtSend::Exec(self.args);
self.ws.send_timeout(message.to_msg(), self.timeout).await?;
if let Some(affected) = self.receiver.recv_timeout(self.timeout)?? {
@ -396,7 +389,6 @@ mod tests {
// !Websocket tests should always use `multi_thread`
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn test_client() -> anyhow::Result<()> {
use taos_query::AsyncQueryable;
let taos = TaosBuilder::from_dsn("taos://localhost:6041")?.build()?;
@ -405,7 +397,6 @@ mod tests {
taos.exec("create table stmt.ctb (ts timestamp, v int)")
.await?;
std::env::set_var("RUST_LOG", "debug");
pretty_env_logger::init();
let client = WsStmtClient::from_dsn("taos+ws://localhost:6041/stmt").await?;