Skip to content

Commit

Permalink
usage model (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelquigley committed Mar 3, 2023
1 parent 90126e1 commit 7a1411c
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 40 deletions.
5 changes: 1 addition & 4 deletions controller/metrics/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@ func Run(cfg *Config) error {
}

go func() {
ingester := &UsageIngester{}
for {
select {
case event := <-events:
if err := ingester.Ingest(event); err != nil {
logrus.Error(err)
}
logrus.Info(Ingest(event))
}
}
}()
Expand Down
29 changes: 29 additions & 0 deletions controller/metrics/model.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
package metrics

import (
"fmt"
"github.com/openziti/zrok/util"
"time"
)

type Usage struct {
ProcessedStamp time.Time
IntervalStart time.Time
ZitiServiceId string
ZitiCircuitId string
FrontendTx int64
FrontendRx int64
BackendTx int64
BackendRx int64
}

func (u Usage) String() string {
out := "Usage {"
out += fmt.Sprintf("processed '%v'", u.ProcessedStamp)
out += ", " + fmt.Sprintf("interval '%v'", u.IntervalStart)
out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId)
out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId)
out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
out += "}"
return out
}

type Source interface {
Start(chan map[string]interface{}) (chan struct{}, error)
Stop()
Expand Down
63 changes: 31 additions & 32 deletions controller/metrics/usageIngester.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,28 @@
package metrics

import (
"github.com/openziti/zrok/util"
"github.com/sirupsen/logrus"
"reflect"
"time"
)

type UsageIngester struct{}

func (i *UsageIngester) Ingest(event map[string]interface{}) error {
func Ingest(event map[string]interface{}) *Usage {
u := &Usage{ProcessedStamp: time.Now()}
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
start := float64(0)
if v, found := event["interval_start_utc"]; found {
if vFloat64, ok := v.(float64); ok {
start = vFloat64
u.IntervalStart = time.Unix(int64(vFloat64), 0)
} else {
logrus.Error("unable to assert 'interval_start_utc'")
}
} else {
logrus.Error("missing 'interval_start_utc'")
}
clientId := ""
serviceId := ""
if v, found := event["tags"]; found {
if tags, ok := v.(map[string]interface{}); ok {
if v, found := tags["clientId"]; found {
if vStr, ok := v.(string); ok {
clientId = vStr
} else {
logrus.Error("unable to assert 'tags/clientId'")
}
} else {
logrus.Errorf("missing 'tags/clientId'")
}
if v, found := tags["serviceId"]; found {
if vStr, ok := v.(string); ok {
serviceId = vStr
u.ZitiServiceId = vStr
} else {
logrus.Error("unable to assert 'tags/serviceId'")
}
Expand All @@ -48,49 +35,61 @@ func (i *UsageIngester) Ingest(event map[string]interface{}) error {
} else {
logrus.Errorf("missing 'tags'")
}
tx := int64(0)
rx := int64(0)
if v, found := event["usage"]; found {
if usage, ok := v.(map[string]interface{}); ok {
if v, found := usage["ingress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
u.FrontendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.tx'")
}
} else {
logrus.Warn("missing 'usage/ingress.tx'")
}
if v, found := usage["ingress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
u.FrontendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.rx")
}
} else {
logrus.Warn("missing 'usage/ingress.rx")
}
if v, found := usage["egress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
tx = int64(vFloat64)
u.BackendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.tx'")
}
} else {
logrus.Error("missing 'usage/egress.tx'")
logrus.Warn("missing 'usage/egress.tx'")
}
if v, found := usage["egress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
rx = int64(vFloat64)
u.BackendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.rx'")
}
} else {
logrus.Error("missing 'usage/egress.rx'")
logrus.Warn("missing 'usage/egress.rx'")
}
} else {
logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event)
}
} else {
logrus.Error("missing 'usage'")
logrus.Warnf("missing 'usage'")
}
circuitId := ""
if v, found := event["circuit_id"]; found {
if vStr, ok := v.(string); ok {
circuitId = vStr
u.ZitiCircuitId = vStr
} else {
logrus.Error("unable to assert 'circuit_id'")
}
} else {
logrus.Error("missing 'circuit_id'")
logrus.Warn("missing 'circuit_id'")
}

logrus.Infof("usage: start '%d', serviceId '%v', clientId '%v', circuitId '%v' [rx: %v, tx: %v]", int64(start), serviceId, clientId, circuitId, util.BytesToSize(rx), util.BytesToSize(tx))

} else {
logrus.Errorf("not 'fabric.usage'")
}
return nil
return u
}
5 changes: 1 addition & 4 deletions controller/usageAgent.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) {
break
}
if err == nil {
ui := &metrics.UsageIngester{}
if err := ui.Ingest(event); err != nil {
logrus.Errorf("error ingesting '%v': %v", string(msg.Body), err)
}
logrus.Info(metrics.Ingest(event))
} else {
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
}
Expand Down

0 comments on commit 7a1411c

Please sign in to comment.