Add Cloudwatch Logs Output
Signed-off-by: Carlos Panato <[email protected]>
cpanato authored and poiana committed Nov 27, 2020
1 parent 6c5617f commit 210ce91
Showing 8 changed files with 248 additions and 58 deletions.
84 changes: 83 additions & 1 deletion README.md
@@ -32,6 +32,7 @@ Currently available outputs are :
* [**AWS Lambda**](https://aws.amazon.com/lambda/features/)
* [**AWS SQS**](https://aws.amazon.com/sqs/features/)
* [**AWS SNS**](https://aws.amazon.com/sns/features/)
* [**AWS CloudWatchLogs**](https://aws.amazon.com/cloudwatch/features/)
* **SMTP** (email)
* [**Opsgenie**](https://www.opsgenie.com/)
* [**StatsD**](https://github.com/statsd/statsd) (for monitoring of `falcosidekick`)
@@ -194,6 +195,10 @@ aws:
    # topicarn : "" # SNS TopicArn, if not empty, AWS SNS output is enabled
    rawjson: false # Send Raw JSON or parse it (default: false)
    # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
  cloudwatchlogs:
    # loggroup : "" # AWS CloudWatch Logs Group name, if not empty, CloudWatch Logs output is enabled
    # logstream : "" # AWS CloudWatch Logs Stream name, if empty, Falcosidekick will try to create a log stream
    # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

smtp:
  # hostport: "" # host:port address of SMTP server, if not empty, SMTP output is enabled
@@ -252,7 +257,7 @@ googlechat:
kafka:
url: "" # Apache Kafka URL (ex: http://kafka). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# partition: 0 # Partition number of the topic.
# partition: 0 # Partition number of the topic.
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
```
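As a concrete (hypothetical) example, the CloudWatch Logs output can also be enabled purely through environment variables, following the naming convention documented below. This is a sketch, not part of this change: the image name, port, and values are illustrative assumptions, and AWS credentials/region are assumed to be provided separately (see the AWS policy section below).

```bash
# Sketch: run falcosidekick with only the CloudWatch Logs output enabled.
# Group/stream names are placeholders.
docker run -d -p 2801:2801 \
  -e AWS_CLOUDWATCHLOGS_LOGGROUP="falco-alerts" \
  -e AWS_CLOUDWATCHLOGS_LOGSTREAM="falcosidekick-logstream" \
  falcosecurity/falcosidekick
```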

@@ -332,6 +337,8 @@ The *env vars* "match" field names in *yaml file with this structure (**take car
* **AWS_SNS_TOPICARN** : AWS SNS TopicARN, if not empty, AWS SNS output is *enabled*
* **AWS_SNS_RAWJSON** : Send Raw JSON or parse it (default: false)
* **AWS_SNS_MINIMUMPRIORITY** : minimum priority of event for using this output, order is `emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
* **AWS_CLOUDWATCHLOGS_LOGGROUP** : AWS CloudWatch Logs Group name, if not empty, CloudWatch Logs output is enabled
* **AWS_CLOUDWATCHLOGS_LOGSTREAM** : AWS CloudWatch Logs Stream name, if empty, Falcosidekick will try to create a log stream
* **SMTP_HOSTPORT** : "host:port" address of SMTP server, if not empty, SMTP output is *enabled*
* **SMTP_USER** : user to access SMTP server
* **SMTP_PASSWORD** : password to access SMTP server
@@ -415,6 +422,81 @@ The daemon exposes a `prometheus` endpoint on URI `/metrics`.

The daemon is able to push its metrics to a StatsD/DogstatsD server. See [Configuration](https://github.com/falcosecurity/falcosidekick#configuration) section for how-to.


### AWS Policy example

When using the AWS outputs, you need to provide AWS credentials with permissions to access the resources you use, such as `SQS`, `Lambda`, `SNS` and `CloudWatchLogs`. Sample policies for each are shown below.
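For example, when running the binary directly, credentials can be supplied through the standard AWS SDK environment variables. This is a sketch with placeholder values; the binary name is an assumption:

```bash
# Standard AWS SDK credential environment variables (placeholder values).
export AWS_ACCESS_KEY_ID="AKIAXXXXXXXXXXXXXXXX"
export AWS_SECRET_ACCESS_KEY="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
export AWS_REGION="eu-west-1"
./falcosidekick
```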

#### CloudWatch Logs Sample Policy

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "cloudwatchlogs",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:DescribeLogStreams",
        "logs:PutRetentionPolicy",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
```
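Note that the policy above does not include `logs:CreateLogGroup`, so the log group itself must already exist. A sketch of preparing it with the AWS CLI (the group name and retention period are placeholders):

```bash
# Create the log group falcosidekick will write to, and optionally set retention.
aws logs create-log-group --log-group-name falco-alerts
aws logs put-retention-policy --log-group-name falco-alerts --retention-in-days 30
```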

#### SQS Sample Policy

```json
{
  "Version": "2012-10-17",
  "Id": "sqs",
  "Statement": [
    {
      "Sid": "sendMessage",
      "Effect": "Allow",
      "Principal": "*",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:*:111122223333:queue1"
    }
  ]
}
```

#### SNS Sample Policy

```json
{
  "Version": "2012-10-17",
  "Id": "sns",
  "Statement": [
    {
      "Sid": "publish",
      "Effect": "Allow",
      "Principal": "*",
      "Action": "sns:Publish",
      "Resource": "arn:aws:sns:*:111122223333:topic1"
    }
  ]
}
```

#### Lambda Sample Policy

```json
{
  "Version": "2012-10-17",
  "Id": "lambda",
  "Statement": [
    {
      "Sid": "invoke",
      "Effect": "Allow",
      "Principal": "*",
      "Action": "lambda:InvokeFunction",
      "Resource": "*"
    }
  ]
}
```
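The CloudWatch Logs document above is an identity-based policy and can be attached to the IAM user or role whose credentials falcosidekick uses; the SQS, SNS and Lambda samples contain a `Principal` and are meant to be applied as resource-based policies on the queue, topic or function itself. A sketch of creating and attaching the CloudWatch Logs policy with the AWS CLI (all names are placeholders):

```bash
# Create a managed policy from the CloudWatch Logs sample above and attach it
# to the IAM user whose keys falcosidekick uses (placeholder names).
aws iam create-policy \
  --policy-name falcosidekick-cloudwatchlogs \
  --policy-document file://cloudwatchlogs-policy.json
aws iam attach-user-policy \
  --user-name falcosidekick \
  --policy-arn arn:aws:iam::111122223333:policy/falcosidekick-cloudwatchlogs
```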

## Examples

Run your daemon and try (from Falco's documentation) :
4 changes: 4 additions & 0 deletions config.go
@@ -85,6 +85,9 @@ func getConfig() *types.Configuration {
v.SetDefault("AWS.SNS.TopicArn", "")
v.SetDefault("AWS.SNS.MinimumPriority", "")
v.SetDefault("AWS.SNS.RawJSON", false)
v.SetDefault("AWS.CloudWatchLogs.LogGroup", "")
v.SetDefault("AWS.CloudWatchLogs.LogStream", "")
v.SetDefault("AWS.CloudWatchLogs.MinimumPriority", "")
v.SetDefault("SMTP.HostPort", "")
v.SetDefault("SMTP.User", "")
v.SetDefault("SMTP.Password", "")
@@ -175,6 +178,7 @@ func getConfig() *types.Configuration {
c.AWS.Lambda.MinimumPriority = checkPriority(c.AWS.Lambda.MinimumPriority)
c.AWS.SQS.MinimumPriority = checkPriority(c.AWS.SQS.MinimumPriority)
c.AWS.SNS.MinimumPriority = checkPriority(c.AWS.SNS.MinimumPriority)
c.AWS.CloudWatchLogs.MinimumPriority = checkPriority(c.AWS.CloudWatchLogs.MinimumPriority)
c.Opsgenie.MinimumPriority = checkPriority(c.Opsgenie.MinimumPriority)
c.Webhook.MinimumPriority = checkPriority(c.Webhook.MinimumPriority)
c.Azure.EventHub.MinimumPriority = checkPriority(c.Azure.EventHub.MinimumPriority)
6 changes: 5 additions & 1 deletion config_example.yaml
@@ -83,6 +83,10 @@ aws:
    # topicarn : "" # SNS TopicArn, if not empty, AWS SNS output is enabled
    rawjson: false # Send Raw JSON or parse it (default: false)
    # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
  cloudwatchlogs:
    # loggroup : "" # AWS CloudWatch Logs Group name, if not empty, CloudWatch Logs output is enabled
    # logstream : "" # AWS CloudWatch Logs Stream name, if empty, Falcosidekick will try to create a log stream
    # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

smtp:
  # hostport: "" # host:port address of SMTP server, if not empty, SMTP output is enabled
@@ -141,5 +145,5 @@ googlechat:
kafka:
url: "" # Apache Kafka URL (ex: http://kafka). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# partition: 0 # Partition number of the topic.
# partition: 0 # Partition number of the topic.
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
5 changes: 5 additions & 0 deletions handlers.go
@@ -184,13 +184,18 @@ func forwardEvent(falcopayload types.FalcoPayload) {
go awsClient.PublishTopic(falcopayload)
}

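// CloudWatch Logs output: dispatched asynchronously like the other outputs, when a log group is configured and the event priority meets the configured minimum (test events are always forwarded).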
if config.AWS.CloudWatchLogs.LogGroup != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.AWS.CloudWatchLogs.MinimumPriority)] || falcopayload.Rule == TestRule) {
go awsClient.SendCloudWatchLog(falcopayload)
}

if config.SMTP.HostPort != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.SMTP.MinimumPriority)] || falcopayload.Rule == TestRule) {
go smtpClient.SendMail(falcopayload)
}

if config.Opsgenie.APIKey != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.Opsgenie.MinimumPriority)] || falcopayload.Rule == TestRule) {
go opsgenieClient.OpsgeniePost(falcopayload)
}

if config.Webhook.Address != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.Webhook.MinimumPriority)] || falcopayload.Rule == TestRule) {
go webhookClient.WebhookPost(falcopayload)
}
8 changes: 7 additions & 1 deletion main.go
@@ -194,7 +194,8 @@ func init() {
}
}

if config.AWS.Lambda.FunctionName != "" || config.AWS.SQS.URL != "" || config.AWS.SNS.TopicArn != "" {
if config.AWS.Lambda.FunctionName != "" || config.AWS.SQS.URL != "" ||
config.AWS.SNS.TopicArn != "" || config.AWS.CloudWatchLogs.LogGroup != "" {
var err error
awsClient, err = outputs.NewAWSClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
@@ -204,6 +205,8 @@
config.AWS.Lambda.FunctionName = ""
config.AWS.SQS.URL = ""
config.AWS.SNS.TopicArn = ""
config.AWS.CloudWatchLogs.LogGroup = ""
config.AWS.CloudWatchLogs.LogStream = ""
} else {
if config.AWS.Lambda.FunctionName != "" {
enabledOutputsText += "AWSLambda "
@@ -214,6 +217,9 @@
if config.AWS.SNS.TopicArn != "" {
enabledOutputsText += "AWSSNS "
}
if config.AWS.CloudWatchLogs.LogGroup != "" {
enabledOutputsText += "AWSCloudWatchLogs "
}
}
}

82 changes: 81 additions & 1 deletion outputs/aws.go
@@ -7,12 +7,15 @@ import (
"log"
"net/url"
"os"
"time"

"github.com/falcosecurity/falcosidekick/types"

"github.com/DataDog/datadog-go/statsd"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sqs"
@@ -124,7 +127,7 @@ func (c *Client) SendMessage(falcopayload types.FalcoPayload) {

log.Printf("[INFO] : %v SQS - Send Message OK (%v)\n", c.OutputType, *resp.MessageId)
go c.CountMetric("outputs", 1, []string{"output:awssqs", "status:ok"})
c.Stats.AWSSQS.Add("ok", 1)
c.Stats.AWSSQS.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssqs", "status": "ok"}).Inc()
}

@@ -189,3 +192,80 @@ func (c *Client) PublishTopic(falcopayload types.FalcoPayload) {
    c.Stats.AWSSNS.Add(OK, 1)
    c.PromStats.Outputs.With(map[string]string{"destination": "awssns", "status": OK}).Inc()
}

// SendCloudWatchLog sends a message to CloudWatch Log
func (c *Client) SendCloudWatchLog(falcopayload types.FalcoPayload) {
    svc := cloudwatchlogs.New(c.AWSSession)

    f, _ := json.Marshal(falcopayload)

    c.Stats.AWSCloudWatchLogs.Add(Total, 1)

    if c.Config.AWS.CloudWatchLogs.LogStream == "" {
        streamName := "falcosidekick-logstream"
        log.Printf("[INFO] : %v CloudWatchLogs - Log Stream not configured, creating one called %s\n", c.OutputType, streamName)
        inputLogStream := &cloudwatchlogs.CreateLogStreamInput{
            LogGroupName:  aws.String(c.Config.AWS.CloudWatchLogs.LogGroup),
            LogStreamName: aws.String(streamName),
        }

        _, err := svc.CreateLogStream(inputLogStream)
        if err != nil {
            if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
                log.Printf("[INFO] : %v CloudWatchLogs - Log Stream %s already exists, reusing...\n", c.OutputType, streamName)
            } else {
                go c.CountMetric("outputs", 1, []string{"output:awscloudwatchlogs", "status:error"})
                c.Stats.AWSCloudWatchLogs.Add(Error, 1)
                c.PromStats.Outputs.With(map[string]string{"destination": "awscloudwatchlogs", "status": Error}).Inc()
                log.Printf("[ERROR] : %v CloudWatchLogs - %v\n", c.OutputType, err.Error())
                return
            }
        }

        c.Config.AWS.CloudWatchLogs.LogStream = streamName
    }

    logevent := &cloudwatchlogs.InputLogEvent{
        Message:   aws.String(string(f)),
        Timestamp: aws.Int64(falcopayload.Time.UnixNano() / int64(time.Millisecond)),
    }

    input := &cloudwatchlogs.PutLogEventsInput{
        LogEvents:     []*cloudwatchlogs.InputLogEvent{logevent},
        LogGroupName:  aws.String(c.Config.AWS.CloudWatchLogs.LogGroup),
        LogStreamName: aws.String(c.Config.AWS.CloudWatchLogs.LogStream),
    }

    var err error
    resp := &cloudwatchlogs.PutLogEventsOutput{}
    resp, err = c.putLogEvents(svc, input)
    if err != nil {
        go c.CountMetric("outputs", 1, []string{"output:awscloudwatchlogs", "status:error"})
        c.Stats.AWSCloudWatchLogs.Add(Error, 1)
        c.PromStats.Outputs.With(map[string]string{"destination": "awscloudwatchlogs", "status": Error}).Inc()
        log.Printf("[ERROR] : %v CloudWatchLogs - %v\n", c.OutputType, err.Error())
        return
    }

    log.Printf("[INFO] : %v CloudWatchLogs - Send Log OK (%v)\n", c.OutputType, resp.String())
    go c.CountMetric("outputs", 1, []string{"output:awscloudwatchlogs", "status:ok"})
    c.Stats.AWSCloudWatchLogs.Add(OK, 1)
    c.PromStats.Outputs.With(map[string]string{"destination": "awscloudwatchlogs", "status": OK}).Inc()
}

// putLogEvents will attempt to execute and handle invalid tokens.
func (c *Client) putLogEvents(svc *cloudwatchlogs.CloudWatchLogs, input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
    resp, err := svc.PutLogEvents(input)
    if err != nil {
        if exception, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok {
            log.Printf("[INFO] : %v Refreshing token for LogGroup: %s LogStream: %s", c.OutputType, *input.LogGroupName, *input.LogStreamName)
            input.SequenceToken = exception.ExpectedSequenceToken

            return c.putLogEvents(svc, input)
        }

        return nil, err
    }

    return resp, nil
}
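For reference, below is a minimal, self-contained sketch (not part of this commit) of the same publish-and-refresh-token pattern against the AWS SDK for Go. The log group and stream names are placeholders and are assumed to already exist; credentials and region come from the usual SDK sources.

```go
package main

import (
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)

func main() {
	svc := cloudwatchlogs.New(session.Must(session.NewSession()))

	input := &cloudwatchlogs.PutLogEventsInput{
		LogEvents: []*cloudwatchlogs.InputLogEvent{{
			Message:   aws.String(`{"rule":"example rule","priority":"Warning"}`),
			Timestamp: aws.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
		}},
		LogGroupName:  aws.String("falco-alerts"),            // placeholder, must exist
		LogStreamName: aws.String("falcosidekick-logstream"), // placeholder, must exist
	}

	resp, err := svc.PutLogEvents(input)
	if err != nil {
		// CloudWatch Logs rejects writes made with a stale sequence token and
		// returns the token it expects; retry once with that token.
		if e, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok {
			input.SequenceToken = e.ExpectedSequenceToken
			resp, err = svc.PutLogEvents(input)
		}
	}
	if err != nil {
		log.Fatalf("PutLogEvents failed: %v", err)
	}
	log.Printf("PutLogEvents OK: %s", resp.String())
}
```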
55 changes: 28 additions & 27 deletions stats.go
@@ -17,33 +17,34 @@ func getInitStats() *types.Statistics {
}))

stats = &types.Statistics{
Requests: getInputNewMap("requests"),
FIFO: getInputNewMap("fifo"),
GRPC: getInputNewMap("grpc"),
Falco: expvar.NewMap("falco.priority"),
Slack: getOutputNewMap("slack"),
Rocketchat: getOutputNewMap("rocketchat"),
Mattermost: getOutputNewMap("mattermost"),
Teams: getOutputNewMap("teams"),
Datadog: getOutputNewMap("datadog"),
Discord: getOutputNewMap("discord"),
Alertmanager: getOutputNewMap("alertmanager"),
Elasticsearch: getOutputNewMap("elasticsearch"),
Loki: getOutputNewMap("loki"),
Nats: getOutputNewMap("nats"),
Influxdb: getOutputNewMap("influxdb"),
AWSLambda: getOutputNewMap("awslambda"),
AWSSQS: getOutputNewMap("awssqs"),
AWSSNS: getOutputNewMap("awssns"),
SMTP: getOutputNewMap("smtp"),
Opsgenie: getOutputNewMap("opsgenie"),
Statsd: getOutputNewMap("statsd"),
Dogstatsd: getOutputNewMap("dogstatsd"),
Webhook: getOutputNewMap("webhook"),
AzureEventHub: getOutputNewMap("azureeventhub"),
GCPPubSub: getOutputNewMap("gcppubsub"),
GoogleChat: getOutputNewMap("googlechat"),
Kafka: getOutputNewMap("kafka"),
Requests: getInputNewMap("requests"),
FIFO: getInputNewMap("fifo"),
GRPC: getInputNewMap("grpc"),
Falco: expvar.NewMap("falco.priority"),
Slack: getOutputNewMap("slack"),
Rocketchat: getOutputNewMap("rocketchat"),
Mattermost: getOutputNewMap("mattermost"),
Teams: getOutputNewMap("teams"),
Datadog: getOutputNewMap("datadog"),
Discord: getOutputNewMap("discord"),
Alertmanager: getOutputNewMap("alertmanager"),
Elasticsearch: getOutputNewMap("elasticsearch"),
Loki: getOutputNewMap("loki"),
Nats: getOutputNewMap("nats"),
Influxdb: getOutputNewMap("influxdb"),
AWSLambda: getOutputNewMap("awslambda"),
AWSSQS: getOutputNewMap("awssqs"),
AWSSNS: getOutputNewMap("awssns"),
AWSCloudWatchLogs: getOutputNewMap("awscloudwatchlogs"),
SMTP: getOutputNewMap("smtp"),
Opsgenie: getOutputNewMap("opsgenie"),
Statsd: getOutputNewMap("statsd"),
Dogstatsd: getOutputNewMap("dogstatsd"),
Webhook: getOutputNewMap("webhook"),
AzureEventHub: getOutputNewMap("azureeventhub"),
GCPPubSub: getOutputNewMap("gcppubsub"),
GoogleChat: getOutputNewMap("googlechat"),
Kafka: getOutputNewMap("kafka"),
}
stats.Falco.Add("emergency", 0)
stats.Falco.Add("alert", 0)