Skip to content

Commit

Permalink
improved docker reconnect mechanism, removed redundant checkings, ref…
Browse files Browse the repository at this point in the history
…actor
  • Loading branch information
yusing committed Feb 23, 2025
1 parent 5f1b78e commit bda5471
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 86 deletions.
9 changes: 8 additions & 1 deletion agent/pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,19 @@ func (cfg *AgentConfig) Transport() *http.Transport {
if addr != AgentHost+":443" {
return nil, &net.AddrError{Err: "invalid address", Addr: addr}
}
return gphttp.DefaultDialer.DialContext(ctx, network, cfg.Addr)
if network != "tcp" {
return nil, &net.OpError{Op: "dial", Net: network, Source: nil, Addr: nil}
}
return cfg.DialContext(ctx)
},
TLSClientConfig: cfg.tlsConfig,
}
}

func (cfg *AgentConfig) DialContext(ctx context.Context) (net.Conn, error) {
return gphttp.DefaultDialer.DialContext(ctx, "tcp", cfg.Addr)
}

func (cfg *AgentConfig) Name() string {
return cfg.name
}
Expand Down
6 changes: 3 additions & 3 deletions internal/api/v1/dockerapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func getDockerClients() (DockerClients, gperr.Error) {
connErrs := gperr.NewBuilder("failed to connect to docker")

for name, host := range dockerHosts {
dockerClient, err := docker.ConnectClient(host)
dockerClient, err := docker.NewClient(host)
if err != nil {
connErrs.Add(err)
continue
Expand All @@ -45,7 +45,7 @@ func getDockerClients() (DockerClients, gperr.Error) {
}

for _, agent := range cfg.ListAgents() {
dockerClient, err := docker.ConnectClient(agent.FakeDockerHost())
dockerClient, err := docker.NewClient(agent.FakeDockerHost())
if err != nil {
connErrs.Add(err)
continue
Expand Down Expand Up @@ -74,7 +74,7 @@ func getDockerClient(w http.ResponseWriter, server string) (*docker.SharedClient
if host == "" {
return nil, false, nil
}
dockerClient, err := docker.ConnectClient(host)
dockerClient, err := docker.NewClient(host)
if err != nil {
return nil, false, err
}
Expand Down
45 changes: 32 additions & 13 deletions internal/docker/client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package docker

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
Expand All @@ -23,11 +25,14 @@ type (
key string
refCount uint32
closedOn int64

addr string
dial func(ctx context.Context) (net.Conn, error)
}
)

var (
clientMap = make(map[string]*SharedClient, 5)
clientMap = make(map[string]*SharedClient, 10)
clientMapMu sync.RWMutex

clientOptEnvHost = []client.Opt{
Expand Down Expand Up @@ -64,9 +69,7 @@ func init() {

for _, c := range clientMap {
delete(clientMap, c.key)
if c.Connected() {
c.Client.Close()
}
c.Client.Close()
}
})
}
Expand All @@ -78,10 +81,6 @@ func closeTimedOutClients() {
now := time.Now().Unix()

for _, c := range clientMap {
if !c.Connected() {
delete(clientMap, c.key)
continue
}
if c.closedOn == 0 {
continue
}
Expand All @@ -93,8 +92,17 @@ func closeTimedOutClients() {
}
}

func (c *SharedClient) Connected() bool {
return c != nil && c.Client != nil
func (c *SharedClient) Address() string {
return c.addr
}

func (c *SharedClient) CheckConnection(ctx context.Context) error {
conn, err := c.dial(ctx)
if err != nil {
return err
}
conn.Close()
return nil
}

// if the client is still referenced, this is no-op.
Expand All @@ -103,7 +111,7 @@ func (c *SharedClient) Close() {
c.refCount--
}

// ConnectClient creates a new Docker client connection to the specified host.
// NewClient creates a new Docker client connection to the specified host.
//
// Returns existing client if available.
//
Expand All @@ -113,7 +121,7 @@ func (c *SharedClient) Close() {
// Returns:
// - Client: the Docker client connection.
// - error: an error if the connection failed.
func ConnectClient(host string) (*SharedClient, error) {
func NewClient(host string) (*SharedClient, error) {
clientMapMu.Lock()
defer clientMapMu.Unlock()

Expand All @@ -125,6 +133,8 @@ func ConnectClient(host string) (*SharedClient, error) {

// create client
var opt []client.Opt
var addr string
var dial func(ctx context.Context) (net.Conn, error)

if agent.IsDockerHostAgent(host) {
cfg, ok := config.GetInstance().GetAgent(host)
Expand All @@ -136,6 +146,8 @@ func ConnectClient(host string) (*SharedClient, error) {
client.WithHTTPClient(cfg.NewHTTPClient()),
client.WithAPIVersionNegotiation(),
}
addr = "tcp://" + cfg.Addr
dial = cfg.DialContext
} else {
switch host {
case "":
Expand Down Expand Up @@ -177,9 +189,16 @@ func ConnectClient(host string) (*SharedClient, error) {
Client: client,
key: host,
refCount: 1,
addr: addr,
dial: dial,
}

// non-agent client
if c.dial == nil {
c.dial = client.Dialer()
}

defer logging.Debug().Str("host", host).Msg("docker client connected")
defer logging.Debug().Str("host", host).Msg("docker client initialized")

clientMap[c.key] = c
return c, nil
Expand Down
11 changes: 4 additions & 7 deletions internal/docker/idlewatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/docker/docker/api/types/container"
"github.com/rs/zerolog"
D "github.com/yusing/go-proxy/internal/docker"
"github.com/yusing/go-proxy/internal/docker"
idlewatcher "github.com/yusing/go-proxy/internal/docker/idlewatcher/types"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/logging"
Expand All @@ -29,7 +29,7 @@ type (
*idlewatcher.Config
*waker

client *D.SharedClient
client *docker.SharedClient
stopByMethod StopCallback // send a docker command w.r.t. `stop_method`
ticker *time.Ticker
lastReset time.Time
Expand Down Expand Up @@ -70,7 +70,7 @@ func registerWatcher(watcherTask *task.Task, route route.Route, waker *waker) (*
return w, nil
}

client, err := D.ConnectClient(cfg.DockerHost)
client, err := docker.NewClient(cfg.DockerHost)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -146,9 +146,6 @@ func (w *Watcher) containerStart(ctx context.Context) error {
}

func (w *Watcher) containerStatus() (string, error) {
if !w.client.Connected() {
return "", errors.New("docker client not connected")
}
ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout"))
defer cancel()
json, err := w.client.ContainerInspect(ctx, w.ContainerID)
Expand Down Expand Up @@ -242,7 +239,7 @@ func (w *Watcher) getEventCh(dockerWatcher *watcher.DockerWatcher) (eventCh <-ch
// it exits only if the context is canceled, the container is destroyed,
// errors occurred on docker client, or route provider died (mainly caused by config reload).
func (w *Watcher) watchUntilDestroy() (returnCause error) {
dockerWatcher := watcher.NewDockerWatcherWithClient(w.client)
dockerWatcher := watcher.NewDockerWatcher(w.Config.DockerHost)
dockerEventCh, dockerEventErrCh := w.getEventCh(dockerWatcher)

for {
Expand Down
2 changes: 1 addition & 1 deletion internal/docker/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func Inspect(dockerHost string, containerID string) (*Container, error) {
client, err := ConnectClient(dockerHost)
client, err := NewClient(dockerHost)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions internal/docker/list_containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
)
Expand All @@ -22,8 +21,8 @@ var listOptions = container.ListOptions{
All: true,
}

func ListContainers(clientHost string) ([]types.Container, error) {
dockerClient, err := ConnectClient(clientHost)
func ListContainers(clientHost string) ([]container.Summary, error) {
dockerClient, err := NewClient(clientHost)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/route/reverse_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
r.HealthMon = waker
case r.UseHealthCheck():
if r.IsDocker() {
client, err := docker.ConnectClient(r.Idlewatcher.DockerHost)
client, err := docker.NewClient(r.Idlewatcher.DockerHost)
if err == nil {
fallback := r.newHealthMonitor()
r.HealthMon = monitor.NewDockerHealthMonitor(client, r.Idlewatcher.ContainerID, r.TargetName(), r.HealthCheck, fallback)
Expand Down
2 changes: 1 addition & 1 deletion internal/route/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (r *StreamRoute) Start(parent task.Parent) gperr.Error {
r.HealthMon = waker
case r.UseHealthCheck():
if r.IsDocker() {
client, err := docker.ConnectClient(r.IdlewatcherConfig().DockerHost)
client, err := docker.NewClient(r.IdlewatcherConfig().DockerHost)
if err == nil {
fallback := monitor.NewRawHealthChecker(r.TargetURL(), r.HealthCheck)
r.HealthMon = monitor.NewDockerHealthMonitor(client, r.IdlewatcherConfig().ContainerID, r.TargetName(), r.HealthCheck, fallback)
Expand Down
Loading

0 comments on commit bda5471

Please sign in to comment.