Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added PeerManagement.Type = 'member-list' #492

Closed
wants to merge 12 commits into from
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ test_redimem
!/cmd/test_redimem

dockerize*

# IDE configs
.idea/
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/

COPY --from=builder /app/refinery /usr/bin/refinery

ENTRYPOINT ["/usr/bin/refinery"]
22 changes: 13 additions & 9 deletions app/app_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build all || race
// +build all race

package app
Expand All @@ -21,10 +22,6 @@ import (

"github.com/facebookgo/inject"
"github.com/facebookgo/startstop"
"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/assert"
"gopkg.in/alexcesaro/statsd.v2"

"github.com/honeycombio/libhoney-go"
"github.com/honeycombio/libhoney-go/transmission"
"github.com/honeycombio/refinery/collect"
Expand All @@ -35,6 +32,9 @@ import (
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/assert"
"gopkg.in/alexcesaro/statsd.v2"
)

const legacyAPIKey = "c9945edf5d245834089a1bd6cc9ad01e"
Expand Down Expand Up @@ -98,6 +98,10 @@ func (p *testPeers) GetPeers() ([]string, error) {
func (p *testPeers) RegisterUpdatedPeersCallback(callback func()) {
}

func (p *testPeers) Close(ctx context.Context) error {
return nil
}

func newStartedApp(
t testing.TB,
libhoneyT transmission.Sender,
Expand All @@ -124,7 +128,7 @@ func newStartedApp(

var err error
if peers == nil {
peers, err = peer.NewPeers(c)
peers, err = peer.NewPeers(context.Background(), c)
assert.NoError(t, err)
}

Expand Down Expand Up @@ -262,8 +266,8 @@ func TestAppIntegrationWithNonLegacyKey(t *testing.T) {

var out bytes.Buffer
a, graph := newStartedApp(t, &transmission.WriterSender{W: &out}, 10500, nil, false)
a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) {return "test", nil})
a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) {return "test", nil})
a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })

// Send a root span, it should be sent in short order.
req := httptest.NewRequest(
Expand Down Expand Up @@ -561,8 +565,8 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
basePort := 15000 + (i * 2)
senders[i] = &transmission.MockSender{}
app, graph := newStartedApp(t, senders[i], basePort, peers, false)
app.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil})
app.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil})
app.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
app.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
apps[i] = app
defer startstop.Stop(graph.Objects(), nil)

Expand Down
17 changes: 10 additions & 7 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -85,13 +86,6 @@ func main() {
os.Exit(1)
}

peers, err := peer.NewPeers(c)

if err != nil {
fmt.Printf("unable to load peers: %+v\n", err)
os.Exit(1)
}

// get desired implementation for each dependency to inject
lgr := logger.GetLoggerImplementation(c)
collector := collect.GetCollectorImplementation(c)
Expand All @@ -110,6 +104,15 @@ func main() {
os.Exit(1)
}

ctx, cancel := context.WithTimeout(context.Background(), c.GetPeerTimeout())
defer cancel()
peers, err := peer.NewPeers(ctx, c)

if err != nil {
fmt.Printf("unable to load peers: %+v\n", err)
os.Exit(1)
}

// upstreamTransport is the http transport used to send things on to Honeycomb
upstreamTransport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Expand Down
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ type Config interface {

GetDatasetPrefix() string

// GetPeerTimeout returns the current timeout that is applied to all remote peer operations
GetPeerTimeout() time.Duration

// GetMemberListListenAddr returns the `address:port` the member-list will listen on to gossip
// with other members and notify of peer changes
GetMemberListListenAddr() string

// GetMemberListAdvertiseAddr returns the address which member-list will announce to other
// members in the member-list.
GetMemberListAdvertiseAddr() string

// GetMemberListKnownMembers returns a list of `host:port` members this instance can
// contact to discover other members.
GetMemberListKnownMembers() []string

// GetQueryAuthToken returns the token that must be used to access the /query endpoints
GetQueryAuthToken() string
}
42 changes: 39 additions & 3 deletions config/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/go-playground/validator"
libhoney "github.com/honeycombio/libhoney-go"
"github.com/honeycombio/libhoney-go"
"github.com/sirupsen/logrus"
viper "github.com/spf13/viper"
"github.com/spf13/viper"
)

