From 6e9f0bb1e3d3f4bd347fbdbf62560da10b09e565 Mon Sep 17 00:00:00 2001 From: Sebastian Jeltsch Date: Wed, 15 Apr 2026 10:23:31 +0200 Subject: [PATCH] Move `trailbase-sqlite` APIs around and further untangle rusqlite from `Row[s]`. --- Cargo.toml | 2 +- crates/core/src/admin/query.rs | 3 +- crates/core/src/connection.rs | 2 +- crates/sqlite/benches/synthetic/benchmark.rs | 3 +- crates/sqlite/src/connection.rs | 300 +--------- crates/sqlite/src/error.rs | 10 + crates/sqlite/src/experimental/mod.rs | 13 +- crates/sqlite/src/lib.rs | 16 +- crates/sqlite/src/rows.rs | 72 +-- crates/sqlite/src/sqlite/batch.rs | 5 +- crates/sqlite/src/sqlite/connection.rs | 545 ++++++++----------- crates/sqlite/src/sqlite/executor.rs | 379 +++++++++++++ crates/sqlite/src/sqlite/mod.rs | 12 +- crates/sqlite/src/sqlite/util.rs | 81 +++ crates/sqlite/src/tests.rs | 2 +- crates/wasm-runtime-host/src/host.rs | 2 +- crates/wasm-runtime-host/src/sqlite.rs | 2 +- 17 files changed, 728 insertions(+), 721 deletions(-) create mode 100644 crates/sqlite/src/sqlite/executor.rs diff --git a/Cargo.toml b/Cargo.toml index a30d178e..0094b539 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,7 @@ tracing-subscriber = { version = "0.3.18", default-features = false, features = trailbase = { path = "crates/core", version = "0.2.0", default-features = false, features=["wasm"] } trailbase-assets = { path = "crates/assets", version = "0.2.0" } trailbase-build = { path = "crates/build", version = "0.1.1" } -trailbase-client = { path = "crates/client", version = "0.8.1" } +trailbase-client = { path = "crates/client", version = "0.8.0" } trailbase-extension = { path = "crates/extension", version = "0.3.0" } trailbase-qs = { path = "crates/qs", version = "0.1.0" } trailbase-reactive = { path = "crates/reactive", version = "0.1.0" } diff --git a/crates/core/src/admin/query.rs b/crates/core/src/admin/query.rs index 9487d337..2073e397 100644 --- a/crates/core/src/admin/query.rs +++ b/crates/core/src/admin/query.rs @@ -77,8 +77,7 @@ pub async fn query_handler( .as_ref(), )?; - let batched_rows_result = - trailbase_sqlite::sqlite::batch::execute_batch(&conn, request.query).await; + let batched_rows_result = trailbase_sqlite::sqlite::execute_batch(&conn, request.query).await; // In the fallback case we always need to invalidate the cache. if must_invalidate_schema_cache { diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index ee62f1ef..4c0ace66 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -352,7 +352,7 @@ fn init_main_db_impl( return Ok(conn); }, - trailbase_sqlite::connection::Options { + trailbase_sqlite::Options { n_read_threads: match (data_dir, std::thread::available_parallelism()) { (None, _) => Some(0), (Some(_), Ok(n)) => Some(n.get().clamp(2, 4)), diff --git a/crates/sqlite/benches/synthetic/benchmark.rs b/crates/sqlite/benches/synthetic/benchmark.rs index 528a8e08..949ff527 100644 --- a/crates/sqlite/benches/synthetic/benchmark.rs +++ b/crates/sqlite/benches/synthetic/benchmark.rs @@ -10,8 +10,7 @@ use rand::RngExt; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; -use trailbase_sqlite::Value; -use trailbase_sqlite::connection::{Connection, Options}; +use trailbase_sqlite::{Connection, Options, Value}; use crate::connection::{AsyncConnection, SharedRusqlite, ThreadLocalRusqlite}; use crate::error::BenchmarkError; diff --git a/crates/sqlite/src/connection.rs b/crates/sqlite/src/connection.rs index 524047c4..13e5f58d 100644 --- a/crates/sqlite/src/connection.rs +++ b/crates/sqlite/src/connection.rs @@ -1,299 +1 @@ -use std::fmt::Debug; -use std::hash::{Hash, Hasher}; -use std::sync::Arc; - -use crate::database::Database; -use crate::error::Error; -use crate::from_sql::FromSql; -use crate::params::Params; -use crate::rows::{Row, Rows, columns}; -use crate::sqlite::connection::{ConnectionImpl, get_value, map_first}; - -// NOTE: We should probably decouple from the impl. -pub use crate::sqlite::connection::{ArcLockGuard, LockGuard, Options}; - -/// A handle to call functions in background thread. -#[derive(Clone)] -pub struct Connection { - c: ConnectionImpl, -} - -impl Connection { - pub fn new(builder: impl Fn() -> Result) -> Result { - return Self::with_opts(builder, Options::default()); - } - - pub fn with_opts( - builder: impl Fn() -> Result, - opt: Options, - ) -> std::result::Result { - return Ok(Self { - c: ConnectionImpl::new(builder, opt)?, - }); - } - - /// Open a new connection to an in-memory SQLite database. - /// - /// # Failure - /// - /// Will return `Err` if the underlying SQLite open call fails. - pub fn open_in_memory() -> Result { - let conn = Self::with_opts( - rusqlite::Connection::open_in_memory, - Options { - n_read_threads: Some(0), - ..Default::default() - }, - )?; - - assert_eq!(1, conn.threads()); - - return Ok(conn); - } - - pub fn id(&self) -> usize { - return self.c.id(); - } - - pub fn threads(&self) -> usize { - return self.c.threads(); - } - - pub fn write_lock(&self) -> LockGuard<'_> { - return self.c.write_lock(); - } - - pub fn try_write_arc_lock_for(&self, duration: tokio::time::Duration) -> Option { - return self.c.try_write_arc_lock_for(duration); - } - - /// Call a function in background thread and get the result - /// asynchronously. - /// - /// # Failure - /// - /// Will return `Err` if the database connection has been closed. - pub async fn call(&self, function: F) -> Result - where - F: FnOnce(&mut rusqlite::Connection) -> Result + Send + 'static, - R: Send + 'static, - { - return self.c.call(function).await; - } - - pub async fn call_reader(&self, function: F) -> Result - where - F: FnOnce(&rusqlite::Connection) -> Result + Send + 'static, - R: Send + 'static, - { - return self.c.call_reader(function).await; - } - - /// Query SQL statement. - pub async fn read_query_rows( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - ) -> Result { - return self - .c - .read_query_rows_f(sql, params, crate::rows::from_rows) - .await; - } - - pub async fn read_query_row( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - ) -> Result, Error> { - return self - .c - .read_query_rows_f(sql, params, |rows| { - return map_first(rows, |row| { - return crate::rows::from_row(row, Arc::new(columns(row.as_ref()))); - }); - }) - .await; - } - - pub async fn read_query_row_get( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - index: usize, - ) -> Result, Error> - where - T: FromSql + Send + 'static, - { - return self - .c - .read_query_rows_f(sql, params, move |rows| { - return map_first(rows, move |row| { - return get_value(row, index); - }); - }) - .await; - } - - pub async fn read_query_value( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - ) -> Result, Error> { - return self - .c - .read_query_rows_f(sql, params, |rows| { - return map_first(rows, move |row| { - serde_rusqlite::from_row(row).map_err(Error::DeserializeValue) - }); - }) - .await; - } - - pub async fn read_query_values( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - ) -> Result, Error> { - return self - .c - .read_query_rows_f(sql, params, |rows| { - return serde_rusqlite::from_rows(rows) - .collect::, _>>() - .map_err(Error::DeserializeValue); - }) - .await; - } - - pub async fn write_query_rows( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - ) -> Result { - return self - .c - .write_query_rows_f(sql, params, crate::rows::from_rows) - .await; - } - - pub async fn query_row_get( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - index: usize, - ) -> Result, Error> - where - T: FromSql + Send + 'static, - { - return self - .c - .write_query_rows_f(sql, params, move |rows| { - return map_first(rows, move |row| { - return get_value(row, index); - }); - }) - .await; - } - - pub async fn write_query_value( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - ) -> Result, Error> { - return self - .c - .write_query_rows_f(sql, params, |rows| { - return map_first(rows, |row| { - serde_rusqlite::from_row(row).map_err(Error::DeserializeValue) - }); - }) - .await; - } - - pub async fn write_query_values( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - ) -> Result, Error> { - return self - .c - .write_query_rows_f(sql, params, |rows| { - return serde_rusqlite::from_rows(rows) - .collect::, _>>() - .map_err(Error::DeserializeValue); - }) - .await; - } - - /// Execute SQL statement. - pub async fn execute( - &self, - sql: impl AsRef + Send + 'static, - params: impl Params + Send + 'static, - ) -> Result { - return self.c.execute(sql, params).await; - } - - /// Batch execute provided SQL statementsi in batch. - pub async fn execute_batch(&self, sql: impl AsRef + Send + 'static) -> Result<(), Error> { - return self.c.execute_batch(sql).await; - } - - pub fn attach(&self, path: &str, name: &str) -> Result<(), Error> { - let query = format!("ATTACH DATABASE '{path}' AS {name} "); - return self.c.map(move |conn| { - conn.execute(&query, ())?; - return Ok(()); - }); - } - - pub fn detach(&self, name: &str) -> Result<(), Error> { - let query = format!("DETACH DATABASE {name}"); - return self.c.map(move |conn| { - conn.execute(&query, ())?; - return Ok(()); - }); - } - - pub async fn list_databases(&self) -> Result, Error> { - return self - .c - .call_reader(crate::sqlite::util::list_databases) - .await; - } - - /// Close the database connection. - /// - /// This is functionally equivalent to the `Drop` implementation for `Connection`. It consumes - /// the `Connection`, but on error returns it to the caller for retry purposes. - /// - /// If successful, any following `close` operations performed on `Connection` copies will succeed - /// immediately. - /// - /// # Failure - /// - /// Will return `Err` if the underlying SQLite close call fails. - pub async fn close(self) -> Result<(), Error> { - return self.c.close().await; - } -} - -impl Debug for Connection { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Connection").finish() - } -} - -impl Hash for Connection { - fn hash(&self, state: &mut H) { - self.id().hash(state); - } -} - -impl PartialEq for Connection { - fn eq(&self, other: &Self) -> bool { - return self.id() == other.id(); - } -} - -impl Eq for Connection {} +pub use crate::sqlite::connection::{ArcLockGuard, Connection, LockGuard, Options}; diff --git a/crates/sqlite/src/error.rs b/crates/sqlite/src/error.rs index 5f0131bb..f61283cb 100644 --- a/crates/sqlite/src/error.rs +++ b/crates/sqlite/src/error.rs @@ -3,6 +3,16 @@ pub enum Error { #[error("ConnectionClosed")] ConnectionClosed, + /// Error when the value of a particular column is requested, but the type + /// of the result in that column cannot be converted to the requested + /// Rust type. + #[error("InvalidColumnType({idx}, {name}, {decl_type:?})")] + InvalidColumnType { + idx: usize, + name: String, + decl_type: Option, + }, + // QUESTION: This is leaky. How often do downstream users have to introspect on this // rusqlite::Error. Otherwise, should/could this be more opaue. #[error("Rusqlite: {0}")] diff --git a/crates/sqlite/src/experimental/mod.rs b/crates/sqlite/src/experimental/mod.rs index 069b5727..9320c31f 100644 --- a/crates/sqlite/src/experimental/mod.rs +++ b/crates/sqlite/src/experimental/mod.rs @@ -20,13 +20,13 @@ impl DummyConnection { } #[allow(unused)] -enum PolymorphicConnection { +pub enum DummyPolymorphicConnection { Sqlite(Connection), Dummy(DummyConnection), } #[allow(unused)] -impl PolymorphicConnection { +impl DummyPolymorphicConnection { pub async fn read_query_row_get( &self, sql: impl AsRef + Send + 'static, @@ -41,6 +41,13 @@ impl PolymorphicConnection { Self::Dummy(c) => c.read_query_row_get(sql, params, index).await, } } + + pub fn sqlite_connection(&self) -> Option<&Connection> { + match self { + Self::Sqlite(conn) => Some(conn), + Self::Dummy(_) => None, + } + } } #[cfg(test)] @@ -50,7 +57,7 @@ mod tests { #[tokio::test] async fn polymorphic_test() { let conn = Connection::open_in_memory().unwrap(); - let p = PolymorphicConnection::Sqlite(conn.clone()); + let p = DummyPolymorphicConnection::Sqlite(conn.clone()); conn .execute_batch( diff --git a/crates/sqlite/src/lib.rs b/crates/sqlite/src/lib.rs index 842fbb09..98e653f9 100644 --- a/crates/sqlite/src/lib.rs +++ b/crates/sqlite/src/lib.rs @@ -10,20 +10,20 @@ clippy::needless_continue )] -pub mod connection; -pub mod database; -pub mod error; +mod connection; +mod database; +mod error; pub mod from_sql; -pub mod params; -pub mod rows; +mod params; +mod rows; pub mod sqlite; pub mod to_sql; -pub mod value; +mod value; #[cfg(debug_assertions)] -pub mod experimental; +mod experimental; -pub use connection::Connection; +pub use connection::{ArcLockGuard, Connection, LockGuard, Options}; pub use database::Database; pub use error::Error; pub use params::{NamedParamRef, NamedParams, NamedParamsRef, Params}; diff --git a/crates/sqlite/src/rows.rs b/crates/sqlite/src/rows.rs index 332a52ec..6b05dce5 100644 --- a/crates/sqlite/src/rows.rs +++ b/crates/sqlite/src/rows.rs @@ -1,4 +1,4 @@ -use rusqlite::{Statement, types}; +use rusqlite::types; use std::fmt::Debug; use std::ops::Index; use std::str::FromStr; @@ -33,8 +33,8 @@ impl FromStr for ValueType { #[derive(Debug, Clone, PartialEq)] pub struct Column { - name: String, - decl_type: Option, + pub(crate) name: String, + pub(crate) decl_type: Option, } #[derive(Debug)] @@ -74,38 +74,18 @@ impl Rows { } pub fn column_type(&self, idx: usize) -> Result { - if let Some(c) = self.1.get(idx) { - return Ok(c.decl_type.ok_or_else(|| { - rusqlite::Error::InvalidColumnType( - idx, - self.column_name(idx).unwrap_or("?").to_string(), - types::Type::Null, - ) - })?); - } - - return Err( - rusqlite::Error::InvalidColumnType( + return self + .1 + .get(idx) + .and_then(|c| c.decl_type) + .ok_or_else(|| Error::InvalidColumnType { idx, - self.column_name(idx).unwrap_or("?").to_string(), - types::Type::Null, - ) - .into(), - ); + name: self.column_name(idx).unwrap_or("?").to_string(), + decl_type: None, + }); } } -pub fn from_rows(mut rows: rusqlite::Rows) -> Result { - let columns: Arc> = Arc::new(rows.as_ref().map_or_else(Vec::new, columns)); - - let mut result = vec![]; - while let Some(row) = rows.next()? { - result.push(self::from_row(row, columns.clone())?); - } - - return Ok(Rows(result, columns)); -} - impl Index for Rows { type Output = Row; @@ -123,17 +103,6 @@ impl IntoIterator for Rows { } } -pub(crate) fn columns(stmt: &Statement<'_>) -> Vec { - return stmt - .columns() - .into_iter() - .map(|c| Column { - name: c.name().to_string(), - decl_type: c.decl_type().and_then(|s| ValueType::from_str(s).ok()), - }) - .collect(); -} - #[derive(Debug)] pub struct Row(pub Vec, pub Arc>); @@ -181,25 +150,6 @@ impl Row { } } -pub(crate) fn from_row(row: &rusqlite::Row, cols: Arc>) -> Result { - #[cfg(debug_assertions)] - if let Some(rc) = Some(columns(row.as_ref())) - && rc.len() != cols.len() - { - // Apparently this can happen during schema manipulations, e.g. when deleting a column - // :shrug:. We normalize everything to the same rows schema rather than dealing with - // jagged tables. - log::warn!("Rows/row column mismatch: {cols:?} vs {rc:?}"); - } - - // We have to access by index here, since names can be duplicate. - let values = (0..cols.len()) - .map(|idx| row.get(idx).unwrap_or(Value::Null)) - .collect(); - - return Ok(Row(values, cols)); -} - impl Index for Row { type Output = Value; diff --git a/crates/sqlite/src/sqlite/batch.rs b/crates/sqlite/src/sqlite/batch.rs index 22500cbc..8f43620c 100644 --- a/crates/sqlite/src/sqlite/batch.rs +++ b/crates/sqlite/src/sqlite/batch.rs @@ -1,9 +1,10 @@ use rusqlite::fallible_iterator::FallibleIterator; use std::sync::Arc; +use super::util::{columns, from_row}; use crate::connection::Connection; use crate::error::Error; -use crate::rows::{Column, Rows, columns, from_row}; +use crate::rows::{Column, Rows}; /// Batch execute SQL statements and return rows of last statement. /// @@ -31,7 +32,7 @@ pub async fn execute_batch( let mut result = vec![from_row(row, cols.clone())?]; while let Some(row) = rows.next()? { - result.push(crate::rows::from_row(row, cols.clone())?); + result.push(from_row(row, cols.clone())?); } return Ok(Some(Rows(result, cols))); } diff --git a/crates/sqlite/src/sqlite/connection.rs b/crates/sqlite/src/sqlite/connection.rs index 7a6b327c..f6d0bd88 100644 --- a/crates/sqlite/src/sqlite/connection.rs +++ b/crates/sqlite/src/sqlite/connection.rs @@ -1,415 +1,294 @@ -use kanal::{Receiver, Sender}; -use log::*; -use parking_lot::RwLock; -use rusqlite::fallible_iterator::FallibleIterator; -use std::ops::{Deref, DerefMut}; +use std::fmt::Debug; +use std::hash::{Hash, Hasher}; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use tokio::sync::oneshot; +use crate::database::Database; use crate::error::Error; -use crate::from_sql::{FromSql, FromSqlError}; +use crate::from_sql::FromSql; use crate::params::Params; +use crate::rows::{Row, Rows}; +use crate::sqlite::executor::Executor; +use crate::sqlite::util::{columns, from_row, from_rows, get_value, map_first}; -#[derive(Default)] -struct ConnectionVec(smallvec::SmallVec<[rusqlite::Connection; 32]>); - -// NOTE: We must never access the same connection concurrently even as immutable &Connection, due -// to intrinsic statement cache. We can ensure this by uniquely assigning one connection to each -// thread. -unsafe impl Sync for ConnectionVec {} - -enum Message { - RunMut(Box), - RunConst(Box), - Terminate, -} - -#[derive(Clone, Default)] -pub struct Options { - pub busy_timeout: Option, - pub n_read_threads: Option, -} +// NOTE: We should probably decouple from the impl. +pub use crate::sqlite::executor::{ArcLockGuard, LockGuard, Options}; /// A handle to call functions in background thread. #[derive(Clone)] -pub(crate) struct ConnectionImpl { - id: usize, - reader: Sender, - writer: Sender, - // NOTE: Is shared across reader and writer worker threads. - conns: Arc>, +pub struct Connection { + exec: Executor, } -impl ConnectionImpl { - pub fn new( +impl Connection { + pub fn new(builder: impl Fn() -> Result) -> Result { + return Self::with_opts(builder, Options::default()); + } + + pub fn with_opts( builder: impl Fn() -> Result, opt: Options, - ) -> Result { - let Options { - busy_timeout, - n_read_threads, - } = opt; - - let new_conn = || -> Result { - let conn = builder()?; - if let Some(busy_timeout) = busy_timeout { - conn - .busy_timeout(busy_timeout) - .expect("busy timeout failed"); - } - return Ok(conn); - }; - - let write_conn = new_conn()?; - let path = write_conn.path().map(|p| p.to_string()); - let in_memory = path.as_ref().is_none_or(|s| { - // Returns empty string for in-memory databases. - return s.is_empty(); - }); - - let n_read_threads: i64 = match (in_memory, n_read_threads.unwrap_or(0)) { - (true, _) => { - // We cannot share an in-memory database across threads, they're all independent. - 0 - } - (false, 1) => { - warn!("A single reader thread won't improve performance, falling back to 0."); - 0 - } - (false, n) => { - if let Ok(max) = std::thread::available_parallelism() - && n > max.get() - { - warn!( - "Num read threads '{n}' exceeds hardware parallelism: {}", - max.get() - ); - } - n as i64 - } - }; - - let conns = Arc::new(RwLock::new(ConnectionVec({ - let mut conns = vec![write_conn]; - for _ in 0..(n_read_threads - 1).max(0) { - conns.push(new_conn()?); - } - conns.into() - }))); - - assert_eq!(n_read_threads.max(1) as usize, conns.read().0.len()); - - // Spawn writer. - let (shared_write_sender, shared_write_receiver) = kanal::unbounded::(); - { - let conns = conns.clone(); - std::thread::Builder::new() - .name("tb-sqlite-writer".to_string()) - .spawn(move || event_loop(0, conns, shared_write_receiver)) - .expect("startup"); - } - - // Spawn readers. - let shared_read_sender = if n_read_threads > 0 { - let (shared_read_sender, shared_read_receiver) = kanal::unbounded::(); - for i in 0..n_read_threads { - // NOTE: read and writer threads are sharing the first conn, given they're mutually - // exclusive. - let index = i as usize; - let shared_read_receiver = shared_read_receiver.clone(); - let conns = conns.clone(); - - std::thread::Builder::new() - .name(format!("tb-sqlite-reader-{index}")) - .spawn(move || event_loop(index, conns, shared_read_receiver)) - .expect("startup"); - } - shared_read_sender - } else { - shared_write_sender.clone() - }; - - debug!( - "Opened SQLite DB '{}' with {n_read_threads} reader threads", - path.as_deref().unwrap_or("") - ); - + ) -> std::result::Result { return Ok(Self { - id: UNIQUE_CONN_ID.fetch_add(1, Ordering::SeqCst), - reader: shared_read_sender, - writer: shared_write_sender, - conns, + exec: Executor::new(builder, opt)?, }); } + /// Open a new connection to an in-memory SQLite database. + /// + /// # Failure + /// + /// Will return `Err` if the underlying SQLite open call fails. + pub fn open_in_memory() -> Result { + let conn = Self::with_opts( + rusqlite::Connection::open_in_memory, + Options { + n_read_threads: Some(0), + ..Default::default() + }, + )?; + + assert_eq!(1, conn.threads()); + + return Ok(conn); + } + pub fn id(&self) -> usize { - return self.id; + return self.exec.id(); } pub fn threads(&self) -> usize { - return self.conns.read().0.len(); + return self.exec.threads(); } - #[inline] pub fn write_lock(&self) -> LockGuard<'_> { - return LockGuard { - guard: self.conns.write(), - }; + return self.exec.write_lock(); } - #[inline] pub fn try_write_arc_lock_for(&self, duration: tokio::time::Duration) -> Option { - return self - .conns - .try_write_arc_for(duration) - .map(|guard| ArcLockGuard { guard }); + return self.exec.try_write_arc_lock_for(duration); } - #[inline] - pub(crate) fn map( - &self, - f: impl Fn(&rusqlite::Connection) -> Result<(), Error> + Send + 'static, - ) -> Result<(), Error> { - let lock = self.conns.write(); - for conn in &lock.0 { - f(conn)?; - } - return Ok(()); - } - - #[inline] + /// Call a function in background thread and get the result + /// asynchronously. + /// + /// # Failure + /// + /// Will return `Err` if the database connection has been closed. pub async fn call(&self, function: F) -> Result where F: FnOnce(&mut rusqlite::Connection) -> Result + Send + 'static, R: Send + 'static, { - // return call_impl(&self.writer, function).await; - let (sender, receiver) = oneshot::channel::>(); - - self - .writer - .send(Message::RunMut(Box::new(move |conn| { - if !sender.is_closed() { - let _ = sender.send(function(conn)); - } - }))) - .map_err(|_| Error::ConnectionClosed)?; - - receiver.await.map_err(|_| Error::ConnectionClosed)? + return self.exec.call(function).await; } - #[inline] pub async fn call_reader(&self, function: F) -> Result where F: FnOnce(&rusqlite::Connection) -> Result + Send + 'static, R: Send + 'static, { - let (sender, receiver) = oneshot::channel::>(); - - self - .reader - .send(Message::RunConst(Box::new(move |conn| { - if !sender.is_closed() { - let _ = sender.send(function(conn)); - } - }))) - .map_err(|_| Error::ConnectionClosed)?; - - receiver.await.map_err(|_| Error::ConnectionClosed)? + return self.exec.call_reader(function).await; } - #[inline] - pub async fn write_query_rows_f( + /// Query SQL statement. + pub async fn read_query_rows( &self, sql: impl AsRef + Send + 'static, params: impl Params + Send + 'static, - f: impl (FnOnce(rusqlite::Rows<'_>) -> Result) + Send + 'static, - ) -> Result - where - T: Send + 'static, - { + ) -> Result { + return self.exec.read_query_rows_f(sql, params, from_rows).await; + } + + pub async fn read_query_row( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + ) -> Result, Error> { return self - .call(move |conn: &mut rusqlite::Connection| { - let mut stmt = conn.prepare_cached(sql.as_ref())?; - - params.bind(&mut stmt)?; - - return f(stmt.raw_query()); + .exec + .read_query_rows_f(sql, params, |rows| { + return map_first(rows, |row| { + return from_row(row, Arc::new(columns(row.as_ref()))); + }); }) .await; } - #[inline] - pub async fn read_query_rows_f( + pub async fn read_query_row_get( &self, sql: impl AsRef + Send + 'static, params: impl Params + Send + 'static, - f: impl (FnOnce(rusqlite::Rows<'_>) -> Result) + Send + 'static, - ) -> Result + index: usize, + ) -> Result, Error> where - T: Send + 'static, + T: FromSql + Send + 'static, { return self - .call_reader(move |conn: &rusqlite::Connection| { - let mut stmt = conn.prepare_cached(sql.as_ref())?; - assert!(stmt.readonly()); - - params.bind(&mut stmt)?; - - return f(stmt.raw_query()); + .exec + .read_query_rows_f(sql, params, move |rows| { + return map_first(rows, move |row| { + return get_value(row, index); + }); }) .await; } + pub async fn read_query_value( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + ) -> Result, Error> { + return self + .exec + .read_query_rows_f(sql, params, |rows| { + return map_first(rows, move |row| { + serde_rusqlite::from_row(row).map_err(Error::DeserializeValue) + }); + }) + .await; + } + + pub async fn read_query_values( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + ) -> Result, Error> { + return self + .exec + .read_query_rows_f(sql, params, |rows| { + return serde_rusqlite::from_rows(rows) + .collect::, _>>() + .map_err(Error::DeserializeValue); + }) + .await; + } + + pub async fn write_query_rows( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + ) -> Result { + return self.exec.write_query_rows_f(sql, params, from_rows).await; + } + + pub async fn query_row_get( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + index: usize, + ) -> Result, Error> + where + T: FromSql + Send + 'static, + { + return self + .exec + .write_query_rows_f(sql, params, move |rows| { + return map_first(rows, move |row| { + return get_value(row, index); + }); + }) + .await; + } + + pub async fn write_query_value( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + ) -> Result, Error> { + return self + .exec + .write_query_rows_f(sql, params, |rows| { + return map_first(rows, |row| { + serde_rusqlite::from_row(row).map_err(Error::DeserializeValue) + }); + }) + .await; + } + + pub async fn write_query_values( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + ) -> Result, Error> { + return self + .exec + .write_query_rows_f(sql, params, |rows| { + return serde_rusqlite::from_rows(rows) + .collect::, _>>() + .map_err(Error::DeserializeValue); + }) + .await; + } + + /// Execute SQL statement. pub async fn execute( &self, sql: impl AsRef + Send + 'static, params: impl Params + Send + 'static, ) -> Result { + return self.exec.execute(sql, params).await; + } + + /// Batch execute provided SQL statementsi in batch. + pub async fn execute_batch(&self, sql: impl AsRef + Send + 'static) -> Result<(), Error> { + return self.exec.execute_batch(sql).await; + } + + pub fn attach(&self, path: &str, name: &str) -> Result<(), Error> { + let query = format!("ATTACH DATABASE '{path}' AS {name} "); + return self.exec.map(move |conn| { + conn.execute(&query, ())?; + return Ok(()); + }); + } + + pub fn detach(&self, name: &str) -> Result<(), Error> { + let query = format!("DETACH DATABASE {name}"); + return self.exec.map(move |conn| { + conn.execute(&query, ())?; + return Ok(()); + }); + } + + pub async fn list_databases(&self) -> Result, Error> { return self - .call(move |conn: &mut rusqlite::Connection| { - let mut stmt = conn.prepare_cached(sql.as_ref())?; - - params.bind(&mut stmt)?; - - return Ok(stmt.raw_execute()?); - }) + .exec + .call_reader(crate::sqlite::util::list_databases) .await; } - pub async fn execute_batch(&self, sql: impl AsRef + Send + 'static) -> Result<(), Error> { - self - .call(move |conn: &mut rusqlite::Connection| { - let mut batch = rusqlite::Batch::new(conn, sql.as_ref()); - while let Some(mut stmt) = batch.next()? { - // NOTE: We must use `raw_query` instead of `raw_execute`, otherwise queries - // returning rows (e.g. SELECT) will return an error. Rusqlite's batch_execute - // behaves consistently. - let _row = stmt.raw_query().next()?; - } - return Ok(()); - }) - .await?; - - return Ok(()); - } - + /// Close the database connection. + /// + /// This is functionally equivalent to the `Drop` implementation for `Connection`. It consumes + /// the `Connection`, but on error returns it to the caller for retry purposes. + /// + /// If successful, any following `close` operations performed on `Connection` copies will succeed + /// immediately. + /// + /// # Failure + /// + /// Will return `Err` if the underlying SQLite close call fails. pub async fn close(self) -> Result<(), Error> { - let _ = self.writer.send(Message::Terminate); - while self.reader.send(Message::Terminate).is_ok() { - // Continue to close readers while the channel is alive. - } - - let mut errors = vec![]; - let conns: ConnectionVec = std::mem::take(&mut self.conns.write()); - for conn in conns.0 { - // NOTE: rusqlite's `Connection::close()` returns itself, to allow users to retry - // failed closes. We on the other, may be left in a partially closed state with multiple - // connections. Ignorance is bliss. - if let Err((_self, err)) = conn.close() { - errors.push(err); - }; - } - - if !errors.is_empty() { - warn!("Closing connection: {errors:?}"); - return Err(errors.swap_remove(0).into()); - } - - return Ok(()); + return self.exec.close().await; } } -fn event_loop(id: usize, conns: Arc>, receiver: Receiver) { - while let Ok(message) = receiver.recv() { - match message { - Message::RunConst(f) => { - let lock = conns.read(); - f(&lock.0[id]) - } - Message::RunMut(f) => { - let mut lock = conns.write(); - f(&mut lock.0[0]) - } - Message::Terminate => { - return; - } - }; +impl Debug for Connection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Connection").finish() } } -pub struct LockGuard<'a> { - guard: parking_lot::RwLockWriteGuard<'a, ConnectionVec>, -} - -impl Deref for LockGuard<'_> { - type Target = rusqlite::Connection; - #[inline] - fn deref(&self) -> &rusqlite::Connection { - return &self.guard.deref().0[0]; +impl Hash for Connection { + fn hash(&self, state: &mut H) { + self.id().hash(state); } } -impl DerefMut for LockGuard<'_> { - #[inline] - fn deref_mut(&mut self) -> &mut rusqlite::Connection { - return &mut self.guard.deref_mut().0[0]; +impl PartialEq for Connection { + fn eq(&self, other: &Self) -> bool { + return self.id() == other.id(); } } -pub struct ArcLockGuard { - guard: parking_lot::ArcRwLockWriteGuard, -} - -impl Deref for ArcLockGuard { - type Target = rusqlite::Connection; - #[inline] - fn deref(&self) -> &rusqlite::Connection { - return &self.guard.deref().0[0]; - } -} - -impl DerefMut for ArcLockGuard { - #[inline] - fn deref_mut(&mut self) -> &mut rusqlite::Connection { - return &mut self.guard.deref_mut().0[0]; - } -} - -#[inline] -pub(crate) fn map_first( - mut rows: rusqlite::Rows<'_>, - f: impl (FnOnce(&rusqlite::Row<'_>) -> Result) + Send + 'static, -) -> Result, Error> -where - T: Send + 'static, -{ - if let Some(row) = rows.next()? { - return Ok(Some(f(row)?)); - } - return Ok(None); -} - -#[inline] -pub fn get_value(row: &rusqlite::Row<'_>, idx: usize) -> Result { - let value = row.get_ref(idx)?; - - return FromSql::column_result(value.into()).map_err(|err| { - use rusqlite::Error as RError; - - return Error::Rusqlite(match err { - FromSqlError::InvalidType => { - RError::InvalidColumnType(idx, "".into(), value.data_type()) - } - FromSqlError::OutOfRange(i) => RError::IntegralValueOutOfRange(idx, i), - FromSqlError::Utf8Error(err) => RError::Utf8Error(idx, err), - FromSqlError::Other(err) => RError::FromSqlConversionFailure(idx, value.data_type(), err), - FromSqlError::InvalidBlobSize { .. } => { - RError::FromSqlConversionFailure(idx, value.data_type(), Box::new(err)) - } - }); - }); -} - -static UNIQUE_CONN_ID: AtomicUsize = AtomicUsize::new(0); +impl Eq for Connection {} diff --git a/crates/sqlite/src/sqlite/executor.rs b/crates/sqlite/src/sqlite/executor.rs new file mode 100644 index 00000000..0c50cabf --- /dev/null +++ b/crates/sqlite/src/sqlite/executor.rs @@ -0,0 +1,379 @@ +use kanal::{Receiver, Sender}; +use log::*; +use parking_lot::RwLock; +use rusqlite::fallible_iterator::FallibleIterator; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio::sync::oneshot; + +use crate::error::Error; +use crate::params::Params; + +#[derive(Default)] +struct ConnectionVec(smallvec::SmallVec<[rusqlite::Connection; 32]>); + +// NOTE: We must never access the same connection concurrently even as immutable &Connection, due +// to intrinsic statement cache. We can ensure this by uniquely assigning one connection to each +// thread. +unsafe impl Sync for ConnectionVec {} + +enum Message { + RunMut(Box), + RunConst(Box), + Terminate, +} + +#[derive(Clone, Default)] +pub struct Options { + pub busy_timeout: Option, + pub n_read_threads: Option, +} + +/// A handle to call functions in background thread. +#[derive(Clone)] +pub(crate) struct Executor { + id: usize, + reader: Sender, + writer: Sender, + // NOTE: Is shared across reader and writer worker threads. + conns: Arc>, +} + +impl Executor { + pub fn new( + builder: impl Fn() -> Result, + opt: Options, + ) -> Result { + let Options { + busy_timeout, + n_read_threads, + } = opt; + + let new_conn = || -> Result { + let conn = builder()?; + if let Some(busy_timeout) = busy_timeout { + conn + .busy_timeout(busy_timeout) + .expect("busy timeout failed"); + } + return Ok(conn); + }; + + let write_conn = new_conn()?; + let path = write_conn.path().map(|p| p.to_string()); + let in_memory = path.as_ref().is_none_or(|s| { + // Returns empty string for in-memory databases. + return s.is_empty(); + }); + + let n_read_threads: i64 = match (in_memory, n_read_threads.unwrap_or(0)) { + (true, _) => { + // We cannot share an in-memory database across threads, they're all independent. + 0 + } + (false, 1) => { + warn!("A single reader thread won't improve performance, falling back to 0."); + 0 + } + (false, n) => { + if let Ok(max) = std::thread::available_parallelism() + && n > max.get() + { + warn!( + "Num read threads '{n}' exceeds hardware parallelism: {}", + max.get() + ); + } + n as i64 + } + }; + + let conns = Arc::new(RwLock::new(ConnectionVec({ + let mut conns = vec![write_conn]; + for _ in 0..(n_read_threads - 1).max(0) { + conns.push(new_conn()?); + } + conns.into() + }))); + + assert_eq!(n_read_threads.max(1) as usize, conns.read().0.len()); + + // Spawn writer. + let (shared_write_sender, shared_write_receiver) = kanal::unbounded::(); + { + let conns = conns.clone(); + std::thread::Builder::new() + .name("tb-sqlite-writer".to_string()) + .spawn(move || event_loop(0, conns, shared_write_receiver)) + .expect("startup"); + } + + // Spawn readers. + let shared_read_sender = if n_read_threads > 0 { + let (shared_read_sender, shared_read_receiver) = kanal::unbounded::(); + for i in 0..n_read_threads { + // NOTE: read and writer threads are sharing the first conn, given they're mutually + // exclusive. + let index = i as usize; + let shared_read_receiver = shared_read_receiver.clone(); + let conns = conns.clone(); + + std::thread::Builder::new() + .name(format!("tb-sqlite-reader-{index}")) + .spawn(move || event_loop(index, conns, shared_read_receiver)) + .expect("startup"); + } + shared_read_sender + } else { + shared_write_sender.clone() + }; + + debug!( + "Opened SQLite DB '{}' with {n_read_threads} reader threads", + path.as_deref().unwrap_or("") + ); + + return Ok(Self { + id: UNIQUE_CONN_ID.fetch_add(1, Ordering::SeqCst), + reader: shared_read_sender, + writer: shared_write_sender, + conns, + }); + } + + pub fn id(&self) -> usize { + return self.id; + } + + pub fn threads(&self) -> usize { + return self.conns.read().0.len(); + } + + #[inline] + pub fn write_lock(&self) -> LockGuard<'_> { + return LockGuard { + guard: self.conns.write(), + }; + } + + #[inline] + pub fn try_write_arc_lock_for(&self, duration: tokio::time::Duration) -> Option { + return self + .conns + .try_write_arc_for(duration) + .map(|guard| ArcLockGuard { guard }); + } + + #[inline] + pub(crate) fn map( + &self, + f: impl Fn(&rusqlite::Connection) -> Result<(), Error> + Send + 'static, + ) -> Result<(), Error> { + let lock = self.conns.write(); + for conn in &lock.0 { + f(conn)?; + } + return Ok(()); + } + + #[inline] + pub async fn call(&self, function: F) -> Result + where + F: FnOnce(&mut rusqlite::Connection) -> Result + Send + 'static, + R: Send + 'static, + { + // return call_impl(&self.writer, function).await; + let (sender, receiver) = oneshot::channel::>(); + + self + .writer + .send(Message::RunMut(Box::new(move |conn| { + if !sender.is_closed() { + let _ = sender.send(function(conn)); + } + }))) + .map_err(|_| Error::ConnectionClosed)?; + + receiver.await.map_err(|_| Error::ConnectionClosed)? + } + + #[inline] + pub async fn call_reader(&self, function: F) -> Result + where + F: FnOnce(&rusqlite::Connection) -> Result + Send + 'static, + R: Send + 'static, + { + let (sender, receiver) = oneshot::channel::>(); + + self + .reader + .send(Message::RunConst(Box::new(move |conn| { + if !sender.is_closed() { + let _ = sender.send(function(conn)); + } + }))) + .map_err(|_| Error::ConnectionClosed)?; + + receiver.await.map_err(|_| Error::ConnectionClosed)? + } + + #[inline] + pub async fn write_query_rows_f( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + f: impl (FnOnce(rusqlite::Rows<'_>) -> Result) + Send + 'static, + ) -> Result + where + T: Send + 'static, + { + return self + .call(move |conn: &mut rusqlite::Connection| { + let mut stmt = conn.prepare_cached(sql.as_ref())?; + + params.bind(&mut stmt)?; + + return f(stmt.raw_query()); + }) + .await; + } + + #[inline] + pub async fn read_query_rows_f( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + f: impl (FnOnce(rusqlite::Rows<'_>) -> Result) + Send + 'static, + ) -> Result + where + T: Send + 'static, + { + return self + .call_reader(move |conn: &rusqlite::Connection| { + let mut stmt = conn.prepare_cached(sql.as_ref())?; + assert!(stmt.readonly()); + + params.bind(&mut stmt)?; + + return f(stmt.raw_query()); + }) + .await; + } + + pub async fn execute( + &self, + sql: impl AsRef + Send + 'static, + params: impl Params + Send + 'static, + ) -> Result { + return self + .call(move |conn: &mut rusqlite::Connection| { + let mut stmt = conn.prepare_cached(sql.as_ref())?; + + params.bind(&mut stmt)?; + + return Ok(stmt.raw_execute()?); + }) + .await; + } + + pub async fn execute_batch(&self, sql: impl AsRef + Send + 'static) -> Result<(), Error> { + self + .call(move |conn: &mut rusqlite::Connection| { + let mut batch = rusqlite::Batch::new(conn, sql.as_ref()); + while let Some(mut stmt) = batch.next()? { + // NOTE: We must use `raw_query` instead of `raw_execute`, otherwise queries + // returning rows (e.g. SELECT) will return an error. Rusqlite's batch_execute + // behaves consistently. + let _row = stmt.raw_query().next()?; + } + return Ok(()); + }) + .await?; + + return Ok(()); + } + + pub async fn close(self) -> Result<(), Error> { + let _ = self.writer.send(Message::Terminate); + while self.reader.send(Message::Terminate).is_ok() { + // Continue to close readers while the channel is alive. + } + + let mut errors = vec![]; + let conns: ConnectionVec = std::mem::take(&mut self.conns.write()); + for conn in conns.0 { + // NOTE: rusqlite's `Connection::close()` returns itself, to allow users to retry + // failed closes. We on the other, may be left in a partially closed state with multiple + // connections. Ignorance is bliss. + if let Err((_self, err)) = conn.close() { + errors.push(err); + }; + } + + if !errors.is_empty() { + warn!("Closing connection: {errors:?}"); + return Err(errors.swap_remove(0).into()); + } + + return Ok(()); + } +} + +fn event_loop(id: usize, conns: Arc>, receiver: Receiver) { + while let Ok(message) = receiver.recv() { + match message { + Message::RunConst(f) => { + let lock = conns.read(); + f(&lock.0[id]) + } + Message::RunMut(f) => { + let mut lock = conns.write(); + f(&mut lock.0[0]) + } + Message::Terminate => { + return; + } + }; + } +} + +pub struct LockGuard<'a> { + guard: parking_lot::RwLockWriteGuard<'a, ConnectionVec>, +} + +impl Deref for LockGuard<'_> { + type Target = rusqlite::Connection; + #[inline] + fn deref(&self) -> &rusqlite::Connection { + return &self.guard.deref().0[0]; + } +} + +impl DerefMut for LockGuard<'_> { + #[inline] + fn deref_mut(&mut self) -> &mut rusqlite::Connection { + return &mut self.guard.deref_mut().0[0]; + } +} + +pub struct ArcLockGuard { + guard: parking_lot::ArcRwLockWriteGuard, +} + +impl Deref for ArcLockGuard { + type Target = rusqlite::Connection; + #[inline] + fn deref(&self) -> &rusqlite::Connection { + return &self.guard.deref().0[0]; + } +} + +impl DerefMut for ArcLockGuard { + #[inline] + fn deref_mut(&mut self) -> &mut rusqlite::Connection { + return &mut self.guard.deref_mut().0[0]; + } +} + +static UNIQUE_CONN_ID: AtomicUsize = AtomicUsize::new(0); diff --git a/crates/sqlite/src/sqlite/mod.rs b/crates/sqlite/src/sqlite/mod.rs index 4e769a81..e1f4d82e 100644 --- a/crates/sqlite/src/sqlite/mod.rs +++ b/crates/sqlite/src/sqlite/mod.rs @@ -1,7 +1,7 @@ -pub mod batch; -pub mod connection; -pub mod util; +mod batch; +pub(super) mod connection; +mod executor; +mod util; -pub use util::extract_record_values; -pub use util::extract_row_id; -pub use util::list_databases; +pub use batch::execute_batch; +pub use util::{extract_record_values, extract_row_id, from_rows, list_databases}; diff --git a/crates/sqlite/src/sqlite/util.rs b/crates/sqlite/src/sqlite/util.rs index bb6db783..a2587771 100644 --- a/crates/sqlite/src/sqlite/util.rs +++ b/crates/sqlite/src/sqlite/util.rs @@ -1,9 +1,90 @@ use rusqlite::hooks::PreUpdateCase; +use std::str::FromStr; +use std::sync::Arc; use crate::database::Database; use crate::error::Error; +use crate::from_sql::{FromSql, FromSqlError}; +use crate::rows::{Column, Row, Rows, ValueType}; use crate::value::Value; +#[inline] +pub(crate) fn map_first( + mut rows: rusqlite::Rows<'_>, + f: impl (FnOnce(&rusqlite::Row<'_>) -> Result) + Send + 'static, +) -> Result, Error> +where + T: Send + 'static, +{ + if let Some(row) = rows.next()? { + return Ok(Some(f(row)?)); + } + return Ok(None); +} + +#[inline] +pub fn get_value(row: &rusqlite::Row<'_>, idx: usize) -> Result { + let value = row.get_ref(idx)?; + + return FromSql::column_result(value.into()).map_err(|err| { + use rusqlite::Error as RError; + + return Error::Rusqlite(match err { + FromSqlError::InvalidType => { + RError::InvalidColumnType(idx, "".into(), value.data_type()) + } + FromSqlError::OutOfRange(i) => RError::IntegralValueOutOfRange(idx, i), + FromSqlError::Utf8Error(err) => RError::Utf8Error(idx, err), + FromSqlError::Other(err) => RError::FromSqlConversionFailure(idx, value.data_type(), err), + FromSqlError::InvalidBlobSize { .. } => { + RError::FromSqlConversionFailure(idx, value.data_type(), Box::new(err)) + } + }); + }); +} + +pub fn from_rows(mut rows: rusqlite::Rows) -> Result { + let columns: Arc> = Arc::new(rows.as_ref().map_or_else(Vec::new, columns)); + + let mut result = vec![]; + while let Some(row) = rows.next()? { + result.push(self::from_row(row, columns.clone())?); + } + + return Ok(Rows(result, columns)); +} + +pub(super) fn from_row(row: &rusqlite::Row, cols: Arc>) -> Result { + #[cfg(debug_assertions)] + if let Some(rc) = Some(columns(row.as_ref())) + && rc.len() != cols.len() + { + // Apparently this can happen during schema manipulations, e.g. when deleting a column + // :shrug:. We normalize everything to the same rows schema rather than dealing with + // jagged tables. + log::warn!("Rows/row column mismatch: {cols:?} vs {rc:?}"); + } + + // We have to access by index here, since names can be duplicate. + let values = (0..cols.len()) + .map(|idx| row.get(idx).unwrap_or(Value::Null)) + .collect(); + + return Ok(Row(values, cols)); +} + +#[inline] +pub(super) fn columns(stmt: &rusqlite::Statement<'_>) -> Vec { + return stmt + .columns() + .into_iter() + .map(|c| Column { + name: c.name().to_string(), + decl_type: c.decl_type().and_then(|s| ValueType::from_str(s).ok()), + }) + .collect(); +} + #[inline] pub fn extract_row_id(case: &PreUpdateCase) -> Option { return match case { diff --git a/crates/sqlite/src/tests.rs b/crates/sqlite/src/tests.rs index b1704aa1..d12965f3 100644 --- a/crates/sqlite/src/tests.rs +++ b/crates/sqlite/src/tests.rs @@ -278,7 +278,7 @@ async fn test_execute_and_query() { assert_eq!(person.id, 1); assert_eq!(person.name, "baz"); - let rows = crate::sqlite::batch::execute_batch( + let rows = crate::sqlite::execute_batch( &conn, r#" CREATE TABLE foo (id INTEGER) STRICT; diff --git a/crates/wasm-runtime-host/src/host.rs b/crates/wasm-runtime-host/src/host.rs index f571a86b..5bc13eed 100644 --- a/crates/wasm-runtime-host/src/host.rs +++ b/crates/wasm-runtime-host/src/host.rs @@ -263,7 +263,7 @@ impl self::trailbase::database::sqlite::Host for State { .bind(&mut stmt) .map_err(|err| TxError::Other(err.to_string()))?; - let rows = trailbase_sqlite::rows::from_rows(stmt.raw_query()) + let rows = trailbase_sqlite::sqlite::from_rows(stmt.raw_query()) .map_err(|err| TxError::Other(err.to_string()))?; let values: Vec<_> = rows diff --git a/crates/wasm-runtime-host/src/sqlite.rs b/crates/wasm-runtime-host/src/sqlite.rs index fb24147a..3fe64d11 100644 --- a/crates/wasm-runtime-host/src/sqlite.rs +++ b/crates/wasm-runtime-host/src/sqlite.rs @@ -7,7 +7,7 @@ use sqlite3_parser::ast::{Expr, OneSelect, ResultColumn, Select, Stmt}; use tokio::time::Duration; use trailbase_schema::parse::parse_into_statement; use trailbase_schema::sqlite::unquote_expr; -use trailbase_sqlite::{Rows, connection::ArcLockGuard}; +use trailbase_sqlite::{ArcLockGuard, Rows}; use trailbase_sqlvalue::{DecodeError, SqlValue}; use trailbase_wasm_common::{SqliteRequest, SqliteResponse}; use wasmtime_wasi_http::p2::bindings::http::types::ErrorCode;