From 4b58a871a717c53a6075be02aefa3c24cbab98aa Mon Sep 17 00:00:00 2001 From: sawka Date: Sun, 19 May 2024 12:42:05 -0700 Subject: [PATCH] prevent concurrent flushing --- pkg/blockstore/blockstore.go | 25 +++++++++++++++++++++++++ pkg/blockstore/blockstore_cache.go | 1 + pkg/blockstore/blockstore_test.go | 6 ++---- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 3f23ae9ff..6a28e3776 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -31,6 +31,7 @@ var GBS *BlockStore = &BlockStore{ Lock: &sync.Mutex{}, Cache: make(map[cacheKey]*CacheEntry), NextIntentionId: 1, + IsFlushing: false, } type FileOptsType struct { @@ -574,7 +575,31 @@ func (s *BlockStore) deleteCacheEntry(blockId string, name string) { delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) } +func (s *BlockStore) setIsFlushing(flushing bool) { + s.Lock.Lock() + defer s.Lock.Unlock() + s.IsFlushing = flushing +} + +// returns old value of IsFlushing +func (s *BlockStore) setUnlessFlushing() bool { + s.Lock.Lock() + defer s.Lock.Unlock() + if s.IsFlushing { + return true + } + s.IsFlushing = true + return false + +} + func (s *BlockStore) FlushCache(ctx context.Context) error { + wasFlushing := s.setUnlessFlushing() + if wasFlushing { + return fmt.Errorf("flush already in progress") + } + defer s.setIsFlushing(false) + // get a copy of dirty keys so we can iterate without the lock dirtyCacheKeys := s.getDirtyCacheKeys() for _, key := range dirtyCacheKeys { diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index 15c8b5c99..09505f97b 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -176,6 +176,7 @@ type BlockStore struct { Lock *sync.Mutex Cache map[cacheKey]*CacheEntry NextIntentionId int + IsFlushing bool } func makeCacheEntry(blockId string, name string) *CacheEntry { diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index b782ad491..7658b9462 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -581,10 +581,8 @@ func TestConcurrentAppend(t *testing.T) { t.Errorf("error appending data (%d): %v", n, err) } if j == 50 { - err = GBS.FlushCache(ctx) - if err != nil { - t.Errorf("error flushing cache: %v", err) - } + // ignore error here (concurrent flushing) + GBS.FlushCache(ctx) } } }(i)