mirror of
https://github.com/trailbaseio/trailbase
synced 2026-04-21 13:37:44 +00:00
Model DB transactions from WASM as a WASI resource and use a watchdog task to force unlock the DB after 60s.
This is implemented as a minor WASI world update, i.e. it's backward compatible. WASM guests build against future runtime releases (JS & Rust) will transparently use the new APIs w/o anychanges to user code.
This commit is contained in:
parent
a692d8857a
commit
e08e648a0f
22 changed files with 285 additions and 145 deletions
|
|
@ -130,6 +130,9 @@ impl Guest for Endpoints {
|
|||
|
||||
tx.commit().map_err(internal)?;
|
||||
|
||||
// Keep one dangling to make sure RAII-cleanup works.
|
||||
let _tx_dangling = Transaction::begin();
|
||||
|
||||
return Ok(());
|
||||
}),
|
||||
routing::get("/attach_db", async |_req| {
|
||||
|
|
|
|||
Binary file not shown.
|
|
@ -69,7 +69,7 @@ test("WASM runtime DB Transaction", async ({ expect }) => {
|
|||
await Promise.all(
|
||||
Array.from({ length: 25 }, async (_v, _i) => {
|
||||
const response = await fetch(`http://${ADDRESS}/transaction`);
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.status, `Got: ${await response.text()}`).toBe(200);
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
|
|
|||
|
|
@ -2,9 +2,7 @@ use trailbase_sqlvalue::{Blob, DecodeError, SqlValue};
|
|||
use wstd::http::body::IntoBody;
|
||||
use wstd::http::{Client, Request};
|
||||
|
||||
use crate::wit::trailbase::database::sqlite::{
|
||||
tx_begin, tx_commit, tx_execute, tx_query, tx_rollback,
|
||||
};
|
||||
use crate::wit::trailbase::database::sqlite::Transaction as WasiTransaction;
|
||||
|
||||
pub use crate::wit::trailbase::database::sqlite::{TxError, Value};
|
||||
pub use trailbase_wasm_common::{SqliteRequest, SqliteResponse};
|
||||
|
|
@ -29,27 +27,30 @@ pub fn escape(s: impl AsRef<str>) -> String {
|
|||
}
|
||||
|
||||
pub struct Transaction {
|
||||
tx: WasiTransaction,
|
||||
committed: bool,
|
||||
}
|
||||
|
||||
impl Transaction {
|
||||
pub fn begin() -> Result<Self, TxError> {
|
||||
tx_begin()?;
|
||||
return Ok(Self { committed: false });
|
||||
return Ok(Self {
|
||||
tx: WasiTransaction::new(),
|
||||
committed: false,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn query(&mut self, query: &str, params: &[Value]) -> Result<Vec<Vec<Value>>, TxError> {
|
||||
return tx_query(query, params);
|
||||
return self.tx.query(query, params);
|
||||
}
|
||||
|
||||
pub fn execute(&mut self, query: &str, params: &[Value]) -> Result<u64, TxError> {
|
||||
return tx_execute(query, params);
|
||||
return self.tx.execute(query, params);
|
||||
}
|
||||
|
||||
pub fn commit(&mut self) -> Result<(), TxError> {
|
||||
if !self.committed {
|
||||
self.committed = true;
|
||||
tx_commit()?;
|
||||
self.tx.commit()?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
|
@ -58,7 +59,7 @@ impl Transaction {
|
|||
impl Drop for Transaction {
|
||||
fn drop(&mut self) {
|
||||
if !self.committed
|
||||
&& let Err(err) = tx_rollback()
|
||||
&& let Err(err) = self.tx.rollback()
|
||||
{
|
||||
log::warn!("TX rollback failed: {err}");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package trailbase:component@0.1.0;
|
||||
package trailbase:component@0.1.1;
|
||||
|
||||
@since(version = 0.1.0)
|
||||
interface init-endpoint {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package trailbase:component@0.1.0;
|
||||
package trailbase:component@0.1.1;
|
||||
|
||||
@since(version = 0.1.0)
|
||||
interface sqlite-function-endpoint {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package trailbase:component@0.1.0;
|
||||
package trailbase:component@0.1.1;
|
||||
|
||||
// Note, everything is from the guest's perspective, i.e.:
|
||||
// * imports are provided by the host
|
||||
|
|
@ -25,7 +25,7 @@ world interfaces {
|
|||
|
||||
// TrailBase's interfaces:
|
||||
@since(version = 0.1.0)
|
||||
include trailbase:database/interfaces@0.1.0;
|
||||
include trailbase:database/interfaces@0.1.1;
|
||||
|
||||
@since(version = 0.1.0)
|
||||
export init-endpoint;
|
||||
|
|
@ -38,7 +38,7 @@ world interfaces {
|
|||
world init {
|
||||
// TrailBase's interfaces:
|
||||
@since(version = 0.1.0)
|
||||
include trailbase:database/interfaces@0.1.0;
|
||||
include trailbase:database/interfaces@0.1.1;
|
||||
|
||||
@since(version = 0.1.0)
|
||||
export init-endpoint;
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package trailbase:database@0.1.0;
|
||||
package trailbase:database@0.1.1;
|
||||
|
||||
interface sqlite {
|
||||
// WARNING: Evolving a variant currently breaks the ABI:
|
||||
|
|
@ -15,24 +15,38 @@ interface sqlite {
|
|||
real(f64),
|
||||
}
|
||||
|
||||
// NOTE: Ideally, we'd use these but they can currently block guests, w/o a
|
||||
// better non-blocking event loop.
|
||||
// NOTE: Post WASIp3 (and guest support in place, e.g. wstd) with native
|
||||
// async/future support, DB queries should be routed through below functions.
|
||||
//
|
||||
// @since(version = 0.1.0)
|
||||
// execute: func(query: string, params: list<value>) -> result<u64, tx-error>;
|
||||
// execute: func(query: string, params: list<value>) -> future<result<u64, tx-error>>;
|
||||
// @since(version = 0.1.0)
|
||||
// query: func(query: string, params: list<value>) -> result<list<list<value>>, tx-error>;
|
||||
// query: func(query: string, params: list<value>) -> future<result<list<list<value>>, tx-error>>;
|
||||
|
||||
@since(version = 0.1.1)
|
||||
resource transaction {
|
||||
constructor();
|
||||
commit: func() -> result<_, tx-error>;
|
||||
rollback: func() -> result<_, tx-error>;
|
||||
execute: func(query: string, params: list<value>) -> result<u64, tx-error>;
|
||||
query: func(query: string, params: list<value>) -> result<list<list<value>>, tx-error>;
|
||||
}
|
||||
|
||||
// However, transactions have to be sync.
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-begin: func() -> result<_, tx-error>;
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-commit: func() -> result<_, tx-error>;
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-rollback: func() -> result<_, tx-error>;
|
||||
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-execute: func(query: string, params: list<value>) -> result<u64, tx-error>;
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-query: func(query: string, params: list<value>) -> result<list<list<value>>, tx-error>;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use tokio::time::Duration;
|
|||
use trailbase_sqlite::Params;
|
||||
use trailbase_wasi_keyvalue::WasiKeyValueCtx;
|
||||
use wasmtime::Result;
|
||||
use wasmtime::component::{HasData, ResourceTable};
|
||||
use wasmtime::component::{HasData, Resource, ResourceTable};
|
||||
use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
|
||||
use wasmtime_wasi_http::WasiHttpCtx;
|
||||
use wasmtime_wasi_http::p2::{WasiHttpHooks, WasiHttpView};
|
||||
|
|
@ -42,18 +42,23 @@ wasmtime::component::bindgen!({
|
|||
// Interactions with `ResourceTable` can possibly trap so enable the ability
|
||||
// to return traps from generated functions.
|
||||
imports: {
|
||||
"trailbase:database/sqlite.tx-begin": async,
|
||||
"trailbase:database/sqlite.tx-commit": async,
|
||||
"trailbase:database/sqlite.tx-rollback": async,
|
||||
"trailbase:database/sqlite.tx-execute": async,
|
||||
"trailbase:database/sqlite.tx-query": async,
|
||||
"trailbase:database/sqlite.[constructor]transaction": async | trappable,
|
||||
"trailbase:database/sqlite.[drop]transaction": trappable,
|
||||
"trailbase:database/sqlite.[method]transaction.commit":async | trappable,
|
||||
"trailbase:database/sqlite.[method]transaction.rollback": async | trappable,
|
||||
"trailbase:database/sqlite.[method]transaction.query": async | trappable,
|
||||
"trailbase:database/sqlite.[method]transaction.execute": async | trappable,
|
||||
default: async,
|
||||
},
|
||||
with: {
|
||||
"trailbase:database/sqlite.transaction": self::TransactionImpl,
|
||||
},
|
||||
exports: {
|
||||
default: async | store,
|
||||
},
|
||||
});
|
||||
|
||||
pub use self::trailbase::database::sqlite::{TxError, Value};
|
||||
pub use self::trailbase::database::sqlite::{Transaction, TxError, Value};
|
||||
|
||||
/// NOTE: This is needed due to State needing to be Send.
|
||||
unsafe impl Send for crate::sqlite::OwnedTx {}
|
||||
|
|
@ -74,21 +79,13 @@ pub struct State {
|
|||
pub(crate) kv: WasiKeyValueCtx,
|
||||
|
||||
// A mutex of a DB lock.
|
||||
pub(crate) tx: Mutex<Option<crate::sqlite::OwnedTx>>,
|
||||
#[deprecated = "Used by deprecated `tx-*` free functions. Will be removed in favor of the `TransactionImpl` resource."]
|
||||
pub(crate) tx: Mutex<TransactionImpl>,
|
||||
|
||||
// State shared across all runtime instances.
|
||||
pub(crate) shared: Arc<SharedState>,
|
||||
}
|
||||
|
||||
impl Drop for State {
|
||||
fn drop(&mut self) {
|
||||
#[cfg(debug_assertions)]
|
||||
if self.tx.get_mut().is_some() {
|
||||
log::warn!("pending transaction locking the DB");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IoView for State {
|
||||
fn table(&mut self) -> &mut ResourceTable {
|
||||
return &mut self.resource_table;
|
||||
|
|
@ -159,73 +156,44 @@ impl HasData for State {
|
|||
}
|
||||
|
||||
impl self::trailbase::database::sqlite::Host for State {
|
||||
// async fn execute(&mut self, query: String, params: Vec<Value>) -> Result<u64, TxError> {
|
||||
// return Err(TxError::Other("not implemented".into()));
|
||||
// }
|
||||
// async fn query(&mut self, query: String, params: Vec<Value>) -> Result<Vec<Vec<Value>>,
|
||||
// TxError> { return Err(TxError::Other("not implemented".into()));
|
||||
// }
|
||||
|
||||
async fn tx_begin(&mut self) -> Result<(), TxError> {
|
||||
let Some(conn) = self.shared.conn.clone() else {
|
||||
return Err(TxError::Other("missing conn".into()));
|
||||
};
|
||||
|
||||
// Acquire shared lock first, before locking DB.
|
||||
#[allow(deprecated)]
|
||||
let mut lock = self.tx.lock().await;
|
||||
assert!(lock.is_none());
|
||||
|
||||
// TODO: Spawn a watcher task that unlocks the DB after a certain timeout.
|
||||
*lock = Some(
|
||||
acquire_transaction_lock_with_timeout(conn, Duration::from_millis(1000))
|
||||
.await
|
||||
.map_err(|err| TxError::Other(err.to_string()))?,
|
||||
);
|
||||
*lock = TransactionImpl::new(conn).await?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
async fn tx_commit(&mut self) -> Result<(), TxError> {
|
||||
let Some(tx) = self.tx.lock().await.take() else {
|
||||
return Err(TxError::Other("no pending tx".to_string()));
|
||||
};
|
||||
|
||||
// NOTE: this is the same as `tx.commit()` just w/o consuming.
|
||||
let lock = tx.borrow_dependent();
|
||||
lock
|
||||
.execute_batch("COMMIT")
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
return Ok(());
|
||||
#[allow(deprecated)]
|
||||
let mut lock = self.tx.lock().await;
|
||||
let tx: &mut TransactionImpl = &mut lock;
|
||||
return tx.commit().await;
|
||||
}
|
||||
|
||||
async fn tx_rollback(&mut self) -> Result<(), TxError> {
|
||||
let Some(tx) = self.tx.lock().await.take() else {
|
||||
return Err(TxError::Other("no pending tx".to_string()));
|
||||
};
|
||||
|
||||
// NOTE: this is the same as `tx.rollback()` just w/o consuming.
|
||||
let lock = tx.borrow_dependent();
|
||||
lock
|
||||
.execute_batch("ROLLBACK")
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
return Ok(());
|
||||
#[allow(deprecated)]
|
||||
let mut lock = self.tx.lock().await;
|
||||
let tx: &mut TransactionImpl = &mut lock;
|
||||
return tx.rollback().await;
|
||||
}
|
||||
|
||||
async fn tx_execute(&mut self, query: String, params: Vec<Value>) -> Result<u64, TxError> {
|
||||
let params: Vec<_> = params.into_iter().map(to_sqlite_value).collect();
|
||||
|
||||
let Some(ref tx) = *self.tx.lock().await else {
|
||||
return Err(TxError::Other("No open transaction".to_string()));
|
||||
};
|
||||
|
||||
let lock = tx.borrow_dependent();
|
||||
let mut stmt = lock
|
||||
.prepare(&query)
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
params
|
||||
.bind(&mut stmt)
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
return Ok(
|
||||
stmt
|
||||
.raw_execute()
|
||||
.map_err(|err| TxError::Other(err.to_string()))? as u64,
|
||||
);
|
||||
#[allow(deprecated)]
|
||||
return self.tx.lock().await.execute(query, params).await;
|
||||
}
|
||||
|
||||
async fn tx_query(
|
||||
|
|
@ -233,17 +201,85 @@ impl self::trailbase::database::sqlite::Host for State {
|
|||
query: String,
|
||||
params: Vec<Value>,
|
||||
) -> Result<Vec<Vec<Value>>, TxError> {
|
||||
let params: Vec<_> = params.into_iter().map(to_sqlite_value).collect();
|
||||
#[allow(deprecated)]
|
||||
return self.tx.lock().await.query(query, params).await;
|
||||
}
|
||||
}
|
||||
|
||||
let Some(ref tx) = *self.tx.lock().await else {
|
||||
#[derive(Default)]
|
||||
pub struct TransactionImpl {
|
||||
// NOTE: This is only an `Arc<Mutex<OwnedTx>>` to have a watcher task force-unlock DB if
|
||||
// necessary. W/o the task, this could just be an `OwnedTx`. The Mutex is an async Mutex to
|
||||
// prevent blocking the watcher task, e.g. if a transaction is blocked on a long-running
|
||||
// `tx.query`.
|
||||
tx: Arc<Mutex<Option<crate::sqlite::OwnedTx>>>,
|
||||
}
|
||||
|
||||
impl TransactionImpl {
|
||||
async fn new(conn: trailbase_sqlite::Connection) -> Result<Self, TxError> {
|
||||
let db_lock = acquire_transaction_lock_with_timeout(conn, Duration::from_secs(2))
|
||||
.await
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
let tx = Arc::new(Mutex::new(Some(db_lock)));
|
||||
|
||||
{
|
||||
// Watcher task to unlock stuck transactions.
|
||||
let tx = Arc::downgrade(&tx);
|
||||
tokio::spawn(async move {
|
||||
const TIMEOUT: Duration = Duration::from_secs(60);
|
||||
tokio::time::sleep(TIMEOUT).await;
|
||||
if let Some(tx) = tx.upgrade() {
|
||||
// NOTE: Dropping the OwnedTx does all the cleanup of both issuing rollback and
|
||||
// releasing the DB lock.
|
||||
if tx.lock().await.take().is_some() {
|
||||
log::warn!("Pending WASM transaction lock found. Force-unlocked DB after {TIMEOUT:?}.");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return Ok(Self { tx });
|
||||
}
|
||||
|
||||
async fn commit(&mut self) -> Result<(), TxError> {
|
||||
let Some(tx) = self.tx.lock().await.take() else {
|
||||
return Err(TxError::Other("no pending tx".to_string()));
|
||||
};
|
||||
|
||||
// NOTE: this is the same as `tx.commit()` just w/o consuming.
|
||||
if let Err(err) = tx.borrow_dependent().execute_batch("COMMIT") {
|
||||
return Err(TxError::Other(err.to_string()));
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
async fn rollback(&mut self) -> Result<(), TxError> {
|
||||
let Some(tx) = self.tx.lock().await.take() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// NOTE: this is the same as `tx.rollback()` just w/o consuming.
|
||||
tx.borrow_dependent()
|
||||
.execute_batch("ROLLBACK")
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
async fn query(&self, query: String, params: Vec<Value>) -> Result<Vec<Vec<Value>>, TxError> {
|
||||
let lock = self.tx.lock().await;
|
||||
let Some(ref tx) = *lock else {
|
||||
return Err(TxError::Other("No open transaction".to_string()));
|
||||
};
|
||||
|
||||
let lock = tx.borrow_dependent();
|
||||
let mut stmt = lock
|
||||
let mut stmt = tx
|
||||
.borrow_dependent()
|
||||
.prepare(&query)
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
let params: Vec<_> = params.into_iter().map(to_sqlite_value).collect();
|
||||
params
|
||||
.bind(&mut stmt)
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
|
@ -251,14 +287,89 @@ impl self::trailbase::database::sqlite::Host for State {
|
|||
let rows = trailbase_sqlite::sqlite::from_rows(stmt.raw_query())
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
let values: Vec<_> = rows
|
||||
.into_iter()
|
||||
.map(|trailbase_sqlite::Row(row, _col)| {
|
||||
return row.into_iter().map(from_sqlite_value).collect::<Vec<_>>();
|
||||
})
|
||||
.collect();
|
||||
return Ok(
|
||||
rows
|
||||
.into_iter()
|
||||
.map(|trailbase_sqlite::Row(row, _col)| {
|
||||
return row.into_iter().map(from_sqlite_value).collect::<Vec<_>>();
|
||||
})
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
|
||||
return Ok(values);
|
||||
async fn execute(&self, query: String, params: Vec<Value>) -> Result<u64, TxError> {
|
||||
let lock = self.tx.lock().await;
|
||||
let Some(ref tx) = *lock else {
|
||||
return Err(TxError::Other("No open transaction".to_string()));
|
||||
};
|
||||
|
||||
let mut stmt = tx
|
||||
.borrow_dependent()
|
||||
.prepare(&query)
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
let params: Vec<_> = params.into_iter().map(to_sqlite_value).collect();
|
||||
params
|
||||
.bind(&mut stmt)
|
||||
.map_err(|err| TxError::Other(err.to_string()))?;
|
||||
|
||||
return stmt
|
||||
.raw_execute()
|
||||
.map_err(|err| TxError::Other(err.to_string()))
|
||||
.map(|n| n as u64);
|
||||
}
|
||||
}
|
||||
|
||||
impl self::trailbase::database::sqlite::HostTransaction for State {
|
||||
async fn new(&mut self) -> Result<Resource<Transaction>, wasmtime::Error> {
|
||||
let Some(conn) = self.shared.conn.clone() else {
|
||||
return Err(wasmtime::Error::msg("missing conn"));
|
||||
};
|
||||
|
||||
return Ok(self.table().push(TransactionImpl::new(conn).await?)?);
|
||||
}
|
||||
|
||||
async fn commit(
|
||||
&mut self,
|
||||
r: Resource<Transaction>,
|
||||
) -> Result<Result<(), TxError>, wasmtime::Error> {
|
||||
let resource: &mut TransactionImpl = self.resource_table.get_mut(&r)?;
|
||||
return Ok(resource.commit().await);
|
||||
}
|
||||
|
||||
async fn rollback(
|
||||
&mut self,
|
||||
r: Resource<Transaction>,
|
||||
) -> Result<Result<(), TxError>, wasmtime::Error> {
|
||||
let resource: &mut TransactionImpl = self.resource_table.get_mut(&r)?;
|
||||
return Ok(resource.rollback().await);
|
||||
}
|
||||
|
||||
async fn query(
|
||||
&mut self,
|
||||
r: Resource<Transaction>,
|
||||
query: String,
|
||||
params: Vec<Value>,
|
||||
) -> Result<Result<Vec<Vec<Value>>, TxError>, wasmtime::Error> {
|
||||
let resource: &TransactionImpl = self.resource_table.get(&r)?;
|
||||
return Ok(resource.query(query, params).await);
|
||||
}
|
||||
|
||||
async fn execute(
|
||||
&mut self,
|
||||
r: Resource<Transaction>,
|
||||
query: String,
|
||||
params: Vec<Value>,
|
||||
) -> Result<Result<u64, TxError>, wasmtime::Error> {
|
||||
let resource: &TransactionImpl = self.resource_table.get(&r)?;
|
||||
return Ok(resource.execute(query, params).await);
|
||||
}
|
||||
|
||||
fn drop(&mut self, r: Resource<Transaction>) -> Result<(), wasmtime::Error> {
|
||||
// NOTE: Dropping the OwnedTx does all the cleanup of both issuing rollback and
|
||||
// releasing the DB lock.
|
||||
self.resource_table.delete(r)?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ use std::path::PathBuf;
|
|||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::SystemTime;
|
||||
// use tokio::sync::Mutex;
|
||||
use tokio::task::JoinError;
|
||||
use trailbase_wasi_keyvalue::WasiKeyValueCtx;
|
||||
use wasmtime::component::{Component, Linker, ResourceTable};
|
||||
|
|
@ -23,6 +22,7 @@ use wasmtime_wasi_http::WasiHttpCtx;
|
|||
use wasmtime_wasi_http::p2::WasiHttpView;
|
||||
use wasmtime_wasi_http::p2::bindings::http::types::ErrorCode;
|
||||
|
||||
use crate::host::TransactionImpl;
|
||||
use crate::host::exports::trailbase::component::init_endpoint::Arguments;
|
||||
|
||||
pub use crate::host::exports::trailbase::component::init_endpoint::HttpMethodType;
|
||||
|
|
@ -222,8 +222,9 @@ impl StoreBuilder<State> for Arc<SharedState> {
|
|||
shared: self.clone(),
|
||||
},
|
||||
kv: WasiKeyValueCtx::new(self.kv_store.clone()),
|
||||
#[allow(deprecated)]
|
||||
tx: tokio::sync::Mutex::new(TransactionImpl::default()),
|
||||
shared: self.clone(),
|
||||
tx: tokio::sync::Mutex::new(None),
|
||||
},
|
||||
));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@
|
|||
/// <reference path="./interfaces/wasi-random-insecure-seed.d.ts" />
|
||||
/// <reference path="./interfaces/wasi-random-insecure.d.ts" />
|
||||
/// <reference path="./interfaces/wasi-random-random.d.ts" />
|
||||
declare module "trailbase:component/interfaces@0.1.0" {
|
||||
export type * as TrailbaseDatabaseSqlite010 from "trailbase:database/sqlite@0.1.0"; // import trailbase:database/sqlite@0.1.0
|
||||
declare module "trailbase:component/interfaces@0.1.1" {
|
||||
export type * as TrailbaseDatabaseSqlite011 from "trailbase:database/sqlite@0.1.1"; // import trailbase:database/sqlite@0.1.1
|
||||
export type * as WasiClocksMonotonicClock023 from "wasi:clocks/monotonic-clock@0.2.3"; // import wasi:clocks/monotonic-clock@0.2.3
|
||||
export type * as WasiClocksWallClock023 from "wasi:clocks/wall-clock@0.2.3"; // import wasi:clocks/wall-clock@0.2.3
|
||||
export type * as WasiFilesystemPreopens023 from "wasi:filesystem/preopens@0.2.3"; // import wasi:filesystem/preopens@0.2.3
|
||||
|
|
@ -35,6 +35,6 @@ declare module "trailbase:component/interfaces@0.1.0" {
|
|||
export type * as WasiRandomInsecure023 from "wasi:random/insecure@0.2.3"; // import wasi:random/insecure@0.2.3
|
||||
export type * as WasiRandomRandom023 from "wasi:random/random@0.2.3"; // import wasi:random/random@0.2.3
|
||||
export * as incomingHandler from "wasi:http/incoming-handler@0.2.3"; // export wasi:http/incoming-handler@0.2.3
|
||||
export * as initEndpoint from "trailbase:component/init-endpoint@0.1.0"; // export trailbase:component/init-endpoint@0.1.0
|
||||
export * as sqliteFunctionEndpoint from "trailbase:component/sqlite-function-endpoint@0.1.0"; // export trailbase:component/sqlite-function-endpoint@0.1.0
|
||||
export * as initEndpoint from "trailbase:component/init-endpoint@0.1.1"; // export trailbase:component/init-endpoint@0.1.1
|
||||
export * as sqliteFunctionEndpoint from "trailbase:component/sqlite-function-endpoint@0.1.1"; // export trailbase:component/sqlite-function-endpoint@0.1.1
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
declare module "trailbase:component/init-endpoint@0.1.0" {
|
||||
declare module "trailbase:component/init-endpoint@0.1.1" {
|
||||
export function initHttpHandlers(args: Arguments): HttpHandlers;
|
||||
export function initJobHandlers(args: Arguments): JobHandlers;
|
||||
export function initSqliteFunctions(args: Arguments): SqliteFunctions;
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
declare module "trailbase:component/sqlite-function-endpoint@0.1.0" {
|
||||
declare module "trailbase:component/sqlite-function-endpoint@0.1.1" {
|
||||
export function dispatchScalarFunction(args: Arguments): Value;
|
||||
/**
|
||||
* WARNING: Evolving a variant currently breaks the ABI:
|
||||
|
|
|
|||
|
|
@ -1,13 +1,4 @@
|
|||
declare module "trailbase:database/sqlite@0.1.0" {
|
||||
/**
|
||||
* NOTE: Ideally, we'd use these but they can currently block guests, w/o a
|
||||
* better non-blocking event loop.
|
||||
* @since(version = 0.1.0)
|
||||
* execute: func(query: string, params: list<value>) -> result<u64, tx-error>;
|
||||
* @since(version = 0.1.0)
|
||||
* query: func(query: string, params: list<value>) -> result<list<list<value>>, tx-error>;
|
||||
* However, transactions have to be sync.
|
||||
*/
|
||||
declare module "trailbase:database/sqlite@0.1.1" {
|
||||
export function txBegin(): void;
|
||||
export function txCommit(): void;
|
||||
export function txRollback(): void;
|
||||
|
|
@ -50,4 +41,13 @@ declare module "trailbase:database/sqlite@0.1.0" {
|
|||
tag: "real";
|
||||
val: number;
|
||||
}
|
||||
|
||||
export class Transaction implements Disposable {
|
||||
constructor();
|
||||
commit(): void;
|
||||
rollback(): void;
|
||||
execute(query: string, params: Array<Value>): bigint;
|
||||
query(query: string, params: Array<Value>): Array<Array<Value>>;
|
||||
[Symbol.dispose](): void;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,12 +1,6 @@
|
|||
import * as JSON from "../json";
|
||||
|
||||
import {
|
||||
txBegin,
|
||||
txCommit,
|
||||
txRollback,
|
||||
txExecute,
|
||||
txQuery,
|
||||
} from "trailbase:database/sqlite@0.1.0";
|
||||
import { Transaction as WasiTransaction } from "trailbase:database/sqlite@0.1.1";
|
||||
|
||||
import type { SqliteRequest } from "@common/SqliteRequest";
|
||||
import type { Value } from "./value";
|
||||
|
|
@ -19,29 +13,31 @@ import {
|
|||
toWitValue,
|
||||
} from "./value";
|
||||
|
||||
export type { Value } from "trailbase:database/sqlite@0.1.0";
|
||||
export type { Value } from "trailbase:database/sqlite@0.1.1";
|
||||
export { escape } from "./value";
|
||||
|
||||
export class Transaction {
|
||||
private readonly tx: WasiTransaction;
|
||||
|
||||
constructor() {
|
||||
txBegin();
|
||||
this.tx = new WasiTransaction();
|
||||
}
|
||||
|
||||
query(query: string, params: Value[]): Value[][] {
|
||||
return txQuery(query, params.map(toWitValue)).map((row) =>
|
||||
row.map(fromWitValue),
|
||||
);
|
||||
return this.tx
|
||||
.query(query, params.map(toWitValue))
|
||||
.map((row) => row.map(fromWitValue));
|
||||
}
|
||||
|
||||
execute(query: string, params: Value[]): number {
|
||||
return Number(txExecute(query, params.map(toWitValue)));
|
||||
return Number(this.tx.execute(query, params.map(toWitValue)));
|
||||
}
|
||||
|
||||
commit(): void {
|
||||
txCommit();
|
||||
this.tx.commit();
|
||||
}
|
||||
rollback(): void {
|
||||
txRollback();
|
||||
this.tx.rollback();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import type { Value as WitValue } from "trailbase:database/sqlite@0.1.0";
|
||||
import type { Value as WitValue } from "trailbase:database/sqlite@0.1.1";
|
||||
import { SqlValue } from "@common/SqlValue";
|
||||
import { Blob } from "@common/Blob";
|
||||
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import {
|
|||
IncomingBody,
|
||||
Scheme as WasiScheme,
|
||||
} from "wasi:http/types@0.2.3";
|
||||
import type { HttpMethodType } from "trailbase:component/init-endpoint@0.1.0";
|
||||
import type { HttpMethodType } from "trailbase:component/init-endpoint@0.1.1";
|
||||
import type { HttpContextUser } from "@common/HttpContextUser";
|
||||
|
||||
export type Scheme = "HTTP" | "HTTPS" | "other";
|
||||
|
|
|
|||
|
|
@ -4,12 +4,12 @@ import type {
|
|||
HttpHandlers,
|
||||
JobHandlers,
|
||||
SqliteFunctions,
|
||||
} from "trailbase:component/init-endpoint@0.1.0";
|
||||
} from "trailbase:component/init-endpoint@0.1.1";
|
||||
import type {
|
||||
Arguments as SqliteArguments,
|
||||
Error as SqliteError,
|
||||
dispatchScalarFunction,
|
||||
} from "trailbase:component/sqlite-function-endpoint@0.1.0";
|
||||
} from "trailbase:component/sqlite-function-endpoint@0.1.1";
|
||||
import type { HttpHandlerInterface } from "./http";
|
||||
import type { JobHandlerInterface } from "./job";
|
||||
import { buildIncomingHttpHandler } from "./http/incoming";
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package trailbase:database@0.1.0;
|
||||
package trailbase:database@0.1.1;
|
||||
|
||||
interface sqlite {
|
||||
// WARNING: Evolving a variant currently breaks the ABI:
|
||||
|
|
@ -15,24 +15,38 @@ interface sqlite {
|
|||
real(f64),
|
||||
}
|
||||
|
||||
// NOTE: Ideally, we'd use these but they can currently block guests, w/o a
|
||||
// better non-blocking event loop.
|
||||
// NOTE: Post WASIp3 (and guest support in place, e.g. wstd) with native
|
||||
// async/future support, DB queries should be routed through below functions.
|
||||
//
|
||||
// @since(version = 0.1.0)
|
||||
// execute: func(query: string, params: list<value>) -> result<u64, tx-error>;
|
||||
// execute: func(query: string, params: list<value>) -> future<result<u64, tx-error>>;
|
||||
// @since(version = 0.1.0)
|
||||
// query: func(query: string, params: list<value>) -> result<list<list<value>>, tx-error>;
|
||||
// query: func(query: string, params: list<value>) -> future<result<list<list<value>>, tx-error>>;
|
||||
|
||||
@since(version = 0.1.1)
|
||||
resource transaction {
|
||||
constructor();
|
||||
commit: func() -> result<_, tx-error>;
|
||||
rollback: func() -> result<_, tx-error>;
|
||||
execute: func(query: string, params: list<value>) -> result<u64, tx-error>;
|
||||
query: func(query: string, params: list<value>) -> result<list<list<value>>, tx-error>;
|
||||
}
|
||||
|
||||
// However, transactions have to be sync.
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-begin: func() -> result<_, tx-error>;
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-commit: func() -> result<_, tx-error>;
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-rollback: func() -> result<_, tx-error>;
|
||||
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-execute: func(query: string, params: list<value>) -> result<u64, tx-error>;
|
||||
@since(version = 0.1.0)
|
||||
@deprecated(version = 0.1.1)
|
||||
tx-query: func(query: string, params: list<value>) -> result<list<list<value>>, tx-error>;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package trailbase:component@0.1.0;
|
||||
package trailbase:component@0.1.1;
|
||||
|
||||
@since(version = 0.1.0)
|
||||
interface init-endpoint {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package trailbase:component@0.1.0;
|
||||
package trailbase:component@0.1.1;
|
||||
|
||||
@since(version = 0.1.0)
|
||||
interface sqlite-function-endpoint {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package trailbase:component@0.1.0;
|
||||
package trailbase:component@0.1.1;
|
||||
|
||||
// Note, everything is from the guest's perspective, i.e.:
|
||||
// * imports are provided by the host
|
||||
|
|
@ -29,7 +29,7 @@ world interfaces {
|
|||
|
||||
// TrailBase's interfaces:
|
||||
@since(version = 0.1.0)
|
||||
include trailbase:database/interfaces@0.1.0;
|
||||
include trailbase:database/interfaces@0.1.1;
|
||||
|
||||
@since(version = 0.1.0)
|
||||
export init-endpoint;
|
||||
|
|
|
|||
Loading…
Reference in a new issue