Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Add portforward package #2952

Merged
merged 2 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/fluxctl/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"fmt"
"strings"

portforward "github.com/justinbarrick/go-k8s-portforward"
"github.com/fluxcd/flux/pkg/portforward"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down
18 changes: 0 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,25 @@ replace (
replace github.com/fluxcd/flux/pkg/install => ./pkg/install

require (
github.com/Azure/go-autorest v11.7.1+incompatible // indirect
github.com/Jeffail/gabs v1.4.0
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/semver/v3 v3.0.3
github.com/Masterminds/sprig v2.22.0+incompatible // indirect
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.27.1
github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668
github.com/cheggaaa/pb/v3 v3.0.2
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/docker/distribution v2.7.1+incompatible
github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect
github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f // indirect
github.com/evanphx/json-patch v4.5.0+incompatible
github.com/fluxcd/flux/pkg/install v0.0.0-00010101000000-000000000000
github.com/fluxcd/helm-operator v1.0.0-rc6
github.com/fluxcd/helm-operator/pkg/install v0.0.0-00010101000000-000000000000 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-kit/kit v0.9.0
github.com/gogo/googleapis v1.3.1 // indirect
github.com/gogo/status v1.1.0 // indirect
github.com/golang/gddo v0.0.0-20190312205958-5a2505f3dbf0
github.com/google/go-containerregistry v0.0.0-20200121192426-b0ae1fc74a66
github.com/google/go-github/v28 v28.1.1
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.4.0
github.com/imdario/mergo v0.3.8
github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d
github.com/ncabatoff/go-seq v0.0.0-20180805175032-b08ef85ed833 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9 // indirect
github.com/opentracing/opentracing-go v1.1.0 // indirect
github.com/pkg/errors v0.8.1
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942
github.com/prometheus/client_golang v1.2.1
Expand All @@ -69,23 +56,18 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.4.0
github.com/uber/jaeger-client-go v2.21.1+incompatible // indirect
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
github.com/weaveworks/common v0.0.0-20190410110702-87611edc252e
github.com/weaveworks/go-checkpoint v0.0.0-20170503165305-ebbb8b0518ab
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/whilp/git-urls v0.0.0-20160530060445-31bac0d230fa
github.com/xeipuuv/gojsonschema v1.1.0
go.mozilla.org/sops/v3 v3.5.0
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sys v0.0.0-20191028164358-195ce5e7f934
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
gopkg.in/yaml.v2 v2.2.8
helm.sh/helm/v3 v3.0.3 // indirect
k8s.io/api v0.17.4
k8s.io/apiextensions-apiserver v0.17.4
k8s.io/apimachinery v0.17.4
k8s.io/client-go v11.0.0+incompatible
k8s.io/helm v2.16.1+incompatible // indirect
k8s.io/klog v1.0.0
)
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/justinbarrick/go-k8s-portforward v1.0.2/go.mod h1:klMOboLnC1/UlkyJnYFjcMcbOtwAcKop+LkIZ4r428o=
github.com/justinbarrick/go-k8s-portforward v1.0.3/go.mod h1:GkvGI25j2iHpJVINl/hZC+sbf9IJ1XkY1MtjSh3Usuk=
github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d h1:xQ/ZtcWCKzWg5QbOhq6RFPvevl+IE580Vm0Vgxuw3xs=
github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d/go.mod h1:GkvGI25j2iHpJVINl/hZC+sbf9IJ1XkY1MtjSh3Usuk=
github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
Expand Down
208 changes: 208 additions & 0 deletions pkg/portforward/portforward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package portforward

// based on https://github.com/justinbarrick/go-k8s-portforward
squaremo marked this conversation as resolved.
Show resolved Hide resolved
// licensed under the Apache License 2.0

import (
"fmt"
"io/ioutil"
"net"
"net/http"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

// Used for creating a port forward into a Kubernetes pod
// in a Kubernetes cluster.
type PortForward struct {
// The parsed Kubernetes configuration file.
Config *rest.Config
// The initialized Kubernetes client.
Clientset kubernetes.Interface
// The pod name to use, required if Labels is empty.
Name string
// The labels to use to find the pod.
Labels metav1.LabelSelector
// The port on the pod to forward traffic to.
DestinationPort int
// The port that the port forward should listen to, random if not set.
ListenPort int
// The namespace to look for the pod in.
Namespace string
stopChan chan struct{}
readyChan chan struct{}
}

// Initialize a port forwarder, loads the Kubernetes configuration file and creates the client.
// You do not need to use this function if you have a client to use already - the PortForward
// struct can be created directly.
func NewPortForwarder(namespace string, labels metav1.LabelSelector, port int) (*PortForward, error) {
pf := &PortForward{
Namespace: namespace,
Labels: labels,
DestinationPort: port,
}

var err error
pf.Config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{},
).ClientConfig()
if err != nil {
return pf, errors.Wrap(err, "Could not load kubernetes configuration file")
}

pf.Clientset, err = kubernetes.NewForConfig(pf.Config)
if err != nil {
return pf, errors.Wrap(err, "Could not create kubernetes client")
}

return pf, nil
}

// Start a port forward to a pod - blocks until the tunnel is ready for use.
func (p *PortForward) Start() error {
p.stopChan = make(chan struct{}, 1)
readyChan := make(chan struct{}, 1)
errChan := make(chan error, 1)

listenPort, err := p.getListenPort()
if err != nil {
return errors.Wrap(err, "Could not find a port to bind to")
}

dialer, err := p.dialer()
if err != nil {
return errors.Wrap(err, "Could not create a dialer")
}

ports := []string{
fmt.Sprintf("%d:%d", listenPort, p.DestinationPort),
}

discard := ioutil.Discard
pf, err := portforward.New(dialer, ports, p.stopChan, readyChan, discard, discard)
if err != nil {
return errors.Wrap(err, "Could not port forward into pod")
}

go func() {
errChan <- pf.ForwardPorts()
}()

select {
case err = <-errChan:
return errors.Wrap(err, "Could not create port forward")
case <-readyChan:
return nil
}

return nil
}

// Stop a port forward.
func (p *PortForward) Stop() {
p.stopChan <- struct{}{}
}

// Returns the port that the port forward should listen on.
// If ListenPort is set, then it returns ListenPort.
// Otherwise, it will call getFreePort() to find an open port.
func (p *PortForward) getListenPort() (int, error) {
var err error

if p.ListenPort == 0 {
p.ListenPort, err = p.getFreePort()
}

return p.ListenPort, err
}

// Get a free port on the system by binding to port 0, checking
// the bound port number, and then closing the socket.
func (p *PortForward) getFreePort() (int, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return 0, err
}

port := listener.Addr().(*net.TCPAddr).Port
err = listener.Close()
if err != nil {
return 0, err
}

return port, nil
}

