From 8cc7af0148aec59982d395d837c5e798392222de Mon Sep 17 00:00:00 2001 From: Zhuoyuan Liu Date: Mon, 4 Nov 2024 15:09:18 +0100 Subject: [PATCH 1/5] add a new table for node query --- queries/queries.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/queries/queries.go b/queries/queries.go index f0e720db..a325cab7 100644 --- a/queries/queries.go +++ b/queries/queries.go @@ -79,6 +79,16 @@ type DistributedQuery struct { Expiration time.Time } +// NodeQuery links a node to a query +type NodeQuery struct { + NodeQueryID uint `gorm:"primaryKey;autoIncrement"` + NodeID uint `gorm:"not null;index"` + QueryID uint `gorm:"not null;index"` + Status string `gorm:"type:varchar(50);default:'pending'"` // Indexed for fast lookups + AssignedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"` + CompletedAt time.Time +} + // DistributedQueryTarget to keep target logic for queries type DistributedQueryTarget struct { gorm.Model @@ -107,6 +117,7 @@ type Queries struct { func CreateQueries(backend *gorm.DB) *Queries { var q *Queries q = &Queries{DB: backend} + // table distributed_queries if err := backend.AutoMigrate(&DistributedQuery{}); err != nil { log.Fatal().Msgf("Failed to AutoMigrate table (distributed_queries): %v", err) From ad4957341bae293a328591c1cc45b68330640658 Mon Sep 17 00:00:00 2001 From: Zhuoyuan Liu Date: Tue, 5 Nov 2024 13:41:38 +0100 Subject: [PATCH 2/5] update the behaviour when creating query --- api/handlers/queries.go | 8 +++++ logging/process.go | 4 +++ queries/queries.go | 78 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 82 insertions(+), 8 deletions(-) diff --git a/api/handlers/queries.go b/api/handlers/queries.go index 6a932f03..2f182de2 100644 --- a/api/handlers/queries.go +++ b/api/handlers/queries.go @@ -218,6 +218,14 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request) // Remove duplicates from expected expectedClear := removeStringDuplicates(expected) + + // Create new record for query list + for 
_, id := range expectedClear { + if err := h.Queries.CreateNodeQuery(newQuery.ID, id); err != nil { + log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, id) + } + } + // Update value for expected if err := h.Queries.SetExpected(queryName, len(expectedClear), env.ID); err != nil { apiErrorResponse(w, "error setting expected", http.StatusInternalServerError, err) diff --git a/logging/process.go b/logging/process.go index 06ffe666..03ace40d 100644 --- a/logging/process.go +++ b/logging/process.go @@ -84,6 +84,10 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest, if err := l.Queries.TrackExecution(q, node.UUID, queriesWrite.Statuses[q]); err != nil { log.Err(err).Msg("error adding query execution") } + // Instead of creating a new record in a separate table, we can just update the query status + if err := l.Queries.UpdateQueryStatus(q, node.ID, queriesWrite.Statuses[q]); err != nil { + log.Err(err).Msg("error updating query status") + } // Check if query is completed if err := l.Queries.VerifyComplete(q, envid); err != nil { log.Err(err).Msg("error verifying and completing query") diff --git a/queries/queries.go b/queries/queries.go index a325cab7..2de517bb 100644 --- a/queries/queries.go +++ b/queries/queries.go @@ -81,12 +81,12 @@ type DistributedQuery struct { // NodeQuery links a node to a query type NodeQuery struct { - NodeQueryID uint `gorm:"primaryKey;autoIncrement"` - NodeID uint `gorm:"not null;index"` - QueryID uint `gorm:"not null;index"` - Status string `gorm:"type:varchar(50);default:'pending'"` // Indexed for fast lookups - AssignedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"` - CompletedAt time.Time + ID uint `gorm:"primaryKey;autoIncrement"` + NodeID uint `gorm:"not null;index"` + QueryID uint `gorm:"not null;index"` + Status string `gorm:"type:varchar(10);default:'pending'"` + CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"` + UpdatedAt time.Time } // DistributedQueryTarget to 
keep target logic for queries @@ -115,9 +115,13 @@ type Queries struct { // CreateQueries to initialize the queries struct func CreateQueries(backend *gorm.DB) *Queries { - var q *Queries - q = &Queries{DB: backend} + //var q *Queries + q := &Queries{DB: backend} + // table node_queries + if err := backend.AutoMigrate(&NodeQuery{}); err != nil { + log.Fatal().Msgf("Failed to AutoMigrate table (node_queries): %v", err) + } // table distributed_queries if err := backend.AutoMigrate(&DistributedQuery{}); err != nil { log.Fatal().Msgf("Failed to AutoMigrate table (distributed_queries): %v", err) @@ -137,6 +141,31 @@ func CreateQueries(backend *gorm.DB) *Queries { return q } +func (q *Queries) NodeQueries_V2(node nodes.OsqueryNode) (QueryReadQueries, bool, error) { + + var results []struct { + QueryName string + Query string + } + + q.DB.Table("distributed_queries dq"). + Select("dq.name, dq.query"). + Joins("JOIN node_queries nq ON dq.id = nq.query_id"). + Where("nq.node_id = ? AND nq.status = ?", node.ID, "pending"). + Scan(&results) + + if len(results) == 0 { + return QueryReadQueries{}, false, nil + } + + qs := make(QueryReadQueries) + for _, _q := range results { + qs[_q.QueryName] = _q.Query + } + + return qs, false, nil +} + // NodeQueries to get all queries that belong to the provided node // FIXME this will impact the performance of the TLS endpoint due to being CPU and I/O hungry // FIMXE potential mitigation can be add a cache (Redis?) 
layer to store queries per node_key @@ -395,6 +424,18 @@ func (q *Queries) Create(query DistributedQuery) error { return nil } +// CreateNodeQuery to link a node to a query +func (q *Queries) CreateNodeQuery(nodeID, queryID uint) error { + nodeQuery := NodeQuery{ + NodeID: nodeID, + QueryID: queryID, + } + if err := q.DB.Create(&nodeQuery).Error; err != nil { + return err + } + return nil +} + // CreateTarget to create target entry for a given query func (q *Queries) CreateTarget(name, targetType, targetValue string) error { queryTarget := DistributedQueryTarget{ @@ -460,6 +501,27 @@ func (q *Queries) SetExpected(name string, expected int, envid uint) error { return nil } +// UpdateQueryStatus to update the status of each query +func (q *Queries) UpdateQueryStatus(queryName string, nodeID uint, statusCode int) error { + + var result string + if statusCode == 0 { + result = "completed" // TODO: need be replaced with a constant + } else { + result = "error" + } + var nodeQuery NodeQuery + // For the current setup, we need a joint query to update the status, + // I am wondering if we can put an extra field in the query so that we also get the query id back from the osquery + if err := q.DB.Where("node_id = ? 
AND query_id = ?", nodeID, queryName).Find(&nodeQuery).Error; err != nil { + return err + } + if err := q.DB.Model(&nodeQuery).Updates(map[string]interface{}{"status": result}).Error; err != nil { + return err + } + return nil +} + // TrackExecution to keep track of where queries have already ran func (q *Queries) TrackExecution(name, uuid string, result int) error { queryExecution := DistributedQueryExecution{ From e5bddf37a4a04d666a79315df94f51e23ccd8f2c Mon Sep 17 00:00:00 2001 From: Zhuoyuan Liu Date: Wed, 13 Nov 2024 16:07:56 +0100 Subject: [PATCH 3/5] add query --- .gitignore | 3 +++ api/handlers/queries.go | 17 ++++++++++++++--- tools/bruno/collection.bru | 4 ++++ tools/bruno/nodes/get-nodes.bru | 11 +++++++++++ tools/bruno/queries/post - create queries.bru | 19 +++++++++++++++++++ 5 files changed, 51 insertions(+), 3 deletions(-) create mode 100644 tools/bruno/nodes/get-nodes.bru create mode 100644 tools/bruno/queries/post - create queries.bru diff --git a/.gitignore b/.gitignore index df914592..113d8649 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,6 @@ go.work.sum deploy/docker/conf/tls/* .env !deploy/docker/conf/tls/openssl.cnf.example + +# bruno +tools/bruno/collection.bru diff --git a/api/handlers/queries.go b/api/handlers/queries.go index 2f182de2..8a632720 100644 --- a/api/handlers/queries.go +++ b/api/handlers/queries.go @@ -143,6 +143,12 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request) h.Inc(metricAPIQueriesErr) return } + // Get the query id + newQuery, err = h.Queries.Get(queryName, env.ID) + if err != nil { + apiErrorResponse(w, "error creating query", http.StatusInternalServerError, err) + return + } // Temporary list of UUIDs to calculate Expected var expected []string @@ -220,9 +226,14 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request) expectedClear := removeStringDuplicates(expected) // Create new record for query list - for _, id := range expectedClear { - if err := 
h.Queries.CreateNodeQuery(newQuery.ID, id); err != nil { - log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, id) + for _, nodeUUID := range expectedClear { + node, err := h.Nodes.GetByUUID(nodeUUID) + if err != nil { + log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID) + continue + } + if err := h.Queries.CreateNodeQuery(newQuery.ID, node.ID); err != nil { + log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID) } } diff --git a/tools/bruno/collection.bru b/tools/bruno/collection.bru index e69de29b..4c9c660e 100644 --- a/tools/bruno/collection.bru +++ b/tools/bruno/collection.bru @@ -0,0 +1,4 @@ +vars:pre-request { + env: 1a026f60-edc1-4189-ab70-be99d541a473 + baseUrl: http://localhost:9002 +} diff --git a/tools/bruno/nodes/get-nodes.bru b/tools/bruno/nodes/get-nodes.bru new file mode 100644 index 00000000..fe403055 --- /dev/null +++ b/tools/bruno/nodes/get-nodes.bru @@ -0,0 +1,11 @@ +meta { + name: get-nodes + type: http + seq: 6 +} + +get { + url: {{baseUrl}}/api/v1/nodes/{{env}}/all + body: none + auth: none +} diff --git a/tools/bruno/queries/post - create queries.bru b/tools/bruno/queries/post - create queries.bru new file mode 100644 index 00000000..cfbf4989 --- /dev/null +++ b/tools/bruno/queries/post - create queries.bru @@ -0,0 +1,19 @@ +meta { + name: post - create queries + type: http + seq: 9 +} + +post { + url: {{baseUrl}}/api/v1/queries/{{env}} + body: json + auth: none +} + +body:json { + { + "environment_list": ["dev"], + "query": "SELECT * FROM system_info;", + "exp_hours": 1 + } +} From 5a759a4804ad88b54d22c002524e137693113ba5 Mon Sep 17 00:00:00 2001 From: Zhuoyuan Liu Date: Wed, 13 Nov 2024 16:47:07 +0100 Subject: [PATCH 4/5] bug fix --- .gitignore | 1 + api/handlers/queries.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 113d8649..b4b489d3 100644 --- a/.gitignore +++
b/.gitignore @@ -72,6 +72,7 @@ go.work.sum deploy/docker/conf/tls/* .env !deploy/docker/conf/tls/openssl.cnf.example +tls.env # bruno tools/bruno/collection.bru diff --git a/api/handlers/queries.go b/api/handlers/queries.go index 8a632720..37b7c9e4 100644 --- a/api/handlers/queries.go +++ b/api/handlers/queries.go @@ -232,7 +232,7 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request) log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID) continue } - if err := h.Queries.CreateNodeQuery(newQuery.ID, node.ID); err != nil { + if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil { log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID) } } From 2fd500c8b7f7b18c6949e3e7ea45f29088e8247a Mon Sep 17 00:00:00 2001 From: Zhuoyuan Liu Date: Thu, 14 Nov 2024 11:07:30 +0100 Subject: [PATCH 5/5] fix --- logging/process.go | 12 ++---------- queries/queries.go | 47 +++++++++++++--------------------------------- 2 files changed, 15 insertions(+), 44 deletions(-) diff --git a/logging/process.go b/logging/process.go index 03ace40d..22cfe8f7 100644 --- a/logging/process.go +++ b/logging/process.go @@ -70,16 +70,8 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest, Message: queriesWrite.Messages[q], } go l.DispatchQueries(d, node, debug) - // Update internal metrics per query - var err error - if queriesWrite.Statuses[q] != 0 { - err = l.Queries.IncError(q, envid) - } else { - err = l.Queries.IncExecution(q, envid) - } - if err != nil { - log.Err(err).Msg("error updating query") - } + + // TODO: This TrackExecution needs to be removed // Add a record for this query if err := l.Queries.TrackExecution(q, node.UUID, queriesWrite.Statuses[q]); err != nil { log.Err(err).Msg("error adding query execution") diff --git a/queries/queries.go b/queries/queries.go index 2de517bb..15f5a140 100644 --- a/queries/queries.go +++ b/queries/queries.go @@
-1,6 +1,7 @@ package queries import ( + "fmt" "time" "github.com/jmpsec/osctrl/nodes" @@ -141,7 +142,7 @@ func CreateQueries(backend *gorm.DB) *Queries { return q } -func (q *Queries) NodeQueries_V2(node nodes.OsqueryNode) (QueryReadQueries, bool, error) { +func (q *Queries) NodeQueries(node nodes.OsqueryNode) (QueryReadQueries, bool, error) { var results []struct { QueryName string @@ -166,36 +167,6 @@ func (q *Queries) NodeQueries_V2(node nodes.OsqueryNode) (QueryReadQueries, bool return qs, false, nil } -// NodeQueries to get all queries that belong to the provided node -// FIXME this will impact the performance of the TLS endpoint due to being CPU and I/O hungry -// FIMXE potential mitigation can be add a cache (Redis?) layer to store queries per node_key -func (q *Queries) NodeQueries(node nodes.OsqueryNode) (QueryReadQueries, bool, error) { - acelerate := false - // Get all current active queries and carves - queries, err := q.GetActive(node.EnvironmentID) - if err != nil { - return QueryReadQueries{}, false, err - } - // Iterate through active queries, see if they target this node and prepare data in the same loop - qs := make(QueryReadQueries) - for _, _q := range queries { - targets, err := q.GetTargets(_q.Name) - if err != nil { - return QueryReadQueries{}, false, err - } - // FIXME disable acceleration until figure out edge cases where it would trigger by mistake - /* - if len(targets) == 1 { - acelerate = true - } - */ - if isQueryTarget(node, targets) && q.NotYetExecuted(_q.Name, node.UUID) { - qs[_q.Name] = _q.Query - } - } - return qs, acelerate, nil -} - // Gets all queries by target (active/completed/all/all-full/deleted/hidden/expired) func (q *Queries) Gets(target, qtype string, envid uint) ([]DistributedQuery, error) { var queries []DistributedQuery @@ -510,10 +481,18 @@ func (q *Queries) UpdateQueryStatus(queryName string, nodeID uint, statusCode in } else { result = "error" } + + var query DistributedQuery + // TODO: Get the query id + // I 
think we can put an extra field in the query so that we also get the query id back from the osquery + // This way we can avoid this query to get the query id + if err := q.DB.Where("name = ?", queryName).Find(&query).Error; err != nil { + return fmt.Errorf("error getting query id: %v", err) + } + var nodeQuery NodeQuery - // For the current setup, we need a joint query to update the status, - // I am wondering if we can put an extra field in the query so that we also get the query id back from the osquery - if err := q.DB.Where("node_id = ? AND query_id = ?", nodeID, queryName).Find(&nodeQuery).Error; err != nil { + + if err := q.DB.Where("node_id = ? AND query_id = ?", nodeID, query.ID).Find(&nodeQuery).Error; err != nil { return err } if err := q.DB.Model(&nodeQuery).Updates(map[string]interface{}{"status": result}).Error; err != nil {