Skip to content

Commit

Permalink
fix update nsm crd use-cases
Browse files Browse the repository at this point in the history
Signed-off-by: denis-tingaikin <[email protected]>
  • Loading branch information
denis-tingaikin committed Sep 30, 2024
1 parent db98b51 commit afd76af
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 19 deletions.
78 changes: 78 additions & 0 deletions pkg/registry/chains/registryk8s/registry-k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/ipam/point2pointipam"
"github.com/networkservicemesh/sdk/pkg/networkservice/ipam/strictipam"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
registryserver "github.com/networkservicemesh/sdk/pkg/registry"
registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
Expand Down Expand Up @@ -400,7 +401,84 @@ func TestNSMGR_InterdomainUseCase(t *testing.T) {
_, err = nsc.Close(ctx, conn)
require.NoError(t, err)
}
func TestNSMGR_HealRegistry(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

const timeout = time.Second * 5

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
clientSet := fake.NewSimpleClientset()

domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetRegistrySupplier(supplyK8sRegistryWithClientSet(clientSet)).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)

counter := new(count.Server)
domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

request := defaultRequest(nsReg.Name)

nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken)

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)

// 1. Restart Registry
domain.Registry.Restart()

// 2. Check refresh
request.Connection = conn
_, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)

// 3. Check new client request
nsc = domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken)

_, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)

require.Eventually(t, func() bool {
return counter.Requests() >= 3
}, timeout, time.Second/10)
}

func defaultRegistryService(name string) *registry.NetworkService {
return &registry.NetworkService{
Name: name,
}
}

func defaultRegistryEndpoint(nsName string) *registry.NetworkServiceEndpoint {
return &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
NetworkServiceNames: []string{nsName},
}
}

func defaultRequest(nsName string) *networkservice.NetworkServiceRequest {
return &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernelmech.MECHANISM},
},
Connection: &networkservice.Connection{
Id: uuid.NewString(),
NetworkService: nsName,
Context: &networkservice.ConnectionContext{},
Labels: make(map[string]string),
},
}
}
func TestNSMGR_FloatingInterdomainUseCase(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t, ignoreKLogDaemon) })

Expand Down
27 changes: 14 additions & 13 deletions pkg/registry/etcd/ns_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (n *etcdNSRegistryServer) sendEvent(resp *registry.NetworkServiceResponse)

