Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[api] Add Unix domain socket support #16872

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16872.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
api: Added support for Unix domain sockets
```
81 changes: 73 additions & 8 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -205,6 +205,16 @@ type Config struct {
// retryOptions holds the configuration necessary to perform retries
// on put calls.
retryOptions *retryOptions

// url is populated with the initial parsed address and is not modified in the
// case of a unix:// URL, as opposed to Address.
url *url.URL
}

// URL returns a copy of the initial parsed address and is not modified in the
// case of a `unix://` URL, as opposed to Address.
func (c *Config) URL() *url.URL {
return c.url
}
Comment on lines +214 to 218
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to other reviewers: @angrycub and I paired on this for quite a while. The code here in the api package handles the unix:// scheme correctly now without doing a bunch of weird string manipulation. But the nomad operator api package has some special case logic around emitting curl-alike outputs. Rather than exposing flags or fields special-casing the UDS, we landed on this method, which is potentially more generally useful for callers and lets the nomad operator api check the scheme directly.


// ClientConfig copies the configuration with a new client address, region, and
@@ -214,6 +224,7 @@ func (c *Config) ClientConfig(region, address string, tlsEnabled bool) *Config {
if tlsEnabled {
scheme = "https"
}

config := &Config{
Address: fmt.Sprintf("%s://%s", scheme, address),
Region: region,
@@ -223,6 +234,7 @@ func (c *Config) ClientConfig(region, address string, tlsEnabled bool) *Config {
HttpAuth: c.HttpAuth,
WaitTime: c.WaitTime,
TLSConfig: c.TLSConfig.Copy(),
url: copyURL(c.url),
}

// Update the tls server name for connecting to a client
@@ -278,9 +290,30 @@ func (t *TLSConfig) Copy() *TLSConfig {
return nt
}

// defaultUDSClient creates a unix domain socket client. Errors return a nil
// http.Client, which is tested for in ConfigureTLS. This function expects that
// the Address has already been parsed into the config.url value.
func defaultUDSClient(config *Config) *http.Client {

config.Address = "http://127.0.0.1"

httpClient := &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", config.url.EscapedPath())
},
},
}
return defaultClient(httpClient)
}

func defaultHttpClient() *http.Client {
httpClient := cleanhttp.DefaultPooledClient()
transport := httpClient.Transport.(*http.Transport)
return defaultClient(httpClient)
}

