Skip to content

Commit

Permalink
Merge pull request #331 from nsano-rururu/testcode6
Browse files Browse the repository at this point in the history
Improved test code coverage 70% → 72%
  • Loading branch information
jertel authored Jul 7, 2021
2 parents 3307cc8 + 919b6bd commit dc6fc36
Show file tree
Hide file tree
Showing 8 changed files with 561 additions and 5 deletions.
44 changes: 44 additions & 0 deletions tests/alerters/chatwork_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,47 @@ def test_chatwork_maxlength():

actual_data = mock_post_request.call_args_list[0][1]['params']
assert expected_data == actual_data


def test_chatwork_matchs():
rule = {
'name': 'Test Chatwork Rule',
'type': 'any',
'chatwork_apikey': 'xxxx1',
'chatwork_room_id': 'xxxx2',
'alert': []
}
rules_loader = FileRulesLoader({})
rules_loader.load_modules(rule)
alert = ChatworkAlerter(rule)
match = {
'@timestamp': '2021-01-01T00:00:00',
'somefield': 'foobarbaz'
}
with mock.patch('requests.post') as mock_post_request:
alert.alert([match, match])
expected_data = {
'body': 'Test Chatwork Rule\n' +
'\n' +
'@timestamp: 2021-01-01T00:00:00\n' +
'somefield: foobarbaz\n' +
'\n' +
'----------------------------------------\n' +
'Test Chatwork Rule\n' +
'\n' +
'@timestamp: 2021-01-01T00:00:00\n' +
'somefield: foobarbaz\n' +
'\n' +
'----------------------------------------\n',
}

mock_post_request.assert_called_once_with(
'https://api.chatwork.com/v2/rooms/xxxx2/messages',
params=mock.ANY,
headers={'X-ChatWorkToken': 'xxxx1'},
proxies=None,
auth=None
)

actual_data = mock_post_request.call_args_list[0][1]['params']
assert expected_data == actual_data
58 changes: 58 additions & 0 deletions tests/alerters/discord_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,61 @@ def test_discord_required_error(discord_webhook_url, expected_data):
assert expected_data == actual_data
except Exception as ea:
assert expected_data in str(ea)


def test_discord_matches():
rule = {
'name': 'Test Discord Rule',
'type': 'any',
'discord_webhook_url': 'http://xxxxxxx',
'discord_emoji_title': ':warning:',
'discord_embed_color': 0xffffff,
'discord_embed_footer': 'footer',
'discord_embed_icon_url': 'http://xxxx/image.png',
'alert': [],
'alert_subject': 'Test Discord'
}
rules_loader = FileRulesLoader({})
rules_loader.load_modules(rule)
alert = DiscordAlerter(rule)
match = {
'@timestamp': '2021-01-01T00:00:00',
'somefield': 'foobarbaz'
}
with mock.patch('requests.post') as mock_post_request:
alert.alert([match, match])

expected_data = {
'content': ':warning: Test Discord :warning:',
'embeds':
[{
'description': 'Test Discord Rule\n' +
'\n' +
'@timestamp: 2021-01-01T00:00:00\n' +
'somefield: foobarbaz\n' +
'\n' +
'----------------------------------------\n' +
'Test Discord Rule\n' +
'\n' +
'@timestamp: 2021-01-01T00:00:00\n' +
'somefield: foobarbaz\n' +
'\n' +
'----------------------------------------\n',
'color': 0xffffff,
'footer': {
'text': 'footer',
'icon_url': 'http://xxxx/image.png'
}
}]
}

mock_post_request.assert_called_once_with(
rule['discord_webhook_url'],
data=mock.ANY,
headers={'Content-Type': 'application/json'},
proxies=None,
auth=None
)

actual_data = json.loads(mock_post_request.call_args_list[0][1]['data'])
assert expected_data == actual_data
45 changes: 45 additions & 0 deletions tests/alerters/line_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,48 @@ def test_line_notify_maxlength():

