Skip to content

Commit

Permalink
Allow the close action to ignore sync flush fails (elastic#1460)
Browse files Browse the repository at this point in the history
If closing a very large batch of indices, potentially with some being live at runtime, a failure to perform a synced flush is a real possibility. This new option for the ``close`` action, ``ignore_sync_failures``, will allow index closures to continue.

Fixes elastic#1248
  • Loading branch information
untergeek committed Aug 27, 2019
1 parent cc158c1 commit 7bdc37d
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 15 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,4 @@ Contributors:
* Juraj Seffer (jurajseffer)
* Roger Steneteg (rsteneteg)
* Muhammad Junaid Muzammil (junmuz)
* Loet Avramson (psypuff)
31 changes: 24 additions & 7 deletions curator/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from copy import deepcopy
from datetime import datetime
from curator import exceptions, utils
from elasticsearch.exceptions import RequestError
from elasticsearch.exceptions import ConflictError, RequestError

class Alias(object):
def __init__(self, name=None, extra_settings={}, **kwargs):
Expand Down Expand Up @@ -256,14 +256,17 @@ def do_action(self):


class Close(object):
def __init__(self, ilo, delete_aliases=False, skip_flush=False):
def __init__(self, ilo, delete_aliases=False, skip_flush=False, ignore_sync_failures=False):
"""
:arg ilo: A :class:`curator.indexlist.IndexList` object
:arg delete_aliases: If `True`, will delete any associated aliases
before closing indices.
:type delete_aliases: bool
:arg skip_flush: If `True`, will not flush indices before closing.
:type skip_flush: bool
:arg ignore_sync_failures: If `True`, will not fail if there are failures while attempting
a synced flush.
:type ignore_sync_failures: bool
"""
utils.verify_index_list(ilo)
#: Instance variable.
Expand All @@ -276,9 +279,12 @@ def __init__(self, ilo, delete_aliases=False, skip_flush=False):
#: Internal reference to `skip_flush`
self.skip_flush = skip_flush
#: Instance variable.
#: Internal reference to `ignore_sync_failures`
self.ignore_sync_failures = ignore_sync_failures
#: Instance variable.
#: The Elasticsearch Client object derived from `ilo`
self.client = ilo.client
self.loggit = logging.getLogger('curator.actions.close')
self.client = ilo.client
self.loggit = logging.getLogger('curator.actions.close')


def do_dry_run(self):
Expand All @@ -295,7 +301,10 @@ def do_action(self):
self.index_list.filter_closed()
self.index_list.empty_list_check()
self.loggit.info(
'Closing {0} selected indices: {1}'.format(len(self.index_list.indices), self.index_list.indices))
'Closing %s selected indices: %s' % (
len(self.index_list.indices), self.index_list.indices
)
)
try:
index_lists = utils.chunk_index_list(self.index_list.indices)
for l in index_lists:
Expand All @@ -312,8 +321,16 @@ def do_action(self):
' {0}'.format(e)
)
if not self.skip_flush:
self.client.indices.flush_synced(
index=utils.to_csv(l), ignore_unavailable=True)
try:
self.client.indices.flush_synced(
index=utils.to_csv(l), ignore_unavailable=True)
except ConflictError as err:
if not self.ignore_sync_failures:
raise ConflictError(err.status_code, err.error, err.info)
else:
self.loggit.warn(
'Ignoring flushed sync failures: %s %s' % (err.error, err.info)
)
self.client.indices.close(
index=utils.to_csv(l), ignore_unavailable=True)
except Exception as e:
Expand Down
3 changes: 3 additions & 0 deletions curator/defaults/option_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def ignore_empty_list():
def ignore_existing():
return { Optional('ignore_existing', default=False): Any(bool, All(Any(*string_types), Boolean())) }

def ignore_sync_failures():
return { Optional('ignore_sync_failures', default=False): Any(bool, All(Any(*string_types), Boolean())) }

