Add datastore methods for distributed queries (#458)

New datastore methods are introduced for creating/updating
distributed query campaigns, as well as determining the active
distributed queries for a given host.
This commit is contained in:
Zachary Wasserman 2016-11-09 15:33:16 -08:00 committed by GitHub
parent c8b9b15ef2
commit af6a5ee1b6
10 changed files with 323 additions and 19 deletions

View file

@ -142,3 +142,139 @@ func testSearchHostsLimit(t *testing.T, db kolide.Datastore) {
require.Nil(t, err)
assert.Len(t, hosts, 10)
}
func testDistributedQueriesForHost(t *testing.T, db kolide.Datastore) {
h1, err := db.NewHost(&kolide.Host{
DetailUpdateTime: time.Now(),
NodeKey: "1",
UUID: "1",
HostName: "foo.local",
PrimaryIP: "192.168.1.10",
})
require.Nil(t, err)
h2, err := db.NewHost(&kolide.Host{
DetailUpdateTime: time.Now(),
NodeKey: "2",
UUID: "2",
HostName: "bar.local",
PrimaryIP: "192.168.1.11",
})
require.Nil(t, err)
// All should have no queries
var queries map[uint]string
queries, err = db.DistributedQueriesForHost(h1)
require.Nil(t, err)
assert.Empty(t, queries)
queries, err = db.DistributedQueriesForHost(h2)
require.Nil(t, err)
assert.Empty(t, queries)
// Create a label
l1, err := db.NewLabel(&kolide.Label{
Name: "label foo",
Query: "query1",
})
require.Nil(t, err)
l1ID := fmt.Sprintf("%d", l1.ID)
// Add hosts to label
for _, h := range []*kolide.Host{h1, h2} {
err = db.RecordLabelQueryExecutions(h, map[string]bool{l1ID: true}, time.Now())
require.Nil(t, err)
}
// Create a query
q1 := &kolide.Query{
Name: "bar",
Query: "select * from bar",
}
q1, err = db.NewQuery(q1)
require.Nil(t, err)
// Create a query campaign
c1 := kolide.DistributedQueryCampaign{
QueryID: q1.ID,
Status: kolide.QueryRunning,
}
c1, err = db.NewDistributedQueryCampaign(c1)
require.Nil(t, err)
// Add a target to the campaign
target := kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetLabel,
DistributedQueryCampaignID: c1.ID,
TargetID: l1.ID,
}
target, err = db.NewDistributedQueryCampaignTarget(target)
require.Nil(t, err)
// All should have the query now
queries, err = db.DistributedQueriesForHost(h1)
require.Nil(t, err)
assert.Len(t, queries, 1)
assert.Equal(t, "select * from bar", queries[c1.ID])
queries, err = db.DistributedQueriesForHost(h2)
require.Nil(t, err)
assert.Len(t, queries, 1)
assert.Equal(t, "select * from bar", queries[c1.ID])
// Record an execution
exec := kolide.DistributedQueryExecution{
HostID: h1.ID,
DistributedQueryCampaignID: c1.ID,
Status: kolide.ExecutionSucceeded,
}
exec, err = db.NewDistributedQueryExecution(exec)
require.Nil(t, err)
// Add another query/campaign
q2 := &kolide.Query{
Name: "foo",
Query: "select * from foo",
}
q2, err = db.NewQuery(q2)
require.Nil(t, err)
c2 := kolide.DistributedQueryCampaign{
QueryID: q2.ID,
Status: kolide.QueryRunning,
}
c2, err = db.NewDistributedQueryCampaign(c2)
require.Nil(t, err)
// This one targets only h1
target = kolide.DistributedQueryCampaignTarget{
Type: kolide.TargetHost,
DistributedQueryCampaignID: c2.ID,
TargetID: h1.ID,
}
target, err = db.NewDistributedQueryCampaignTarget(target)
require.Nil(t, err)
// Check for correct queries
queries, err = db.DistributedQueriesForHost(h1)
require.Nil(t, err)
assert.Len(t, queries, 1)
assert.Equal(t, "select * from foo", queries[c2.ID])
queries, err = db.DistributedQueriesForHost(h2)
require.Nil(t, err)
assert.Len(t, queries, 1)
assert.Equal(t, "select * from bar", queries[c1.ID])
// End both of the campaigns
c1.Status = kolide.QueryComplete
require.Nil(t, db.SaveDistributedQueryCampaign(c1))
c2.Status = kolide.QueryComplete
require.Nil(t, db.SaveDistributedQueryCampaign(c2))
// Now no queries should be returned
queries, err = db.DistributedQueriesForHost(h1)
require.Nil(t, err)
assert.Empty(t, queries)
queries, err = db.DistributedQueriesForHost(h2)
require.Nil(t, err)
assert.Empty(t, queries)
}

