Skip to content

Commit

Permalink
kafka: refactor kafka to use another library
Browse files Browse the repository at this point in the history
Signed-off-by: Carlos Panato <[email protected]>
  • Loading branch information
cpanato committed Dec 12, 2020
1 parent a86ca9b commit 5f44293
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 42 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
ARG BUILDER_IMAGE=golang:1.15.5-alpine
ARG BUILDER_IMAGE=golang:1.15.5
ARG BASE_IMAGE=alpine:3.12

FROM ${BUILDER_IMAGE} AS build-stage

RUN apk add --update --no-cache alpine-sdk ca-certificates librdkafka coreutils
ENV CGO_ENABLED=0

WORKDIR /src
ADD . .

RUN go mod download
RUN go build -tags musl -gcflags all=-trimpath=/src -asmflags all=-trimpath=/src -a -installsuffix cgo -o falcosidekick .
RUN make falcosidekick

# Final Docker image
FROM ${BASE_IMAGE} AS final-stage
LABEL MAINTAINER "Thomas Labarussias <[email protected]>"

RUN apk add --update --no-cache ca-certificates librdkafka
RUN apk add --update --no-cache ca-certificates

# Create user falcosidekick
RUN addgroup -S falcosidekick && adduser -u 1234 -S falcosidekick -G falcosidekick
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ GOLANGCI_LINT := $(TOOLS_BIN_DIR)/$(GOLANGCI_LINT_BIN)-$(GOLANGCI_LINT_VER)

.PHONY: falcosidekick
falcosidekick:
$(GO) build -o $@
$(GO) build -gcflags all=-trimpath=/src -asmflags all=-trimpath=/src -a -installsuffix cgo -o $@ .

.PHONY: build-image
build-image:
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ googlechat:
messageformat: "Alert : rule *{{ .Rule }}* triggered by user *{{ index .OutputFields \"user.name\" }}*" # a Go template to format Google Chat Text above Attachment, displayed in addition to the output from `GOOGLECHAT_OUTPUTFORMAT`, see [Slack Message Formatting](#slack-message-formatting) in the README for details. If empty, no Text is displayed before Attachment.

kafka:
url: "" # Apache Kafka URL (ex: http://kafka). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
hostport: "" # Apache Kafka Host:Port (ex: localhost:9092). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# partition: 0 # Partition number of the topic.
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
Expand Down Expand Up @@ -380,7 +380,7 @@ The *env vars* "match" field names in *yaml file with this structure (**take car
* **GOOGLECHAT_OUTPUTFORMAT** : `all` (default), `text` (only text is displayed in Google Chat)
* **GOOGLECHAT_MINIMUMPRIORITY** : minimum priority of event for using this output, order is `emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
* **GOOGLECHAT_MESSAGEFORMAT** : a Go template to format Google Chat Text above Attachment, displayed in addition to the output from `GOOGLECHAT_OUTPUTFORMAT`, see [Slack Message Formatting](#slack-message-formatting) in the README for details. If empty, no Text is displayed before sections.
* **KAFKA_URL**: THe URL of the Kafka server (ex: http://kafka), if not empty, Kafka is *enabled*
* **KAFKA_HOSTPORT**: The Host:Port of the Kafka (ex: localhost:9092), if not empty, Kafka is *enabled*
* **KAFKA_TOPIC**: The name of the Kafka topic
* **KAFKA_PARTITION**: The number of the Kafka partition
* **KAFKA_MINIMUMPRIORITY**: minimum priority of event for using this output, order is `emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
Expand Down
2 changes: 1 addition & 1 deletion config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ googlechat:
messageformat: 'Alert : rule *{{ .Rule }}* triggered by user *{{ index .OutputFields "user.name" }}*' # a Go template to format Slack Text above Attachment, displayed in addition to the output from `SLACK_OUTPUTFORMAT`, see [Slack Message Formatting](#slack-message-formatting) in the README for details. If empty, no Text is displayed before Attachment.

kafka:
url: "" # Apache Kafka URL (ex: http://kafka). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
hostport: "" # Apache Kafka Host:Port (ex: localhost:9092). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# partition: 0 # Partition number of the topic.
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ require (
github.com/Azure/azure-event-hubs-go/v3 v3.3.3
github.com/DataDog/datadog-go v4.2.0+incompatible
github.com/aws/aws-sdk-go v1.35.30
github.com/confluentinc/confluent-kafka-go v1.5.2
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21
github.com/emersion/go-smtp v0.14.0
github.com/nats-io/nats.go v1.10.0
github.com/nats-io/stan.go v0.7.0
github.com/prometheus/client_golang v1.8.0
github.com/segmentio/kafka-go v0.4.8
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58
Expand Down
15 changes: 13 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/confluentinc/confluent-kafka-go v1.5.2 h1:l+qt+a0Okmq0Bdr1P55IX4fiwFJyg0lZQmfHkAFkv7E=
github.com/confluentinc/confluent-kafka-go v1.5.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand All @@ -166,6 +164,7 @@ github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TR
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
Expand Down Expand Up @@ -235,6 +234,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -333,6 +334,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -409,6 +412,7 @@ github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down Expand Up @@ -464,6 +468,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.4.8 h1:LO36H2tb7RcCRjsYzT/qf7xE+vRBXgddZDD82e1eiWY=
github.com/segmentio/kafka-go v0.4.8/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down Expand Up @@ -505,6 +511,10 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -534,6 +544,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
2 changes: 1 addition & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func forwardEvent(falcopayload types.FalcoPayload) {
go googleChatClient.GooglechatPost(falcopayload)
}

if config.Kafka.URL != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.Kafka.MinimumPriority)] || falcopayload.Rule == TestRule) {
if config.Kafka.HostPort != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.Kafka.MinimumPriority)] || falcopayload.Rule == TestRule) {
go kafkaClient.KafkaProduce(falcopayload)
}
}
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,11 @@ func init() {
}
}