def ignore_unavailable():
return { Optional('ignore_unavailable', default=False): Any(bool, All(Any(*string_types), Boolean())) }

Expand Down
3 changes: 2 additions & 1 deletion curator/validators/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def action_specific(action):
],
'close' : [
option_defaults.delete_aliases(),
option_defaults.skip_flush()
option_defaults.skip_flush(),
option_defaults.ignore_sync_failures(),
],
'cluster_routing' : [
option_defaults.routing_type(),
Expand Down
3 changes: 3 additions & 0 deletions docs/Changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Changelog

**New**

* Require ``elasticsearch-py`` version 7.0.4
* New client configuration option: api_key - used in the X-Api-key header in
requests to Elasticsearch when set, which may be required if ReadonlyREST
plugin is configured to require api-key. Requested in #1409 (vetler)
Expand All @@ -25,6 +26,8 @@ Changelog
* Add support for ``freeze`` and ``unfreeze`` indexes using curator. Requires
Elasticsearch version 6.6 or greater with xpack enabled. Requested in issue
#1399 and rasied in PR #1454. (junmuz)
* Allow the ``close`` action to ignore synced flush failures with the new
``ignore_sync_failures`` option. Raised in #1248. (untergeek)

**Bug Fixes**

Expand Down
6 changes: 4 additions & 2 deletions docs/asciidoc/actions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@ TIP: See an example of this action in an <<actionfile,actionfile>>
action: close
description: "Close selected indices"
options:
delete_aliases: False
skip_flush: False
delete_aliases: false
skip_flush: false
ignore_sync_failures: false
filters:
- filtertype: ...
-------------
Expand All @@ -192,6 +193,7 @@ aliases beforehand.

* <<option_delete_aliases,delete_aliases>>
* <<option_skip_flush,skip_flush>>
* <<option_ignore_sync_failures,ignore_sync_failures>>
* <<option_ignore_empty,ignore_empty_list>>
* <<option_timeout_override,timeout_override>>
* <<option_continue,continue_if_exception>>
Expand Down
1 change: 1 addition & 0 deletions docs/asciidoc/examples.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ actions:
options:
skip_flush: False
delete_aliases: False
ignore_sync_failures: True
disable_action: True
filters:
- filtertype: pattern
Expand Down
2 changes: 1 addition & 1 deletion docs/asciidoc/index.asciidoc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
:curator_version: 5.8.0
:curator_major: 5
:curator_doc_tree: 5.8
:es_py_version: 7.0.2
:es_py_version: 7.0.4
:es_doc_tree: 7.3
:stack_doc_tree: 7.3
:pybuild_ver: 3.6.8
Expand Down
22 changes: 22 additions & 0 deletions docs/asciidoc/options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,28 @@ an ERROR level message will be logged, and Curator will exit with code 1.

The default value of this setting is `False`

[[option_ignore_sync_failures]]
== ignore_sync_failures

NOTE: This setting is only used by the <<close,close action>>, and is
optional.

[source,yaml]
-------------
action: close
description: "Close selected indices"
options:
ignore_sync_failures: false
filters:
- filtertype: ...
-------------

If `ignore_sync_failures` is set to `true`, Curator will ignore failures during
the synced flush that normally takes place before closing. This may be useful
for closing a list of indices which may include active indices.

The default value is `false`.

[[option_ignore]]
== ignore_unavailable

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
voluptuous>=0.9.3
elasticsearch>=7.0.2,<8.0.0
elasticsearch>=7.0.4,<8.0.0
urllib3>=1.24.2,<1.25
requests>=2.20.0
boto3>=1.9.142
Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ classifiers =
[options]
install_requires =
voluptuous>=0.9.3
elasticsearch>=7.0.2,<8.0.0
elasticsearch>=7.0.4,<8.0.0
urllib3>=1.24.2,<1.25
requests>=2.20.0
boto3>=1.9.142
Expand All @@ -33,7 +33,7 @@ install_requires =

setup_requires =
voluptuous>=0.9.3
elasticsearch>=7.0.2,<8.0.0
elasticsearch>=7.0.4,<8.0.0
urllib3>=1.24.2,<1.25
requests>=2.20.0
boto3>=1.9.142
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def get_version():
return VERSION

def get_install_requires():
res = ['elasticsearch>=7.0.2,<8.0.0' ]
res = ['elasticsearch>=7.0.4,<8.0.0' ]
res.append('urllib3>=1.24.2,<1.25')
res.append('requests>=2.20.0')
res.append('boto3>=1.9.142')
Expand Down
75 changes: 75 additions & 0 deletions test/integration/test_close.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,81 @@ def test_close_skip_flush(self):
# re-enable shard allocation for next tests
enable_allocation = '{"transient":{"cluster.routing.allocation.enable":null}}'
self.client.cluster.put_settings(body=enable_allocation)
def test_close_ignore_sync_failures(self):
self.write_config(
self.args['configfile'], testvars.client_config.format(host, port))
self.write_config(self.args['actionfile'],
testvars.close_ignore_sync.format('true'))
self.create_index('dummy')
# Disable shard allocation to make my_index go red
disable_allocation = '{"transient":{"cluster.routing.allocation.enable":"none"}}'
self.client.cluster.put_settings(body=disable_allocation)
self.create_index('my_index', wait_for_yellow=False, wait_for_active_shards=0)
test = clicktest.CliRunner()
_ = test.invoke(
curator.cli,
[
'--config', self.args['configfile'],
self.args['actionfile']
],
)
try:
self.assertEquals(
'close',
self.client.cluster.state(
index='my_index',
metric='metadata',
)['metadata']['indices']['my_index']['state']
)
self.assertNotEqual(
'close',
self.client.cluster.state(
index='dummy',
metric='metadata',
)['metadata']['indices']['dummy']['state']
)
finally:
# re-enable shard allocation for next tests
enable_allocation = '{"transient":{"cluster.routing.allocation.enable":null}}'
self.client.cluster.put_settings(body=enable_allocation)
def test_close_has_sync_failures(self):
self.write_config(
self.args['configfile'], testvars.client_config.format(host, port))
self.write_config(self.args['actionfile'],
testvars.close_ignore_sync.format('false'))
self.create_index('dummy')
# Disable shard allocation to make my_index go red
disable_allocation = '{"transient":{"cluster.routing.allocation.enable":"none"}}'
self.client.cluster.put_settings(body=disable_allocation)
self.create_index('my_index', wait_for_yellow=False, wait_for_active_shards=0)
test = clicktest.CliRunner()
_ = test.invoke(
curator.cli,
[
'--config', self.args['configfile'],
self.args['actionfile']
],
)
try:
self.assertEquals(
'open',
self.client.cluster.state(
index='my_index',
metric='metadata',
)['metadata']['indices']['my_index']['state']
)
self.assertNotEqual(
'close',
self.client.cluster.state(
index='dummy',
metric='metadata',
)['metadata']['indices']['dummy']['state']
)
self.assertEquals(1, _.exit_code)
finally:
# re-enable shard allocation for next tests
enable_allocation = '{"transient":{"cluster.routing.allocation.enable":null}}'
self.client.cluster.put_settings(body=enable_allocation)
def test_extra_option(self):
self.write_config(
self.args['configfile'], testvars.client_config.format(host, port))
Expand Down
12 changes: 12 additions & 0 deletions test/integration/testvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,18 @@
' kind: prefix\n'
' value: my\n')

close_ignore_sync = ('---\n'
'actions:\n'
' 1:\n'
' description: "Close indices as filtered"\n'
' action: close\n'
' options:\n'
' ignore_sync_failures: {0}\n'
' filters:\n'
' - filtertype: pattern\n'
' kind: prefix\n'
' value: my\n')

delete_proto = ('---\n'
'actions:\n'
' 1:\n'
Expand Down

0 comments on commit 7bdc37d

Please sign in to comment.