diff --git a/k8s/elastic-retention-enforcer/elastic-retention-enforcer.py b/k8s/elastic-retention-enforcer/elastic-retention-enforcer.py
index 8c0f4198c..bc38cd5dd 100644
--- a/k8s/elastic-retention-enforcer/elastic-retention-enforcer.py
+++ b/k8s/elastic-retention-enforcer/elastic-retention-enforcer.py
@@ -61,6 +61,13 @@ def apply_total_bytes_policy(indices, max_disk_util, logger):
     return popped, indices
 
 
+def divide_chunks(content, n):
+
+    # looping till length l
+    for i in range(0, len(content), n):
+        yield content[i : i + n]
+
+
 def main():
     args = parse()
     args.max_disk_util = parse_size(args.max_disk_util, binary=True)
@@ -77,16 +84,28 @@ def main():
     indice_stats = client.cat.indices(index=args.index_pattern, h=("i", "ss", "creation.date.string"))
     indices = [re.split(r"\s+", indice) for indice in indice_stats.split("\n")[:-1]]
 
-    indices = [
-        {
+    parsed_indices = []
+    for indice in indices:
+        system_index = indice[0].startswith(".")
+        should_skip_index = any([re.match(i, indice[0]) for i in args_to_skip])
+        has_all_stats = len(indice) == 3
+        if system_index:
+            logger.info("Skipping system index %s", indice[0])
+            continue
+        if should_skip_index:
+            logger.info("Skipping special index: %s", indice[0])
+            continue
+        if not has_all_stats:
+            logger.info("Received line doesn't contain all data: %s", indice)
+            continue
+        parsed_indice = {
             "name": indice[0],
             "size": parse_size(indice[1], binary=True),
             "created_at": datetime.strptime(indice[2], "%Y-%m-%dT%H:%M:%S.%fZ"),
         }
-        for indice in indices
-        if not indice[0].startswith(".") and not any([re.match(i, indice[0]) for i in args_to_skip])
-    ]
+        parsed_indices.append(parsed_indice)
+    indices = parsed_indices
 
     logger.info("Total indices found: %s", len(indices))
     indices = sorted(indices, key=lambda x: x["created_at"])
@@ -99,7 +118,9 @@ def main():
 
     if total_drop:
         logger.info("Dropping following indexes:\n%s", json.dumps(total_drop, indent=2, default=str))
-        client.indices.delete(index=[i["name"] for i in total_drop])
+        for chunk in divide_chunks([i["name"] for i in total_drop], 10):
+            logger.info("Dropping chunk:\n%s", json.dumps(chunk, indent=2, default=str))
+            client.indices.delete(index=[i for i in chunk])
     else:
         logger.info("Nothing to drop")
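
Not part of the patch: a minimal, runnable sketch of the chunked deletion that the last hunk introduces. divide_chunks is copied verbatim from the patch; FakeIndicesClient is a hypothetical stand-in used only so the example runs without a live Elasticsearch cluster (the real script calls client.indices.delete(index=...)), and the chunk size of 10 simply mirrors the value hard-coded in the patch.

# --- sketch, illustrative only, not part of the repository ---

def divide_chunks(content, n):
    # Yield successive n-sized slices of content (as in the patch).
    for i in range(0, len(content), n):
        yield content[i : i + n]


class FakeIndicesClient:
    # Hypothetical stand-in: records delete calls instead of talking to Elasticsearch.
    def __init__(self):
        self.calls = []

    def delete(self, index):
        self.calls.append(list(index))


if __name__ == "__main__":
    total_drop = [{"name": f"logs-{i:03d}"} for i in range(23)]
    indices_client = FakeIndicesClient()

    # Mirrors the patched main(): delete at most 10 indices per request.
    for chunk in divide_chunks([i["name"] for i in total_drop], 10):
        indices_client.delete(index=chunk)

    print([len(c) for c in indices_client.calls])  # -> [10, 10, 3]

The sketch shows the observable effect of the change: 23 index names now result in three delete requests of at most 10 names each, instead of one request carrying all 23.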