View file

@ -37,4 +37,5 @@ var testFunctions = [...]func(*testing.T, kolide.Datastore){
testSearchLabelsLimit,
testListHostsInLabel,
testListUniqueHostsInLabels,
testDistributedQueriesForHost,
}

View file

@ -47,11 +47,15 @@ func (orm gormDB) Migrate() error {
}
}
// Have to manually add indexes. Yuck!
// Have to manually add indexes when specific ordering needed.
err := orm.DB.Model(&kolide.LabelQueryExecution{}).AddUniqueIndex("idx_lqe_label_host", "label_id", "host_id").Error
if err != nil {
return err
}
err = orm.DB.Model(&kolide.DistributedQueryExecution{}).AddUniqueIndex("idx_dqe_unique_host_dq_id", "host_id", "distributed_query_campaign_id").Error
if err != nil {
return err
}
indexes := []interface{}{}
err = orm.DB.Raw("SELECT * FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA = 'kolide' AND INDEX_NAME = 'hosts_search';").Scan(&indexes).Error

View file

@ -156,3 +156,40 @@ AGAINST(? IN BOOLEAN MODE)
}
return results, nil
}
func (orm gormDB) DistributedQueriesForHost(host *kolide.Host) (map[uint]string, error) {
sql := `
SELECT DISTINCT dqc.id, q.query
FROM distributed_query_campaigns dqc
JOIN distributed_query_campaign_targets dqct
ON (dqc.id = dqct.distributed_query_campaign_id)
LEFT JOIN label_query_executions lqe
ON (dqct.type = ? AND dqct.target_id = lqe.label_id AND lqe.matches)
LEFT JOIN hosts h
ON ((dqct.type = ? AND lqe.host_id = h.id) OR (dqct.type = ? AND dqct.target_id = h.id))
LEFT JOIN distributed_query_executions dqe
ON (h.id = dqe.host_id AND dqc.id = dqe.distributed_query_id)
JOIN queries q
ON (dqc.query_id = q.id)
WHERE dqe.status IS NULL AND dqc.status = ? AND h.id = ?;
`
rows, err := orm.DB.Raw(sql, kolide.TargetLabel, kolide.TargetLabel, kolide.TargetHost, kolide.QueryRunning, host.ID).Rows()
if err != nil && err != gorm.ErrRecordNotFound {
return nil, errors.DatabaseError(err)
}
defer rows.Close()
results := map[uint]string{}
for rows.Next() {
var id uint
var query string
err = rows.Scan(&id, &query)
if err != nil {
return nil, errors.DatabaseError(err)
}
results[id] = query
}
return results, nil
}

View file

@ -55,3 +55,22 @@ func (orm gormDB) ListQueries(opt kolide.ListOptions) ([]*kolide.Query, error) {
err := orm.applyListOptions(opt).Find(&queries).Error
return queries, err
}
func (orm gormDB) NewDistributedQueryExecution(exec kolide.DistributedQueryExecution) (kolide.DistributedQueryExecution, error) {
err := orm.DB.Create(&exec).Error
return exec, err
}
func (orm gormDB) NewDistributedQueryCampaign(camp kolide.DistributedQueryCampaign) (kolide.DistributedQueryCampaign, error) {
err := orm.DB.Create(&camp).Error
return camp, err
}
func (orm gormDB) SaveDistributedQueryCampaign(camp kolide.DistributedQueryCampaign) error {
return orm.DB.Save(&camp).Error
}
func (orm gormDB) NewDistributedQueryCampaignTarget(target kolide.DistributedQueryCampaignTarget) (kolide.DistributedQueryCampaignTarget, error) {
err := orm.DB.Create(&target).Error
return target, err
}

