Skip to content

Commit

Permalink
Add types and comment to reindex function
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-gr committed Aug 21, 2023
1 parent c3e5ef9 commit 208d848
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions internal/benchrunner/runners/system/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ func (r *runner) enrollAgents() error {
return nil
}

// reindexData will read all data generated during the benchmark and will reindex it to the metrisctore
func (r *runner) reindexData() error {
if !r.options.ReindexData {
return nil
Expand All @@ -719,7 +720,7 @@ func (r *runner) reindexData() error {

logger.Debug("starting reindexing of data...")

logger.Debug("gettings orignal mappings...")
logger.Debug("getting orignal mappings...")
// Get the mapping from the source data stream
mappingRes, err := r.options.ESAPI.Indices.GetMapping(
r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream),
Expand Down Expand Up @@ -789,56 +790,63 @@ func (r *runner) reindexData() error {
}
defer res.Body.Close()

type searchRes struct {
Error *struct {
Reason string `json:"reson"`
} `json:"error"`
ScrollID string `json:"_scroll_id"`
Hits []struct {
ID string `json:"_id"`
Source map[string]interface{} `json:"_source"`
} `json:"hits"`
}

// Iterate through the search results using the Scroll API
for {
var sr map[string]interface{}
var sr searchRes
if err := json.NewDecoder(res.Body).Decode(&sr); err != nil {
return fmt.Errorf("error decoding search response: %w", err)
}

resErr, found := sr["error"]
if found {
errStr := resErr.(map[string]interface{})["reason"].(string)
return fmt.Errorf("error searching for documents: %s", errStr)
if sr.Error != nil {
return fmt.Errorf("error searching for documents: %s", sr.Error.Reason)
}

hits, found := sr["hits"].(map[string]interface{})["hits"].([]interface{})
if !found || len(hits) == 0 {
if len(sr.Hits) == 0 {
break
}

var bulkBodyBuilder strings.Builder
for _, hit := range hits {
bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.(map[string]interface{})["_id"]))
enriched := r.enrichEventWithBenchmarkMetadata(hit.(map[string]interface{})["_source"].(map[string]interface{}))
for _, hit := range sr.Hits {
bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
src, err := json.Marshal(enriched)
if err != nil {
return fmt.Errorf("error decoding _source: %w", err)
}
bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
}

logger.Debugf("bulk request of %d events...", len(hits))
logger.Debugf("bulk request of %d events...", len(sr.Hits))

bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
if err != nil {
return fmt.Errorf("error performing the bulk index request: %w", err)
}
bulkRes.Body.Close()

scrollId, found := sr["_scroll_id"].(string)
if !found {
if sr.ScrollID == "" {
return errors.New("error getting scroll ID")
}

res, err = r.options.ESAPI.Scroll(
r.options.ESAPI.Scroll.WithScrollID(scrollId),
r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
r.options.ESAPI.Scroll.WithScroll(time.Minute),
)
if err != nil {
return fmt.Errorf("error executing scroll: %s", err)
}
defer res.Body.Close()
res.Body.Close()
}

logger.Debug("reindexing operation finished")
Expand Down

0 comments on commit 208d848

Please sign in to comment.