Refactor async tags (follow up PR) (#1245)

* AsyncTags
* Verify hash of last hash
* Make default Tags notification 'FinishUnchanged'
* fixed bug preventing tags from displaying

Co-authored-by: Martijn van Eijk <mfveijk@gmail.com>
Co-authored-by: extrawurst <mail@rusticorn.com>
This commit is contained in:
Chigozie Joshua 2022-07-04 09:45:26 +01:00 committed by GitHub
parent 1985fd2dbc
commit edf798b91b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,4 +1,5 @@
use crate::{
asyncjob::{AsyncJob, AsyncSingleJob, RunParams},
error::Result,
hash,
sync::{self, RepoPath},
@ -6,26 +7,23 @@ use crate::{
};
use crossbeam_channel::Sender;
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use sync::Tags;
///
#[derive(Default, Clone)]
struct TagsResult {
pub struct TagsResult {
hash: u64,
tags: Tags,
}
///
pub struct AsyncTags {
last: Arc<Mutex<Option<(Instant, TagsResult)>>>,
last: Option<(Instant, TagsResult)>,
sender: Sender<AsyncGitNotification>,
pending: Arc<AtomicUsize>,
job: AsyncSingleJob<AsyncTagsJob>,
repo: RepoPath,
}
@ -37,30 +35,27 @@ impl AsyncTags {
) -> Self {
Self {
repo,
last: Arc::new(Mutex::new(None)),
last: None,
sender: sender.clone(),
pending: Arc::new(AtomicUsize::new(0)),
job: AsyncSingleJob::new(sender.clone()),
}
}
/// last fetched result
pub fn last(&mut self) -> Result<Option<Tags>> {
let last = self.last.lock()?;
Ok(last.clone().map(|last| last.1.tags))
pub fn last(&self) -> Result<Option<Tags>> {
Ok(self.last.as_ref().map(|result| result.1.tags.clone()))
}
///
pub fn is_pending(&self) -> bool {
self.pending.load(Ordering::Relaxed) > 0
self.job.is_pending()
}
fn is_outdated(&self, dur: Duration) -> Result<bool> {
let last = self.last.lock()?;
Ok(last
///
fn is_outdated(&self, dur: Duration) -> bool {
self.last
.as_ref()
.map_or(true, |(last_time, _)| last_time.elapsed() > dur))
.map_or(true, |(last_time, _)| last_time.elapsed() > dur)
}
///
@ -71,72 +66,106 @@ impl AsyncTags {
) -> Result<()> {
log::trace!("request");
if !force && self.is_pending() {
if !force && self.job.is_pending() {
return Ok(());
}
let outdated = self.is_outdated(dur)?;
let outdated = self.is_outdated(dur);
if !force && !outdated {
return Ok(());
}
let arc_last = Arc::clone(&self.last);
let sender = self.sender.clone();
let arc_pending = Arc::clone(&self.pending);
self.pending.fetch_add(1, Ordering::Relaxed);
let repo = self.repo.clone();
rayon_core::spawn(move || {
let notify = Self::getter(&repo, &arc_last, outdated)
.expect("error getting tags");
if outdated {
self.job.spawn(AsyncTagsJob::new(
self.last
.as_ref()
.map_or(0, |(_, result)| result.hash),
repo,
));
arc_pending.fetch_sub(1, Ordering::Relaxed);
sender
.send(if notify {
AsyncGitNotification::Tags
} else {
AsyncGitNotification::FinishUnchanged
})
.expect("error sending notify");
});
if let Some(job) = self.job.take_last() {
if let Some(Ok(result)) = job.result() {
self.last = Some(result);
}
}
} else {
self.sender
.send(AsyncGitNotification::FinishUnchanged)?;
}
Ok(())
}
}
fn getter(
repo: &RepoPath,
arc_last: &Arc<Mutex<Option<(Instant, TagsResult)>>>,
outdated: bool,
) -> Result<bool> {
let tags = sync::get_tags(repo)?;
enum JobState {
Request(u64, RepoPath),
Response(Result<(Instant, TagsResult)>),
}
let hash = hash(&tags);
///
#[derive(Clone, Default)]
pub struct AsyncTagsJob {
state: Arc<Mutex<Option<JobState>>>,
}
if !outdated
&& Self::last_hash(arc_last)
.map(|last| last == hash)
.unwrap_or_default()
{
return Ok(false);
///
impl AsyncTagsJob {
///
pub fn new(last_hash: u64, repo: RepoPath) -> Self {
Self {
state: Arc::new(Mutex::new(Some(JobState::Request(
last_hash, repo,
)))),
}
{
let mut last = arc_last.lock()?;
let now = Instant::now();
*last = Some((now, TagsResult { hash, tags }));
}
Ok(true)
}
fn last_hash(
last: &Arc<Mutex<Option<(Instant, TagsResult)>>>,
) -> Option<u64> {
last.lock()
.ok()
.and_then(|last| last.as_ref().map(|(_, last)| last.hash))
///
pub fn result(&self) -> Option<Result<(Instant, TagsResult)>> {
if let Ok(mut state) = self.state.lock() {
if let Some(state) = state.take() {
return match state {
JobState::Request(_, _) => None,
JobState::Response(result) => Some(result),
};
}
}
None
}
}
impl AsyncJob for AsyncTagsJob {
type Notification = AsyncGitNotification;
type Progress = ();
fn run(
&mut self,
_params: RunParams<Self::Notification, Self::Progress>,
) -> Result<Self::Notification> {
let mut notification = AsyncGitNotification::FinishUnchanged;
if let Ok(mut state) = self.state.lock() {
*state = state.take().map(|state| match state {
JobState::Request(last_hash, repo) => {
let tags = sync::get_tags(&repo);
JobState::Response(tags.map(|tags| {
let hash = hash(&tags);
if last_hash != hash {
notification = AsyncGitNotification::Tags;
}
(Instant::now(), TagsResult { hash, tags })
}))
}
JobState::Response(result) => {
JobState::Response(result)
}
});
}
Ok(notification)
}
}