Skip to content

Commit

Permalink
Merge pull request AliyunContainerService#180 from JarHMJ/feature/mon…
Browse files Browse the repository at this point in the history
…go_sink

Feature/mongo sink
  • Loading branch information
ringtail authored Aug 30, 2021
2 parents 5e08288 + 0342568 commit d307959
Show file tree
Hide file tree
Showing 255 changed files with 50,065 additions and 80 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ Supported Sinks:
| <a href="docs/en/mysql-sink.md">mysql</a> | sink to mysql database |
| <a href="docs/en/wechat-sink.md">wechat</a> | sink to wechat |
| <a href="docs/en/webhook-sink.md">webhook</a> | sink to webhook |
| <a href="docs/en/mongodb-sink.md">webhook</a> | sink to mongodb |

### Contributing
Please check <a href="docs/en/CONTRIBUTING.md" target="_blank">CONTRIBUTING.md</a>
Expand Down
13 changes: 13 additions & 0 deletions docs/en/mongodb-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
### MongoDB

*This sink supports MongoDB 2.6 and higher*.
To use the MongoDB sink add the following flag:
```
--sink=mongo:?<MONGO_URI>
```


For example,
```
--sink=mongo:?mongodb://root:[email protected]:30694,mongo-replset-0-1.dba-c1.example.com:32761,mongo-replset-0-2.dba-c1.example.com:31958/?authSource=admin
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9
github.com/smartystreets/gunit v1.0.0 // indirect
github.com/stretchr/testify v1.5.1
go.mongodb.org/mongo-driver v1.3.7
gopkg.in/olivere/elastic.v3 v3.0.75
gopkg.in/olivere/elastic.v5 v5.0.81
gopkg.in/olivere/elastic.v6 v6.2.23
Expand Down
65 changes: 65 additions & 0 deletions go.sum

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions sinks/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/AliyunContainerService/kube-eventer/sinks/influxdb"
"github.com/AliyunContainerService/kube-eventer/sinks/kafka"
"github.com/AliyunContainerService/kube-eventer/sinks/log"
"github.com/AliyunContainerService/kube-eventer/sinks/mongo"
"github.com/AliyunContainerService/kube-eventer/sinks/mysql"
"github.com/AliyunContainerService/kube-eventer/sinks/riemann"
"github.com/AliyunContainerService/kube-eventer/sinks/sls"
Expand Down Expand Up @@ -62,6 +63,8 @@ func (this *SinkFactory) Build(uri flags.Uri) (core.EventSink, error) {
return webhook.NewWebHookSink(&uri.Val)
case "eventbridge":
return eventbridge.NewEventBridgeSink(&uri.Val)
case "mongo":
return mongo.CreateMongoSink(&uri.Val)
default:
return nil, fmt.Errorf("Sink not recognized: %s", uri.Key)
}
Expand Down
134 changes: 134 additions & 0 deletions sinks/mongo/mongo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mongo

import (
"context"
"github.com/AliyunContainerService/kube-eventer/core"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
kube_api "k8s.io/api/core/v1"
"k8s.io/klog"
"net/url"
"sync"
"time"
)

type mongoSink struct {
client *mongo.Client
closeDB func()
sync.RWMutex
}

type mongoSinkPoint struct {
Count int32 `bson:"count,omitempty"`
Namespace string `bson:"namespace,omitempty"`
Kind string `bson:"kind,omitempty"`
Name string `bson:"name,omitempty"`
Type string `bson:"type,omitempty"`
Reason string `bson:"reason,omitempty"`
Message string `bson:"message,omitempty"`
EventID string `bson:"event_id,omitempty"`
FirstOccurrenceTimestamp time.Time `bson:"first_occurrence_time,omitempty"`
LastOccurrenceTimestamp time.Time `bson:"last_occurrence_time,omitempty"`
}

func (m *mongoSink) Name() string {
return "Mongo Sink"
}

func (m *mongoSink) saveData(sinkData *mongoSinkPoint) error {
eventCollection := m.client.Database("k8s").Collection("event")

ctx := context.TODO()
_, err := eventCollection.InsertOne(ctx, sinkData)
if err != nil {
return err
}
return nil
}

func eventToPoint(event *kube_api.Event) (*mongoSinkPoint, error) {
var (
lastOccurrenceTimestamp = event.LastTimestamp.Time.UTC()
firstOccurrenceTimestamp = event.FirstTimestamp.Time.UTC()
)

// Part of k8s resources FirstOccurrenceTimestamp/LastOccurrenceTimestamp is nil
if event.LastTimestamp.UTC().IsZero() {
lastOccurrenceTimestamp = event.CreationTimestamp.Time.UTC()
}

if event.FirstTimestamp.UTC().IsZero() {
firstOccurrenceTimestamp = event.CreationTimestamp.Time.UTC()
}

point := mongoSinkPoint{
Count: event.Count,
Name: event.InvolvedObject.Name,
Namespace: event.InvolvedObject.Namespace,
EventID: string(event.UID),
Type: event.Type,
Reason: event.Reason,
Message: event.Message,
Kind: event.InvolvedObject.Kind,
FirstOccurrenceTimestamp: firstOccurrenceTimestamp,
LastOccurrenceTimestamp: lastOccurrenceTimestamp,
}
return &point, nil
}

func (m *mongoSink) ExportEvents(eventBatch *core.EventBatch) {
m.Lock()
defer m.Unlock()

for _, event := range eventBatch.Events {
point, err := eventToPoint(event)
if err != nil {
klog.Warningf("Failed to convert event to point: %v", err)
continue
}

err = m.saveData(point)
if err != nil {
klog.Warningf("Failed to export data to Mongo sink: %v", err)
}
}

}

func (m *mongoSink) Stop() {
m.closeDB()
}

func CreateMongoSink(uri *url.URL) (core.EventSink, error) {
var sink mongoSink

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)

client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri.RawQuery))
if err != nil {
return nil, err
}
sink.client = client
sink.closeDB = func() {
if err = client.Disconnect(ctx); err != nil {
panic(err)
}
}

klog.V(3).Info("Mongo Sink setup successfully")
return &sink, nil
}
201 changes: 201 additions & 0 deletions vendor/go.mongodb.org/mongo-driver/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d307959

Please sign in to comment.