From afd76af82d198e2f7b70accfd5c2dca71034f8a8 Mon Sep 17 00:00:00 2001 From: denis-tingaikin Date: Mon, 30 Sep 2024 14:09:13 +0300 Subject: [PATCH] fix update nsm crd use-cases Signed-off-by: denis-tingaikin --- .../chains/registryk8s/registry-k8s_test.go | 78 +++++++++++++++++++ pkg/registry/etcd/ns_server.go | 27 +++---- pkg/registry/etcd/ns_server_test.go | 45 ++++++++++- pkg/registry/etcd/nse_server.go | 1 + pkg/registry/etcd/nse_server_test.go | 46 ++++++++++- 5 files changed, 178 insertions(+), 19 deletions(-) diff --git a/pkg/registry/chains/registryk8s/registry-k8s_test.go b/pkg/registry/chains/registryk8s/registry-k8s_test.go index 2328fec..2a18036 100644 --- a/pkg/registry/chains/registryk8s/registry-k8s_test.go +++ b/pkg/registry/chains/registryk8s/registry-k8s_test.go @@ -42,6 +42,7 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/ipam/point2pointipam" "github.com/networkservicemesh/sdk/pkg/networkservice/ipam/strictipam" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count" registryserver "github.com/networkservicemesh/sdk/pkg/registry" registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client" "github.com/networkservicemesh/sdk/pkg/registry/core/adapters" @@ -400,7 +401,84 @@ func TestNSMGR_InterdomainUseCase(t *testing.T) { _, err = nsc.Close(ctx, conn) require.NoError(t, err) } +func TestNSMGR_HealRegistry(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + const timeout = time.Second * 5 + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + clientSet := fake.NewSimpleClientset() + + domain := sandbox.NewBuilder(ctx, t). + SetNodesCount(1). + SetRegistrySupplier(supplyK8sRegistryWithClientSet(clientSet)). + SetNSMgrProxySupplier(nil). + SetRegistryProxySupplier(nil). + Build() + + nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) + + nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name())) + require.NoError(t, err) + + nseReg := defaultRegistryEndpoint(nsReg.Name) + + counter := new(count.Server) + domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) + + request := defaultRequest(nsReg.Name) + + nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) + + conn, err := nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + + // 1. Restart Registry + domain.Registry.Restart() + + // 2. Check refresh + request.Connection = conn + _, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + + // 3. Check new client request + nsc = domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) + + _, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return counter.Requests() >= 3 + }, timeout, time.Second/10) +} + +func defaultRegistryService(name string) *registry.NetworkService { + return ®istry.NetworkService{ + Name: name, + } +} + +func defaultRegistryEndpoint(nsName string) *registry.NetworkServiceEndpoint { + return ®istry.NetworkServiceEndpoint{ + Name: "final-endpoint", + NetworkServiceNames: []string{nsName}, + } +} + +func defaultRequest(nsName string) *networkservice.NetworkServiceRequest { + return &networkservice.NetworkServiceRequest{ + MechanismPreferences: []*networkservice.Mechanism{ + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, + }, + Connection: &networkservice.Connection{ + Id: uuid.NewString(), + NetworkService: nsName, + Context: &networkservice.ConnectionContext{}, + Labels: make(map[string]string), + }, + } +} func TestNSMGR_FloatingInterdomainUseCase(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t, ignoreKLogDaemon) }) diff --git a/pkg/registry/etcd/ns_server.go b/pkg/registry/etcd/ns_server.go index ff3838b..b9e843f 100644 --- a/pkg/registry/etcd/ns_server.go +++ b/pkg/registry/etcd/ns_server.go @@ -131,7 +131,7 @@ func (n *etcdNSRegistryServer) sendEvent(resp *registry.NetworkServiceResponse) func (n *etcdNSRegistryServer) Register(ctx context.Context, request *registry.NetworkService) (*registry.NetworkService, error) { meta := metav1.ObjectMeta{ - GenerateName: "nse-", + GenerateName: "netsvc-", Name: request.GetName(), Namespace: n.ns, } @@ -144,17 +144,18 @@ func (n *etcdNSRegistryServer) Register(ctx context.Context, request *registry.N metav1.CreateOptions{}, ) - err = errors.Wrapf(err, "failed to create a nse %s in a namespace %s", request.Name, n.ns) + err = errors.Wrapf(err, "failed to create a netsvc %s in a namespace %s", request.Name, n.ns) if apierrors.IsAlreadyExists(err) { - nse, nseErr := n.client.NetworkservicemeshV1().NetworkServices(n.ns).Get(ctx, request.GetName(), metav1.GetOptions{}) + netsvc, nseErr := n.client.NetworkservicemeshV1().NetworkServices(n.ns).Get(ctx, request.GetName(), metav1.GetOptions{}) if nseErr != nil { - err = errors.Wrapf(err, "failed to get a nse %s in a namespace %s, reason: %v", request.Name, n.ns, nseErr.Error()) + err = errors.Wrapf(err, "failed to get a netsvc %s in a namespace %s, reason: %v", request.Name, n.ns, nseErr.Error()) } - if nse != nil { - _, err = n.client.NetworkservicemeshV1().NetworkServices(n.ns).Update(ctx, nse, metav1.UpdateOptions{}) + if netsvc != nil { + netsvc.Spec = *(*v1.NetworkServiceSpec)(request) + _, err = n.client.NetworkservicemeshV1().NetworkServices(n.ns).Update(ctx, netsvc, metav1.UpdateOptions{}) if err != nil { - return nil, errors.Wrapf(err, "failed to update a pod %s in a namespace %s", nse.Name, n.ns) + return nil, errors.Wrapf(err, "failed to update a pod %s in a namespace %s", netsvc.Name, n.ns) } return next.NetworkServiceRegistryServer(ctx).Register(ctx, request) } @@ -173,14 +174,14 @@ func (n *etcdNSRegistryServer) Find(query *registry.NetworkServiceQuery, s regis } for i := 0; i < len(items.Items); i++ { crd := &items.Items[i] - nse := (*registry.NetworkService)(&crd.Spec) - if nse.Name == "" { - nse.Name = items.Items[i].Name + netsvc := (*registry.NetworkService)(&crd.Spec) + if netsvc.Name == "" { + netsvc.Name = items.Items[i].Name } - if matchutils.MatchNetworkServices(query.NetworkService, nse) { - err := s.Send(®istry.NetworkServiceResponse{NetworkService: nse}) + if matchutils.MatchNetworkServices(query.NetworkService, netsvc) { + err := s.Send(®istry.NetworkServiceResponse{NetworkService: netsvc}) if err != nil { - return errors.Wrapf(err, "NetworkServiceRegistry find server failed to send a response %s", nse.String()) + return errors.Wrapf(err, "NetworkServiceRegistry find server failed to send a response %s", netsvc.String()) } } } diff --git a/pkg/registry/etcd/ns_server_test.go b/pkg/registry/etcd/ns_server_test.go index b7ea61b..132415d 100644 --- a/pkg/registry/etcd/ns_server_test.go +++ b/pkg/registry/etcd/ns_server_test.go @@ -20,6 +20,7 @@ package etcd_test import ( "context" + "fmt" "sync" "sync/atomic" "testing" @@ -28,6 +29,7 @@ import ( "github.com/google/uuid" "github.com/networkservicemesh/api/pkg/api/registry" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/networkservicemesh/sdk/pkg/registry/core/adapters" @@ -38,13 +40,50 @@ import ( ) func Test_NSReRegister(t *testing.T) { - s := etcd.NewNetworkServiceRegistryServer(context.Background(), "", fake.NewSimpleClientset()) - _, err := s.Register(context.Background(), ®istry.NetworkService{Name: "ns-1"}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + s := etcd.NewNetworkServiceRegistryServer(ctx, "", fake.NewSimpleClientset()) + _, err := s.Register(ctx, ®istry.NetworkService{Name: "netsvc-1"}) require.NoError(t, err) - _, err = s.Register(context.Background(), ®istry.NetworkService{Name: "ns-1", Payload: "IP"}) + _, err = s.Register(ctx, ®istry.NetworkService{Name: "netsvc-1"}) require.NoError(t, err) } +func Test_NSServer_UpdateShouldWorkСonsistently(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + s := etcd.NewNetworkServiceRegistryServer(ctx, "", fake.NewSimpleClientset()) + + var expected []*registry.NetworkService + // Register + for i := 0; i < 10; i++ { + expected = append(expected, ®istry.NetworkService{Name: "netsvc-" + fmt.Sprint(i)}) + _, err := s.Register(ctx, expected[len(expected)-1].Clone()) + require.NoError(t, err) + } + + // Update only first nse + expected[0].Payload = "ip" + _, err := s.Register(ctx, expected[0].Clone()) + require.NoError(t, err) + + // Update only last nse + expected[len(expected)-1].Payload = "ethernet" + _, err = s.Register(ctx, expected[len(expected)-1].Clone()) + require.NoError(t, err) + + // Get all nses + stream, err := adapters.NetworkServiceServerToClient(s).Find(ctx, ®istry.NetworkServiceQuery{NetworkService: ®istry.NetworkService{}}) + require.NoError(t, err) + + nsList := registry.ReadNetworkServiceList(stream) + + require.Len(t, nsList, 10) + + for i := range nsList { + require.True(t, proto.Equal(expected[i], nsList[i])) + } +} func Test_K8sNSRegistry_ShouldMatchMetadataToName(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/pkg/registry/etcd/nse_server.go b/pkg/registry/etcd/nse_server.go index a291aae..7131594 100644 --- a/pkg/registry/etcd/nse_server.go +++ b/pkg/registry/etcd/nse_server.go @@ -165,6 +165,7 @@ func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry. err = errors.Wrapf(err, "failed to get a nse %s in a namespace %s, reason: %v", request.Name, n.ns, nseErr.Error()) } if nse != nil { + nse.Spec = *(*v1.NetworkServiceEndpointSpec)(request) apiResp, err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Update(ctx, nse, metav1.UpdateOptions{}) if err != nil { return nil, errors.Wrapf(err, "failed to update a pod %s in a namespace %s", nse.Name, n.ns) diff --git a/pkg/registry/etcd/nse_server_test.go b/pkg/registry/etcd/nse_server_test.go index e1feacc..f612682 100644 --- a/pkg/registry/etcd/nse_server_test.go +++ b/pkg/registry/etcd/nse_server_test.go @@ -20,6 +20,7 @@ package etcd_test import ( "context" + "fmt" "io" "sync" "sync/atomic" @@ -30,6 +31,7 @@ import ( "github.com/networkservicemesh/api/pkg/api/registry" "github.com/pkg/errors" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,13 +43,51 @@ import ( ) func Test_NSEReRegister(t *testing.T) { - s := etcd.NewNetworkServiceEndpointRegistryServer(context.Background(), "", fake.NewSimpleClientset()) - _, err := s.Register(context.Background(), ®istry.NetworkServiceEndpoint{Name: "nse-1"}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + s := etcd.NewNetworkServiceEndpointRegistryServer(ctx, "", fake.NewSimpleClientset()) + _, err := s.Register(ctx, ®istry.NetworkServiceEndpoint{Name: "nse-1"}) require.NoError(t, err) - _, err = s.Register(context.Background(), ®istry.NetworkServiceEndpoint{Name: "nse-1", NetworkServiceNames: []string{"ns-1"}}) + _, err = s.Register(ctx, ®istry.NetworkServiceEndpoint{Name: "nse-1", NetworkServiceNames: []string{"ns-1"}}) require.NoError(t, err) } +func Test_NSEServer_UpdateShouldWorkСonsistently(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + s := etcd.NewNetworkServiceEndpointRegistryServer(ctx, "", fake.NewSimpleClientset()) + + var expected []*registry.NetworkServiceEndpoint + // Register + for i := 0; i < 10; i++ { + expected = append(expected, ®istry.NetworkServiceEndpoint{Name: "nse-" + fmt.Sprint(i)}) + _, err := s.Register(ctx, expected[len(expected)-1].Clone()) + require.NoError(t, err) + } + + // Update only first nse + expected[0].NetworkServiceNames = []string{"ns-1"} + _, err := s.Register(ctx, expected[0].Clone()) + require.NoError(t, err) + + // Update only last nse + expected[len(expected)-1].NetworkServiceNames = []string{"ns-2", "ns-3"} + _, err = s.Register(ctx, expected[len(expected)-1].Clone()) + require.NoError(t, err) + + // Get all nses + stream, err := adapters.NetworkServiceEndpointServerToClient(s).Find(ctx, ®istry.NetworkServiceEndpointQuery{NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{}}) + require.NoError(t, err) + + nseList := registry.ReadNetworkServiceEndpointList(stream) + + require.Len(t, nseList, 10) + + for i := range nseList { + require.True(t, proto.Equal(expected[i], nseList[i])) + } +} + func Test_K8sNSERegistry_ShouldMatchMetadataToName(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel()