# -*- test-case-name: vxsandbox.resources.tests.test_metrics -*-
# -*- coding: utf-8 -*-

"""Metrics resource for JS Box sandboxes."""

import re

from vxsandbox import SandboxResource

from vumi.blinkenlights.metrics import SUM, AVG, MIN, MAX, LAST


class MetricEventError(Exception):
    """Raised when a command cannot be converted to a metric event."""


class MetricEvent(object):
    """A single metric datapoint parsed from a sandbox command.

    :param str store: Name of the metric store the value belongs to.
    :param str metric: Name of the metric.
    :param float value: The value to record.
    :param agg: Aggregator, one of the values of :attr:`AGGREGATORS`.
    """

    # Aggregator names accepted in commands, mapped to vumi aggregators.
    AGGREGATORS = {
        'sum': SUM,
        'avg': AVG,
        'min': MIN,
        'max': MAX,
        'last': LAST,
    }

    # Valid store/metric names: a leading letter followed by at most 100
    # letters, digits, dots, underscores or hyphens.
    NAME_REGEX = re.compile(r"^[a-zA-Z][a-zA-Z0-9._-]{,100}$")

    def __init__(self, store, metric, value, agg):
        self.store = store
        self.metric = metric
        self.value = value
        self.agg = agg

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return all((self.store == other.store, self.metric == other.metric,
                    self.value == other.value, self.agg is other.agg))

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; without this,
        # two equal events would still compare as unequal via ``!=``.
        return not self.__eq__(other)

    def __hash__(self):
        # Paired with __eq__ so equal events hash equally.
        return hash((self.store, self.metric, self.value, self.agg))

    def __repr__(self):
        return "<%s store=%r metric=%r value=%r agg=%r>" % (
            type(self).__name__, self.store, self.metric, self.value,
            self.agg)

    @classmethod
    def _parse_name(cls, name, kind):
        """Validate a store or metric name, returning it unchanged.

        :param name: The name to validate.
        :param str kind:
            Either ``'store'`` or ``'metric'``; used in error messages only.
        :raises MetricEventError:
            If the name is missing, not a string, or does not match
            :attr:`NAME_REGEX`.
        """
        if name is None:
            raise MetricEventError("Missing %s name." % (kind,))
        if not isinstance(name, basestring):
            raise MetricEventError("Invalid type for %s name: %r"
                                   % (kind, name))
        if not cls.NAME_REGEX.match(name):
            raise MetricEventError("Invalid %s name: %r." % (kind, name))
        return name

    @classmethod
    def _parse_value(cls, value):
        """Coerce ``value`` to a float.

        :raises MetricEventError: If the value cannot be converted.
        """
        try:
            value = float(value)
        except (ValueError, TypeError):
            raise MetricEventError("Invalid metric value %r." % (value,))
        return value

    @classmethod
    def _parse_agg(cls, agg):
        """Look up the aggregator named ``agg``.

        :raises MetricEventError:
            If ``agg`` is not a string or is not a known aggregator name.
        """
        if not isinstance(agg, basestring):
            # Distinct message so type errors can be told apart from
            # unknown aggregator names.
            raise MetricEventError("Invalid type for metric aggregator: %r."
                                   % (agg,))
        if agg not in cls.AGGREGATORS:
            raise MetricEventError("Invalid metric aggregator %r." % (agg,))
        return cls.AGGREGATORS[agg]

    @classmethod
    def from_command(cls, command):
        """Build a :class:`MetricEvent` from a sandbox command dict.

        The command may contain ``store`` (defaults to ``'default'``),
        ``metric``, ``value`` and ``agg`` fields.

        :raises MetricEventError: If any field fails validation.
        """
        store = cls._parse_name(command.get('store', 'default'), 'store')
        metric = cls._parse_name(command.get('metric'), 'metric')
        value = cls._parse_value(command.get('value'))
        agg = cls._parse_agg(command.get('agg'))
        return cls(store, metric, value, agg)


