Skip to content

Commit

Permalink
Fix agentcfg cache renewal scroll (#13958)
Browse files Browse the repository at this point in the history
APM Agents configuration cache appears to be broken, resulting frequent invalid requests made by APM Server to Elasticsearch cluster. Fix ScrollID used for retrieval of APM Agents configuration.

(cherry picked from commit ecffa8e)
  • Loading branch information
up2neck authored and mergify[bot] committed Sep 3, 2024
1 parent d36412d commit d8a4671
Showing 1 changed file with 22 additions and 24 deletions.
46 changes: 22 additions & 24 deletions internal/agentcfg/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,40 +251,38 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {

func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID string) (cacheResult, error) {
var result cacheResult
var err error
var resp *esapi.Response

if scrollID == "" {
resp, err := esapi.SearchRequest{
switch scrollID {
case "":
resp, err = esapi.SearchRequest{
Index: []string{ElasticsearchIndexName},
Size: &f.searchSize,
Scroll: f.cacheDuration,
}.Do(ctx, f.client)
if err != nil {
return result, err
}
defer resp.Body.Close()

if resp.StatusCode >= http.StatusBadRequest {
// Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permission
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
f.invalidESCfg.Store(true)
}
bodyBytes, err := io.ReadAll(resp.Body)
if err == nil {
f.logger.Debugf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode)
}
return result, json.NewDecoder(resp.Body).Decode(&result)
default:
resp, err = esapi.ScrollRequest{
ScrollID: scrollID,
Scroll: f.cacheDuration,
}.Do(ctx, f.client)
}

resp, err := esapi.ScrollRequest{
ScrollID: result.ScrollID,
Scroll: f.cacheDuration,
}.Do(ctx, f.client)
if err != nil {
return result, err
}
defer resp.Body.Close()

if resp.StatusCode >= http.StatusBadRequest {
// Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permission
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
f.invalidESCfg.Store(true)
}
bodyBytes, err := io.ReadAll(resp.Body)
if err == nil {
f.logger.Debugf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode)
}
return result, json.NewDecoder(resp.Body).Decode(&result)
}

Expand Down

0 comments on commit d8a4671

Please sign in to comment.