Skip to content

Commit

Permalink
Merge branch 'feature/sideload-http' of https://github.com/jregovic/k…
Browse files Browse the repository at this point in the history
…apacitor into feature/sideload-http
  • Loading branch information
jregovic committed Mar 20, 2019
2 parents 37aa85b + fb3715a commit 407f2aa
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
### Features
- [#1894](https://github.com/influxdata/kapacitor/pull/1894): Add HTTP sources for sideload configuration.


- [##1889](https://github.com/influxdata/kapacitor/pull/1889): Add HTTP endpoints as option for sideload
- [#1842](https://github.com/influxdata/kapacitor/pull/1842): Add alert inhibitors that allow an alert to suppress events from other matching alerts.
- [#1833](https://github.com/influxdata/kapacitor/pull/1833): Config format updated to allow for more than one slack configuration.
- [#1844](https://github.com/influxdata/kapacitor/pull/1844): Added a new kapacitor node changeDetect that emits a value
Expand Down
14 changes: 14 additions & 0 deletions pipeline/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type SideloadNode struct {
// Tags is a list of tags to load.
// tick:ignore
Tags map[string]string `tick:"Tag" json:"tags"`

HttpUser string `json:"httpuser"`
HttpPassword string `json:"httppassword"`
}

func newSideloadNode(wants EdgeType) *SideloadNode {
Expand Down Expand Up @@ -96,6 +99,17 @@ func (n *SideloadNode) Tag(t string, v string) *SideloadNode {
return n
}

/*
func (n *SideloadNode) HttpUser(httpuser string) *SideloadNode {
n.HttpUser = httpuser
return n
}
func (n *SideloadNode) HttpPassword(httppassword string) *SideloadNode {
n.HttpPassword = httppassword
return n
}
*/
// MarshalJSON converts SideloadNode to JSON
// tick:ignore
func (n *SideloadNode) MarshalJSON() ([]byte, error) {
Expand Down
22 changes: 15 additions & 7 deletions pipeline/sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (

func TestSideloadNode_MarshalJSON(t *testing.T) {
type fields struct {
Source string
Order []string
Fields map[string]interface{}
Tags map[string]string
Source string
Order []string
Fields map[string]interface{}
Tags map[string]string
HttpUser string
HttpPassword string
}
tests := []struct {
name string
Expand All @@ -20,8 +22,10 @@ func TestSideloadNode_MarshalJSON(t *testing.T) {
{
name: "all fields set",
fields: fields{
Source: "file:///src",
Order: []string{"a", "b", "c"},
Source: "file:///src",
Order: []string{"a", "b", "c"},
HttpUser: "",
HttpPassword: "",
Fields: map[string]interface{}{
"f1": 42.0,
"f2": "",
Expand All @@ -47,7 +51,9 @@ func TestSideloadNode_MarshalJSON(t *testing.T) {
"tags": {
"t1": "k1",
"t2": ""
}
},
"httpuser": "",
"httppassword": ""
}`,
},
}
Expand All @@ -59,6 +65,8 @@ func TestSideloadNode_MarshalJSON(t *testing.T) {
w.OrderList = tt.fields.Order
w.Fields = tt.fields.Fields
w.Tags = tt.fields.Tags
w.HttpUser = tt.fields.HttpUser
w.HttpPassword = tt.fields.HttpPassword
MarshalIndentTestHelper(t, w, tt.wantErr, tt.want)
})
}
Expand Down
5 changes: 5 additions & 0 deletions pipeline/tick/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func (n *SideloadNode) Build(d *pipeline.SideloadNode) (ast.Node, error) {
n.Pipe("sideload")

n.Dot("source", d.Source)
<<<<<<< HEAD
n.Dot("httpUser", d.HttpUser)
n.Dot("httpPassword", d.HttpPassword)
=======
>>>>>>> 6de5e2134e2949916786b386ad852b342640414f

order := make([]interface{}, len(d.OrderList))
for i := range d.OrderList {
Expand Down
49 changes: 49 additions & 0 deletions services/sideload/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ func (s *Service) Reload() error {
return nil
}

<<<<<<< HEAD
func (s *Service) Source(srcURL string) (Source, error) {
var src Source

u, err := url.Parse(srcURL)
if err != nil {
return nil, err
}
if u.Scheme != "file" && u.Scheme != "http" {
=======
func (s *Service) Source(endpoint *httppost.Endpoint) (Source, error) {
var src Source

Expand All @@ -96,21 +106,33 @@ func (s *Service) Source(endpoint *httppost.Endpoint) (Source, error) {
return nil, err
}
if u.Scheme != "file" && u.Scheme != "http" && u.Scheme != "https" {
>>>>>>> 6de5e2134e2949916786b386ad852b342640414f
return nil, fmt.Errorf("unsupported source scheme %q, must be 'file' or 'http'", u.Scheme)
}

if u.Scheme == "file" {
src, err = s.SourceFile(u.Path)
<<<<<<< HEAD
} else if u.Scheme == "http" {
src, err = s.SourceHttp(srcURL)
=======
} else if u.Scheme == "http" || u.Scheme == "https" {
src, err = s.SourceHttp(endpoint, u.Scheme)
>>>>>>> 6de5e2134e2949916786b386ad852b342640414f
}

return src, err
}

<<<<<<< HEAD
func (s *Service) SourceHttp(srcURL string) (Source, error) {
var err error
dir := srcURL
=======
func (s *Service) SourceHttp(endpoint *httppost.Endpoint, scheme string) (Source, error) {
var err error
dir := endpoint.Url
>>>>>>> 6de5e2134e2949916786b386ad852b342640414f
s.mu.Lock()
defer s.mu.Unlock()
/*
Expand All @@ -123,8 +145,12 @@ func (s *Service) SourceHttp(endpoint *httppost.Endpoint, scheme string) (Source
src = &source{
s: s,
dir: dir,
<<<<<<< HEAD
scheme: "http",
=======
scheme: scheme,
e: endpoint,
>>>>>>> 6de5e2134e2949916786b386ad852b342640414f
}
err = src.updateCache()
if err != nil {
Expand All @@ -135,7 +161,11 @@ func (s *Service) SourceHttp(endpoint *httppost.Endpoint, scheme string) (Source
src.referenceCount++

if err != nil {
<<<<<<< HEAD
return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", srcURL, err.Error())
=======
return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", dir, err.Error())
>>>>>>> 6de5e2134e2949916786b386ad852b342640414f
}
return src, nil
}
Expand Down Expand Up @@ -180,11 +210,20 @@ type Source interface {
}

type source struct {
<<<<<<< HEAD
s *Service
scheme string
dir string
mu sync.RWMutex
httpUser string
httpPassword string
=======
s *Service
scheme string
dir string
mu sync.RWMutex
e *httppost.Endpoint
>>>>>>> 6de5e2134e2949916786b386ad852b342640414f

cache map[string]map[string]interface{}
referenceCount int
Expand Down Expand Up @@ -223,10 +262,16 @@ func (s *source) updateCacheFile() error {
}
func (s *source) updateCacheHttp() error {
req, err := http.NewRequest("GET", s.dir, nil)
<<<<<<< HEAD
if s.httpUser != "" {
req.SetBasicAuth(s.httpUser, s.httpPassword)
}
=======
if s.e.Auth.Username != "" && s.e.Auth.Password != "" {
req.SetBasicAuth(s.e.Auth.Username, s.e.Auth.Password)
}

>>>>>>> 6de5e2134e2949916786b386ad852b342640414f
client := &http.Client{
Timeout: time.Second * 10,
}
Expand All @@ -253,7 +298,11 @@ func (s *source) updateCache() error {

if s.scheme == "file" {
return s.updateCacheFile()
<<<<<<< HEAD
} else if s.scheme == "http" {
=======
} else if s.scheme == "http" || s.scheme == "https" {
>>>>>>> 6de5e2134e2949916786b386ad852b342640414f
return s.updateCacheHttp()
}
return nil
Expand Down
32 changes: 21 additions & 11 deletions services/sideload/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,43 +33,53 @@ func TestService_Source_Lookup(t *testing.T) {
defer src.Close()

testCases := []struct {
order []string
key string
want interface{}
order []string
key string
httpuser string
httppassword string
want interface{}
}{
{
order: []string{
"host/hostA.yml",
"default.yml",
},
key: "key0",
want: 5.0,
httpuser: "",
httppassword: "",
key: "key0",
want: 5.0,
},
{
order: []string{
"host/hostA.yml",
"default.yml",
},
key: "key1",
want: "one",
httpuser: "",
httppassword: "",
key: "key1",
want: "one",
},
{
order: []string{
"host/hostA.yml",
"hostgroup/foo.yml",
"default.yml",
},
key: "key0",
want: 5.0,
httpuser: "",
httppassword: "",
key: "key0",
want: 5.0,
},
{
order: []string{
"host/hostA.yml",
"hostgroup/foo.yml",
"default.yml",
},
key: "key1",
want: "foo",
httpuser: "",
httppassword: "",
key: "key1",
want: "foo",
},
}
for i, tc := range testCases {
Expand Down
20 changes: 15 additions & 5 deletions sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@ type SideloadNode struct {
s *pipeline.SideloadNode
source sideload.Source
orderTmpls []orderTmpl
<<<<<<< HEAD

order []string
httpUser string
httpPassword string

=======
Endpoint *httppost.Endpoint
order []string
>>>>>>> 6de5e2134e2949916786b386ad852b342640414f
bufferPool *bufpool.Pool
}

Expand All @@ -32,11 +40,13 @@ func newSideloadNode(et *ExecutingTask, n *pipeline.SideloadNode, d NodeDiagnost
var e *httppost.Endpoint
var ok bool
sn := &SideloadNode{
node: node{Node: n, et: et, diag: d},
s: n,
bufferPool: bufpool.New(),
order: make([]string, len(n.OrderList)),
orderTmpls: make([]orderTmpl, len(n.OrderList)),
node: node{Node: n, et: et, diag: d},
s: n,
httpUser: n.HttpUser,
httpPassword: n.HttpPassword,
bufferPool: bufpool.New(),
order: make([]string, len(n.OrderList)),
orderTmpls: make([]orderTmpl, len(n.OrderList)),
}
u, err := url.Parse(n.Source)
if err != nil {
Expand Down

0 comments on commit 407f2aa

Please sign in to comment.