diff --git a/server/datastore/datastore_campaigns_test.go b/server/datastore/datastore_campaigns_test.go new file mode 100644 index 0000000000..6508f5165a --- /dev/null +++ b/server/datastore/datastore_campaigns_test.go @@ -0,0 +1,130 @@ +package datastore + +import ( + "testing" + "time" + + "github.com/WatchBeam/clock" + "github.com/kolide/kolide-ose/server/kolide" + "github.com/patrickmn/sortutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newQuery(t *testing.T, ds kolide.Datastore, name, q string) *kolide.Query { + query, err := ds.NewQuery(&kolide.Query{ + Name: name, + Query: q, + }) + require.Nil(t, err) + + return query +} + +func newCampaign(t *testing.T, ds kolide.Datastore, queryID uint, status kolide.DistributedQueryStatus) *kolide.DistributedQueryCampaign { + campaign, err := ds.NewDistributedQueryCampaign(&kolide.DistributedQueryCampaign{ + QueryID: queryID, + Status: status, + }) + require.Nil(t, err) + + return campaign +} + +func newHost(t *testing.T, ds kolide.Datastore, name, ip, key, uuid string, tim time.Time) *kolide.Host { + h, err := ds.NewHost(&kolide.Host{ + HostName: name, + PrimaryIP: ip, + NodeKey: key, + UUID: uuid, + DetailUpdateTime: tim, + }) + + require.Nil(t, err) + require.NotZero(t, h.ID) + require.Nil(t, ds.MarkHostSeen(h, tim)) + + return h +} + +func newLabel(t *testing.T, ds kolide.Datastore, name, query string) *kolide.Label { + l, err := ds.NewLabel(&kolide.Label{Name: name, Query: query}) + + require.Nil(t, err) + require.NotZero(t, l.ID) + + return l +} + +func addHost(t *testing.T, ds kolide.Datastore, campaignID, hostID uint) { + _, err := ds.NewDistributedQueryCampaignTarget( + &kolide.DistributedQueryCampaignTarget{ + Type: kolide.TargetHost, + TargetID: hostID, + DistributedQueryCampaignID: campaignID, + }) + require.Nil(t, err) + +} + +func addLabel(t *testing.T, ds kolide.Datastore, campaignID, labelID uint) { + _, err := ds.NewDistributedQueryCampaignTarget( + &kolide.DistributedQueryCampaignTarget{ + Type: kolide.TargetLabel, + TargetID: labelID, + DistributedQueryCampaignID: campaignID, + }) + require.Nil(t, err) +} + +func checkTargets(t *testing.T, ds kolide.Datastore, campaignID uint, expectedHostIDs []uint, expectedLabelIDs []uint) { + hostIDs, labelIDs, err := ds.DistributedQueryCampaignTargetIDs(campaignID) + require.Nil(t, err) + + sortutil.Asc(expectedHostIDs) + sortutil.Asc(hostIDs) + assert.Equal(t, expectedHostIDs, hostIDs) + + sortutil.Asc(expectedLabelIDs) + sortutil.Asc(labelIDs) + assert.Equal(t, expectedLabelIDs, labelIDs) +} + +func testDistributedQueryCampaign(t *testing.T, ds kolide.Datastore) { + mockClock := clock.NewMockClock() + + query := newQuery(t, ds, "test", "select * from time") + + campaign := newCampaign(t, ds, query.ID, kolide.QueryRunning) + + { + retrieved, err := ds.DistributedQueryCampaign(campaign.ID) + require.Nil(t, err) + assert.Equal(t, campaign.QueryID, retrieved.QueryID) + assert.Equal(t, campaign.Status, retrieved.Status) + } + + h1 := newHost(t, ds, "foo.local", "192.168.1.10", "1", "1", mockClock.Now()) + h2 := newHost(t, ds, "bar.local", "192.168.1.11", "2", "2", mockClock.Now().Add(-1*time.Hour)) + h3 := newHost(t, ds, "baz.local", "192.168.1.12", "3", "3", mockClock.Now().Add(-13*time.Minute)) + + l1 := newLabel(t, ds, "label foo", "query foo") + l2 := newLabel(t, ds, "label bar", "query foo") + + checkTargets(t, ds, campaign.ID, []uint{}, []uint{}) + + addHost(t, ds, campaign.ID, h1.ID) + checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{}) + + addLabel(t, ds, campaign.ID, l1.ID) + checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{l1.ID}) + + addLabel(t, ds, campaign.ID, l2.ID) + checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{l1.ID, l2.ID}) + + addHost(t, ds, campaign.ID, h2.ID) + addHost(t, ds, campaign.ID, h3.ID) + + checkTargets(t, ds, campaign.ID, []uint{h1.ID, h2.ID, h3.ID}, []uint{l1.ID, l2.ID}) + +} diff --git a/server/datastore/datastore_queries_test.go b/server/datastore/datastore_queries_test.go index fc8babe9c7..bc8d2c63e6 100644 --- a/server/datastore/datastore_queries_test.go +++ b/server/datastore/datastore_queries_test.go @@ -3,13 +3,9 @@ package datastore import ( "fmt" "testing" - "time" - "github.com/WatchBeam/clock" "github.com/kolide/kolide-ose/server/kolide" - "github.com/patrickmn/sortutil" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func testDeleteQuery(t *testing.T, ds kolide.Datastore) { @@ -63,121 +59,3 @@ func testListQuery(t *testing.T, ds kolide.Datastore) { assert.Nil(t, err) assert.Equal(t, 10, len(results)) } - -func newQuery(t *testing.T, ds kolide.Datastore, name, q string) *kolide.Query { - query, err := ds.NewQuery(&kolide.Query{ - Name: name, - Query: q, - }) - require.Nil(t, err) - - return query -} - -func newCampaign(t *testing.T, ds kolide.Datastore, queryID uint, status kolide.DistributedQueryStatus) *kolide.DistributedQueryCampaign { - campaign, err := ds.NewDistributedQueryCampaign(&kolide.DistributedQueryCampaign{ - QueryID: queryID, - Status: status, - }) - require.Nil(t, err) - - return campaign -} - -func newHost(t *testing.T, ds kolide.Datastore, name, ip, key, uuid string, tim time.Time) *kolide.Host { - h, err := ds.NewHost(&kolide.Host{ - HostName: name, - PrimaryIP: ip, - NodeKey: key, - UUID: uuid, - DetailUpdateTime: tim, - }) - - require.Nil(t, err) - require.NotZero(t, h.ID) - require.Nil(t, ds.MarkHostSeen(h, tim)) - - return h -} - -func newLabel(t *testing.T, ds kolide.Datastore, name, query string) *kolide.Label { - l, err := ds.NewLabel(&kolide.Label{Name: name, Query: query}) - - require.Nil(t, err) - require.NotZero(t, l.ID) - - return l -} - -func addHost(t *testing.T, ds kolide.Datastore, campaignID, hostID uint) { - _, err := ds.NewDistributedQueryCampaignTarget( - &kolide.DistributedQueryCampaignTarget{ - Type: kolide.TargetHost, - TargetID: hostID, - DistributedQueryCampaignID: campaignID, - }) - require.Nil(t, err) - -} - -func addLabel(t *testing.T, ds kolide.Datastore, campaignID, labelID uint) { - _, err := ds.NewDistributedQueryCampaignTarget( - &kolide.DistributedQueryCampaignTarget{ - Type: kolide.TargetLabel, - TargetID: labelID, - DistributedQueryCampaignID: campaignID, - }) - require.Nil(t, err) -} - -func checkTargets(t *testing.T, ds kolide.Datastore, campaignID uint, expectedHostIDs []uint, expectedLabelIDs []uint) { - hostIDs, labelIDs, err := ds.DistributedQueryCampaignTargetIDs(campaignID) - require.Nil(t, err) - - sortutil.Asc(expectedHostIDs) - sortutil.Asc(hostIDs) - assert.Equal(t, expectedHostIDs, hostIDs) - - sortutil.Asc(expectedLabelIDs) - sortutil.Asc(labelIDs) - assert.Equal(t, expectedLabelIDs, labelIDs) -} - -func testDistributedQueryCampaign(t *testing.T, ds kolide.Datastore) { - mockClock := clock.NewMockClock() - - query := newQuery(t, ds, "test", "select * from time") - - campaign := newCampaign(t, ds, query.ID, kolide.QueryRunning) - - { - retrieved, err := ds.DistributedQueryCampaign(campaign.ID) - require.Nil(t, err) - assert.Equal(t, campaign.QueryID, retrieved.QueryID) - assert.Equal(t, campaign.Status, retrieved.Status) - } - - h1 := newHost(t, ds, "foo.local", "192.168.1.10", "1", "1", mockClock.Now()) - h2 := newHost(t, ds, "bar.local", "192.168.1.11", "2", "2", mockClock.Now().Add(-1*time.Hour)) - h3 := newHost(t, ds, "baz.local", "192.168.1.12", "3", "3", mockClock.Now().Add(-13*time.Minute)) - - l1 := newLabel(t, ds, "label foo", "query foo") - l2 := newLabel(t, ds, "label bar", "query foo") - - checkTargets(t, ds, campaign.ID, []uint{}, []uint{}) - - addHost(t, ds, campaign.ID, h1.ID) - checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{}) - - addLabel(t, ds, campaign.ID, l1.ID) - checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{l1.ID}) - - addLabel(t, ds, campaign.ID, l2.ID) - checkTargets(t, ds, campaign.ID, []uint{h1.ID}, []uint{l1.ID, l2.ID}) - - addHost(t, ds, campaign.ID, h2.ID) - addHost(t, ds, campaign.ID, h3.ID) - - checkTargets(t, ds, campaign.ID, []uint{h1.ID, h2.ID, h3.ID}, []uint{l1.ID, l2.ID}) - -} diff --git a/server/datastore/inmem/campaigns.go b/server/datastore/inmem/campaigns.go new file mode 100644 index 0000000000..b45e114dea --- /dev/null +++ b/server/datastore/inmem/campaigns.go @@ -0,0 +1,89 @@ +package inmem + +import ( + "fmt" + + "github.com/kolide/kolide-ose/server/errors" + "github.com/kolide/kolide-ose/server/kolide" +) + +func (orm *Datastore) NewDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) (*kolide.DistributedQueryCampaign, error) { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + camp.ID = orm.nextID(camp) + orm.distributedQueryCampaigns[camp.ID] = *camp + + return camp, nil +} + +func (orm *Datastore) DistributedQueryCampaign(id uint) (*kolide.DistributedQueryCampaign, error) { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + campaign, ok := orm.distributedQueryCampaigns[id] + if !ok { + return nil, errors.ErrNotFound + } + + return &campaign, nil +} + +func (orm *Datastore) SaveDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) error { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + if _, ok := orm.distributedQueryCampaigns[camp.ID]; !ok { + return errors.ErrNotFound + } + + orm.distributedQueryCampaigns[camp.ID] = *camp + return nil +} + +func (orm *Datastore) DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + hostIDs = []uint{} + labelIDs = []uint{} + for _, target := range orm.distributedQueryCampaignTargets { + if target.DistributedQueryCampaignID == id { + if target.Type == kolide.TargetHost { + hostIDs = append(hostIDs, target.TargetID) + } else if target.Type == kolide.TargetLabel { + labelIDs = append(labelIDs, target.TargetID) + } else { + return []uint{}, []uint{}, fmt.Errorf("invalid target type: %d", target.Type) + } + } + } + + return hostIDs, labelIDs, nil +} + +func (orm *Datastore) NewDistributedQueryCampaignTarget(target *kolide.DistributedQueryCampaignTarget) (*kolide.DistributedQueryCampaignTarget, error) { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + target.ID = orm.nextID(target) + orm.distributedQueryCampaignTargets[target.ID] = *target + + return target, nil +} + +func (orm *Datastore) NewDistributedQueryExecution(exec *kolide.DistributedQueryExecution) (*kolide.DistributedQueryExecution, error) { + orm.mtx.Lock() + defer orm.mtx.Unlock() + + for _, e := range orm.distributedQueryExecutions { + if exec.HostID == e.ID && exec.DistributedQueryCampaignID == e.DistributedQueryCampaignID { + return exec, errors.ErrExists + } + } + + exec.ID = orm.nextID(exec) + orm.distributedQueryExecutions[exec.ID] = *exec + + return exec, nil +} diff --git a/server/datastore/inmem/queries.go b/server/datastore/inmem/queries.go index 8764d977d8..727eb86437 100644 --- a/server/datastore/inmem/queries.go +++ b/server/datastore/inmem/queries.go @@ -1,7 +1,6 @@ package inmem import ( - "fmt" "sort" "github.com/kolide/kolide-ose/server/errors" @@ -103,84 +102,3 @@ func (orm *Datastore) ListQueries(opt kolide.ListOptions) ([]*kolide.Query, erro return queries, nil } - -func (orm *Datastore) NewDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) (*kolide.DistributedQueryCampaign, error) { - orm.mtx.Lock() - defer orm.mtx.Unlock() - - camp.ID = orm.nextID(camp) - orm.distributedQueryCampaigns[camp.ID] = *camp - - return camp, nil -} - -func (orm *Datastore) DistributedQueryCampaign(id uint) (*kolide.DistributedQueryCampaign, error) { - orm.mtx.Lock() - defer orm.mtx.Unlock() - - campaign, ok := orm.distributedQueryCampaigns[id] - if !ok { - return nil, errors.ErrNotFound - } - - return &campaign, nil -} - -func (orm *Datastore) SaveDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) error { - orm.mtx.Lock() - defer orm.mtx.Unlock() - - if _, ok := orm.distributedQueryCampaigns[camp.ID]; !ok { - return errors.ErrNotFound - } - - orm.distributedQueryCampaigns[camp.ID] = *camp - return nil -} - -func (orm *Datastore) DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) { - orm.mtx.Lock() - defer orm.mtx.Unlock() - - hostIDs = []uint{} - labelIDs = []uint{} - for _, target := range orm.distributedQueryCampaignTargets { - if target.DistributedQueryCampaignID == id { - if target.Type == kolide.TargetHost { - hostIDs = append(hostIDs, target.TargetID) - } else if target.Type == kolide.TargetLabel { - labelIDs = append(labelIDs, target.TargetID) - } else { - return []uint{}, []uint{}, fmt.Errorf("invalid target type: %d", target.Type) - } - } - } - - return hostIDs, labelIDs, nil -} - -func (orm *Datastore) NewDistributedQueryCampaignTarget(target *kolide.DistributedQueryCampaignTarget) (*kolide.DistributedQueryCampaignTarget, error) { - orm.mtx.Lock() - defer orm.mtx.Unlock() - - target.ID = orm.nextID(target) - orm.distributedQueryCampaignTargets[target.ID] = *target - - return target, nil -} - -func (orm *Datastore) NewDistributedQueryExecution(exec *kolide.DistributedQueryExecution) (*kolide.DistributedQueryExecution, error) { - orm.mtx.Lock() - defer orm.mtx.Unlock() - - for _, e := range orm.distributedQueryExecutions { - if exec.HostID == e.ID && exec.DistributedQueryCampaignID == e.DistributedQueryCampaignID { - return exec, errors.ErrExists - } - } - - exec.ID = orm.nextID(exec) - orm.distributedQueryExecutions[exec.ID] = *exec - - return exec, nil -} diff --git a/server/datastore/mysql/campaigns.go b/server/datastore/mysql/campaigns.go new file mode 100644 index 0000000000..6e4c92feea --- /dev/null +++ b/server/datastore/mysql/campaigns.go @@ -0,0 +1,123 @@ +package mysql + +import ( + "fmt" + + "github.com/kolide/kolide-ose/server/errors" + "github.com/kolide/kolide-ose/server/kolide" +) + +func (d *Datastore) NewDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) (*kolide.DistributedQueryCampaign, error) { + + sqlStatement := ` + INSERT INTO distributed_query_campaigns ( + query_id, + status, + user_id + ) + VALUES(?,?,?) + ` + result, err := d.db.Exec(sqlStatement, camp.QueryID, camp.Status, camp.UserID) + if err != nil { + return nil, errors.DatabaseError(err) + } + + id, _ := result.LastInsertId() + camp.ID = uint(id) + return camp, nil +} + +func (d *Datastore) DistributedQueryCampaign(id uint) (*kolide.DistributedQueryCampaign, error) { + sql := ` + SELECT * FROM distributed_query_campaigns WHERE id = ? AND NOT deleted + ` + campaign := &kolide.DistributedQueryCampaign{} + if err := d.db.Get(campaign, sql, id); err != nil { + return nil, errors.DatabaseError(err) + } + + return campaign, nil +} + +func (d *Datastore) SaveDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) error { + sqlStatement := ` + UPDATE distributed_query_campaigns SET + query_id = ?, + status = ?, + user_id = ? + WHERE id = ? + AND NOT deleted + ` + _, err := d.db.Exec(sqlStatement, camp.QueryID, camp.Status, camp.UserID, camp.ID) + if err != nil { + return errors.DatabaseError(err) + } + + return nil +} + +func (d *Datastore) DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) { + sqlStatement := ` + SELECT * FROM distributed_query_campaign_targets WHERE distributed_query_campaign_id = ? + ` + targets := []kolide.DistributedQueryCampaignTarget{} + + if err = d.db.Select(&targets, sqlStatement, id); err != nil { + return nil, nil, errors.DatabaseError(err) + } + + hostIDs = []uint{} + labelIDs = []uint{} + for _, target := range targets { + if target.Type == kolide.TargetHost { + hostIDs = append(hostIDs, target.TargetID) + } else if target.Type == kolide.TargetLabel { + labelIDs = append(labelIDs, target.TargetID) + } else { + return []uint{}, []uint{}, fmt.Errorf("invalid target type: %d", target.Type) + } + } + + return hostIDs, labelIDs, nil +} + +func (d *Datastore) NewDistributedQueryCampaignTarget(target *kolide.DistributedQueryCampaignTarget) (*kolide.DistributedQueryCampaignTarget, error) { + sqlStatement := ` + INSERT into distributed_query_campaign_targets ( + type, + distributed_query_campaign_id, + target_id + ) + VALUES (?,?,?) + ` + result, err := d.db.Exec(sqlStatement, target.Type, target.DistributedQueryCampaignID, target.TargetID) + if err != nil { + return nil, errors.DatabaseError(err) + } + + id, _ := result.LastInsertId() + target.ID = uint(id) + return target, nil +} + +func (d *Datastore) NewDistributedQueryExecution(exec *kolide.DistributedQueryExecution) (*kolide.DistributedQueryExecution, error) { + sqlStatement := ` + INSERT INTO distributed_query_executions ( + host_id, + distributed_query_campaign_id, + status, + error, + execution_duration + ) VALUES (?,?,?,?,?) + ` + result, err := d.db.Exec(sqlStatement, exec.HostID, exec.DistributedQueryCampaignID, + exec.Status, exec.Error, exec.ExecutionDuration) + if err != nil { + return nil, errors.DatabaseError(err) + } + + id, _ := result.LastInsertId() + exec.ID = uint(id) + + return exec, nil +} diff --git a/server/datastore/mysql/queries.go b/server/datastore/mysql/queries.go index 51374075df..1f731cf9d9 100644 --- a/server/datastore/mysql/queries.go +++ b/server/datastore/mysql/queries.go @@ -1,8 +1,6 @@ package mysql import ( - "fmt" - "github.com/kolide/kolide-ose/server/errors" "github.com/kolide/kolide-ose/server/kolide" ) @@ -90,118 +88,3 @@ func (d *Datastore) ListQueries(opt kolide.ListOptions) ([]*kolide.Query, error) return results, nil } - -func (d *Datastore) NewDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) (*kolide.DistributedQueryCampaign, error) { - - sqlStatement := ` - INSERT INTO distributed_query_campaigns ( - query_id, - status, - user_id - ) - VALUES(?,?,?) - ` - result, err := d.db.Exec(sqlStatement, camp.QueryID, camp.Status, camp.UserID) - if err != nil { - return nil, errors.DatabaseError(err) - } - - id, _ := result.LastInsertId() - camp.ID = uint(id) - return camp, nil -} - -func (d *Datastore) DistributedQueryCampaign(id uint) (*kolide.DistributedQueryCampaign, error) { - sql := ` - SELECT * FROM distributed_query_campaigns WHERE id = ? AND NOT deleted - ` - campaign := &kolide.DistributedQueryCampaign{} - if err := d.db.Get(campaign, sql, id); err != nil { - return nil, errors.DatabaseError(err) - } - - return campaign, nil -} - -func (d *Datastore) SaveDistributedQueryCampaign(camp *kolide.DistributedQueryCampaign) error { - sqlStatement := ` - UPDATE distributed_query_campaigns SET - query_id = ?, - status = ?, - user_id = ? - WHERE id = ? - AND NOT deleted - ` - _, err := d.db.Exec(sqlStatement, camp.QueryID, camp.Status, camp.UserID, camp.ID) - if err != nil { - return errors.DatabaseError(err) - } - - return nil -} - -func (d *Datastore) DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) { - sqlStatement := ` - SELECT * FROM distributed_query_campaign_targets WHERE distributed_query_campaign_id = ? - ` - targets := []kolide.DistributedQueryCampaignTarget{} - - if err = d.db.Select(&targets, sqlStatement, id); err != nil { - return nil, nil, errors.DatabaseError(err) - } - - hostIDs = []uint{} - labelIDs = []uint{} - for _, target := range targets { - if target.Type == kolide.TargetHost { - hostIDs = append(hostIDs, target.TargetID) - } else if target.Type == kolide.TargetLabel { - labelIDs = append(labelIDs, target.TargetID) - } else { - return []uint{}, []uint{}, fmt.Errorf("invalid target type: %d", target.Type) - } - } - - return hostIDs, labelIDs, nil -} - -func (d *Datastore) NewDistributedQueryCampaignTarget(target *kolide.DistributedQueryCampaignTarget) (*kolide.DistributedQueryCampaignTarget, error) { - sqlStatement := ` - INSERT into distributed_query_campaign_targets ( - type, - distributed_query_campaign_id, - target_id - ) - VALUES (?,?,?) - ` - result, err := d.db.Exec(sqlStatement, target.Type, target.DistributedQueryCampaignID, target.TargetID) - if err != nil { - return nil, errors.DatabaseError(err) - } - - id, _ := result.LastInsertId() - target.ID = uint(id) - return target, nil -} - -func (d *Datastore) NewDistributedQueryExecution(exec *kolide.DistributedQueryExecution) (*kolide.DistributedQueryExecution, error) { - sqlStatement := ` - INSERT INTO distributed_query_executions ( - host_id, - distributed_query_campaign_id, - status, - error, - execution_duration - ) VALUES (?,?,?,?,?) - ` - result, err := d.db.Exec(sqlStatement, exec.HostID, exec.DistributedQueryCampaignID, - exec.Status, exec.Error, exec.ExecutionDuration) - if err != nil { - return nil, errors.DatabaseError(err) - } - - id, _ := result.LastInsertId() - exec.ID = uint(id) - - return exec, nil -} diff --git a/server/kolide/campaigns.go b/server/kolide/campaigns.go new file mode 100644 index 0000000000..493f55732f --- /dev/null +++ b/server/kolide/campaigns.go @@ -0,0 +1,106 @@ +package kolide + +import ( + "time" + + "github.com/kolide/kolide-ose/server/websocket" + "golang.org/x/net/context" +) + +// CampaignStore defines the distributed query campaign related datastore +// methods +type CampaignStore interface { + // NewDistributedQueryCampaign creates a new distributed query campaign + NewDistributedQueryCampaign(camp *DistributedQueryCampaign) (*DistributedQueryCampaign, error) + // DistributedQueryCampaign loads a distributed query campaign by ID + DistributedQueryCampaign(id uint) (*DistributedQueryCampaign, error) + // SaveDistributedQueryCampaign updates an existing distributed query + // campaign + SaveDistributedQueryCampaign(camp *DistributedQueryCampaign) error + // DistributedQueryCampaignTargetIDs gets the IDs of the targets for + // the query campaign of the provided ID + DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) + + // NewDistributedQueryCampaignTarget adds a new target to an existing + // distributed query campaign + NewDistributedQueryCampaignTarget(target *DistributedQueryCampaignTarget) (*DistributedQueryCampaignTarget, error) + + // NewDistributedQueryCampaignExecution records a new execution for a + // distributed query campaign + NewDistributedQueryExecution(exec *DistributedQueryExecution) (*DistributedQueryExecution, error) +} + +// CampaignService defines the distributed query campaign related service +// methods +type CampaignService interface { + // NewDistributedQueryCampaign creates a new distributed query campaign + // with the provided query and host/label targets + NewDistributedQueryCampaign(ctx context.Context, queryString string, hosts []uint, labels []uint) (*DistributedQueryCampaign, error) + + // StreamCampaignResults streams updates with query results and + // expected host totals over the provided websocket. Note that the type + // signature is somewhat inconsistent due to this being a streaming API + // and not the typical go-kit RPC style. + StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint) +} + +// DistributedQueryStatus is the lifecycle status of a distributed query +// campaign. +type DistributedQueryStatus int + +const ( + QueryRunning DistributedQueryStatus = iota + QueryComplete DistributedQueryStatus = iota + QueryError DistributedQueryStatus = iota +) + +// DistributedQueryCampaign is the basic metadata associated with a distributed +// query. +type DistributedQueryCampaign struct { + UpdateCreateTimestamps + DeleteFields + ID uint `json:"id"` + QueryID uint `json:"query_id" db:"query_id"` + Status DistributedQueryStatus `json:"status"` + UserID uint `json:"user_id" db:"user_id"` +} + +// DistributedQueryCampaignTarget stores a target (host or label) for a +// distributed query campaign. There is a one -> many mapping of campaigns to +// targets. +type DistributedQueryCampaignTarget struct { + ID uint + Type TargetType + DistributedQueryCampaignID uint `db:"distributed_query_campaign_id"` + TargetID uint `db:"target_id"` +} + +// DistributedQueryExecutionStatus is the status of a distributed query +// execution on a single host. +type DistributedQueryExecutionStatus int + +const ( + ExecutionWaiting DistributedQueryExecutionStatus = iota + ExecutionRequested + ExecutionSucceeded + ExecutionFailed +) + +// DistributedQueryResult is the result returned from the execution of a +// distributed query on a single host. +type DistributedQueryResult struct { + DistributedQueryCampaignID uint `json:"distributed_query_execution_id"` + Host Host `json:"host"` + Rows []map[string]string `json:"rows"` +} + +// DistributedQueryExecution is the metadata associated with a distributed +// query execution on a single host. +type DistributedQueryExecution struct { + ID uint + HostID uint `db:"host_id"` + DistributedQueryCampaignID uint `db:"distributed_query_campaign_id"` + Status DistributedQueryExecutionStatus + Error string + ExecutionDuration time.Duration `db:"execution_duration"` +} diff --git a/server/kolide/datastore.go b/server/kolide/datastore.go index 8f4dfbfc17..bcbded8b30 100644 --- a/server/kolide/datastore.go +++ b/server/kolide/datastore.go @@ -4,6 +4,7 @@ package kolide type Datastore interface { UserStore QueryStore + CampaignStore PackStore LabelStore HostStore diff --git a/server/kolide/queries.go b/server/kolide/queries.go index f17ed08130..d6235a3099 100644 --- a/server/kolide/queries.go +++ b/server/kolide/queries.go @@ -3,7 +3,6 @@ package kolide import ( "time" - "github.com/kolide/kolide-ose/server/websocket" "golang.org/x/net/context" ) @@ -14,25 +13,6 @@ type QueryStore interface { DeleteQuery(query *Query) error Query(id uint) (*Query, error) ListQueries(opt ListOptions) ([]*Query, error) - - // NewDistributedQueryCampaign creates a new distributed query campaign - NewDistributedQueryCampaign(camp *DistributedQueryCampaign) (*DistributedQueryCampaign, error) - // DistributedQueryCampaign loads a distributed query campaign by ID - DistributedQueryCampaign(id uint) (*DistributedQueryCampaign, error) - // SaveDistributedQueryCampaign updates an existing distributed query - // campaign - SaveDistributedQueryCampaign(camp *DistributedQueryCampaign) error - // DistributedQueryCampaignTargetIDs gets the IDs of the targets for - // the query campaign of the provided ID - DistributedQueryCampaignTargetIDs(id uint) (hostIDs []uint, labelIDs []uint, err error) - - // NewDistributedQueryCampaignTarget adds a new target to an existing - // distributed query campaign - NewDistributedQueryCampaignTarget(target *DistributedQueryCampaignTarget) (*DistributedQueryCampaignTarget, error) - - // NewDistributedQueryCampaignExecution records a new execution for a - // distributed query campaign - NewDistributedQueryExecution(exec *DistributedQueryExecution) (*DistributedQueryExecution, error) } type QueryService interface { @@ -41,13 +21,6 @@ type QueryService interface { NewQuery(ctx context.Context, p QueryPayload) (*Query, error) ModifyQuery(ctx context.Context, id uint, p QueryPayload) (*Query, error) DeleteQuery(ctx context.Context, id uint) error - NewDistributedQueryCampaign(ctx context.Context, queryString string, hosts []uint, labels []uint) (*DistributedQueryCampaign, error) - - // StreamCampaignResults streams updates with query results and - // expected host totals over the provided websocket. Note that the type - // signature is somewhat inconsistent due to this being a streaming API - // and not the typical go-kit RPC style. - StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint) } type QueryPayload struct { @@ -75,54 +48,6 @@ type Query struct { Version string `json:"version"` } -type DistributedQueryStatus int - -const ( - QueryRunning DistributedQueryStatus = iota - QueryComplete DistributedQueryStatus = iota - QueryError DistributedQueryStatus = iota -) - -type DistributedQueryCampaign struct { - UpdateCreateTimestamps - DeleteFields - ID uint `json:"id"` - QueryID uint `json:"query_id" db:"query_id"` - Status DistributedQueryStatus `json:"status"` - UserID uint `json:"user_id" db:"user_id"` -} - -type DistributedQueryCampaignTarget struct { - ID uint - Type TargetType - DistributedQueryCampaignID uint `db:"distributed_query_campaign_id"` - TargetID uint `db:"target_id"` -} - -type DistributedQueryExecutionStatus int - -const ( - ExecutionWaiting DistributedQueryExecutionStatus = iota - ExecutionRequested - ExecutionSucceeded - ExecutionFailed -) - -type DistributedQueryResult struct { - DistributedQueryCampaignID uint `json:"distributed_query_execution_id"` - Host Host `json:"host"` - Rows []map[string]string `json:"rows"` -} - -type DistributedQueryExecution struct { - ID uint - HostID uint `db:"host_id"` - DistributedQueryCampaignID uint `db:"distributed_query_campaign_id"` - Status DistributedQueryExecutionStatus - Error string - ExecutionDuration time.Duration `db:"execution_duration"` -} - type Option struct { ID uint CreatedAt time.Time diff --git a/server/kolide/service.go b/server/kolide/service.go index 5cc8db1bf0..05d3784761 100644 --- a/server/kolide/service.go +++ b/server/kolide/service.go @@ -7,6 +7,7 @@ type Service interface { PackService LabelService QueryService + CampaignService OsqueryService HostService AppConfigService diff --git a/server/service/endpoint_campaigns.go b/server/service/endpoint_campaigns.go new file mode 100644 index 0000000000..993d350e13 --- /dev/null +++ b/server/service/endpoint_campaigns.go @@ -0,0 +1,81 @@ +package service + +import ( + "net/http" + + "github.com/go-kit/kit/endpoint" + "github.com/kolide/kolide-ose/server/contexts/viewer" + "github.com/kolide/kolide-ose/server/kolide" + "github.com/kolide/kolide-ose/server/websocket" + "golang.org/x/net/context" +) + +//////////////////////////////////////////////////////////////////////////////// +// Create Distributed Query Campaign +//////////////////////////////////////////////////////////////////////////////// + +type createDistributedQueryCampaignRequest struct { + UserID uint + Query string `json:"query"` + Selected struct { + Labels []uint `json:"labels"` + Hosts []uint `json:"hosts"` + } `json:"selected"` +} + +type createDistributedQueryCampaignResponse struct { + Campaign *kolide.DistributedQueryCampaign `json:"campaign,omitempty"` + Err error `json:"error,omitempty"` +} + +func (r createDistributedQueryCampaignResponse) error() error { return r.Err } + +func makeCreateDistributedQueryCampaignEndpoint(svc kolide.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(createDistributedQueryCampaignRequest) + campaign, err := svc.NewDistributedQueryCampaign(ctx, req.Query, req.Selected.Hosts, req.Selected.Labels) + if err != nil { + return createQueryResponse{Err: err}, nil + } + return createDistributedQueryCampaignResponse{campaign, nil}, nil + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Stream Distributed Query Campaign Results and Metadata +//////////////////////////////////////////////////////////////////////////////// + +func makeStreamDistributedQueryCampaignResultsHandler(svc kolide.Service, jwtKey string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Upgrade to websocket connection + conn, err := websocket.Upgrade(w, r) + if err != nil { + return + } + defer conn.Close() + + // Receive the auth bearer token + token, err := conn.ReadAuthToken() + if err != nil { + return + } + + // Authenticate with the token + vc, err := authViewer(context.Background(), jwtKey, string(token), svc) + if err != nil || !vc.CanPerformActions() { + conn.WriteJSONError("unauthorized") + return + } + + ctx := viewer.NewContext(context.Background(), *vc) + + campaignID, err := idFromRequest(r, "id") + if err != nil { + conn.WriteJSONError("invalid campaign ID") + return + } + + svc.StreamCampaignResults(ctx, conn, campaignID) + + } +} diff --git a/server/service/endpoint_queries.go b/server/service/endpoint_queries.go index 24e5b76031..f92278e4b5 100644 --- a/server/service/endpoint_queries.go +++ b/server/service/endpoint_queries.go @@ -1,12 +1,8 @@ package service import ( - "net/http" - "github.com/go-kit/kit/endpoint" - "github.com/kolide/kolide-ose/server/contexts/viewer" "github.com/kolide/kolide-ose/server/kolide" - "github.com/kolide/kolide-ose/server/websocket" "golang.org/x/net/context" ) @@ -143,73 +139,3 @@ func makeDeleteQueryEndpoint(svc kolide.Service) endpoint.Endpoint { return deleteQueryResponse{}, nil } } - -//////////////////////////////////////////////////////////////////////////////// -// Create Distributed Query Campaign -//////////////////////////////////////////////////////////////////////////////// - -type createDistributedQueryCampaignRequest struct { - UserID uint - Query string `json:"query"` - Selected struct { - Labels []uint `json:"labels"` - Hosts []uint `json:"hosts"` - } `json:"selected"` -} - -type createDistributedQueryCampaignResponse struct { - Campaign *kolide.DistributedQueryCampaign `json:"campaign,omitempty"` - Err error `json:"error,omitempty"` -} - -func (r createDistributedQueryCampaignResponse) error() error { return r.Err } - -func makeCreateDistributedQueryCampaignEndpoint(svc kolide.Service) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(createDistributedQueryCampaignRequest) - campaign, err := svc.NewDistributedQueryCampaign(ctx, req.Query, req.Selected.Hosts, req.Selected.Labels) - if err != nil { - return createQueryResponse{Err: err}, nil - } - return createDistributedQueryCampaignResponse{campaign, nil}, nil - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Stream Distributed Query Campaign Results and Metadata -//////////////////////////////////////////////////////////////////////////////// - -func makeStreamDistributedQueryCampaignResultsHandler(svc kolide.Service, jwtKey string) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - // Upgrade to websocket connection - conn, err := websocket.Upgrade(w, r) - if err != nil { - return - } - defer conn.Close() - - // Receive the auth bearer token - token, err := conn.ReadAuthToken() - if err != nil { - return - } - - // Authenticate with the token - vc, err := authViewer(context.Background(), jwtKey, string(token), svc) - if err != nil || !vc.CanPerformActions() { - conn.WriteJSONError("unauthorized") - return - } - - ctx := viewer.NewContext(context.Background(), *vc) - - campaignID, err := idFromRequest(r, "id") - if err != nil { - conn.WriteJSONError("invalid campaign ID") - return - } - - svc.StreamCampaignResults(ctx, conn, campaignID) - - } -} diff --git a/server/service/service_campaigns.go b/server/service/service_campaigns.go new file mode 100644 index 0000000000..57ae744f34 --- /dev/null +++ b/server/service/service_campaigns.go @@ -0,0 +1,127 @@ +package service + +import ( + "fmt" + "time" + + "github.com/kolide/kolide-ose/server/contexts/viewer" + "github.com/kolide/kolide-ose/server/kolide" + "github.com/kolide/kolide-ose/server/websocket" + "golang.org/x/net/context" +) + +func (svc service) NewDistributedQueryCampaign(ctx context.Context, queryString string, hosts []uint, labels []uint) (*kolide.DistributedQueryCampaign, error) { + vc, ok := viewer.FromContext(ctx) + if !ok { + return nil, errNoContext + } + + query, err := svc.NewQuery(ctx, kolide.QueryPayload{ + Name: &queryString, + Query: &queryString, + }) + if err != nil { + return nil, err + } + + campaign, err := svc.ds.NewDistributedQueryCampaign(&kolide.DistributedQueryCampaign{ + QueryID: query.ID, + Status: kolide.QueryRunning, + UserID: vc.UserID(), + }) + if err != nil { + return nil, err + } + + // Add host targets + for _, hid := range hosts { + _, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{ + Type: kolide.TargetHost, + DistributedQueryCampaignID: campaign.ID, + TargetID: hid, + }) + if err != nil { + return nil, err + } + } + + // Add label targets + for _, lid := range labels { + _, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{ + Type: kolide.TargetLabel, + DistributedQueryCampaignID: campaign.ID, + TargetID: lid, + }) + if err != nil { + return nil, err + } + } + + return campaign, nil +} + +type targetTotals struct { + Total uint `json:"count"` + Online uint `json:"online"` +} + +func (svc service) StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint) { + // Find the campaign and ensure it is active + campaign, err := svc.ds.DistributedQueryCampaign(campaignID) + if err != nil { + conn.WriteJSONError(fmt.Sprintf("cannot find campaign for ID %d", campaignID)) + return + } + + if campaign.Status != kolide.QueryRunning { + conn.WriteJSONError(fmt.Sprintf("campaign %d not running", campaignID)) + return + } + + // Open the channel from which we will receive incoming query results + // (probably from the redis pubsub implementation) + readChan, err := svc.resultStore.ReadChannel(context.Background(), *campaign) + if err != nil { + conn.WriteJSONError(fmt.Sprintf("cannot open read channel for campaign %d ", campaignID)) + return + } + + // Loop, pushing updates to results and expected totals + for { + select { + case res := <-readChan: + // Receive a result and push it over the websocket + switch res := res.(type) { + case kolide.DistributedQueryResult: + err = conn.WriteJSONMessage("result", res) + if err != nil { + fmt.Println("error writing to channel") + } + } + + case <-time.After(1 * time.Second): + // Update the expected hosts total + hostIDs, labelIDs, err := svc.ds.DistributedQueryCampaignTargetIDs(campaign.ID) + if err != nil { + if err = conn.WriteJSONError("error retrieving campaign targets"); err != nil { + return + } + } + + var totals targetTotals + totals.Total, totals.Online, err = svc.CountHostsInTargets( + context.Background(), hostIDs, labelIDs, + ) + if err != nil { + if err = conn.WriteJSONError("error retrieving target counts"); err != nil { + return + } + } + + if err = conn.WriteJSONMessage("totals", totals); err != nil { + return + } + } + } + +} diff --git a/server/service/service_queries.go b/server/service/service_queries.go index 7c848fdce0..46be376078 100644 --- a/server/service/service_queries.go +++ b/server/service/service_queries.go @@ -1,12 +1,7 @@ package service import ( - "fmt" - "time" - - "github.com/kolide/kolide-ose/server/contexts/viewer" "github.com/kolide/kolide-ose/server/kolide" - "github.com/kolide/kolide-ose/server/websocket" "golang.org/x/net/context" ) @@ -119,119 +114,3 @@ func (svc service) DeleteQuery(ctx context.Context, id uint) error { return nil } - -func (svc service) NewDistributedQueryCampaign(ctx context.Context, queryString string, hosts []uint, labels []uint) (*kolide.DistributedQueryCampaign, error) { - vc, ok := viewer.FromContext(ctx) - if !ok { - return nil, errNoContext - } - - query, err := svc.NewQuery(ctx, kolide.QueryPayload{ - Name: &queryString, - Query: &queryString, - }) - if err != nil { - return nil, err - } - - campaign, err := svc.ds.NewDistributedQueryCampaign(&kolide.DistributedQueryCampaign{ - QueryID: query.ID, - Status: kolide.QueryRunning, - UserID: vc.UserID(), - }) - if err != nil { - return nil, err - } - - // Add host targets - for _, hid := range hosts { - _, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{ - Type: kolide.TargetHost, - DistributedQueryCampaignID: campaign.ID, - TargetID: hid, - }) - if err != nil { - return nil, err - } - } - - // Add label targets - for _, lid := range labels { - _, err = svc.ds.NewDistributedQueryCampaignTarget(&kolide.DistributedQueryCampaignTarget{ - Type: kolide.TargetLabel, - DistributedQueryCampaignID: campaign.ID, - TargetID: lid, - }) - if err != nil { - return nil, err - } - } - - return campaign, nil -} - -type targetTotals struct { - Total uint `json:"count"` - Online uint `json:"online"` -} - -func (svc service) StreamCampaignResults(ctx context.Context, conn *websocket.Conn, campaignID uint) { - // Find the campaign and ensure it is active - campaign, err := svc.ds.DistributedQueryCampaign(campaignID) - if err != nil { - conn.WriteJSONError(fmt.Sprintf("cannot find campaign for ID %d", campaignID)) - return - } - - if campaign.Status != kolide.QueryRunning { - conn.WriteJSONError(fmt.Sprintf("campaign %d not running", campaignID)) - return - } - - // Open the channel from which we will receive incoming query results - // (probably from the redis pubsub implementation) - readChan, err := svc.resultStore.ReadChannel(context.Background(), *campaign) - if err != nil { - conn.WriteJSONError(fmt.Sprintf("cannot open read channel for campaign %d ", campaignID)) - return - } - - // Loop, pushing updates to results and expected totals - for { - select { - case res := <-readChan: - // Receive a result and push it over the websocket - switch res := res.(type) { - case kolide.DistributedQueryResult: - err = conn.WriteJSONMessage("result", res) - if err != nil { - fmt.Println("error writing to channel") - } - } - - case <-time.After(1 * time.Second): - // Update the expected hosts total - hostIDs, labelIDs, err := svc.ds.DistributedQueryCampaignTargetIDs(campaign.ID) - if err != nil { - if err = conn.WriteJSONError("error retrieving campaign targets"); err != nil { - return - } - } - - var totals targetTotals - totals.Total, totals.Online, err = svc.CountHostsInTargets( - context.Background(), hostIDs, labelIDs, - ) - if err != nil { - if err = conn.WriteJSONError("error retrieving target counts"); err != nil { - return - } - } - - if err = conn.WriteJSONMessage("totals", totals); err != nil { - return - } - } - } - -} diff --git a/server/service/transport_campaigns.go b/server/service/transport_campaigns.go new file mode 100644 index 0000000000..eb5ee7b8dc --- /dev/null +++ b/server/service/transport_campaigns.go @@ -0,0 +1,16 @@ +package service + +import ( + "encoding/json" + "net/http" + + "golang.org/x/net/context" +) + +func decodeCreateDistributedQueryCampaignRequest(ctx context.Context, r *http.Request) (interface{}, error) { + var req createDistributedQueryCampaignRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, err + } + return req, nil +} diff --git a/server/service/transport_queries.go b/server/service/transport_queries.go index 7cc436f848..f5c1c3ef38 100644 --- a/server/service/transport_queries.go +++ b/server/service/transport_queries.go @@ -55,11 +55,3 @@ func decodeListQueriesRequest(ctx context.Context, r *http.Request) (interface{} } return listQueriesRequest{ListOptions: opt}, nil } - -func decodeCreateDistributedQueryCampaignRequest(ctx context.Context, r *http.Request) (interface{}, error) { - var req createDistributedQueryCampaignRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - return nil, err - } - return req, nil -}