diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 4a71f20fc..19dc00f8d 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -23,8 +23,9 @@ var partDataSize int64 = DefaultPartDataSize // overridden in tests var stopFlush = &atomic.Bool{} var GBS *BlockStore = &BlockStore{ - Lock: &sync.Mutex{}, - Cache: make(map[cacheKey]*CacheEntry), + Lock: &sync.Mutex{}, + Cache: make(map[cacheKey]*CacheEntry), + NextIntentionId: 1, } type FileOptsType struct { @@ -36,15 +37,19 @@ type FileOptsType struct { type FileMeta = map[string]any type BlockFile struct { + // these fields are static (not updated) BlockId string `json:"blockid"` Name string `json:"name"` - Size int64 `json:"size"` - CreatedTs int64 `json:"createdts"` - ModTs int64 `json:"modts"` Opts FileOptsType `json:"opts"` - Meta FileMeta `json:"meta"` + CreatedTs int64 `json:"createdts"` + + // these fields are mutable + Size int64 `json:"size"` + ModTs int64 `json:"modts"` + Meta FileMeta `json:"meta"` // only top-level keys can be updated (lower levels are immutable) } +// this works because lower levels are immutable func copyMeta(meta FileMeta) FileMeta { newMeta := make(FileMeta) for k, v := range meta { @@ -193,40 +198,28 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil } func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error { - file, ok := s.getFileFromCache(blockId, name) - if !ok { - dbFile, err := dbGetBlockFile(ctx, blockId, name) - if err != nil { - return fmt.Errorf("error getting file: %v", err) - } - file = dbFile + s.pinCacheEntry(blockId, name) + defer s.unpinCacheEntry(blockId, name) + _, err := s.loadFileInfo(ctx, blockId, name) + if err != nil { + return fmt.Errorf("error loading file info: %v", err) } - if file == nil { - return fmt.Errorf("file not found") - } - var rtnErr error - s.withLock(blockId, name, true, func(entry *CacheEntry) { - if entry.Deleted { - rtnErr = fmt.Errorf("file is deleted") - return - } - newFileEntry := entry.copyOrCreateFileEntry(file) - if merge { - for k, v := range meta { - if v == nil { - delete(newFileEntry.File.Meta, k) - continue + return s.withLockExists(blockId, name, func(entry *CacheEntry) error { + entry.modifyFileData(func(file *BlockFile) { + if merge { + for k, v := range meta { + if v == nil { + delete(file.Meta, k) + continue + } + file.Meta[k] = v } - newFileEntry.File.Meta[k] = v + } else { + file.Meta = meta } - } else { - newFileEntry.File.Meta = meta - } - entry.FileEntry = newFileEntry - entry.FileEntry.File.ModTs = time.Now().UnixMilli() - entry.Version++ + }) + return nil }) - return rtnErr } func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name string) (*BlockFile, error) { @@ -246,6 +239,7 @@ func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name stri } var rtnErr error rtnFile := dbFile + // cannot use withLockExists because we're setting entry.FileEntry! s.withLock(blockId, name, true, func(entry *CacheEntry) { if entry.Deleted { rtnFile = nil @@ -257,7 +251,11 @@ func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name stri rtnFile = entry.FileEntry.File.DeepCopy() return } - entry.FileEntry = entry.copyOrCreateFileEntry(dbFile) + entry.FileEntry = &FileCacheEntry{ + Dirty: &atomic.Bool{}, + Flushing: &atomic.Bool{}, + File: *dbFile.DeepCopy(), // make a copy since File must be immutable + } // returns dbFile, nil }) return rtnFile, rtnErr @@ -327,19 +325,20 @@ func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name str }) } -func (s *BlockStore) writeAt_nolock(entry *CacheEntry, offset int64, data []byte) { +func (entry *CacheEntry) writeAtToCache(offset int64, data []byte, replace bool) { endWrite := offset + int64(len(data)) - entry.writeAt(offset, data) - if endWrite > entry.FileEntry.File.Size { - entry.FileEntry.File.Size = endWrite - } - entry.FileEntry.File.ModTs = time.Now().UnixMilli() - entry.Version++ + entry.writeAt(offset, data, replace) + entry.modifyFileData(func(file *BlockFile) { + if endWrite > file.Size || replace { + file.Size = endWrite + } + file.ModTs = time.Now().UnixMilli() + }) } func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) error { return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - s.writeAt_nolock(entry, entry.FileEntry.File.Size, data) + entry.writeAtToCache(entry.FileEntry.File.Size, data, false) return nil }) } @@ -347,6 +346,8 @@ func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error { s.pinCacheEntry(blockId, name) defer s.unpinCacheEntry(blockId, name) + intentionId := s.setWriteIntention(blockId, name, WriteIntention{Append: true}) + defer s.clearWriteIntention(blockId, name, intentionId) _, err := s.loadFileInfo(ctx, blockId, name) if err != nil { return fmt.Errorf("error loading file info: %v", err) @@ -366,6 +367,16 @@ func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) { return dbGetAllBlockIds(ctx) } +func incompletePartsFromMap(partMap map[int]int) []int { + var incompleteParts []int + for partIdx, size := range partMap { + if size != int(partDataSize) { + incompleteParts = append(incompleteParts, partIdx) + } + } + return incompleteParts +} + // returns a map of partIdx to amount of data to write to that part func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int { partMap := make(map[int]int) @@ -388,6 +399,21 @@ func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int return partMap } +func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, data []byte) error { + s.pinCacheEntry(blockId, name) + defer s.unpinCacheEntry(blockId, name) + intentionId := s.setWriteIntention(blockId, name, WriteIntention{Replace: true}) + defer s.clearWriteIntention(blockId, name, intentionId) + _, err := s.loadFileInfo(ctx, blockId, name) + if err != nil { + return fmt.Errorf("error loading file info: %v", err) + } + return s.withLockExists(blockId, name, func(entry *CacheEntry) error { + entry.writeAtToCache(0, data, true) + return nil + }) +} + func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error { s.pinCacheEntry(blockId, name) defer s.unpinCacheEntry(blockId, name) @@ -395,16 +421,34 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o if err != nil { return fmt.Errorf("error loading file info: %v", err) } - startWriteIdx := offset - endWriteIdx := offset + int64(len(data)) - startPartIdx := file.partIdxAtOffset(startWriteIdx) - endPartIdx := file.partIdxAtOffset(endWriteIdx) - err = s.loadDataParts(ctx, blockId, name, []int{startPartIdx, endPartIdx}) + if offset < 0 { + return fmt.Errorf("offset must be non-negative") + } + if offset > file.Size { + return fmt.Errorf("offset is past the end of the file") + } + if file.Opts.Circular { + startCirFileOffset := file.Size - file.Opts.MaxSize + if offset+int64(len(data)) < startCirFileOffset { + // write is before the start of the circular file + return nil + } + if offset < startCirFileOffset { + amtBeforeStart := startCirFileOffset - offset + offset += amtBeforeStart + data = data[amtBeforeStart:] + } + } + partMap := file.computePartMap(offset, int64(len(data))) + intentionId := s.setWriteIntention(blockId, name, WriteIntention{Parts: partMap}) + defer s.clearWriteIntention(blockId, name, intentionId) + incompleteParts := incompletePartsFromMap(partMap) + err = s.loadDataParts(ctx, blockId, name, incompleteParts) if err != nil { return fmt.Errorf("error loading data parts: %v", err) } return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - s.writeAt_nolock(entry, offset, data) + entry.writeAtToCache(offset, data, false) return nil }) } @@ -487,6 +531,25 @@ func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) return s.ReadAt(ctx, blockId, name, 0, file.Size) } +func (s *BlockStore) FlushCache(ctx context.Context) error { + var dirtyCacheKeys []cacheKey + s.Lock.Lock() + for key, entry := range s.Cache { + if entry.FileEntry != nil && entry.FileEntry.Dirty.Load() { + dirtyCacheKeys = append(dirtyCacheKeys, key) + continue + } + for _, dataEntry := range entry.DataEntries { + if dataEntry != nil && dataEntry.Dirty.Load() { + dirtyCacheKeys = append(dirtyCacheKeys, key) + break + } + } + } + s.Lock.Unlock() + return nil +} + func minInt64(a, b int64) int64 { if a < b { return a diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index 9a5665942..242025878 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "fmt" + "log" "sync" "sync/atomic" ) @@ -16,6 +17,14 @@ type cacheKey struct { Name string } +// note about "Dirty" and "Flushing" fields: +// - Dirty is set to true when the entry is modified +// - Flushing is set to true when the entry is being flushed to disk +// note these fields can *only* be set to true while holding the store lock +// but the flusher may set them to false without the lock (when the flusher no longer will read the entry fields) +// the flusher *must* unset Dirty first, then Flushing +// other code should test Flushing before Dirty +// that means you *cannot* write a field in a cache entry if Flushing.Load() is true (you must make a copy) type DataCacheEntry struct { Dirty *atomic.Bool Flushing *atomic.Bool @@ -24,13 +33,15 @@ type DataCacheEntry struct { } type FileCacheEntry struct { - Dirty *atomic.Bool - File BlockFile + Dirty *atomic.Bool + Flushing *atomic.Bool + File BlockFile } type WriteIntention struct { - Parts map[int]bool - Append bool + Parts map[int]int + Append bool + Replace bool } // invariants: @@ -43,10 +54,9 @@ type WriteIntention struct { type CacheEntry struct { BlockId string Name string - Version int PinCount int Deleted bool - WriteIntentions map[string]*WriteIntention // map from intentionid -> WriteIntention + WriteIntentions map[int]WriteIntention // map from intentionid -> WriteIntention FileEntry *FileCacheEntry DataEntries []*DataCacheEntry } @@ -54,7 +64,7 @@ type CacheEntry struct { //lint:ignore U1000 used for testing func (e *CacheEntry) dump() string { var buf bytes.Buffer - fmt.Fprintf(&buf, "CacheEntry{\nBlockId: %q, Name: %q, Version: %d, PinCount: %d, Deleted: %v, IW: %v\n", e.BlockId, e.Name, e.Version, e.PinCount, e.Deleted, e.WriteIntentions) + fmt.Fprintf(&buf, "CacheEntry{\nBlockId: %q, Name: %q, PinCount: %d, Deleted: %v, IW: %v\n", e.BlockId, e.Name, e.PinCount, e.Deleted, e.WriteIntentions) if e.FileEntry != nil { fmt.Fprintf(&buf, "FileEntry: %v\n", e.FileEntry.File) } @@ -140,7 +150,10 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataC return toWrite, dce } -func (entry *CacheEntry) writeAt(offset int64, data []byte) { +func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) { + if replace { + entry.DataEntries = nil + } for len(data) > 0 { partIdx := int(offset / partDataSize) if entry.FileEntry.File.Opts.Circular { @@ -157,8 +170,9 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte) { } type BlockStore struct { - Lock *sync.Mutex - Cache map[cacheKey]*CacheEntry + Lock *sync.Mutex + Cache map[cacheKey]*CacheEntry + NextIntentionId int } func makeCacheEntry(blockId string, name string) *CacheEntry { @@ -166,7 +180,7 @@ func makeCacheEntry(blockId string, name string) *CacheEntry { BlockId: blockId, Name: name, PinCount: 0, - WriteIntentions: make(map[string]*WriteIntention), + WriteIntentions: make(map[int]WriteIntention), FileEntry: nil, DataEntries: nil, } @@ -206,12 +220,36 @@ func (s *BlockStore) pinCacheEntry(blockId string, name string) { entry.PinCount++ } +func (s *BlockStore) setWriteIntention(blockId string, name string, intention WriteIntention) int { + s.Lock.Lock() + defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + return 0 + } + intentionId := s.NextIntentionId + s.NextIntentionId++ + entry.WriteIntentions[intentionId] = intention + return intentionId +} + +func (s *BlockStore) clearWriteIntention(blockId string, name string, intentionId int) { + s.Lock.Lock() + defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + log.Printf("warning: cannot find write intention to clear %q %q", blockId, name) + return + } + delete(entry.WriteIntentions, intentionId) +} + func (s *BlockStore) unpinCacheEntry(blockId string, name string) { s.Lock.Lock() defer s.Lock.Unlock() entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] if entry == nil { - // this is not good + log.Printf("warning: unpinning non-existent cache entry %q %q", blockId, name) return } entry.PinCount-- @@ -236,6 +274,7 @@ func (s *BlockStore) tryDeleteCacheEntry(blockId string, name string) bool { } // getFileFromCache returns the file from the cache if it exists +// makes a copy, so it can be used by the caller // return (file, cached) func (s *BlockStore) getFileFromCache(blockId string, name string) (*BlockFile, bool) { s.Lock.Lock() @@ -253,17 +292,20 @@ func (s *BlockStore) getFileFromCache(blockId string, name string) (*BlockFile, return entry.FileEntry.File.DeepCopy(), true } -func (e *CacheEntry) copyOrCreateFileEntry(dbFile *BlockFile) *FileCacheEntry { - if e.FileEntry == nil { - return &FileCacheEntry{ - Dirty: &atomic.Bool{}, - File: *dbFile, +func (e *CacheEntry) modifyFileData(fn func(*BlockFile)) { + var fileEntry = e.FileEntry + if e.FileEntry.Flushing.Load() { + // must make a copy + fileEntry = &FileCacheEntry{ + Dirty: &atomic.Bool{}, + Flushing: &atomic.Bool{}, + File: *e.FileEntry.File.DeepCopy(), } + e.FileEntry = fileEntry } - return &FileCacheEntry{ - Dirty: &atomic.Bool{}, - File: *e.FileEntry.File.DeepCopy(), - } + // always set to dirty (we're modifying it) + fileEntry.Dirty.Store(true) + fn(&fileEntry.File) } // also sets Flushing to true diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index a25e18194..5e94560cd 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -6,6 +6,7 @@ package blockstore import ( "bytes" "context" + "fmt" "log" "testing" "time" @@ -77,10 +78,27 @@ func TestCreate(t *testing.T) { if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 { t.Fatalf("opts not empty") } + blockIds, err := GBS.GetAllBlockIds(ctx) + if err != nil { + t.Fatalf("error getting block ids: %v", err) + } + if len(blockIds) != 1 { + t.Fatalf("block id count mismatch") + } + if blockIds[0] != blockId { + t.Fatalf("block id mismatch") + } err = GBS.DeleteFile(ctx, blockId, "testfile") if err != nil { t.Fatalf("error deleting file: %v", err) } + blockIds, err = GBS.GetAllBlockIds(ctx) + if err != nil { + t.Fatalf("error getting block ids: %v", err) + } + if len(blockIds) != 0 { + t.Fatalf("block id count mismatch") + } } func containsFile(arr []*BlockFile, name string) bool { @@ -272,6 +290,130 @@ func TestAppend(t *testing.T) { checkFileData(t, ctx, blockId, fileName, "hello world") } +func TestWriteFile(t *testing.T) { + initDb(t) + defer cleanupDb(t) + + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + blockId := uuid.New().String() + fileName := "t3" + err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello world!")) + if err != nil { + t.Fatalf("error writing data: %v", err) + } + checkFileData(t, ctx, blockId, fileName, "hello world!") + err = GBS.WriteFile(ctx, blockId, fileName, []byte("goodbye world!")) + if err != nil { + t.Fatalf("error writing data: %v", err) + } + checkFileData(t, ctx, blockId, fileName, "goodbye world!") + err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello")) + if err != nil { + t.Fatalf("error writing data: %v", err) + } + checkFileData(t, ctx, blockId, fileName, "hello") + + // circular file + err = GBS.MakeFile(ctx, blockId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + err = GBS.WriteFile(ctx, blockId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 apple")) + if err != nil { + t.Fatalf("error writing data: %v", err) + } + checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple") + err = GBS.AppendData(ctx, blockId, "c1", []byte(" banana")) + if err != nil { + t.Fatalf("error appending data: %v", err) + } + checkFileData(t, ctx, blockId, "c1", "3456789 123456789 123456789 123456789 apple banana") +} + +func TestCircularWrites(t *testing.T) { + initDb(t) + defer cleanupDb(t) + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + blockId := uuid.New().String() + err := GBS.MakeFile(ctx, blockId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + err = GBS.WriteFile(ctx, blockId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 ")) + if err != nil { + t.Fatalf("error writing data: %v", err) + } + checkFileData(t, ctx, blockId, "c1", "123456789 123456789 123456789 123456789 123456789 ") + + err = GBS.AppendData(ctx, blockId, "c1", []byte("apple")) + if err != nil { + t.Fatalf("error appending data: %v", err) + } + checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple") + err = GBS.WriteAt(ctx, blockId, "c1", 0, []byte("foo")) + if err != nil { + t.Fatalf("error writing data: %v", err) + } + // content should be unchanged because write is before the beginning of circular offset + checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple") + err = GBS.WriteAt(ctx, blockId, "c1", 5, []byte("a")) + if err != nil { + t.Fatalf("error writing data: %v", err) + } + checkFileSize(t, ctx, blockId, "c1", 55) + checkFileData(t, ctx, blockId, "c1", "a789 123456789 123456789 123456789 123456789 apple") + err = GBS.AppendData(ctx, blockId, "c1", []byte(" banana")) + if err != nil { + t.Fatalf("error appending data: %v", err) + } + checkFileSize(t, ctx, blockId, "c1", 62) + checkFileData(t, ctx, blockId, "c1", "3456789 123456789 123456789 123456789 apple banana") + err = GBS.WriteAt(ctx, blockId, "c1", 20, []byte("foo")) + if err != nil { + t.Fatalf("error writing data: %v", err) + } + checkFileSize(t, ctx, blockId, "c1", 62) + checkFileData(t, ctx, blockId, "c1", "3456789 foo456789 123456789 123456789 apple banana") + offset, _, _ := GBS.ReadFile(ctx, blockId, "c1") + if offset != 12 { + t.Errorf("offset mismatch: expected 12, got %d", offset) + } + err = GBS.AppendData(ctx, blockId, "c1", []byte(" world")) + if err != nil { + t.Fatalf("error appending data: %v", err) + } + checkFileSize(t, ctx, blockId, "c1", 68) + offset, _, _ = GBS.ReadFile(ctx, blockId, "c1") + if offset != 18 { + t.Errorf("offset mismatch: expected 18, got %d", offset) + } + checkFileData(t, ctx, blockId, "c1", "9 foo456789 123456789 123456789 apple banana world") + err = GBS.AppendData(ctx, blockId, "c1", []byte(" 123456789 123456789 123456789 123456789 bar456789 123456789")) + if err != nil { + t.Fatalf("error appending data: %v", err) + } + checkFileSize(t, ctx, blockId, "c1", 128) + checkFileData(t, ctx, blockId, "c1", " 123456789 123456789 123456789 bar456789 123456789") + GBS.withLock(blockId, "c1", false, func(entry *CacheEntry) { + if entry == nil { + err = fmt.Errorf("entry not found") + return + } + if len(entry.DataEntries) != 1 { + err = fmt.Errorf("data entries mismatch: expected 1, got %d", len(entry.DataEntries)) + } + }) + if err != nil { + t.Fatalf("error checking data entries: %v", err) + } +} + func makeText(n int) string { var buf bytes.Buffer for i := 0; i < n; i++ {