Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification Enhancements include reminder #255

Merged
merged 5 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ We welcome contributions! Many people all over the world have helped make this p


## Filing issues
(TBD)
(TBD)
20 changes: 12 additions & 8 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,31 @@ func createInitCommand() *cobra.Command {
template := `vatz_protocol_info:
protocol_identifier: "Put Your Protocol here"
port: 9090
health_checker_schedule:
- "0 1 * * *"
notification_info:
host_name: "Your machine name"
host_name: "Put your machine's host name"
default_reminder_schedule:
- "*/15 * * * *"
dispatch_channels:
- channel: "discord"
secret: "Your channel secret"
secret: "Put your Discord Webhook"
- channel: "telegram"
secret: "Your channel secret"
chat_id: "482109801"
health_checker_schedule:
- "0 1 * * *"
secret: "Put Your Bot's Token"
chat_id: "Put Your Chat's chat_id'"
reminder_schedule:
- "*/5 * * * *"
plugins_infos:
default_verify_interval: 15
default_execute_interval: 30
default_plugin_name: "vatz-plugin"
plugins:
- plugin_name: "sample1"
- plugin_name: "samplePlugin1"
plugin_address: "localhost"
plugin_port: 9001
executable_methods:
- method_name: "sampleMethod1"
- plugin_name: "sample2"
- plugin_name: "samplePlugin2"
plugin_address: "localhost"
verify_interval: 7
execute_interval: 9
Expand Down
13 changes: 6 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func initiateServer(ch <-chan os.Signal) error {
reflection.Register(s)

vatzConfig := cfg.Vatz

addr := fmt.Sprintf(":%d", vatzConfig.Port)
err := healthChecker.VATZHealthCheck(vatzConfig.HealthCheckerSchedule, dispatchers)
if err != nil {
Expand All @@ -75,8 +74,9 @@ func initiateServer(ch <-chan os.Signal) error {

listener, err := net.Listen("tcp", addr)
if err != nil {
log.Error().Str("module", "main").Msgf("VATZ listener Error: %s", err)
log.Error().Str("module", "main").Msgf("VATZ Listener Error: %s", err)
}

log.Info().Str("module", "main").Msgf("VATZ Listening Port: %s", addr)
startExecutor(cfg.PluginInfos, ch)

Expand All @@ -86,7 +86,6 @@ func initiateServer(ch <-chan os.Signal) error {
}()

log.Info().Str("module", "main").Msg("VATZ Manager Started")

initHealthServer(s)
if err := s.Serve(listener); err != nil {
log.Panic().Str("module", "main").Msgf("Serve Error: %s", err)
Expand Down Expand Up @@ -133,7 +132,7 @@ func getClients(plugins []config.Plugin) []pluginpb.PluginClient {

func multiPluginExecutor(plugin config.Plugin,
singleClient pluginpb.PluginClient,
isOkayToSend bool,
okToSend bool,
quit <-chan os.Signal) {

verifyTicker := time.NewTicker(time.Duration(plugin.VerifyInterval) * time.Second)
Expand All @@ -145,12 +144,12 @@ func multiPluginExecutor(plugin config.Plugin,
case <-verifyTicker.C:
live, _ := healthChecker.PluginHealthCheck(ctx, singleClient, plugin, dispatchers)
if live == tp.AliveStatusUp {
isOkayToSend = true
okToSend = true
} else {
isOkayToSend = false
okToSend = false
}
case <-executeTicker.C:
if isOkayToSend == true {
if okToSend == true {
err := executor.Execute(ctx, singleClient, plugin, dispatchers)
if err != nil {
log.Error().Str("module", "main").Msgf("Executor Error: %s", err)
Expand Down
14 changes: 7 additions & 7 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ type Config struct {
}

type NotificationInfo struct {
DiscordSecret string `yaml:"discord_secret"`
PagerDutySecret string `yaml:"pager_duty_secret"`
HostName string `yaml:"host_name"`
DispatchChannels []struct {
Channel string `yaml:"channel"`
Secret string `yaml:"secret"`
ChatID string `yaml:"chat_id"`
HostName string `yaml:"host_name"`
DefaultReminderSchedule []string `yaml:"default_reminder_schedule"`
DispatchChannels []struct {
Channel string `yaml:"channel"`
Secret string `yaml:"secret"`
ChatID string `yaml:"chat_id"`
ReminderSchedule []string `yaml:"reminder_schedule"`
} `yaml:"dispatch_channels"`
}

Expand Down
6 changes: 0 additions & 6 deletions manager/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ func TestDefaultConfig(t *testing.T) {
// Asserts.
assert.Equal(t, test.ExpectProtocolID, cfg.Vatz.ProtocolIdentifier)
assert.Equal(t, test.ExpectVatzPort, cfg.Vatz.Port)
assert.Equal(t, test.ExpectDiscordSecret, cfg.Vatz.NotificationInfo.DiscordSecret)
assert.Equal(t, test.ExpectPagerDutySecret, cfg.Vatz.NotificationInfo.PagerDutySecret)
assert.Equal(t, test.ExpectHostName, cfg.Vatz.NotificationInfo.HostName)
for i, dispatchChannel := range test.DispatchChannels {
assert.Equal(t, dispatchChannel.ExpectChannel, cfg.Vatz.NotificationInfo.DispatchChannels[i].Channel)
Expand Down Expand Up @@ -180,8 +178,6 @@ func TestOverrideDefaultValues(t *testing.T) {
// Asserts.
assert.Equal(t, test.ExpectProtocolID, cfg.Vatz.ProtocolIdentifier)
assert.Equal(t, test.ExpectVatzPort, cfg.Vatz.Port)
assert.Equal(t, test.ExpectDiscordSecret, cfg.Vatz.NotificationInfo.DiscordSecret)
assert.Equal(t, test.ExpectPagerDutySecret, cfg.Vatz.NotificationInfo.PagerDutySecret)
assert.Equal(t, test.ExpectHostName, cfg.Vatz.NotificationInfo.HostName)
for i, dispatchChannel := range test.DispatchChannels {
assert.Equal(t, dispatchChannel.ExpectChannel, cfg.Vatz.NotificationInfo.DispatchChannels[i].Channel)
Expand Down Expand Up @@ -363,8 +359,6 @@ func TestGetConfig(t *testing.T) {
// Asserts.
assert.Equal(t, test.ExpectProtocolID, cfg.Vatz.ProtocolIdentifier)
assert.Equal(t, test.ExpectVatzPort, cfg.Vatz.Port)
assert.Equal(t, test.ExpectDiscordSecret, cfg.Vatz.NotificationInfo.DiscordSecret)
assert.Equal(t, test.ExpectPagerDutySecret, cfg.Vatz.NotificationInfo.PagerDutySecret)
assert.Equal(t, test.ExpectHostName, cfg.Vatz.NotificationInfo.HostName)
for i, dispatchChannel := range test.DispatchChannels {
assert.Equal(t, dispatchChannel.ExpectChannel, cfg.Vatz.NotificationInfo.DispatchChannels[i].Channel)
Expand Down
83 changes: 67 additions & 16 deletions manager/dispatcher/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package dispatcher
import (
"bytes"
"encoding/json"
"fmt"
"github.com/robfig/cron/v3"
"net/http"
"strings"
"sync"
"time"

pluginpb "github.com/dsrvlabs/vatz-proto/plugin/v1"
pb "github.com/dsrvlabs/vatz-proto/plugin/v1"
tp "github.com/dsrvlabs/vatz/manager/types"
"github.com/rs/zerolog/log"
)
Expand All @@ -25,19 +28,56 @@ const (
)

type discord struct {
channel tp.Channel
secret string
host string
channel tp.Channel
secret string
reminderSchedule []string
reminderCron *cron.Cron
entry sync.Map
}

func (d discord) SendNotification(request tp.ReqMsg) error {
err := d.sendNotificationForDiscord(request, d.secret)
if err != nil {
panic(err)
func (d *discord) SetDispatcher(firstRunMsg bool, preStat tp.StateFlag, notifyInfo tp.NotifyInfo) error {
reqToNotify, reminderState, deliverMessage := messageHandler(firstRunMsg, preStat, notifyInfo)
methodName := notifyInfo.Method

if reqToNotify {
d.SendNotification(deliverMessage)
}

if reminderState == tp.ON {
newEntries := []cron.EntryID{}
//In case of reminder has to keep but stateFlag has changed,
//e.g.) CRITICAL -> WARNING
//e.g.) ERROR -> INFO -> ERROR
if entries, ok := d.entry.Load(methodName); ok {
for _, entry := range entries.([]cron.EntryID) {
d.reminderCron.Remove(entry)
}
d.reminderCron.Stop()
}
for _, schedule := range d.reminderSchedule {
id, _ := d.reminderCron.AddFunc(schedule, func() {
d.SendNotification(deliverMessage)
})
newEntries = append(newEntries, id)
}
d.entry.Store(methodName, newEntries)
d.reminderCron.Start()

} else if reminderState == tp.OFF {
entries, _ := d.entry.Load(methodName)
for _, entity := range entries.([]cron.EntryID) {
{
d.reminderCron.Remove(entity)
}
d.reminderCron.Stop()
}
}

return nil
}

func (d discord) sendNotificationForDiscord(msg tp.ReqMsg, webhook string) error {
func (d *discord) SendNotification(msg tp.ReqMsg) error {
if msg.ResourceType == "" {
msg.ResourceType = "No Resource Type"
}
Expand All @@ -46,29 +86,40 @@ func (d discord) sendNotificationForDiscord(msg tp.ReqMsg, webhook string) error
}

// Check discord secret
if strings.Contains(webhook, discordWebhookFormat) {
if strings.Contains(d.secret, discordWebhookFormat) {
sMsg := tp.DiscordMsg{Embeds: make([]tp.Embed, 1)}
emoji := "🚨"
switch msg.Severity {
case pluginpb.SEVERITY_CRITICAL:
case pb.SEVERITY_CRITICAL:
sMsg.Embeds[0].Color = discordRed
case pluginpb.SEVERITY_WARNING:
case pb.SEVERITY_WARNING:
sMsg.Embeds[0].Color = discordYellow
case pluginpb.SEVERITY_INFO:
sMsg.Embeds[0].Color = discordBlue
case pb.SEVERITY_INFO:
sMsg.Embeds[0].Color = discordGreen
default:
sMsg.Embeds[0].Color = discordGray
}

sMsg.Embeds[0].Title = msg.Severity.String()
sMsg.Embeds[0].Fields = []tp.Field{{Name: msg.ResourceType, Value: msg.Msg, Inline: false}}
if msg.State == pb.STATE_SUCCESS {
if msg.Severity == pb.SEVERITY_CRITICAL {
emoji = "‼️"
} else if msg.Severity == pb.SEVERITY_WARNING {
emoji = "❗"
} else if msg.Severity == pb.SEVERITY_INFO {
emoji = "✅"
}
}

sMsg.Embeds[0].Title = fmt.Sprintf(`%s %s`, emoji, msg.Severity.String())
sMsg.Embeds[0].Fields = []tp.Field{{Name: "(" + d.host + ") " + msg.ResourceType, Value: msg.Msg, Inline: false}}
sMsg.Embeds[0].Timestamp = time.Now()

message, err := json.Marshal(sMsg)
if err != nil {
return err
}

req, _ := http.NewRequest("POST", webhook, bytes.NewReader(message))
req, _ := http.NewRequest("POST", d.secret, bytes.NewReader(message))
req.Header.Set("Content-Type", "application/json")
c := &http.Client{}
_, err = c.Do(req)
Expand Down
87 changes: 68 additions & 19 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package dispatcher

import (
"fmt"
"sync"

"errors"
pb "github.com/dsrvlabs/vatz-proto/plugin/v1"
"github.com/dsrvlabs/vatz/manager/config"
tp "github.com/dsrvlabs/vatz/manager/types"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog/log"
"strings"
"sync"
"time"
)

/* TODO: Discussion.
Expand All @@ -21,30 +25,75 @@ var (

// Dispatcher Notification provides interfaces to send alert dispatcher message with variable channel.
type Dispatcher interface {
SetDispatcher(firstExecution bool, previousFlag tp.StateFlag, notifyInfo tp.NotifyInfo) error
SendNotification(request tp.ReqMsg) error
}

func GetDispatchers(cfg config.NotificationInfo) []Dispatcher {
if len(cfg.DispatchChannels) == 0 {
dpError := errors.New("Error: No Dispatcher has set.")
log.Error().Str("module", "dispatcher").Msg("Please, Set at least a channel for dispatcher, e.g.) Discord or Telegram")
panic(dpError)
}

dispatcherOnce.Do(func() {
for _, chanInfo := range cfg.DispatchChannels {
switch chanInfo.Channel {
case "discord":
discord := &discord{
channel: tp.Discord,
secret: chanInfo.Secret,
}
dispatcherSingletons = append(dispatcherSingletons, discord)
case "telegram":
telegram := &telegram{
channel: tp.Telegram,
secret: chanInfo.Secret,
chatID: chanInfo.ChatID,
}
dispatcherSingletons = append(dispatcherSingletons, telegram)
default:
fmt.Println(chanInfo.Channel, "is not work")
if len(chanInfo.ReminderSchedule) == 0 {
chanInfo.ReminderSchedule = cfg.DefaultReminderSchedule
}
switch channel := chanInfo.Channel; {
case strings.EqualFold(channel, string(tp.Discord)):
dispatcherSingletons = append(dispatcherSingletons, &discord{
host: cfg.HostName,
channel: tp.Discord,
secret: chanInfo.Secret,
reminderCron: cron.New(cron.WithLocation(time.UTC)),
reminderSchedule: chanInfo.ReminderSchedule,
entry: sync.Map{},
})
case strings.EqualFold(channel, string(tp.Telegram)):
dispatcherSingletons = append(dispatcherSingletons, &telegram{
host: cfg.HostName,
channel: tp.Telegram,
secret: chanInfo.Secret,
chatID: chanInfo.ChatID,
reminderCron: cron.New(cron.WithLocation(time.UTC)),
reminderSchedule: chanInfo.ReminderSchedule,
entry: sync.Map{},
})
}
}
})

return dispatcherSingletons
}

func messageHandler(isFirst bool, preStat tp.StateFlag, info tp.NotifyInfo) (bool, tp.Reminder, tp.ReqMsg) {
notifyOn := false
reminderState := tp.HANG
isFlagStateChanged := false

if preStat.State != info.State || preStat.Severity != info.Severity {
isFlagStateChanged = true
}

if info.State == pb.STATE_FAILURE ||
(info.State == pb.STATE_SUCCESS && info.Severity == pb.SEVERITY_WARNING) ||
(info.State == pb.STATE_SUCCESS && info.Severity == pb.SEVERITY_CRITICAL) {
if isFirst || isFlagStateChanged {
notifyOn = true
reminderState = tp.ON
}
} else if info.State == pb.STATE_SUCCESS && info.Severity == pb.SEVERITY_INFO && isFlagStateChanged {
notifyOn = true
reminderState = tp.OFF
}

return notifyOn, reminderState, tp.ReqMsg{
FuncName: info.Method,
State: info.State,
Msg: info.ExecuteMsg,
Severity: info.Severity,
ResourceType: info.Plugin,
}
}
Loading