actual_data = mock_post_request.call_args_list[0][1]['data']
assert expected_data == actual_data


def test_line_notify_matchs():
rule = {
'name': 'Test LineNotify Rule',
'type': 'any',
'linenotify_access_token': 'xxxxx',
'alert': []
}
rules_loader = FileRulesLoader({})
rules_loader.load_modules(rule)
alert = LineNotifyAlerter(rule)
match = {
'@timestamp': '2021-01-01T00:00:00',
'somefield': 'foobarbaz'
}
with mock.patch('requests.post') as mock_post_request:
alert.alert([match, match])

expected_data = {
'message': 'Test LineNotify Rule\n'
'\n'
'@timestamp: 2021-01-01T00:00:00\n'
'somefield: foobarbaz\n'
'\n'
'----------------------------------------\n'
'Test LineNotify Rule\n'
'\n'
'@timestamp: 2021-01-01T00:00:00\n'
'somefield: foobarbaz\n'
'\n'
'----------------------------------------\n'
}

mock_post_request.assert_called_once_with(
'https://notify-api.line.me/api/notify',
data=mock.ANY,
headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': 'Bearer {}'.format('xxxxx')
}
)

actual_data = mock_post_request.call_args_list[0][1]['data']
assert expected_data == actual_data
52 changes: 52 additions & 0 deletions tests/alerters/opsgenie_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,3 +908,55 @@ def test_opsgenie_substitution(opsgenie_entity, expected_entity, opsgenie_priori

assert mcal[0][1]['json']['entity'] == expected_entity
assert mcal[0][1]['json']['priority'] == expected_priority


def test_opsgenie_details_with_constant_value_matchs():
rule = {
'name': 'Opsgenie Details',
'type': mock_rule(),
'opsgenie_account': 'genies',
'opsgenie_key': 'ogkey',
'opsgenie_details': {'Foo': 'Bar'}
}
match = {
'@timestamp': '2014-10-31T00:00:00'
}
alert = OpsGenieAlerter(rule)

with mock.patch('requests.post') as mock_post_request:
alert.alert([match, match])

mock_post_request.assert_called_once_with(
'https://api.opsgenie.com/v2/alerts',
headers={
'Content-Type': 'application/json',
'Authorization': 'GenieKey ogkey'
},
json=mock.ANY,
proxies=None
)

expected_json = {
'description': 'Opsgenie Details\n'
'\n'
"{'@timestamp': '2014-10-31T00:00:00'}\n"
'\n'
'@timestamp: 2014-10-31T00:00:00\n'
'\n'
'----------------------------------------\n'
'Opsgenie Details\n'
'\n'
"{'@timestamp': '2014-10-31T00:00:00'}\n"
'\n'
'@timestamp: 2014-10-31T00:00:00\n'
'\n'
'----------------------------------------\n',
'details': {'Foo': 'Bar'},
'message': 'ElastAlert: Opsgenie Details',
'priority': None,
'source': 'ElastAlert',
'tags': ['ElastAlert', 'Opsgenie Details'],
'user': 'genies'
}
actual_json = mock_post_request.call_args_list[0][1]['json']
assert expected_json == actual_json
49 changes: 49 additions & 0 deletions tests/alerters/telegram_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,52 @@ def test_telegram_required_error(telegram_bot_token, telegram_room_id, expected_
assert expected_data == actual_data
except Exception as ea:
assert expected_data in str(ea)


def test_telegram_matchs():
rule = {
'name': 'Test Telegram Rule',
'type': 'any',
'telegram_bot_token': 'xxxxx1',
'telegram_room_id': 'xxxxx2',
'alert': []
}
rules_loader = FileRulesLoader({})
rules_loader.load_modules(rule)
alert = TelegramAlerter(rule)
match = {
'@timestamp': '2021-01-01T00:00:00',
'somefield': 'foobarbaz'
}
with mock.patch('requests.post') as mock_post_request:
alert.alert([match, match])
expected_data = {
'chat_id': rule['telegram_room_id'],
'text': '⚠ *Test Telegram Rule* ⚠ ```\n' +
'Test Telegram Rule\n' +
'\n' +
'@timestamp: 2021-01-01T00:00:00\n' +
'somefield: foobarbaz\n' +
'\n' +
'----------------------------------------\n' +
'Test Telegram Rule\n' +
'\n' +
'@timestamp: 2021-01-01T00:00:00\n' +
'somefield: foobarbaz\n' +
'\n' +
'----------------------------------------\n' +
' ```',
'parse_mode': 'markdown',
'disable_web_page_preview': True
}

mock_post_request.assert_called_once_with(
'https://api.telegram.org/botxxxxx1/sendMessage',
data=mock.ANY,
headers={'content-type': 'application/json'},
proxies=None,
auth=None
)

actual_data = json.loads(mock_post_request.call_args_list[0][1]['data'])
assert expected_data == actual_data
91 changes: 91 additions & 0 deletions tests/alerters/thehive_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,94 @@ def test_thehive_getinfo(hive_host, expect):
expected_data = expect
actual_data = alert.get_info()
assert expected_data == actual_data


def test_thehive_alerter2():
rule = {'alert': [],
'alert_text': '',
'alert_text_type': 'alert_text_only',
'description': 'test',
'hive_alert_config': {'customFields': [{'name': 'test',
'type': 'string',
'value': 2}],
'follow': True,
'severity': 2,
'source': 'elastalert',
'status': 'New',
'tags': ['test.port'],
'tlp': 3,
'type': 'external'},
'hive_connection': {'hive_apikey': '',
'hive_host': 'https://localhost',
'hive_port': 9000},
'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}],
'name': 'test-thehive',
'tags': ['a', 'b'],
'type': 'any'}
rules_loader = FileRulesLoader({})
rules_loader.load_modules(rule)
alert = HiveAlerter(rule)
match = {
"test": {
"ip": "127.0.0.1",
"port": 9876,
"as_number": 1234
},
"@timestamp": "2021-05-09T14:43:30",
}
with mock.patch('requests.post') as mock_post_request:
alert.alert([match])

expected_data = {
"artifacts": [
{
"data": "127.0.0.1",
"dataType": "ip",
"message": None,
"tags": [],
"tlp": 2
},
{
"data": "1234",
"dataType": "autonomous-system",
"message": None,
"tags": [],
"tlp": 2
}
],
"customFields": {
"test": {
"order": 0,
"string": 2
}
},
"description": "\n\n",
"follow": True,
"severity": 2,
"source": "elastalert",
"status": "New",
"tags": [
"9876"
],
"title": "test-thehive",
"tlp": 3,
"type": "external"
}

conn_config = rule['hive_connection']
alert_url = f"{conn_config['hive_host']}:{conn_config['hive_port']}/api/alert"
mock_post_request.assert_called_once_with(
alert_url,
data=mock.ANY,
headers={'Content-Type': 'application/json',
'Authorization': 'Bearer '},
verify=False,
proxies={'http': '', 'https': ''}
)

actual_data = json.loads(mock_post_request.call_args_list[0][1]['data'])
# The date and sourceRef are autogenerated, so we can't expect them to be a particular value
del actual_data['date']
del actual_data['sourceRef']

assert expected_data == actual_data
15 changes: 15 additions & 0 deletions tests/alerts_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,18 @@ def test_alert_subject_with_jinja():
assert "Test alert for the_owner;" in alertsubject
assert "field field_value;" in alertsubject
assert "Abc: abc from match" in alertsubject


def test_alert_getinfo():
rule = {
'name': 'test_rule',
'type': mock_rule(),
'owner': 'the_owner',
'priority': 2,
'alert_subject': 'A very long subject',
'alert_subject_max_len': 5
}
alert = Alerter(rule)
actual_data = alert.get_info()
expected_data = {'type': 'Unknown'}
assert expected_data == actual_data
Loading

0 comments on commit dc6fc36

Please sign in to comment.