Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Support automatically port forwarding to flux instances in a Kubernet…
Browse files Browse the repository at this point in the history
…es cluster.

Simplify accessing flux instances in a Kubernetes cluster by automatically creating
port forwards, just like Helm does.

This is based on the port forward implementations from kubectl and Helm:

* https://github.com/kubernetes/helm/blob/master/pkg/kube/tunnel.go
* https://github.com/kubernetes/kubernetes/blob/master/pkg/kubectl/cmd/portforward.go

The flow is:

* If neither --flux-namespace nor FLUX_NAMESPACE are set, then this code will do nothing
  and everything will work as before.
* If it is, it will instantiate a Kubernetes client and search for pods in FLUX_NAMESPACE
  that have the `name=flux` labels set.
* If the number of pods with `name=flux` in the namespace are not exactly 1, it will
  return an error.
* It will find an empty port (by first binding to port 0 and then closing the port) and
  start a port forward on that port that forwards to the flux pod on port 3030.
* It will set the flux url to `http://127.0.0.1:$port/api/flux`.
* The port forward goroutine will get cleaned up automatically when fluxctl terminates.

Use-cases:

* Easier use of standalone flux instances.
* Discourage exposing flux apis to the internet (hopefully nobody does this).
* More easily manage large numbers of flux instances.
  • Loading branch information
Justin Barrick committed Jul 8, 2018
1 parent 891f2a0 commit 3262bee
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 11 deletions.
47 changes: 46 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

171 changes: 171 additions & 0 deletions cmd/fluxctl/port_forward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package main

import (
"github.com/pkg/errors"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/apimachinery/pkg/util/httpstream"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"io/ioutil"
"fmt"
"net"
"net/http"
"os"
"os/user"
"path/filepath"
)

type portForward struct {
config *rest.Config
clientset kubernetes.Interface
namespace string
podName string
url string
stopChan chan struct{}
readyChan chan struct{}
}

// Initialize a port forwarder.
func newPortforwarder(namespace string) (*portForward, error) {
pf := &portForward{}
pf.stopChan = make(chan struct{}, 1)
pf.readyChan = make(chan struct{}, 1)
pf.namespace = namespace

configPath, err := pf.getKubeConfigPath()
if err != nil {
return pf, errors.Wrap(err, "finding kubeconfig")
}

pf.config, err = clientcmd.BuildConfigFromFlags("", configPath)
if err != nil {
return pf, errors.Wrap(err, "getting kubernetes configuration")
}

pf.clientset, err = kubernetes.NewForConfig(pf.config)
if err != nil {
return pf, errors.Wrap(err, "creating kubernetes client")
}

pf.podName, err = pf.findFluxPod()
if err != nil {
return pf, errors.Wrap(err, "finding flux pod")
}

err = pf.doPortforward()
if err != nil {
return pf, errors.Wrap(err, "creating flux port forward")
}

return pf, nil
}

// Returns the path to the kubernetes config file.
// If KUBECONFIG is set, it will use KUBECONFIG.
// Otherwise, it will return ~/.kube/config.
func (p *portForward) getKubeConfigPath() (string, error) {
kubeconfig := ""

if os.Getenv("KUBECONFIG") != "" {
kubeconfig = os.Getenv("KUBECONFIG")
} else {
user, err := user.Current()
if err != nil {
return "", err
}

kubeconfig = filepath.Join(user.HomeDir, ".kube", "config")
}

return kubeconfig, nil
}

// Find the name of a flux pod by label, returns an error if the label returns
// more or less than one pod.
// It searches for the label: name=flux.
func (p *portForward) findFluxPod() (string, error) {
pods, err := p.clientset.CoreV1().Pods(p.namespace).List(metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{
"name": "flux",
},
}),
})

if err != nil {
return "", err
}

if len(pods.Items) == 0 {
return "", errors.New("Could not find flux pod for selector: labels name=flux")
}

if len(pods.Items) != 1 {
return "", errors.New("Ambigous flux pod: found more than one flux pod for selector: labels name=flux")
}

return pods.Items[0].ObjectMeta.Name, nil
}

// Create an httpstream.Dialer for use with portforward.New
func (p *portForward) dialer() (httpstream.Dialer, error) {
url := p.clientset.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(p.namespace).
Name(p.podName).
SubResource("portforward").URL()

transport, upgrader, err := spdy.RoundTripperFor(p.config)
if err != nil {
return nil, err
}

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
return dialer, nil
}

// Create a port forward to a flux pod - blocks until the tunnel is ready for use.
func (p *portForward) doPortforward() error {
dialer, err := p.dialer()
if err != nil {
return err
}

freePort, err := p.getFreePort()
if err != nil {
return err
}

ports := []string{fmt.Sprintf("%d:3030", freePort)}

discard := ioutil.Discard
pf, err := portforward.New(dialer, ports, p.stopChan, p.readyChan, discard, discard)
if err != nil {
return err
}

go pf.ForwardPorts()

<- p.readyChan

p.url = fmt.Sprintf("http://127.0.0.1:%d/api/flux", freePort)
return nil
}

func (p *portForward) getFreePort() (int, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return 0, err
}

port := listener.Addr().(*net.TCPAddr).Port
err = listener.Close()
if err != nil {
return 0, err
}

return port, nil
}
93 changes: 93 additions & 0 deletions cmd/fluxctl/port_forward_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"os"
"os/user"
"testing"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakekubernetes "k8s.io/client-go/kubernetes/fake"
)

func TestPortForwardGetKubeConfigPath(t *testing.T) {
pf := portForward{}

path, err := pf.getKubeConfigPath()
assert.Nil(t, err)

user, _ := user.Current()
assert.Equal(t, user.HomeDir + "/.kube/config", path)
}

func TestPortForwardGetKubeConfigPathEnvVarSet(t *testing.T) {
os.Setenv("KUBECONFIG", "/my/kube/config")
defer os.Setenv("KUBECONFIG", "")

pf := portForward{}

path, err := pf.getKubeConfigPath()
assert.Nil(t, err)

assert.Equal(t, "/my/kube/config", path)
}

func newPod(name string, labels map[string]string) *corev1.Pod {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Name: name,
},
}
}

func TestFindFluxPod(t *testing.T) {
pf := portForward{
clientset: fakekubernetes.NewSimpleClientset(
newPod("mypod1", map[string]string{
"name": "other",
}),
newPod("mypod2", map[string]string{
"name": "flux",
}),
newPod("mypod3", map[string]string{})),
}

pod, err := pf.findFluxPod()
assert.Nil(t, err)
assert.Equal(t, "mypod2", pod)
}

func TestFindFluxPodNoneExist(t *testing.T) {
pf := portForward{
clientset: fakekubernetes.NewSimpleClientset(
newPod("mypod1", map[string]string{
"name": "other",
})),
}

_, err := pf.findFluxPod()
assert.NotNil(t, err)
assert.Equal(t, "Could not find flux pod for selector: labels name=flux", err.Error())
}

func TestFindFluxPodMultiple(t *testing.T) {
pf := portForward{
clientset: fakekubernetes.NewSimpleClientset(
newPod("mypod1", map[string]string{
"name": "flux",
}),
newPod("mypod2", map[string]string{
"name": "flux",
}),
newPod("mypod3", map[string]string{})),
}

_, err := pf.findFluxPod()
assert.NotNil(t, err)
assert.Equal(t, "Ambigous flux pod: found more than one flux pod for selector: labels name=flux", err.Error())
}
Loading

0 comments on commit 3262bee

Please sign in to comment.