From 0af9b67d892baa95b786d37f3d87ca28003d9052 Mon Sep 17 00:00:00 2001 From: Zach Wasserman Date: Tue, 12 Jan 2021 18:58:33 -0800 Subject: [PATCH] Add initial Badger database implementation Code for opening and managing the Badger database. --- pkg/database/database.go | 77 +++++++++++++++++++++++++++++++++ pkg/database/database_test.go | 80 +++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) create mode 100644 pkg/database/database.go create mode 100644 pkg/database/database_test.go diff --git a/pkg/database/database.go b/pkg/database/database.go new file mode 100644 index 0000000000..0f736382d7 --- /dev/null +++ b/pkg/database/database.go @@ -0,0 +1,77 @@ +package database + +import ( + "log" + "time" + + "github.com/dgraph-io/badger/v2" + "github.com/pkg/errors" +) + +const ( + compactionInterval = 5 * time.Minute + // This is the discard ratio recommended in Badger docs + // (https://pkg.go.dev/github.com/dgraph-io/badger#DB.RunValueLogGC) + compactionDiscardRatio = 0.5 +) + +// BadgerDB is a wrapper around the standard badger.DB that provides a +// background compaction routine. +type BadgerDB struct { + *badger.DB + closeChan chan struct{} +} + +// Open opens (initializing if necessary) a new Badger database at the specified +// path. Users must close the DB with Close(). +func Open(path string) (*BadgerDB, error) { + // DefaultOptions sets synchronous writes to true (maximum data integrity). + // TODO implement logging? + db, err := badger.Open(badger.DefaultOptions(path).WithLogger(nil)) + if err != nil { + return nil, errors.Wrapf(err, "open badger %s", path) + } + + b := &BadgerDB{DB: db} + b.startBackgroundCompaction() + + return b, nil +} + +// startBackgroundCompaction starts a background loop that will call the +// compaction method on the database. Badger does not do this automatically, so +// we need to be sure to do so here (or elsewhere). +func (b *BadgerDB) startBackgroundCompaction() { + if b.closeChan != nil { + panic("background compaction already running") + } + b.closeChan = make(chan struct{}) + + go func() { + ticker := time.NewTicker(compactionInterval) + defer ticker.Stop() + for { + select { + case <-b.closeChan: + return + + case <-ticker.C: + if err := b.DB.RunValueLogGC(compactionDiscardRatio); err != nil { + log.Printf("Error compacting Badger: %v", err) + } + } + } + }() +} + +// stopBackgroundCompaction stops the background compaction routine. +func (b *BadgerDB) stopBackgroundCompaction() { + b.closeChan <- struct{}{} + b.closeChan = nil +} + +// Close closes the database connection and releases the associated resources. +func (b *BadgerDB) Close() error { + b.stopBackgroundCompaction() + return b.DB.Close() +} diff --git a/pkg/database/database_test.go b/pkg/database/database_test.go new file mode 100644 index 0000000000..22414665bf --- /dev/null +++ b/pkg/database/database_test.go @@ -0,0 +1,80 @@ +package database + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/dgraph-io/badger/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDatabase(t *testing.T) { + t.Parallel() + + tmpDir, err := ioutil.TempDir("", "orbit-test") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + // Open and write + db, err := Open(tmpDir) + require.NoError(t, err) + + err = db.Update(func(tx *badger.Txn) error { + require.NoError(t, tx.Set([]byte("key"), []byte("value"))) + return nil + }) + require.NoError(t, err) + require.NoError(t, db.Close()) + + // Reopen and read + db, err = Open(tmpDir) + require.NoError(t, err) + + err = db.View(func(tx *badger.Txn) error { + item, err := tx.Get([]byte("key")) + require.NoError(t, err) + err = item.Value(func(val []byte) error { + assert.Equal(t, []byte("value"), val) + return nil + }) + require.NoError(t, err) + + return nil + }) + require.NoError(t, err) + require.NoError(t, db.Close()) +} + +func TestCompactionPanic(t *testing.T) { + t.Parallel() + + tmpDir, err := ioutil.TempDir("", "orbit-test") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + db, err := Open(tmpDir) + require.NoError(t, err) + + // Try to start the compaction routine again + assert.Panics(t, func() { db.startBackgroundCompaction() }) +} + +func TestCompactionRestart(t *testing.T) { + t.Parallel() + + tmpDir, err := ioutil.TempDir("", "orbit-test") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + db, err := Open(tmpDir) + require.NoError(t, err) + go func() { + require.NoError(t, db.Close()) + }() + + db.stopBackgroundCompaction() + + assert.NotPanics(t, func() { db.startBackgroundCompaction() }) +}