Skip to content

Commit

Permalink
server: refactor server setup
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed May 4, 2024
1 parent 8d45a51 commit 105473e
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 211 deletions.
218 changes: 7 additions & 211 deletions cli/server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,15 @@ package server
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"

picoconfig "github.com/andydunstall/pico/pkg/config"
"github.com/andydunstall/pico/pkg/log"
"github.com/andydunstall/pico/server"
"github.com/andydunstall/pico/server/config"
"github.com/andydunstall/pico/server/gossip"
"github.com/andydunstall/pico/server/netmap"
proxy "github.com/andydunstall/pico/server/proxy"
adminserver "github.com/andydunstall/pico/server/server/admin"
proxyserver "github.com/andydunstall/pico/server/server/proxy"
upstreamserver "github.com/andydunstall/pico/server/server/upstream"
"github.com/hashicorp/go-sockaddr"
rungroup "github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -105,39 +95,6 @@ default value can be given using form ${VAR:default}.`,
os.Exit(1)
}

if conf.Cluster.NodeID == "" {
nodeID := netmap.GenerateNodeID()
if conf.Cluster.NodeIDPrefix != "" {
nodeID = conf.Cluster.NodeIDPrefix + nodeID
}
conf.Cluster.NodeID = nodeID
}

if conf.Proxy.AdvertiseAddr == "" {
advertiseAddr, err := advertiseAddrFromBindAddr(conf.Proxy.BindAddr)
if err != nil {
logger.Error("invalid configuration", zap.Error(err))
os.Exit(1)
}
conf.Proxy.AdvertiseAddr = advertiseAddr
}
if conf.Admin.AdvertiseAddr == "" {
advertiseAddr, err := advertiseAddrFromBindAddr(conf.Admin.BindAddr)
if err != nil {
logger.Error("invalid configuration", zap.Error(err))
os.Exit(1)
}
conf.Admin.AdvertiseAddr = advertiseAddr
}
if conf.Gossip.AdvertiseAddr == "" {
advertiseAddr, err := advertiseAddrFromBindAddr(conf.Gossip.BindAddr)
if err != nil {
logger.Error("invalid configuration", zap.Error(err))
os.Exit(1)
}
conf.Gossip.AdvertiseAddr = advertiseAddr
}

