diff --git a/CHANGELOG.md b/CHANGELOG.md index c931ca04a2..45287dc38a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,8 @@ ### Features - [#1894](https://github.com/influxdata/kapacitor/pull/1894): Add HTTP sources for sideload configuration. + +- [##1889](https://github.com/influxdata/kapacitor/pull/1889): Add HTTP endpoints as option for sideload - [#1842](https://github.com/influxdata/kapacitor/pull/1842): Add alert inhibitors that allow an alert to suppress events from other matching alerts. - [#1833](https://github.com/influxdata/kapacitor/pull/1833): Config format updated to allow for more than one slack configuration. - [#1844](https://github.com/influxdata/kapacitor/pull/1844): Added a new kapacitor node changeDetect that emits a value diff --git a/pipeline/sideload.go b/pipeline/sideload.go index a86625dcf6..8cd005341f 100644 --- a/pipeline/sideload.go +++ b/pipeline/sideload.go @@ -59,6 +59,9 @@ type SideloadNode struct { // Tags is a list of tags to load. // tick:ignore Tags map[string]string `tick:"Tag" json:"tags"` + + HttpUser string `json:"httpuser"` + HttpPassword string `json:"httppassword"` } func newSideloadNode(wants EdgeType) *SideloadNode { @@ -96,6 +99,17 @@ func (n *SideloadNode) Tag(t string, v string) *SideloadNode { return n } +/* +func (n *SideloadNode) HttpUser(httpuser string) *SideloadNode { + n.HttpUser = httpuser + return n +} + +func (n *SideloadNode) HttpPassword(httppassword string) *SideloadNode { + n.HttpPassword = httppassword + return n +} +*/ // MarshalJSON converts SideloadNode to JSON // tick:ignore func (n *SideloadNode) MarshalJSON() ([]byte, error) { diff --git a/pipeline/sideload_test.go b/pipeline/sideload_test.go index c77074784e..be322efc66 100644 --- a/pipeline/sideload_test.go +++ b/pipeline/sideload_test.go @@ -6,10 +6,12 @@ import ( func TestSideloadNode_MarshalJSON(t *testing.T) { type fields struct { - Source string - Order []string - Fields map[string]interface{} - Tags map[string]string + Source string + Order []string + Fields map[string]interface{} + Tags map[string]string + HttpUser string + HttpPassword string } tests := []struct { name string @@ -20,8 +22,10 @@ func TestSideloadNode_MarshalJSON(t *testing.T) { { name: "all fields set", fields: fields{ - Source: "file:///src", - Order: []string{"a", "b", "c"}, + Source: "file:///src", + Order: []string{"a", "b", "c"}, + HttpUser: "", + HttpPassword: "", Fields: map[string]interface{}{ "f1": 42.0, "f2": "", @@ -47,7 +51,9 @@ func TestSideloadNode_MarshalJSON(t *testing.T) { "tags": { "t1": "k1", "t2": "" - } + }, + "httpuser": "", + "httppassword": "" }`, }, } @@ -59,6 +65,8 @@ func TestSideloadNode_MarshalJSON(t *testing.T) { w.OrderList = tt.fields.Order w.Fields = tt.fields.Fields w.Tags = tt.fields.Tags + w.HttpUser = tt.fields.HttpUser + w.HttpPassword = tt.fields.HttpPassword MarshalIndentTestHelper(t, w, tt.wantErr, tt.want) }) } diff --git a/pipeline/tick/sideload.go b/pipeline/tick/sideload.go index f40e2e4924..a670261183 100644 --- a/pipeline/tick/sideload.go +++ b/pipeline/tick/sideload.go @@ -26,6 +26,11 @@ func (n *SideloadNode) Build(d *pipeline.SideloadNode) (ast.Node, error) { n.Pipe("sideload") n.Dot("source", d.Source) +<<<<<<< HEAD + n.Dot("httpUser", d.HttpUser) + n.Dot("httpPassword", d.HttpPassword) +======= +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f order := make([]interface{}, len(d.OrderList)) for i := range d.OrderList { diff --git a/services/sideload/service.go b/services/sideload/service.go index eab64e419c..9f94bb6efb 100644 --- a/services/sideload/service.go +++ b/services/sideload/service.go @@ -88,6 +88,16 @@ func (s *Service) Reload() error { return nil } +<<<<<<< HEAD +func (s *Service) Source(srcURL string) (Source, error) { + var src Source + + u, err := url.Parse(srcURL) + if err != nil { + return nil, err + } + if u.Scheme != "file" && u.Scheme != "http" { +======= func (s *Service) Source(endpoint *httppost.Endpoint) (Source, error) { var src Source @@ -96,21 +106,33 @@ func (s *Service) Source(endpoint *httppost.Endpoint) (Source, error) { return nil, err } if u.Scheme != "file" && u.Scheme != "http" && u.Scheme != "https" { +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f return nil, fmt.Errorf("unsupported source scheme %q, must be 'file' or 'http'", u.Scheme) } if u.Scheme == "file" { src, err = s.SourceFile(u.Path) +<<<<<<< HEAD + } else if u.Scheme == "http" { + src, err = s.SourceHttp(srcURL) +======= } else if u.Scheme == "http" || u.Scheme == "https" { src, err = s.SourceHttp(endpoint, u.Scheme) +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f } return src, err } +<<<<<<< HEAD +func (s *Service) SourceHttp(srcURL string) (Source, error) { + var err error + dir := srcURL +======= func (s *Service) SourceHttp(endpoint *httppost.Endpoint, scheme string) (Source, error) { var err error dir := endpoint.Url +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f s.mu.Lock() defer s.mu.Unlock() /* @@ -123,8 +145,12 @@ func (s *Service) SourceHttp(endpoint *httppost.Endpoint, scheme string) (Source src = &source{ s: s, dir: dir, +<<<<<<< HEAD + scheme: "http", +======= scheme: scheme, e: endpoint, +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f } err = src.updateCache() if err != nil { @@ -135,7 +161,11 @@ func (s *Service) SourceHttp(endpoint *httppost.Endpoint, scheme string) (Source src.referenceCount++ if err != nil { +<<<<<<< HEAD + return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", srcURL, err.Error()) +======= return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", dir, err.Error()) +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f } return src, nil } @@ -180,11 +210,20 @@ type Source interface { } type source struct { +<<<<<<< HEAD + s *Service + scheme string + dir string + mu sync.RWMutex + httpUser string + httpPassword string +======= s *Service scheme string dir string mu sync.RWMutex e *httppost.Endpoint +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f cache map[string]map[string]interface{} referenceCount int @@ -223,10 +262,16 @@ func (s *source) updateCacheFile() error { } func (s *source) updateCacheHttp() error { req, err := http.NewRequest("GET", s.dir, nil) +<<<<<<< HEAD + if s.httpUser != "" { + req.SetBasicAuth(s.httpUser, s.httpPassword) + } +======= if s.e.Auth.Username != "" && s.e.Auth.Password != "" { req.SetBasicAuth(s.e.Auth.Username, s.e.Auth.Password) } +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f client := &http.Client{ Timeout: time.Second * 10, } @@ -253,7 +298,11 @@ func (s *source) updateCache() error { if s.scheme == "file" { return s.updateCacheFile() +<<<<<<< HEAD + } else if s.scheme == "http" { +======= } else if s.scheme == "http" || s.scheme == "https" { +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f return s.updateCacheHttp() } return nil diff --git a/services/sideload/service_test.go b/services/sideload/service_test.go index 5f1cc3264f..ebe3fe3bc7 100644 --- a/services/sideload/service_test.go +++ b/services/sideload/service_test.go @@ -33,25 +33,31 @@ func TestService_Source_Lookup(t *testing.T) { defer src.Close() testCases := []struct { - order []string - key string - want interface{} + order []string + key string + httpuser string + httppassword string + want interface{} }{ { order: []string{ "host/hostA.yml", "default.yml", }, - key: "key0", - want: 5.0, + httpuser: "", + httppassword: "", + key: "key0", + want: 5.0, }, { order: []string{ "host/hostA.yml", "default.yml", }, - key: "key1", - want: "one", + httpuser: "", + httppassword: "", + key: "key1", + want: "one", }, { order: []string{ @@ -59,8 +65,10 @@ func TestService_Source_Lookup(t *testing.T) { "hostgroup/foo.yml", "default.yml", }, - key: "key0", - want: 5.0, + httpuser: "", + httppassword: "", + key: "key0", + want: 5.0, }, { order: []string{ @@ -68,8 +76,10 @@ func TestService_Source_Lookup(t *testing.T) { "hostgroup/foo.yml", "default.yml", }, - key: "key1", - want: "foo", + httpuser: "", + httppassword: "", + key: "key1", + want: "foo", }, } for i, tc := range testCases { diff --git a/sideload.go b/sideload.go index ecb1f56f9d..9002133769 100644 --- a/sideload.go +++ b/sideload.go @@ -22,8 +22,16 @@ type SideloadNode struct { s *pipeline.SideloadNode source sideload.Source orderTmpls []orderTmpl +<<<<<<< HEAD + + order []string + httpUser string + httpPassword string + +======= Endpoint *httppost.Endpoint order []string +>>>>>>> 6de5e2134e2949916786b386ad852b342640414f bufferPool *bufpool.Pool } @@ -32,11 +40,13 @@ func newSideloadNode(et *ExecutingTask, n *pipeline.SideloadNode, d NodeDiagnost var e *httppost.Endpoint var ok bool sn := &SideloadNode{ - node: node{Node: n, et: et, diag: d}, - s: n, - bufferPool: bufpool.New(), - order: make([]string, len(n.OrderList)), - orderTmpls: make([]orderTmpl, len(n.OrderList)), + node: node{Node: n, et: et, diag: d}, + s: n, + httpUser: n.HttpUser, + httpPassword: n.HttpPassword, + bufferPool: bufpool.New(), + order: make([]string, len(n.OrderList)), + orderTmpls: make([]orderTmpl, len(n.OrderList)), } u, err := url.Parse(n.Source) if err != nil {