func defaultClient(c *http.Client) *http.Client {
transport := c.Transport.(*http.Transport)
transport.TLSHandshakeTimeout = 10 * time.Second
transport.TLSClientConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
@@ -290,7 +323,7 @@ func defaultHttpClient() *http.Client {
// well yet: https://github.com/gorilla/websocket/issues/417
transport.ForceAttemptHTTP2 = false

return httpClient
return c
}

// DefaultConfig returns a default configuration for the client
@@ -467,18 +500,29 @@ type Client struct {

// NewClient returns a new client
func NewClient(config *Config) (*Client, error) {
var err error
// bootstrap the config
defConfig := DefaultConfig()

if config.Address == "" {
config.Address = defConfig.Address
} else if _, err := url.Parse(config.Address); err != nil {
}

// we have to test the address that comes from DefaultConfig, because it
// could be the value of NOMAD_ADDR which is applied without testing
if config.url, err = url.Parse(config.Address); err != nil {
return nil, fmt.Errorf("invalid address '%s': %v", config.Address, err)
}

httpClient := config.HttpClient
if httpClient == nil {
httpClient = defaultHttpClient()
switch {
case config.url.Scheme == "unix":
httpClient = defaultUDSClient(config) // mutates config
default:
httpClient = defaultHttpClient()
}

if err := ConfigureTLS(httpClient, config.TLSConfig); err != nil {
return nil, err
}
@@ -760,24 +804,32 @@ func (r *request) toHTTP() (*http.Request, error) {

// newRequest is used to create a new request
func (c *Client) newRequest(method, path string) (*request, error) {
base, _ := url.Parse(c.config.Address)

u, err := url.Parse(path)
if err != nil {
return nil, err
}

r := &request{
config: &c.config,
method: method,
url: &url.URL{
Scheme: base.Scheme,
User: base.User,
Host: base.Host,
Scheme: c.config.url.Scheme,
User: c.config.url.User,
Host: c.config.url.Host,
Path: u.Path,
RawPath: u.RawPath,
},
header: make(http.Header),
params: make(map[string][]string),
}

// fixup socket paths
if r.url.Scheme == "unix" {
r.url.Scheme = "http"
r.url.Host = "127.0.0.1"
}

if c.config.Region != "" {
r.params.Set("region", c.config.Region)
}
@@ -1210,3 +1262,16 @@ func (o *WriteOptions) WithContext(ctx context.Context) *WriteOptions {
o2.ctx = ctx
return o2
}

// copyURL makes a deep copy of a net/url.URL
func copyURL(u1 *url.URL) *url.URL {
if u1 == nil {
return nil
}
o := *u1
if o.User != nil {
ou := *u1.User
o.User = &ou
}
return &o
}
10 changes: 9 additions & 1 deletion api/raw.go
Original file line number Diff line number Diff line change
@@ -3,7 +3,10 @@

package api

import "io"
import (
"io"
"net/http"
)

// Raw can be used to do raw queries against custom endpoints
type Raw struct {
@@ -39,3 +42,8 @@ func (raw *Raw) Write(endpoint string, in, out interface{}, q *WriteOptions) (*W
func (raw *Raw) Delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) {
return raw.c.delete(endpoint, nil, out, q)
}

// Do uses the raw client's internal httpClient to process the request
func (raw *Raw) Do(req *http.Request) (*http.Response, error) {
return raw.c.httpClient.Do(req)
}
54 changes: 29 additions & 25 deletions command/operator_api.go
Original file line number Diff line number Diff line change
@@ -5,17 +5,14 @@ package command

import (
"bytes"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/nomad/api"
"github.com/posener/complete"
)
@@ -138,11 +135,18 @@ func (c *OperatorAPICommand) Run(args []string) int {

// By default verbose func is a noop
verbose := func(string, ...interface{}) {}
verboseSocket := func(*api.Config, string, ...interface{}) {}

if c.verboseFlag {
verbose = func(format string, a ...interface{}) {
// Use Warn instead of Info because Info goes to stdout
c.Ui.Warn(fmt.Sprintf(format, a...))
}
verboseSocket = func(cfg *api.Config, format string, a ...interface{}) {
if cfg.URL() != nil && cfg.URL().Scheme == "unix" {
c.Ui.Warn(fmt.Sprintf(format, a...))
}
}
}

// Opportunistically read from stdin and POST unless method has been
@@ -166,11 +170,13 @@ func (c *OperatorAPICommand) Run(args []string) int {
c.method = "GET"
}

// NewClient mutates or validates Config.Address, so call it to match
// the behavior of other commands. Typically these are called as a combination
// using c.Client(); however, we need access to the client configuration
// to build the corresponding curl output.
config := c.clientConfig()
apiC, err := api.NewClient(config)

// NewClient mutates or validates Config.Address, so call it to match
// the behavior of other commands.
_, err := api.NewClient(config)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
return 1
@@ -198,30 +204,21 @@ func (c *OperatorAPICommand) Run(args []string) int {
c.Ui.Output(out)
return 0
}

// Re-implement a big chunk of api/api.go since we don't export it.
client := cleanhttp.DefaultClient()
transport := client.Transport.(*http.Transport)
transport.TLSHandshakeTimeout = 10 * time.Second
transport.TLSClientConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
}

if err := api.ConfigureTLS(client, config.TLSConfig); err != nil {
c.Ui.Error(fmt.Sprintf("Error configuring TLS: %v", err))
return 1
}
apiR := apiC.Raw()

setQueryParams(config, path)

verbose("> %s %s", c.method, path)
verboseSocket(config, fmt.Sprintf("* Trying %s...", config.URL().EscapedPath()))

req, err := http.NewRequest(c.method, path.String(), c.body)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error making request: %v", err))
return 1
}

h := req.URL.Hostname()
verboseSocket(config, fmt.Sprintf("* Connected to %s (%s)", h, config.URL().EscapedPath()))
verbose("> %s %s %s", c.method, req.URL.Path, req.Proto)

// Set headers from command line
req.Header = headerFlags.headers

@@ -244,11 +241,11 @@ func (c *OperatorAPICommand) Run(args []string) int {
verbose("> %s: %s", k, v)
}
}

verbose(">")
verbose("* Sending request and receiving response...")

// Do the request!
resp, err := client.Do(req)
resp, err := apiR.Do(req)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error performing request: %v", err))
return 1
@@ -310,14 +307,19 @@ func (c *OperatorAPICommand) apiToCurl(config *api.Config, headers http.Header,
parts = append(parts, "--verbose")
}

if c.method != "" {
// add method flags. Note: curl output complains about `-X GET`
if c.method != "" && c.method != http.MethodGet {
parts = append(parts, "-X "+c.method)
}

if c.body != nil {
parts = append(parts, "--data-binary @-")
}

if config.URL().EscapedPath() != "" {
parts = append(parts, fmt.Sprintf("--unix-socket %q", config.URL().EscapedPath()))
}

if config.TLSConfig != nil {
parts = tlsToCurl(parts, config.TLSConfig)

@@ -412,7 +414,9 @@ func pathToURL(config *api.Config, path string) (*url.URL, error) {

// If the scheme is missing from the path, it likely means the path is just
// the HTTP handler path. Attempt to infer this.
if !strings.HasPrefix(path, "http://") && !strings.HasPrefix(path, "https://") {
if !strings.HasPrefix(path, "http://") &&
!strings.HasPrefix(path, "https://") &&
!strings.HasPrefix(path, "unix://") {
scheme := "http"

// If the user has set any TLS configuration value, this is a good sign
74 changes: 74 additions & 0 deletions command/operator_api_test.go
Original file line number Diff line number Diff line change
@@ -5,10 +5,13 @@ package command

import (
"bytes"
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path"
"testing"
"time"

@@ -220,3 +223,74 @@ func TestOperatorAPICommand_ContentLength(t *testing.T) {
t.Fatalf("timed out waiting for request")
}
}

func makeSocketListener(t *testing.T) (net.Listener, string) {
td := os.TempDir() // testing.TempDir() on macOS makes paths that are too long
sPath := path.Join(td, t.Name()+".sock")
os.Remove(sPath) // git rid of stale ones now.

t.Cleanup(func() { os.Remove(sPath) })

// Create a Unix domain socket and listen for incoming connections.
socket, err := net.Listen("unix", sPath)
must.NoError(t, err)
return socket, sPath
}

// TestOperatorAPICommand_Socket tests that requests can be routed over a unix
// domain socket
//
// Can not be run in parallel as it modifies the environment.
func TestOperatorAPICommand_Socket(t *testing.T) {

ping := make(chan struct{}, 1)
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ping <- struct{}{}
}))
sock, sockPath := makeSocketListener(t)
ts.Listener = sock
ts.Start()
defer ts.Close()

// Setup command.
ui := cli.NewMockUi()
cmd := &OperatorAPICommand{Meta: Meta{Ui: ui}}

tcs := []struct {
name string
env map[string]string
args []string
exitCode int
}{
{
name: "nomad_addr",
env: map[string]string{"NOMAD_ADDR": "unix://" + sockPath},
args: []string{"/v1/jobs"},
exitCode: 0,
},
{
name: "nomad_addr opaques host",
env: map[string]string{"NOMAD_ADDR": "unix://" + sockPath},
args: []string{"http://example.com/v1/jobs"},
exitCode: 0,
},
}
for i, tc := range tcs {
t.Run(fmt.Sprintf("%v_%s", i+1, t.Name()), func(t *testing.T) {
tc := tc
for k, v := range tc.env {
t.Setenv(k, v)
}

exitCode := cmd.Run(tc.args)
must.Eq(t, tc.exitCode, exitCode, must.Sprint(ui.ErrorWriter.String()))

select {
case l := <-ping:
must.Eq(t, struct{}{}, l)
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for request")
}
})
}
}