diff --git a/server/datastore/datastore_hosts_test.go b/server/datastore/datastore_hosts_test.go index 9a3fa4c4e4..5ca99280d1 100644 --- a/server/datastore/datastore_hosts_test.go +++ b/server/datastore/datastore_hosts_test.go @@ -142,3 +142,139 @@ func testSearchHostsLimit(t *testing.T, db kolide.Datastore) { require.Nil(t, err) assert.Len(t, hosts, 10) } + +func testDistributedQueriesForHost(t *testing.T, db kolide.Datastore) { + h1, err := db.NewHost(&kolide.Host{ + DetailUpdateTime: time.Now(), + NodeKey: "1", + UUID: "1", + HostName: "foo.local", + PrimaryIP: "192.168.1.10", + }) + require.Nil(t, err) + + h2, err := db.NewHost(&kolide.Host{ + DetailUpdateTime: time.Now(), + NodeKey: "2", + UUID: "2", + HostName: "bar.local", + PrimaryIP: "192.168.1.11", + }) + require.Nil(t, err) + + // All should have no queries + var queries map[uint]string + queries, err = db.DistributedQueriesForHost(h1) + require.Nil(t, err) + assert.Empty(t, queries) + queries, err = db.DistributedQueriesForHost(h2) + require.Nil(t, err) + assert.Empty(t, queries) + + // Create a label + l1, err := db.NewLabel(&kolide.Label{ + Name: "label foo", + Query: "query1", + }) + require.Nil(t, err) + l1ID := fmt.Sprintf("%d", l1.ID) + + // Add hosts to label + for _, h := range []*kolide.Host{h1, h2} { + err = db.RecordLabelQueryExecutions(h, map[string]bool{l1ID: true}, time.Now()) + require.Nil(t, err) + } + + // Create a query + q1 := &kolide.Query{ + Name: "bar", + Query: "select * from bar", + } + q1, err = db.NewQuery(q1) + require.Nil(t, err) + + // Create a query campaign + c1 := kolide.DistributedQueryCampaign{ + QueryID: q1.ID, + Status: kolide.QueryRunning, + } + c1, err = db.NewDistributedQueryCampaign(c1) + require.Nil(t, err) + + // Add a target to the campaign + target := kolide.DistributedQueryCampaignTarget{ + Type: kolide.TargetLabel, + DistributedQueryCampaignID: c1.ID, + TargetID: l1.ID, + } + target, err = db.NewDistributedQueryCampaignTarget(target) + require.Nil(t, err) + + // All should have the query now + queries, err = db.DistributedQueriesForHost(h1) + require.Nil(t, err) + assert.Len(t, queries, 1) + assert.Equal(t, "select * from bar", queries[c1.ID]) + queries, err = db.DistributedQueriesForHost(h2) + require.Nil(t, err) + assert.Len(t, queries, 1) + assert.Equal(t, "select * from bar", queries[c1.ID]) + + // Record an execution + exec := kolide.DistributedQueryExecution{ + HostID: h1.ID, + DistributedQueryCampaignID: c1.ID, + Status: kolide.ExecutionSucceeded, + } + exec, err = db.NewDistributedQueryExecution(exec) + require.Nil(t, err) + + // Add another query/campaign + q2 := &kolide.Query{ + Name: "foo", + Query: "select * from foo", + } + q2, err = db.NewQuery(q2) + require.Nil(t, err) + + c2 := kolide.DistributedQueryCampaign{ + QueryID: q2.ID, + Status: kolide.QueryRunning, + } + c2, err = db.NewDistributedQueryCampaign(c2) + require.Nil(t, err) + + // This one targets only h1 + target = kolide.DistributedQueryCampaignTarget{ + Type: kolide.TargetHost, + DistributedQueryCampaignID: c2.ID, + TargetID: h1.ID, + } + target, err = db.NewDistributedQueryCampaignTarget(target) + require.Nil(t, err) + + // Check for correct queries + queries, err = db.DistributedQueriesForHost(h1) + require.Nil(t, err) + assert.Len(t, queries, 1) + assert.Equal(t, "select * from foo", queries[c2.ID]) + queries, err = db.DistributedQueriesForHost(h2) + require.Nil(t, err) + assert.Len(t, queries, 1) + assert.Equal(t, "select * from bar", queries[c1.ID]) + + // End both of the campaigns + c1.Status = kolide.QueryComplete + require.Nil(t, db.SaveDistributedQueryCampaign(c1)) + c2.Status = kolide.QueryComplete + require.Nil(t, db.SaveDistributedQueryCampaign(c2)) + + // Now no queries should be returned + queries, err = db.DistributedQueriesForHost(h1) + require.Nil(t, err) + assert.Empty(t, queries) + queries, err = db.DistributedQueriesForHost(h2) + require.Nil(t, err) + assert.Empty(t, queries) + +} diff --git a/server/datastore/datastore_test.go b/server/datastore/datastore_test.go index 91c9cbf150..ea44f73ae6 100644 --- a/server/datastore/datastore_test.go +++ b/server/datastore/datastore_test.go @@ -37,4 +37,5 @@ var testFunctions = [...]func(*testing.T, kolide.Datastore){ testSearchLabelsLimit, testListHostsInLabel, testListUniqueHostsInLabels, + testDistributedQueriesForHost, } diff --git a/server/datastore/gorm.go b/server/datastore/gorm.go index e7961b4249..b9c663fe0b 100644 --- a/server/datastore/gorm.go +++ b/server/datastore/gorm.go @@ -47,11 +47,15 @@ func (orm gormDB) Migrate() error { } } - // Have to manually add indexes. Yuck! + // Have to manually add indexes when specific ordering needed. err := orm.DB.Model(&kolide.LabelQueryExecution{}).AddUniqueIndex("idx_lqe_label_host", "label_id", "host_id").Error if err != nil { return err } + err = orm.DB.Model(&kolide.DistributedQueryExecution{}).AddUniqueIndex("idx_dqe_unique_host_dq_id", "host_id", "distributed_query_campaign_id").Error + if err != nil { + return err + } indexes := []interface{}{} err = orm.DB.Raw("SELECT * FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA = 'kolide' AND INDEX_NAME = 'hosts_search';").Scan(&indexes).Error diff --git a/server/datastore/gorm_hosts.go b/server/datastore/gorm_hosts.go index ef5a6d7531..7c269808e5 100644 --- a/server/datastore/gorm_hosts.go +++ b/server/datastore/gorm_hosts.go @@ -156,3 +156,40 @@ AGAINST(? IN BOOLEAN MODE) } return results, nil } + +func (orm gormDB) DistributedQueriesForHost(host *kolide.Host) (map[uint]string, error) { + sql := ` +SELECT DISTINCT dqc.id, q.query +FROM distributed_query_campaigns dqc +JOIN distributed_query_campaign_targets dqct + ON (dqc.id = dqct.distributed_query_campaign_id) +LEFT JOIN label_query_executions lqe + ON (dqct.type = ? AND dqct.target_id = lqe.label_id AND lqe.matches) +LEFT JOIN hosts h + ON ((dqct.type = ? AND lqe.host_id = h.id) OR (dqct.type = ? AND dqct.target_id = h.id)) +LEFT JOIN distributed_query_executions dqe + ON (h.id = dqe.host_id AND dqc.id = dqe.distributed_query_id) +JOIN queries q + ON (dqc.query_id = q.id) +WHERE dqe.status IS NULL AND dqc.status = ? AND h.id = ?; +` + rows, err := orm.DB.Raw(sql, kolide.TargetLabel, kolide.TargetLabel, kolide.TargetHost, kolide.QueryRunning, host.ID).Rows() + if err != nil && err != gorm.ErrRecordNotFound { + return nil, errors.DatabaseError(err) + } + defer rows.Close() + + results := map[uint]string{} + for rows.Next() { + var id uint + var query string + err = rows.Scan(&id, &query) + if err != nil { + return nil, errors.DatabaseError(err) + } + results[id] = query + } + + return results, nil + +} diff --git a/server/datastore/gorm_queries.go b/server/datastore/gorm_queries.go index 7ccdfe5014..475312b3f8 100644 --- a/server/datastore/gorm_queries.go +++ b/server/datastore/gorm_queries.go @@ -55,3 +55,22 @@ func (orm gormDB) ListQueries(opt kolide.ListOptions) ([]*kolide.Query, error) { err := orm.applyListOptions(opt).Find(&queries).Error return queries, err } + +func (orm gormDB) NewDistributedQueryExecution(exec kolide.DistributedQueryExecution) (kolide.DistributedQueryExecution, error) { + err := orm.DB.Create(&exec).Error + return exec, err +} + +func (orm gormDB) NewDistributedQueryCampaign(camp kolide.DistributedQueryCampaign) (kolide.DistributedQueryCampaign, error) { + err := orm.DB.Create(&camp).Error + return camp, err +} + +func (orm gormDB) SaveDistributedQueryCampaign(camp kolide.DistributedQueryCampaign) error { + return orm.DB.Save(&camp).Error +} + +func (orm gormDB) NewDistributedQueryCampaignTarget(target kolide.DistributedQueryCampaignTarget) (kolide.DistributedQueryCampaignTarget, error) { + err := orm.DB.Create(&target).Error + return target, err +} diff --git a/server/datastore/inmem.go b/server/datastore/inmem.go index 1a67e3e644..896c3f0363 100644 --- a/server/datastore/inmem.go +++ b/server/datastore/inmem.go @@ -14,17 +14,20 @@ type inmem struct { mtx sync.RWMutex nextIDs map[interface{}]uint - users map[uint]*kolide.User - sessions map[uint]*kolide.Session - passwordResets map[uint]*kolide.PasswordResetRequest - invites map[uint]*kolide.Invite - labels map[uint]*kolide.Label - labelQueryExecutions map[uint]*kolide.LabelQueryExecution - queries map[uint]*kolide.Query - packs map[uint]*kolide.Pack - hosts map[uint]*kolide.Host - packQueries map[uint]*kolide.PackQuery - packTargets map[uint]*kolide.PackTarget + users map[uint]*kolide.User + sessions map[uint]*kolide.Session + passwordResets map[uint]*kolide.PasswordResetRequest + invites map[uint]*kolide.Invite + labels map[uint]*kolide.Label + labelQueryExecutions map[uint]*kolide.LabelQueryExecution + queries map[uint]*kolide.Query + packs map[uint]*kolide.Pack + hosts map[uint]*kolide.Host + packQueries map[uint]*kolide.PackQuery + packTargets map[uint]*kolide.PackTarget + distributedQueryExecutions map[uint]kolide.DistributedQueryExecution + distributedQueryCampaigns map[uint]kolide.DistributedQueryCampaign + distributedQueryCampaignTargets map[uint]kolide.DistributedQueryCampaignTarget orginfo *kolide.AppConfig } @@ -48,6 +51,9 @@ func (orm *inmem) Migrate() error { orm.hosts = make(map[uint]*kolide.Host) orm.packQueries = make(map[uint]*kolide.PackQuery) orm.packTargets = make(map[uint]*kolide.PackTarget) + orm.distributedQueryExecutions = make(map[uint]kolide.DistributedQueryExecution) + orm.distributedQueryCampaigns = make(map[uint]kolide.DistributedQueryCampaign) + orm.distributedQueryCampaignTargets = make(map[uint]kolide.DistributedQueryCampaignTarget) return nil } diff --git a/server/datastore/inmem_hosts.go b/server/datastore/inmem_hosts.go index 06ea019114..38c59e416e 100644 --- a/server/datastore/inmem_hosts.go +++ b/server/datastore/inmem_hosts.go @@ -203,3 +203,40 @@ func (orm *inmem) SearchHosts(query string, omit []uint) ([]kolide.Host, error) return results, nil } + +func (orm *inmem) DistributedQueriesForHost(host *kolide.Host) (map[uint]string, error) { + // lookup of executions for this host + hostExecutions := map[uint]kolide.DistributedQueryExecutionStatus{} + for _, e := range orm.distributedQueryExecutions { + if e.HostID == host.ID { + hostExecutions[e.DistributedQueryCampaignID] = e.Status + } + } + + // lookup of labels for this host (only including matching labels) + hostLabels := map[uint]bool{} + labels, err := orm.ListLabelsForHost(host.ID) + if err != nil { + return nil, err + } + for _, l := range labels { + hostLabels[l.ID] = true + } + + queries := map[uint]string{} // map campaign ID -> query string + for _, campaign := range orm.distributedQueryCampaigns { + if campaign.Status != kolide.QueryRunning { + continue + } + for _, target := range orm.distributedQueryCampaignTargets { + if campaign.ID == target.DistributedQueryCampaignID && + ((target.Type == kolide.TargetHost && target.TargetID == host.ID) || + (target.Type == kolide.TargetLabel && hostLabels[target.TargetID])) && + (hostExecutions[campaign.ID] == kolide.ExecutionWaiting) { + queries[campaign.ID] = orm.queries[campaign.QueryID].Query + } + } + } + + return queries, nil +} diff --git a/server/datastore/inmem_queries.go b/server/datastore/inmem_queries.go index 75b56ebd68..56dc3ff340 100644 --- a/server/datastore/inmem_queries.go +++ b/server/datastore/inmem_queries.go @@ -101,3 +101,51 @@ func (orm *inmem) ListQueries(opt kolide.ListOptions) ([]*kolide.Query, error) { return queries, nil } + +func (orm *inmem) NewDistributedQueryExecution(exec kolide.DistributedQueryExecution) (kolide.DistributedQueryExecution, error) { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + for _, e := range orm.distributedQueryExecutions { + if exec.HostID == e.ID && exec.DistributedQueryCampaignID == e.DistributedQueryCampaignID { + return exec, ErrExists + } + } + + exec.ID = orm.nextID(exec) + orm.distributedQueryExecutions[exec.ID] = exec + + return exec, nil +} + +func (orm *inmem) NewDistributedQueryCampaign(camp kolide.DistributedQueryCampaign) (kolide.DistributedQueryCampaign, error) { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + camp.ID = orm.nextID(camp) + orm.distributedQueryCampaigns[camp.ID] = camp + + return camp, nil +} + +func (orm *inmem) SaveDistributedQueryCampaign(camp kolide.DistributedQueryCampaign) error { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + if _, ok := orm.distributedQueryCampaigns[camp.ID]; !ok { + return ErrNotFound + } + + orm.distributedQueryCampaigns[camp.ID] = camp + return nil +} + +func (orm *inmem) NewDistributedQueryCampaignTarget(target kolide.DistributedQueryCampaignTarget) (kolide.DistributedQueryCampaignTarget, error) { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + target.ID = orm.nextID(target) + orm.distributedQueryCampaignTargets[target.ID] = target + + return target, nil +} diff --git a/server/kolide/hosts.go b/server/kolide/hosts.go index 3fd28f9a00..fe75971463 100644 --- a/server/kolide/hosts.go +++ b/server/kolide/hosts.go @@ -16,6 +16,10 @@ type HostStore interface { AuthenticateHost(nodeKey string) (*Host, error) MarkHostSeen(host *Host, t time.Time) error SearchHosts(query string, omit []uint) ([]Host, error) + // DistributedQueriesForHost retrieves the distributed queries that the + // given host should run. The result map is a mapping from campaign ID + // to query text. + DistributedQueriesForHost(host *Host) (map[uint]string, error) } type HostService interface { diff --git a/server/kolide/queries.go b/server/kolide/queries.go index dfc2d77df1..5f5331d5fd 100644 --- a/server/kolide/queries.go +++ b/server/kolide/queries.go @@ -14,6 +14,18 @@ type QueryStore interface { DeleteQuery(query *Query) error Query(id uint) (*Query, error) ListQueries(opt ListOptions) ([]*Query, error) + + // NewDistributedQueryCampaign creates a new distributed query campaign + NewDistributedQueryCampaign(camp DistributedQueryCampaign) (DistributedQueryCampaign, error) + // SaveDistributedQueryCampaign updates an existing distributed query + // campaign + SaveDistributedQueryCampaign(camp DistributedQueryCampaign) error + // NewDistributedQueryCampaignTarget adds a new target to an existing + // distributed query campaign + NewDistributedQueryCampaignTarget(target DistributedQueryCampaignTarget) (DistributedQueryCampaignTarget, error) + // NewDistributedQueryCampaignExecution records a new execution for a + // distributed query campaign + NewDistributedQueryExecution(exec DistributedQueryExecution) (DistributedQueryExecution, error) } type QueryService interface { @@ -75,7 +87,7 @@ type DistributedQueryCampaign struct { type DistributedQueryCampaignTarget struct { ID uint `gorm:"primary_key"` Type TargetType - DistributedQueryCampaignID uint + DistributedQueryCampaignID uint `gorm:"index:idx_dqct_dqc_id"` TargetID uint } @@ -95,12 +107,12 @@ type DistributedQueryResult struct { } type DistributedQueryExecution struct { - ID uint `gorm:"primary_key"` - HostID uint - DistributedQueryID uint - Status DistributedQueryExecutionStatus - Error string `gorm:"size:1024"` - ExecutionDuration time.Duration + ID uint `gorm:"primary_key"` + HostID uint // unique index added in migrate + DistributedQueryCampaignID uint // unique index added in migrate + Status DistributedQueryExecutionStatus + Error string `gorm:"size:1024"` + ExecutionDuration time.Duration } type Option struct {