Skip to content

Commit

Permalink
Merge pull request #12 from led0nk/fixes
Browse files Browse the repository at this point in the history
Fixes to file-saving, config, serviceManager, merged oiverseer into observer
  • Loading branch information
led0nk authored Jun 5, 2024
2 parents 010816e + 0061c4a commit dce7caa
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 325 deletions.
57 changes: 26 additions & 31 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
blist "github.com/led0nk/ark-overseer/internal/blacklist"
"github.com/led0nk/ark-overseer/internal/jsondb"
"github.com/led0nk/ark-overseer/internal/notifier"
"github.com/led0nk/ark-overseer/internal/overseer"
v1 "github.com/led0nk/ark-overseer/internal/server"
"github.com/led0nk/ark-overseer/internal/services"
"github.com/led0nk/ark-overseer/observer"
Expand All @@ -33,10 +32,10 @@ func main() {
configPath = flag.String("config", "config", "path to config-file")
sStore internal.ServerStore
obs internal.Observer
ovs internal.Overseer
blacklist internal.Blacklist
cfg config.Configuration
logLevel slog.Level
wg sync.WaitGroup
shutdownWg sync.WaitGroup
initWg sync.WaitGroup
)
flag.Parse()
Expand All @@ -54,27 +53,21 @@ func main() {

logger.Info("server address", "addr", *addr)

cfg, err := config.NewConfiguration(*configPath + "/config.yaml")
if err != nil {
logger.Error("failed to create new config", "error", err)
}

sStore, err = jsondb.NewServerStorage(ctx, *db+"/cluster.json")
if err != nil {
logger.ErrorContext(ctx, "failed to create new cluster", "error", err)
os.Exit(1)
}

em := events.NewEventManager()
sm := services.NewServiceManager(em)
sm := services.NewServiceManager(em, &initWg)

notify := notifier.NewNotifier(sStore, em)
sStore = notify

obs, err = observer.NewObserver(ctx, sStore)
cfg, err = config.NewConfiguration(*configPath+"/config.yaml", em)
if err != nil {
logger.ErrorContext(ctx, "failed to create endpoint storage", "error", err)
os.Exit(1)
logger.Error("failed to create new config", "error", err)
}

blacklist, err = blist.NewBlacklist(*blpath + "/blacklist.json")
Expand All @@ -83,36 +76,30 @@ func main() {
os.Exit(1)
}

ovs, err = overseer.NewOverseer(ctx, sStore, blacklist, em)
obs, err = observer.NewObserver(ctx, sStore, blacklist, em)
if err != nil {
logger.ErrorContext(ctx, "failed to create overseer", "error", err)
logger.ErrorContext(ctx, "failed to create endpoint storage", "error", err)
os.Exit(1)
}

wg.Add(1)
shutdownWg.Add(1)
go func() {
defer wg.Done()
defer shutdownWg.Done()
em.StartListening(ctx, sm, "serviceManager")
}()

//TODO: Wait group for initialization
initWg.Add(1)
//NOTE: Wait group for initialization, 2 because the first 1 is the publish for init.services and the 2nd is the handled event
initWg.Add(2)
go func() {
defer initWg.Done()
em.Publish(events.EventMessage{Type: "init.services", Payload: cfg})
}()

wg.Add(1)
go func() {
defer wg.Done()
shutdownWg.Add(1)
go func(cfg config.Configuration) {
defer shutdownWg.Done()
em.StartListening(ctx, obs, "observer")
}()

wg.Add(1)
go func() {
defer wg.Done()
em.StartListening(ctx, ovs, "overseer")
}()
}(cfg)

initWg.Wait()
initWg.Add(1)
Expand All @@ -123,9 +110,9 @@ func main() {

server := v1.NewServer(*addr, *domain, logger, sStore, blacklist, cfg)

wg.Add(1)
shutdownWg.Add(1)
go func() {
defer wg.Done()
defer shutdownWg.Done()
err := server.ServeHTTP(ctx)
if err != nil {
logger.ErrorContext(ctx, "failed to shutdown http server", "error", err)
Expand All @@ -143,6 +130,14 @@ func main() {
cancel()
}()

wg.Wait()
shutdownWg.Wait()
shutdownWg.Add(1)

logger.InfoContext(ctx, "finally saving server storage", "info", "shutdown")
err = sStore.Save()
if err != nil {
logger.ErrorContext(ctx, "failed to save server storage", "error", err)
}

logger.InfoContext(ctx, "application stopped gracefully", "info", "shutdown")
}
1 change: 0 additions & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
{}
13 changes: 3 additions & 10 deletions internal/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ type ServerStore interface {
GetByID(context.Context, uuid.UUID) (*model.Server, error)
Delete(context.Context, uuid.UUID) error
Update(context.Context, *model.Server) error
Save() error
}

type Observer interface {
ReadEndpoint(*model.Server) error
DataScraper(context.Context, *model.Server)
DataScraper(context.Context, *model.Server) chan *model.Server
Scanner(context.Context, chan *model.Server) chan *model.Server
SpawnScraper(context.Context)
AddScraper(context.Context, *model.Server) error
KillScraper(uuid.UUID) error
Expand All @@ -32,15 +34,6 @@ type Blacklist interface {
Delete(context.Context, uuid.UUID) error
}

type Overseer interface {
ReadEndpoint(*model.Server) error
Scanner(context.Context, *model.Server)
SpawnScanner(context.Context)
AddScanner(context.Context, *model.Server) error
KillScanner(uuid.UUID) error
HandleEvent(context.Context, events.EventMessage)
}

type Notification interface {
Connect(context.Context) error
Send(context.Context, string) error
Expand Down
62 changes: 38 additions & 24 deletions internal/jsondb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"sort"
"sync"
"time"

"github.com/google/uuid"
"github.com/led0nk/ark-overseer/internal/model"
Expand All @@ -20,42 +21,58 @@ type ServerStorage struct {
}

func NewServerStorage(ctx context.Context, filename string) (*ServerStorage, error) {
cluster := &ServerStorage{
store := &ServerStorage{
filename: filename,
server: make(map[uuid.UUID]*model.Server),
}
if err := cluster.readJSON(ctx); err != nil {
if err := store.load(); err != nil {
return nil, err
}
return cluster, nil

go store.autoSave(ctx)

return store, nil
}

func (s *ServerStorage) writeJSON(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
as_json, err := json.MarshalIndent(s.server, "", "\t")
if err != nil {
return err
}
func (s *ServerStorage) Save() error {
s.mu.Lock()
defer s.mu.Unlock()
as_json, err := json.MarshalIndent(s.server, "", "\t")
if err != nil {
return err
}

err = os.WriteFile(s.filename, as_json, 0644)
if err != nil {
return err
err = os.WriteFile(s.filename, as_json, 0644)
if err != nil {
return err
}
return nil
}

func (s *ServerStorage) autoSave(ctx context.Context) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := s.Save()
if err != nil {
return
}
}
return nil
}
}

// read JSON data from file = filename
func (s *ServerStorage) readJSON(ctx context.Context) error {
func (s *ServerStorage) load() error {
if _, err := os.Stat(s.filename); os.IsNotExist(err) {
err = os.MkdirAll(filepath.Dir(s.filename), 0777)
if err != nil {
return err
}
err = s.writeJSON(ctx)
err = s.Save()
if err != nil {
return err
}
Expand All @@ -76,7 +93,7 @@ func (s *ServerStorage) Create(ctx context.Context, server *model.Server) (*mode
}

s.server[server.ID] = server
if err := s.writeJSON(ctx); err != nil {
if err := s.Save(); err != nil {
return nil, err
}

Expand All @@ -92,7 +109,7 @@ func (s *ServerStorage) Update(ctx context.Context, server *model.Server) error
return ctx.Err()
default:
s.server[server.ID] = server
return s.writeJSON(ctx)
return nil
}
}

Expand Down Expand Up @@ -144,9 +161,6 @@ func (s *ServerStorage) Delete(ctx context.Context, ID uuid.UUID) error {

delete(s.server, ID)

if err := s.writeJSON(ctx); err != nil {
return err
}
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions internal/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ func (n *Notifier) Delete(ctx context.Context, id uuid.UUID) error {
}

func (n *Notifier) GetByID(ctx context.Context, id uuid.UUID) (*model.Server, error) {
//n.notify("get by id")
return n.sStore.GetByID(ctx, id)
}

func (n *Notifier) GetByName(ctx context.Context, name string) (*model.Server, error) {
//n.notify("get by name")
return n.sStore.GetByName(ctx, name)
}

func (n *Notifier) List(ctx context.Context) ([]*model.Server, error) {
//n.notify("list")
return n.sStore.List(ctx)
}

func (n *Notifier) Update(ctx context.Context, srv *model.Server) error {
//(n.notify("update")
return n.sStore.Update(ctx, srv)
}

func (n *Notifier) Save() error {
return n.sStore.Save()
}
Loading

0 comments on commit dce7caa

Please sign in to comment.