Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Second step for refactor distributed query #576

Merged
merged 11 commits into from
Jan 7, 2025
73 changes: 35 additions & 38 deletions admin/handlers/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,93 +156,90 @@ func (h *HandlersAdmin) QueryRunPOSTHandler(w http.ResponseWriter, r *http.Reque
adminErrorResponse(w, "error creating query", http.StatusInternalServerError, err)
return
}
// Temporary list of UUIDs to calculate Expected
var expected []string

// List all the nodes that match the query
var expected []uint

targetNodesID := []uint{}
// TODO: Refactor this to use osctrl-api instead of direct DB queries
// Create environment target
if len(q.Environments) > 0 {
expected = []uint{}
for _, e := range q.Environments {
if (e != "") && h.Envs.Exists(e) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetEnvironment, e); err != nil {
adminErrorResponse(w, "error creating query environment target", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
}
nodes, err := h.Nodes.GetByEnv(e, "active", h.Settings.InactiveHours(settings.NoEnvironmentID))
if err != nil {
adminErrorResponse(w, "error getting nodes by environment", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
}
for _, n := range nodes {
expected = append(expected, n.UUID)
expected = append(expected, n.ID)
}
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create platform target
if len(q.Platforms) > 0 {
expected = []uint{}
platforms, _ := h.Nodes.GetAllPlatforms()
for _, p := range q.Platforms {
if (p != "") && checkValidPlatform(platforms, p) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetPlatform, p); err != nil {
adminErrorResponse(w, "error creating query platform target", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
}
nodes, err := h.Nodes.GetByPlatform(p, "active", h.Settings.InactiveHours(settings.NoEnvironmentID))
if err != nil {
adminErrorResponse(w, "error getting nodes by platform", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
}
for _, n := range nodes {
expected = append(expected, n.UUID)
expected = append(expected, n.ID)
}
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create UUIDs target
if len(q.UUIDs) > 0 {
expected = []uint{}
for _, u := range q.UUIDs {
if (u != "") && h.Nodes.CheckByUUID(u) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetUUID, u); err != nil {
adminErrorResponse(w, "error creating query UUID target", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
if u != "" {
node, err := h.Nodes.GetByUUID(u)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", u)
continue
}
expected = append(expected, u)
expected = append(expected, node.ID)
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create hostnames target
if len(q.Hosts) > 0 {
expected = []uint{}
for _, _h := range q.Hosts {
if (_h != "") && h.Nodes.CheckByHost(_h) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetLocalname, _h); err != nil {
adminErrorResponse(w, "error creating query hostname target", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
if _h != "" {
node, err := h.Nodes.GetByIdentifier(_h)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", _h)
continue
}
expected = append(expected, _h)
expected = append(expected, node.ID)
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, nodeUUID := range expectedClear {
node, err := h.Nodes.GetByUUID(nodeUUID)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID)
continue
}
if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID)
// If the list is empty, we don't need to create node queries
if len(targetNodesID) != 0 {
if err := h.Queries.CreateNodeQueries(targetNodesID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node queries for query %s", newQuery.Name)
adminErrorResponse(w, "error creating node queries", http.StatusInternalServerError, err)
return
}
}
// Update value for expected
if err := h.Queries.SetExpected(newQuery.Name, len(expectedClear), env.ID); err != nil {
if err := h.Queries.SetExpected(newQuery.Name, len(targetNodesID), env.ID); err != nil {
adminErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
Expand Down
6 changes: 6 additions & 0 deletions admin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,12 @@ func osctrlAdminService() {
log.Err(err).Msg("Error getting all environments")
}
for _, e := range allEnvs {
// Periotically check if the queries are completed
// not sure if we need to complete the Carves
if err := queriesmgr.CleanupCompletedQueries(e.ID); err != nil {
log.Err(err).Msg("Error completing expired queries")
}
// Periotically check if the queries are expired
if err := queriesmgr.CleanupExpiredQueries(e.ID); err != nil {
log.Err(err).Msg("Error cleaning up expired queries")
}
Expand Down
80 changes: 39 additions & 41 deletions api/handlers/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,95 +150,93 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request)
return
}

// Temporary list of UUIDs to calculate Expected
var expected []string
// Create targets
// List all the nodes that match the query
var expected []uint

targetNodesID := []uint{}
// Current logic is to select nodes meeting all criteria in the query
// TODO: I believe we should only allow to list nodes in one environment in URL paths
// We will refactor this part to be tag based queries and add more options to the query
if len(q.Environments) > 0 {
expected = []uint{}
for _, e := range q.Environments {
if (e != "") && h.Envs.Exists(e) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetEnvironment, e); err != nil {
apiErrorResponse(w, "error creating query environment target", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
}
nodes, err := h.Nodes.GetByEnv(e, "active", h.Settings.InactiveHours(settings.NoEnvironmentID))
if err != nil {
apiErrorResponse(w, "error getting nodes by environment", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
}
for _, n := range nodes {
expected = append(expected, n.UUID)
expected = append(expected, n.ID)
}
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create platform target
if len(q.Platforms) > 0 {
expected = []uint{}
platforms, _ := h.Nodes.GetAllPlatforms()
for _, p := range q.Platforms {
if (p != "") && checkValidPlatform(platforms, p) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetPlatform, p); err != nil {
apiErrorResponse(w, "error creating query platform target", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
}
nodes, err := h.Nodes.GetByPlatform(p, "active", h.Settings.InactiveHours(settings.NoEnvironmentID))
if err != nil {
apiErrorResponse(w, "error getting nodes by platform", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
}
for _, n := range nodes {
expected = append(expected, n.UUID)
expected = append(expected, n.ID)
}
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create UUIDs target
if len(q.UUIDs) > 0 {
expected = []uint{}
for _, u := range q.UUIDs {
if (u != "") && h.Nodes.CheckByUUID(u) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetUUID, u); err != nil {
apiErrorResponse(w, "error creating query UUID target", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
if u != "" {
node, err := h.Nodes.GetByUUID(u)
if err != nil {
log.Warn().Msgf("error getting node %s and failed to create node query for it", u)
continue
}
expected = append(expected, u)
expected = append(expected, node.ID)
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create hostnames target
// Currently we are using the GetByIdentifier function and it need be more clear
// about the definition of the identifier
if len(q.Hosts) > 0 {
for _, _h := range q.Hosts {
if (_h != "") && h.Nodes.CheckByHost(_h) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetLocalname, _h); err != nil {
apiErrorResponse(w, "error creating query hostname target", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
expected = []uint{}
for _, hostName := range q.Hosts {
if hostName != "" {
node, err := h.Nodes.GetByIdentifier(hostName)
if err != nil {
log.Warn().Msgf("error getting node %s and failed to create node query for it", hostName)
continue
}
expected = append(expected, _h)
expected = append(expected, node.ID)
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}

// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, nodeUUID := range expectedClear {
node, err := h.Nodes.GetByUUID(nodeUUID)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID)
continue
}
if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID)
// If the list is empty, we don't need to create node queries
if len(targetNodesID) != 0 {
if err := h.Queries.CreateNodeQueries(targetNodesID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node queries for query %s", newQuery.Name)
apiErrorResponse(w, "error creating node queries", http.StatusInternalServerError, err)
return
}
}

// Update value for expected
if err := h.Queries.SetExpected(queryName, len(expectedClear), env.ID); err != nil {
if err := h.Queries.SetExpected(queryName, len(targetNodesID), env.ID); err != nil {
apiErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
h.Inc(metricAPICarvesErr)
return
Expand Down
15 changes: 0 additions & 15 deletions api/handlers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,3 @@ func checkValidPlatform(platforms []string, platform string) bool {
}
return false
}

// Helper to remove duplicates from []string
func removeStringDuplicates(s []string) []string {
seen := make(map[string]struct{}, len(s))
i := 0
for _, v := range s {
if _, ok := seen[v]; ok {
continue
}
seen[v] = struct{}{}
s[i] = v
i++
}
return s[:i]
}
8 changes: 0 additions & 8 deletions logging/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,6 @@ func (logDB *LoggerDB) CleanQueryLogs(entries int64) error {
if err := logDB.Database.Conn.Unscoped().Delete(&queriesTargets).Error; err != nil {
return err
}
// Get query executions
var queriesExecutions []queries.DistributedQueryExecution
if err := logDB.Database.Conn.Where("name = ?", q.Name).Find(&queriesExecutions).Error; err != nil {
return err
}
if err := logDB.Database.Conn.Unscoped().Delete(&queriesExecutions).Error; err != nil {
return err
}
// Delete query
if err := logDB.Database.Conn.Unscoped().Delete(&q).Error; err != nil {
return err
Expand Down
11 changes: 1 addition & 10 deletions logging/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,9 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest,
if err != nil {
log.Err(err).Msg("error updating query")
}
// TODO: This TrackExeuction need be removed
// Add a record for this query
if err := l.Queries.TrackExecution(q, node.UUID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error adding query execution")
}
// Instead of creating a new record in a separate table, we can just update the query status
// Update query status
if err := l.Queries.UpdateQueryStatus(q, node.ID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error updating query status")
}
// Check if query is completed
if err := l.Queries.VerifyComplete(q, envid); err != nil {
log.Err(err).Msg("error verifying and completing query")
}
}
}
2 changes: 1 addition & 1 deletion nodes/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (n *NodeManager) UpdateMetadataByUUID(uuid string, metadata NodeMetadata) e
return fmt.Errorf("RecordUsername %v", err)
}
if metadata.Username != node.Username && metadata.Username != "" {
updates["username"] =metadata.Username
updates["username"] = metadata.Username
}
// Record hostname
if err := n.RecordHostname(metadata.Hostname, node); err != nil {
Expand Down
Loading