Skip to content

Commit

Permalink
agentv2: add connection per listener
Browse files Browse the repository at this point in the history
Rather than try to manage multiple listeners on a single connection,
instead agentv2 creates a new connection for each listener.
  • Loading branch information
andydunstall committed Jun 4, 2024
1 parent e2af5f3 commit ca21d3e
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 395 deletions.
160 changes: 135 additions & 25 deletions agentv2/client/listener.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,38 @@
package piko

import (
"context"
"errors"
"fmt"
"net"
"net/url"
"sync"
"time"

"github.com/andydunstall/piko/pkg/backoff"
"github.com/andydunstall/piko/pkg/log"
"github.com/andydunstall/piko/pkg/mux"
"github.com/andydunstall/piko/pkg/websocket"
"go.uber.org/zap"
)

"go.uber.org/atomic"
const (
minReconnectBackoff = time.Millisecond * 100
maxReconnectBackoff = time.Second * 15
)

type pikoAddr struct {
endpointID string
}

func (a *pikoAddr) Network() string {
return "tcp"
}

func (a *pikoAddr) String() string {
return a.endpointID
}

// Listener is a [net.Listener] that accepts incoming connections for endpoints
// registered with the server by the client.
//
Expand All @@ -22,50 +48,134 @@ type Listener interface {
type listener struct {
endpointID string

acceptCh chan net.Conn
mux *mux.Session
muxMu sync.Mutex

options options

closed *atomic.Bool
closeCtx context.Context
closeCancel func()

logger log.Logger
}

func newListener(endpointID string) *listener {
return &listener{
endpointID: endpointID,
acceptCh: make(chan net.Conn),
closed: atomic.NewBool(false),
func listen(
ctx context.Context,
endpointID string,
options options,
logger log.Logger,
) (*listener, error) {
closeCtx, closeCancel := context.WithCancel(context.Background())
ln := &listener{
endpointID: endpointID,
options: options,
closeCtx: closeCtx,
closeCancel: closeCancel,
logger: logger,
}
mux, err := ln.connect(ctx)
if err != nil {
return nil, fmt.Errorf("connect: %w", err)
}
ln.mux = mux

return ln, nil
}

// Accept accepts a proxied connection for the endpoint.
func (l *listener) Accept() (net.Conn, error) {
conn, ok := <-l.acceptCh
if !ok {
return nil, fmt.Errorf("closed")
for {
conn, err := l.mux.Accept()
if err == nil {
return conn, nil
}

if l.closeCtx.Err() != nil {
return nil, err
}

l.logger.Warn("failed to accept conn", zap.Error(err))

mux, err := l.connect(l.closeCtx)
if err != nil {
return nil, err
}

l.muxMu.Lock()
l.mux = mux
l.muxMu.Unlock()
}
}

return conn, nil
func (l *listener) Addr() net.Addr {
return &pikoAddr{endpointID: l.endpointID}
}

func (l *listener) Close() error {
// TODO(andydunstall): Doesn't yet notify the server that the listener is
// closed. This is ok for now as the client is only used in agent.
l.closeCancel()

if !l.closed.CompareAndSwap(false, true) {
return nil
}
close(l.acceptCh)
return nil
}
l.muxMu.Lock()
err := l.mux.Close()
l.muxMu.Unlock()

func (l *listener) Addr() net.Addr {
// TODO(andydunstall)
return nil
return err
}

func (l *listener) EndpointID() string {
return l.endpointID
}

func (l *listener) OnConn(conn net.Conn) {
l.acceptCh <- conn
func (l *listener) connect(ctx context.Context) (*mux.Session, error) {
backoff := backoff.New(0, minReconnectBackoff, maxReconnectBackoff)
for {
conn, err := websocket.Dial(
ctx,
serverURL(l.options.url, l.endpointID),
websocket.WithToken(l.options.token),
websocket.WithTLSConfig(l.options.tlsConfig),
)
if err == nil {
l.logger.Debug(
"listener connected",
zap.String("url", serverURL(l.options.url, l.endpointID)),
)

return mux.OpenClient(conn), nil
}

var retryableError *websocket.RetryableError
if !errors.As(err, &retryableError) {
l.logger.Error(
"failed to connect to server; non-retryable",
zap.String("url", serverURL(l.options.url, l.endpointID)),
zap.Error(err),
)
return nil, err
}

l.logger.Warn(
"failed to connect to server; retrying",
zap.String("url", serverURL(l.options.url, l.endpointID)),
zap.Error(err),
)

if !backoff.Wait(ctx) {
return nil, ctx.Err()
}
}
}

var _ Listener = &listener{}

func serverURL(urlStr, endpointID string) string {
// Already verified URL in Config.Validate.
u, _ := url.Parse(urlStr)
u.Path += "/piko/v1/upstream/" + endpointID
if u.Scheme == "http" {
u.Scheme = "ws"
}
if u.Scheme == "https" {
u.Scheme = "wss"
}
return u.String()
}
22 changes: 11 additions & 11 deletions agentv2/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,62 +6,62 @@ import (
"github.com/andydunstall/piko/pkg/log"
)

type connectOptions struct {
type options struct {
token string
url string
tlsConfig *tls.Config
logger log.Logger
}

type ConnectOption interface {
apply(*connectOptions)
type Option interface {
apply(*options)
}

type tokenOption string

func (o tokenOption) apply(opts *connectOptions) {
func (o tokenOption) apply(opts *options) {
opts.token = string(o)
}

// WithToken configures the API key to authenticate the client.
func WithToken(key string) ConnectOption {
func WithToken(key string) Option {
return tokenOption(key)
}

type urlOption string

func (o urlOption) apply(opts *connectOptions) {
func (o urlOption) apply(opts *options) {
opts.url = string(o)
}

// WithURL configures the Piko server URL. Such as
// 'https://piko.example.com:8001'.
func WithURL(url string) ConnectOption {
func WithURL(url string) Option {
return urlOption(url)
}

type tlsConfigOption struct {
TLSConfig *tls.Config
}

func (o tlsConfigOption) apply(opts *connectOptions) {
func (o tlsConfigOption) apply(opts *options) {
opts.tlsConfig = o.TLSConfig
}

// WithTLSConfig sets the TLS client configuration.
func WithTLSConfig(config *tls.Config) ConnectOption {
func WithTLSConfig(config *tls.Config) Option {
return tlsConfigOption{TLSConfig: config}
}

type loggerOption struct {
Logger log.Logger
}

func (o loggerOption) apply(opts *connectOptions) {
func (o loggerOption) apply(opts *options) {
opts.logger = o.Logger
}

// WithLogger configures the logger. Defaults to no output.
func WithLogger(logger log.Logger) ConnectOption {
func WithLogger(logger log.Logger) Option {
return loggerOption{Logger: logger}
}
Loading

0 comments on commit ca21d3e

Please sign in to comment.