Skip to content

Commit

Permalink
thriftbp: Support unix domain socket on thrift clients
Browse files Browse the repository at this point in the history
  • Loading branch information
fishy committed May 7, 2024
1 parent 3868423 commit 059c28e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 6 deletions.
33 changes: 27 additions & 6 deletions thriftbp/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"
"strconv"
"strings"
"time"

"github.com/apache/thrift/lib/go/thrift"
Expand Down Expand Up @@ -61,8 +62,19 @@ type ClientPoolConfig struct {
// ImageUploadService -> image-upload
ServiceSlug string `yaml:"serviceSlug"`

// Addr is the address of a thrift service. Addr must be in the format
// "${host}:${port}"
// Addr is the address of a thrift service.
//
// Addr must be in one of the following formats:
//
// - "${host}:${port}" for TCP
// - "unix://${path}" for Unix Domain Socket
//
// NOTE: When using unix domain socket with an absolute path, there must be 3
// slashes after "unix:", with the third slash being the root. For example,
// "unix:///var/run/thrift.socket" means Unix Domain Socket to
// "/var/run/thrift.socket" (an absolute path), while
// "unix://var/run/thrift.socket" means Unix Domain Socket to
// "var/run/thrift.socket" (a relative path).
Addr string `yaml:"addr"`

// InitialConnections is the desired inital number of thrift connections
Expand Down Expand Up @@ -561,17 +573,26 @@ func newClient(
return nil, nil, fmt.Errorf("thriftbp: error getting next address for new Thrift client: %w", err)
}

transport := &countingDelegateTransport{
TTransport: thrift.NewTSocketConf(addr, cfg),
var transport thrift.TTransport
if path, ok := strings.CutPrefix(addr, "unix://"); ok {
transport = thrift.NewTSocketFromAddrConf(&net.UnixAddr{
Net: "unix",
Name: path,
}, cfg)
} else {
transport = thrift.NewTSocketConf(addr, cfg)
}
cdt := &countingDelegateTransport{
TTransport: transport,
}
if err := transport.Open(); err != nil {
if err := cdt.Open(); err != nil {
return nil, nil, fmt.Errorf("thriftbp: error opening TSocket for new Thrift client: %w", err)
}

return thrift.NewTStandardClient(
protoFactory.GetProtocol(transport),
protoFactory.GetProtocol(transport),
), transport, nil
), cdt, nil
}, maxConnectionAge, maxConnectionAgeJitter, slug)
}

Expand Down
56 changes: 56 additions & 0 deletions thriftbp/client_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"path/filepath"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -303,3 +304,58 @@ func TestInitialConnectionsFallback(t *testing.T) {
})
}
}

func TestUDS(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "socket")

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

store := newSecretsStore(t)
t.Cleanup(func() {
store.Close()
})

handler := thriftHostnameHandler{}
server, err := thriftbp.NewServer(thriftbp.ServerConfig{
Processor: baseplatethrift.NewBaseplateServiceV2Processor(&handler),
Socket: thrift.NewTServerSocketFromAddrTimeout(&net.UnixAddr{
Net: "unix",
Name: path,
}, 0),
})
if err != nil {
t.Fatal(err)
}

bp := baseplate.NewTestBaseplate(baseplate.NewTestBaseplateArgs{
Store: store,
})

handler.server = thriftbp.ApplyBaseplate(bp, server)
go server.Serve()
// give the server a little time to start serving
time.Sleep(10 * time.Millisecond)
t.Cleanup(func() {
server.Stop()
})

pool, err := thriftbp.NewBaseplateClientPool(thriftbp.ClientPoolConfig{
ServiceSlug: "test",
Addr: "unix://" + path,
MaxConnections: 10,
ThriftHostnameHeader: "my-thrift-header",
})
if err != nil {
t.Fatalf("Failed to create client pool: %v", err)
}
t.Cleanup(func() {
pool.Close()
})
client := baseplatethrift.NewBaseplateServiceV2Client(pool.TClient())
_, err = client.IsHealthy(ctx, &baseplatethrift.IsHealthyRequest{})
if err != nil {
t.Fatal(err)
}
}

0 comments on commit 059c28e

Please sign in to comment.