class MetricsResource(SandboxResource):
    """Resource that provides metric storing."""

    def _publish_event(self, api, ev):
        # Metrics are recorded against the account that owns the
        # conversation this sandbox api belongs to.
        conversation = self.app_worker.conversation_for_api(api)
        self.app_worker.publish_account_metric(conversation.user_account.key,
                                               ev.store, ev.metric, ev.value,
                                               ev.agg)

    def handle_fire(self, api, command):
        """Fire a metric value.

        Replies with ``success=True`` on success, or ``success=False``
        plus a ``reason`` when the command fails validation.
        """
        try:
            ev = MetricEvent.from_command(command)
        except MetricEventError as e:
            return self.reply(command, success=False, reason=unicode(e))
        self._publish_event(api, ev)
        return self.reply(command, success=True)
`metrics_interval` the :class:`GoMetricsWorker` - collects a list of all active conversations and distributes them - into `metrics_interval / metrics_granularity` buckets. - Immediately afterwards and then after each `metrics_granulatiry` - interval, the metrics worker sends a `collect_metrics` command to each - of the conversations in the current bucket until all buckets have been - processed. - Once all buckets have been processed, active conversations are - collected again and the cycle repeats. - """ - - metrics_interval = ConfigInt( - "How often (in seconds) the worker should send `collect_metrics` " - "commands for each conversation. Must be an integer multiple of " - "`metrics_granularity`.", - default=300, - static=True) - - metrics_granularity = ConfigInt( - "How often (in seconds) the worker should process a bucket of " - "conversations.", - default=5, - static=True) - - def post_validate(self): - if (self.metrics_interval % self.metrics_granularity != 0): - raise ConfigError("Metrics interval must be an integer multiple" - " of metrics granularity.") - - -class GoMetricsWorker(BaseWorker, GoWorkerMixin): - """A metrics collection worker for Go applications. - This worker operates by finding all conversations that require metrics - collection and sending commands to the relevant application workers to - trigger the actual metrics. 
- """ - - CONFIG_CLASS = GoMetricsWorkerConfig - worker_name = 'go_metrics' - - @inlineCallbacks - def setup_worker(self): - yield self._go_setup_worker() - config = self.get_static_config() - - self.command_publisher = yield self.start_publisher( - ApiCommandPublisher) - - self._current_bucket = 0 - self._num_buckets = ( - config.metrics_interval // config.metrics_granularity) - self._buckets = dict((i, []) for i in range(self._num_buckets)) - self._conversation_workers = {} - - self._looper = LoopingCall(self.metrics_loop_func) - self._looper.start(config.metrics_granularity) - - @inlineCallbacks - def teardown_worker(self): - if self._looper.running: - self._looper.stop() - yield self._go_teardown_worker() - - def bucket_for_conversation(self, conv_key): - return hash(conv_key) % self._num_buckets - - @inlineCallbacks - def populate_conversation_buckets(self): - account_keys = yield self.find_account_keys() - num_conversations = 0 - # We deliberarely serialise this. We don't want to hit the datastore - # too hard for metrics. - for account_key in account_keys: - conv_keys = yield self.find_conversations_for_account(account_key) - num_conversations += len(conv_keys) - for conv_key in conv_keys: - bucket = self.bucket_for_conversation(conv_key) - if conv_key not in self._conversation_workers: - # TODO: Clear out archived conversations - user_api = self.vumi_api.get_user_api(account_key) - conv = yield user_api.get_wrapped_conversation(conv_key) - self._conversation_workers[conv_key] = conv.worker_name - worker_name = self._conversation_workers[conv_key] - self._buckets[bucket].append( - (account_key, conv_key, worker_name)) - log.info( - "Scheduled metrics commands for %d conversations in %d accounts." 
"""Tests for vxsandbox.resources.metrics."""

import mock

from vxsandbox.resources import SandboxCommand

from vumi.tests.helpers import VumiTestCase

from vxsandbox.resources.metrics import (
    MetricEvent, MetricEventError, MetricsResource)


class TestMetricEvent(VumiTestCase):

    SUM = MetricEvent.AGGREGATORS['sum']

    def assert_event_error(self, command):
        """Assert that ``command`` cannot be parsed into a MetricEvent."""
        self.assertRaises(
            MetricEventError, MetricEvent.from_command, command)

    def test_create(self):
        ev = MetricEvent('mystore', 'foo', 2.0, self.SUM)
        self.assertEqual(ev.store, 'mystore')
        self.assertEqual(ev.metric, 'foo')
        self.assertEqual(ev.value, 2.0)
        self.assertEqual(ev.agg, self.SUM)

    def test_eq(self):
        ev1 = MetricEvent('mystore', 'foo', 1.5, self.SUM)
        ev2 = MetricEvent('mystore', 'foo', 1.5, self.SUM)
        self.assertEqual(ev1, ev2)

    def test_neq(self):
        ev1 = MetricEvent('mystore', 'foo', 1.5, self.SUM)
        ev2 = MetricEvent('mystore', 'bar', 1.5, self.SUM)
        # Check __eq__ explicitly: under Python 2, assertNotEqual uses
        # ``!=``, which does not consult __eq__ unless __ne__ is defined,
        # so on its own it would pass even for equal events.
        self.assertFalse(ev1 == ev2)
        self.assertNotEqual(ev1, ev2)

    def test_from_command(self):
        ev = MetricEvent.from_command({'store': 'mystore', 'metric': 'foo',
                                       'value': 1.5, 'agg': 'sum'})
        self.assertEqual(ev, MetricEvent('mystore', 'foo', 1.5, self.SUM))

    def test_bad_store(self):
        self.assert_event_error({'store': 'foo bar', 'metric': 'foo',
                                 'value': 1.5, 'agg': 'sum'})

    def test_bad_type_store(self):
        self.assert_event_error({'store': {}, 'metric': 'foo',
                                 'value': 1.5, 'agg': 'sum'})

    def test_bad_metric(self):
        self.assert_event_error({'store': 'mystore', 'metric': 'foo bar',
                                 'value': 1.5, 'agg': 'sum'})

    def test_bad_type_metric(self):
        self.assert_event_error({'store': 'mystore', 'metric': {},
                                 'value': 1.5, 'agg': 'sum'})

    def test_missing_metric(self):
        self.assert_event_error({'store': 'mystore', 'value': 1.5,
                                 'agg': 'sum'})

    def test_bad_value(self):
        self.assert_event_error({'store': 'mystore', 'metric': 'foo',
                                 'value': 'abc', 'agg': 'sum'})

    def test_bad_type_value(self):
        self.assert_event_error({'store': 'mystore', 'metric': 'foo',
                                 'value': {}, 'agg': 'sum'})

    def test_missing_value(self):
        self.assert_event_error({'store': 'mystore', 'metric': 'foo',
                                 'agg': 'sum'})

    def test_bad_agg(self):
        self.assert_event_error({'store': 'mystore', 'metric': 'foo',
                                 'value': 1.5, 'agg': 'foo'})

    def test_bad_type_agg(self):
        self.assert_event_error({'store': 'mystore', 'metric': 'foo',
                                 'value': 1.5, 'agg': {}})

    def test_missing_agg(self):
        self.assert_event_error({'store': 'mystore', 'metric': 'foo',
                                 'value': 1.5})


class TestMetricsResource(VumiTestCase):

    SUM = MetricEvent.AGGREGATORS['sum']

    def setUp(self):
        self.conversation = mock.Mock()
        self.app_worker = mock.Mock()
        self.dummy_api = object()
        self.resource = MetricsResource("test", self.app_worker, {})
        self.app_worker.conversation_for_api = mock.Mock(
            return_value=self.conversation)

    def check_reply(self, reply, cmd, success):
        """Assert that ``reply`` acknowledges ``cmd`` with ``success``."""
        self.assertEqual(reply['reply'], True)
        self.assertEqual(reply['cmd_id'], cmd['cmd_id'])
        self.assertEqual(reply['success'], success)

    def check_publish(self, store, metric, value, agg):
        """Assert exactly one metric was published for the account."""
        self.app_worker.publish_account_metric.assert_called_once_with(
            self.conversation.user_account.key, store, metric, value, agg)

    def check_not_published(self):
        self.assertFalse(self.app_worker.publish_account_metric.called)

    def test_handle_fire(self):
        cmd = SandboxCommand(metric="foo", value=1.5, agg='sum')
        reply = self.resource.handle_fire(self.dummy_api, cmd)
        self.check_reply(reply, cmd, True)
        self.check_publish('default', 'foo', 1.5, self.SUM)

    def _test_error(self, cmd, expected_error):
        reply = self.resource.handle_fire(self.dummy_api, cmd)
        self.check_reply(reply, cmd, False)
        self.assertEqual(reply['reason'], expected_error)
        self.check_not_published()

    def test_handle_fire_error(self):
        cmd = SandboxCommand(metric="foo bar", value=1.5, agg='sum')
        expected_error = "Invalid metric name: 'foo bar'."
        self._test_error(cmd, expected_error)

    def test_non_ascii_metric_name_error(self):
        cmd = SandboxCommand(metric=u"b\xe6r", value=1.5, agg='sum')
        expected_error = "Invalid metric name: u'b\\xe6r'."
        self._test_error(cmd, expected_error)
name=conv_name, **kw) - - @inlineCallbacks - def test_bucket_for_conversation(self): - worker = yield self.get_metrics_worker(needs_hash=True) - user_helper = yield self.vumi_helper.make_user(u'acc1') - conv1 = yield self.make_conv(user_helper, u'conv1') - - bucket = worker.bucket_for_conversation(conv1.key) - self.assertEqual(bucket, hash(conv1.key) % 60) - - def assert_conversations_bucketed(self, worker, expected): - expected = expected.copy() - buckets = copy.deepcopy(worker._buckets) - for key in range(60): - buckets[key] = sorted(buckets[key]) - expected[key] = sorted( - (c.user_account.key, c.key, u'my_conv_application') - for c in expected.get(key, [])) - if buckets[key] == expected[key]: - del buckets[key] - del expected[key] - self.assertEqual(buckets, expected) - - @inlineCallbacks - def test_populate_conversation_buckets(self): - worker = yield self.get_metrics_worker() - - user_helper = yield self.vumi_helper.make_user(u'acc1') - conv1 = yield self.make_conv(user_helper, u'conv1', started=True) - conv2a = yield self.make_conv(user_helper, u'conv2a', started=True) - conv2b = yield self.make_conv(user_helper, u'conv2b', started=True) - conv4 = yield self.make_conv(user_helper, u'conv4', started=True) - for conv in [conv1, conv2a, conv2b, conv4]: - self.conversation_names[conv.key] = conv.name - - self.assert_conversations_bucketed(worker, {}) - with LogCatcher(message='Scheduled') as lc: - yield worker.populate_conversation_buckets() - [log_msg] = lc.messages() - self.assert_conversations_bucketed(worker, { - 1: [conv1], - 2: [conv2a, conv2b], - 4: [conv4], - }) - # We may have tombstone keys from accounts created (and deleted) by - # previous tests, so we replace the account count in the log message - # we're asserting on. 
- log_msg = re.sub(r'in \d account', 'in 1 account', log_msg) - self.assertEqual(log_msg, "Scheduled metrics commands for" - " 4 conversations in 1 accounts.") - - @inlineCallbacks - def test_process_bucket(self): - worker = yield self.get_metrics_worker() - - user_helper = yield self.vumi_helper.make_user(u'acc1') - conv1 = yield self.make_conv(user_helper, u'conv1', started=True) - conv2a = yield self.make_conv(user_helper, u'conv2a', started=True) - conv2b = yield self.make_conv(user_helper, u'conv2b', started=True) - conv4 = yield self.make_conv(user_helper, u'conv4', started=True) - for conv in [conv1, conv2a, conv2b, conv4]: - self.conversation_names[conv.key] = conv.name - - self.assert_conversations_bucketed(worker, {}) - yield worker.populate_conversation_buckets() - yield worker.process_bucket(2) - self.assert_conversations_bucketed(worker, { - 1: [conv1], - 4: [conv4], - }) - - @inlineCallbacks - def test_increment_bucket(self): - worker = yield self.get_metrics_worker() - self.assertEqual(worker._current_bucket, 0) - worker.increment_bucket() - self.assertEqual(worker._current_bucket, 1) - worker._current_bucket = 59 - worker.increment_bucket() - self.assertEqual(worker._current_bucket, 0) - - @inlineCallbacks - def test_metrics_poller(self): - polls = [] - # Replace metrics worker with one that hasn't started yet. - worker = yield self.get_metrics_worker(start=False, needs_looping=True) - worker.metrics_loop_func = lambda: polls.append(None) - self.assertEqual(0, len(polls)) - # Start worker. - yield worker.startWorker() - self.assertEqual(1, len(polls)) - # Pass time, but not enough to trigger a metric run. - self.clock.advance(4) - self.assertEqual(1, len(polls)) - # Pass time, trigger a metric run. 
- self.clock.advance(1) - self.assertEqual(2, len(polls)) - - @inlineCallbacks - def test_find_accounts(self): - worker = yield self.get_metrics_worker() - user1_helper = yield self.vumi_helper.make_user(u'acc1') - user2_helper = yield self.vumi_helper.make_user(u'acc2') - user3_helper = yield self.vumi_helper.make_user(u'acc3') - yield worker.redis.sadd( - 'disabled_metrics_accounts', user3_helper.account_key) - yield worker.redis.sadd('metrics_accounts', user2_helper.account_key) - - account_keys = yield worker.find_account_keys() - self.assertEqual( - sorted([user1_helper.account_key, user2_helper.account_key]), - sorted(account_keys)) - - @inlineCallbacks - def test_send_metrics_command(self): - worker = yield self.get_metrics_worker() - user_helper = yield self.vumi_helper.make_user(u'acc1') - conv1 = yield self.make_conv(user_helper, u'conv1', started=True) - - yield worker.send_metrics_command( - conv1.user_account.key, conv1.key, 'my_conv_application') - [cmd] = self.vumi_helper.get_dispatched_commands() - - self.assertEqual(cmd['worker_name'], 'my_conv_application') - self.assertEqual( - cmd['kwargs']['user_account_key'], user_helper.account_key) - - @inlineCallbacks - def setup_metric_loop_conversations(self, worker): - user1_helper = yield self.vumi_helper.make_user(u'acc1') - conv0 = yield self.make_conv(user1_helper, u'conv0', started=True) - conv1 = yield self.make_conv(user1_helper, u'conv1', started=True) - user2_helper = yield self.vumi_helper.make_user(u'acc2') - conv2 = yield self.make_conv(user2_helper, u'conv2', started=True) - conv3 = yield self.make_conv(user2_helper, u'conv3', started=True) - for conv in [conv0, conv1, conv2, conv3]: - self.conversation_names[conv.key] = conv.name - - returnValue([conv0, conv1, conv2, conv3]) - - @inlineCallbacks - def test_metrics_loop_func_bucket_zero(self): - worker = yield self.get_metrics_worker() - convs = yield self.setup_metric_loop_conversations(worker) - [conv0, conv1, conv2, conv3] = convs - - 
self.assertEqual(worker._current_bucket, 0) - yield worker.metrics_loop_func() - self.assertEqual(worker._current_bucket, 1) - - cmds = self.vumi_helper.get_dispatched_commands() - conv_keys = [c.payload['kwargs']['conversation_key'] for c in cmds] - self.assertEqual(conv_keys, [conv0.key]) - - self.assert_conversations_bucketed(worker, { - 1: [conv1], - 2: [conv2], - 3: [conv3], - }) - - @inlineCallbacks - def test_metrics_loop_func_bucket_nonzero(self): - worker = yield self.get_metrics_worker() - convs = yield self.setup_metric_loop_conversations(worker) - [conv0, conv1, conv2, conv3] = convs - yield worker.populate_conversation_buckets() - - worker._current_bucket = 1 - yield worker.metrics_loop_func() - self.assertEqual(worker._current_bucket, 2) - - cmds = self.vumi_helper.get_dispatched_commands() - conv_keys = [c['kwargs']['conversation_key'] for c in cmds] - self.assertEqual(conv_keys, [conv1.key]) - - self.assert_conversations_bucketed(worker, { - 0: [conv0], - 2: [conv2], - 3: [conv3], - })