-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Wechat alerter #355
Wechat alerter #355
Changes from all commits
4be0270
696c01a
224274a
62748c9
9857568
7c61964
23120e9
195d3d8
63df979
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2780,6 +2780,70 @@ Example With SMS usage:: | |||||
twilio_auth_token: "abcdefghijklmnopqrstuvwxyz012345" | ||||||
twilio_account_sid: "ABCDEFGHIJKLMNOPQRSTUVWXYZ01234567" | ||||||
|
||||||
~~~~~~~~ | ||||||
|
||||||
Wechat will send notification with a application. The body of the notification is formatted the same as with other alerters. See: https://work.weixin.qq.com/api/doc/90000/90135/90236 | ||||||
|
||||||
Required: | ||||||
|
||||||
``wechat_corp_id``: Wechat corp id. | ||||||
|
||||||
``wechat_agent_id``: The wechat agent id you are going to send the message. | ||||||
|
||||||
``wechat_secret``: The wechat agent secret. | ||||||
|
||||||
``wechat_to_party`` & ``wechat_to_user`` & ``wechat_to_tag``: One of wechat_to_party, wechat_to_user, or wechat_to_tag. | ||||||
|
||||||
Optional: | ||||||
|
||||||
``wechat_msgtype``: Wechat msgtype, defaults to ``text``. ``textcard``, ``markdown``. | ||||||
|
||||||
``wechat_textcard_url``: Wechat textcard url while ``wechat_msgtype`` is ``textcard``, default to ``null_url``. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
``wechat_enable_duplicate_check``: ``enable_duplicate_check``, defaults to ``0``. | ||||||
|
||||||
``wechat_duplicate_check_interval``: ``duplicate_check_interval``, defaults to ``1800``. | ||||||
|
||||||
``wechat_proxy``: HTTP proxy. | ||||||
|
||||||
``wechat_proxy_login``: HTTP proxy login. | ||||||
|
||||||
``wechat_proxy_pass``: HTTP proxy pass. | ||||||
|
||||||
``wechat_textcard_url``: The url of a textcard button.. | ||||||
|
||||||
|
||||||
Example msgtype : text:: | ||||||
|
||||||
alert: | ||||||
- "wechat" | ||||||
wechat_corp_id: "a_corp_id" | ||||||
wechat_secret: "a_secret" | ||||||
wechat_agent_id: "a_agent_id" | ||||||
wechat_to_user: "user1|user2|user3" | ||||||
|
||||||
Example msgtype : textcard:: | ||||||
|
||||||
alert: | ||||||
- "wechat" | ||||||
wechat_corp_id: "a_corp_id" | ||||||
wechat_secret: "a_secret" | ||||||
wechat_agent_id: "a_agent_id" | ||||||
wechat_to_user: "@all" | ||||||
wechat_msgtype: "textcard" | ||||||
wechat_textcard_url: "http://{your_kibana_url}" | ||||||
|
||||||
Example msgtype : markdown:: | ||||||
|
||||||
alert: | ||||||
- "wechat" | ||||||
wechat_corp_id: "a_corp_id" | ||||||
wechat_secret: "a_secret" | ||||||
wechat_agent_id: "a_agent_id" | ||||||
wechat_to_tag: "tag1" | ||||||
wechat_msgtype: "markdown" | ||||||
|
||||||
Zabbix | ||||||
~~~~~~ | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import json | ||
import datetime | ||
|
||
import requests | ||
from requests.exceptions import RequestException | ||
from requests.auth import HTTPProxyAuth | ||
|
||
from elastalert.alerts import Alerter | ||
from elastalert.util import elastalert_logger, EAException | ||
|
||
|
||
class WechatAlerter(Alerter): | ||
required_options = frozenset(['wechat_corp_id', 'wechat_secret', 'wechat_agent_id']) | ||
|
||
def __init__(self, *args): | ||
super(WechatAlerter, self).__init__(*args) | ||
self.wechat_corp_id = self.rule.get('wechat_corp_id', '') | ||
self.wechat_secret = self.rule.get('wechat_secret', '') | ||
self.wechat_agent_id = self.rule.get('wechat_agent_id', '') | ||
self.wechat_msgtype = self.rule.get('wechat_msgtype', 'text') | ||
self.wechat_to_party = self.rule.get('wechat_to_party', None) | ||
self.wechat_to_user = self.rule.get('wechat_to_user', None) | ||
self.wechat_to_tag = self.rule.get('wechat_to_tag', None) | ||
self.wechat_textcard_url = self.rule.get('wechat_textcard_url', 'null_url') | ||
self.wechat_enable_duplicate_check = self.rule.get('wechat_enable_duplicate_check', 0) | ||
self.wechat_duplicate_check_interval = self.rule.get('wechat_duplicate_check_interval', 1800) | ||
|
||
self.wechat_proxy = self.rule.get('wechat_proxy', None) | ||
self.wechat_proxy_login = self.rule.get('wechat_proxy_login', None) | ||
self.wechat_proxy_password = self.rule.get('wechat_proxy_pass', None) | ||
self.proxies = {'https': self.wechat_proxy} if self.wechat_proxy else None | ||
self.auth = HTTPProxyAuth(self.wechat_proxy_login, self.wechat_proxy_password) if self.wechat_proxy_login else None | ||
|
||
self.wechat_access_token = '' | ||
self.wechat_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}' | ||
self.wechat_message_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}' | ||
self.expires_in = datetime.datetime.now() - datetime.timedelta(seconds=3600) | ||
|
||
def get_token(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function could use some unit test coverage. |
||
if self.expires_in >= datetime.datetime.now() and self.wechat_access_token: | ||
return | ||
|
||
try: | ||
response = requests.get(self.wechat_token_url.format(self.wechat_corp_id, | ||
self.wechat_secret), proxies=self.proxies, auth=self.auth) | ||
response.raise_for_status() | ||
except RequestException as e: | ||
raise EAException('Get wechat access_token failed , stacktrace:%s' % e) | ||
|
||
token_json = response.json() | ||
|
||
if 'access_token' not in token_json: | ||
raise EAException('Get wechat access_token failed, cause :%s' % response.text()) | ||
|
||
self.wechat_access_token = token_json['access_token'] | ||
self.expires_in = datetime.datetime.now() + datetime.timedelta(seconds=token_json['expires_in']) | ||
|
||
def format_body(self, body): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could use a simple unit test. |
||
return body.encode('utf8') | ||
|
||
def alert(self, matches): | ||
if not self.wechat_to_user and not self.wechat_to_party and not self.wechat_to_tag: | ||
raise EAException('All wechat_to_user & wechat_to_party & wechat_to_tag invalid.') | ||
|
||
self.get_token() | ||
headers = {'content-type': 'application/json'} | ||
title = self.create_title(matches) | ||
body = self.create_alert_body(matches) | ||
|
||
# message was cropped, see: https://work.weixin.qq.com/api/doc/90000/90135/90236 | ||
if len(body) > 2048: | ||
body = body[:2045] + "..." | ||
|
||
payload = { | ||
'touser': self.wechat_to_user or '', | ||
'toparty': self.wechat_to_party or '', | ||
'totag': self.wechat_to_tag or '', | ||
'agentid': self.wechat_agent_id, | ||
'enable_duplicate_check': self.wechat_enable_duplicate_check, | ||
'duplicate_check_interval': self.wechat_duplicate_check_interval | ||
} | ||
|
||
if self.wechat_msgtype == 'text': | ||
payload['msgtype'] = 'text' | ||
payload['text'] = { | ||
'content': body | ||
} | ||
elif self.wechat_msgtype == 'textcard': | ||
payload['msgtype'] = 'textcard' | ||
payload['textcard'] = { | ||
'title': title, | ||
'description': body, | ||
'url': self.wechat_textcard_url | ||
} | ||
elif self.wechat_msgtype == 'markdown': | ||
payload['msgtype'] = 'markdown' | ||
payload['markdown'] = { | ||
'content': body | ||
} | ||
|
||
try: | ||
response = requests.post(self.wechat_message_url.format(self.wechat_access_token), data=json.dumps( | ||
payload, ensure_ascii=False), headers=headers, proxies=self.proxies, auth=self.auth) | ||
response.raise_for_status() | ||
except RequestException as e: | ||
raise EAException('Error sending wechat msg: %s' % e) | ||
elastalert_logger.info('Alert sent to wechat.') | ||
|
||
def get_info(self): | ||
return {'type': 'wechat'} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -310,7 +310,7 @@ properties: | |
dingtalk_single_title: {type: string} | ||
dingtalk_single_url: {type: string} | ||
dingtalk_btn_orientation: {type: string} | ||
|
||
## Discord | ||
discord_webhook_url: {type: string} | ||
discord_emoji_title: {type: string} | ||
|
@@ -551,6 +551,11 @@ properties: | |
twilio_message_service_sid: {type: string} | ||
twilio_use_copilot: {type: boolean} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be more schema definitions listed here since this alerter has a large number of input parameters. |
||
wechat_corp_id: {type: string} | ||
wechat_secret: {type: string} | ||
wechat_agent_id: {type: integer} | ||
|
||
### Zabbix | ||
zbx_sender_host: {type: string} | ||
zbx_sender_port: {type: integer} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import json | ||
|
||
import pytest | ||
|
||
from elastalert.loaders import FileRulesLoader | ||
from elastalert.alerters.wechat import WechatAlerter | ||
|
||
|
||
def test_wechat_getinfo(): | ||
rule = { | ||
'name': 'Test Wechat Alerter', | ||
'type': 'any', | ||
'wechat_corp_id': 'test_wechat_corp_id', | ||
'wechat_secret': 'test_wechat_secret', | ||
'wechat_agent_id': 'test_wechat_agent_id', | ||
'alert': [], | ||
'alert_subject': 'Test Wechat Alert' | ||
} | ||
rules_loader = FileRulesLoader({}) | ||
rules_loader.load_modules(rule) | ||
alert = WechatAlerter(rule) | ||
|
||
expected_data = {'type': 'wechat'} | ||
actual_data = alert.get_info() | ||
assert expected_data == actual_data | ||
|
||
|
||
@pytest.mark.parametrize('wechat_corp_id, wechat_secret, wechat_agent_id, expected_data', [ | ||
('', '', '', 'Missing required option(s): wechat_corp_id, wechat_secret, wechat_agent_id'), | ||
('xxxx1', '', '', 'Missing required option(s): wechat_corp_id, wechat_secret, wechat_agent_id'), | ||
('', 'xxxx2', '', 'Missing required option(s): wechat_corp_id, wechat_secret, wechat_agent_id'), | ||
('xxxx1', 'xxxx2', '', 'Missing required option(s): wechat_corp_id, wechat_secret, wechat_agent_id'), | ||
('xxxx1', '', 'xxxx3', 'Missing required option(s): wechat_corp_id, wechat_secret, wechat_agent_id'), | ||
('', 'xxxx2', 'xxxx3', 'Missing required option(s): wechat_corp_id, wechat_secret, wechat_agent_id'), | ||
('xxxx1', 'xxxx2', 'xxxx3', | ||
{ | ||
'type': 'wechat' | ||
}), | ||
]) | ||
def test_wechat_required_error(wechat_corp_id, wechat_secret, wechat_agent_id, expected_data): | ||
try: | ||
rule = { | ||
'name': 'Test Wechat Rule', | ||
'type': 'any', | ||
'alert': [], | ||
'alert_subject': 'Test Wechat' | ||
} | ||
|
||
if wechat_corp_id != '': | ||
rule['wechat_corp_id'] = wechat_corp_id | ||
|
||
if wechat_secret != '': | ||
rule['wechat_secret']=wechat_secret | ||
|
||
if wechat_agent_id != '': | ||
rule['wechat_agent_id'] = wechat_agent_id | ||
|
||
rules_loader = FileRulesLoader({}) | ||
rules_loader.load_modules(rule) | ||
alert = WechatAlerter(rule) | ||
|
||
actual_data = alert.get_info() | ||
assert expected_data == actual_data | ||
except Exception as ea: | ||
assert expected_data in str(ea) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should change the
&
to|
to avoid confusion since not all are required.