diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index 69045a6c4..15c8b5c99 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -321,6 +321,7 @@ func (s *BlockStore) getDirtyDataEntriesForFlush(blockId string, name string) (* for _, data := range dirtyData { data.Flushing.Store(true) } + entry.FileEntry.Flushing.Store(true) return entry.FileEntry, dirtyData } diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go index 802c4b5db..fc0dedcd5 100644 --- a/pkg/blockstore/blockstore_dbops.go +++ b/pkg/blockstore/blockstore_dbops.go @@ -61,6 +61,7 @@ func dbGetFileParts(ctx context.Context, blockId string, name string, parts []in rtn := make(map[int]*DataCacheEntry) for _, d := range data { d.Dirty = &atomic.Bool{} + d.Flushing = &atomic.Bool{} if cap(d.Data) != int(partDataSize) { newData := make([]byte, len(d.Data), partDataSize) copy(newData, d.Data) diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index 43030f6b1..b782ad491 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "log" + "sync" "sync/atomic" "testing" "time" @@ -259,6 +260,23 @@ func checkFileData(t *testing.T, ctx context.Context, blockId string, name strin } } +func checkFileByteCount(t *testing.T, ctx context.Context, blockId string, name string, val byte, expected int) { + _, rdata, err := GBS.ReadFile(ctx, blockId, name) + if err != nil { + t.Errorf("error reading data for file %q: %v", name, err) + return + } + var count int + for _, b := range rdata { + if b == val { + count++ + } + } + if count != expected { + t.Errorf("byte count mismatch for file %q: expected %d, got %d", name, expected, count) + } +} + func checkFileDataAt(t *testing.T, ctx context.Context, blockId string, name string, offset int64, data string) { _, rdata, err := GBS.ReadAt(ctx, blockId, name, offset, int64(len(data))) if err != nil { @@ -536,4 +554,47 @@ func TestSimpleDBFlush(t *testing.T) { } checkFileDataAt(t, ctx, blockId, fileName, 6, "world!") checkFileSize(t, ctx, blockId, fileName, 12) + checkFileByteCount(t, ctx, blockId, fileName, 'l', 3) +} + +func TestConcurrentAppend(t *testing.T) { + initDb(t) + defer cleanupDb(t) + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + blockId := uuid.New().String() + fileName := "t1" + err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + var wg sync.WaitGroup + for i := 0; i < 16; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + const hexChars = "0123456789abcdef" + ch := hexChars[n] + for j := 0; j < 100; j++ { + err := GBS.AppendData(ctx, blockId, fileName, []byte{ch}) + if err != nil { + t.Errorf("error appending data (%d): %v", n, err) + } + if j == 50 { + err = GBS.FlushCache(ctx) + if err != nil { + t.Errorf("error flushing cache: %v", err) + } + } + } + }(i) + } + wg.Wait() + checkFileSize(t, ctx, blockId, fileName, 1600) + checkFileByteCount(t, ctx, blockId, fileName, 'a', 100) + checkFileByteCount(t, ctx, blockId, fileName, 'e', 100) + GBS.FlushCache(ctx) + checkFileSize(t, ctx, blockId, fileName, 1600) + checkFileByteCount(t, ctx, blockId, fileName, 'a', 100) + checkFileByteCount(t, ctx, blockId, fileName, 'e', 100) }