func (n *etcdNSRegistryServer) Register(ctx context.Context, request *registry.NetworkService) (*registry.NetworkService, error) {
meta := metav1.ObjectMeta{
GenerateName: "nse-",
GenerateName: "netsvc-",
Name: request.GetName(),
Namespace: n.ns,
}
Expand All @@ -144,17 +144,18 @@ func (n *etcdNSRegistryServer) Register(ctx context.Context, request *registry.N
metav1.CreateOptions{},
)

err = errors.Wrapf(err, "failed to create a nse %s in a namespace %s", request.Name, n.ns)
err = errors.Wrapf(err, "failed to create a netsvc %s in a namespace %s", request.Name, n.ns)

if apierrors.IsAlreadyExists(err) {
nse, nseErr := n.client.NetworkservicemeshV1().NetworkServices(n.ns).Get(ctx, request.GetName(), metav1.GetOptions{})
netsvc, nseErr := n.client.NetworkservicemeshV1().NetworkServices(n.ns).Get(ctx, request.GetName(), metav1.GetOptions{})
if nseErr != nil {
err = errors.Wrapf(err, "failed to get a nse %s in a namespace %s, reason: %v", request.Name, n.ns, nseErr.Error())
err = errors.Wrapf(err, "failed to get a netsvc %s in a namespace %s, reason: %v", request.Name, n.ns, nseErr.Error())
}
if nse != nil {
_, err = n.client.NetworkservicemeshV1().NetworkServices(n.ns).Update(ctx, nse, metav1.UpdateOptions{})
if netsvc != nil {
netsvc.Spec = *(*v1.NetworkServiceSpec)(request)
_, err = n.client.NetworkservicemeshV1().NetworkServices(n.ns).Update(ctx, netsvc, metav1.UpdateOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to update a pod %s in a namespace %s", nse.Name, n.ns)
return nil, errors.Wrapf(err, "failed to update a pod %s in a namespace %s", netsvc.Name, n.ns)
}
return next.NetworkServiceRegistryServer(ctx).Register(ctx, request)
}
Expand All @@ -173,14 +174,14 @@ func (n *etcdNSRegistryServer) Find(query *registry.NetworkServiceQuery, s regis
}
for i := 0; i < len(items.Items); i++ {
crd := &items.Items[i]
nse := (*registry.NetworkService)(&crd.Spec)
if nse.Name == "" {
nse.Name = items.Items[i].Name
netsvc := (*registry.NetworkService)(&crd.Spec)
if netsvc.Name == "" {
netsvc.Name = items.Items[i].Name
}
if matchutils.MatchNetworkServices(query.NetworkService, nse) {
err := s.Send(&registry.NetworkServiceResponse{NetworkService: nse})
if matchutils.MatchNetworkServices(query.NetworkService, netsvc) {
err := s.Send(&registry.NetworkServiceResponse{NetworkService: netsvc})
if err != nil {
return errors.Wrapf(err, "NetworkServiceRegistry find server failed to send a response %s", nse.String())
return errors.Wrapf(err, "NetworkServiceRegistry find server failed to send a response %s", netsvc.String())
}
}
}
Expand Down
45 changes: 42 additions & 3 deletions pkg/registry/etcd/ns_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package etcd_test

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/google/uuid"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
Expand All @@ -38,13 +40,50 @@ import (
)

func Test_NSReRegister(t *testing.T) {
s := etcd.NewNetworkServiceRegistryServer(context.Background(), "", fake.NewSimpleClientset())
_, err := s.Register(context.Background(), &registry.NetworkService{Name: "ns-1"})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
s := etcd.NewNetworkServiceRegistryServer(ctx, "", fake.NewSimpleClientset())
_, err := s.Register(ctx, &registry.NetworkService{Name: "netsvc-1"})
require.NoError(t, err)
_, err = s.Register(context.Background(), &registry.NetworkService{Name: "ns-1", Payload: "IP"})
_, err = s.Register(ctx, &registry.NetworkService{Name: "netsvc-1"})
require.NoError(t, err)
}

func Test_NSServer_UpdateShouldWorkСonsistently(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
s := etcd.NewNetworkServiceRegistryServer(ctx, "", fake.NewSimpleClientset())

var expected []*registry.NetworkService
// Register
for i := 0; i < 10; i++ {
expected = append(expected, &registry.NetworkService{Name: "netsvc-" + fmt.Sprint(i)})
_, err := s.Register(ctx, expected[len(expected)-1].Clone())
require.NoError(t, err)
}

// Update only first nse
expected[0].Payload = "ip"
_, err := s.Register(ctx, expected[0].Clone())
require.NoError(t, err)

// Update only last nse
expected[len(expected)-1].Payload = "ethernet"
_, err = s.Register(ctx, expected[len(expected)-1].Clone())
require.NoError(t, err)

// Get all nses
stream, err := adapters.NetworkServiceServerToClient(s).Find(ctx, &registry.NetworkServiceQuery{NetworkService: &registry.NetworkService{}})
require.NoError(t, err)

nsList := registry.ReadNetworkServiceList(stream)

require.Len(t, nsList, 10)

for i := range nsList {
require.True(t, proto.Equal(expected[i], nsList[i]))
}
}
func Test_K8sNSRegistry_ShouldMatchMetadataToName(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions pkg/registry/etcd/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry.
err = errors.Wrapf(err, "failed to get a nse %s in a namespace %s, reason: %v", request.Name, n.ns, nseErr.Error())
}
if nse != nil {
nse.Spec = *(*v1.NetworkServiceEndpointSpec)(request)
apiResp, err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Update(ctx, nse, metav1.UpdateOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to update a pod %s in a namespace %s", nse.Name, n.ns)
Expand Down
46 changes: 43 additions & 3 deletions pkg/registry/etcd/nse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package etcd_test

import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -41,13 +43,51 @@ import (
)

func Test_NSEReRegister(t *testing.T) {
s := etcd.NewNetworkServiceEndpointRegistryServer(context.Background(), "", fake.NewSimpleClientset())
_, err := s.Register(context.Background(), &registry.NetworkServiceEndpoint{Name: "nse-1"})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
s := etcd.NewNetworkServiceEndpointRegistryServer(ctx, "", fake.NewSimpleClientset())
_, err := s.Register(ctx, &registry.NetworkServiceEndpoint{Name: "nse-1"})
require.NoError(t, err)
_, err = s.Register(context.Background(), &registry.NetworkServiceEndpoint{Name: "nse-1", NetworkServiceNames: []string{"ns-1"}})
_, err = s.Register(ctx, &registry.NetworkServiceEndpoint{Name: "nse-1", NetworkServiceNames: []string{"ns-1"}})
require.NoError(t, err)
}

func Test_NSEServer_UpdateShouldWorkСonsistently(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
s := etcd.NewNetworkServiceEndpointRegistryServer(ctx, "", fake.NewSimpleClientset())

var expected []*registry.NetworkServiceEndpoint
// Register
for i := 0; i < 10; i++ {
expected = append(expected, &registry.NetworkServiceEndpoint{Name: "nse-" + fmt.Sprint(i)})
_, err := s.Register(ctx, expected[len(expected)-1].Clone())
require.NoError(t, err)
}

// Update only first nse
expected[0].NetworkServiceNames = []string{"ns-1"}
_, err := s.Register(ctx, expected[0].Clone())
require.NoError(t, err)

// Update only last nse
expected[len(expected)-1].NetworkServiceNames = []string{"ns-2", "ns-3"}
_, err = s.Register(ctx, expected[len(expected)-1].Clone())
require.NoError(t, err)

// Get all nses
stream, err := adapters.NetworkServiceEndpointServerToClient(s).Find(ctx, &registry.NetworkServiceEndpointQuery{NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{}})
require.NoError(t, err)

nseList := registry.ReadNetworkServiceEndpointList(stream)

require.Len(t, nseList, 10)

for i := range nseList {
require.True(t, proto.Equal(expected[i], nseList[i]))
}
}

func Test_K8sNSERegistry_ShouldMatchMetadataToName(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down

0 comments on commit afd76af

Please sign in to comment.