Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple endpoints #880

Merged
merged 8 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 75 additions & 56 deletions cmd/fleet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,83 +39,102 @@ func diagConn(c net.Conn, s http.ConnState) {
}

func runServer(ctx context.Context, router http.Handler, cfg *config.Server) error {
addr := cfg.BindAddress()
listeners := cfg.BindEndpoints()
rdto := cfg.Timeouts.Read
wrto := cfg.Timeouts.Write
idle := cfg.Timeouts.Idle
rdhr := cfg.Timeouts.ReadHeader
mhbz := cfg.Limits.MaxHeaderByteSize
bctx := func(net.Listener) context.Context { return ctx }

log.Info().
Str("bind", addr).
Dur("rdTimeout", rdto).
Dur("wrTimeout", wrto).
Msg("server listening")

server := http.Server{
Addr: addr,
ReadTimeout: rdto,
WriteTimeout: wrto,
IdleTimeout: idle,
ReadHeaderTimeout: rdhr,
Handler: router,
BaseContext: bctx,
ConnState: diagConn,
MaxHeaderBytes: mhbz,
ErrorLog: errLogger(),
}
errChan := make(chan error)
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()

forceCh := make(chan struct{})
defer close(forceCh)

// handler to close server
go func() {
select {
case <-ctx.Done():
log.Debug().Msg("force server close on ctx.Done()")
server.Close()
case <-forceCh:
log.Debug().Msg("go routine forced closed on exit")
for _, addr := range listeners {
log.Info().
Str("bind", addr).
Dur("rdTimeout", rdto).
Dur("wrTimeout", wrto).
Msg("server listening")

server := http.Server{
Addr: addr,
ReadTimeout: rdto,
WriteTimeout: wrto,
IdleTimeout: idle,
ReadHeaderTimeout: rdhr,
Handler: router,
BaseContext: bctx,
ConnState: diagConn,
MaxHeaderBytes: mhbz,
ErrorLog: errLogger(),
}
}()

var listenCfg net.ListenConfig

ln, err := listenCfg.Listen(ctx, "tcp", addr)
if err != nil {
return err
}
forceCh := make(chan struct{})
defer close(forceCh)

// Bind the deferred Close() to the stack variable to handle case where 'ln' is wrapped
defer func() { ln.Close() }()
// handler to close server
go func() {
select {
case <-ctx.Done():
log.Debug().Msg("force server close on ctx.Done()")
server.Close()
case <-forceCh:
log.Debug().Msg("go routine forced closed on exit")
Comment on lines +83 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused what the purpose of forceCh and this message are, does this occur when the server closes with a non context.Cancelled error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah i'm not really sure why this is here. i understand it closes goroutine with main func but not sure why this was a problem and why this was not addressed with cancel contexts.
not touching this atm

}
}()

// Conn Limiter must be before the TLS handshake in the stack;
// The server should not eat the cost of the handshake if there
// is no capacity to service the connection.
// Also, it appears the HTTP2 implementation depends on the tls.Listener
// being at the top of the stack.
ln = wrapConnLimitter(ctx, ln, cfg)
var listenCfg net.ListenConfig

if cfg.TLS != nil && cfg.TLS.IsEnabled() {
commonTlsCfg, err := tlscommon.LoadTLSServerConfig(cfg.TLS)
ln, err := listenCfg.Listen(ctx, "tcp", addr)
if err != nil {
return err
}
server.TLSConfig = commonTlsCfg.ToConfig()

// Must enable http/2 in the configuration explicitly.
// (see https://golang.org/pkg/net/http/#Server.Serve)
server.TLSConfig.NextProtos = []string{"h2", "http/1.1"}
// Bind the deferred Close() to the stack variable to handle case where 'ln' is wrapped
defer func() { ln.Close() }()

ln = tls.NewListener(ln, server.TLSConfig)
// Conn Limiter must be before the TLS handshake in the stack;
// The server should not eat the cost of the handshake if there
// is no capacity to service the connection.
// Also, it appears the HTTP2 implementation depends on the tls.Listener
// being at the top of the stack.
ln = wrapConnLimitter(ctx, ln, cfg)

if cfg.TLS != nil && cfg.TLS.IsEnabled() {
commonTlsCfg, err := tlscommon.LoadTLSServerConfig(cfg.TLS)
if err != nil {
return err
}
server.TLSConfig = commonTlsCfg.ToConfig()

// Must enable http/2 in the configuration explicitly.
// (see https://golang.org/pkg/net/http/#Server.Serve)
server.TLSConfig.NextProtos = []string{"h2", "http/1.1"}

ln = tls.NewListener(ln, server.TLSConfig)

} else {
log.Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended")
}

log.Debug().Msgf("Listening on %s", addr)

go func(ctx context.Context, errChan chan error, ln net.Listener) {
if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do direct a direct comparison of the error (err != http.ErrServerClosed), or use the errors.Is() method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean replacing err != nil && err != http.ErrServerClosed with err != http.ErrServerClose?
if so you might end up with sending nil to a chan

errChan <- err
}
}(cancelCtx, errChan, ln)

} else {
log.Warn().Msg("exposed over insecure HTTP; enablement of TLS is strongly recommended")
}

if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
return err
select {
case err := <-errChan:
if err != context.Canceled {
return err
}
case <-cancelCtx.Done():
}

return nil
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func TestConfig(t *testing.T) {
{
Type: "fleet-server",
Server: Server{
Host: "localhost",
Port: 8888,
Host: "localhost",
Port: 8888,
InternalPort: 8221,
Timeouts: ServerTimeouts{
Read: 20 * time.Second,
ReadHeader: 5 * time.Second,
Expand Down
33 changes: 31 additions & 2 deletions internal/pkg/config/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

const kDefaultHost = "0.0.0.0"
const kDefaultPort = 8220
const kDefaultInternalHost = "localhost"
const kDefaultInternalPort = 8221

// Policy is the configuration policy to use.
type Policy struct {
Expand Down Expand Up @@ -57,6 +59,7 @@ func (c *ServerBulk) InitDefaults() {
type Server struct {
Host string `config:"host"`
Port uint16 `config:"port"`
InternalPort uint16 `config:"internal_port"`
TLS *tlscommon.ServerConfig `config:"ssl"`
Timeouts ServerTimeouts `config:"timeouts"`
Profiler ServerProfiler `config:"profiler"`
Expand All @@ -73,6 +76,7 @@ type Server struct {
func (c *Server) InitDefaults() {
c.Host = kDefaultHost
c.Port = kDefaultPort
c.InternalPort = kDefaultInternalPort
c.Timeouts.InitDefaults()
c.CompressionLevel = flate.BestSpeed
c.CompressionThresh = 1024
Expand All @@ -83,13 +87,38 @@ func (c *Server) InitDefaults() {
c.GC.InitDefaults()
}

// BindEndpoints returns the binding address for the all HTTP server listeners.
func (c *Server) BindEndpoints() []string {
primaryAddress := c.BindAddress()
endpoints := make([]string, 0, 2)
endpoints = append(endpoints, primaryAddress)

if internalAddress := c.BindInternalAddress(); internalAddress != "" && internalAddress != ":0" && internalAddress != primaryAddress {
endpoints = append(endpoints, internalAddress)
}

return endpoints
}

// BindAddress returns the binding address for the HTTP server.
func (c *Server) BindAddress() string {
host := c.Host
return bindAddress(c.Host, c.Port)
}

// BindInternalAddress returns the binding address for the internal HTTP server.
func (c *Server) BindInternalAddress() string {
if c.InternalPort <= 0 {
return bindAddress(kDefaultInternalHost, kDefaultInternalPort)
}

return bindAddress(kDefaultInternalHost, c.InternalPort)
}

func bindAddress(host string, port uint16) string {
if strings.Count(host, ":") > 1 && strings.Count(host, "]") == 0 {
host = "[" + host + "]"
}
return fmt.Sprintf("%s:%d", host, c.Port)
return fmt.Sprintf("%s:%d", host, port)
}

// Input is the input defined by Agent to run Fleet Server.
Expand Down