Reduce locking during config access in reactivity library - use ArcSwap instead - and use it downstream.

Previously accessing the config would require claiming an exclusive lock, sometimes warranting an expensive clone.
This commit is contained in:
Sebastian Jeltsch 2026-04-08 19:48:04 +02:00
parent 57375d29e8
commit 083a2880b2
15 changed files with 123 additions and 123 deletions

1
Cargo.lock generated
View file

@ -6810,6 +6810,7 @@ dependencies = [
name = "trailbase-reactive"
version = "0.1.0"
dependencies = [
"arc-swap",
"parking_lot",
"paste",
]

View file

@ -250,7 +250,7 @@ async fn async_main(
let email = Email::new(&state, &cmd.to, cmd.subject, cmd.body)?;
email.send().await?;
let c = state.get_config().email;
let c = state.get_config().email.clone();
match (c.smtp_host, c.smtp_port, c.smtp_username, c.smtp_password) {
(Some(host), Some(port), Some(username), Some(_)) => {
println!("Sent email using: {username}@{host}:{port}");

View file

@ -168,7 +168,7 @@ pub async fn alter_table_handler(
// Fix configuration: update all table references by existing APIs.
if source_table_schema.name != target_table_name {
let mut config = state.get_config();
let mut config = (*state.get_config()).clone();
let old_config_hash = hash_config(&config);
for api in &mut config.record_apis {

View file

@ -97,7 +97,7 @@ pub async fn drop_table_handler(
// Fix configuration: remove all APIs reference the no longer existing table.
{
let mut config = state.get_config();
let mut config = (*state.get_config()).clone();
let old_config_hash = hash_config(&config);
config.record_apis.retain(|c| {

View file

@ -323,20 +323,15 @@ impl AppState {
return r;
}
pub fn get_config(&self) -> Config {
return self.state.config.value();
pub fn get_config(&self) -> Arc<Config> {
return self.state.config.ptr();
}
pub fn access_config<F, T>(&self, f: F) -> T
where
F: FnOnce(&Config) -> T,
{
let mut result: Option<T> = None;
let r = &mut result;
self.state.config.with_value(move |c| {
let _ = r.insert(f(c));
});
return result.expect("inserted");
return f(&self.state.config.ptr());
}
pub async fn validate_and_update_config(

View file

@ -114,12 +114,13 @@ pub(crate) fn validate_redirect<T: AsRef<str>>(
) -> Result<Option<T>, AuthError> {
if let Some(ref redirect_uri) = redirect_uri {
let site: &Option<url::Url> = &state.site_url();
let custom_uri_schemes = state.access_config(|c| c.auth.custom_uri_schemes.clone());
let config = state.get_config();
validate_redirect_impl(
site.as_ref(),
&custom_uri_schemes,
&[],
&config.auth.custom_uri_schemes,
&config.auth.redirect_uri_allowlist,
redirect_uri.as_ref(),
state.dev_mode(),
)?;

View file

@ -118,8 +118,8 @@ impl Email {
) -> Result<Self, EmailError> {
let to: Mailbox = email_address.parse()?;
let (server_config, template) =
state.access_config(|c| (c.server.clone(), c.email.user_verification_template.clone()));
let config = state.get_config();
let template = &config.email.user_verification_template;
let subject_template = template
.as_ref()
@ -143,13 +143,13 @@ impl Email {
let subject = env
.template_from_named_str("subject", subject_template)?
.render(context! {
APP_NAME => server_config.application_name,
APP_NAME => &config.server.application_name,
EMAIL => email_address,
})?;
let body = env
.template_from_named_str("body", body_template)?
.render(context! {
APP_NAME => server_config.application_name,
APP_NAME => &config.server.application_name,
CODE => email_verification_token,
EMAIL => email_address,
REDIRECT_URI => redirect_uri,
@ -168,8 +168,8 @@ impl Email {
redirect_uri: Option<&str>,
) -> Result<Self, EmailError> {
let to: Mailbox = email_address.parse()?;
let (server_config, template) =
state.access_config(|c| (c.server.clone(), c.email.change_email_template.clone()));
let config = state.get_config();
let template = &config.email.change_email_template;
let subject_template = template
.as_ref()
@ -193,13 +193,13 @@ impl Email {
let subject = env
.template_from_named_str("subject", subject_template)?
.render(context! {
APP_NAME => server_config.application_name,
APP_NAME => &config.server.application_name,
EMAIL => email_address,
})?;
let body = env
.template_from_named_str("body", body_template)?
.render(context! {
APP_NAME => server_config.application_name,
APP_NAME => &config.server.application_name,
CODE => email_verification_token,
EMAIL => email_address,
REDIRECT_URI => redirect_uri,
@ -217,8 +217,8 @@ impl Email {
password_reset_token: &str,
) -> Result<Self, EmailError> {
let to: Mailbox = email_address.parse()?;
let (server_config, template) =
state.access_config(|c| (c.server.clone(), c.email.password_reset_template.clone()));
let config = state.get_config();
let template = &config.email.password_reset_template;
let subject_template = template
.as_ref()
@ -237,13 +237,13 @@ impl Email {
let subject = env
.template_from_named_str("subject", subject_template)?
.render(context! {
APP_NAME => server_config.application_name,
APP_NAME => &config.server.application_name,
EMAIL => email_address,
})?;
let body = env
.template_from_named_str("body", body_template)?
.render(context! {
APP_NAME => server_config.application_name,
APP_NAME => &config.server.application_name,
CODE => password_reset_token,
EMAIL => email_address,
SITE_URL => site_url.origin().ascii_serialization(),
@ -260,8 +260,8 @@ impl Email {
redirect_uri: Option<&str>,
) -> Result<Self, EmailError> {
let to: Mailbox = email_address.parse()?;
let (server_config, template) =
state.access_config(|c| (c.server.clone(), c.email.otp_template.clone()));
let config = state.get_config();
let template = &config.email.otp_template;
let subject_template = template
.as_ref()
@ -277,13 +277,13 @@ impl Email {
let subject = env
.template_from_named_str("subject", subject_template)?
.render(context! {
APP_NAME => server_config.application_name,
APP_NAME => &config.server.application_name,
EMAIL => email_address,
})?;
let body = env
.template_from_named_str("body", body_template)?
.render(context! {
APP_NAME => server_config.application_name,
APP_NAME => &config.server.application_name,
CODE => otp_code,
EMAIL => email_address,
REDIRECT_URI => redirect_uri,
@ -294,12 +294,14 @@ impl Email {
}
fn get_sender(state: &AppState) -> Result<Mailbox, EmailError> {
let (sender_address, sender_name) =
state.access_config(|c| (c.email.sender_address.clone(), c.email.sender_name.clone()));
let config = state.get_config();
let address = config
.email
.sender_address
.clone()
.unwrap_or_else(|| fallback_sender(&state.site_url()));
let address = sender_address.unwrap_or_else(|| fallback_sender(&state.site_url()));
if let Some(ref name) = sender_name {
if let Some(ref name) = config.email.sender_name {
return Ok(format!("{name} <{address}>").parse::<Mailbox>()?);
}
return Ok(address.parse::<Mailbox>()?);

View file

@ -82,7 +82,7 @@ async fn subscribe_to_record_test() {
// Make sure updating config doesn't drop subscriptions.
state
.validate_and_update_config(state.get_config(), None)
.validate_and_update_config((*state.get_config()).clone(), None)
.await
.unwrap();

View file

@ -17,7 +17,7 @@ mod tests {
acls: Acls,
access_rules: AccessRules,
) -> Result<(), crate::config::ConfigError> {
let mut config = state.get_config();
let mut config = (*state.get_config()).clone();
config.record_apis.push(RecordApiConfig {
name: Some(api_name.to_string()),
@ -46,7 +46,7 @@ mod tests {
state: &AppState,
api: RecordApiConfig,
) -> Result<(), crate::config::ConfigError> {
let mut config = state.get_config();
let mut config = (*state.get_config()).clone();
config.record_apis.push(api);
return state.validate_and_update_config(config, None).await;
}

View file

@ -17,7 +17,7 @@ async fn add_record_api_config(
state: &AppState,
api: RecordApiConfig,
) -> Result<(), anyhow::Error> {
let mut config = state.get_config();
let mut config = (*state.get_config()).clone();
config.record_apis.push(api);
return Ok(state.validate_and_update_config(config, None).await?);
}

View file

@ -8,5 +8,6 @@ repository = "https://github.com/trailbaseio/trailbase"
readme = "../../README.md"
[dependencies]
arc-swap = "1.9.1"
parking_lot.workspace = true
paste = "1"

View file

@ -2,8 +2,9 @@ use paste::paste;
use crate::{Merge, Reactive};
impl<T: Clone + Default + Send + 'static> Merge for &Reactive<T> {
impl<T: Clone + Default + Send + Sync + 'static> Merge for &Reactive<T> {
type Output = T;
fn merge(self) -> Reactive<Self::Output> {
self.clone()
}
@ -14,7 +15,7 @@ macro_rules! impl_merge_for_nested_tuple {
impl < $( [<T $i>], )* > Merge for ( $( [<T $i>], )* )
where
$( [<T $i>]: Merge, ) *
$( [<T $i>]::Output: Clone + Default + Send + 'static, ) *
$( [<T $i>]::Output: Clone + Default + Send + Sync + 'static, ) *
{
body!($($i),*);
}
@ -37,10 +38,18 @@ macro_rules! body {
// eg: (&Reactive<String>, &Reactive<usize>, ...) -> Reactive<(String, usize, ...)>
// so if the parent reactive changes, the 'combined' will definitely change.
// Therefore 'unchecked' is fine.
move |val| combined.update_inplace_unchecked(|c| c.$i = val.clone())
//
// move |val| combined.update_inplace_unchecked(|c| c.$i = val.clone())
move |val: &std::sync::Arc<_>| {
combined.update_unchecked(|c| {
let mut x = c.clone();
x.$i = (**val).clone();
return x;
});
}
}); )*
combined
return combined;
}
}};
}
@ -55,9 +64,9 @@ impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6);
impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7);
impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8);
impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13);
impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
// impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
// impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
// impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13);
// impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
// impl_merge_for_nested_tuple!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);

View file

@ -18,7 +18,6 @@ use crate::Reactive;
/// let r3: Reactive<f64> = Reactive::default();
///
/// let r: Reactive<(usize, String, f64)> = (&r1, &r2, &r3).merge();
///
/// ```
pub trait Merge {
type Output;

View file

@ -1,13 +1,14 @@
use arc_swap::ArcSwap;
use parking_lot::Mutex;
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::ops::DerefMut;
use std::sync::Arc;
type Observer<T> = Box<dyn FnMut(&T) + Send>;
type Observer<T> = Box<dyn FnMut(&Arc<T>) + Send + Sync>;
#[derive(Default)]
struct State<T> {
value: Mutex<T>,
value: ArcSwap<T>,
observers: Mutex<Vec<Observer<T>>>,
}
@ -28,12 +29,21 @@ impl<T> Reactive<T> {
pub fn new(value: T) -> Self {
Self {
state: Arc::new(State {
value: Mutex::new(value),
value: ArcSwap::from_pointee(value),
observers: Default::default(),
}),
}
}
// pub fn from(value: Arc<T>) -> Self {
// Self {
// state: Arc::new(State {
// value: ArcSwap::from(value),
// observers: Default::default(),
// }),
// }
// }
/// Returns a clone/copy of the value inside the reactive
///
/// # Examples
@ -47,7 +57,14 @@ impl<T> Reactive<T> {
where
T: Clone,
{
return self.state.value.lock().clone();
return (**self.state.value.load()).clone();
}
pub fn ptr(&self) -> Arc<T>
where
T: Clone,
{
return self.state.value.load_full();
}
/// Perform some action with the reference to the inner value.
@ -60,7 +77,7 @@ impl<T> Reactive<T> {
/// r.with_value(|s| println!("{}", s));
/// ```
pub fn with_value(&self, f: impl FnOnce(&T)) {
f(self.state.value.lock().deref());
f(&self.state.value.load());
}
/// derive a new child reactive that changes whenever the parent reactive changes.
@ -75,14 +92,14 @@ impl<T> Reactive<T> {
///
/// assert_eq!(15, d.value());
/// ```
pub fn derive<U: Clone + PartialEq + Send + 'static>(
pub fn derive<U: Clone + PartialEq + Send + Sync + 'static>(
&self,
f: impl Fn(&T) -> U + Send + 'static,
f: impl Fn(&T) -> U + Send + Sync + 'static,
) -> Reactive<U>
where
T: Clone,
{
let derived_val = f(self.state.value.lock().deref());
let derived_val = f(&self.state.value.load());
let derived: Reactive<U> = Reactive::new(derived_val);
self.add_observer({
@ -94,14 +111,14 @@ impl<T> Reactive<T> {
}
// Unlike Reactive::derive, doesn't require PartialEq.
pub fn derive_unchecked<U: Clone + Send + 'static>(
pub fn derive_unchecked<U: Clone + Send + Sync + 'static>(
&self,
f: impl Fn(&T) -> U + Send + 'static,
f: impl Fn(&T) -> U + Send + Sync + 'static,
) -> Reactive<U>
where
T: Clone,
{
let derived_val = f(self.state.value.lock().deref());
let derived_val = f(&self.state.value.load());
let derived: Reactive<U> = Reactive::new(derived_val);
self.add_observer({
@ -122,8 +139,8 @@ impl<T> Reactive<T> {
/// let r = Reactive::new(String::from("🦀"));
/// r.add_observer(|val| println!("{}", val));
/// ```
pub fn add_observer(&self, f: impl FnMut(&T) + Send + 'static) {
return self.state.observers.lock().push(Box::new(f));
pub fn add_observer(&self, mut f: impl FnMut(&Arc<T>) + Send + Sync + 'static) {
return self.state.observers.lock().push(Box::new(move |v| f(v)));
}
/// Clears all observers from the reactive.
@ -184,14 +201,14 @@ impl<T> Reactive<T> {
where
T: PartialEq,
{
let mut guard = self.state.value.lock();
let val = guard.deref_mut();
let val = &**self.state.value.load();
let new_val = f(val);
if &new_val != val {
*val = new_val;
let new_val = Arc::new(new_val);
self.state.value.store(new_val.clone());
for obs in self.state.observers.lock().deref_mut() {
obs(val);
obs(&new_val);
}
}
}
@ -224,27 +241,24 @@ impl<T> Reactive<T> {
///
/// It is also faster than `update` for that reason
pub fn update_unchecked(&self, f: impl FnOnce(&T) -> T) {
let mut guard = self.state.value.lock();
let val = guard.deref_mut();
*val = f(val);
let val = &**self.state.value.load();
let new_val = Arc::new(f(val));
self.state.value.store(new_val.clone());
for obs in self.state.observers.lock().deref_mut() {
obs(val);
obs(&new_val);
}
}
/// Updates the value inside inplace without creating a new clone/copy and notify
/// all the observers by calling the added observer functions in the sequence they were added
/// without checking if the value is changed after applying the provided function.
pub(crate) fn update_inplace_unchecked(&self, f: impl FnOnce(&mut T)) {
let mut guard = self.state.value.lock();
let val = guard.deref_mut();
f(val);
for obs in self.state.observers.lock().deref_mut() {
obs(val);
}
}
// pub fn update_unchecked_ptr(&self, f: impl FnOnce(&Arc<T>) -> T) {
// let val = self.state.value.load();
// let new_val = Arc::new(f(&val));
// self.state.value.store(new_val.clone());
//
// for obs in self.state.observers.lock().deref_mut() {
// obs(&new_val);
// }
// }
/// Notify all the observers of the current value by calling the
/// added observer functions in the sequence they were added
@ -259,10 +273,9 @@ impl<T> Reactive<T> {
/// r.notify();
/// ```
pub fn notify(&self) {
let guard = self.state.value.lock();
let val = guard.deref();
let val = self.state.value.load();
for obs in self.state.observers.lock().deref_mut() {
obs(val);
obs(&val);
}
}
}
@ -270,7 +283,7 @@ impl<T> Reactive<T> {
impl<T: Debug> Debug for Reactive<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("Reactive")
.field(self.state.value.lock().deref())
.field(&self.state.value.load())
.finish()
}
}

View file

@ -1,3 +1,5 @@
use parking_lot::Mutex;
use std::sync::Arc;
use trailbase_reactive::{Merge, Reactive};
#[test]
@ -34,11 +36,11 @@ fn can_update() {
fn update_only_notifies_observers_when_value_changes() {
let r: Reactive<String> = Reactive::default();
let changes: std::sync::Arc<std::sync::Mutex<Vec<String>>> = Default::default();
let changes: Arc<Mutex<Vec<String>>> = Default::default();
r.add_observer({
let changes = changes.clone();
move |val| changes.lock().unwrap().push(val.clone())
move |val| changes.lock().push((**val).clone())
});
r.update(|_| String::from("a"));
@ -48,17 +50,17 @@ fn update_only_notifies_observers_when_value_changes() {
let expected = vec![String::from("a"), String::from("b")];
assert_eq!(expected, changes.lock().unwrap().clone());
assert_eq!(expected, changes.lock().clone());
}
#[test]
fn update_unchecked_notifies_observers_without_checking_if_value_changed() {
let r: Reactive<String> = Reactive::default();
let changes: std::sync::Arc<std::sync::Mutex<Vec<String>>> = Default::default();
let changes: Arc<Mutex<Vec<String>>> = Default::default();
r.add_observer({
let changes = changes.clone();
move |val| changes.lock().unwrap().push(val.clone())
move |val| changes.lock().push((**val).clone())
});
r.update_unchecked(|_| String::from("a"));
@ -73,40 +75,17 @@ fn update_unchecked_notifies_observers_without_checking_if_value_changed() {
String::from("b"),
];
assert_eq!(expected, changes.lock().unwrap().clone());
assert_eq!(expected, changes.lock().clone());
}
// #[test]
// fn update_inplace_unchecked_notifies_observers_without_checking_if_value_changed() {
// let r: Reactive<String> = Reactive::default();
//
// let changes: std::sync::Arc<std::sync::Mutex<Vec<String>>> = Default::default();
//
// r.add_observer({
// let changes = changes.clone();
// move |val| changes.lock().unwrap().push(val.clone())
// });
//
// r.update_inplace_unchecked(|s| s.push('a'));
// r.update_inplace_unchecked(|s| {
// s.push('x');
// s.pop();
// });
// r.update_inplace_unchecked(|s| s.push('b'));
//
// let expected = vec![String::from("a"), String::from("a"), String::from("ab")];
//
// assert_eq!(expected, changes.lock().unwrap().clone());
// }
#[test]
fn can_add_observers() {
let r: Reactive<String> = Reactive::default();
let changes: std::sync::Arc<std::sync::Mutex<Vec<String>>> = Default::default();
let changes: Arc<Mutex<Vec<String>>> = Default::default();
r.add_observer({
let changes = changes.clone();
move |val| changes.lock().unwrap().push(val.clone())
move |val| changes.lock().push((**val).clone())
});
r.update(|_| String::from("a"));
@ -114,7 +93,7 @@ fn can_add_observers() {
let expected = vec![String::from("a"), String::from("b")];
assert_eq!(expected, changes.lock().unwrap().clone());
assert_eq!(expected, changes.lock().clone());
}
#[test]
@ -184,11 +163,11 @@ fn can_merge() {
fn can_notify() {
let r: Reactive<String> = Reactive::new(String::from("🦀"));
let changes: std::sync::Arc<std::sync::Mutex<Vec<String>>> = Default::default();
let changes: Arc<Mutex<Vec<String>>> = Default::default();
r.add_observer({
let changes = changes.clone();
move |val| changes.lock().unwrap().push(val.clone())
move |val| changes.lock().push((**val).clone())
});
r.notify();
@ -197,5 +176,5 @@ fn can_notify() {
let expected = vec![String::from("🦀"), String::from("🦀"), String::from("🦀")];
assert_eq!(expected, changes.lock().unwrap().clone());
assert_eq!(expected, changes.lock().clone());
}