Skip to content

Commit

Permalink
Add context to Log function
Browse files Browse the repository at this point in the history
  • Loading branch information
desa committed Sep 29, 2017
1 parent 03529c8 commit 55a95a7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
15 changes: 13 additions & 2 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
Expand All @@ -12,6 +13,7 @@ import (
"net/url"
"path"
"strconv"
"sync"
"time"

"github.com/influxdata/influxdb/influxql"
Expand Down Expand Up @@ -660,7 +662,7 @@ func (c *Client) Do(req *http.Request, result interface{}, codes ...int) (*http.
return resp, nil
}

func (c *Client) Logs(w io.Writer, q map[string]string) error {
func (c *Client) Logs(ctx context.Context, w io.Writer, q map[string]string) error {
u := c.BaseURL()
u.Path = logsPath

Expand All @@ -674,6 +676,7 @@ func (c *Client) Logs(w io.Writer, q map[string]string) error {
if err != nil {
return err
}
req = req.WithContext(ctx)
err = c.prepRequest(req)
if err != nil {
return err
Expand All @@ -688,7 +691,15 @@ func (c *Client) Logs(w io.Writer, q map[string]string) error {
return fmt.Errorf("bad status code %v", resp.StatusCode)
}

_, err = io.Copy(w, resp.Body)
var wg sync.WaitGroup
wg.Add(1)
go func() {
_, err = io.Copy(w, resp.Body)
wg.Done()
}()

<-ctx.Done()
wg.Wait()

return err
}
Expand Down
48 changes: 42 additions & 6 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
Expand All @@ -10,10 +11,12 @@ import (
"log"
"net/http"
"os"
"os/signal"
"path"
"sort"
"strconv"
"strings"
"syscall"
"time"

humanize "github.com/dustin/go-humanize"
Expand Down Expand Up @@ -2260,19 +2263,43 @@ func doBackup(args []string) error {
}

func watchUsage() {
var u = `Usage: kapacitor watch <output file>
var u = `Usage: kapacitor watch <task id> [<tags> ...]
Watch logs associated with a task.
Examples:
$ kapacitor logs mytask
$ kapacitor logs mytask node=log5
`
fmt.Fprintln(os.Stderr, u)
}

func doWatch(args []string) error {
if len(args) != 1 {
m := map[string]string{}
if len(args) < 1 {
return errors.New("must provide task ID.")
}
err := cli.Logs(os.Stdout, map[string]string{"task": args[0]})
if err != nil {
m["task"] = args[0]
for _, s := range args[1:] {
pair := strings.Split(s, "=")
if len(pair) != 2 {
return fmt.Errorf("bad keyvalue pair: '%v'", s)
}
m[pair[0]] = pair[1]
}

ctx, cancel := context.WithCancel(context.Background())
cancelled := false
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT)
go func() {
<-sigs
cancel()
cancelled = true
}()

if err := cli.Logs(ctx, os.Stdout, m); err != nil && !cancelled {
return errors.Wrap(err, "failed writing logs")
}
return nil
Expand All @@ -2297,8 +2324,17 @@ func doLogs(args []string) error {
}
m[pair[0]] = pair[1]
}
err := cli.Logs(os.Stdout, m)
if err != nil {
ctx, cancel := context.WithCancel(context.Background())
cancelled := false
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT)
go func() {
<-sigs
cancel()
cancelled = true
}()

if err := cli.Logs(ctx, os.Stdout, m); err != nil && !cancelled {
return errors.Wrap(err, "failed writing logs")
}
return nil
Expand Down

0 comments on commit 55a95a7

Please sign in to comment.