Skip to content

Commit

Permalink
pkg/search/backendstore: unit test opensearch
Browse files Browse the repository at this point in the history
In this commit, we unit test opensearch backend store for
karmada search aggregated apiserver as cache registry on creating
new instance and getting/creating an index.

Signed-off-by: Mohamed Awnallah <[email protected]>
  • Loading branch information
mohamedawnallah committed Nov 16, 2024
1 parent 088cdb2 commit 9b82211
Show file tree
Hide file tree
Showing 6 changed files with 671 additions and 59 deletions.
62 changes: 38 additions & 24 deletions pkg/search/backendstore/opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import (
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
)

var defaultPrefix = "kubernetes"
var (
defaultPrefix = "kubernetes"
resourceExistsError = "resource_already_exists_exception"
)

var mapping = `
{
Expand Down Expand Up @@ -163,7 +166,7 @@ func (os *OpenSearch) delete(obj interface{}) {
return
}

indexName, err := os.indexName(us)
indexName, err := os.getOrCreateIndex(us, false)
if err != nil {
klog.Errorf("cannot get index name: %v", err)
return
Expand Down Expand Up @@ -223,7 +226,7 @@ func (os *OpenSearch) upsert(obj interface{}) {
return
}

indexName, err := os.indexName(us)
indexName, err := os.getOrCreateIndex(us, true)
if err != nil {
klog.Errorf("Cannot get index name: %v", err)
return
Expand All @@ -247,7 +250,7 @@ func (os *OpenSearch) upsert(obj interface{}) {
}

// TODO: apply mapping
func (os *OpenSearch) indexName(us *unstructured.Unstructured) (string, error) {
func (os *OpenSearch) getOrCreateIndex(us *unstructured.Unstructured, createIndexIfNotExist bool) (string, error) {
name := fmt.Sprintf("%s-%s", defaultPrefix, strings.ToLower(us.GetKind()))
os.l.Lock()
defer os.l.Unlock()
Expand All @@ -257,32 +260,32 @@ func (os *OpenSearch) indexName(us *unstructured.Unstructured) (string, error) {
}

klog.Infof("Try to create index: %s", name)
res := opensearchapi.IndicesCreateRequest{Index: name, Body: strings.NewReader(mapping)}
resp, err := res.Do(context.Background(), os.client)
req := opensearchapi.IndicesCreateRequest{Index: name, Body: strings.NewReader(mapping)}
resp, err := req.Do(context.Background(), os.client)
if err != nil {
if strings.Contains(err.Error(), "resource_already_exists_exception") {
klog.Info("Index already exists")
os.indices[name] = struct{}{}
return name, nil
}
return name, fmt.Errorf("cannot create index: %v", err)
return os.handleIndexAlreadyExists(err.Error(), name)
}

// Handle cases where the response indicates an error, such as when the
// index already exists. This logic is influenced by the
// UseResponseCheckOnly config flag when it is set.
if resp.IsError() {
if strings.Contains(resp.String(), "resource_already_exists_exception") {
klog.Info("Index already exists")
os.indices[name] = struct{}{}
return name, nil
}
return name, fmt.Errorf("cannot create index (resp): %v", resp.String())
return os.handleIndexAlreadyExists(resp.String(), name)
}

klog.Infof("create index response: %s", resp.String())
os.indices[name] = struct{}{}
if createIndexIfNotExist {
klog.Infof("create index response: %s", resp.String())
os.indices[name] = struct{}{}
return name, nil
}

return name, nil
return "", fmt.Errorf("index %s not exist in the system. Ensure the index name is correct or try creating it first", name)
}

var openSearchClientBuilder = func(cfg opensearch.Config) (*opensearch.Client, error) {
// OpenSearchClientBuilder is a function that creates a new OpenSearch client
// using the provided configuration. It returns a pointer to the OpenSearch
// client or an error if the client cannot be created.
var OpenSearchClientBuilder = func(cfg opensearch.Config) (*opensearch.Client, error) {
return opensearch.NewClient(cfg)
}

Expand All @@ -298,7 +301,7 @@ func (os *OpenSearch) initClient(bsc *searchv1alpha1.BackendStoreConfig) error {

user, pwd := func(secretRef clusterv1alpha1.LocalSecretReference) (user, pwd string) {
if secretRef.Namespace == "" || secretRef.Name == "" {
klog.Warningf("Not found secret for opensearch, try to without auth")
klog.Warning("Not found secret for opensearch, try to without auth")
return
}

Expand All @@ -316,7 +319,7 @@ func (os *OpenSearch) initClient(bsc *searchv1alpha1.BackendStoreConfig) error {
cfg.Password = pwd
}

client, err := openSearchClientBuilder(cfg)
client, err := OpenSearchClientBuilder(cfg)
if err != nil {
return fmt.Errorf("cannot create opensearch client: %v", err)
}
Expand All @@ -330,3 +333,14 @@ func (os *OpenSearch) initClient(bsc *searchv1alpha1.BackendStoreConfig) error {
os.client = client
return nil
}

// handleIndexAlreadyExists checks if the error message indicates that the index already exists,
// logs the message, and updates the indices map if needed.
func (os *OpenSearch) handleIndexAlreadyExists(errMessage, name string) (string, error) {
if strings.Contains(errMessage, resourceExistsError) {
klog.Info("Index already exists")
os.indices[name] = struct{}{}
return name, nil
}
return "", fmt.Errorf("cannot create index: %v", errMessage)
}
Loading

0 comments on commit 9b82211

Please sign in to comment.