From 0e1d83fb0227df0da19cebefc75ec634815b358a Mon Sep 17 00:00:00 2001 From: extrawurst <776816+extrawurst@users.noreply.github.com> Date: Mon, 4 Sep 2023 20:55:17 +0200 Subject: [PATCH] Parallelize log search (#1874) --- CHANGELOG.md | 3 ++ Cargo.lock | 11 +++++ asyncgit/Cargo.toml | 1 + asyncgit/src/error.rs | 4 ++ asyncgit/src/filter_commits.rs | 77 +++++++++++++++++++++++++--------- 5 files changed, 76 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 70d9fbe4..5610820c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Changed +* parallelise log search - performance gain ~100% ([#1869](https://github.com/extrawurst/gitui/issues/1869)) + ## [0.24.2] - 2023-09-03 ### Fixes diff --git a/Cargo.lock b/Cargo.lock index 56e60f3e..0eb3a365 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,6 +109,7 @@ dependencies = [ "log", "openssl-sys", "pretty_assertions", + "rayon", "rayon-core", "scopetime", "serde", @@ -1310,6 +1311,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "rayon" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +dependencies = [ + "either", + "rayon-core", +] + [[package]] name = "rayon-core" version = "1.11.0" diff --git a/asyncgit/Cargo.toml b/asyncgit/Cargo.toml index df03ba17..8f773c83 100644 --- a/asyncgit/Cargo.toml +++ b/asyncgit/Cargo.toml @@ -22,6 +22,7 @@ log = "0.4" # git2 = { git="https://github.com/extrawurst/git2-rs.git", rev="fc13dcc", features = ["vendored-openssl"]} # pinning to vendored openssl, using the git2 feature this gets lost with new resolver openssl-sys = { version = '0.9', features = ["vendored"], optional = true } +rayon = "1.7" rayon-core = "1.11" scopetime = { path = "../scopetime", version = "0.1" } serde = { version = "1.0", features = ["derive"] } diff --git a/asyncgit/src/error.rs b/asyncgit/src/error.rs index e171d04a..b9d87ea3 100644 --- a/asyncgit/src/error.rs +++ b/asyncgit/src/error.rs @@ -84,6 +84,10 @@ pub enum Error { /// #[error("not on a branch")] NoBranch, + + /// + #[error("rayon error: {0}")] + ThreadPool(#[from] rayon_core::ThreadPoolBuildError), } /// diff --git a/asyncgit/src/filter_commits.rs b/asyncgit/src/filter_commits.rs index 3f5799b0..3746b250 100644 --- a/asyncgit/src/filter_commits.rs +++ b/asyncgit/src/filter_commits.rs @@ -1,3 +1,8 @@ +use rayon::{ + prelude::ParallelIterator, + slice::{ParallelSlice, ParallelSliceMut}, +}; + use crate::{ asyncjob::{AsyncJob, RunParams}, error::Result, @@ -5,7 +10,7 @@ use crate::{ AsyncGitNotification, ProgressPercent, }; use std::{ - sync::{Arc, Mutex}, + sync::{atomic::AtomicUsize, Arc, Mutex}, time::{Duration, Instant}, }; @@ -69,41 +74,73 @@ impl AsyncCommitFilterJob { commits: Vec, params: &RunParams, ) -> JobState { - let response = sync::repo(repo_path) - .map(|repo| self.filter_commits(&repo, commits, params)) + let result = self + .filter_commits(repo_path, commits, params) .map(|(start, result)| CommitFilterResult { result, duration: start.elapsed(), }); - JobState::Response(response) + JobState::Response(result) } fn filter_commits( &self, - repo: &git2::Repository, + repo_path: &RepoPath, commits: Vec, params: &RunParams, - ) -> (Instant, Vec) { + ) -> Result<(Instant, Vec)> { let total_amount = commits.len(); let start = Instant::now(); - let result = commits - .into_iter() - .enumerate() - .filter_map(|(idx, c)| { - Self::update_progress( - params, - ProgressPercent::new(idx, total_amount), - ); + //note: for some reason >4 threads degrades search performance + let pool = + rayon::ThreadPoolBuilder::new().num_threads(4).build()?; - (*self.filter)(repo, &c) - .ok() - .and_then(|res| res.then_some(c)) - }) - .collect::>(); + let idx = AtomicUsize::new(0); - (start, result) + let mut result = pool.install(|| { + commits + .into_iter() + .enumerate() + .collect::>() + .par_chunks(1000) + .filter_map(|c| { + //TODO: error log repo open errors + sync::repo(repo_path).ok().map(|repo| { + c.iter() + .filter_map(|(e, c)| { + let idx = idx.fetch_add( + 1, + std::sync::atomic::Ordering::Relaxed, + ); + + Self::update_progress( + params, + ProgressPercent::new( + idx, + total_amount, + ), + ); + + (*self.filter)(&repo, c) + .ok() + .and_then(|res| { + res.then_some((*e, *c)) + }) + }) + .collect::>() + }) + }) + .flatten() + .collect::>() + }); + + result.par_sort_by(|a, b| a.0.cmp(&b.0)); + + let result = result.into_iter().map(|c| c.1).collect(); + + Ok((start, result)) } fn update_progress(