Clean-up trailbase_sqlite::Connection options and fix benchmark builds.

This commit is contained in:
Sebastian Jeltsch 2026-04-13 23:05:21 +02:00
parent 3e0ae46971
commit 4f6b305497
9 changed files with 144 additions and 133 deletions

View file

@ -218,15 +218,12 @@ mod tests {
#[tokio::test]
async fn test_aggregate_rate_computation() {
let conn = trailbase_sqlite::Connection::new(
move || -> anyhow::Result<_> {
let mut conn_sync =
crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None).unwrap();
apply_logs_migrations(&mut conn_sync).unwrap();
return Ok(conn_sync);
},
None,
)
let conn = trailbase_sqlite::Connection::new(move || -> anyhow::Result<_> {
let mut conn_sync =
crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None).unwrap();
apply_logs_migrations(&mut conn_sync).unwrap();
return Ok(conn_sync);
})
.unwrap();
let interval_seconds = 600;

View file

@ -38,16 +38,13 @@ fn get_conn_and_migration_path(
let json_registry = state.json_schema_registry().clone();
Ok((
trailbase_sqlite::Connection::new(
move || {
// TODO: We should load WASM SQLite functions, since migrations may depend on them.
return trailbase_extension::connect_sqlite(
Some(db_path.clone()),
Some(json_registry.clone()),
);
},
None,
)
trailbase_sqlite::Connection::new(move || {
// TODO: We should load WASM SQLite functions, since migrations may depend on them.
return trailbase_extension::connect_sqlite(
Some(db_path.clone()),
Some(json_registry.clone()),
);
})
.map_err(|err| trailbase_sqlite::Error::Other(err.into()))?,
migration_path,
))

View file

@ -104,10 +104,9 @@ mod tests {
#[tokio::test]
async fn test_some_sqlite_errors_yield_client_errors() {
let conn = trailbase_sqlite::Connection::new(
|| crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None),
None,
)
let conn = trailbase_sqlite::Connection::new(|| {
crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None)
})
.unwrap();
conn

View file

@ -329,7 +329,7 @@ fn init_main_db_impl(
let json_registry = json_registry.clone();
let new_db = &mut new_db;
trailbase_sqlite::Connection::new(
trailbase_sqlite::Connection::with_opts(
move || -> Result<_, ConnectionError> {
let mut conn =
trailbase_extension::connect_sqlite(main_path.clone(), json_registry.clone())?;
@ -352,14 +352,14 @@ fn init_main_db_impl(
return Ok(conn);
},
Some(trailbase_sqlite::connection::Options {
trailbase_sqlite::connection::Options {
n_read_threads: match (data_dir, std::thread::available_parallelism()) {
(None, _) => 0,
(Some(_), Ok(n)) => n.get().clamp(2, 4),
(Some(_), Err(_)) => 4,
(None, _) => Some(0),
(Some(_), Ok(n)) => Some(n.get().clamp(2, 4)),
(Some(_), Err(_)) => Some(4),
},
..Default::default()
}),
},
)?
};
@ -392,38 +392,32 @@ fn init_main_db_impl(
pub(super) fn init_logs_db(data_dir: Option<&DataDir>) -> Result<Connection, ConnectionError> {
let path = data_dir.map(|d| d.logs_db_path());
return trailbase_sqlite::Connection::new(
|| -> Result<_, ConnectionError> {
// NOTE: The logs db needs the trailbase extensions for the maxminddb geoip lookup.
let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
return trailbase_sqlite::Connection::new(|| -> Result<_, ConnectionError> {
// NOTE: The logs db needs the trailbase extensions for the maxminddb geoip lookup.
let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
trailbase_extension::register_all_extension_functions(&conn, None)?;
trailbase_extension::register_all_extension_functions(&conn, None)?;
// Turn off secure_deletions, i.e. don't wipe the memory with zeros.
conn.pragma_update(None, "secure_delete", "FALSE")?;
// Turn off secure_deletions, i.e. don't wipe the memory with zeros.
conn.pragma_update(None, "secure_delete", "FALSE")?;
apply_logs_migrations(&mut conn)?;
return Ok(conn);
},
None,
);
apply_logs_migrations(&mut conn)?;
return Ok(conn);
});
}
pub fn init_session_db(data_dir: Option<&DataDir>) -> Result<Connection, ConnectionError> {
let path = data_dir.map(|d| d.session_db_path());
return trailbase_sqlite::Connection::new(
|| -> Result<_, ConnectionError> {
// NOTE: The logs db needs the trailbase extensions for the maxminddb geoip lookup.
let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
return trailbase_sqlite::Connection::new(|| -> Result<_, ConnectionError> {
// NOTE: The logs db needs the trailbase extensions for the maxminddb geoip lookup.
let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
trailbase_extension::register_all_extension_functions(&conn, None)?;
trailbase_extension::register_all_extension_functions(&conn, None)?;
apply_session_migrations(&mut conn)?;
return Ok(conn);
},
None,
);
apply_session_migrations(&mut conn)?;
return Ok(conn);
});
}
pub(crate) fn connect_rusqlite_without_default_extensions_and_schemas(

View file

@ -118,45 +118,48 @@ fn insert_benchmark_group(c: &mut Criterion) {
group.bench_function("trailbase-sqlite (1 thread)", |b| {
async_insert_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
None,
Options {
n_read_threads: Some(0),
..Default::default()
},
)?);
})
});
group.bench_function("trailbase-sqlite (2 threads)", |b| {
async_insert_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
Some(Options {
n_read_threads: 2,
Options {
n_read_threads: Some(2),
..Default::default()
}),
},
)?);
})
});
group.bench_function("trailbase-sqlite (4 threads)", |b| {
async_insert_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
Some(Options {
n_read_threads: 4,
Options {
n_read_threads: Some(4),
..Default::default()
}),
},
)?);
})
});
group.bench_function("trailbase-sqlite (8 threads)", |b| {
async_insert_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
Some(Options {
n_read_threads: 8,
Options {
n_read_threads: Some(8),
..Default::default()
}),
},
)?);
})
});
@ -253,45 +256,48 @@ fn read_benchmark_group(c: &mut Criterion) {
group.bench_function("trailbase-sqlite (1 thread)", |b| {
async_read_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
None,
Options {
n_read_threads: Some(2),
..Default::default()
},
)?);
})
});
group.bench_function("trailbase-sqlite (2 threads)", |b| {
async_read_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
Some(Options {
n_read_threads: 2,
Options {
n_read_threads: Some(2),
..Default::default()
}),
},
)?);
})
});
group.bench_function("trailbase-sqlite (4 threads)", |b| {
async_read_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
Some(Options {
n_read_threads: 4,
Options {
n_read_threads: Some(4),
..Default::default()
}),
},
)?);
})
});
group.bench_function("trailbase-sqlite (8 threads)", |b| {
async_read_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
Some(Options {
n_read_threads: 8,
Options {
n_read_threads: Some(8),
..Default::default()
}),
},
)?);
})
});
@ -426,45 +432,48 @@ fn mixed_benchmark_group(c: &mut Criterion) {
group.bench_function("trailbase-sqlite (1 thread)", |b| {
async_mixed_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
None,
Options {
n_read_threads: Some(0),
..Default::default()
},
)?);
});
});
group.bench_function("trailbase-sqlite (2 threads)", |b| {
async_mixed_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
Some(Options {
n_read_threads: 2,
Options {
n_read_threads: Some(2),
..Default::default()
}),
},
)?);
});
});
group.bench_function("trailbase-sqlite (4 threads)", |b| {
async_mixed_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
Some(Options {
n_read_threads: 4,
Options {
n_read_threads: Some(4),
..Default::default()
}),
},
)?);
});
});
group.bench_function("trailbase-sqlite (8 threads)", |b| {
async_mixed_benchmark(b, async |fname| {
return Ok(Connection::new(
return Ok(Connection::with_opts(
|| rusqlite::Connection::open(&fname),
Some(Options {
n_read_threads: 8,
Options {
n_read_threads: Some(8),
..Default::default()
}),
},
)?);
});
});

View file

@ -33,9 +33,10 @@ impl AsyncConnection for Connection {
) -> Result<T, BenchmarkError> {
return Ok(
self
.query_row_get(sql.into(), params.into(), 0)
.query_row_get::<Adapter<T>>(sql.into(), params.into(), 0)
.await?
.unwrap(),
.unwrap()
.0,
);
}
@ -46,9 +47,10 @@ impl AsyncConnection for Connection {
) -> Result<T, BenchmarkError> {
return Ok(
self
.read_query_row_get(sql.into(), params.into(), 0)
.read_query_row_get::<Adapter<T>>(sql.into(), params.into(), 0)
.await?
.unwrap(),
.unwrap()
.0,
);
}
@ -163,3 +165,14 @@ impl AsyncConnection for ThreadLocalRusqlite {
return Ok(());
}
}
struct Adapter<T>(T);
impl<T: rusqlite::types::FromSql> trailbase_sqlite::from_sql::FromSql for Adapter<T> {
#[inline]
fn column_result(
value: trailbase_sqlite::ValueRef<'_>,
) -> trailbase_sqlite::from_sql::FromSqlResult<Self> {
return Ok(Adapter(T::column_result(value.into()).unwrap()));
}
}

View file

@ -19,9 +19,13 @@ pub struct Connection {
}
impl Connection {
pub fn new<E>(
pub fn new<E>(builder: impl Fn() -> Result<rusqlite::Connection, E>) -> Result<Self, E> {
return Self::with_opts(builder, Options::default());
}
pub fn with_opts<E>(
builder: impl Fn() -> Result<rusqlite::Connection, E>,
opt: Option<Options>,
opt: Options,
) -> std::result::Result<Self, E> {
return Ok(Self {
c: ConnectionImpl::new(builder, opt)?,
@ -34,15 +38,15 @@ impl Connection {
///
/// Will return `Err` if the underlying SQLite open call fails.
pub fn open_in_memory() -> Result<Self, Error> {
let conn = Self::new(
let conn = Self::with_opts(
rusqlite::Connection::open_in_memory,
Some(Options {
n_read_threads: 0,
Options {
n_read_threads: Some(0),
..Default::default()
}),
},
)?;
assert_eq!(1, conn.c.len());
assert_eq!(1, conn.threads());
return Ok(conn);
}
@ -51,6 +55,10 @@ impl Connection {
return self.c.id();
}
pub fn threads(&self) -> usize {
return self.c.threads();
}
pub fn write_lock(&self) -> LockGuard<'_> {
return self.c.write_lock();
}

View file

@ -25,19 +25,10 @@ enum Message {
Terminate,
}
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct Options {
pub busy_timeout: std::time::Duration,
pub n_read_threads: usize,
}
impl Default for Options {
fn default() -> Self {
return Self {
busy_timeout: std::time::Duration::from_secs(5),
n_read_threads: 0,
};
}
pub busy_timeout: Option<std::time::Duration>,
pub n_read_threads: Option<usize>,
}
/// A handle to call functions in background thread.
@ -53,16 +44,16 @@ pub(crate) struct ConnectionImpl {
impl ConnectionImpl {
pub fn new<E>(
builder: impl Fn() -> Result<rusqlite::Connection, E>,
opt: Option<Options>,
opt: Options,
) -> Result<Self, E> {
let Options {
busy_timeout,
n_read_threads,
} = opt.unwrap_or_default();
} = opt;
let new_conn = || -> Result<rusqlite::Connection, E> {
let conn = builder()?;
if !busy_timeout.is_zero() {
if let Some(busy_timeout) = busy_timeout {
conn
.busy_timeout(busy_timeout)
.expect("busy timeout failed");
@ -72,12 +63,12 @@ impl ConnectionImpl {
let write_conn = new_conn()?;
let path = write_conn.path().map(|p| p.to_string());
// Returns empty string for in-memory databases.
let in_memory = path
.as_ref()
.is_none_or(|s| s.is_empty() || s == ":memory:");
let in_memory = path.as_ref().is_none_or(|s| {
// Returns empty string for in-memory databases.
return s.is_empty();
});
let n_read_threads: i64 = match (in_memory, n_read_threads) {
let n_read_threads: i64 = match (in_memory, n_read_threads.unwrap_or(0)) {
(true, _) => {
// We cannot share an in-memory database across threads, they're all independent.
0
@ -156,7 +147,7 @@ impl ConnectionImpl {
return self.id;
}
pub(crate) fn len(&self) -> usize {
pub fn threads(&self) -> usize {
return self.conns.read().0.len();
}

View file

@ -11,6 +11,7 @@ use crate::{Database, Error, Value, ValueType};
#[tokio::test]
async fn open_in_memory_test() {
let conn = Connection::open_in_memory().unwrap();
assert_eq!(1, conn.threads());
assert!(conn.close().await.is_ok());
}
@ -61,15 +62,17 @@ async fn close_success_test() {
let tmp_dir = tempfile::TempDir::new().unwrap();
let db_path = tmp_dir.path().join("main.sqlite");
let conn = Connection::new(
let conn = Connection::with_opts(
move || rusqlite::Connection::open(&db_path),
Some(Options {
n_read_threads: 2,
Options {
n_read_threads: Some(2),
..Default::default()
}),
},
)
.unwrap();
assert_eq!(2, conn.threads());
conn
.execute("CREATE TABLE 'test' (id INTEGER PRIMARY KEY)", ())
.await
@ -584,7 +587,7 @@ fn test_busy() {
let tmp_dir = tempfile::TempDir::new().unwrap();
let db_path = tmp_dir.path().join("main.sqlite");
let conn = Connection::new(
let conn = Connection::with_opts(
move || -> Result<_, rusqlite::Error> {
let conn = rusqlite::Connection::open(&db_path)?;
@ -603,10 +606,10 @@ fn test_busy() {
return Ok(conn);
},
Some(Options {
n_read_threads: 2,
Options {
n_read_threads: Some(2),
..Default::default()
}),
},
)
.unwrap();