Skip to content

Commit

Permalink
Add HTTP logging API
Browse files Browse the repository at this point in the history
Rename constants

Add pruning of sessions

Add changelog entry

Close channel on service close

Restructure everything into a single package

Fix tests

SQUASH: saving state

Get logs via the API

Switch to streaming http

Add JSON logging

Add table test template for json logger

Add tests for JSON logs

Add todo comment

Fixup todo comment

Add diagnostic session changes

Add kapacitor CLI commands for logs

Change dao to sessions

Reorganize code

Reorganize

Add context to Log function

Fix kapacitor subcommand logic

Address issues from PR

Add diagnostic to session service

Remove unreachable code

Remove deadlock introduced by using diagnostic

Remove TODOs

Add test for content type

Fix race on watch/logs subcommand

And pull out similar logic

Fix notify signals
  • Loading branch information
desa committed Oct 6, 2017
1 parent eacb373 commit a71dc4f
Show file tree
Hide file tree
Showing 15 changed files with 2,735 additions and 1,325 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system.
- [#1569](https://github.com/influxdata/kapacitor/issues/1569): Add support for add the HTTP status code as a field when using httpPost
- [#1535](https://github.com/influxdata/kapacitor/pull/1535): Add logfmt support and refactor logging.
- [#1578](https://github.com/influxdata/kapacitor/pull/1578): Add support for exposing logs via the API.

### Bugfixes

Expand Down
47 changes: 47 additions & 0 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -31,6 +32,7 @@ const (
basePreviewPath = "/kapacitor/v1preview"
pingPath = basePath + "/ping"
logLevelPath = basePath + "/loglevel"
logsPath = basePath + "/logs"
debugVarsPath = basePath + "/debug/vars"
tasksPath = basePath + "/tasks"
templatesPath = basePath + "/templates"
Expand Down Expand Up @@ -659,6 +661,51 @@ func (c *Client) Do(req *http.Request, result interface{}, codes ...int) (*http.
return resp, nil
}

func (c *Client) Logs(ctx context.Context, w io.Writer, q map[string]string) error {
u := c.BaseURL()
u.Path = logsPath

qp := u.Query()
for k, v := range q {
qp.Add(k, v)
}
u.RawQuery = qp.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return err
}
req = req.WithContext(ctx)
err = c.prepRequest(req)
if err != nil {
return err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
return fmt.Errorf("bad status code %v", resp.StatusCode)
}

errCh := make(chan error, 1)
defer close(errCh)
go func() {
_, err := io.Copy(w, resp.Body)
errCh <- err
}()

select {
case <-ctx.Done():
return nil
case err := <-errCh:
return err
}

}

// Ping the server for a response.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *Client) Ping() (time.Duration, string, error) {
Expand Down
93 changes: 93 additions & 0 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
Expand All @@ -10,10 +11,13 @@ import (
"log"
"net/http"
"os"
"os/signal"
"path"
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"

humanize "github.com/dustin/go-humanize"
Expand Down Expand Up @@ -52,6 +56,8 @@ Commands:
define-topic-handler Create/update an alert handler for a topic.
replay Replay a recording to a task.
replay-live Replay data against a task without recording it.
watch Watch logs for a task.
logs Follow arbitrary Kapacitor logs.
enable Enable and start running a task with live data.
disable Stop running a task.
reload Reload a running task with an updated task definition.
Expand Down Expand Up @@ -153,6 +159,12 @@ func main() {
}
commandArgs = args
commandF = doReplayLive
case "watch":
commandArgs = args
commandF = doWatch
case "logs":
commandArgs = args
commandF = doLogs
case "enable":
commandArgs = args
commandF = doEnable
Expand Down Expand Up @@ -284,6 +296,10 @@ func doHelp(args []string) error {
showTopicUsage()
case "backup":
backupUsage()
case "watch":
watchUsage()
case "logs":
logsUsage()
case "level":
levelUsage()
case "help":
Expand Down Expand Up @@ -2246,3 +2262,80 @@ func doBackup(args []string) error {
}
return nil
}

func watchUsage() {
var u = `Usage: kapacitor watch <task id> [<tags> ...]
Watch logs associated with a task.
Examples:
$ kapacitor watch mytask
$ kapacitor watch mytask node=log5
`
fmt.Fprintln(os.Stderr, u)
}

func doWatch(args []string) error {
m := map[string]string{}
if len(args) < 1 {
return errors.New("must provide task ID.")
}
m["task"] = args[0]
for _, s := range args[1:] {
pair := strings.Split(s, "=")
if len(pair) != 2 {
return fmt.Errorf("bad keyvalue pair: '%v'", s)
}
m[pair[0]] = pair[1]
}

return tailLogs(m)
}

func logsUsage() {
var u = `Usage: kapacitor logs [<tags> ...]
Watch arbitrary kapacitor logs.
$ kapacitor logs service=http lvl=info
`
fmt.Fprintln(os.Stderr, u)
}

func doLogs(args []string) error {
m := map[string]string{}
for _, s := range args {
pair := strings.Split(s, "=")
if len(pair) != 2 {
return fmt.Errorf("bad keyvalue pair: '%v'", s)
}
m[pair[0]] = pair[1]
}

return tailLogs(m)
}

func tailLogs(m map[string]string) error {
ctx, cancel := context.WithCancel(context.Background())
done := false
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
var mu sync.Mutex
go func() {
<-sigs
cancel()
mu.Lock()
defer mu.Unlock()
done = true
}()

err := cli.Logs(ctx, os.Stdout, m)
mu.Lock()
defer mu.Unlock()
if err != nil && !done {
return errors.Wrap(err, "failed to retrieve logs")
}

return nil
}
9 changes: 9 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type Server struct {
AlertService *alert.Service
TaskStore *task_store.Service
ReplayService *replay.Service
SessionService *diagnostic.SessionService
InfluxDBService *influxdb.Service
ConfigOverrideService *config.Service
TesterService *servicetest.Service
Expand Down Expand Up @@ -240,6 +241,7 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv
// Append these after InfluxDB because they depend on it
s.appendTaskStoreService()
s.appendReplayService()
s.appendSessionService()

// Append third-party integrations
// Append extra input services
Expand Down Expand Up @@ -421,6 +423,13 @@ func (s *Server) appendTaskStoreService() {
s.AppendService("task_store", srv)
}

func (s *Server) appendSessionService() {
srv := s.DiagService.SessionService
srv.HTTPDService = s.HTTPDService

s.AppendService("session", srv)
}

func (s *Server) appendReplayService() {
d := s.DiagService.NewReplayHandler()
srv := replay.NewService(s.config.Replay, d)
Expand Down
54 changes: 54 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10147,3 +10147,57 @@ func TestStorage_Backup(t *testing.T) {
t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot)
}
}

func TestLogSessions_HeaderJSON(t *testing.T) {
s, cli := OpenDefaultServer()
defer s.Close()

u := cli.BaseURL()
u.Path = "/logs"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
t.Fatal(err)
return
}

req.Header.Add("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
return
}
defer resp.Body.Close()

if exp, got := "application/json; charset=utf-8", resp.Header.Get("Content-Type"); exp != got {
t.Fatalf("expected: %v, got: %v\n", exp, got)
return
}

}

func TestLogSessions_HeaderGzip(t *testing.T) {
s, cli := OpenDefaultServer()
defer s.Close()

u := cli.BaseURL()
u.Path = "/logs"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
t.Fatal(err)
return
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
return
}
defer resp.Body.Close()

if exp, got := "", resp.Header.Get("Content-Encoding"); exp != got {
t.Fatalf("expected: %v, got: %v\n", exp, got)
return
}

}
Loading

0 comments on commit a71dc4f

Please sign in to comment.