Skip to content

Commit

Permalink
[WIP] Imperative API: Proxy and port-forward (#85)
Browse files Browse the repository at this point in the history
* Add a port-forward type

* Add support for automatic reconnects when pods are terminated

* List clients

* Add a Dockerfile for the proxy

* Add proxy

* Update go.mod

* Update package name

* Add config parsing

* Immediately close connection when there are no clients
  • Loading branch information
maxsokolovsky authored Nov 8, 2024
1 parent afe9785 commit bd575a6
Show file tree
Hide file tree
Showing 9 changed files with 560 additions and 26 deletions.
13 changes: 13 additions & 0 deletions Dockerfile.proxy
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM golang:1.23 AS builder
WORKDIR /app

COPY go.mod .
COPY go.sum .
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 go build -o proxy ./proxy

FROM scratch
COPY --from=builder /app/proxy .
CMD ["./proxy"]
18 changes: 18 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import (
"github.com/sirupsen/logrus"

"github.com/rancher/remotedialer/proxy"
)

func main() {
cfg, err := proxy.ConfigFromEnvironment()
if err != nil {
logrus.Fatalf("fatal configuration error: %v", err)
}
err = proxy.Start(cfg)
if err != nil {
logrus.Fatal(err)
}
}
157 changes: 157 additions & 0 deletions forward/port-forward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package portforward

import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"net/url"
"strings"
"time"

"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

type PortForward struct {
restConfig *rest.Config
podClient corev1.PodInterface
Namespace string
LabelSelector string
Ports []string
readyCh chan struct{}
stopCh chan struct{}
cancel context.CancelFunc
}

// New initializes and returns a PortForward value after validating the incoming parameters.
func New(restConfig *rest.Config, podClient corev1.PodInterface, namespace string, labelSelector string, ports []string) (*PortForward, error) {
if restConfig == nil {
return nil, fmt.Errorf("restConfig must not be nil")
}
if podClient == nil {
return nil, fmt.Errorf("podClient must not be nil")
}
if labelSelector == "" {
return nil, fmt.Errorf("labelSelector must not be empty")
}
if len(ports) == 0 {
return nil, fmt.Errorf("ports must not be empty")
}
if namespace == "" {
return nil, fmt.Errorf("namespace must not be empty")
}
return &PortForward{
restConfig: restConfig,
podClient: podClient,
Namespace: namespace,
LabelSelector: labelSelector,
Ports: ports,
readyCh: make(chan struct{}, 1),
stopCh: make(chan struct{}, 1),
}, nil
}

// Stop releases the resources of the running port-forwarder.
func (r *PortForward) Stop() {
r.cancel()
r.stopCh <- struct{}{}
}

// Start launches a port forwarder in the background.
func (r *PortForward) Start() {
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel

go func() {
for {
select {
case <-ctx.Done():
logrus.Infoln("Goroutine stopped.")
return
default:
r.readyCh = make(chan struct{}, 1)
err := r.runForwarder(r.readyCh, r.stopCh, r.Ports)
if err != nil {
if errors.Is(err, portforward.ErrLostConnectionToPod) {
logrus.Error("Lost connection to pod; restarting.")
time.Sleep(time.Second)
continue
} else {
logrus.Errorf("Non-restartable error: %v", err)
return
}
}
}
}
}()
// TODO: maybe block until port forwarding is ready.
}

// runForwarder starts a port forwarder and blocks until it is stopped when it receives a value on the stopCh.
func (r *PortForward) runForwarder(readyCh, stopCh chan struct{}, ports []string) error {
podName, err := r.podName()
if err != nil {
return err
}
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", r.Namespace, podName)
hostIP := strings.TrimPrefix(r.restConfig.Host, "https://")
serverURL := url.URL{
Scheme: "https",
Path: path,
Host: hostIP,
}
roundTripper, upgrader, err := spdy.RoundTripperFor(r.restConfig)
if err != nil {
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{
Transport: roundTripper,
}, http.MethodPost, &serverURL)

stdout, stderr := new(bytes.Buffer), new(bytes.Buffer)
forwarder, err := portforward.New(dialer, ports, stopCh, readyCh, stdout, stderr)
if err != nil {
return err
}

go func() {
for range readyCh {
} // Wait until port forwarding is ready.

if s := stderr.String(); s != "" {
logrus.Error(s)
} else if s = stdout.String(); s != "" {
logrus.Info(s)
}
}()

return forwarder.ForwardPorts()
}

// podName tries to select a random pod and return its name from the pods that the
// underlying client finds by label selector.
// The method continuously retries if there are no pods yet that match the selector.
func (r *PortForward) podName() (string, error) {
for {
pods, err := r.podClient.List(context.Background(), metav1.ListOptions{
LabelSelector: r.LabelSelector,
})
if err != nil {
return "", err
}
if len(pods.Items) < 1 {
logrus.Debugf("no pod found with label selector %q, retrying", r.LabelSelector)
time.Sleep(time.Second)
continue
}
i := rand.Intn(len(pods.Items))
return pods.Items[i].Name, nil
}
}
59 changes: 51 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,30 +1,73 @@
module github.com/rancher/remotedialer

go 1.20
go 1.23

require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/rancher/dynamiclistener v0.6.0
github.com/rancher/norman v0.0.0-20240827141653-344b77749da7
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
k8s.io/apimachinery v0.30.1
k8s.io/client-go v0.30.1
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/moby/spdystream v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rancher/lasso v0.0.0-20240705194423-b2a060d103c1 // indirect
github.com/rancher/wrangler/v3 v3.0.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.30.1 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Loading

0 comments on commit bd575a6

Please sign in to comment.