Skip to content

Commit

Permalink
Add verified to work owa sync code
Browse files Browse the repository at this point in the history
- Add proper optional hmac support to built-in iris client and move
  it to separate file

- Add metric for number of messages delt with

- Make the nap time between polls configurable

- Add makefile entry
  • Loading branch information
jrgp authored and houqp committed Jun 7, 2017
1 parent b01d8fe commit 5aa53a9
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 30 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ serve:
sender:
iris-sender configs/config.dev.yaml

owa-sync:
iris-owa-sync configs/config.dev.yaml

test:
make unit
make e2e
Expand Down
5 changes: 5 additions & 0 deletions configs/config.dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,8 @@ gmail_one_click_url_endpoint: 'http://localhost:16648/api/v0/gmail-oneclick/rela
# username: ''
# password: ''
# smtp_address: '[email protected]'
# # need a valid iris application for for owa script to connect to api to relay emails
# iris_app: ''
# iris_app_key: ''
# api_host: http://localhost:16649
# sleep_interval: 60
35 changes: 22 additions & 13 deletions src/iris/bin/owasync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from iris import metrics
from iris.config import load_config
from iris.sender.cache import IrisClient
from iris.client import IrisClient
from exchangelib import DELEGATE, Account, Credentials
from exchangelib.errors import EWSError
import sys
Expand All @@ -30,7 +30,8 @@
'message_relay_success': 0,
'message_relay_failure': 0,
'total_inbox_count': 0,
'unread_inbox_count': 0
'unread_inbox_count': 0,
'message_process_count': 0
}


Expand All @@ -43,8 +44,11 @@ def poll(account, iris_client):
logger.exception('Failed to gather inbox counts from OWA API')
metrics.incr('owa_api_failure')

processed_messages = 0

try:
for message in account.inbox.filter(is_read=False).order_by('-datetime_received'):
processed_messages += 1
if relay(message, iris_client):
message.is_read = True
try:
Expand All @@ -56,6 +60,9 @@ def poll(account, iris_client):
logger.exception('Failed to iterate through inbox')
metrics.incr('owa_api_failure')

metrics.set('message_process_count', processed_messages)
return processed_messages


def relay(message, iris_client):
# Get headers into the format the iris expects from gmail
Expand All @@ -69,11 +76,10 @@ def relay(message, iris_client):
data = {'headers': headers, 'body': message.text_body.strip()}

try:
# TODO: add POST method and HMAC functionality to irisclient
iris_client.post('v0/response/email', json=data)
iris_client.post('response/email', json=data).raise_for_status()
metrics.incr('message_relay_success')
return True
except (requests.exceptions.RequestException):
except requests.exceptions.RequestException:
metrics.incr('message_relay_failure')
logger.exception('Failed posting message %s (from %s) to iris-api', message.message_id, message.sender.email_address)
return False
Expand All @@ -90,6 +96,11 @@ def main():
logger.critical('Missing OWA configs')
sys.exit(1)

api_host = owaconfig.get('api_host', 'http://localhost:16649')
iris_client = IrisClient(api_host, 0, owaconfig['iris_app'], owaconfig['iris_app_key'])

spawn(metrics.emit_forever)

creds = Credentials(**owaconfig['credentials'])

account = Account(
Expand All @@ -99,16 +110,14 @@ def main():
access_type=DELEGATE)
logger.info('Receiving mail on behalf of %s', owaconfig['smtp_address'])

nap_time = 60

api_host = config.get('sender', {}).get('api_host', 'http://localhost:16649')
iris_client = IrisClient(api_host)

spawn(metrics.emit_forever)
try:
nap_time = int(owaconfig.get('sleep_interval', 60))
except ValueError:
nap_time = 60

while True:
start_time = time.time()
poll(account, iris_client)
message_count = poll(account, iris_client)
run_time = time.time() - start_time
logger.info('Last run took %2.f seconds. Waiting %s seconds until next poll..', run_time, nap_time)
logger.info('Last run took %2.f seconds and processed %s messages. Waiting %s seconds until next poll..', run_time, message_count, nap_time)
sleep(nap_time)
50 changes: 50 additions & 0 deletions src/iris/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import requests
import base64
import hashlib
import hmac
import time
import logging

logger = logging.getLogger(__name__)
logging.getLogger('requests').setLevel(logging.WARNING)


class IrisAuth(requests.auth.AuthBase):
def __init__(self, app, key):
self.header = 'hmac %s:' % app
self.HMAC = hmac.new(key, '', hashlib.sha512)

def __call__(self, request):
HMAC = self.HMAC.copy()
path = request.path_url
method = request.method
body = request.body or ''
window = int(time.time()) // 5
HMAC.update('%s %s %s %s' % (window, method, path, body))
digest = base64.urlsafe_b64encode(HMAC.digest())
request.headers['Authorization'] = self.header + digest
return request


class IrisClient(requests.Session):
def __init__(self, base, version=0, iris_app=None, iris_app_key=None):
super(IrisClient, self).__init__()
self.url = base + '/v%d/' % version
adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
self.mount('http://', adapter)
self.mount('https://', adapter)

if iris_app and iris_app_key:
self.auth = IrisAuth(iris_app, iris_app_key)
logger.info('Initializing iris api client with auth using app %s', iris_app)
else:
logger.warning('Initializing iris api client without auth')

def get(self, path, *args, **kwargs):
return super(IrisClient, self).get(self.url + path, *args, **kwargs)

def post(self, path, *args, **kwargs):
return super(IrisClient, self).post(self.url + path, *args, **kwargs)

def put(self, path, *args, **kwargs):
return super(IrisClient, self).put(self.url + path, *args, **kwargs)
18 changes: 1 addition & 17 deletions src/iris/sender/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from __future__ import absolute_import

from collections import deque
import requests
import jinja2
from jinja2.sandbox import SandboxedEnvironment
from gevent import spawn, sleep
Expand All @@ -13,10 +12,10 @@
from .. import db
from ..role_lookup import get_role_lookups
from . import auditlog
from ..client import IrisClient

import logging
logger = logging.getLogger(__name__)
logging.getLogger('requests').setLevel(logging.WARNING)


iris_client = None
Expand All @@ -31,21 +30,6 @@
targets_for_role = None


class IrisClient(requests.Session):
def __init__(self, base, version=0):
super(IrisClient, self).__init__()
self.url = base + '/v%d/' % version
adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
self.mount('http://', adapter)
self.mount('https://', adapter)

def get(self, path, *args, **kwargs):
return super(IrisClient, self).get(self.url + path, *args, **kwargs)

def post(self, path, *args, **kwargs):
return super(IrisClient, self).post(self.url + path, *args, **kwargs)


class Cache():
def __init__(self, engine, sql, active):
self.engine = engine
Expand Down

0 comments on commit 5aa53a9

Please sign in to comment.