Skip to content

Commit

Permalink
Log sink type er'where (#251)
Browse files Browse the repository at this point in the history
* version bump

* put protocol stats back
  • Loading branch information
jakthom authored Apr 22, 2022
1 parent ec5a47c commit 1deb4b1
Show file tree
Hide file tree
Showing 32 changed files with 109 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.5.2
v0.5.3
1 change: 1 addition & 0 deletions cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (a *App) handlerParams() handler.EventHandlerParams {
Config: a.config,
Cache: a.schemaCache,
Manifold: a.manifold,
Meta: a.meta,
}
return params
}
Expand Down
2 changes: 1 addition & 1 deletion examples/devel/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: "3.9"
services:
honeypot:
container_name: honeypot
image: ghcr.io/silverton-io/honeypot:v0.5.2
image: ghcr.io/silverton-io/honeypot:v0.5.3
volumes:
- type: bind
source: ./honeypot/devel.conf.yml
Expand Down
2 changes: 1 addition & 1 deletion examples/quickstart/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: "3.9"
services:
honeypot:
container_name: honeypot
image: ghcr.io/silverton-io/honeypot:v0.5.2
image: ghcr.io/silverton-io/honeypot:v0.5.3
volumes:
- type: bind
source: ./honeypot/quickstart.conf.yml
Expand Down
2 changes: 0 additions & 2 deletions examples/quickstart/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ echo "\n\nOpening associated resources...\n";