// Create an httpstream.Dialer for use with portforward.New
func (p *PortForward) dialer() (httpstream.Dialer, error) {
pod, err := p.getPodName()
if err != nil {
return nil, errors.Wrap(err, "Could not get pod name")
}

url := p.Clientset.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(p.Namespace).
Name(pod).
SubResource("portforward").URL()

transport, upgrader, err := spdy.RoundTripperFor(p.Config)
if err != nil {
return nil, errors.Wrap(err, "Could not create round tripper")
}

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
return dialer, nil
}

// Gets the pod name to port forward to, if Name is set, Name is returned. Otherwise,
// it will call findPodByLabels().
func (p *PortForward) getPodName() (string, error) {
var err error
if p.Name == "" {
p.Name, err = p.findPodByLabels()
}
return p.Name, err
}

// Find the name of a pod by label, returns an error if the label returns
// more or less than one pod.
// It searches for the labels specified by labels.
func (p *PortForward) findPodByLabels() (string, error) {
if len(p.Labels.MatchLabels) == 0 && len(p.Labels.MatchExpressions) == 0 {
return "", errors.New("No pod labels specified")
}

pods, err := p.Clientset.CoreV1().Pods(p.Namespace).List(metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&p.Labels),
FieldSelector: fields.OneTermEqualSelector("status.phase", string(v1.PodRunning)).String(),
})

if err != nil {
return "", errors.Wrap(err, "Listing pods in kubernetes")
}

formatted := metav1.FormatLabelSelector(&p.Labels)

if len(pods.Items) == 0 {
return "", errors.New(fmt.Sprintf("Could not find running pod for selector: labels \"%s\"", formatted))
}

if len(pods.Items) != 1 {
return "", errors.New(fmt.Sprintf("Ambiguous pod: found more than one pod for selector: labels \"%s\"", formatted))
}

return pods.Items[0].ObjectMeta.Name, nil
}
Loading