Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed query #7

Merged
merged 6 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ go.work.sum
deploy/docker/conf/tls/*
.env
!deploy/docker/conf/tls/openssl.cnf.example
tls.env

# bruno
tools/bruno/collection.bru
19 changes: 19 additions & 0 deletions api/handlers/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request)
h.Inc(metricAPIQueriesErr)
return
}
// Get the query id
newQuery, err = h.Queries.Get(queryName, env.ID)
if err != nil {
apiErrorResponse(w, "error creating query", http.StatusInternalServerError, err)
return
}

// Temporary list of UUIDs to calculate Expected
var expected []string
Expand Down Expand Up @@ -218,6 +224,19 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request)

// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, nodeUUID := range expectedClear {
node, err := h.Nodes.GetByUUID(nodeUUID)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID)
continue
}
if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID)
}
}

// Update value for expected
if err := h.Queries.SetExpected(queryName, len(expectedClear), env.ID); err != nil {
apiErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
Expand Down
16 changes: 6 additions & 10 deletions logging/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,16 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest,
Message: queriesWrite.Messages[q],
}
go l.DispatchQueries(d, node, debug)
// Update internal metrics per query
var err error
if queriesWrite.Statuses[q] != 0 {
err = l.Queries.IncError(q, envid)
} else {
err = l.Queries.IncExecution(q, envid)
}
if err != nil {
log.Err(err).Msg("error updating query")
}

// TODO: This TrackExecution call needs to be removed
// Add a record for this query
if err := l.Queries.TrackExecution(q, node.UUID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error adding query execution")
}
// Instead of creating a new record in a separate table, we can just update the query status
if err := l.Queries.UpdateQueryStatus(q, node.ID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error updating query status")
}
// Check if query is completed
if err := l.Queries.VerifyComplete(q, envid); err != nil {
log.Err(err).Msg("error verifying and completing query")
Expand Down
104 changes: 78 additions & 26 deletions queries/queries.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queries

import (
"fmt"
"time"

"github.com/jmpsec/osctrl/nodes"
Expand Down Expand Up @@ -79,6 +80,16 @@ type DistributedQuery struct {
Expiration time.Time
}

// NodeQuery links a node to a query. Each row records that the query identified
// by QueryID has been assigned to the node identified by NodeID, together with
// the state of that assignment.
type NodeQuery struct {
// ID is the auto-incremented primary key of the link row.
ID uint `gorm:"primaryKey;autoIncrement"`
// NodeID is the ID of the targeted node; required and indexed.
NodeID uint `gorm:"not null;index"`
// QueryID is the ID of the distributed query; required and indexed.
QueryID uint `gorm:"not null;index"`
// Status is the state of the query on this node. It defaults to 'pending';
// UpdateQueryStatus sets it to "completed" or "error", and NodeQueries only
// hands out queries whose link is still "pending".
Status string `gorm:"type:varchar(10);default:'pending'"`
// CreatedAt defaults to the database CURRENT_TIMESTAMP.
CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
// UpdatedAt is presumably maintained by GORM's timestamp tracking on
// updates — TODO confirm.
UpdatedAt time.Time
}

// DistributedQueryTarget to keep target logic for queries
type DistributedQueryTarget struct {
gorm.Model
Expand All @@ -105,8 +116,13 @@ type Queries struct {

// CreateQueries to initialize the queries struct
func CreateQueries(backend *gorm.DB) *Queries {
var q *Queries
q = &Queries{DB: backend}
//var q *Queries
q := &Queries{DB: backend}

// table node_queries
if err := backend.AutoMigrate(&NodeQuery{}); err != nil {
log.Fatal().Msgf("Failed to AutoMigrate table (node_queries): %v", err)
}
// table distributed_queries
if err := backend.AutoMigrate(&DistributedQuery{}); err != nil {
log.Fatal().Msgf("Failed to AutoMigrate table (distributed_queries): %v", err)
Expand All @@ -126,34 +142,29 @@ func CreateQueries(backend *gorm.DB) *Queries {
return q
}

// NodeQueries to get all queries that belong to the provided node
// FIXME this will impact the performance of the TLS endpoint due to being CPU and I/O hungry
// FIMXE potential mitigation can be add a cache (Redis?) layer to store queries per node_key
func (q *Queries) NodeQueries(node nodes.OsqueryNode) (QueryReadQueries, bool, error) {
acelerate := false
// Get all current active queries and carves
queries, err := q.GetActive(node.EnvironmentID)
if err != nil {
return QueryReadQueries{}, false, err

var results []struct {
QueryName string
Query string
}

q.DB.Table("distributed_queries dq").
Select("dq.name, dq.query").
Joins("JOIN node_queries nq ON dq.id = nq.query_id").
Where("nq.node_id = ? AND nq.status = ?", node.ID, "pending").
Scan(&results)

if len(results) == 0 {
return QueryReadQueries{}, false, nil
}
// Iterate through active queries, see if they target this node and prepare data in the same loop

qs := make(QueryReadQueries)
for _, _q := range queries {
targets, err := q.GetTargets(_q.Name)
if err != nil {
return QueryReadQueries{}, false, err
}
// FIXME disable acceleration until figure out edge cases where it would trigger by mistake
/*
if len(targets) == 1 {
acelerate = true
}
*/
if isQueryTarget(node, targets) && q.NotYetExecuted(_q.Name, node.UUID) {
qs[_q.Name] = _q.Query
}
for _, _q := range results {
qs[_q.QueryName] = _q.Query
}
return qs, acelerate, nil