View file

@ -14,17 +14,20 @@ type inmem struct {
mtx sync.RWMutex
nextIDs map[interface{}]uint
users map[uint]*kolide.User
sessions map[uint]*kolide.Session
passwordResets map[uint]*kolide.PasswordResetRequest
invites map[uint]*kolide.Invite
labels map[uint]*kolide.Label
labelQueryExecutions map[uint]*kolide.LabelQueryExecution
queries map[uint]*kolide.Query
packs map[uint]*kolide.Pack
hosts map[uint]*kolide.Host
packQueries map[uint]*kolide.PackQuery
packTargets map[uint]*kolide.PackTarget
users map[uint]*kolide.User
sessions map[uint]*kolide.Session
passwordResets map[uint]*kolide.PasswordResetRequest
invites map[uint]*kolide.Invite
labels map[uint]*kolide.Label
labelQueryExecutions map[uint]*kolide.LabelQueryExecution
queries map[uint]*kolide.Query
packs map[uint]*kolide.Pack
hosts map[uint]*kolide.Host
packQueries map[uint]*kolide.PackQuery
packTargets map[uint]*kolide.PackTarget
distributedQueryExecutions map[uint]kolide.DistributedQueryExecution
distributedQueryCampaigns map[uint]kolide.DistributedQueryCampaign
distributedQueryCampaignTargets map[uint]kolide.DistributedQueryCampaignTarget
orginfo *kolide.AppConfig
}
@ -48,6 +51,9 @@ func (orm *inmem) Migrate() error {
orm.hosts = make(map[uint]*kolide.Host)
orm.packQueries = make(map[uint]*kolide.PackQuery)
orm.packTargets = make(map[uint]*kolide.PackTarget)
orm.distributedQueryExecutions = make(map[uint]kolide.DistributedQueryExecution)
orm.distributedQueryCampaigns = make(map[uint]kolide.DistributedQueryCampaign)
orm.distributedQueryCampaignTargets = make(map[uint]kolide.DistributedQueryCampaignTarget)
return nil
}

View file

@ -203,3 +203,40 @@ func (orm *inmem) SearchHosts(query string, omit []uint) ([]kolide.Host, error)
return results, nil
}
func (orm *inmem) DistributedQueriesForHost(host *kolide.Host) (map[uint]string, error) {
// lookup of executions for this host
hostExecutions := map[uint]kolide.DistributedQueryExecutionStatus{}
for _, e := range orm.distributedQueryExecutions {
if e.HostID == host.ID {
hostExecutions[e.DistributedQueryCampaignID] = e.Status
}
}
// lookup of labels for this host (only including matching labels)
hostLabels := map[uint]bool{}
labels, err := orm.ListLabelsForHost(host.ID)
if err != nil {
return nil, err
}
for _, l := range labels {
hostLabels[l.ID] = true
}
queries := map[uint]string{} // map campaign ID -> query string
for _, campaign := range orm.distributedQueryCampaigns {
if campaign.Status != kolide.QueryRunning {
continue
}
for _, target := range orm.distributedQueryCampaignTargets {
if campaign.ID == target.DistributedQueryCampaignID &&
((target.Type == kolide.TargetHost && target.TargetID == host.ID) ||
(target.Type == kolide.TargetLabel && hostLabels[target.TargetID])) &&
(hostExecutions[campaign.ID] == kolide.ExecutionWaiting) {
queries[campaign.ID] = orm.queries[campaign.QueryID].Query
}
}
}
return queries, nil
}

View file

