From edf798b91bc3c507641456d9eb1b608c873cb3a0 Mon Sep 17 00:00:00 2001 From: Chigozie Joshua <36326251+guzzit@users.noreply.github.com> Date: Mon, 4 Jul 2022 09:45:26 +0100 Subject: [PATCH] Refactor async tags (follow up PR) (#1245) * AsyncTags * Verify hash of last hash * Make default Tags notification 'FinishUnchanged' * fixed bug preventing tags from displaying Co-authored-by: Martijn van Eijk Co-authored-by: extrawurst --- asyncgit/src/tags.rs | 161 +++++++++++++++++++++++++------------------ 1 file changed, 95 insertions(+), 66 deletions(-) diff --git a/asyncgit/src/tags.rs b/asyncgit/src/tags.rs index 0a7ad013..d1ec08d1 100644 --- a/asyncgit/src/tags.rs +++ b/asyncgit/src/tags.rs @@ -1,4 +1,5 @@ use crate::{ + asyncjob::{AsyncJob, AsyncSingleJob, RunParams}, error::Result, hash, sync::{self, RepoPath}, @@ -6,26 +7,23 @@ use crate::{ }; use crossbeam_channel::Sender; use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, time::{Duration, Instant}, }; use sync::Tags; /// #[derive(Default, Clone)] -struct TagsResult { +pub struct TagsResult { hash: u64, tags: Tags, } /// pub struct AsyncTags { - last: Arc>>, + last: Option<(Instant, TagsResult)>, sender: Sender, - pending: Arc, + job: AsyncSingleJob, repo: RepoPath, } @@ -37,30 +35,27 @@ impl AsyncTags { ) -> Self { Self { repo, - last: Arc::new(Mutex::new(None)), + last: None, sender: sender.clone(), - pending: Arc::new(AtomicUsize::new(0)), + job: AsyncSingleJob::new(sender.clone()), } } /// last fetched result - pub fn last(&mut self) -> Result> { - let last = self.last.lock()?; - - Ok(last.clone().map(|last| last.1.tags)) + pub fn last(&self) -> Result> { + Ok(self.last.as_ref().map(|result| result.1.tags.clone())) } /// pub fn is_pending(&self) -> bool { - self.pending.load(Ordering::Relaxed) > 0 + self.job.is_pending() } - fn is_outdated(&self, dur: Duration) -> Result { - let last = self.last.lock()?; - - Ok(last + /// + fn is_outdated(&self, dur: Duration) -> bool { + self.last .as_ref() - .map_or(true, |(last_time, _)| last_time.elapsed() > dur)) + .map_or(true, |(last_time, _)| last_time.elapsed() > dur) } /// @@ -71,72 +66,106 @@ impl AsyncTags { ) -> Result<()> { log::trace!("request"); - if !force && self.is_pending() { + if !force && self.job.is_pending() { return Ok(()); } - let outdated = self.is_outdated(dur)?; + let outdated = self.is_outdated(dur); if !force && !outdated { return Ok(()); } - let arc_last = Arc::clone(&self.last); - let sender = self.sender.clone(); - let arc_pending = Arc::clone(&self.pending); - - self.pending.fetch_add(1, Ordering::Relaxed); let repo = self.repo.clone(); - rayon_core::spawn(move || { - let notify = Self::getter(&repo, &arc_last, outdated) - .expect("error getting tags"); + if outdated { + self.job.spawn(AsyncTagsJob::new( + self.last + .as_ref() + .map_or(0, |(_, result)| result.hash), + repo, + )); - arc_pending.fetch_sub(1, Ordering::Relaxed); - - sender - .send(if notify { - AsyncGitNotification::Tags - } else { - AsyncGitNotification::FinishUnchanged - }) - .expect("error sending notify"); - }); + if let Some(job) = self.job.take_last() { + if let Some(Ok(result)) = job.result() { + self.last = Some(result); + } + } + } else { + self.sender + .send(AsyncGitNotification::FinishUnchanged)?; + } Ok(()) } +} - fn getter( - repo: &RepoPath, - arc_last: &Arc>>, - outdated: bool, - ) -> Result { - let tags = sync::get_tags(repo)?; +enum JobState { + Request(u64, RepoPath), + Response(Result<(Instant, TagsResult)>), +} - let hash = hash(&tags); +/// +#[derive(Clone, Default)] +pub struct AsyncTagsJob { + state: Arc>>, +} - if !outdated - && Self::last_hash(arc_last) - .map(|last| last == hash) - .unwrap_or_default() - { - return Ok(false); +/// +impl AsyncTagsJob { + /// + pub fn new(last_hash: u64, repo: RepoPath) -> Self { + Self { + state: Arc::new(Mutex::new(Some(JobState::Request( + last_hash, repo, + )))), } - - { - let mut last = arc_last.lock()?; - let now = Instant::now(); - *last = Some((now, TagsResult { hash, tags })); - } - - Ok(true) } - fn last_hash( - last: &Arc>>, - ) -> Option { - last.lock() - .ok() - .and_then(|last| last.as_ref().map(|(_, last)| last.hash)) + /// + pub fn result(&self) -> Option> { + if let Ok(mut state) = self.state.lock() { + if let Some(state) = state.take() { + return match state { + JobState::Request(_, _) => None, + JobState::Response(result) => Some(result), + }; + } + } + + None + } +} + +impl AsyncJob for AsyncTagsJob { + type Notification = AsyncGitNotification; + type Progress = (); + + fn run( + &mut self, + _params: RunParams, + ) -> Result { + let mut notification = AsyncGitNotification::FinishUnchanged; + if let Ok(mut state) = self.state.lock() { + *state = state.take().map(|state| match state { + JobState::Request(last_hash, repo) => { + let tags = sync::get_tags(&repo); + + JobState::Response(tags.map(|tags| { + let hash = hash(&tags); + if last_hash != hash { + notification = AsyncGitNotification::Tags; + } + + (Instant::now(), TagsResult { hash, tags }) + })) + } + JobState::Response(result) => { + JobState::Response(result) + } + }); + } + + Ok(notification) } }