if err := run(&conf, logger); err != nil {
logger.Error("failed to run server", zap.Error(err))
os.Exit(1)
Expand All @@ -148,74 +105,10 @@ default value can be given using form ${VAR:default}.`,
}

func run(conf *config.Config, logger log.Logger) error {
logger.Info("starting pico server", zap.Any("conf", conf))

registry := prometheus.NewRegistry()

adminLn, err := net.Listen("tcp", conf.Admin.BindAddr)
if err != nil {
return fmt.Errorf("admin listen: %s: %w", conf.Admin.BindAddr, err)
}
adminServer := adminserver.NewServer(
adminLn,
registry,
logger,
)

networkMap := netmap.NewNetworkMap(&netmap.Node{
ID: conf.Cluster.NodeID,
ProxyAddr: conf.Proxy.AdvertiseAddr,
AdminAddr: conf.Admin.AdvertiseAddr,
}, logger)
networkMap.Metrics().Register(registry)
adminServer.AddStatus("/netmap", netmap.NewStatus(networkMap))

gossiper, err := gossip.NewGossip(networkMap, conf.Gossip, logger)
if err != nil {
return fmt.Errorf("gossip: %w", err)
}
defer gossiper.Close()
adminServer.AddStatus("/gossip", gossip.NewStatus(gossiper))

// Attempt to join an existing cluster. Note if 'join' is a domain that
// doesn't map to any entries (except ourselves), then join will succeed
// since it means we're the first member.
nodeIDs, err := gossiper.Join(conf.Cluster.Join)
if err != nil {
return fmt.Errorf("join cluster: %w", err)
}
if len(nodeIDs) > 0 {
logger.Info(
"joined cluster",
zap.Strings("node-ids", nodeIDs),
)
}

p := proxy.NewProxy(networkMap, proxy.WithLogger(logger))
adminServer.AddStatus("/proxy", proxy.NewStatus(p))

proxyLn, err := net.Listen("tcp", conf.Proxy.BindAddr)
server, err := server.NewServer(conf, logger)
if err != nil {
return fmt.Errorf("proxy listen: %s: %w", conf.Proxy.BindAddr, err)
return fmt.Errorf("server: %w", err)
}
proxyServer := proxyserver.NewServer(
proxyLn,
p,
&conf.Proxy,
registry,
logger,
)

upstreamLn, err := net.Listen("tcp", conf.Upstream.BindAddr)
if err != nil {
return fmt.Errorf("upstream listen: %s: %w", conf.Upstream.BindAddr, err)
}
upstreamServer := upstreamserver.NewServer(
upstreamLn,
p,
registry,
logger,
)

var group rungroup.Group

Expand All @@ -230,21 +123,6 @@ func run(conf *config.Config, logger log.Logger) error {
"received shutdown signal",
zap.String("signal", sig.String()),
)

leaveCtx, cancel := context.WithTimeout(
context.Background(),
time.Duration(conf.Server.GracefulShutdownTimeout)*time.Second,
)
defer cancel()

// Leave as soon as we receive the shutdown signal to avoid receiving
// forward proxy requests.
if err := gossiper.Leave(leaveCtx); err != nil {
logger.Warn("failed to gracefully leave cluster", zap.Error(err))
} else {
logger.Info("left cluster")
}

return nil
case <-signalCtx.Done():
return nil
Expand All @@ -253,94 +131,12 @@ func run(conf *config.Config, logger log.Logger) error {
signalCancel()
})

// Proxy server.
group.Add(func() error {
if err := proxyServer.Serve(); err != nil {
return fmt.Errorf("proxy server serve: %w", err)
}
return nil
}, func(error) {
shutdownCtx, cancel := context.WithTimeout(
context.Background(),
time.Duration(conf.Server.GracefulShutdownTimeout)*time.Second,
)
defer cancel()

if err := proxyServer.Shutdown(shutdownCtx); err != nil {
logger.Warn("failed to gracefully shutdown proxy server", zap.Error(err))
}

logger.Info("proxy server shut down")
})

// Upstream server.
group.Add(func() error {
if err := upstreamServer.Serve(); err != nil {
return fmt.Errorf("upstream server serve: %w", err)
}
return nil
}, func(error) {
shutdownCtx, cancel := context.WithTimeout(
context.Background(),
time.Duration(conf.Server.GracefulShutdownTimeout)*time.Second,
)
defer cancel()

if err := upstreamServer.Shutdown(shutdownCtx); err != nil {
logger.Warn("failed to gracefully shutdown upstream server", zap.Error(err))
}

logger.Info("upstream server shut down")
})

// Admin server.
runCtx, runCancel := context.WithCancel(context.Background())
group.Add(func() error {
if err := adminServer.Serve(); err != nil {
return fmt.Errorf("admin server serve: %w", err)
}
return nil
return server.Run(runCtx)
}, func(error) {
shutdownCtx, cancel := context.WithTimeout(
context.Background(),
time.Duration(conf.Server.GracefulShutdownTimeout)*time.Second,
)
defer cancel()

if err := adminServer.Shutdown(shutdownCtx); err != nil {
logger.Warn("failed to gracefully shutdown server", zap.Error(err))
}

logger.Info("admin server shut down")
runCancel()
})

if err := group.Run(); err != nil {
return err
}

logger.Info("shutdown complete")

return nil
}

func advertiseAddrFromBindAddr(bindAddr string) (string, error) {
if strings.HasPrefix(bindAddr, ":") {
bindAddr = "0.0.0.0" + bindAddr
}

host, port, err := net.SplitHostPort(bindAddr)
if err != nil {
return "", fmt.Errorf("invalid bind addr: %s: %w", bindAddr, err)
}

if host == "0.0.0.0" {
ip, err := sockaddr.GetPrivateIP()
if err != nil {
return "", fmt.Errorf("get interface addr: %w", err)
}
if ip == "" {
return "", fmt.Errorf("no private ip found")
}
return ip + ":" + port, nil
}
return bindAddr, nil
return group.Run()
}
18 changes: 18 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func (c *ProxyConfig) Validate() error {
type UpstreamConfig struct {
// BindAddr is the address to bind to listen for incoming HTTP connections.
BindAddr string `json:"bind_addr" yaml:"bind_addr"`

// AdvertiseAddr is the address to advertise to other nodes.
AdvertiseAddr string `json:"advertise_addr" yaml:"advertise_addr"`
}

func (c *UpstreamConfig) Validate() error {
Expand Down Expand Up @@ -162,6 +165,21 @@ The host/port to listen for connections from upstream listeners.
If the host is unspecified it defaults to all listeners, such as
'--proxy.bind-addr :8001' will listen on '0.0.0.0:8001'`,
)
fs.StringVar(
&c.Upstream.AdvertiseAddr,
"upstream.advertise-addr",
"",
`
Upstream listen address to advertise to other nodes in the cluster.
Such as if the listen address is ':8001', the advertised address may be
'10.26.104.45:8001' or 'node1.cluster:8001'.
By default, if the bind address includes an IP to bind to that will be used.
If the bind address does not include an IP (such as ':8001') the nodes
private IP will be used, such as a bind address of ':8001' may have an
advertise address of '10.16.104.14:8001'.`,
)

fs.StringVar(
&c.Admin.BindAddr,
Expand Down
3 changes: 3 additions & 0 deletions server/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (p *Proxy) Request(
zap.String("host", r.Host),
zap.String("method", r.Method),
zap.String("path", r.URL.Path),
zap.Bool("forwarded", forwarded),
)

endpointID := endpointIDFromRequest(r)
Expand All @@ -67,6 +68,8 @@ func (p *Proxy) Request(
return errorResponse(http.StatusBadRequest, "missing pico endpoint id")
}

logger = logger.With(zap.String("endpoint-id", endpointID))

start := time.Now()

// Attempt to send to an endpoint connected to the local node.
Expand Down
Loading

0 comments on commit 105473e

Please sign in to comment.