type fileConfig struct {
Expand Down Expand Up @@ -82,7 +82,7 @@ type HoneycombMetricsConfig struct {
}

type PeerManagementConfig struct {
Type string `validate:"required,oneof= file redis"`
Type string `validate:"required,oneof= file redis member-list"`
Peers []string `validate:"dive,url"`
RedisHost string
RedisUsername string
Expand All @@ -92,6 +92,10 @@ type PeerManagementConfig struct {
IdentifierInterfaceName string
UseIPV6Identifier bool
RedisIdentifier string
Timeout time.Duration
MemberListListenAddr string
MemberListAdvertiseAddr string
MemberListKnownMembers []string
}

// NewConfig creates a new config struct
Expand All @@ -102,18 +106,22 @@ func NewConfig(config, rules string, errorCallback func(error)) (Config, error)
c.BindEnv("PeerManagement.RedisHost", "REFINERY_REDIS_HOST")
c.BindEnv("PeerManagement.RedisUsername", "REFINERY_REDIS_USERNAME")
c.BindEnv("PeerManagement.RedisPassword", "REFINERY_REDIS_PASSWORD")
c.BindEnv("PeerManagement.MemberListAdvertiseAddr", "REFINERY_MEMBER_LIST_ADVERTISE_ADDR")
c.BindEnv("PeerManagement.MemberListListenAddr", "REFINERY_MEMBER_LIST_LISTEN_ADDR")
c.BindEnv("HoneycombLogger.LoggerAPIKey", "REFINERY_HONEYCOMB_API_KEY")
c.BindEnv("HoneycombMetrics.MetricsAPIKey", "REFINERY_HONEYCOMB_API_KEY")
c.BindEnv("QueryAuthToken", "REFINERY_QUERY_AUTH_TOKEN")
c.SetDefault("ListenAddr", "0.0.0.0:8080")
c.SetDefault("PeerListenAddr", "0.0.0.0:8081")
c.SetDefault("MemberListListenAddr", "0.0.0.0:8519")
c.SetDefault("CompressPeerCommunication", true)
c.SetDefault("APIKeys", []string{"*"})
c.SetDefault("PeerManagement.Peers", []string{"http://127.0.0.1:8081"})
c.SetDefault("PeerManagement.Type", "file")
c.SetDefault("PeerManagement.UseTLS", false)
c.SetDefault("PeerManagement.UseTLSInsecure", false)
c.SetDefault("PeerManagement.UseIPV6Identifier", false)
c.SetDefault("PeerManagement.Timeout", 5*time.Second)
c.SetDefault("HoneycombAPI", "https://api.honeycomb.io")
c.SetDefault("Logger", "logrus")
c.SetDefault("LoggingLevel", "debug")
Expand Down Expand Up @@ -785,6 +793,34 @@ func (f *fileConfig) GetDatasetPrefix() string {
return f.conf.DatasetPrefix
}

func (f *fileConfig) GetPeerTimeout() time.Duration {
f.mux.RLock()
defer f.mux.RUnlock()

return f.conf.PeerManagement.Timeout
}

func (f *fileConfig) GetMemberListListenAddr() string {
f.mux.RLock()
defer f.mux.RUnlock()

return f.conf.PeerManagement.MemberListListenAddr
}

func (f *fileConfig) GetMemberListAdvertiseAddr() string {
f.mux.RLock()
defer f.mux.RUnlock()

return f.conf.PeerManagement.MemberListAdvertiseAddr
}

func (f *fileConfig) GetMemberListKnownMembers() []string {
f.mux.RLock()
defer f.mux.RUnlock()

return f.conf.PeerManagement.MemberListKnownMembers
}

func (f *fileConfig) GetQueryAuthToken() string {
f.mux.RLock()
defer f.mux.RUnlock()
Expand Down
52 changes: 44 additions & 8 deletions config/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type MockConfig struct {
AddHostMetadataToTrace bool
EnvironmentCacheTTL time.Duration
DatasetPrefix string
PeerTimeout time.Duration
MemberListListenAddr string
MemberListAdvertiseAddr string
MemberListKnownMembers []string
QueryAuthToken string

Mux sync.RWMutex
Expand Down Expand Up @@ -334,18 +338,50 @@ func (m *MockConfig) GetAddHostMetadataToTrace() bool {
return m.AddHostMetadataToTrace
}

func (f *MockConfig) GetEnvironmentCacheTTL() time.Duration {
f.Mux.RLock()
defer f.Mux.RUnlock()
func (m *MockConfig) GetEnvironmentCacheTTL() time.Duration {
m.Mux.RLock()
defer m.Mux.RUnlock()

return f.EnvironmentCacheTTL
return m.EnvironmentCacheTTL
}

func (f *MockConfig) GetDatasetPrefix() string {
f.Mux.RLock()
defer f.Mux.RUnlock()
func (m *MockConfig) GetDatasetPrefix() string {
m.Mux.RLock()
defer m.Mux.RUnlock()

return m.DatasetPrefix
}

func (m *MockConfig) GetPeerTimeout() time.Duration {
m.Mux.RLock()
defer m.Mux.RUnlock()

return m.PeerTimeout
}

func (m *MockConfig) GetMemberListListenAddr() string {
m.Mux.RLock()
defer m.Mux.RUnlock()

return m.MemberListListenAddr
}

func (m *MockConfig) GetMemberListAdvertiseAddr() string {
m.Mux.RLock()
defer m.Mux.RUnlock()

if m.MemberListAdvertiseAddr == "" {
return m.MemberListListenAddr
}

return m.MemberListAdvertiseAddr
}

func (m *MockConfig) GetMemberListKnownMembers() []string {
m.Mux.RLock()
defer m.Mux.RUnlock()

return f.DatasetPrefix
return m.MemberListKnownMembers
}

func (f *MockConfig) GetQueryAuthToken() string {
Expand Down
52 changes: 50 additions & 2 deletions config_complete.toml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ Metrics = "honeycomb"

# IdentifierInterfaceName is optional. By default, when using RedisHost, Refinery will use
# the local hostname to identify itself to other peers in Redis. If your environment
# requires that you use IPs as identifiers (for example, if peers can't resolve eachother
# requires that you use IPs as identifiers (for example, if peers can't resolve each other
# by name), you can specify the network interface that Refinery is listening on here.
# Refinery will use the first unicast address that it finds on the specified network
# interface as its identifier.
Expand All @@ -203,11 +203,59 @@ Metrics = "honeycomb"

# RedisIdentifier is optional. By default, when using RedisHost, Refinery will use
# the local hostname to identify itself to other peers in Redis. If your environment
# requires that you use IPs as identifiers (for example, if peers can't resolve eachother
# requires that you use IPs as identifiers (for example, if peers can't resolve each other
# by name), you can specify the exact identifier (IP address, etc) to use here.
# Not eligible for live reload. Overrides IdentifierInterfaceName, if both are set.
# RedisIdentifier = "192.168.1.1"

# Timeout is optional. By default, when using RedisHost, Refinery will timeout
# after 5s when communicating with Redis.
# Timeout = "5s"

# [PeerManagement]
# Member-List is a peer discovery system that requires no external third-party system
# (except maybe DNS) to discover and monitor the health of other Refinery peers in the
# cluster. It uses a different address from PeerListenAddr to exchange gossip messages
# within the cluster. See https://github.com/hashicorp/memberlist for details
# Type = "member-list"

# MemberListListenAddr is optional when using Type = "member-list". Refinery will use
# this `address:port` to listen to gossip messages from other Refinery instances in
# the cluster which will be used to notify when Refinery instances join or leave the
# member-list cluster.
# MemberListListenAddr = "0.0.0.0:8519"

# MemberListKnownMembers is required when using Type = 'member-list". Refinery will use
# this list of `host:port` entries as the initial seed nodes such that other instances
# of refinery can discover the rest of the cluster through gossip messages. This is typically
# a list or a single DNS name which will return the address of a running instance of refinery.
# MemberListKnownMembers = [
# "redis.service.us-east-1.consul:8519",
# "192.168.1.1:8519",
# "refinery-1231:8519"
# ]

# MemberListAdvertiseAddr is optional when using Type = "member-list". Refinery will use
# this `address:port` to announce to other members in the member-list. Other Refinery
# member-list members will connect to this instance to share gossip messages. Use this
# when NAT traversal is required by your network configuration. If no value is supplied
# Refinery will attempt to discover the local hostname or public ip address using the
# IdentifierInterfaceName value if set.
# MemberListAdvertiseAddr = "192.168.1.1:8519"

# IdentifierInterfaceName is optional. Refinery will use the local hostname to identify
# itself to other peers in the "member-list" cluster. If your environment requires that
# you use IPs as identifiers (for example, if peers can't resolve each other by name),
# you can specify the network interface that Refinery is listening on here. Refinery will
# use the first unicast address that it finds on the specified network interface as its
# identifier.
# Not eligible for live reload.
# IdentifierInterfaceName = "eth0"

# Timeout is optional. By default Refinery will timeout after 5s when initially joining
# the member-list cluster.
# Timeout = "5s"

#########################
## In-Memory Collector ##
#########################
Expand Down
Loading