forked from influxdata/chronograf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueries.go
134 lines (111 loc) · 3.39 KB
/
queries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package server
import (
"encoding/json"
"fmt"
"net/http"
"time"
"golang.org/x/net/context"
"github.com/influxdata/chronograf"
"github.com/influxdata/chronograf/influx"
"github.com/influxdata/chronograf/influx/queries"
)
// QueryRequest is a single raw query submitted for analysis. The Queries
// handler converts it into a chronograf.QueryConfig.
type QueryRequest struct {
	// ID is a caller-chosen identifier; it is copied onto the matching
	// QueryResponse and onto its QueryConfig.
	ID string `json:"id"`
	// Query is the raw query text to analyze.
	Query string `json:"query"`
}
// QueriesRequest is the JSON body accepted by the Queries handler: a batch of
// queries to convert to queryConfigs, optionally accompanied by template
// variables.
type QueriesRequest struct {
	// Queries lists the queries to analyze; the response holds one entry per
	// element, in the same order.
	Queries []QueryRequest `json:"queries"`
	// TemplateVars carries the template variables sent with the request.
	// NOTE(review): the Queries handler in this file decodes but never reads
	// this field - confirm whether templating is applied elsewhere.
	TemplateVars []chronograf.TemplateVar `json:"tempVars,omitempty"`
}
// QueryResponse is the analysis result for one QueryRequest: the raw query,
// its queryConfig, and, when available, the parsed AST, the query duration
// and the templated query.
type QueryResponse struct {
	// Duration is the query's time span in milliseconds. The Queries handler
	// fills it from influx.ParseTime, rounding a zero result up to 1, and
	// leaves it at 0 when ParseTime returns an error.
	Duration int64 `json:"durationMs"`
	// ID echoes the ID of the originating QueryRequest.
	ID string `json:"id"`
	// Query echoes the raw query text of the originating QueryRequest.
	Query string `json:"query"`
	// QueryConfig is the front-end representation built from Query.
	QueryConfig chronograf.QueryConfig `json:"queryConfig"`
	// QueryAST is the parsed SELECT statement; it stays nil (and is omitted
	// from the JSON) when queries.ParseSelect fails.
	QueryAST *queries.SelectStatement `json:"queryAST,omitempty"`
	// QueryTemplated is the templated form of the query, omitted when nil.
	// NOTE(review): the Queries handler in this file never sets it.
	QueryTemplated *string `json:"queryTemplated,omitempty"`
}
// QueriesResponse is the JSON body returned for a QueriesRequest.
type QueriesResponse struct {
	// Queries holds one QueryResponse per requested query, in request order.
	Queries []QueryResponse `json:"queries"`
}
// Queries analyzes InfluxQL to produce front-end friendly QueryConfig.
//
// The source is identified by the "id" request parameter and the request body
// is decoded as a QueriesRequest. Failure responses:
//   - 422 when the "id" parameter cannot be read,
//   - notFound when the source cannot be loaded from the store,
//   - invalidJSON when the body does not decode,
//   - 400 when resolving a default retention policy fails.
//
// On success it responds 200 with a QueriesResponse holding one entry per
// requested query, in request order.
func (s *Service) Queries(w http.ResponseWriter, r *http.Request) {
	sourceID, err := paramID("id", r)
	if err != nil {
		Error(w, http.StatusUnprocessableEntity, err.Error(), s.Logger)
		return
	}

	ctx := r.Context()
	source, err := s.Store.Sources(ctx).Get(ctx, sourceID)
	if err != nil {
		notFound(w, sourceID, s.Logger)
		return
	}

	var body QueriesRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		invalidJSON(w, s.Logger)
		return
	}

	analyzed := make([]QueryResponse, len(body.Queries))
	for idx := range body.Queries {
		in := body.Queries[idx]
		// Fill the pre-sized slot in place; nothing is sent to the client
		// unless every query is processed.
		out := &analyzed[idx]
		out.ID = in.ID
		out.Query = in.Query

		cfg := ToQueryConfig(in.Query)
		if rpErr := s.DefaultRP(ctx, &cfg, &source); rpErr != nil {
			Error(w, http.StatusBadRequest, rpErr.Error(), s.Logger)
			return
		}
		// Use an empty, non-nil slice so shifts encode as [] rather than null.
		cfg.Shifts = []chronograf.TimeShift{}
		cfg.ID = in.ID
		out.QueryConfig = cfg

		// The AST is best-effort: a query that does not parse as a SELECT
		// simply has no queryAST in the response.
		ast, astErr := queries.ParseSelect(in.Query)
		if astErr == nil {
			out.QueryAST = ast
		}

		// The duration is best-effort as well; a zero-millisecond result is
		// reported as 1.
		span, spanErr := influx.ParseTime(in.Query, time.Now())
		if spanErr == nil {
			out.Duration = span.Nanoseconds() / int64(time.Millisecond)
			if out.Duration == 0 {
				out.Duration = 1
			}
		}
	}

	encodeJSON(w, http.StatusOK, QueriesResponse{Queries: analyzed}, s.Logger)
}
// DefaultRP fills in qc.RetentionPolicy with the default retention policy of
// qc.Database when the query config does not already name one.
//
// It returns nil without contacting the source when qc already has a
// retention policy, or when qc lacks a database, a measurement or any fields.
// Otherwise it connects to src through s.Databases, lists the retention
// policies of qc.Database and copies the name of the first one flagged as
// default into qc. If none is flagged, qc is left unchanged and nil is
// returned. Connection and listing failures are returned as errors.
func (s *Service) DefaultRP(ctx context.Context, qc *chronograf.QueryConfig, src *chronograf.Source) error {
	switch {
	case qc.RetentionPolicy != "":
		// An RP was given explicitly; nothing to look up.
		return nil
	case qc.Database == "", qc.Measurement == "", len(qc.Fields) == 0:
		// Without a database, measurement and fields no RP can be found.
		return nil
	}

	dbs := s.Databases
	if err := dbs.Connect(ctx, src); err != nil {
		return fmt.Errorf("Unable to connect to source: %v", err)
	}

	policies, err := dbs.AllRP(ctx, qc.Database)
	if err != nil {
		return fmt.Errorf("Unable to load RPs from DB %s: %v", qc.Database, err)
	}

	for _, policy := range policies {
		if policy.Default {
			qc.RetentionPolicy = policy.Name
			break
		}
	}
	return nil
}