From 6dab877efe143fd4fae2597432a4a2002b8903ea Mon Sep 17 00:00:00 2001 From: fwkz Date: Fri, 30 Oct 2020 20:29:54 +0100 Subject: [PATCH 1/2] Stop dynamic TCP listener when upstream is no longer available --- main.go | 21 +++++++++++++++++++++ proxy/serve.go | 35 +++++++++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index e37fb9b30..35729f2e9 100644 --- a/main.go +++ b/main.go @@ -378,6 +378,7 @@ func startServers(cfg *config.Config) { case "tcp-dynamic": go func() { var buffer strings.Builder + lastPorts := []string{} for { time.Sleep(l.Refresh) table := route.GetTable() @@ -395,6 +396,10 @@ func startServers(cfg *config.Config) { } ports = unique(ports) } + for _, port := range difference(lastPorts, ports) { + log.Printf("[DEBUG] Dynamic TCP listener on port %s eligable for termination", port) + proxy.CloseDynamicProxy(port) + } for _, port := range ports { l := l port := port @@ -419,6 +424,7 @@ func startServers(cfg *config.Config) { } }() } + lastPorts = ports } }() case "https+tcp+sni": @@ -655,6 +661,21 @@ func unique(strSlice []string) []string { return list } +// difference returns elements in `a` that aren't in `b` +func difference(a, b []string) []string { + mb := make(map[string]struct{}, len(b)) + for _, x := range b { + mb[x] = struct{}{} + } + var diff []string + for _, x := range a { + if _, found := mb[x]; !found { + diff = append(diff, x) + } + } + return diff +} + func tableSchemes(r route.Routes) []string { schemes := []string{} for _, rt := range r { diff --git a/proxy/serve.go b/proxy/serve.go index 54a98a73b..41d9f5f5f 100644 --- a/proxy/serve.go +++ b/proxy/serve.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "errors" + "log" "net" "net/http" "sync" @@ -27,10 +28,26 @@ type Server interface { var ( // mu guards servers which contains the list // of running proxy servers. - mu sync.Mutex - servers []Server + mu sync.Mutex + servers []Server + dynamicProxyPorts = make(map[string]Server) ) +func CloseDynamicProxy(port string) error { + if srv, ok := dynamicProxyPorts[port]; ok { + err := srv.Close() + if err != nil { + return err + } + log.Printf("[INFO] Dynamic TCP listener on port %s has been terminated", port) + mu.Lock() + servers = removeServer(servers, srv) + delete(dynamicProxyPorts, port) + mu.Unlock() + } + return nil +} + func Close() { mu.Lock() for _, srv := range servers { @@ -157,6 +174,9 @@ func ListenAndServeTCP(l config.Listen, h tcp.Handler, cfg *tls.Config) error { ReadTimeout: l.ReadTimeout, WriteTimeout: l.WriteTimeout, } + if _, ok := h.(*tcp.DynamicProxy); ok { + dynamicProxyPorts[l.Addr] = srv + } return serve(ln, srv) } @@ -177,3 +197,14 @@ func serve(ln net.Listener, srv Server) error { } return err } + +func removeServer(srvs []Server, s Server) []Server { + for i, srv := range srvs { + if s == srv { + log.Printf("[DEBUG] Removing server from the list") + srvs[i] = srvs[len(servers)-1] + return srvs[:len(srvs)-1] + } + } + return srvs +} From 10749de40f692faac3b890551c2477af323f8dae Mon Sep 17 00:00:00 2001 From: fwkz Date: Thu, 19 Nov 2020 11:09:06 +0100 Subject: [PATCH 2/2] Make `proxy.servers` a map in order to easily pick proxy to close --- main.go | 4 ++-- proxy/serve.go | 49 ++++++++++++++++++------------------------------- 2 files changed, 20 insertions(+), 33 deletions(-) diff --git a/main.go b/main.go index 35729f2e9..73155fd81 100644 --- a/main.go +++ b/main.go @@ -397,8 +397,8 @@ func startServers(cfg *config.Config) { ports = unique(ports) } for _, port := range difference(lastPorts, ports) { - log.Printf("[DEBUG] Dynamic TCP listener on port %s eligable for termination", port) - proxy.CloseDynamicProxy(port) + log.Printf("[DEBUG] Dynamic TCP listener on %s eligable for termination", port) + proxy.CloseProxy(port) } for _, port := range ports { l := l diff --git a/proxy/serve.go b/proxy/serve.go index 41d9f5f5f..bb8209f86 100644 --- a/proxy/serve.go +++ b/proxy/serve.go @@ -28,23 +28,21 @@ type Server interface { var ( // mu guards servers which contains the list // of running proxy servers. - mu sync.Mutex - servers []Server - dynamicProxyPorts = make(map[string]Server) + mu sync.Mutex + servers = make(map[string]Server) ) -func CloseDynamicProxy(port string) error { - if srv, ok := dynamicProxyPorts[port]; ok { +func CloseProxy(address string) error { + mu.Lock() + if srv, ok := servers[address]; ok { err := srv.Close() if err != nil { return err } - log.Printf("[INFO] Dynamic TCP listener on port %s has been terminated", port) - mu.Lock() - servers = removeServer(servers, srv) - delete(dynamicProxyPorts, port) - mu.Unlock() + log.Printf("[INFO] Dynamic TCP listener on %s has been terminated", address) + delete(servers, address) } + mu.Unlock() return nil } @@ -53,15 +51,17 @@ func Close() { for _, srv := range servers { srv.Close() } - servers = []Server{} + servers = make(map[string]Server) mu.Unlock() } func Shutdown(timeout time.Duration) { mu.Lock() - srvs := make([]Server, len(servers)) - copy(srvs, servers) - servers = []Server{} + srvs := make(map[string]Server, len(servers)) + for k, v := range servers { + srvs[k] = v + } + servers = make(map[string]Server) mu.Unlock() var wg sync.WaitGroup @@ -146,8 +146,9 @@ func ListenAndServeHTTPSTCPSNI(l config.Listen, h http.Handler, p tcp.Handler, c }) // tcpproxy creates its own listener from the configuration above so we can - // safely pass nil here. - return serve(nil, tps) + // safely pass nil here, nonetheless we are passing `httpsListener` to + // extract it's address and save server in the `servers` map. + return serve(httpsListener, tps) } func ListenAndServeGRPC(l config.Listen, opts []grpc.ServerOption, cfg *tls.Config) error { @@ -174,15 +175,12 @@ func ListenAndServeTCP(l config.Listen, h tcp.Handler, cfg *tls.Config) error { ReadTimeout: l.ReadTimeout, WriteTimeout: l.WriteTimeout, } - if _, ok := h.(*tcp.DynamicProxy); ok { - dynamicProxyPorts[l.Addr] = srv - } return serve(ln, srv) } func serve(ln net.Listener, srv Server) error { mu.Lock() - servers = append(servers, srv) + servers[ln.Addr().String()] = srv mu.Unlock() err := srv.Serve(ln) if err != nil { @@ -197,14 +195,3 @@ func serve(ln net.Listener, srv Server) error { } return err } - -func removeServer(srvs []Server, s Server) []Server { - for i, srv := range srvs { - if s == srv { - log.Printf("[DEBUG] Removing server from the list") - srvs[i] = srvs[len(servers)-1] - return srvs[:len(srvs)-1] - } - } - return srvs -}