Skip to content

Commit

Permalink
Share mgw addrs in peering stream if needed
Browse files Browse the repository at this point in the history
This commit adds handling so that the replication stream considers
whether the user intends to peer through mesh gateways.

The subscription will return server or mesh gateway addresses depending
on the mesh configuration setting. These watches can be updated at
runtime by modifying the mesh config entry.
  • Loading branch information
freddygv committed Oct 3, 2022
1 parent 4ff9d47 commit a8c4d6b
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 76 deletions.
3 changes: 2 additions & 1 deletion agent/consul/peering_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/require"
gogrpc "google.golang.org/grpc"
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) {
}

_, cfg := testServerConfig(t)
cfg.GRPCTLSPort = 8505
cfg.GRPCTLSPort = freeport.GetOne(t)

srv, err := newServer(t, cfg)
require.NoError(t, err)
Expand Down
1 change: 0 additions & 1 deletion agent/consul/servercert/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func (m *CertManager) watchServerToken(ctx context.Context) {

// Cancel existing the leaf cert watch and spin up new one any time the server token changes.
// The watch needs the current token as set by the leader since certificate signing requests go to the leader.
fmt.Println("canceling and resetting")
cancel()
notifyCtx, cancel = context.WithCancel(ctx)

Expand Down
1 change: 1 addition & 0 deletions agent/grpc-external/services/peerstream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,6 @@ type StateStore interface {
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error)
ConfigEntry(ws memdb.WatchSet, kind, name string, entMeta *acl.EnterpriseMeta) (uint64, structs.ConfigEntry, error)
AbandonCh() <-chan struct{}
}
3 changes: 0 additions & 3 deletions agent/grpc-external/services/peerstream/stream_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,9 +640,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
continue
}

case strings.HasPrefix(update.CorrelationID, subMeshGateway):
// TODO(Peering): figure out how to sync this separately

case update.CorrelationID == subCARoot:
resp, err = makeCARootsResponse(update)
if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions agent/grpc-external/services/peerstream/subscription_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,25 @@ func (m *subscriptionManager) syncViaBlockingQuery(
ws.Add(store.AbandonCh())
ws.Add(ctx.Done())

if result, err := queryFn(ctx, store, ws); err != nil {
if result, err := queryFn(ctx, store, ws); err != nil && ctx.Err() == nil {
logger.Error("failed to sync from query", "error", err)

} else {
// Block for any changes to the state store.
updateCh <- cache.UpdateEvent{
CorrelationID: correlationID,
Result: result,
select {
case <-ctx.Done():
return
case updateCh <- cache.UpdateEvent{CorrelationID: correlationID, Result: result}:
}
ws.WatchCtx(ctx)
}

if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.Error("failed to wait before re-trying sync", "error", err)
// Block for any changes to the state store.
ws.WatchCtx(ctx)
}

select {
case <-ctx.Done():
err := waiter.Wait(ctx)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
default:
} else if err != nil {
logger.Error("failed to wait before re-trying sync", "error", err)
}
}
}
135 changes: 112 additions & 23 deletions agent/grpc-external/services/peerstream/subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/golang/protobuf/proto"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
Expand Down Expand Up @@ -247,16 +251,10 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti

pending := &pendingPayload{}

// Directly replicate information about our mesh gateways to the consuming side.
// TODO(peering): should we scrub anything before replicating this?
if err := pending.Add(meshGatewayPayloadID, u.CorrelationID, csn); err != nil {
return err
}

if state.exportList != nil {
// Trigger public events for all synthetic discovery chain replies.
for chainName, info := range state.connectServices {
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info)
m.collectPendingEventForDiscoveryChain(ctx, state, pending, chainName, info)
}
}

Expand Down Expand Up @@ -490,7 +488,7 @@ func (m *subscriptionManager) syncDiscoveryChains(

state.connectServices[chainName] = info

m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info)
m.collectPendingEventForDiscoveryChain(ctx, state, pending, chainName, info)
}

// if it was dropped, try to emit an DELETE event
Expand All @@ -517,7 +515,7 @@ func (m *subscriptionManager) syncDiscoveryChains(
}
}