open http://localhost:8080/;
sleep 2;
open http://localhost:8080/stats;
sleep 2;
open http://localhost:8080/schemas;
sleep 2;
open http://localhost:8080/schemas/com.silverton.io/snowplow/page_view/v1.0.json;
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func CloudeventsHandler(h EventHandlerParams) gin.HandlerFunc {
if c.ContentType() == "application/cloudevents+json" || c.ContentType() == "application/cloudevents-batch+json" {
envelopes := envelope.BuildCloudeventEnvelopesFromRequest(c, *h.Config)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
err := h.Manifold.Distribute(annotatedEnvelopes)
err := h.Manifold.Distribute(annotatedEnvelopes, h.Meta)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_60)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func GenericHandler(h EventHandlerParams) gin.HandlerFunc {
if c.ContentType() == "application/json" {
envelopes := envelope.BuildGenericEnvelopesFromRequest(c, *h.Config)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
err := h.Manifold.Distribute(annotatedEnvelopes)
err := h.Manifold.Distribute(annotatedEnvelopes, h.Meta)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_60)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
Expand Down
2 changes: 2 additions & 0 deletions pkg/handler/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"github.com/silverton-io/honeypot/pkg/cache"
"github.com/silverton-io/honeypot/pkg/config"
"github.com/silverton-io/honeypot/pkg/manifold"
"github.com/silverton-io/honeypot/pkg/tele"
)

type EventHandlerParams struct {
Config *config.Config
Cache *cache.SchemaCache
Manifold *manifold.SimpleManifold
Meta *tele.Meta
}
2 changes: 1 addition & 1 deletion pkg/handler/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func RelayHandler(h EventHandlerParams) gin.HandlerFunc {
fn := func(c *gin.Context) {
envelopes := envelope.BuildRelayEnvelopesFromRequest(c)
err := h.Manifold.Distribute(envelopes)
err := h.Manifold.Distribute(envelopes, h.Meta)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_3)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/snowplow.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func SnowplowHandler(h EventHandlerParams) gin.HandlerFunc {
fn := func(c *gin.Context) {
envelopes := envelope.BuildSnowplowEnvelopesFromRequest(c, *h.Config)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
err := h.Manifold.Distribute(annotatedEnvelopes)
err := h.Manifold.Distribute(annotatedEnvelopes, h.Meta)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_60)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func WebhookHandler(h EventHandlerParams) gin.HandlerFunc {
if c.ContentType() == "application/json" {
envelopes := envelope.BuildWebhookEnvelopesFromRequest(c)
annotatedEnvelopes := annotator.Annotate(envelopes, h.Cache)
err := h.Manifold.Distribute(annotatedEnvelopes)
err := h.Manifold.Distribute(annotatedEnvelopes, h.Meta)
if err != nil {
c.Header("Retry-After", response.RETRY_AFTER_60)
c.JSON(http.StatusServiceUnavailable, response.ManifoldDistributionError)
Expand Down
13 changes: 8 additions & 5 deletions pkg/manifold/simpleManifold.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/silverton-io/honeypot/pkg/envelope"
"github.com/silverton-io/honeypot/pkg/sink"
"github.com/silverton-io/honeypot/pkg/tele"
)

// A stupid-simple manifold with strict guarantees
Expand All @@ -18,36 +19,38 @@ func (m *SimpleManifold) Initialize(sinks *[]sink.Sink) error {
return nil
}

func (m *SimpleManifold) Distribute(envelopes []envelope.Envelope) error {
func (m *SimpleManifold) Distribute(envelopes []envelope.Envelope, meta *tele.Meta) error {
var validEnvelopes []envelope.Envelope
var invalidEnvelopes []envelope.Envelope

for _, e := range envelopes {
isValid := *e.IsValid
if isValid {
meta.ProtocolStats.IncrementValid(e.EventProtocol, e.EventMetadata, 1)
validEnvelopes = append(validEnvelopes, e)
} else {
meta.ProtocolStats.IncrementInvalid(e.EventProtocol, e.EventMetadata, 1)
invalidEnvelopes = append(invalidEnvelopes, e)
}
}

for _, s := range *m.sinks {
ctx := context.Background()
if len(validEnvelopes) > 0 {
log.Debug().Interface("sinkId", s.Id()).Interface("sinkName", s.Name()).Interface("deliveryRequired", s.DeliveryRequired()).Msg("purging valid envelopes to sink")
log.Debug().Interface("sinkId", s.Id()).Interface("sinkName", s.Name()).Interface("deliveryRequired", s.DeliveryRequired()).Interface("sinkType", s.Type()).Msg("purging valid envelopes to sink")
publishErr := s.BatchPublishValid(ctx, validEnvelopes)
if publishErr != nil {
log.Error().Err(publishErr).Interface("sinkId", s.Id()).Interface("sinkName", s.Name()).Interface("deliveryRequired", s.DeliveryRequired()).Msg("could not purge valid envelopes to sink")
log.Error().Err(publishErr).Interface("sinkId", s.Id()).Interface("sinkName", s.Name()).Interface("deliveryRequired", s.DeliveryRequired()).Interface("sinkType", s.Type()).Msg("could not purge valid envelopes to sink")
if s.DeliveryRequired() {
return publishErr
}
}
}
if len(invalidEnvelopes) > 0 {
log.Debug().Interface("sinkId", s.Id()).Interface("sinkName", s.Name()).Interface("deliveryRequired", s.DeliveryRequired()).Msg("purging invalid envelopes to sink")
log.Debug().Interface("sinkId", s.Id()).Interface("sinkName", s.Name()).Interface("deliveryRequired", s.DeliveryRequired()).Interface("sinkType", s.Type()).Msg("purging invalid envelopes to sink")
publishErr := s.BatchPublishInvalid(ctx, invalidEnvelopes)
if publishErr != nil {
log.Error().Err(publishErr).Interface("sinkId", s.Id()).Interface("sinkName", s.Name()).Interface("deliveryRequired", s.DeliveryRequired()).Msg("could not purge invalid envelopes to sink")
log.Error().Err(publishErr).Interface("sinkId", s.Id()).Interface("sinkName", s.Name()).Interface("deliveryRequired", s.DeliveryRequired()).Interface("sinkType", s.Type()).Msg("could not purge invalid envelopes to sink")
if s.DeliveryRequired() {
return publishErr
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/blackhole.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func (s *BlackholeSink) Name() string {
return s.name
}

func (s *BlackholeSink) Type() string {
return BLACKHOLE
}

func (s *BlackholeSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (s *ClickhouseSink) Name() string {
return s.name
}

func (s *ClickhouseSink) Type() string {
return CLICKHOUSE
}

func (s *ClickhouseSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (s *ElasticsearchSink) Name() string {
return s.name
}

func (s *ElasticsearchSink) Type() string {
return ELASTICSEARCH
}

func (s *ElasticsearchSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (s *FileSink) Name() string {
return s.name
}

func (s *FileSink) Type() string {
return FILE
}

func (s *FileSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (s *HttpSink) Name() string {
return s.name
}

func (s *HttpSink) Type() string {
return HTTP
}

func (s *HttpSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (s *KafkaSink) Name() string {
return s.name
}

func (s *KafkaSink) Type() string {
return KAFKA
}

func (s *KafkaSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (s *KinesisSink) Name() string {
return s.name
}

func (s *KinesisSink) Type() string {
return KINESIS
}

func (s *KinesisSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/kinesisFirehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (s *KinesisFirehoseSink) Name() string {
return s.name
}

func (s *KinesisFirehoseSink) Type() string {
return KINESIS_FIREHOSE
}

func (s *KinesisFirehoseSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/materialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (s *MaterializeSink) Name() string {
return s.name
}

func (s *MaterializeSink) Type() string {
return MATERIALIZE
}

func (s *MaterializeSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (s *MongodbSink) Name() string {
return s.name
}

func (s *MongodbSink) Type() string {
return MONGODB
}

func (s *MongodbSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (s *MysqlSink) Name() string {
return s.name
}

func (s *MysqlSink) Type() string {
return MYSQL
}

func (s *MysqlSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (s *PostgresSink) Name() string {
return s.name
}

func (s *PostgresSink) Type() string {
return POSTGRES
}

func (s *PostgresSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/pubnub.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (s *PubnubSink) Name() string {
return s.name
}

func (s *PubnubSink) Type() string {
return PUBNUB
}

func (s *PubnubSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (s *PubsubSink) Name() string {
return s.name
}

func (s *PubsubSink) Type() string {
return PUBSUB
}

func (s *PubsubSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func (s *RelaySink) Name() string {
return s.name
}

func (s *RelaySink) Type() string {
return RELAY
}

func (s *RelaySink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
type Sink interface {
Id() *uuid.UUID
Name() string
Type() string
DeliveryRequired() bool
Initialize(conf config.Sink) error
BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error
Expand Down
5 changes: 5 additions & 0 deletions pkg/sink/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func (ms *MockSink) Name() string {
return id
}

func (ms *MockSink) Type() string {
id := "mock"
return id
}

func (ms *MockSink) DeliveryRequired() bool {
return false
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (s *StdoutSink) Name() string {
return s.name
}

func (s *StdoutSink) Type() string {
return STDOUT
}

func (s *StdoutSink) DeliveryRequired() bool {
return s.deliveryRequired
}
Expand Down
27 changes: 12 additions & 15 deletions pkg/tele/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
)

type Meta struct {
Version string `json:"version"`
InstanceId uuid.UUID `json:"instanceId"`
StartTime time.Time `json:"startTime"`
TrackerDomain string `json:"trackerDomain"`
CookieDomain string `json:"cookieDomain"`
ProtocolStats *ProtocolStats `json:"protocolStats"`
BufferPurgeStats *BufferPurgeStats `json:"bufferPurgeStats"`
Version string `json:"version"`
InstanceId uuid.UUID `json:"instanceId"`
StartTime time.Time `json:"startTime"`
TrackerDomain string `json:"trackerDomain"`
CookieDomain string `json:"cookieDomain"`
ProtocolStats *ProtocolStats `json:"protocolStats"`
}

func (m *Meta) elapsed() float64 {
Expand All @@ -25,15 +24,13 @@ func BuildMeta(version string, conf *config.Config) *Meta {
instanceId := uuid.New()
ps := ProtocolStats{}
ps.Build()
bs := BufferPurgeStats{}
m := Meta{
Version: version,
InstanceId: instanceId,
StartTime: time.Now().UTC(),
TrackerDomain: conf.App.TrackerDomain,
CookieDomain: conf.Cookie.Domain,
ProtocolStats: &ps,
BufferPurgeStats: &bs,
Version: version,
InstanceId: instanceId,
StartTime: time.Now().UTC(),
TrackerDomain: conf.App.TrackerDomain,
CookieDomain: conf.Cookie.Domain,
ProtocolStats: &ps,
}
return &m
}
Loading

0 comments on commit 1deb4b1

Please sign in to comment.