Move trailbase-sqlite APIs around and further untangle rusqlite from Row[s].

This commit is contained in:
Sebastian Jeltsch 2026-04-15 10:23:31 +02:00
parent 32648c44aa
commit 6e9f0bb1e3
17 changed files with 728 additions and 721 deletions

View file

@ -75,7 +75,7 @@ tracing-subscriber = { version = "0.3.18", default-features = false, features =
trailbase = { path = "crates/core", version = "0.2.0", default-features = false, features=["wasm"] }
trailbase-assets = { path = "crates/assets", version = "0.2.0" }
trailbase-build = { path = "crates/build", version = "0.1.1" }
trailbase-client = { path = "crates/client", version = "0.8.1" }
trailbase-client = { path = "crates/client", version = "0.8.0" }
trailbase-extension = { path = "crates/extension", version = "0.3.0" }
trailbase-qs = { path = "crates/qs", version = "0.1.0" }
trailbase-reactive = { path = "crates/reactive", version = "0.1.0" }

View file

@ -77,8 +77,7 @@ pub async fn query_handler(
.as_ref(),
)?;
let batched_rows_result =
trailbase_sqlite::sqlite::batch::execute_batch(&conn, request.query).await;
let batched_rows_result = trailbase_sqlite::sqlite::execute_batch(&conn, request.query).await;
// In the fallback case we always need to invalidate the cache.
if must_invalidate_schema_cache {

View file

@ -352,7 +352,7 @@ fn init_main_db_impl(
return Ok(conn);
},
trailbase_sqlite::connection::Options {
trailbase_sqlite::Options {
n_read_threads: match (data_dir, std::thread::available_parallelism()) {
(None, _) => Some(0),
(Some(_), Ok(n)) => Some(n.get().clamp(2, 4)),

View file

@ -10,8 +10,7 @@ use rand::RngExt;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use trailbase_sqlite::Value;
use trailbase_sqlite::connection::{Connection, Options};
use trailbase_sqlite::{Connection, Options, Value};
use crate::connection::{AsyncConnection, SharedRusqlite, ThreadLocalRusqlite};
use crate::error::BenchmarkError;

View file

@ -1,299 +1 @@
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::database::Database;
use crate::error::Error;
use crate::from_sql::FromSql;
use crate::params::Params;
use crate::rows::{Row, Rows, columns};
use crate::sqlite::connection::{ConnectionImpl, get_value, map_first};
// NOTE: We should probably decouple from the impl.
pub use crate::sqlite::connection::{ArcLockGuard, LockGuard, Options};
/// A handle to call functions in background thread.
#[derive(Clone)]
pub struct Connection {
c: ConnectionImpl,
}
impl Connection {
pub fn new<E>(builder: impl Fn() -> Result<rusqlite::Connection, E>) -> Result<Self, E> {
return Self::with_opts(builder, Options::default());
}
pub fn with_opts<E>(
builder: impl Fn() -> Result<rusqlite::Connection, E>,
opt: Options,
) -> std::result::Result<Self, E> {
return Ok(Self {
c: ConnectionImpl::new(builder, opt)?,
});
}
/// Open a new connection to an in-memory SQLite database.
///
/// # Failure
///
/// Will return `Err` if the underlying SQLite open call fails.
pub fn open_in_memory() -> Result<Self, Error> {
let conn = Self::with_opts(
rusqlite::Connection::open_in_memory,
Options {
n_read_threads: Some(0),
..Default::default()
},
)?;
assert_eq!(1, conn.threads());
return Ok(conn);
}
pub fn id(&self) -> usize {
return self.c.id();
}
pub fn threads(&self) -> usize {
return self.c.threads();
}
pub fn write_lock(&self) -> LockGuard<'_> {
return self.c.write_lock();
}
pub fn try_write_arc_lock_for(&self, duration: tokio::time::Duration) -> Option<ArcLockGuard> {
return self.c.try_write_arc_lock_for(duration);
}
/// Call a function in background thread and get the result
/// asynchronously.
///
/// # Failure
///
/// Will return `Err` if the database connection has been closed.
pub async fn call<F, R>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&mut rusqlite::Connection) -> Result<R, Error> + Send + 'static,
R: Send + 'static,
{
return self.c.call(function).await;
}
pub async fn call_reader<F, R>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&rusqlite::Connection) -> Result<R, Error> + Send + 'static,
R: Send + 'static,
{
return self.c.call_reader(function).await;
}
/// Query SQL statement.
pub async fn read_query_rows(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Rows, Error> {
return self
.c
.read_query_rows_f(sql, params, crate::rows::from_rows)
.await;
}
pub async fn read_query_row(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Option<Row>, Error> {
return self
.c
.read_query_rows_f(sql, params, |rows| {
return map_first(rows, |row| {
return crate::rows::from_row(row, Arc::new(columns(row.as_ref())));
});
})
.await;
}
pub async fn read_query_row_get<T>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
index: usize,
) -> Result<Option<T>, Error>
where
T: FromSql + Send + 'static,
{
return self
.c
.read_query_rows_f(sql, params, move |rows| {
return map_first(rows, move |row| {
return get_value(row, index);
});
})
.await;
}
pub async fn read_query_value<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Option<T>, Error> {
return self
.c
.read_query_rows_f(sql, params, |rows| {
return map_first(rows, move |row| {
serde_rusqlite::from_row(row).map_err(Error::DeserializeValue)
});
})
.await;
}
pub async fn read_query_values<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Vec<T>, Error> {
return self
.c
.read_query_rows_f(sql, params, |rows| {
return serde_rusqlite::from_rows(rows)
.collect::<Result<Vec<_>, _>>()
.map_err(Error::DeserializeValue);
})
.await;
}
pub async fn write_query_rows(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Rows, Error> {
return self
.c
.write_query_rows_f(sql, params, crate::rows::from_rows)
.await;
}
pub async fn query_row_get<T>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
index: usize,
) -> Result<Option<T>, Error>
where
T: FromSql + Send + 'static,
{
return self
.c
.write_query_rows_f(sql, params, move |rows| {
return map_first(rows, move |row| {
return get_value(row, index);
});
})
.await;
}
pub async fn write_query_value<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Option<T>, Error> {
return self
.c
.write_query_rows_f(sql, params, |rows| {
return map_first(rows, |row| {
serde_rusqlite::from_row(row).map_err(Error::DeserializeValue)
});
})
.await;
}
pub async fn write_query_values<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Vec<T>, Error> {
return self
.c
.write_query_rows_f(sql, params, |rows| {
return serde_rusqlite::from_rows(rows)
.collect::<Result<Vec<_>, _>>()
.map_err(Error::DeserializeValue);
})
.await;
}
/// Execute SQL statement.
pub async fn execute(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<usize, Error> {
return self.c.execute(sql, params).await;
}
/// Batch execute provided SQL statementsi in batch.
pub async fn execute_batch(&self, sql: impl AsRef<str> + Send + 'static) -> Result<(), Error> {
return self.c.execute_batch(sql).await;
}
pub fn attach(&self, path: &str, name: &str) -> Result<(), Error> {
let query = format!("ATTACH DATABASE '{path}' AS {name} ");
return self.c.map(move |conn| {
conn.execute(&query, ())?;
return Ok(());
});
}
pub fn detach(&self, name: &str) -> Result<(), Error> {
let query = format!("DETACH DATABASE {name}");
return self.c.map(move |conn| {
conn.execute(&query, ())?;
return Ok(());
});
}
pub async fn list_databases(&self) -> Result<Vec<Database>, Error> {
return self
.c
.call_reader(crate::sqlite::util::list_databases)
.await;
}
/// Close the database connection.
///
/// This is functionally equivalent to the `Drop` implementation for `Connection`. It consumes
/// the `Connection`, but on error returns it to the caller for retry purposes.
///
/// If successful, any following `close` operations performed on `Connection` copies will succeed
/// immediately.
///
/// # Failure
///
/// Will return `Err` if the underlying SQLite close call fails.
pub async fn close(self) -> Result<(), Error> {
return self.c.close().await;
}
}
impl Debug for Connection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Connection").finish()
}
}
impl Hash for Connection {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id().hash(state);
}
}
impl PartialEq for Connection {
fn eq(&self, other: &Self) -> bool {
return self.id() == other.id();
}
}
impl Eq for Connection {}
pub use crate::sqlite::connection::{ArcLockGuard, Connection, LockGuard, Options};

View file

@ -3,6 +3,16 @@ pub enum Error {
#[error("ConnectionClosed")]
ConnectionClosed,
/// Error when the value of a particular column is requested, but the type
/// of the result in that column cannot be converted to the requested
/// Rust type.
#[error("InvalidColumnType({idx}, {name}, {decl_type:?})")]
InvalidColumnType {
idx: usize,
name: String,
decl_type: Option<crate::rows::ValueType>,
},
// QUESTION: This is leaky. How often do downstream users have to introspect on this
// rusqlite::Error. Otherwise, should/could this be more opaue.
#[error("Rusqlite: {0}")]

View file

@ -20,13 +20,13 @@ impl DummyConnection {
}
#[allow(unused)]
enum PolymorphicConnection {
pub enum DummyPolymorphicConnection {
Sqlite(Connection),
Dummy(DummyConnection),
}
#[allow(unused)]
impl PolymorphicConnection {
impl DummyPolymorphicConnection {
pub async fn read_query_row_get<T>(
&self,
sql: impl AsRef<str> + Send + 'static,
@ -41,6 +41,13 @@ impl PolymorphicConnection {
Self::Dummy(c) => c.read_query_row_get(sql, params, index).await,
}
}
pub fn sqlite_connection(&self) -> Option<&Connection> {
match self {
Self::Sqlite(conn) => Some(conn),
Self::Dummy(_) => None,
}
}
}
#[cfg(test)]
@ -50,7 +57,7 @@ mod tests {
#[tokio::test]
async fn polymorphic_test() {
let conn = Connection::open_in_memory().unwrap();
let p = PolymorphicConnection::Sqlite(conn.clone());
let p = DummyPolymorphicConnection::Sqlite(conn.clone());
conn
.execute_batch(

View file

@ -10,20 +10,20 @@
clippy::needless_continue
)]
pub mod connection;
pub mod database;
pub mod error;
mod connection;
mod database;
mod error;
pub mod from_sql;
pub mod params;
pub mod rows;
mod params;
mod rows;
pub mod sqlite;
pub mod to_sql;
pub mod value;
mod value;
#[cfg(debug_assertions)]
pub mod experimental;
mod experimental;
pub use connection::Connection;
pub use connection::{ArcLockGuard, Connection, LockGuard, Options};
pub use database::Database;
pub use error::Error;
pub use params::{NamedParamRef, NamedParams, NamedParamsRef, Params};

View file

@ -1,4 +1,4 @@
use rusqlite::{Statement, types};
use rusqlite::types;
use std::fmt::Debug;
use std::ops::Index;
use std::str::FromStr;
@ -33,8 +33,8 @@ impl FromStr for ValueType {
#[derive(Debug, Clone, PartialEq)]
pub struct Column {
name: String,
decl_type: Option<ValueType>,
pub(crate) name: String,
pub(crate) decl_type: Option<ValueType>,
}
#[derive(Debug)]
@ -74,38 +74,18 @@ impl Rows {
}
pub fn column_type(&self, idx: usize) -> Result<ValueType, Error> {
if let Some(c) = self.1.get(idx) {
return Ok(c.decl_type.ok_or_else(|| {
rusqlite::Error::InvalidColumnType(
idx,
self.column_name(idx).unwrap_or("?").to_string(),
types::Type::Null,
)
})?);
}
return Err(
rusqlite::Error::InvalidColumnType(
return self
.1
.get(idx)
.and_then(|c| c.decl_type)
.ok_or_else(|| Error::InvalidColumnType {
idx,
self.column_name(idx).unwrap_or("?").to_string(),
types::Type::Null,
)
.into(),
);
name: self.column_name(idx).unwrap_or("?").to_string(),
decl_type: None,
});
}
}
pub fn from_rows(mut rows: rusqlite::Rows) -> Result<Rows, Error> {
let columns: Arc<Vec<Column>> = Arc::new(rows.as_ref().map_or_else(Vec::new, columns));
let mut result = vec![];
while let Some(row) = rows.next()? {
result.push(self::from_row(row, columns.clone())?);
}
return Ok(Rows(result, columns));
}
impl Index<usize> for Rows {
type Output = Row;
@ -123,17 +103,6 @@ impl IntoIterator for Rows {
}
}
pub(crate) fn columns(stmt: &Statement<'_>) -> Vec<Column> {
return stmt
.columns()
.into_iter()
.map(|c| Column {
name: c.name().to_string(),
decl_type: c.decl_type().and_then(|s| ValueType::from_str(s).ok()),
})
.collect();
}
#[derive(Debug)]
pub struct Row(pub Vec<Value>, pub Arc<Vec<Column>>);
@ -181,25 +150,6 @@ impl Row {
}
}
pub(crate) fn from_row(row: &rusqlite::Row, cols: Arc<Vec<Column>>) -> Result<Row, Error> {
#[cfg(debug_assertions)]
if let Some(rc) = Some(columns(row.as_ref()))
&& rc.len() != cols.len()
{
// Apparently this can happen during schema manipulations, e.g. when deleting a column
// :shrug:. We normalize everything to the same rows schema rather than dealing with
// jagged tables.
log::warn!("Rows/row column mismatch: {cols:?} vs {rc:?}");
}
// We have to access by index here, since names can be duplicate.
let values = (0..cols.len())
.map(|idx| row.get(idx).unwrap_or(Value::Null))
.collect();
return Ok(Row(values, cols));
}
impl Index<usize> for Row {
type Output = Value;

View file

@ -1,9 +1,10 @@
use rusqlite::fallible_iterator::FallibleIterator;
use std::sync::Arc;
use super::util::{columns, from_row};
use crate::connection::Connection;
use crate::error::Error;
use crate::rows::{Column, Rows, columns, from_row};
use crate::rows::{Column, Rows};
/// Batch execute SQL statements and return rows of last statement.
///
@ -31,7 +32,7 @@ pub async fn execute_batch(
let mut result = vec![from_row(row, cols.clone())?];
while let Some(row) = rows.next()? {
result.push(crate::rows::from_row(row, cols.clone())?);
result.push(from_row(row, cols.clone())?);
}
return Ok(Some(Rows(result, cols)));
}

View file

@ -1,415 +1,294 @@
use kanal::{Receiver, Sender};
use log::*;
use parking_lot::RwLock;
use rusqlite::fallible_iterator::FallibleIterator;
use std::ops::{Deref, DerefMut};
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::oneshot;
use crate::database::Database;
use crate::error::Error;
use crate::from_sql::{FromSql, FromSqlError};
use crate::from_sql::FromSql;
use crate::params::Params;
use crate::rows::{Row, Rows};
use crate::sqlite::executor::Executor;
use crate::sqlite::util::{columns, from_row, from_rows, get_value, map_first};
#[derive(Default)]
struct ConnectionVec(smallvec::SmallVec<[rusqlite::Connection; 32]>);
// NOTE: We must never access the same connection concurrently even as immutable &Connection, due
// to intrinsic statement cache. We can ensure this by uniquely assigning one connection to each
// thread.
unsafe impl Sync for ConnectionVec {}
enum Message {
RunMut(Box<dyn FnOnce(&mut rusqlite::Connection) + Send>),
RunConst(Box<dyn FnOnce(&rusqlite::Connection) + Send>),
Terminate,
}
#[derive(Clone, Default)]
pub struct Options {
pub busy_timeout: Option<std::time::Duration>,
pub n_read_threads: Option<usize>,
}
// NOTE: We should probably decouple from the impl.
pub use crate::sqlite::executor::{ArcLockGuard, LockGuard, Options};
/// A handle to call functions in background thread.
#[derive(Clone)]
pub(crate) struct ConnectionImpl {
id: usize,
reader: Sender<Message>,
writer: Sender<Message>,
// NOTE: Is shared across reader and writer worker threads.
conns: Arc<RwLock<ConnectionVec>>,
pub struct Connection {
exec: Executor,
}
impl ConnectionImpl {
pub fn new<E>(
impl Connection {
pub fn new<E>(builder: impl Fn() -> Result<rusqlite::Connection, E>) -> Result<Self, E> {
return Self::with_opts(builder, Options::default());
}
pub fn with_opts<E>(
builder: impl Fn() -> Result<rusqlite::Connection, E>,
opt: Options,
) -> Result<Self, E> {
let Options {
busy_timeout,
n_read_threads,
} = opt;
let new_conn = || -> Result<rusqlite::Connection, E> {
let conn = builder()?;
if let Some(busy_timeout) = busy_timeout {
conn
.busy_timeout(busy_timeout)
.expect("busy timeout failed");
}
return Ok(conn);
};
let write_conn = new_conn()?;
let path = write_conn.path().map(|p| p.to_string());
let in_memory = path.as_ref().is_none_or(|s| {
// Returns empty string for in-memory databases.
return s.is_empty();
});
let n_read_threads: i64 = match (in_memory, n_read_threads.unwrap_or(0)) {
(true, _) => {
// We cannot share an in-memory database across threads, they're all independent.
0
}
(false, 1) => {
warn!("A single reader thread won't improve performance, falling back to 0.");
0
}
(false, n) => {
if let Ok(max) = std::thread::available_parallelism()
&& n > max.get()
{
warn!(
"Num read threads '{n}' exceeds hardware parallelism: {}",
max.get()
);
}
n as i64
}
};
let conns = Arc::new(RwLock::new(ConnectionVec({
let mut conns = vec![write_conn];
for _ in 0..(n_read_threads - 1).max(0) {
conns.push(new_conn()?);
}
conns.into()
})));
assert_eq!(n_read_threads.max(1) as usize, conns.read().0.len());
// Spawn writer.
let (shared_write_sender, shared_write_receiver) = kanal::unbounded::<Message>();
{
let conns = conns.clone();
std::thread::Builder::new()
.name("tb-sqlite-writer".to_string())
.spawn(move || event_loop(0, conns, shared_write_receiver))
.expect("startup");
}
// Spawn readers.
let shared_read_sender = if n_read_threads > 0 {
let (shared_read_sender, shared_read_receiver) = kanal::unbounded::<Message>();
for i in 0..n_read_threads {
// NOTE: read and writer threads are sharing the first conn, given they're mutually
// exclusive.
let index = i as usize;
let shared_read_receiver = shared_read_receiver.clone();
let conns = conns.clone();
std::thread::Builder::new()
.name(format!("tb-sqlite-reader-{index}"))
.spawn(move || event_loop(index, conns, shared_read_receiver))
.expect("startup");
}
shared_read_sender
} else {
shared_write_sender.clone()
};
debug!(
"Opened SQLite DB '{}' with {n_read_threads} reader threads",
path.as_deref().unwrap_or("<in-memory>")
);
) -> std::result::Result<Self, E> {
return Ok(Self {
id: UNIQUE_CONN_ID.fetch_add(1, Ordering::SeqCst),
reader: shared_read_sender,
writer: shared_write_sender,
conns,
exec: Executor::new(builder, opt)?,
});
}
/// Open a new connection to an in-memory SQLite database.
///
/// # Failure
///
/// Will return `Err` if the underlying SQLite open call fails.
pub fn open_in_memory() -> Result<Self, Error> {
let conn = Self::with_opts(
rusqlite::Connection::open_in_memory,
Options {
n_read_threads: Some(0),
..Default::default()
},
)?;
assert_eq!(1, conn.threads());
return Ok(conn);
}
pub fn id(&self) -> usize {
return self.id;
return self.exec.id();
}
pub fn threads(&self) -> usize {
return self.conns.read().0.len();
return self.exec.threads();
}
#[inline]
pub fn write_lock(&self) -> LockGuard<'_> {
return LockGuard {
guard: self.conns.write(),
};
return self.exec.write_lock();
}
#[inline]
pub fn try_write_arc_lock_for(&self, duration: tokio::time::Duration) -> Option<ArcLockGuard> {
return self
.conns
.try_write_arc_for(duration)
.map(|guard| ArcLockGuard { guard });
return self.exec.try_write_arc_lock_for(duration);
}
#[inline]
pub(crate) fn map(
&self,
f: impl Fn(&rusqlite::Connection) -> Result<(), Error> + Send + 'static,
) -> Result<(), Error> {
let lock = self.conns.write();
for conn in &lock.0 {
f(conn)?;
}
return Ok(());
}
#[inline]
/// Call a function in background thread and get the result
/// asynchronously.
///
/// # Failure
///
/// Will return `Err` if the database connection has been closed.
pub async fn call<F, R>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&mut rusqlite::Connection) -> Result<R, Error> + Send + 'static,
R: Send + 'static,
{
// return call_impl(&self.writer, function).await;
let (sender, receiver) = oneshot::channel::<Result<R, Error>>();
self
.writer
.send(Message::RunMut(Box::new(move |conn| {
if !sender.is_closed() {
let _ = sender.send(function(conn));
}
})))
.map_err(|_| Error::ConnectionClosed)?;
receiver.await.map_err(|_| Error::ConnectionClosed)?
return self.exec.call(function).await;
}
#[inline]
pub async fn call_reader<F, R>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&rusqlite::Connection) -> Result<R, Error> + Send + 'static,
R: Send + 'static,
{
let (sender, receiver) = oneshot::channel::<Result<R, Error>>();
self
.reader
.send(Message::RunConst(Box::new(move |conn| {
if !sender.is_closed() {
let _ = sender.send(function(conn));
}
})))
.map_err(|_| Error::ConnectionClosed)?;
receiver.await.map_err(|_| Error::ConnectionClosed)?
return self.exec.call_reader(function).await;
}
#[inline]
pub async fn write_query_rows_f<T>(
/// Query SQL statement.
pub async fn read_query_rows(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
f: impl (FnOnce(rusqlite::Rows<'_>) -> Result<T, Error>) + Send + 'static,
) -> Result<T, Error>
where
T: Send + 'static,
{
) -> Result<Rows, Error> {
return self.exec.read_query_rows_f(sql, params, from_rows).await;
}
pub async fn read_query_row(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Option<Row>, Error> {
return self
.call(move |conn: &mut rusqlite::Connection| {
let mut stmt = conn.prepare_cached(sql.as_ref())?;
params.bind(&mut stmt)?;
return f(stmt.raw_query());
.exec
.read_query_rows_f(sql, params, |rows| {
return map_first(rows, |row| {
return from_row(row, Arc::new(columns(row.as_ref())));
});
})
.await;
}
#[inline]
pub async fn read_query_rows_f<T>(
pub async fn read_query_row_get<T>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
f: impl (FnOnce(rusqlite::Rows<'_>) -> Result<T, Error>) + Send + 'static,
) -> Result<T, Error>
index: usize,
) -> Result<Option<T>, Error>
where
T: Send + 'static,
T: FromSql + Send + 'static,
{
return self
.call_reader(move |conn: &rusqlite::Connection| {
let mut stmt = conn.prepare_cached(sql.as_ref())?;
assert!(stmt.readonly());
params.bind(&mut stmt)?;
return f(stmt.raw_query());
.exec
.read_query_rows_f(sql, params, move |rows| {
return map_first(rows, move |row| {
return get_value(row, index);
});
})
.await;
}
pub async fn read_query_value<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Option<T>, Error> {
return self
.exec
.read_query_rows_f(sql, params, |rows| {
return map_first(rows, move |row| {
serde_rusqlite::from_row(row).map_err(Error::DeserializeValue)
});
})
.await;
}
pub async fn read_query_values<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Vec<T>, Error> {
return self
.exec
.read_query_rows_f(sql, params, |rows| {
return serde_rusqlite::from_rows(rows)
.collect::<Result<Vec<_>, _>>()
.map_err(Error::DeserializeValue);
})
.await;
}
pub async fn write_query_rows(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Rows, Error> {
return self.exec.write_query_rows_f(sql, params, from_rows).await;
}
pub async fn query_row_get<T>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
index: usize,
) -> Result<Option<T>, Error>
where
T: FromSql + Send + 'static,
{
return self
.exec
.write_query_rows_f(sql, params, move |rows| {
return map_first(rows, move |row| {
return get_value(row, index);
});
})
.await;
}
pub async fn write_query_value<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Option<T>, Error> {
return self
.exec
.write_query_rows_f(sql, params, |rows| {
return map_first(rows, |row| {
serde_rusqlite::from_row(row).map_err(Error::DeserializeValue)
});
})
.await;
}
pub async fn write_query_values<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<Vec<T>, Error> {
return self
.exec
.write_query_rows_f(sql, params, |rows| {
return serde_rusqlite::from_rows(rows)
.collect::<Result<Vec<_>, _>>()
.map_err(Error::DeserializeValue);
})
.await;
}
/// Execute SQL statement.
pub async fn execute(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<usize, Error> {
return self.exec.execute(sql, params).await;
}
/// Batch execute provided SQL statementsi in batch.
pub async fn execute_batch(&self, sql: impl AsRef<str> + Send + 'static) -> Result<(), Error> {
return self.exec.execute_batch(sql).await;
}
pub fn attach(&self, path: &str, name: &str) -> Result<(), Error> {
let query = format!("ATTACH DATABASE '{path}' AS {name} ");
return self.exec.map(move |conn| {
conn.execute(&query, ())?;
return Ok(());
});
}
pub fn detach(&self, name: &str) -> Result<(), Error> {
let query = format!("DETACH DATABASE {name}");
return self.exec.map(move |conn| {
conn.execute(&query, ())?;
return Ok(());
});
}
pub async fn list_databases(&self) -> Result<Vec<Database>, Error> {
return self
.call(move |conn: &mut rusqlite::Connection| {
let mut stmt = conn.prepare_cached(sql.as_ref())?;
params.bind(&mut stmt)?;
return Ok(stmt.raw_execute()?);
})
.exec
.call_reader(crate::sqlite::util::list_databases)
.await;
}
pub async fn execute_batch(&self, sql: impl AsRef<str> + Send + 'static) -> Result<(), Error> {
self
.call(move |conn: &mut rusqlite::Connection| {
let mut batch = rusqlite::Batch::new(conn, sql.as_ref());
while let Some(mut stmt) = batch.next()? {
// NOTE: We must use `raw_query` instead of `raw_execute`, otherwise queries
// returning rows (e.g. SELECT) will return an error. Rusqlite's batch_execute
// behaves consistently.
let _row = stmt.raw_query().next()?;
}
return Ok(());
})
.await?;
return Ok(());
}
/// Close the database connection.
///
/// This is functionally equivalent to the `Drop` implementation for `Connection`. It consumes
/// the `Connection`, but on error returns it to the caller for retry purposes.
///
/// If successful, any following `close` operations performed on `Connection` copies will succeed
/// immediately.
///
/// # Failure
///
/// Will return `Err` if the underlying SQLite close call fails.
pub async fn close(self) -> Result<(), Error> {
let _ = self.writer.send(Message::Terminate);
while self.reader.send(Message::Terminate).is_ok() {
// Continue to close readers while the channel is alive.
}
let mut errors = vec![];
let conns: ConnectionVec = std::mem::take(&mut self.conns.write());
for conn in conns.0 {
// NOTE: rusqlite's `Connection::close()` returns itself, to allow users to retry
// failed closes. We on the other, may be left in a partially closed state with multiple
// connections. Ignorance is bliss.
if let Err((_self, err)) = conn.close() {
errors.push(err);
};
}
if !errors.is_empty() {
warn!("Closing connection: {errors:?}");
return Err(errors.swap_remove(0).into());
}
return Ok(());
return self.exec.close().await;
}
}
fn event_loop(id: usize, conns: Arc<RwLock<ConnectionVec>>, receiver: Receiver<Message>) {
while let Ok(message) = receiver.recv() {
match message {
Message::RunConst(f) => {
let lock = conns.read();
f(&lock.0[id])
}
Message::RunMut(f) => {
let mut lock = conns.write();
f(&mut lock.0[0])
}
Message::Terminate => {
return;
}
};
impl Debug for Connection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Connection").finish()
}
}
pub struct LockGuard<'a> {
guard: parking_lot::RwLockWriteGuard<'a, ConnectionVec>,
}
impl Deref for LockGuard<'_> {
type Target = rusqlite::Connection;
#[inline]
fn deref(&self) -> &rusqlite::Connection {
return &self.guard.deref().0[0];
impl Hash for Connection {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id().hash(state);
}
}
impl DerefMut for LockGuard<'_> {
#[inline]
fn deref_mut(&mut self) -> &mut rusqlite::Connection {
return &mut self.guard.deref_mut().0[0];
impl PartialEq for Connection {
fn eq(&self, other: &Self) -> bool {
return self.id() == other.id();
}
}
pub struct ArcLockGuard {
guard: parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ConnectionVec>,
}
impl Deref for ArcLockGuard {
type Target = rusqlite::Connection;
#[inline]
fn deref(&self) -> &rusqlite::Connection {
return &self.guard.deref().0[0];
}
}
impl DerefMut for ArcLockGuard {
#[inline]
fn deref_mut(&mut self) -> &mut rusqlite::Connection {
return &mut self.guard.deref_mut().0[0];
}
}
#[inline]
pub(crate) fn map_first<T>(
mut rows: rusqlite::Rows<'_>,
f: impl (FnOnce(&rusqlite::Row<'_>) -> Result<T, Error>) + Send + 'static,
) -> Result<Option<T>, Error>
where
T: Send + 'static,
{
if let Some(row) = rows.next()? {
return Ok(Some(f(row)?));
}
return Ok(None);
}
#[inline]
pub fn get_value<T: FromSql>(row: &rusqlite::Row<'_>, idx: usize) -> Result<T, Error> {
let value = row.get_ref(idx)?;
return FromSql::column_result(value.into()).map_err(|err| {
use rusqlite::Error as RError;
return Error::Rusqlite(match err {
FromSqlError::InvalidType => {
RError::InvalidColumnType(idx, "<unknown>".into(), value.data_type())
}
FromSqlError::OutOfRange(i) => RError::IntegralValueOutOfRange(idx, i),
FromSqlError::Utf8Error(err) => RError::Utf8Error(idx, err),
FromSqlError::Other(err) => RError::FromSqlConversionFailure(idx, value.data_type(), err),
FromSqlError::InvalidBlobSize { .. } => {
RError::FromSqlConversionFailure(idx, value.data_type(), Box::new(err))
}
});
});
}
static UNIQUE_CONN_ID: AtomicUsize = AtomicUsize::new(0);
impl Eq for Connection {}

View file

@ -0,0 +1,379 @@
use kanal::{Receiver, Sender};
use log::*;
use parking_lot::RwLock;
use rusqlite::fallible_iterator::FallibleIterator;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::oneshot;
use crate::error::Error;
use crate::params::Params;
#[derive(Default)]
struct ConnectionVec(smallvec::SmallVec<[rusqlite::Connection; 32]>);
// NOTE: We must never access the same connection concurrently even as immutable &Connection, due
// to intrinsic statement cache. We can ensure this by uniquely assigning one connection to each
// thread.
unsafe impl Sync for ConnectionVec {}
enum Message {
RunMut(Box<dyn FnOnce(&mut rusqlite::Connection) + Send>),
RunConst(Box<dyn FnOnce(&rusqlite::Connection) + Send>),
Terminate,
}
#[derive(Clone, Default)]
pub struct Options {
pub busy_timeout: Option<std::time::Duration>,
pub n_read_threads: Option<usize>,
}
/// A handle to call functions in background thread.
#[derive(Clone)]
pub(crate) struct Executor {
id: usize,
reader: Sender<Message>,
writer: Sender<Message>,
// NOTE: Is shared across reader and writer worker threads.
conns: Arc<RwLock<ConnectionVec>>,
}
impl Executor {
pub fn new<E>(
builder: impl Fn() -> Result<rusqlite::Connection, E>,
opt: Options,
) -> Result<Self, E> {
let Options {
busy_timeout,
n_read_threads,
} = opt;
let new_conn = || -> Result<rusqlite::Connection, E> {
let conn = builder()?;
if let Some(busy_timeout) = busy_timeout {
conn
.busy_timeout(busy_timeout)
.expect("busy timeout failed");
}
return Ok(conn);
};
let write_conn = new_conn()?;
let path = write_conn.path().map(|p| p.to_string());
let in_memory = path.as_ref().is_none_or(|s| {
// Returns empty string for in-memory databases.
return s.is_empty();
});
let n_read_threads: i64 = match (in_memory, n_read_threads.unwrap_or(0)) {
(true, _) => {
// We cannot share an in-memory database across threads, they're all independent.
0
}
(false, 1) => {
warn!("A single reader thread won't improve performance, falling back to 0.");
0
}
(false, n) => {
if let Ok(max) = std::thread::available_parallelism()
&& n > max.get()
{
warn!(
"Num read threads '{n}' exceeds hardware parallelism: {}",
max.get()
);
}
n as i64
}
};
let conns = Arc::new(RwLock::new(ConnectionVec({
let mut conns = vec![write_conn];
for _ in 0..(n_read_threads - 1).max(0) {
conns.push(new_conn()?);
}
conns.into()
})));
assert_eq!(n_read_threads.max(1) as usize, conns.read().0.len());
// Spawn writer.
let (shared_write_sender, shared_write_receiver) = kanal::unbounded::<Message>();
{
let conns = conns.clone();
std::thread::Builder::new()
.name("tb-sqlite-writer".to_string())
.spawn(move || event_loop(0, conns, shared_write_receiver))
.expect("startup");
}
// Spawn readers.
let shared_read_sender = if n_read_threads > 0 {
let (shared_read_sender, shared_read_receiver) = kanal::unbounded::<Message>();
for i in 0..n_read_threads {
// NOTE: read and writer threads are sharing the first conn, given they're mutually
// exclusive.
let index = i as usize;
let shared_read_receiver = shared_read_receiver.clone();
let conns = conns.clone();
std::thread::Builder::new()
.name(format!("tb-sqlite-reader-{index}"))
.spawn(move || event_loop(index, conns, shared_read_receiver))
.expect("startup");
}
shared_read_sender
} else {
shared_write_sender.clone()
};
debug!(
"Opened SQLite DB '{}' with {n_read_threads} reader threads",
path.as_deref().unwrap_or("<in-memory>")
);
return Ok(Self {
id: UNIQUE_CONN_ID.fetch_add(1, Ordering::SeqCst),
reader: shared_read_sender,
writer: shared_write_sender,
conns,
});
}
pub fn id(&self) -> usize {
return self.id;
}
pub fn threads(&self) -> usize {
return self.conns.read().0.len();
}
#[inline]
pub fn write_lock(&self) -> LockGuard<'_> {
return LockGuard {
guard: self.conns.write(),
};
}
#[inline]
pub fn try_write_arc_lock_for(&self, duration: tokio::time::Duration) -> Option<ArcLockGuard> {
return self
.conns
.try_write_arc_for(duration)
.map(|guard| ArcLockGuard { guard });
}
#[inline]
pub(crate) fn map(
&self,
f: impl Fn(&rusqlite::Connection) -> Result<(), Error> + Send + 'static,
) -> Result<(), Error> {
let lock = self.conns.write();
for conn in &lock.0 {
f(conn)?;
}
return Ok(());
}
#[inline]
pub async fn call<F, R>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&mut rusqlite::Connection) -> Result<R, Error> + Send + 'static,
R: Send + 'static,
{
// return call_impl(&self.writer, function).await;
let (sender, receiver) = oneshot::channel::<Result<R, Error>>();
self
.writer
.send(Message::RunMut(Box::new(move |conn| {
if !sender.is_closed() {
let _ = sender.send(function(conn));
}
})))
.map_err(|_| Error::ConnectionClosed)?;
receiver.await.map_err(|_| Error::ConnectionClosed)?
}
#[inline]
pub async fn call_reader<F, R>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&rusqlite::Connection) -> Result<R, Error> + Send + 'static,
R: Send + 'static,
{
let (sender, receiver) = oneshot::channel::<Result<R, Error>>();
self
.reader
.send(Message::RunConst(Box::new(move |conn| {
if !sender.is_closed() {
let _ = sender.send(function(conn));
}
})))
.map_err(|_| Error::ConnectionClosed)?;
receiver.await.map_err(|_| Error::ConnectionClosed)?
}
#[inline]
pub async fn write_query_rows_f<T>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
f: impl (FnOnce(rusqlite::Rows<'_>) -> Result<T, Error>) + Send + 'static,
) -> Result<T, Error>
where
T: Send + 'static,
{
return self
.call(move |conn: &mut rusqlite::Connection| {
let mut stmt = conn.prepare_cached(sql.as_ref())?;
params.bind(&mut stmt)?;
return f(stmt.raw_query());
})
.await;
}
#[inline]
pub async fn read_query_rows_f<T>(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
f: impl (FnOnce(rusqlite::Rows<'_>) -> Result<T, Error>) + Send + 'static,
) -> Result<T, Error>
where
T: Send + 'static,
{
return self
.call_reader(move |conn: &rusqlite::Connection| {
let mut stmt = conn.prepare_cached(sql.as_ref())?;
assert!(stmt.readonly());
params.bind(&mut stmt)?;
return f(stmt.raw_query());
})
.await;
}
pub async fn execute(
&self,
sql: impl AsRef<str> + Send + 'static,
params: impl Params + Send + 'static,
) -> Result<usize, Error> {
return self
.call(move |conn: &mut rusqlite::Connection| {
let mut stmt = conn.prepare_cached(sql.as_ref())?;
params.bind(&mut stmt)?;
return Ok(stmt.raw_execute()?);
})
.await;
}
pub async fn execute_batch(&self, sql: impl AsRef<str> + Send + 'static) -> Result<(), Error> {
self
.call(move |conn: &mut rusqlite::Connection| {
let mut batch = rusqlite::Batch::new(conn, sql.as_ref());
while let Some(mut stmt) = batch.next()? {
// NOTE: We must use `raw_query` instead of `raw_execute`, otherwise queries
// returning rows (e.g. SELECT) will return an error. Rusqlite's batch_execute
// behaves consistently.
let _row = stmt.raw_query().next()?;
}
return Ok(());
})
.await?;
return Ok(());
}
pub async fn close(self) -> Result<(), Error> {
let _ = self.writer.send(Message::Terminate);
while self.reader.send(Message::Terminate).is_ok() {
// Continue to close readers while the channel is alive.
}
let mut errors = vec![];
let conns: ConnectionVec = std::mem::take(&mut self.conns.write());
for conn in conns.0 {
// NOTE: rusqlite's `Connection::close()` returns itself, to allow users to retry
// failed closes. We on the other, may be left in a partially closed state with multiple
// connections. Ignorance is bliss.
if let Err((_self, err)) = conn.close() {
errors.push(err);
};
}
if !errors.is_empty() {
warn!("Closing connection: {errors:?}");
return Err(errors.swap_remove(0).into());
}
return Ok(());
}
}
fn event_loop(id: usize, conns: Arc<RwLock<ConnectionVec>>, receiver: Receiver<Message>) {
while let Ok(message) = receiver.recv() {
match message {
Message::RunConst(f) => {
let lock = conns.read();
f(&lock.0[id])
}
Message::RunMut(f) => {
let mut lock = conns.write();
f(&mut lock.0[0])
}
Message::Terminate => {
return;
}
};
}
}
pub struct LockGuard<'a> {
guard: parking_lot::RwLockWriteGuard<'a, ConnectionVec>,
}
impl Deref for LockGuard<'_> {
type Target = rusqlite::Connection;
#[inline]
fn deref(&self) -> &rusqlite::Connection {
return &self.guard.deref().0[0];
}
}
impl DerefMut for LockGuard<'_> {
#[inline]
fn deref_mut(&mut self) -> &mut rusqlite::Connection {
return &mut self.guard.deref_mut().0[0];
}
}
pub struct ArcLockGuard {
guard: parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ConnectionVec>,
}
impl Deref for ArcLockGuard {
type Target = rusqlite::Connection;
#[inline]
fn deref(&self) -> &rusqlite::Connection {
return &self.guard.deref().0[0];
}
}
impl DerefMut for ArcLockGuard {
#[inline]
fn deref_mut(&mut self) -> &mut rusqlite::Connection {
return &mut self.guard.deref_mut().0[0];
}
}
static UNIQUE_CONN_ID: AtomicUsize = AtomicUsize::new(0);

View file

@ -1,7 +1,7 @@
pub mod batch;
pub mod connection;
pub mod util;
mod batch;
pub(super) mod connection;
mod executor;
mod util;
pub use util::extract_record_values;
pub use util::extract_row_id;
pub use util::list_databases;
pub use batch::execute_batch;
pub use util::{extract_record_values, extract_row_id, from_rows, list_databases};

View file

@ -1,9 +1,90 @@
use rusqlite::hooks::PreUpdateCase;
use std::str::FromStr;
use std::sync::Arc;
use crate::database::Database;
use crate::error::Error;
use crate::from_sql::{FromSql, FromSqlError};
use crate::rows::{Column, Row, Rows, ValueType};
use crate::value::Value;
#[inline]
pub(crate) fn map_first<T>(
mut rows: rusqlite::Rows<'_>,
f: impl (FnOnce(&rusqlite::Row<'_>) -> Result<T, Error>) + Send + 'static,
) -> Result<Option<T>, Error>
where
T: Send + 'static,
{
if let Some(row) = rows.next()? {
return Ok(Some(f(row)?));
}
return Ok(None);
}
#[inline]
pub fn get_value<T: FromSql>(row: &rusqlite::Row<'_>, idx: usize) -> Result<T, Error> {
let value = row.get_ref(idx)?;
return FromSql::column_result(value.into()).map_err(|err| {
use rusqlite::Error as RError;
return Error::Rusqlite(match err {
FromSqlError::InvalidType => {
RError::InvalidColumnType(idx, "<unknown>".into(), value.data_type())
}
FromSqlError::OutOfRange(i) => RError::IntegralValueOutOfRange(idx, i),
FromSqlError::Utf8Error(err) => RError::Utf8Error(idx, err),
FromSqlError::Other(err) => RError::FromSqlConversionFailure(idx, value.data_type(), err),
FromSqlError::InvalidBlobSize { .. } => {
RError::FromSqlConversionFailure(idx, value.data_type(), Box::new(err))
}
});
});
}
pub fn from_rows(mut rows: rusqlite::Rows) -> Result<Rows, Error> {
let columns: Arc<Vec<Column>> = Arc::new(rows.as_ref().map_or_else(Vec::new, columns));
let mut result = vec![];
while let Some(row) = rows.next()? {
result.push(self::from_row(row, columns.clone())?);
}
return Ok(Rows(result, columns));
}
pub(super) fn from_row(row: &rusqlite::Row, cols: Arc<Vec<Column>>) -> Result<Row, Error> {
#[cfg(debug_assertions)]
if let Some(rc) = Some(columns(row.as_ref()))
&& rc.len() != cols.len()
{
// Apparently this can happen during schema manipulations, e.g. when deleting a column
// :shrug:. We normalize everything to the same rows schema rather than dealing with
// jagged tables.
log::warn!("Rows/row column mismatch: {cols:?} vs {rc:?}");
}
// We have to access by index here, since names can be duplicate.
let values = (0..cols.len())
.map(|idx| row.get(idx).unwrap_or(Value::Null))
.collect();
return Ok(Row(values, cols));
}
#[inline]
pub(super) fn columns(stmt: &rusqlite::Statement<'_>) -> Vec<Column> {
return stmt
.columns()
.into_iter()
.map(|c| Column {
name: c.name().to_string(),
decl_type: c.decl_type().and_then(|s| ValueType::from_str(s).ok()),
})
.collect();
}
#[inline]
pub fn extract_row_id(case: &PreUpdateCase) -> Option<i64> {
return match case {

View file

@ -278,7 +278,7 @@ async fn test_execute_and_query() {
assert_eq!(person.id, 1);
assert_eq!(person.name, "baz");
let rows = crate::sqlite::batch::execute_batch(
let rows = crate::sqlite::execute_batch(
&conn,
r#"
CREATE TABLE foo (id INTEGER) STRICT;

View file

@ -263,7 +263,7 @@ impl self::trailbase::database::sqlite::Host for State {
.bind(&mut stmt)
.map_err(|err| TxError::Other(err.to_string()))?;
let rows = trailbase_sqlite::rows::from_rows(stmt.raw_query())
let rows = trailbase_sqlite::sqlite::from_rows(stmt.raw_query())
.map_err(|err| TxError::Other(err.to_string()))?;
let values: Vec<_> = rows

View file

@ -7,7 +7,7 @@ use sqlite3_parser::ast::{Expr, OneSelect, ResultColumn, Select, Stmt};
use tokio::time::Duration;
use trailbase_schema::parse::parse_into_statement;
use trailbase_schema::sqlite::unquote_expr;
use trailbase_sqlite::{Rows, connection::ArcLockGuard};
use trailbase_sqlite::{ArcLockGuard, Rows};
use trailbase_sqlvalue::{DecodeError, SqlValue};
use trailbase_wasm_common::{SqliteRequest, SqliteResponse};
use wasmtime_wasi_http::p2::bindings::http::types::ErrorCode;