func (m *subscriptionManager) emitEventForDiscoveryChain(
func (m *subscriptionManager) collectPendingEventForDiscoveryChain(
ctx context.Context,
state *subscriptionState,
pending *pendingPayload,
Expand Down Expand Up @@ -738,32 +736,118 @@ func (m *subscriptionManager) notifyServerAddrUpdates(
ctx context.Context,
updateCh chan<- cache.UpdateEvent,
) {
// Wait until this is subscribed-to.
// Wait until server address updates are subscribed-to.
select {
case <-m.serverAddrsSubReady:
case <-ctx.Done():
return
}

configNotifyCh := m.notifyMeshConfigUpdates(ctx)

// Intentionally initialized to empty values.
// These are set after the first mesh config entry update arrives.
var queryCtx context.Context
cancel := func() {}

useGateways := false
for {
select {
case <-ctx.Done():
cancel()
return

case event := <-configNotifyCh:
entry, ok := event.Result.(*structs.MeshConfigEntry)
if event.Result != nil && !ok {
m.logger.Error(fmt.Sprintf("saw unexpected type %T for mesh config entry: falling back to pushing direct server addresses", event.Result))
}
if entry != nil && entry.Peering != nil && entry.Peering.PeerThroughMeshGateways {
useGateways = true
} else {
useGateways = false
}

// Cancel and re-set watches based on the updated config entry.
cancel()

queryCtx, cancel = context.WithCancel(ctx)

if useGateways {
go m.notifyServerMeshGatewayAddresses(queryCtx, updateCh)
} else {
go m.ensureServerAddrSubscription(queryCtx, updateCh)
}
}
}
}

// notifyMeshConfigUpdates starts a background watch on the mesh config entry
// and returns a channel that receives an update event each time the entry
// changes. The watch runs until ctx is canceled.
//
// The event's Result is the raw structs.ConfigEntry from the state store and
// may be nil when no mesh config entry exists; callers are responsible for
// type-asserting it (see the handling in notifyServerAddrUpdates).
func (m *subscriptionManager) notifyMeshConfigUpdates(ctx context.Context) <-chan cache.UpdateEvent {
	const meshConfigWatch = "mesh-config-entry"

	// Buffer of 1 so the blocking-query goroutine can hand off the first
	// result even if the consumer has not started selecting yet.
	notifyCh := make(chan cache.UpdateEvent, 1)
	// The query context is unused here; cancellation is handled by
	// syncViaBlockingQuery itself via the outer ctx.
	go m.syncViaBlockingQuery(ctx, meshConfigWatch, func(_ context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) {
		_, rawEntry, err := store.ConfigEntry(ws, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta())
		if err != nil {
			return nil, fmt.Errorf("failed to get mesh config entry: %w", err)
		}
		return rawEntry, nil
	}, meshConfigWatch, notifyCh)

	return notifyCh
}

// notifyServerMeshGatewayAddresses watches registered mesh gateway instances
// and publishes their addresses to updateCh under the subServerAddrs
// correlation ID. It is used when the mesh config entry enables peering
// through mesh gateways, so that peers dial the gateways instead of the
// servers directly. Blocks until ctx is canceled.
func (m *subscriptionManager) notifyServerMeshGatewayAddresses(ctx context.Context, updateCh chan<- cache.UpdateEvent) {
	// The query context is unused; cancellation is handled by
	// syncViaBlockingQuery via the outer ctx.
	m.syncViaBlockingQuery(ctx, "mesh-gateways", func(_ context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) {
		_, nodes, err := store.ServiceDump(ws, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword)
		if err != nil {
			return nil, fmt.Errorf("failed to watch mesh gateways services for servers: %w", err)
		}

		// BestAddress(true) selects each gateway's WAN-facing address when one
		// is available.
		var gatewayAddrs []string
		for _, csn := range nodes {
			_, addr, port := csn.BestAddress(true)
			gatewayAddrs = append(gatewayAddrs, ipaddr.FormatAddressPort(addr, port))
		}

		// Never publish an empty address list: peering through mesh gateways
		// cannot function without any registered gateways, so surface an error
		// and let the blocking-query loop retry instead.
		if len(gatewayAddrs) == 0 {
			return nil, errors.New("configured to peer through mesh gateways but no mesh gateways are registered")
		}

		return &pbpeering.PeeringServerAddresses{
			Addresses: gatewayAddrs,
		}, nil
	}, subServerAddrs, updateCh)
}

func (m *subscriptionManager) ensureServerAddrSubscription(ctx context.Context, updateCh chan<- cache.UpdateEvent) {
waiter := &retry.Waiter{
MinFailures: 1,
Factor: 500 * time.Millisecond,
MaxWait: 60 * time.Second,
Jitter: retry.NewJitter(100),
}

logger := m.logger.With("queryType", "server-addresses")

var idx uint64
// TODO(peering): retry logic; fail past a threshold
for {
var err error
// Typically, this function will block inside `m.subscribeServerAddrs` and only return on error.
// Errors are logged and the watch is retried.

idx, err = m.subscribeServerAddrs(ctx, idx, updateCh)
if errors.Is(err, stream.ErrSubForceClosed) {
m.logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt resume")
logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt resume")

} else if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
m.logger.Warn("failed to subscribe to server addresses, will attempt resume", "error", err.Error())
} else {
m.logger.Trace(err.Error())
}
logger.Warn("failed to subscribe to server addresses, will attempt resume", "error", err.Error())

select {
case <-ctx.Done():
} else if err != nil {
logger.Trace(err.Error())
return
}
if err := waiter.Wait(ctx); err != nil {
return
default:
}
}
}
Expand Down Expand Up @@ -826,17 +910,22 @@ func (m *subscriptionManager) subscribeServerAddrs(
grpcAddr := srv.Address + ":" + strconv.Itoa(srv.ExtGRPCPort)
serverAddrs = append(serverAddrs, grpcAddr)
}

if len(serverAddrs) == 0 {
m.logger.Warn("did not find any server addresses with external gRPC ports to publish")
continue
}

updateCh <- cache.UpdateEvent{
u := cache.UpdateEvent{
CorrelationID: subServerAddrs,
Result: &pbpeering.PeeringServerAddresses{
Addresses: serverAddrs,
},
}

select {
case <-ctx.Done():
return 0, ctx.Err()
case updateCh <- u:
}
}
}
Loading

0 comments on commit a8c4d6b

Please sign in to comment.