Cleanup trailbase-sqlite error handling.

This commit is contained in:
Sebastian Jeltsch 2026-04-15 15:56:53 +02:00
parent eac139dcfe
commit 81b4853aed
13 changed files with 155 additions and 148 deletions

View file

@ -218,7 +218,7 @@ mod tests {
#[tokio::test]
async fn test_aggregate_rate_computation() {
let conn = trailbase_sqlite::Connection::new(move || -> anyhow::Result<_> {
let conn = trailbase_sqlite::Connection::new(move || -> Result<_, trailbase_sqlite::Error> {
let mut conn_sync =
crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None).unwrap();
apply_logs_migrations(&mut conn_sync).unwrap();

View file

@ -43,9 +43,9 @@ fn get_conn_and_migration_path(
return trailbase_extension::connect_sqlite(
Some(db_path.clone()),
Some(json_registry.clone()),
);
})
.map_err(|err| trailbase_sqlite::Error::Other(err.into()))?,
)
.map_err(|err| trailbase_sqlite::Error::Other(err.into()));
})?,
migration_path,
))
}

View file

@ -329,29 +329,32 @@ fn init_main_db_impl(
let json_registry = json_registry.clone();
let new_db = &mut new_db;
let conn_builder = move || -> Result<_, trailbase_sqlite::Error> {
let mut conn = trailbase_extension::connect_sqlite(main_path.clone(), json_registry.clone())
.map_err(|err| trailbase_sqlite::Error::Other(err.into()))?;
#[cfg(any(feature = "geos", feature = "geos-static"))]
litegis::register(&conn)?;
if main_migrations {
new_db.fetch_or(
apply_main_migrations(&mut conn, migrations_path.as_ref())
.map_err(|err| trailbase_sqlite::Error::Other(err.into()))?,
Ordering::SeqCst,
);
}
#[cfg(feature = "wasm")]
for (store, functions) in &runtimes {
trailbase_wasm_runtime_host::functions::setup_connection(&conn, store.clone(), functions)
.expect("startup");
}
return Ok(conn);
};
trailbase_sqlite::Connection::with_opts(
move || -> Result<_, ConnectionError> {
let mut conn =
trailbase_extension::connect_sqlite(main_path.clone(), json_registry.clone())?;
#[cfg(any(feature = "geos", feature = "geos-static"))]
litegis::register(&conn)?;
if main_migrations {
new_db.fetch_or(
apply_main_migrations(&mut conn, migrations_path.as_ref())?,
Ordering::SeqCst,
);
}
#[cfg(feature = "wasm")]
for (store, functions) in &runtimes {
trailbase_wasm_runtime_host::functions::setup_connection(&conn, store.clone(), functions)
.expect("startup");
}
return Ok(conn);
},
conn_builder,
trailbase_sqlite::Options {
num_threads: match (data_dir, std::thread::available_parallelism()) {
(None, _) => Some(1),
@ -389,10 +392,12 @@ fn init_main_db_impl(
return Ok((conn, new_db.load(Ordering::SeqCst)));
}
pub(super) fn init_logs_db(data_dir: Option<&DataDir>) -> Result<Connection, ConnectionError> {
pub(super) fn init_logs_db(
data_dir: Option<&DataDir>,
) -> Result<Connection, trailbase_sqlite::Error> {
let path = data_dir.map(|d| d.logs_db_path());
return trailbase_sqlite::Connection::new(|| -> Result<_, ConnectionError> {
return trailbase_sqlite::Connection::new(|| -> Result<_, trailbase_sqlite::Error> {
// NOTE: The logs db needs the trailbase extensions for the maxminddb geoip lookup.
let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
@ -401,21 +406,22 @@ pub(super) fn init_logs_db(data_dir: Option<&DataDir>) -> Result<Connection, Con
// Turn off secure_deletions, i.e. don't wipe the memory with zeros.
conn.pragma_update(None, "secure_delete", "FALSE")?;
apply_logs_migrations(&mut conn)?;
apply_logs_migrations(&mut conn).map_err(|err| trailbase_sqlite::Error::Other(err.into()))?;
return Ok(conn);
});
}
pub fn init_session_db(data_dir: Option<&DataDir>) -> Result<Connection, ConnectionError> {
pub fn init_session_db(data_dir: Option<&DataDir>) -> Result<Connection, trailbase_sqlite::Error> {
let path = data_dir.map(|d| d.session_db_path());
return trailbase_sqlite::Connection::new(|| -> Result<_, ConnectionError> {
return trailbase_sqlite::Connection::new(|| -> Result<_, trailbase_sqlite::Error> {
// NOTE: The session db also needs the trailbase extension functions (registered below).
let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
trailbase_extension::register_all_extension_functions(&conn, None)?;
apply_session_migrations(&mut conn)?;
apply_session_migrations(&mut conn)
.map_err(|err| trailbase_sqlite::Error::Other(err.into()))?;
return Ok(conn);
});
}

View file

@ -249,10 +249,7 @@ impl SqliteLogLayer {
// NOTE: awaiting the `conn.call()` is the secret to batching, since we won't read from the
// channel until the database write is complete.
let result = conn
.call_writer(move |conn| {
Self::insert_logs(conn, buffer)?;
Ok(())
})
.call_writer(move |conn| Self::insert_logs(conn, buffer))
.await;
if let Err(err) = result {

View file

@ -484,11 +484,7 @@ impl RecordApi {
.state
.conn
.call_reader(move |conn| {
Ok(Self::check_record_level_access_impl(
conn,
&access_query,
params,
)?)
return Self::check_record_level_access_impl(conn, &access_query, params);
})
.await
}
@ -497,11 +493,7 @@ impl RecordApi {
.state
.conn
.call_writer(move |conn| {
Ok(Self::check_record_level_access_impl(
conn,
&access_query,
params,
)?)
return Self::check_record_level_access_impl(conn, &access_query, params);
})
.await
}

View file

@ -200,7 +200,7 @@ pub(crate) async fn run_queries(
};
let result: Vec<WriteQueryResult> = conn
.call_writer(move |conn| {
.call_writer(move |conn| -> Result<_, rusqlite::Error> {
let tx = conn.transaction()?;
let rows: Vec<WriteQueryResult> = queries
@ -251,7 +251,7 @@ pub(crate) async fn run_insert_query(
};
let (rowid, return_value): (i64, trailbase_sqlite::Value) = conn
.call_writer(move |conn| {
.call_writer(move |conn| -> Result<_, rusqlite::Error> {
let result = query.apply(conn)?;
return Ok((result.rowid, result.pk_value.expect("insert")));
})
@ -288,7 +288,7 @@ pub(crate) async fn run_update_query(
};
let rowid: i64 = conn
.call_writer(move |conn| {
.call_writer(move |conn| -> Result<_, rusqlite::Error> {
return Ok(query.apply(conn)?.rowid);
})
.await?;
@ -315,7 +315,7 @@ pub(crate) async fn run_delete_query(
let query = WriteQuery::new_delete(table_name, pk_column, pk_value)?;
let rowid: i64 = conn
.call_writer(move |conn| {
.call_writer(move |conn| -> Result<_, rusqlite::Error> {
return Ok(query.apply(conn)?.rowid);
})
.await?;

View file

@ -275,7 +275,7 @@ fn build_job(
return async move {
conn
.call_writer(|conn| {
return Ok(conn.backup("main", backup_file, /* progress= */ None)?);
return conn.backup("main", backup_file, /* progress= */ None);
})
.await
.map_err(|err| {

View file

@ -78,11 +78,9 @@ impl TransactionLog {
let report = conn
.call_writer(move |conn| {
let report = runner
return runner
.run(conn)
.map_err(|err| trailbase_sqlite::Error::Other(err.into()))?;
return Ok(report);
.map_err(|err| trailbase_sqlite::Error::Other(err.into()));
})
.await
.map_err(|err| {
@ -129,22 +127,24 @@ impl TransactionLog {
conn: &trailbase_sqlite::Connection,
) -> Result<(), trailbase_sqlite::Error> {
conn
.call_writer(|conn: &mut rusqlite::Connection| {
let tx = conn.transaction()?;
for (query_type, stmt) in self.log {
match query_type {
QueryType::Query => {
tx.query_row(&stmt, (), |_row| Ok(()))?;
}
QueryType::Execute => {
tx.execute(&stmt, ())?;
.call_writer(
|conn: &mut rusqlite::Connection| -> Result<_, rusqlite::Error> {
let tx = conn.transaction()?;
for (query_type, stmt) in self.log {
match query_type {
QueryType::Query => {
tx.query_row(&stmt, (), |_row| Ok(()))?;
}
QueryType::Execute => {
tx.execute(&stmt, ())?;
}
}
}
}
tx.commit()?;
tx.commit()?;
return Ok(());
})
return Ok(());
},
)
.await?;
return Ok(());

View file

@ -126,7 +126,7 @@ fn build_json_schema_expanded_impl(
e.insert(v.clone());
}
Entry::Occupied(_e) => {
println!("SKIPPING: {k}");
debug!("Skipping {k}, already defined");
}
};
}

View file

@ -16,33 +16,35 @@ pub async fn execute_batch(
sql: impl AsRef<str> + Send + 'static,
) -> Result<Option<Rows>, Error> {
return conn
.call_writer(move |conn: &mut rusqlite::Connection| {
let batch = rusqlite::Batch::new(conn, sql.as_ref());
.call_writer(
move |conn: &mut rusqlite::Connection| -> Result<Option<Rows>, Error> {
let batch = rusqlite::Batch::new(conn, sql.as_ref());
let mut p = batch.peekable();
while let Some(mut stmt) = p.next()? {
let mut rows = stmt.raw_query();
let row = rows.next()?;
let mut p = batch.peekable();
while let Some(mut stmt) = p.next()? {
let mut rows = stmt.raw_query();
let row = rows.next()?;
match p.peek()? {
Some(_) => {}
None => {
if let Some(row) = row {
let cols: Arc<Vec<Column>> = Arc::new(columns(row.as_ref()));
match p.peek()? {
Some(_) => {}
None => {
if let Some(row) = row {
let cols: Arc<Vec<Column>> = Arc::new(columns(row.as_ref()));
let mut result = vec![from_row(row, cols.clone())?];
while let Some(row) = rows.next()? {
result.push(from_row(row, cols.clone())?);
let mut result = vec![from_row(row, cols.clone())?];
while let Some(row) = rows.next()? {
result.push(from_row(row, cols.clone())?);
}
return Ok(Some(Rows(result, cols)));
}
return Ok(Some(Rows(result, cols)));
}
return Ok(None);
return Ok(None);
}
}
}
}
return Ok(None);
})
return Ok(None);
},
)
.await;
}

View file

@ -22,14 +22,20 @@ pub struct Connection {
}
impl Connection {
pub fn new<E>(builder: impl Fn() -> Result<rusqlite::Connection, E>) -> Result<Self, E> {
pub fn new<E>(builder: impl Fn() -> Result<rusqlite::Connection, E>) -> Result<Self, Error>
where
Error: From<E>,
{
return Self::with_opts(builder, Options::default());
}
pub fn with_opts<E>(
builder: impl Fn() -> Result<rusqlite::Connection, E>,
opt: Options,
) -> std::result::Result<Self, E> {
) -> Result<Self, Error>
where
Error: From<E>,
{
return Ok(Self {
id: UNIQUE_CONN_ID.fetch_add(1, Ordering::SeqCst),
exec: Executor::new(builder, opt)?,
@ -87,18 +93,22 @@ impl Connection {
/// during startup/SIGHUP).
/// * Batch log inserts to minimize thread slushing.
/// * Backups from scheduler (API could be easily hoisted)
pub async fn call_writer<F, R>(&self, function: F) -> Result<R, Error>
pub async fn call_writer<F, R, E>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&mut rusqlite::Connection) -> Result<R, Error> + Send + 'static,
F: FnOnce(&mut rusqlite::Connection) -> Result<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
Error: From<E>,
{
return self.exec.call_writer(function).await;
}
pub async fn call_reader<F, R>(&self, function: F) -> Result<R, Error>
pub async fn call_reader<F, R, E>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&rusqlite::Connection) -> Result<R, Error> + Send + 'static,
F: FnOnce(&rusqlite::Connection) -> Result<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
Error: From<E>,
{
return self.exec.call_reader(function).await;
}

View file

@ -45,18 +45,19 @@ impl Executor {
pub fn new<E>(
builder: impl Fn() -> Result<rusqlite::Connection, E>,
opt: Options,
) -> Result<Self, E> {
) -> Result<Self, Error>
where
Error: From<E>,
{
let Options {
busy_timeout,
num_threads,
} = opt;
let new_conn = || -> Result<rusqlite::Connection, E> {
let new_conn = || -> Result<rusqlite::Connection, Error> {
let conn = builder()?;
if let Some(busy_timeout) = busy_timeout {
conn
.busy_timeout(busy_timeout)
.expect("busy timeout failed");
conn.busy_timeout(busy_timeout)?;
}
return Ok(conn);
};
@ -117,7 +118,7 @@ impl Executor {
move || writer_event_loop(conns, shared_read_receiver, shared_write_receiver)
})
.expect("startup");
.map_err(|err| Error::Other(format!("spawning rw thread failed: {err}").into()))?;
// Spawn readers threads.
for index in 0..num_read_threads {
@ -129,7 +130,7 @@ impl Executor {
move || reader_event_loop(index + 1, conns, shared_read_receiver)
})
.expect("startup");
.map_err(|err| Error::Other(format!("spawning ro thread {index} failed: {err}").into()))?;
}
debug!(
@ -180,13 +181,15 @@ impl Executor {
}
#[inline]
pub async fn call_writer<F, R>(&self, function: F) -> Result<R, Error>
pub async fn call_writer<F, R, E>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&mut rusqlite::Connection) -> Result<R, Error> + Send + 'static,
F: FnOnce(&mut rusqlite::Connection) -> Result<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
Error: From<E>,
{
// return call_impl(&self.writer, function).await;
let (sender, receiver) = oneshot::channel::<Result<R, Error>>();
let (sender, receiver) = oneshot::channel::<Result<R, E>>();
self
.writer
@ -197,14 +200,16 @@ impl Executor {
})))
.map_err(|_| Error::ConnectionClosed)?;
receiver.await.map_err(|_| Error::ConnectionClosed)?
return Ok(receiver.await.map_err(|_| Error::ConnectionClosed)??);
}
#[inline]
pub async fn call_reader<F, R>(&self, function: F) -> Result<R, Error>
pub async fn call_reader<F, R, E>(&self, function: F) -> Result<R, Error>
where
F: FnOnce(&rusqlite::Connection) -> Result<R, Error> + Send + 'static,
F: FnOnce(&rusqlite::Connection) -> Result<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
Error: From<E>,
{
let (sender, receiver) = oneshot::channel::<Result<R, Error>>();
@ -212,12 +217,12 @@ impl Executor {
.reader
.send(ReaderMessage::RunConst(Box::new(move |conn| {
if !sender.is_closed() {
let _ = sender.send(function(conn));
let _ = sender.send(function(conn).map_err(|err| err.into()));
}
})))
.map_err(|_| Error::ConnectionClosed)?;
receiver.await.map_err(|_| Error::ConnectionClosed)?
return receiver.await.map_err(|_| Error::ConnectionClosed)?;
}
#[inline]
@ -274,23 +279,25 @@ impl Executor {
params.bind(&mut stmt)?;
return Ok(stmt.raw_execute()?);
return stmt.raw_execute();
})
.await;
}
pub async fn execute_batch(&self, sql: impl AsRef<str> + Send + 'static) -> Result<(), Error> {
self
.call_writer(move |conn: &mut rusqlite::Connection| {
let mut batch = rusqlite::Batch::new(conn, sql.as_ref());
while let Some(mut stmt) = batch.next()? {
// NOTE: We must use `raw_query` instead of `raw_execute`, otherwise queries
// returning rows (e.g. SELECT) will return an error. Rusqlite's batch_execute
// behaves consistently.
let _row = stmt.raw_query().next()?;
}
return Ok(());
})
.call_writer(
move |conn: &mut rusqlite::Connection| -> Result<(), rusqlite::Error> {
let mut batch = rusqlite::Batch::new(conn, sql.as_ref());
while let Some(mut stmt) = batch.next()? {
// NOTE: We must use `raw_query` instead of `raw_execute`, otherwise queries
// returning rows (e.g. SELECT) will return an error. Rusqlite's batch_execute
// behaves consistently.
let _row = stmt.raw_query().next()?;
}
return Ok(());
},
)
.await?;
return Ok(());

View file

@ -21,12 +21,10 @@ async fn call_success_test() {
let result = conn
.call_writer(|conn| {
conn
.execute(
"CREATE TABLE person(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL);",
[],
)
.map_err(|e| e.into())
return conn.execute(
"CREATE TABLE person(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL);",
[],
);
})
.await;
@ -38,7 +36,7 @@ async fn call_failure_test() {
let conn = Connection::open_in_memory().unwrap();
let result = conn
.call_writer(|conn| conn.execute("Invalid sql", []).map_err(|e| e.into()))
.call_writer(|conn| conn.execute("Invalid sql", []))
.await;
assert!(match result.unwrap_err() {
@ -116,7 +114,7 @@ async fn close_call_test() {
assert!(conn.close().await.is_ok());
let result = conn2
.call_writer(|conn| conn.execute("SELECT 1;", []).map_err(|e| e.into()))
.call_writer(|conn| conn.execute("SELECT 1;", []))
.await;
assert!(matches!(
@ -131,12 +129,10 @@ async fn close_failure_test() {
conn
.call_writer(|conn| {
conn
.execute(
"CREATE TABLE person(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL);",
[],
)
.map_err(|e| e.into())
return conn.execute(
"CREATE TABLE person(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL);",
[],
);
})
.await
.unwrap();
@ -147,7 +143,8 @@ async fn close_failure_test() {
// See https://www.sqlite.org/c3ref/close.html for details regarding this behaviour
let stmt = Box::new(conn.prepare("INSERT INTO person VALUES (1, ?1);").unwrap());
Box::leak(stmt);
Ok(())
return Ok::<_, rusqlite::Error>(());
})
.await
.unwrap();
@ -214,12 +211,10 @@ async fn test_execute_and_query() {
let result = conn
.call_writer(|conn| {
conn
.execute(
"CREATE TABLE person(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
[],
)
.map_err(|e| e.into())
return conn.execute(
"CREATE TABLE person(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
[],
);
})
.await;
@ -403,12 +398,10 @@ async fn test_params() {
conn
.call_writer(|conn| {
conn
.execute(
"CREATE TABLE person(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
[],
)
.map_err(|e| e.into())
return conn.execute(
"CREATE TABLE person(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
[],
);
})
.await
.unwrap();