Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not reap the world on restart #2

Open
wants to merge 5 commits into
base: release/0.49.x-invoca
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func TestNamespacesNodeServicesClient_NodeServices(t *testing.T) {
}
t.Run(name, func(tt *testing.T) {
require := require.New(tt)
svr, err := testutil.NewTestServerConfigT(tt, nil)
svr, err := testutil.NewTestServerConfigT(tt, InvocaSpecificConsulConfig)
require.NoError(err)
defer svr.Stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestPreNamespacesNodeServicesClient_NodeServices(t *testing.T) {
for name, c := range cases {
t.Run(name, func(tt *testing.T) {
require := require.New(tt)
svr, err := testutil.NewTestServerConfigT(tt, nil)
svr, err := testutil.NewTestServerConfigT(tt, InvocaSpecificConsulConfig)
require.NoError(err)
defer svr.Stop()

Expand Down
21 changes: 21 additions & 0 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type ServiceResource struct {
// Ctx is used to cancel processes kicked off by ServiceResource.
Ctx context.Context

// waitForServiceSnapshotToBePopulatedCh is the chan used to signal that a snapshot of services has synced
// reaping of services in consul should wait for this to allow best effort population of "view of services"
WaitForInitialServicesToBePopulatedCh chan bool

// AllowK8sNamespacesSet is a set of k8s namespaces to explicitly allow for
// syncing. It supports the special character `*` which indicates that
// all k8s namespaces are eligible unless explicitly denied. This filter
Expand Down Expand Up @@ -816,6 +820,23 @@ func (t *ServiceResource) addPrefixAndK8SNamespace(name, namespace string) strin
return name
}

func (t *ServiceResource) PopulateInitialServices() {
defer close(t.WaitForInitialServicesToBePopulatedCh)

// Open question, is this okay when there are thousands of Services in the cluster?
allTheServices, err := t.Client.CoreV1().Services("").List(context.Background(), metav1.ListOptions{})
if err != nil {
// TODO - Is it valid to start reaping if we don't get all the Services initial to populate our view?
t.Log.Warn("Failed to list all services to preform initial sync", "err", err)
return
}

for _, service := range allTheServices.Items {
key := fmt.Sprintf("%s/%s", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
t.Upsert(key, &service)
}
}

// consulHealthCheckID deterministically generates a health check ID based on service ID and Kubernetes namespace.
func consulHealthCheckID(k8sNS string, serviceID string) string {
return fmt.Sprintf("%s/%s", k8sNS, serviceID)
Expand Down
19 changes: 18 additions & 1 deletion control-plane/catalog/to-consul/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ type ConsulSyncer struct {
lock sync.Mutex
once sync.Once

// WaitForServiceSnapshotToBePopulatedCh is the chan used to wait for a snapshot of services to be
// populated at startup before to starting to reap services in consul that are no longer needed
WaitForServiceSnapshotToBePopulatedCh chan bool

// initialSync is used to ensure that we have received our initial list
// of services before we start reaping services. When it is closed,
// the initial sync is complete.
Expand Down Expand Up @@ -130,7 +134,10 @@ func (s *ConsulSyncer) Sync(rs []*api.CatalogRegistration) {

// Signal that the initial sync is complete and our maps have been populated.
// We can now safely reap untracked services.
s.initialSyncOnce.Do(func() { close(s.initialSync) })
s.initialSyncOnce.Do(func() {
s.Log.Debug("[Sync] initial sync happened, reaping of services enabled")
close(s.initialSync)
})
}

// Run is the long-running runloop for reconciling the local set of
Expand Down Expand Up @@ -167,6 +174,12 @@ func (s *ConsulSyncer) watchReapableServices(ctx context.Context) {
// populated. If we don't wait, we will reap all services tagged with k8s
// because we have no tracked services in our maps yet.
<-s.initialSync
s.Log.Debug("[watchReapableServices] initial sync has happened")

s.Log.Info("[watchReapableServices] waiting for services to be populated before enabling reaping of services")
<-s.WaitForServiceSnapshotToBePopulatedCh
s.Log.Debug("[watchReapableServices] services have been populated at startup. We can start looking " +
"for no longer needed services in Consul")

opts := &api.QueryOptions{
AllowStale: true,
Expand Down Expand Up @@ -250,6 +263,10 @@ func (s *ConsulSyncer) watchService(ctx context.Context, name, namespace string)
s.Log.Info("starting service watcher", "service-name", name, "service-consul-namespace", namespace)
defer s.Log.Info("stopping service watcher", "service-name", name, "service-consul-namespace", namespace)

s.Log.Debug("[watchService] waiting for services to be populated before enabling watching a service")
<-s.WaitForServiceSnapshotToBePopulatedCh
s.Log.Debug("[watchService] services have been populated at startup, watching")

for {
select {
// Quit if our context is over
Expand Down
6 changes: 3 additions & 3 deletions control-plane/catalog/to-consul/syncer_ent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestConsulSyncer_ConsulNamespaces(t *testing.T) {
t.Parallel()

a, err := testutil.NewTestServerConfigT(t, nil)
a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig)
require.NoError(t, err)
defer a.Stop()

Expand Down Expand Up @@ -66,7 +66,7 @@ func TestConsulSyncer_ConsulNamespaces(t *testing.T) {
func TestConsulSyncer_ReapConsulNamespace(t *testing.T) {
t.Parallel()

a, err := testutil.NewTestServerConfigT(t, nil)
a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig)
require.NoError(t, err)
defer a.Stop()

Expand Down Expand Up @@ -135,7 +135,7 @@ func TestConsulSyncer_ReapConsulNamespace(t *testing.T) {
func TestConsulSyncer_reapServiceInstanceNamespacesEnabled(t *testing.T) {
t.Parallel()

a, err := testutil.NewTestServerConfigT(t, nil)
a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig)
require.NoError(t, err)
defer a.Stop()

Expand Down
13 changes: 9 additions & 4 deletions control-plane/catalog/to-consul/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func TestConsulSyncer_register(t *testing.T) {
require := require.New(t)

// Set up server, client, syncer
a, err := testutil.NewTestServerConfigT(t, nil)

a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig)
require.NoError(err)
defer a.Stop()

Expand Down Expand Up @@ -72,7 +73,7 @@ func TestConsulSyncer_reapServiceInstance(t *testing.T) {
require := require.New(t)

// Set up server, client, syncer
a, err := testutil.NewTestServerConfigT(t, nil)
a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig)
require.NoError(err)
defer a.Stop()

Expand Down Expand Up @@ -137,7 +138,7 @@ func TestConsulSyncer_reapService(t *testing.T) {
for _, k8sNS := range sourceK8sNamespaceAnnotations {
t.Run(k8sNS, func(tt *testing.T) {
// Set up server, client, syncer
a, err := testutil.NewTestServerConfigT(tt, nil)
a, err := testutil.NewTestServerConfigT(tt, InvocaSpecificConsulConfig)
require.NoError(tt, err)
defer a.Stop()

Expand Down Expand Up @@ -185,7 +186,7 @@ func TestConsulSyncer_reapService(t *testing.T) {
func TestConsulSyncer_noReapingUntilInitialSync(t *testing.T) {
t.Parallel()

a, err := testutil.NewTestServerConfigT(t, nil)
a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig)
require.NoError(t, err)
defer a.Stop()
client, err := api.NewClient(&api.Config{
Expand Down Expand Up @@ -289,6 +290,9 @@ func testConsulSyncer(client *api.Client) (*ConsulSyncer, func()) {
// testConsulSyncerWithConfig starts a consul syncer that can be configured
// prior to starting via the configurator method.
func testConsulSyncerWithConfig(client *api.Client, configurator func(*ConsulSyncer)) (*ConsulSyncer, func()) {
waitForInitialServicesCh := make(chan bool)
close(waitForInitialServicesCh)

s := &ConsulSyncer{
Client: client,
Log: hclog.Default(),
Expand All @@ -299,6 +303,7 @@ func testConsulSyncerWithConfig(client *api.Client, configurator func(*ConsulSyn
ConsulNodeServicesClient: &PreNamespacesNodeServicesClient{
Client: client,
},
WaitForServiceSnapshotToBePopulatedCh: waitForInitialServicesCh,
}
configurator(s)
s.init()
Expand Down
6 changes: 6 additions & 0 deletions control-plane/catalog/to-consul/testing.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package catalog

import (
"github.com/hashicorp/consul/sdk/testutil"
"sync"

"github.com/hashicorp/consul/api"
Expand All @@ -27,3 +28,8 @@ func (s *testSyncer) Sync(rs []*api.CatalogRegistration) {
func newTestSyncer() *testSyncer {
return &testSyncer{}
}

// callback to remove the peering key from the config generated, avoids "invalid config key peering"
func InvocaSpecificConsulConfig(cfg *testutil.TestServerConfig) {
cfg.Peering = nil
}
69 changes: 39 additions & 30 deletions control-plane/subcommand/sync-catalog/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,43 +241,52 @@ func (c *Command) Run(args []string) int {
Client: c.consulClient,
}
}

waitForInitalServicesCh := make(chan bool)

// Build the Consul sync and start it
syncer := &catalogtoconsul.ConsulSyncer{
Client: c.consulClient,
Log: c.logger.Named("to-consul/sink"),
EnableNamespaces: c.flagEnableNamespaces,
CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy,
SyncPeriod: c.flagConsulWritePeriod,
ServicePollPeriod: c.flagConsulWritePeriod * 2,
ConsulK8STag: c.flagConsulK8STag,
ConsulNodeName: c.flagConsulNodeName,
ConsulNodeServicesClient: svcsClient,
Client: c.consulClient,
Log: c.logger.Named("to-consul/sink"),
EnableNamespaces: c.flagEnableNamespaces,
CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy,
SyncPeriod: c.flagConsulWritePeriod,
ServicePollPeriod: c.flagConsulWritePeriod * 2,
ConsulK8STag: c.flagConsulK8STag,
ConsulNodeName: c.flagConsulNodeName,
ConsulNodeServicesClient: svcsClient,
WaitForServiceSnapshotToBePopulatedCh: waitForInitalServicesCh,
}
go syncer.Run(ctx)

resource := catalogtoconsul.ServiceResource{
Log: c.logger.Named("to-consul/source"),
Client: c.clientset,
Syncer: syncer,
Ctx: ctx,
AllowK8sNamespacesSet: allowSet,
DenyK8sNamespacesSet: denySet,
ExplicitEnable: !c.flagK8SDefault,
ClusterIPSync: c.flagSyncClusterIPServices,
LoadBalancerEndpointsSync: c.flagSyncLBEndpoints,
NodePortSync: catalogtoconsul.NodePortSyncType(c.flagNodePortSyncType),
ConsulK8STag: c.flagConsulK8STag,
ConsulServicePrefix: c.flagConsulServicePrefix,
AddK8SNamespaceSuffix: c.flagAddK8SNamespaceSuffix,
EnableNamespaces: c.flagEnableNamespaces,
ConsulDestinationNamespace: c.flagConsulDestinationNamespace,
EnableK8SNSMirroring: c.flagEnableK8SNSMirroring,
K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix,
ConsulNodeName: c.flagConsulNodeName,
WaitForInitialServicesToBePopulatedCh: waitForInitalServicesCh,
}

resource.PopulateInitialServices()

// Build the controller and start it
ctl := &controller.Controller{
Log: c.logger.Named("to-consul/controller"),
Resource: &catalogtoconsul.ServiceResource{
Log: c.logger.Named("to-consul/source"),
Client: c.clientset,
Syncer: syncer,
Ctx: ctx,
AllowK8sNamespacesSet: allowSet,
DenyK8sNamespacesSet: denySet,
ExplicitEnable: !c.flagK8SDefault,
ClusterIPSync: c.flagSyncClusterIPServices,
LoadBalancerEndpointsSync: c.flagSyncLBEndpoints,
NodePortSync: catalogtoconsul.NodePortSyncType(c.flagNodePortSyncType),
ConsulK8STag: c.flagConsulK8STag,
ConsulServicePrefix: c.flagConsulServicePrefix,
AddK8SNamespaceSuffix: c.flagAddK8SNamespaceSuffix,
EnableNamespaces: c.flagEnableNamespaces,
ConsulDestinationNamespace: c.flagConsulDestinationNamespace,
EnableK8SNSMirroring: c.flagEnableK8SNSMirroring,
K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix,
ConsulNodeName: c.flagConsulNodeName,
},
Log: c.logger.Named("to-consul/controller"),
Resource: &resource,
}

toConsulCh = make(chan struct{})
Expand Down
Loading