if config.Kafka.URL != "" && config.Kafka.Topic != "" {
if config.Kafka.HostPort != "" && config.Kafka.Topic != "" {
var err error
kafkaClient, err = outputs.NewKafkaClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.Kafka.URL = ""
config.Kafka.HostPort = ""
} else {
enabledOutputsText += "Kafka "
}
Expand Down
4 changes: 2 additions & 2 deletions outputs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"cloud.google.com/go/pubsub"
"github.com/DataDog/datadog-go/statsd"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/segmentio/kafka-go"

"github.com/falcosecurity/falcosidekick/types"
)
Expand Down Expand Up @@ -52,7 +52,7 @@ type Client struct {
StatsdClient *statsd.Client
DogstatsdClient *statsd.Client
GCPTopicClient *pubsub.Topic
KafkaProducer *kafka.Producer
KafkaProducer *kafka.Conn
}

// NewClient returns a new output.Client for accessing the different API.
Expand Down
54 changes: 30 additions & 24 deletions outputs/kafka.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package outputs

import (
"context"
"encoding/json"
"log"
"time"

"github.com/DataDog/datadog-go/statsd"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/segmentio/kafka-go"

"github.com/falcosecurity/falcosidekick/types"
)

// NewKafkaClient returns a new output.Client for accessing the Apache Kafka.
func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": config.Kafka.URL})
conn, err := kafka.DialLeader(context.Background(), "tcp", config.Kafka.HostPort, config.Kafka.Topic, config.Kafka.Partition)
if err != nil {
log.Printf("[ERROR] : Kafka - %v\n", "Error connecting Apache Kafka server")
log.Printf("[ERROR] : Kafka - %v - %v\n", "failed to connect to Apache Kafka server", err.Error())
return nil, err
}

Expand All @@ -25,38 +27,42 @@ func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promSt
PromStats: promStats,
StatsdClient: statsdClient,
DogstatsdClient: dogstatsdClient,
KafkaProducer: p,
KafkaProducer: conn,
}, nil
}

// KafkaProduce sends a message to a Apach Kafka Topic
func (c *Client) KafkaProduce(falcopayload types.FalcoPayload) {
b, err := json.Marshal(falcopayload)
c.Stats.Kafka.Add(Total, 1)

falcoMsg, err := json.Marshal(falcopayload)
if err != nil {
log.Printf("[ERROR] : Kafka - %v - %v\n", "Error while marshalling message", err.Error())
c.setKafkaErrorMetrics()
log.Printf("[ERROR] : Kafka - %v - %v\n", "failed to marshalling message", err.Error())
return
}

msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &c.Config.Kafka.Topic,
},
Value: b,
kafkaMsg := kafka.Message{
Value: falcoMsg,
}

if c.Config.Kafka.Partition == 0 {
msg.TopicPartition.Partition = kafka.PartitionAny
} else {
msg.TopicPartition.Partition = c.Config.Kafka.Partition
c.KafkaProducer.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = c.KafkaProducer.WriteMessages(kafkaMsg)
if err != nil {
c.setKafkaErrorMetrics()
log.Printf("[ERROR] : Kafka - %v\n", err)
return
}

err = c.KafkaProducer.Produce(msg, nil)
go c.CountMetric("outputs", 1, []string{"output:kafka", "status:ok"})
c.Stats.Kafka.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": OK}).Inc()
log.Printf("[INFO] : Kafka - Publish OK\n")
}

if err != nil {
c.Stats.Kafka.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": Error}).Inc()
} else {
c.Stats.Kafka.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": OK}).Inc()
}
c.Stats.Kafka.Add(Total, 1)
// setKafkaErrorMetrics set the error stats
func (c *Client) setKafkaErrorMetrics() {
go c.CountMetric(Outputs, 1, []string{"output:kafka", "status:error"})
c.Stats.Kafka.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": Error}).Inc()
}
4 changes: 2 additions & 2 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ type GooglechatConfig struct {
}

type kafkaConfig struct {
URL string
HostPort string
Topic string
Partition int32
Partition int
MinimumPriority string
}

Expand Down

0 comments on commit 5f44293

Please sign in to comment.