use crate::{ error::Result, sync::{repo, CommitId, LogWalker, LogWalkerFilter, RepoPath}, AsyncGitNotification, }; use crossbeam_channel::Sender; use git2::Oid; use scopetime::scope_time; use std::{ sync::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, }, thread, time::Duration, }; /// #[derive(PartialEq)] pub enum FetchStatus { /// previous fetch still running Pending, /// no change expected NoChange, /// new walk was started Started, } /// pub struct AsyncLog { current: Arc>>, sender: Sender, pending: Arc, background: Arc, filter: Option, repo: RepoPath, } static LIMIT_COUNT: usize = 3000; static SLEEP_FOREGROUND: Duration = Duration::from_millis(2); static SLEEP_BACKGROUND: Duration = Duration::from_millis(1000); impl AsyncLog { /// pub fn new( repo: RepoPath, sender: &Sender, filter: Option, ) -> Self { Self { repo, current: Arc::new(Mutex::new(Vec::new())), sender: sender.clone(), pending: Arc::new(AtomicBool::new(false)), background: Arc::new(AtomicBool::new(false)), filter, } } /// pub fn count(&mut self) -> Result { Ok(self.current.lock()?.len()) } /// pub fn get_slice( &self, start_index: usize, amount: usize, ) -> Result> { let list = self.current.lock()?; let list_len = list.len(); let min = start_index.min(list_len); let max = min + amount; let max = max.min(list_len); Ok(list[min..max].to_vec()) } /// pub fn position(&self, id: CommitId) -> Result> { let list = self.current.lock()?; let position = list.iter().position(|&x| x == id); Ok(position) } /// pub fn is_pending(&self) -> bool { self.pending.load(Ordering::Relaxed) } /// pub fn set_background(&mut self) { self.background.store(true, Ordering::Relaxed); } /// fn current_head(&self) -> Result { Ok(self .current .lock()? .first() .map_or(Oid::zero().into(), |f| *f)) } /// fn head_changed(&self) -> Result { if let Ok(head) = repo(&self.repo)?.head() { if let Some(head) = head.target() { return Ok(head != self.current_head()?.into()); } } Ok(false) } /// pub fn fetch(&mut self) -> Result { self.background.store(false, Ordering::Relaxed); if self.is_pending() { return Ok(FetchStatus::Pending); } if !self.head_changed()? { return Ok(FetchStatus::NoChange); } self.clear()?; let arc_current = Arc::clone(&self.current); let sender = self.sender.clone(); let arc_pending = Arc::clone(&self.pending); let arc_background = Arc::clone(&self.background); let filter = self.filter.clone(); let repo = self.repo.clone(); self.pending.store(true, Ordering::Relaxed); rayon_core::spawn(move || { scope_time!("async::revlog"); Self::fetch_helper( &repo, &arc_current, &arc_background, &sender, filter, ) .expect("failed to fetch"); arc_pending.store(false, Ordering::Relaxed); Self::notify(&sender); }); Ok(FetchStatus::Started) } fn fetch_helper( repo_path: &RepoPath, arc_current: &Arc>>, arc_background: &Arc, sender: &Sender, filter: Option, ) -> Result<()> { let mut entries = Vec::with_capacity(LIMIT_COUNT); let r = repo(repo_path)?; let mut walker = LogWalker::new(&r, LIMIT_COUNT)?.filter(filter); loop { entries.clear(); let res_is_err = walker.read(&mut entries).is_err(); if !res_is_err { let mut current = arc_current.lock()?; current.extend(entries.iter()); } if res_is_err || entries.len() <= 1 { break; } Self::notify(sender); let sleep_duration = if arc_background.load(Ordering::Relaxed) { SLEEP_BACKGROUND } else { SLEEP_FOREGROUND }; thread::sleep(sleep_duration); } Ok(()) } fn clear(&mut self) -> Result<()> { self.current.lock()?.clear(); Ok(()) } fn notify(sender: &Sender) { sender .send(AsyncGitNotification::Log) .expect("error sending"); } }