return qs, false, nil
}

// Gets all queries by target (active/completed/all/all-full/deleted/hidden/expired)
Expand Down Expand Up @@ -384,6 +395,18 @@ func (q *Queries) Create(query DistributedQuery) error {
return nil
}

// CreateNodeQuery to link a node to a query
//
// It inserts a NodeQuery row carrying the given node and query IDs, leaving
// every other column to its default, and returns the error reported by the
// insert, or nil on success.
func (q *Queries) CreateNodeQuery(nodeID, queryID uint) error {
	link := &NodeQuery{NodeID: nodeID, QueryID: queryID}
	return q.DB.Create(link).Error
}

// CreateTarget to create target entry for a given query
func (q *Queries) CreateTarget(name, targetType, targetValue string) error {
queryTarget := DistributedQueryTarget{
Expand Down Expand Up @@ -449,6 +472,35 @@ func (q *Queries) SetExpected(name string, expected int, envid uint) error {
return nil
}

// UpdateQueryStatus to update the status of a query for a given node
//
// It resolves queryName to a query ID, loads the NodeQuery row linking that
// query to nodeID, and sets its status to "completed" when statusCode is 0 or
// to "error" otherwise. An error is returned when the query or the link row
// does not exist, or when the update itself fails.
func (q *Queries) UpdateQueryStatus(queryName string, nodeID uint, statusCode int) error {
	// TODO: need be replaced with constants
	result := "error"
	if statusCode == 0 {
		result = "completed"
	}

	// TODO: Get the query id
	// I think we can put an extra field in the query so that we also get the query id back from the osquery
	// This way we can avoid this query to get the query id
	//
	// First (unlike Find) reports a missing row as an error, so an unknown
	// query name stops here instead of continuing with a zero ID.
	var query DistributedQuery
	if err := q.DB.Where("name = ?", queryName).First(&query).Error; err != nil {
		return fmt.Errorf("error getting query id: %w", err)
	}

	// Same reasoning: without a loaded row the update below would have no
	// primary key to target.
	var nodeQuery NodeQuery
	if err := q.DB.Where("node_id = ? AND query_id = ?", nodeID, query.ID).First(&nodeQuery).Error; err != nil {
		return fmt.Errorf("error getting node query: %w", err)
	}

	if err := q.DB.Model(&nodeQuery).Update("status", result).Error; err != nil {
		return fmt.Errorf("error updating node query status: %w", err)
	}
	return nil
}

// TrackExecution to keep track of where queries have already ran
func (q *Queries) TrackExecution(name, uuid string, result int) error {
queryExecution := DistributedQueryExecution{
Expand Down
4 changes: 4 additions & 0 deletions tools/bruno/collection.bru
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
vars:pre-request {
env: 1a026f60-edc1-4189-ab70-be99d541a473
baseUrl: http://localhost:9002
}
11 changes: 11 additions & 0 deletions tools/bruno/nodes/get-nodes.bru
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
meta {
name: get-nodes
type: http
seq: 6
}

get {
url: {{baseUrl}}/api/v1/nodes/{{env}}/all
body: none
auth: none
}
19 changes: 19 additions & 0 deletions tools/bruno/queries/post - create queries.bru
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
meta {
name: post - create queries
type: http
seq: 9
}

post {
url: {{baseUrl}}/api/v1/queries/{{env}}
body: json
auth: none
}

body:json {
{
"environment_list": ["dev"],
"query": "SELECT * FROM system_info;",
"exp_hours": 1
}
}
Loading