mirror of
https://github.com/fleetdm/fleet
synced 2026-05-23 17:08:53 +00:00
Add initial Badger database implementation
Code for opening and managing the Badger database.
This commit is contained in:
parent
9d4af7f326
commit
0af9b67d89
2 changed files with 157 additions and 0 deletions
77
pkg/database/database.go
Normal file
77
pkg/database/database.go
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/dgraph-io/badger/v2"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
compactionInterval = 5 * time.Minute
|
||||
// This is the discard ratio recommended in Badger docs
|
||||
// (https://pkg.go.dev/github.com/dgraph-io/badger#DB.RunValueLogGC)
|
||||
compactionDiscardRatio = 0.5
|
||||
)
|
||||
|
||||
// BadgerDB is a wrapper around the standard badger.DB that provides a
|
||||
// background compaction routine.
|
||||
type BadgerDB struct {
|
||||
*badger.DB
|
||||
closeChan chan struct{}
|
||||
}
|
||||
|
||||
// Open opens (initializing if necessary) a new Badger database at the specified
|
||||
// path. Users must close the DB with Close().
|
||||
func Open(path string) (*BadgerDB, error) {
|
||||
// DefaultOptions sets synchronous writes to true (maximum data integrity).
|
||||
// TODO implement logging?
|
||||
db, err := badger.Open(badger.DefaultOptions(path).WithLogger(nil))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "open badger %s", path)
|
||||
}
|
||||
|
||||
b := &BadgerDB{DB: db}
|
||||
b.startBackgroundCompaction()
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// startBackgroundCompaction starts a background loop that will call the
|
||||
// compaction method on the database. Badger does not do this automatically, so
|
||||
// we need to be sure to do so here (or elsewhere).
|
||||
func (b *BadgerDB) startBackgroundCompaction() {
|
||||
if b.closeChan != nil {
|
||||
panic("background compaction already running")
|
||||
}
|
||||
b.closeChan = make(chan struct{})
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(compactionInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-b.closeChan:
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
if err := b.DB.RunValueLogGC(compactionDiscardRatio); err != nil {
|
||||
log.Printf("Error compacting Badger: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// stopBackgroundCompaction stops the background compaction routine.
|
||||
func (b *BadgerDB) stopBackgroundCompaction() {
|
||||
b.closeChan <- struct{}{}
|
||||
b.closeChan = nil
|
||||
}
|
||||
|
||||
// Close closes the database connection and releases the associated resources.
|
||||
func (b *BadgerDB) Close() error {
|
||||
b.stopBackgroundCompaction()
|
||||
return b.DB.Close()
|
||||
}
|
||||
80
pkg/database/database_test.go
Normal file
80
pkg/database/database_test.go
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/dgraph-io/badger/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDatabase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tmpDir, err := ioutil.TempDir("", "orbit-test")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// Open and write
|
||||
db, err := Open(tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.Update(func(tx *badger.Txn) error {
|
||||
require.NoError(t, tx.Set([]byte("key"), []byte("value")))
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
// Reopen and read
|
||||
db, err = Open(tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.View(func(tx *badger.Txn) error {
|
||||
item, err := tx.Get([]byte("key"))
|
||||
require.NoError(t, err)
|
||||
err = item.Value(func(val []byte) error {
|
||||
assert.Equal(t, []byte("value"), val)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.Close())
|
||||
}
|
||||
|
||||
func TestCompactionPanic(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tmpDir, err := ioutil.TempDir("", "orbit-test")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
db, err := Open(tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Try to start the compaction routine again
|
||||
assert.Panics(t, func() { db.startBackgroundCompaction() })
|
||||
}
|
||||
|
||||
func TestCompactionRestart(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tmpDir, err := ioutil.TempDir("", "orbit-test")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
db, err := Open(tmpDir)
|
||||
require.NoError(t, err)
|
||||
go func() {
|
||||
require.NoError(t, db.Close())
|
||||
}()
|
||||
|
||||
db.stopBackgroundCompaction()
|
||||
|
||||
assert.NotPanics(t, func() { db.startBackgroundCompaction() })
|
||||
}
|
||||
Loading…
Reference in a new issue