Skip to content

Commit

Permalink
Merge pull request #614 from aledbf/refactor-passthrough
Browse files Browse the repository at this point in the history
Refactor nginx ssl passthrough
  • Loading branch information
aledbf authored Apr 20, 2017
2 parents 91c0006 + 590bc0d commit f6af1ca
Show file tree
Hide file tree
Showing 15 changed files with 708 additions and 122 deletions.
8 changes: 8 additions & 0 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 90 additions & 5 deletions controllers/nginx/pkg/cmd/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import (
"os"
"os/exec"
"strconv"
"strings"
"syscall"
"time"

"github.com/golang/glog"
"github.com/spf13/pflag"

proxyproto "github.com/armon/go-proxyproto"
api_v1 "k8s.io/client-go/pkg/api/v1"

"strings"

"k8s.io/ingress/controllers/nginx/pkg/config"
ngx_template "k8s.io/ingress/controllers/nginx/pkg/template"
"k8s.io/ingress/controllers/nginx/pkg/version"
Expand All @@ -53,6 +53,8 @@ const (

defaultStatusModule statusModule = "default"
vtsStatusModule statusModule = "vts"

errNoChild = "wait: no child processes"
)

var (
Expand Down Expand Up @@ -81,8 +83,40 @@ func newNGINXController() ingress.Controller {
configmap: &api_v1.ConfigMap{},
isIPV6Enabled: isIPv6Enabled(),
resolver: h,
proxy: &proxy{},
}

listener, err := net.Listen("tcp", ":443")
if err != nil {
glog.Fatalf("%v", err)
}

proxyList := &proxyproto.Listener{Listener: listener}

// start goroutine that accepts tcp connections in port 443
go func() {
for {
var conn net.Conn
var err error

if n.isProxyProtocolEnabled {
// we need to wrap the listener in order to decode
// proxy protocol before handling the connection
conn, err = proxyList.Accept()
} else {
conn, err = listener.Accept()
}

if err != nil {
glog.Warningf("unexpected error accepting tcp connection: %v", err)
continue
}

glog.V(3).Infof("remote adress %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
go n.proxy.Handle(conn)
}
}()

var onChange func()
onChange = func() {
template, err := ngx_template.NewTemplate(tmplPath, onChange)
Expand Down Expand Up @@ -121,7 +155,8 @@ type NGINXController struct {

storeLister ingress.StoreLister

binary string
binary string
resolver []net.IP

cmdArgs []string

Expand All @@ -134,7 +169,10 @@ type NGINXController struct {
// returns true if IPV6 is enabled in the pod
isIPV6Enabled bool

resolver []net.IP
// returns true if proxy protocol es enabled
isProxyProtocolEnabled bool

proxy *proxy
}

// Start start a new NGINX master process running in foreground.
Expand Down Expand Up @@ -306,7 +344,7 @@ func (n NGINXController) testTemplate(cfg []byte) error {
return err
}
out, err := exec.Command(n.binary, "-t", "-c", tmpfile.Name()).CombinedOutput()
if err != nil {
if err != nil && err.Error() != errNoChild {
// this error is different from the rest because it must be clear why nginx is not working
oe := fmt.Sprintf(`
-------------------------------------------------------------------------------
Expand All @@ -324,6 +362,20 @@ Error: %v
// SetConfig sets the configured configmap
func (n *NGINXController) SetConfig(cmap *api_v1.ConfigMap) {
n.configmap = cmap

n.isProxyProtocolEnabled = false
if cmap == nil {
return
}

val, ok := cmap.Data["use-proxy-protocol"]
if ok {
b, err := strconv.ParseBool(val)
if err == nil {
n.isProxyProtocolEnabled = b
return
}
}
}

// SetListers sets the configured store listers in the generic ingress controller
Expand Down Expand Up @@ -446,6 +498,39 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) ([]byte, er
return nil, err
}

servers := []*server{}
for _, pb := range ingressCfg.PassthroughBackends {
svc := pb.Service
if svc == nil {
glog.Warningf("missing service for PassthroughBackends %v", pb.Backend)
continue
}
port, err := strconv.Atoi(pb.Port.String())
if err != nil {
for _, sp := range svc.Spec.Ports {
if sp.Name == pb.Port.String() {
port = int(sp.Port)
break
}
}
} else {
for _, sp := range svc.Spec.Ports {
if sp.Port == int32(port) {
port = int(sp.Port)
break
}
}
}

servers = append(servers, &server{
Hostname: pb.Hostname,
IP: svc.Spec.ClusterIP,
Port: port,
})
}

n.proxy.ServerList = servers

return content, nil
}

Expand Down
90 changes: 90 additions & 0 deletions controllers/nginx/pkg/cmd/controller/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"fmt"
"io"
"net"

"github.com/golang/glog"
"github.com/paultag/sniff/parser"
)

type server struct {
Hostname string
IP string
Port int
}

type proxy struct {
ServerList []*server
Default *server
}

func (p *proxy) Get(host string) *server {
for _, s := range p.ServerList {
if s.Hostname == host {
return s
}
}

return &server{
Hostname: "localhost",
IP: "127.0.0.1",
Port: 442,
}
}

func (p *proxy) Handle(conn net.Conn) {
defer conn.Close()
data := make([]byte, 4096)

length, err := conn.Read(data)
if err != nil {
glog.V(4).Infof("error reading the first 4k of the connection: %s", err)
return
}

var proxy *server
hostname, err := parser.GetHostname(data[:])
if err == nil {
glog.V(3).Infof("parsed hostname from TLS Client Hello: %s", hostname)
proxy = p.Get(hostname)
if proxy == nil {
return
}
} else {
proxy = p.Default
if proxy == nil {
return
}
}

clientConn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", proxy.IP, proxy.Port))
if err != nil {
return
}
defer clientConn.Close()

_, err = clientConn.Write(data[:length])
if err != nil {
clientConn.Close()
}
pipe(clientConn, conn)
}

func pipe(client, server net.Conn) {
doCopy := func(s, c net.Conn, cancel chan<- bool) {
io.Copy(s, c)
cancel <- true
}

cancel := make(chan bool, 2)

go doCopy(server, client, cancel)
go doCopy(client, server, cancel)

select {
case <-cancel:
return
}
}
2 changes: 1 addition & 1 deletion controllers/nginx/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (

logFormatUpstream = `%v - [$proxy_add_x_forwarded_for] - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_length $request_time [$proxy_upstream_name] $upstream_addr $upstream_response_length $upstream_response_time $upstream_status`

logFormatStream = `[$time_local] $protocol [$ssl_preread_server_name] [$stream_upstream] $status $bytes_sent $bytes_received $session_time`
logFormatStream = `[$time_local] $protocol $status $bytes_sent $bytes_received $session_time`

// http://nginx.org/en/docs/http/ngx_http_ssl_module.html#ssl_buffer_size
// Sets the size of the buffer used for sending data.
Expand Down
59 changes: 15 additions & 44 deletions controllers/nginx/pkg/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,20 @@ var (
}
return true
},
"buildLocation": buildLocation,
"buildAuthLocation": buildAuthLocation,
"buildAuthResponseHeaders": buildAuthResponseHeaders,
"buildProxyPass": buildProxyPass,
"buildRateLimitZones": buildRateLimitZones,
"buildRateLimit": buildRateLimit,
"buildSSLPassthroughUpstreams": buildSSLPassthroughUpstreams,
"buildResolvers": buildResolvers,
"isLocationAllowed": isLocationAllowed,
"buildLogFormatUpstream": buildLogFormatUpstream,
"contains": strings.Contains,
"hasPrefix": strings.HasPrefix,
"hasSuffix": strings.HasSuffix,
"toUpper": strings.ToUpper,
"toLower": strings.ToLower,
"buildLocation": buildLocation,
"buildAuthLocation": buildAuthLocation,
"buildAuthResponseHeaders": buildAuthResponseHeaders,
"buildProxyPass": buildProxyPass,
"buildRateLimitZones": buildRateLimitZones,
"buildRateLimit": buildRateLimit,
"buildResolvers": buildResolvers,
"isLocationAllowed": isLocationAllowed,
"buildLogFormatUpstream": buildLogFormatUpstream,
"contains": strings.Contains,
"hasPrefix": strings.HasPrefix,
"hasSuffix": strings.HasSuffix,
"toUpper": strings.ToUpper,
"toLower": strings.ToLower,
}
)

Expand All @@ -169,34 +168,6 @@ func buildResolvers(a interface{}) string {
return strings.Join(r, " ")
}

func buildSSLPassthroughUpstreams(b interface{}, sslb interface{}) string {
backends := b.([]*ingress.Backend)
sslBackends := sslb.([]*ingress.SSLPassthroughBackend)
buf := bytes.NewBuffer(make([]byte, 0, 10))

// multiple services can use the same upstream.
// avoid duplications using a map[name]=true
u := make(map[string]bool)
for _, passthrough := range sslBackends {
if u[passthrough.Backend] {
continue
}
u[passthrough.Backend] = true
fmt.Fprintf(buf, "upstream %v {\n", passthrough.Backend)
for _, backend := range backends {
if backend.Name == passthrough.Backend {
for _, server := range backend.Endpoints {
fmt.Fprintf(buf, "\t\tserver %v:%v;\n", server.Address, server.Port)
}
break
}
}
fmt.Fprint(buf, "\t}\n\n")
}

return buf.String()
}

// buildLocation produces the location string, if the ingress has redirects
// (specified through the ingress.kubernetes.io/rewrite-to annotation)
func buildLocation(input interface{}) string {
Expand Down Expand Up @@ -283,7 +254,7 @@ func buildProxyPass(b interface{}, loc interface{}) string {

for _, backend := range backends {
if backend.Name == location.Backend {
if backend.Secure {
if backend.Secure || backend.SSLPassthrough {
proto = "https"
}
break
Expand Down
Loading

0 comments on commit f6af1ca

Please sign in to comment.