Skip to content

Commit

Permalink
Add support for client-side TLS certs to loki-canary
Browse files Browse the repository at this point in the history
  • Loading branch information
chodges15 authored and Chris Hodges committed Jun 3, 2022
1 parent 1c5e094 commit 16637b8
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [6310](https://github.com/grafana/loki/pull/6310) **chodges15**: Add support for client-side TLS certs in loki-canary for Loki connection
* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
* [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization
* [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail
Expand Down
30 changes: 26 additions & 4 deletions cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"crypto/tls"
"net/http"
"os/signal"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/config"
"github.com/prometheus/common/version"

"github.com/grafana/loki/pkg/canary/comparator"
Expand All @@ -36,7 +39,10 @@ func main() {
sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector")
port := flag.Int("port", 3500, "Port which loki-canary should expose metrics")
addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100")
tls := flag.Bool("tls", false, "Does the loki connection use TLS?")
useTls := flag.Bool("tls", false, "Does the loki connection use TLS?")
certFile := flag.String("cert-file", "", "Client PEM encoded X.509 certificate for optional use with TLS connection to Loki")
keyFile := flag.String("key-file", "", "Client PEM encoded X.509 key for optional use with TLS connection to Loki")
caFile := flag.String("ca-file", "", "Client certificate authority for optional use with TLS connection to Loki")
user := flag.String("user", "", "Loki username.")
pass := flag.String("pass", "", "Loki password.")
tenantID := flag.String("tenant-id", "", "Tenant ID to be set in X-Scope-OrgID header.")
Expand Down Expand Up @@ -83,6 +89,22 @@ func main() {
os.Exit(1)
}

var tlsConfig *tls.Config
tc := config.TLSConfig{}
if *certFile != "" || *keyFile != "" || *caFile != "" {
tc.CAFile = *caFile
tc.CertFile = *certFile
tc.KeyFile = *keyFile
tc.InsecureSkipVerify = false

var err error
tlsConfig, err = config.NewTLSConfig(&tc)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "TLS configuration error: %s\n", err.Error())
os.Exit(1)
}
}

sentChan := make(chan time.Time)
receivedChan := make(chan time.Time)

Expand All @@ -94,7 +116,7 @@ func main() {
defer c.lock.Unlock()

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *tenantID, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.reader = reader.NewReader(os.Stderr, receivedChan, *useTls, tlsConfig, *caFile, *addr, *user, *pass, *tenantID, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

Expand Down
54 changes: 50 additions & 4 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reader

import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"io"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/config"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logqlmodel"
Expand Down Expand Up @@ -50,6 +52,8 @@ type LokiReader interface {
type Reader struct {
header http.Header
tls bool
tlsConfig *tls.Config
caFile string
addr string
user string
pass string
Expand All @@ -74,6 +78,8 @@ type Reader struct {
func NewReader(writer io.Writer,
receivedChan chan time.Time,
tls bool,
tlsConfig *tls.Config,
caFile string,
address string,
user string,
pass string,
Expand Down Expand Up @@ -104,6 +110,8 @@ func NewReader(writer io.Writer,
rd := Reader{
header: h,
tls: tls,
tlsConfig: tlsConfig,
caFile: caFile,
addr: address,
user: user,
pass: pass,
Expand Down Expand Up @@ -189,7 +197,11 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
}
req.Header.Set("User-Agent", userAgent)

resp, err := http.DefaultClient.Do(req)
httpClient, err := r.httpClient()
if err != nil {
return 0, err
}
resp, err := httpClient.Do(req)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -280,7 +292,11 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}
req.Header.Set("User-Agent", userAgent)

resp, err := http.DefaultClient.Do(req)
httpClient, err := r.httpClient()
if err != nil {
return nil, err
}
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -329,7 +345,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
return tss, nil
}

// run uses the established websocket connection to tail logs from Loki and
// run uses the established websocket connection to tail logs from Loki
func (r *Reader) run() {
r.closeAndReconnect()

Expand Down Expand Up @@ -421,7 +437,8 @@ func (r *Reader) closeAndReconnect() {

fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal)

c, _, err := websocket.DefaultDialer.Dial(u.String(), r.header)
dialer := r.webSocketDialer()
c, _, err := dialer.Dial(u.String(), r.header)
if err != nil {
fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err)
<-time.After(10 * time.Second)
Expand All @@ -442,6 +459,35 @@ func (r *Reader) closeAndReconnect() {
}
}

// httpClient uses the config in Reader to return a http client.
// http.DefaultClient will be returned in the case that the connection to Loki is http or TLS without client certs.
// For the mTLS case, return a http.Client configured to use the client side certificates.
func (r *Reader) httpClient() (*http.Client, error) {
if r.tlsConfig == nil || r.tls == false {
return http.DefaultClient, nil
}
rt, err := config.NewTLSRoundTripper(r.tlsConfig, r.caFile, func(tls *tls.Config) (http.RoundTripper, error) {
return &http.Transport{TLSClientConfig: tls}, nil
})
if err != nil {
return nil, err
}
return &http.Client{
Transport: rt,
}, nil
}

// webSocketDialer creates a dialer for the web socket connection to Loki
// websocket.DefaultDialer will be returned in the case that the connection to Loki is http or TLS without client certs.
// For the mTLS case, return a websocket.Dialer configured to use client side certificates.
func (r *Reader) webSocketDialer() *websocket.Dialer {
dialer := websocket.DefaultDialer
if r.tlsConfig != nil {
dialer.TLSClientConfig = r.tlsConfig
}
return dialer
}

func parseResponse(entry *loghttp.Entry) (*time.Time, error) {
sp := strings.Split(entry.Line, " ")
if len(sp) != 2 {
Expand Down

0 comments on commit 16637b8

Please sign in to comment.