diff --git a/CHANGELOG.md b/CHANGELOG.md index 730d6f03..d21ae951 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ ## New features - [Alertmanager] Added support for Alertmanager - [#503](https://github.com/jertel/elastalert2/pull/503) - @nsano-rururu - Add summary_table_max_rows optional configuration to limit rows in summary tables - [#508](https://github.com/jertel/elastalert2/pull/508) - @mdavyt92 +- Added support for shortening Kibana Discover URLs using Kibana Shorten URL API - [#512](https://github.com/jertel/elastalert2/pull/512) - @JeffAshton ## Other changes - [Docs] Add exposed metrics documentation - [#498](https://github.com/jertel/elastalert2/pull/498) - @thisisxgp diff --git a/docs/source/ruletypes.rst b/docs/source/ruletypes.rst index 256af11c..6417f81e 100644 --- a/docs/source/ruletypes.rst +++ b/docs/source/ruletypes.rst @@ -64,6 +64,10 @@ Rule Configuration Cheat Sheet +--------------------------------------------------------------+ | | ``kibana_url`` (string, default from es_host) | | +--------------------------------------------------------------+ | +| ``kibana_username`` (string, no default) | | ++--------------------------------------------------------------+ | +| ``kibana_password`` (string, no default) | | ++--------------------------------------------------------------+ | | ``use_kibana4_dashboard`` (string, no default) | | +--------------------------------------------------------------+ | | ``kibana4_start_timedelta`` (time, default: 10 min) | | @@ -72,12 +76,16 @@ Rule Configuration Cheat Sheet +--------------------------------------------------------------+ | | ``generate_kibana_discover_url`` (boolean, default False) | | +--------------------------------------------------------------+ | +| ``shorten_kibana_discover_url`` (boolean, default False) | | ++--------------------------------------------------------------+ | | ``kibana_discover_app_url`` (string, no default) | | +--------------------------------------------------------------+ | | ``kibana_discover_version`` (string, no default) | | +--------------------------------------------------------------+ | | ``kibana_discover_index_pattern_id`` (string, no default) | | +--------------------------------------------------------------+ | +| ``kibana_discover_security_tenant`` (string, no default) | | ++--------------------------------------------------------------+ | | ``kibana_discover_columns`` (list of strs, default _source) | | +--------------------------------------------------------------+ | | ``kibana_discover_from_timedelta`` (time, default: 10 min) | | @@ -546,10 +554,34 @@ be uploaded to the kibana-int index as a temporary dashboard. (Optional, boolean kibana_url ^^^^^^^^^^ -``kibana_url``: The url to access Kibana. This will be used if ``generate_kibana_link`` or -``use_kibana_dashboard`` is true. If not specified, a URL will be constructed using ``es_host`` and ``es_port``. +``kibana_url``: The base url of the Kibana application. If not specified, a URL will be constructed using ``es_host`` +and ``es_port``. + +This value will be used if one of the following conditions are met: + +- ``generate_kibana_link`` is true +- ``use_kibana_dashboard`` is true +- ``use_kibana4_dashboard`` is true +- ``generate_kibana_discover_url`` is true and ``kibana_discover_app_url`` is a relative path + (Optional, string, default ``http://:/_plugin/kibana/``) +kibana_username +^^^^^^^^^^^^^^^ + +``kibana_username``: The username used to make basic authenticated API requests against Kibana. +This value is only used if ``shorten_kibana_discover_url`` is true. + +(Optional, string, no default) + +kibana_password +^^^^^^^^^^^^^^^ + +``kibana_password``: The password used to make basic authenticated API requests against Kibana. +This value is only used if ``shorten_kibana_discover_url`` is true. + +(Optional, string, no default) + use_kibana_dashboard ^^^^^^^^^^^^^^^^^^^^ @@ -608,13 +640,39 @@ Example usage:: alert_text_args: [ kibana_discover_url ] alert_text_type: alert_text_only +shorten_kibana_discover_url +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``shorten_kibana_discover_url``: Enables the shortening of the generated Kibana Discover urls. +In order to use the Kibana Shorten URL REST API, the ``kibana_discover_app_url`` must be provided +as a relative url (e.g. app/discover?#/). + +ElastAlert may need to authenticate with Kibana to invoke the Kibana Shorten URL REST API. The +supported authentication methods are: + +- Basic authentication by specifying ``kibana_username`` and ``kibana_password`` +- AWS authentication (if configured already for ElasticSearch) + +(Optional, bool, false) + kibana_discover_app_url ^^^^^^^^^^^^^^^^^^^^^^^ ``kibana_discover_app_url``: The url of the Kibana Discover application used to generate the ``kibana_discover_url`` variable. This value can use `$VAR` and `${VAR}` references to expand environment variables. +This value should be relative to the base kibana url defined by ``kibana_url`` and will vary depending on your installation. + +``kibana_discover_app_url: app/discover#/`` + +(Optional, string, no default) + +kibana_discover_security_tenant +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``kibana_discover_security_tenant``: The Kibana security tenant to include in the generated +``kibana_discover_url`` variable. -``kibana_discover_app_url: http://kibana:5601/#/discover`` +(Optional, string, no default) kibana_discover_version ^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index 12392b78..327ae67c 100755 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -34,6 +34,7 @@ from elastalert.config import load_conf from elastalert.enhancements import DropMatchException from elastalert.kibana_discover import generate_kibana_discover_url +from elastalert.kibana_external_url_formatter import create_kibana_external_url_formatter from elastalert.prometheus_wrapper import PrometheusWrapper from elastalert.ruletypes import FlatlineRule from elastalert.util import (add_raw_postfix, cronite_datetime_to_timestamp, dt_to_ts, dt_to_unix, EAException, @@ -1593,7 +1594,8 @@ def send_alert(self, matches, rule, alert_time=None, retried=False): if rule.get('generate_kibana_discover_url'): kb_link = generate_kibana_discover_url(rule, matches[0]) if kb_link: - matches[0]['kibana_discover_url'] = kb_link + kb_link_formatter = self.get_kibana_discover_external_url_formatter(rule) + matches[0]['kibana_discover_url'] = kb_link_formatter.format(kb_link) # Enhancements were already run at match time if # run_enhancements_first is set or @@ -1676,6 +1678,17 @@ def get_alert_body(self, match, rule, alert_sent, alert_time, alert_exception=No body['alert_exception'] = alert_exception return body + def get_kibana_discover_external_url_formatter(self, rule): + """ Gets or create the external url formatter for kibana discover links """ + key = '__kibana_discover_external_url_formatter__' + formatter = rule.get(key) + if formatter is None: + shorten = rule.get('shorten_kibana_discover_url') + security_tenant = rule.get('kibana_discover_security_tenant') + formatter = create_kibana_external_url_formatter(rule, shorten, security_tenant) + rule[key] = formatter + return formatter + def writeback(self, doc_type, body, rule=None, match_body=None): # ES 2.0 - 2.3 does not support dots in field names. if self.replace_dots_in_field_names: diff --git a/elastalert/kibana_external_url_formatter.py b/elastalert/kibana_external_url_formatter.py new file mode 100644 index 00000000..1c463a97 --- /dev/null +++ b/elastalert/kibana_external_url_formatter.py @@ -0,0 +1,138 @@ +import boto3 +import os +from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlsplit, urlunsplit + +import requests +from requests import RequestException +from requests.auth import AuthBase, HTTPBasicAuth + +from elastalert.auth import RefeshableAWSRequestsAuth +from elastalert.util import EAException + +def append_security_tenant(url, security_tenant): + '''Appends the security_tenant query string parameter to the url''' + parsed = urlsplit(url) + + if parsed.query: + qs = parse_qsl(parsed.query, keep_blank_values=True, strict_parsing=True) + else: + qs = [] + qs.append(('security_tenant', security_tenant)) + + new_query = urlencode(qs) + new_args = parsed._replace(query=new_query) + new_url = urlunsplit(new_args) + return new_url + +class KibanaExternalUrlFormatter: + '''Interface for formatting external Kibana urls''' + + def format(self, relative_url: str) -> str: + raise NotImplementedError() + +class AbsoluteKibanaExternalUrlFormatter(KibanaExternalUrlFormatter): + '''Formats absolute external Kibana urls''' + + def __init__(self, base_url: str, security_tenant: str) -> None: + self.base_url = base_url + self.security_tenant = security_tenant + + def format(self, relative_url: str) -> str: + url = urljoin(self.base_url, relative_url) + if self.security_tenant: + url = append_security_tenant(url, self.security_tenant) + return url + +class ShortKibanaExternalUrlFormatter(KibanaExternalUrlFormatter): + '''Formats external urls using the Kibana Shorten URL API''' + + def __init__(self, base_url: str, auth: AuthBase, security_tenant: str) -> None: + self.auth = auth + self.security_tenant = security_tenant + self.goto_url = urljoin(base_url, 'goto/') + + shorten_url = urljoin(base_url, 'api/shorten_url') + if security_tenant: + shorten_url = append_security_tenant(shorten_url, security_tenant) + self.shorten_url = shorten_url + + def format(self, relative_url: str) -> str: + # join with '/' to ensure relative to root of app + long_url = urljoin('/', relative_url) + if self.security_tenant: + long_url = append_security_tenant(long_url, self.security_tenant) + + try: + response = requests.post( + url=self.shorten_url, + auth=self.auth, + headers={ + 'kbn-xsrf': 'elastalert', + 'osd-xsrf': 'elastalert' + }, + json={ + 'url': long_url + } + ) + response.raise_for_status() + except RequestException as e: + raise EAException("Failed to invoke Kibana Shorten URL API: %s" % e) + + response_body = response.json() + url_id = response_body.get('urlId') + + goto_url = urljoin(self.goto_url, url_id) + if self.security_tenant: + goto_url = append_security_tenant(goto_url, self.security_tenant) + return goto_url + + +def create_kibana_auth(kibana_url, rule) -> AuthBase: + '''Creates a Kibana http authentication for use by requests''' + + # Basic + username = rule.get('kibana_username') + password = rule.get('kibana_password') + if username and password: + return HTTPBasicAuth(username, password) + + # AWS SigV4 + aws_region = rule.get('aws_region') + if not aws_region: + aws_region = os.environ.get('AWS_DEFAULT_REGION') + if aws_region: + + aws_profile = rule.get('profile') + session = boto3.session.Session( + profile_name=aws_profile, + region_name=aws_region + ) + credentials = session.get_credentials() + + kibana_host = urlparse(kibana_url).hostname + + return RefeshableAWSRequestsAuth( + refreshable_credential=credentials, + aws_host=kibana_host, + aws_region=aws_region, + aws_service='es' + ) + + # Unauthenticated + return None + + +def create_kibana_external_url_formatter( + rule, + shorten: bool, + security_tenant: str +) -> KibanaExternalUrlFormatter: + '''Creates a Kibana external url formatter''' + + base_url = rule.get('kibana_url') + + if shorten: + auth = create_kibana_auth(base_url, rule) + return ShortKibanaExternalUrlFormatter(base_url, auth, security_tenant) + + return AbsoluteKibanaExternalUrlFormatter(base_url, security_tenant) diff --git a/elastalert/schema.yaml b/elastalert/schema.yaml index 49a05bd8..04b9eba2 100644 --- a/elastalert/schema.yaml +++ b/elastalert/schema.yaml @@ -233,6 +233,9 @@ properties: raw_count_keys: {type: boolean} generate_kibana_link: {type: boolean} kibana_dashboard: {type: string} + kibana_url: {type: string, format: uri} + kibana_username: {type: string} + kibana_password: {type: string} use_kibana_dashboard: {type: string} use_local_time: {type: boolean} custom_pretty_ts_format: {type: string} @@ -250,12 +253,14 @@ properties: ### Kibana Discover App Link generate_kibana_discover_url: {type: boolean} - kibana_discover_app_url: {type: string, format: uri} + shorten_kibana_discover_url: {type: boolean} + kibana_discover_app_url: {type: string} kibana_discover_version: {type: string, enum: ['7.15', '7.14', '7.13', '7.12', '7.11', '7.10', '7.9', '7.8', '7.7', '7.6', '7.5', '7.4', '7.3', '7.2', '7.1', '7.0', '6.8', '6.7', '6.6', '6.5', '6.4', '6.3', '6.2', '6.1', '6.0', '5.6']} kibana_discover_index_pattern_id: {type: string, minLength: 1} kibana_discover_columns: {type: array, items: {type: string, minLength: 1}, minItems: 1} kibana_discover_from_timedelta: *timedelta kibana_discover_to_timedelta: *timedelta + kibana_discover_security_tenant: {type:string} # Alert Content alert_text: {type: string} # Python format string diff --git a/tests/base_test.py b/tests/base_test.py index 8f8ada27..c28e62c7 100644 --- a/tests/base_test.py +++ b/tests/base_test.py @@ -14,6 +14,8 @@ from elastalert.enhancements import DropMatchException from elastalert.enhancements import TimeEnhancement from elastalert.kibana import dashboard_temp +from elastalert.kibana_external_url_formatter import AbsoluteKibanaExternalUrlFormatter +from elastalert.kibana_external_url_formatter import ShortKibanaExternalUrlFormatter from elastalert.util import dt_to_ts from elastalert.util import dt_to_unix from elastalert.util import dt_to_unixms @@ -1465,3 +1467,30 @@ def test_time_enhancement(ea): te.process(match) excepted = '2021-01-01 00:00 UTC' assert match['@timestamp'] == excepted + + +def test_get_kibana_discover_external_url_formatter_same_rule(ea): + rule = ea.rules[0] + x = ea.get_kibana_discover_external_url_formatter(rule) + y = ea.get_kibana_discover_external_url_formatter(rule) + assert type(x) is AbsoluteKibanaExternalUrlFormatter + assert x is y, "Should return same external url formatter for the same rule" + + +def test_get_kibana_discover_external_url_formatter_different_rule(ea): + x_rule = ea.rules[0] + y_rule = copy.copy(x_rule) + y_rule['name'] = 'different_rule' + x = ea.get_kibana_discover_external_url_formatter(x_rule) + y = ea.get_kibana_discover_external_url_formatter(y_rule) + assert type(x) is AbsoluteKibanaExternalUrlFormatter + assert x is not y, 'Should return unique external url formatter for each rule' + + +def test_get_kibana_discover_external_url_formatter_smoke(ea): + rule = copy.copy(ea.rules[0]) + rule['kibana_discover_security_tenant'] = 'global' + rule['shorten_kibana_discover_url'] = True + formatter = ea.get_kibana_discover_external_url_formatter(rule) + assert type(formatter) is ShortKibanaExternalUrlFormatter + assert formatter.security_tenant == 'global' diff --git a/tests/kibana_discover_test.py b/tests/kibana_discover_test.py index 5288d14c..f4fca4e6 100644 --- a/tests/kibana_discover_test.py +++ b/tests/kibana_discover_test.py @@ -88,6 +88,38 @@ def test_generate_kibana_discover_url_with_kibana_7x(kibana_version): assert url == expectedUrl +def test_generate_kibana_discover_url_with_relative_kinbana_discover_app_url(): + url = generate_kibana_discover_url( + rule={ + 'kibana_discover_app_url': 'app/discover#/', + 'kibana_discover_version': '7.15', + 'kibana_discover_index_pattern_id': '620ad0e6-43df-4557-bda2-384960fa9086', + 'timestamp_field': 'timestamp' + }, + match={ + 'timestamp': '2021-10-08T00:30:00Z' + } + ) + expectedUrl = ( + 'app/discover#/' + + '?_g=%28' # global start + + 'filters%3A%21%28%29%2C' + + 'refreshInterval%3A%28pause%3A%21t%2Cvalue%3A0%29%2C' + + 'time%3A%28' # time start + + 'from%3A%272021-10-08T00%3A20%3A00Z%27%2C' + + 'to%3A%272021-10-08T00%3A40%3A00Z%27' + + '%29' # time end + + '%29' # global end + + '&_a=%28' # app start + + 'columns%3A%21%28_source%29%2C' + + 'filters%3A%21%28%29%2C' + + 'index%3A%27620ad0e6-43df-4557-bda2-384960fa9086%27%2C' + + 'interval%3Aauto' + + '%29' # app end + ) + assert url == expectedUrl + + def test_generate_kibana_discover_url_with_missing_kibana_discover_version(): url = generate_kibana_discover_url( rule={ diff --git a/tests/kibana_external_url_formatter_test.py b/tests/kibana_external_url_formatter_test.py new file mode 100644 index 00000000..4edecb62 --- /dev/null +++ b/tests/kibana_external_url_formatter_test.py @@ -0,0 +1,357 @@ +from typing import Any +import os +import pytest + +import requests +from requests.auth import AuthBase, HTTPBasicAuth + +from elastalert.kibana_external_url_formatter import AbsoluteKibanaExternalUrlFormatter +from elastalert.kibana_external_url_formatter import KibanaExternalUrlFormatter +from elastalert.kibana_external_url_formatter import ShortKibanaExternalUrlFormatter +from elastalert.kibana_external_url_formatter import append_security_tenant +from elastalert.kibana_external_url_formatter import create_kibana_auth +from elastalert.kibana_external_url_formatter import create_kibana_external_url_formatter + +from elastalert.auth import RefeshableAWSRequestsAuth +from elastalert.util import EAException + +from unittest import mock + + +class AbsoluteFormatTestCase: + def __init__( + self, + base_url: str, + relative_url: str, + expected_url: str, + security_tenant: str = None, + ) -> None: + self.base_url = base_url + self.relative_url = relative_url + self.expected_url = expected_url + self.security_tenant = security_tenant + + +@pytest.mark.parametrize("test_case", [ + + # Relative to Kibana plugin + AbsoluteFormatTestCase( + base_url='http://elasticsearch.test.org:9200/_plugin/kibana/', + relative_url='app/dev_tools#/console', + expected_url='http://elasticsearch.test.org:9200/_plugin/kibana/app/dev_tools#/console' + ), + + # Relative to OpenSearch Dashboards + AbsoluteFormatTestCase( + base_url='http://opensearch.test.org/_dashboards/', + relative_url='app/dev_tools#/console', + expected_url='http://opensearch.test.org/_dashboards/app/dev_tools#/console' + ), + + # Relative to root of dedicated Kibana domain + AbsoluteFormatTestCase( + base_url='http://kibana.test.org/', + relative_url='/app/dev_tools#/console', + expected_url='http://kibana.test.org/app/dev_tools#/console' + ), + + # With security tenant + AbsoluteFormatTestCase( + base_url='http://kibana.test.org/', + security_tenant='global', + relative_url='/app/dev_tools#/console', + expected_url='http://kibana.test.org/app/dev_tools?security_tenant=global#/console' + ), +]) +def test_absolute_kinbana_external_url_formatter( + test_case: AbsoluteFormatTestCase +): + formatter = AbsoluteKibanaExternalUrlFormatter( + base_url=test_case.base_url, + security_tenant=test_case.security_tenant + ) + actualUrl = formatter.format(test_case.relative_url) + assert actualUrl == test_case.expected_url + + +def mock_kibana_shorten_url_api(*args, **kwargs): + class MockResponse: + def __init__(self, status_code): + self.status_code = status_code + + def json(self): + return { + 'urlId': '62af3ebe6652370f85de91ccb3a3825f' + } + + def raise_for_status(self): + if self.status_code == 400: + raise requests.exceptions.HTTPError() + + json = kwargs['json'] + url = json['url'] + + if url.startswith('/app/'): + return MockResponse(200) + else: + return MockResponse(400) + + +class ShortenUrlTestCase: + def __init__( + self, + base_url: str, + relative_url: str, + expected_api_request: Any, + expected_url: str, + auth: AuthBase = None, + security_tenant: str = None + ) -> None: + self.base_url = base_url + self.relative_url = relative_url + self.expected_api_request = expected_api_request + self.expected_url = expected_url + self.authorization = auth + self.security_tenant = security_tenant + + +@mock.patch('requests.post', side_effect=mock_kibana_shorten_url_api) +@pytest.mark.parametrize("test_case", [ + + # Relative to kibana plugin + ShortenUrlTestCase( + base_url='http://elasticsearch.test.org/_plugin/kibana/', + relative_url='app/dev_tools#/console', + expected_api_request={ + 'url': 'http://elasticsearch.test.org/_plugin/kibana/api/shorten_url', + 'auth': None, + 'headers': { + 'kbn-xsrf': 'elastalert', + 'osd-xsrf': 'elastalert' + }, + 'json': { + 'url': '/app/dev_tools#/console' + } + }, + expected_url='http://elasticsearch.test.org/_plugin/kibana/goto/62af3ebe6652370f85de91ccb3a3825f' + ), + + # Relative to root of dedicated Kibana domain + ShortenUrlTestCase( + base_url='http://kibana.test.org/', + relative_url='/app/dev_tools#/console', + expected_api_request={ + 'url': 'http://kibana.test.org/api/shorten_url', + 'auth': None, + 'headers': { + 'kbn-xsrf': 'elastalert', + 'osd-xsrf': 'elastalert' + }, + 'json': { + 'url': '/app/dev_tools#/console' + } + }, + expected_url='http://kibana.test.org/goto/62af3ebe6652370f85de91ccb3a3825f' + ), + + # With authentication + ShortenUrlTestCase( + base_url='http://kibana.test.org/', + auth=HTTPBasicAuth('john', 'doe'), + relative_url='/app/dev_tools#/console', + expected_api_request={ + 'url': 'http://kibana.test.org/api/shorten_url', + 'auth': HTTPBasicAuth('john', 'doe'), + 'headers': { + 'kbn-xsrf': 'elastalert', + 'osd-xsrf': 'elastalert' + }, + 'json': { + 'url': '/app/dev_tools#/console' + } + }, + expected_url='http://kibana.test.org/goto/62af3ebe6652370f85de91ccb3a3825f' + ), + + # With security tenant + ShortenUrlTestCase( + base_url='http://kibana.test.org/', + security_tenant='global', + relative_url='/app/dev_tools#/console', + expected_api_request={ + 'url': 'http://kibana.test.org/api/shorten_url?security_tenant=global', + 'auth': None, + 'headers': { + 'kbn-xsrf': 'elastalert', + 'osd-xsrf': 'elastalert' + }, + 'json': { + 'url': '/app/dev_tools?security_tenant=global#/console' + } + }, + expected_url='http://kibana.test.org/goto/62af3ebe6652370f85de91ccb3a3825f?security_tenant=global' + ) +]) +def test_short_kinbana_external_url_formatter( + mock_post: mock.MagicMock, + test_case: ShortenUrlTestCase +): + formatter = ShortKibanaExternalUrlFormatter( + base_url=test_case.base_url, + auth=test_case.authorization, + security_tenant=test_case.security_tenant, + ) + + actualUrl = formatter.format(test_case.relative_url) + assert actualUrl == test_case.expected_url + + mock_post.assert_called_once_with(**test_case.expected_api_request) + + +@mock.patch('requests.post', side_effect=mock_kibana_shorten_url_api) +def test_short_kinbana_external_url_formatter_request_exception(mock_post: mock.MagicMock): + formatter = ShortKibanaExternalUrlFormatter( + base_url='http://kibana.test.org', + auth=None, + security_tenant=None, + ) + with pytest.raises(EAException, match="Failed to invoke Kibana Shorten URL API"): + formatter.format('http://wacky.org') + mock_post.assert_called_once() + + +def test_create_kibana_external_url_formatter_without_shortening(): + formatter = create_kibana_external_url_formatter( + rule={ + 'kibana_url': 'http://kibana.test.org/' + }, + shorten=False, + security_tenant='foo' + ) + assert type(formatter) is AbsoluteKibanaExternalUrlFormatter + assert formatter.base_url == 'http://kibana.test.org/' + assert formatter.security_tenant == 'foo' + + +def test_create_kibana_external_url_formatter_with_shortening(): + formatter = create_kibana_external_url_formatter( + rule={ + 'kibana_url': 'http://kibana.test.org/', + 'kibana_username': 'john', + 'kibana_password': 'doe' + }, + shorten=True, + security_tenant='foo' + ) + assert type(formatter) is ShortKibanaExternalUrlFormatter + assert formatter.auth == HTTPBasicAuth('john', 'doe') + assert formatter.security_tenant == 'foo' + assert formatter.goto_url == 'http://kibana.test.org/goto/' + assert formatter.shorten_url == 'http://kibana.test.org/api/shorten_url?security_tenant=foo' + + +@pytest.mark.parametrize("test_case", [ + # Trivial + { + 'url': 'http://test.org', + 'expected': 'http://test.org?security_tenant=foo' + }, + # With query + { + 'url': 'http://test.org?year=2021', + 'expected': 'http://test.org?year=2021&security_tenant=foo' + }, + # With fragment + { + 'url': 'http://test.org#fragement', + 'expected': 'http://test.org?security_tenant=foo#fragement' + }, + # With query & fragment + { + 'url': 'http://test.org?year=2021#fragement', + 'expected': 'http://test.org?year=2021&security_tenant=foo#fragement' + }, +]) +def test_append_security_tenant(test_case): + url = test_case.get('url') + expected = test_case.get('expected') + result = append_security_tenant(url=url, security_tenant='foo') + assert result == expected + + +def test_create_kibana_auth_basic(): + auth = create_kibana_auth( + kibana_url='http://kibana.test.org', + rule={ + 'kibana_username': 'john', + 'kibana_password': 'doe', + } + ) + assert auth == HTTPBasicAuth('john', 'doe') + + +@mock.patch.dict( + os.environ, + { + 'AWS_DEFAULT_REGION': '', + 'AWS_ACCESS_KEY_ID': 'access', + 'AWS_SECRET_ACCESS_KEY': 'secret', + }, + clear=True +) +def test_create_kibana_auth_aws_explicit_region(): + auth = create_kibana_auth( + kibana_url='http://kibana.test.org', + rule={ + 'aws_region': 'us-east-1' + } + ) + assert type(auth) is RefeshableAWSRequestsAuth + assert auth.aws_host == 'kibana.test.org' + assert auth.aws_region == 'us-east-1' + assert auth.service == 'es' + assert auth.aws_access_key == 'access' + assert auth.aws_secret_access_key == 'secret' + assert auth.aws_token is None + + +@mock.patch.dict( + os.environ, + { + 'AWS_DEFAULT_REGION': 'us-east-2', + 'AWS_ACCESS_KEY_ID': 'access', + 'AWS_SECRET_ACCESS_KEY': 'secret', + }, + clear=True +) +def test_create_kibana_auth_aws_implicit_region(): + auth = create_kibana_auth( + kibana_url='http://kibana.test.org', + rule={} + ) + assert type(auth) is RefeshableAWSRequestsAuth + assert auth.aws_host == 'kibana.test.org' + assert auth.aws_region == 'us-east-2' + assert auth.service == 'es' + assert auth.aws_access_key == 'access' + assert auth.aws_secret_access_key == 'secret' + assert auth.aws_token is None + + +@mock.patch.dict( + os.environ, + {}, + clear=True +) +def test_create_kibana_auth_unauthenticated(): + auth = create_kibana_auth( + kibana_url='http://kibana.test.org', + rule={} + ) + assert auth is None + + +def test_kibana_external_url_formatter_not_implemented(): + formatter = KibanaExternalUrlFormatter() + with pytest.raises(NotImplementedError): + formatter.format('test')