@ -101,3 +101,51 @@ func (orm *inmem) ListQueries(opt kolide.ListOptions) ([]*kolide.Query, error) {
return queries, nil
}
func (orm *inmem) NewDistributedQueryExecution(exec kolide.DistributedQueryExecution) (kolide.DistributedQueryExecution, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
for _, e := range orm.distributedQueryExecutions {
if exec.HostID == e.ID && exec.DistributedQueryCampaignID == e.DistributedQueryCampaignID {
return exec, ErrExists
}
}
exec.ID = orm.nextID(exec)
orm.distributedQueryExecutions[exec.ID] = exec
return exec, nil
}
func (orm *inmem) NewDistributedQueryCampaign(camp kolide.DistributedQueryCampaign) (kolide.DistributedQueryCampaign, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
camp.ID = orm.nextID(camp)
orm.distributedQueryCampaigns[camp.ID] = camp
return camp, nil
}
func (orm *inmem) SaveDistributedQueryCampaign(camp kolide.DistributedQueryCampaign) error {
orm.mtx.Lock()
defer orm.mtx.Unlock()
if _, ok := orm.distributedQueryCampaigns[camp.ID]; !ok {
return ErrNotFound
}
orm.distributedQueryCampaigns[camp.ID] = camp
return nil
}
func (orm *inmem) NewDistributedQueryCampaignTarget(target kolide.DistributedQueryCampaignTarget) (kolide.DistributedQueryCampaignTarget, error) {
orm.mtx.Lock()
defer orm.mtx.Unlock()
target.ID = orm.nextID(target)
orm.distributedQueryCampaignTargets[target.ID] = target
return target, nil
}

View file

@ -16,6 +16,10 @@ type HostStore interface {
AuthenticateHost(nodeKey string) (*Host, error)
MarkHostSeen(host *Host, t time.Time) error
SearchHosts(query string, omit []uint) ([]Host, error)
// DistributedQueriesForHost retrieves the distributed queries that the
// given host should run. The result map is a mapping from campaign ID
// to query text.
DistributedQueriesForHost(host *Host) (map[uint]string, error)
}
type HostService interface {

View file

@ -14,6 +14,18 @@ type QueryStore interface {
DeleteQuery(query *Query) error
Query(id uint) (*Query, error)
ListQueries(opt ListOptions) ([]*Query, error)
// NewDistributedQueryCampaign creates a new distributed query campaign
NewDistributedQueryCampaign(camp DistributedQueryCampaign) (DistributedQueryCampaign, error)
// SaveDistributedQueryCampaign updates an existing distributed query
// campaign
SaveDistributedQueryCampaign(camp DistributedQueryCampaign) error
// NewDistributedQueryCampaignTarget adds a new target to an existing
// distributed query campaign
NewDistributedQueryCampaignTarget(target DistributedQueryCampaignTarget) (DistributedQueryCampaignTarget, error)
// NewDistributedQueryCampaignExecution records a new execution for a
// distributed query campaign
NewDistributedQueryExecution(exec DistributedQueryExecution) (DistributedQueryExecution, error)
}
type QueryService interface {
@ -75,7 +87,7 @@ type DistributedQueryCampaign struct {
type DistributedQueryCampaignTarget struct {
ID uint `gorm:"primary_key"`
Type TargetType
DistributedQueryCampaignID uint
DistributedQueryCampaignID uint `gorm:"index:idx_dqct_dqc_id"`
TargetID uint
}
@ -95,12 +107,12 @@ type DistributedQueryResult struct {
}
type DistributedQueryExecution struct {
ID uint `gorm:"primary_key"`
HostID uint
DistributedQueryID uint
Status DistributedQueryExecutionStatus
Error string `gorm:"size:1024"`
ExecutionDuration time.Duration
ID uint `gorm:"primary_key"`
HostID uint // unique index added in migrate
DistributedQueryCampaignID uint // unique index added in migrate
Status DistributedQueryExecutionStatus
Error string `gorm:"size:1024"`
ExecutionDuration time.Duration
}
type Option struct {