From 9cffbfa6fde6f381856d9cdee2824bc96e711da4 Mon Sep 17 00:00:00 2001
From: Zachary Radtka
Date: Wed, 9 Dec 2015 21:34:42 -0500
Subject: [PATCH] [mapreduce] An agent check that reports MapReduce metrics

---
 checks.d/mapreduce.py                        | 512 ++++++++++++++++++
 conf.d/mapreduce.yaml.example                |  61 +++
 tests/checks/fixtures/mapreduce/apps_metrics |   1 +
 tests/checks/fixtures/mapreduce/cluster_info |   1 +
 .../fixtures/mapreduce/job_counter_metrics   |   1 +
 tests/checks/fixtures/mapreduce/job_metrics  |   1 +
 tests/checks/fixtures/mapreduce/task_metrics |   1 +
 tests/checks/mock/test_mapreduce.py          | 240 ++++
 8 files changed, 818 insertions(+)
 create mode 100644 checks.d/mapreduce.py
 create mode 100644 conf.d/mapreduce.yaml.example
 create mode 100644 tests/checks/fixtures/mapreduce/apps_metrics
 create mode 100644 tests/checks/fixtures/mapreduce/cluster_info
 create mode 100644 tests/checks/fixtures/mapreduce/job_counter_metrics
 create mode 100644 tests/checks/fixtures/mapreduce/job_metrics
 create mode 100644 tests/checks/fixtures/mapreduce/task_metrics
 create mode 100644 tests/checks/mock/test_mapreduce.py

diff --git a/checks.d/mapreduce.py b/checks.d/mapreduce.py
new file mode 100644
index 0000000000..ed72f53180
--- /dev/null
+++ b/checks.d/mapreduce.py
@@ -0,0 +1,512 @@
+'''
+MapReduce Job Metrics
+---------------------
+mapreduce.job.elapsed_time                  The elapsed time since the application started (in ms)
+mapreduce.job.maps_total                    The total number of maps
+mapreduce.job.maps_completed                The number of completed maps
+mapreduce.job.reduces_total                 The total number of reduces
+mapreduce.job.reduces_completed             The number of completed reduces
+mapreduce.job.maps_pending                  The number of maps still to be run
+mapreduce.job.maps_running                  The number of running maps
+mapreduce.job.reduces_pending               The number of reduces still to be run
+mapreduce.job.reduces_running               The number of running reduces
+mapreduce.job.new_reduce_attempts           The number of new reduce attempts
+mapreduce.job.running_reduce_attempts       The number of running reduce attempts
+mapreduce.job.failed_reduce_attempts        The number of failed reduce attempts
+mapreduce.job.killed_reduce_attempts        The number of killed reduce attempts
+mapreduce.job.successful_reduce_attempts    The number of successful reduce attempts
+mapreduce.job.new_map_attempts              The number of new map attempts
+mapreduce.job.running_map_attempts          The number of running map attempts
+mapreduce.job.failed_map_attempts           The number of failed map attempts
+mapreduce.job.killed_map_attempts           The number of killed map attempts
+mapreduce.job.successful_map_attempts       The number of successful map attempts
+
+MapReduce Job Counter Metrics
+-----------------------------
+mapreduce.job.counter.reduce_counter_value  The counter value of reduce tasks
+mapreduce.job.counter.map_counter_value     The counter value of map tasks
+mapreduce.job.counter.total_counter_value   The counter value of all tasks
+
+MapReduce Map Task Metrics
+--------------------------
+mapreduce.job.map.task.progress             The distribution of progress values across all map tasks
+mapreduce.job.map.task.elapsed_time         The distribution of elapsed times across all map tasks
+
+MapReduce Reduce Task Metrics
+-----------------------------
+mapreduce.job.reduce.task.progress          The distribution of progress values across all reduce tasks
+mapreduce.job.reduce.task.elapsed_time      The distribution of elapsed times across all reduce tasks
+'''
+
+# stdlib
+from urlparse import urljoin
+from urlparse import urlsplit
+from urlparse import urlunsplit
+
+# 3rd party
+import requests
+from requests.exceptions import Timeout, HTTPError, InvalidURL, ConnectionError
+from simplejson import JSONDecodeError
+
+# Project
+from checks import AgentCheck
+
+# Service Check Names
+YARN_SERVICE_CHECK = 'mapreduce.resource_manager.can_connect'
+MAPREDUCE_SERVICE_CHECK = 'mapreduce.application_master.can_connect'
+
+# URL Paths
+INFO_PATH = 'ws/v1/cluster/info'
+YARN_APPS_PATH = 'ws/v1/cluster/apps'
+MAPREDUCE_JOBS_PATH = 'ws/v1/mapreduce/jobs'
+
+# Application types and states to collect
+YARN_APPLICATION_TYPES = 'MAPREDUCE'
+YARN_APPLICATION_STATES = 'RUNNING'
+
+# Metric types
+HISTOGRAM = 'histogram'
+
+# Metrics to collect
+MAPREDUCE_JOB_METRICS = {
+    'elapsedTime': ('mapreduce.job.elapsed_time', HISTOGRAM),
+    'mapsTotal': ('mapreduce.job.maps_total', HISTOGRAM),
+    'mapsCompleted': ('mapreduce.job.maps_completed', HISTOGRAM),
+    'reducesTotal': ('mapreduce.job.reduces_total', HISTOGRAM),
+    'reducesCompleted': ('mapreduce.job.reduces_completed', HISTOGRAM),
+    'mapsPending': ('mapreduce.job.maps_pending', HISTOGRAM),
+    'mapsRunning': ('mapreduce.job.maps_running', HISTOGRAM),
+    'reducesPending': ('mapreduce.job.reduces_pending', HISTOGRAM),
+    'reducesRunning': ('mapreduce.job.reduces_running', HISTOGRAM),
+    'newReduceAttempts': ('mapreduce.job.new_reduce_attempts', HISTOGRAM),
+    'runningReduceAttempts': ('mapreduce.job.running_reduce_attempts', HISTOGRAM),
+    'failedReduceAttempts': ('mapreduce.job.failed_reduce_attempts', HISTOGRAM),
+    'killedReduceAttempts': ('mapreduce.job.killed_reduce_attempts', HISTOGRAM),
+    'successfulReduceAttempts': ('mapreduce.job.successful_reduce_attempts', HISTOGRAM),
+    'newMapAttempts': ('mapreduce.job.new_map_attempts', HISTOGRAM),
+    'runningMapAttempts': ('mapreduce.job.running_map_attempts', HISTOGRAM),
+    'failedMapAttempts': ('mapreduce.job.failed_map_attempts', HISTOGRAM),
+    'killedMapAttempts': ('mapreduce.job.killed_map_attempts', HISTOGRAM),
+    'successfulMapAttempts': ('mapreduce.job.successful_map_attempts', HISTOGRAM),
+}
+
+MAPREDUCE_JOB_COUNTER_METRICS = {
+    'reduceCounterValue': ('mapreduce.job.counter.reduce_counter_value', HISTOGRAM),
+    'mapCounterValue': ('mapreduce.job.counter.map_counter_value', HISTOGRAM),
+    'totalCounterValue': ('mapreduce.job.counter.total_counter_value', HISTOGRAM),
+}
+
+MAPREDUCE_MAP_TASK_METRICS = {
+    'progress': ('mapreduce.job.map.task.progress', HISTOGRAM),
+    'elapsedTime': ('mapreduce.job.map.task.elapsed_time', HISTOGRAM)
+}
+
+MAPREDUCE_REDUCE_TASK_METRICS = {
+    'progress': ('mapreduce.job.reduce.task.progress', HISTOGRAM),
+    'elapsedTime': ('mapreduce.job.reduce.task.elapsed_time', HISTOGRAM)
+}
+
+
+class MapReduceCheck(AgentCheck):
+
+    def __init__(self, name, init_config, agentConfig, instances=None):
+        AgentCheck.__init__(self, name, init_config, agentConfig, instances)
+
+        # Parse general counters
+        self.general_counters = self._parse_general_counters(init_config)
+
+        # Parse job specific counters
+        self.job_specific_counters = self._parse_job_specific_counters(init_config)
+
+    def check(self, instance):
+        # Get properties from conf file
+        rm_address = instance.get('resourcemanager_uri')
+        if rm_address is None:
+            raise Exception('The ResourceManager URI must be specified in the instance configuration')
+
+        # Get the cluster ID from YARN
+        cluster_id = self._get_cluster_id(rm_address)
+
+        # Get the running MR applications from YARN
+        running_apps = self._get_running_app_ids(rm_address)
+
+        # Report success after gathering all metrics from the ResourceManager
+        self.service_check(YARN_SERVICE_CHECK,
+                           AgentCheck.OK,
+                           tags=['url:%s' % rm_address],
+                           message='Connection to ResourceManager "%s" was successful' % rm_address)
+
+        # Get the running jobs from the ApplicationMaster
+        running_jobs = self._mapreduce_job_metrics(running_apps, cluster_id)
+
+        # Get job counter metrics
+        self._mapreduce_job_counters_metrics(running_jobs, cluster_id)
+
+        # Get task metrics
+        self._mapreduce_task_metrics(running_jobs, cluster_id)
+
+        # Report success after gathering all metrics from the ApplicationMaster
+        if running_jobs:
+            job_id, metrics = running_jobs.items()[0]
+            am_address = self._get_url_base(metrics['tracking_url'])
+
+            self.service_check(MAPREDUCE_SERVICE_CHECK,
+                               AgentCheck.OK,
+                               tags=['url:%s' % am_address],
+                               message='Connection to ApplicationMaster "%s" was successful' % am_address)
+
+    def _parse_general_counters(self, init_config):
+        '''
+        Return a dictionary of general counters in the form
+        {
+            counter_group_name: [
+                counter_name
+            ]
+        }
+        '''
+        job_counter = {}
+
+        if init_config.get('general_counters'):
+
+            # Parse the custom metrics
+            for counter_group in init_config['general_counters']:
+                counter_group_name = counter_group.get('counter_group_name')
+                counters = counter_group.get('counters')
+
+                if not counter_group_name:
+                    raise Exception('"general_counters" must contain a valid "counter_group_name"')
+
+                if not counters:
+                    raise Exception('"general_counters" must contain a list of "counters"')
+
+                # Add the counter_group to the job_counters if it doesn't already exist
+                if counter_group_name not in job_counter:
+                    job_counter[counter_group_name] = []
+
+                for counter in counters:
+                    counter_name = counter.get('counter_name')
+
+                    if not counter_name:
+                        raise Exception('At least one "counter_name" should be specified in the list of "counters"')
+
+                    job_counter[counter_group_name].append(counter_name)
+
+        return job_counter
+
+    def _parse_job_specific_counters(self, init_config):
+        '''
+        Return a dictionary of job specific counters in the form
+        {
+            job_name: {
+                counter_group_name: [
+                    counter_name
+                ]
+            }
+        }
+        '''
+        job_counter = {}
+
+        if init_config.get('job_specific_counters'):
+
+            # Parse the custom metrics
+            for job in init_config['job_specific_counters']:
+                job_name = job.get('job_name')
+                metrics = job.get('metrics')
+
+                if not job_name:
+                    raise Exception('Counter metrics must have a "job_name"')
+
+                if not metrics:
+                    raise Exception('Jobs specified in counter metrics must contain at least one metric')
+
+                # Add the job to the custom metrics if it doesn't already exist
+                if job_name not in job_counter:
+                    job_counter[job_name] = {}
+
+                for metric in metrics:
+                    counter_group_name = metric.get('counter_group_name')
+                    counters = metric.get('counters')
+
+                    if not counter_group_name:
+                        raise Exception('Each counter metric must contain a valid "counter_group_name"')
+
+                    if not counters:
+                        raise Exception('Each counter metric must contain a list of "counters"')
+
+                    # Add the counter group name if it doesn't exist for the current job
+                    if counter_group_name not in job_counter[job_name]:
+                        job_counter[job_name][counter_group_name] = []
+
+                    for counter in counters:
+                        counter_name = counter.get('counter_name')
+
+                        if not counter_name:
+                            raise Exception('At least one "counter_name" should be specified in the list of "counters"')
+
+                        job_counter[job_name][counter_group_name].append(counter_name)
+
+        return job_counter
+
+    def _get_cluster_id(self, rm_address):
+        '''
+        Return the cluster ID reported by the ResourceManager
+        '''
+        info_json = self._rest_request_to_json(rm_address,
+                                               INFO_PATH,
+                                               YARN_SERVICE_CHECK)
+
+        return info_json.get('clusterInfo', {}).get('id')
+
+    def _get_running_app_ids(self, rm_address):
+        '''
+        Return a dictionary of {app_id: (app_name, tracking_url)} for the running MapReduce applications
+        '''
+        metrics_json = self._rest_request_to_json(rm_address,
+                                                  YARN_APPS_PATH,
+                                                  YARN_SERVICE_CHECK,
+                                                  states=YARN_APPLICATION_STATES,
+                                                  applicationTypes=YARN_APPLICATION_TYPES)
+
+        running_apps = {}
+
+        if metrics_json.get('apps'):
+            if metrics_json['apps'].get('app') is not None:
+
+                for app_json in metrics_json['apps']['app']:
+                    app_id = app_json.get('id')
+                    tracking_url = app_json.get('trackingUrl')
+                    app_name = app_json.get('name')
+
+                    if app_id and tracking_url and app_name:
+                        running_apps[app_id] = (app_name, tracking_url)
+
+        return running_apps
+
+    def _mapreduce_job_metrics(self, running_apps, cluster_id):
+        '''
+        Get metrics for each MapReduce job.
+        Return a dictionary for each MapReduce job in the form
+        {
+            job_id: {
+                'job_name': job_name,
+                'app_name': app_name,
+                'user_name': user_name,
+                'tracking_url': tracking_url
+            }
+        }
+        '''
+        running_jobs = {}
+
+        for app_id, (app_name, tracking_url) in running_apps.iteritems():
+
+            metrics_json = self._rest_request_to_json(tracking_url,
+                                                      MAPREDUCE_JOBS_PATH,
+                                                      MAPREDUCE_SERVICE_CHECK)
+
+            if metrics_json.get('jobs'):
+                if metrics_json['jobs'].get('job'):
+
+                    for job_json in metrics_json['jobs']['job']:
+                        job_id = job_json.get('id')
+                        job_name = job_json.get('name')
+                        user_name = job_json.get('user')
+
+                        if job_id and job_name and user_name:
+
+                            # Build the structure to hold the information for each job ID
+                            running_jobs[str(job_id)] = {'job_name': str(job_name),
+                                                         'app_name': str(app_name),
+                                                         'user_name': str(user_name),
+                                                         'tracking_url': self._join_url_dir(tracking_url, MAPREDUCE_JOBS_PATH, job_id)}
+
+                            tags = ['cluster_id:' + str(cluster_id),
+                                    'app_name:' + str(app_name),
+                                    'user_name:' + str(user_name),
+                                    'job_name:' + str(job_name)]
+
+                            self._set_metrics_from_json(tags, job_json, MAPREDUCE_JOB_METRICS)
+
+        return running_jobs
+
+    def _mapreduce_job_counters_metrics(self, running_jobs, cluster_id):
+        '''
+        Get custom metrics specified for each counter
+        '''
+        for job_id, job_metrics in running_jobs.iteritems():
+            job_name = job_metrics['job_name']
+
+            # Check if the job_name exists in the custom metrics
+            if self.general_counters or (job_name in self.job_specific_counters):
+                job_specific_metrics = self.job_specific_counters.get(job_name)
+
+                metrics_json = self._rest_request_to_json(job_metrics['tracking_url'],
+                                                          'counters',
+                                                          MAPREDUCE_SERVICE_CHECK)
+
+                if metrics_json.get('jobCounters'):
+                    if metrics_json['jobCounters'].get('counterGroup'):
+
+                        # Cycle through all the counter groups for this job
+                        for counter_group in metrics_json['jobCounters']['counterGroup']:
+                            group_name = counter_group.get('counterGroupName')
+
+                            if group_name:
+                                counter_metrics = set([])
+
+                                # Add any counters in the job specific metrics
+                                if job_specific_metrics and group_name in job_specific_metrics:
+                                    counter_metrics = counter_metrics.union(job_specific_metrics[group_name])
+
+                                # Add any counters in the general metrics
+                                if group_name in self.general_counters:
+                                    counter_metrics = counter_metrics.union(self.general_counters[group_name])
+
+                                if counter_metrics:
+                                    # Cycle through all the counters in this counter group
+                                    if counter_group.get('counter'):
+                                        for counter in counter_group['counter']:
+                                            counter_name = counter.get('name')
+
+                                            # Check if the counter name is in the custom metrics for this group name
+                                            if counter_name and counter_name in counter_metrics:
+                                                tags = ['cluster_id:' + str(cluster_id),
+                                                        'app_name:' + job_metrics.get('app_name'),
+                                                        'user_name:' + job_metrics.get('user_name'),
+                                                        'job_name:' + job_name,
+                                                        'counter_name:' + str(counter_name).lower()]
+
+                                                self._set_metrics_from_json(tags,
+                                                                            counter,
+                                                                            MAPREDUCE_JOB_COUNTER_METRICS)
+
+    def _mapreduce_task_metrics(self, running_jobs, cluster_id):
+        '''
+        Get the map and reduce task metrics for each running MapReduce job
+        '''
+        for job_id, job_stats in running_jobs.iteritems():
+
+            metrics_json = self._rest_request_to_json(job_stats['tracking_url'],
+                                                      'tasks',
+                                                      MAPREDUCE_SERVICE_CHECK)
+
+            if metrics_json.get('tasks'):
+                if metrics_json['tasks'].get('task'):
+
+                    for task in metrics_json['tasks']['task']:
+                        task_type = task.get('type')
+
+                        if task_type:
+                            tags = ['cluster_id:' + str(cluster_id),
+                                    'app_name:' + job_stats['app_name'],
+                                    'user_name:' + job_stats['user_name'],
+                                    'job_name:' + job_stats['job_name'],
+                                    'task_type:' + str(task_type).lower()]
+
+                            if task_type == 'MAP':
+                                self._set_metrics_from_json(tags, task, MAPREDUCE_MAP_TASK_METRICS)
+
+                            elif task_type == 'REDUCE':
+                                self._set_metrics_from_json(tags, task, MAPREDUCE_REDUCE_TASK_METRICS)
+
+    def _set_metrics_from_json(self, tags, metrics_json, metrics):
+        '''
+        Parse the JSON response and set the metrics
+        '''
+        for status, (metric_name, metric_type) in metrics.iteritems():
+            metric_status = metrics_json.get(status)
+
+            if metric_status is not None:
+                self._set_metric(metric_name,
+                                 metric_type,
+                                 metric_status,
+                                 tags)
+
+    def _set_metric(self, metric_name, metric_type, value, tags=None, device_name=None):
+        '''
+        Set a metric
+        '''
+        if metric_type == HISTOGRAM:
+            self.histogram(metric_name, value, tags=tags, device_name=device_name)
+        else:
+            self.log.error('Metric type "%s" unknown', metric_type)
+
+    def _rest_request_to_json(self, address, object_path, service_name, *args, **kwargs):
+        '''
+        Query the given URL and return the JSON response
+        '''
+        response_json = None
+
+        service_check_tags = ['url:%s' % self._get_url_base(address)]
+
+        url = address
+
+        if object_path:
+            url = self._join_url_dir(url, object_path)
+
+        # Add args to the url
+        if args:
+            for directory in args:
+                url = self._join_url_dir(url, directory)
+
+        self.log.debug('Attempting to connect to "%s"', url)
+
+        # Add kwargs as query parameters
+        if kwargs:
+            query = '&'.join(['{0}={1}'.format(key, value) for key, value in kwargs.iteritems()])
+            url = urljoin(url, '?' + query)
+
+        try:
+            response = requests.get(url)
+            response.raise_for_status()
+            response_json = response.json()
+
+        except Timeout as e:
+            self.service_check(service_name,
+                               AgentCheck.CRITICAL,
+                               tags=service_check_tags,
+                               message='Request timeout: {0}, {1}'.format(url, e))
+            raise
+
+        except (HTTPError, InvalidURL, ConnectionError) as e:
+            self.service_check(service_name,
+                               AgentCheck.CRITICAL,
+                               tags=service_check_tags,
+                               message='Request failed: {0}, {1}'.format(url, e))
+            raise
+
+        except JSONDecodeError as e:
+            self.service_check(service_name,
+                               AgentCheck.CRITICAL,
+                               tags=service_check_tags,
+                               message='JSON Parse failed: {0}, {1}'.format(url, e))
+            raise
+
+        except ValueError as e:
+            self.service_check(service_name,
+                               AgentCheck.CRITICAL,
+                               tags=service_check_tags,
+                               message=str(e))
+            raise
+
+        return response_json
+
+    def _join_url_dir(self, url, *args):
+        '''
+        Join a URL with multiple directories
+        '''
+        for path in args:
+            url = url.rstrip('/') + '/'
+            url = urljoin(url, path.lstrip('/'))
+
+        return url
+
+    def _get_url_base(self, url):
+        '''
+        Return the base of a URL
+        '''
+        s = urlsplit(url)
+        return urlunsplit([s.scheme, s.netloc, '', '', ''])
diff --git a/conf.d/mapreduce.yaml.example b/conf.d/mapreduce.yaml.example
new file mode 100644
index 0000000000..9f860d53ee
--- /dev/null
+++ b/conf.d/mapreduce.yaml.example
@@ -0,0 +1,61 @@
+instances:
+  #
+  # The MapReduce check retrieves metrics from YARN's ResourceManager. This
+  # check must be run from the Master Node, and the ResourceManager URI must
+  # be specified below. The ResourceManager URI is composed of the
+  # ResourceManager's hostname and port.
+  #
+  # The ResourceManager hostname can be found in the yarn-site.xml conf file
+  # under the property yarn.resourcemanager.address
+  #
+  # The ResourceManager port can be found in the yarn-site.xml conf file under
+  # the property yarn.resourcemanager.webapp.address
+  #
+  - resourcemanager_uri: http://localhost:8088
+
+init_config:
+  #
+  # Optional metrics can be specified for counters. For more information on
+  # counters, visit the MapReduce documentation page:
+  # https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html#Job_Counters_API
+  #
+
+  general_counters:
+    #
+    # general_counters are job-agnostic metrics that create a metric for each
+    # specified counter
+    #
+    - counter_group_name: 'org.apache.hadoop.mapreduce.TaskCounter'
+      counters:
+        - counter_name: 'MAP_INPUT_RECORDS'
+        - counter_name: 'MAP_OUTPUT_RECORDS'
+        - counter_name: 'REDUCE_INPUT_RECORDS'
+        - counter_name: 'REDUCE_OUTPUT_RECORDS'
+    #
+    # Additional counters can be specified as follows:
+    #
+
+    # - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
+    #   counters:
+    #     - counter_name: 'HDFS_BYTES_READ'
+
+  job_specific_counters:
+    #
+    # job_specific_counters are metrics that are specific to a particular job.
+    # The following example specifies counters for the jobs 'Foo' and 'Bar'.
+    #
+
+    # - job_name: 'Foo'
+    #   metrics:
+    #     - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
+    #       counters:
+    #         - counter_name: 'FILE_BYTES_WRITTEN'
+    #         - counter_name: 'HDFS_BYTES_WRITTEN'
+    #     - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
+    #       counters:
+    #         - counter_name: 'HDFS_BYTES_READ'
+    # - job_name: 'Bar'
+    #   metrics:
+    #     - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
+    #       counters:
+    #         - counter_name: 'FILE_BYTES_WRITTEN'
diff --git a/tests/checks/fixtures/mapreduce/apps_metrics b/tests/checks/fixtures/mapreduce/apps_metrics
new file mode 100644
index 0000000000..2bc3fcf900
--- /dev/null
+++ b/tests/checks/fixtures/mapreduce/apps_metrics
@@ -0,0 +1 @@
+{"apps":{"app":[{"id":"application_1453738555560_0001","trackingUrl":"http://localhost:8088/proxy/application_1453738555560_0001/","user":"vagrant","name":"WordCount"}]}}
\ No newline at end of file
diff --git a/tests/checks/fixtures/mapreduce/cluster_info b/tests/checks/fixtures/mapreduce/cluster_info
new file mode 100644
index 0000000000..664925c7e1
--- /dev/null
+++ b/tests/checks/fixtures/mapreduce/cluster_info
@@ -0,0 +1 @@
+{"clusterInfo":{"id":1453738555560}}
\ No newline at end of file
diff --git a/tests/checks/fixtures/mapreduce/job_counter_metrics b/tests/checks/fixtures/mapreduce/job_counter_metrics
new file mode 100644
index 0000000000..1c839e6c50
--- /dev/null
+++ b/tests/checks/fixtures/mapreduce/job_counter_metrics
@@ -0,0 +1 @@
+{"jobCounters":{"id":"job_1453738555560_0001","counterGroup":[{"counterGroupName":"org.apache.hadoop.mapreduce.FileSystemCounter","counter":[{"name":"FILE_BYTES_READ","totalCounterValue":0,"mapCounterValue":1,"reduceCounterValue":2},{"name":"FILE_BYTES_WRITTEN","totalCounterValue":3,"mapCounterValue":4,"reduceCounterValue":5}]},{"counterGroupName":"org.apache.hadoop.mapreduce.TaskCounter","counter":[{"name":"MAP_INPUT_RECORDS","totalCounterValue":6,"mapCounterValue":7,"reduceCounterValue":8},{"name":"MAP_OUTPUT_RECORDS","totalCounterValue":9,"mapCounterValue":10,"reduceCounterValue":11}]}]}}
\ No newline at end of file
diff --git a/tests/checks/fixtures/mapreduce/job_metrics b/tests/checks/fixtures/mapreduce/job_metrics
new file mode 100644
index 0000000000..c501c1a9dc
--- /dev/null
+++ b/tests/checks/fixtures/mapreduce/job_metrics
@@ -0,0 +1 @@
+{"jobs":{"job":[{"startTime":1453761316277,"finishTime":0,"elapsedTime":99221829,"id":"job_1453738555560_0001","name":"WordCount","user":"vagrant","state":"RUNNING","mapsTotal":1,"mapsCompleted":0,"reducesTotal":1,"reducesCompleted":0,"mapProgress":48.335266,"reduceProgress":0.0,"mapsPending":0,"mapsRunning":1,"reducesPending":1,"reducesRunning":0,"uberized":false,"diagnostics":"","newReduceAttempts":1,"runningReduceAttempts":0,"failedReduceAttempts":0,"killedReduceAttempts":0,"successfulReduceAttempts":0,"newMapAttempts":0,"runningMapAttempts":1,"failedMapAttempts":1,"killedMapAttempts":0,"successfulMapAttempts":0}]}}
\ No newline at end of file
diff --git a/tests/checks/fixtures/mapreduce/task_metrics b/tests/checks/fixtures/mapreduce/task_metrics
new file mode 100644
index 0000000000..1e3b50ea8d
--- /dev/null
+++ b/tests/checks/fixtures/mapreduce/task_metrics
@@ -0,0 +1 @@
+{"tasks":{"task":[{"startTime":1453761318527,"finishTime":0,"elapsedTime":99869037,"progress":49.11076,"id":"task_1453738555560_0001_m_000000","state":"RUNNING","type":"MAP","successfulAttempt":"","status":"map > map"},{"startTime":1453761318527,"finishTime":0,"elapsedTime":123456,"progress":32.42940,"id":"task_1453738555560_0001_r_000000","state":"RUNNING","type":"REDUCE","successfulAttempt":"","status":"map > map"}]}}
\ No newline at end of file
diff --git a/tests/checks/mock/test_mapreduce.py b/tests/checks/mock/test_mapreduce.py
new file mode 100644
index 0000000000..d5c5fe908e
--- /dev/null
+++ b/tests/checks/mock/test_mapreduce.py
@@ -0,0 +1,240 @@
+# stdlib
+import json
+from urlparse import urljoin
+
+# 3rd party
+import mock
+
+from tests.checks.common import AgentCheckTest, Fixtures
+
+# IDs
+CLUSTER_ID = '1453738555560'
+APP_ID = 'application_1453738555560_0001'
+APP_NAME = 'WordCount'
+JOB_ID = 'job_1453738555560_0001'
+JOB_NAME = 'WordCount'
+USER_NAME = 'vagrant'
+TASK_ID = 'task_1453738555560_0001_m_000000'
+
+# ResourceManager URI
+RM_URI = 'http://localhost:8088'
+
+# URL Paths
+INFO_PATH = 'ws/v1/cluster/info'
+YARN_APPS_PATH = 'ws/v1/cluster/apps'
+MAPREDUCE_JOBS_PATH = 'ws/v1/mapreduce/jobs'
+
+# Service Check Names
+YARN_SERVICE_CHECK = 'mapreduce.resource_manager.can_connect'
+MAPREDUCE_SERVICE_CHECK = 'mapreduce.application_master.can_connect'
+
+
+def join_url_dir(url, *args):
+    '''
+    Join a URL with multiple directories
+    '''
+    for path in args:
+        url = url.rstrip('/') + '/'
+        url = urljoin(url, path.lstrip('/'))
+
+    return url
+
+
+# Service URLs
+CLUSTER_INFO_URL = urljoin(RM_URI, INFO_PATH)
+YARN_APPS_URL = urljoin(RM_URI, YARN_APPS_PATH) + '?states=RUNNING&applicationTypes=MAPREDUCE'
+MR_JOBS_URL = join_url_dir(RM_URI, 'proxy', APP_ID, MAPREDUCE_JOBS_PATH)
+MR_JOB_COUNTERS_URL = join_url_dir(MR_JOBS_URL, JOB_ID, 'counters')
+MR_TASKS_URL = join_url_dir(MR_JOBS_URL, JOB_ID, 'tasks')
+
+
+def requests_get_mock(*args, **kwargs):
+
+    class MockResponse:
+        def __init__(self, json_data, status_code):
+            self.json_data = json_data
+            self.status_code = status_code
+
+        def json(self):
+            return json.loads(self.json_data)
+
+        def raise_for_status(self):
+            return True
+
+    if args[0] == CLUSTER_INFO_URL:
+        with open(Fixtures.file('cluster_info'), 'r') as f:
+            body = f.read()
+        return MockResponse(body, 200)
+
+    elif args[0] == YARN_APPS_URL:
+        with open(Fixtures.file('apps_metrics'), 'r') as f:
+            body = f.read()
+        return MockResponse(body, 200)
+
+    elif args[0] == MR_JOBS_URL:
+        with open(Fixtures.file('job_metrics'), 'r') as f:
+            body = f.read()
+        return MockResponse(body, 200)
+
+    elif args[0] == MR_JOB_COUNTERS_URL:
+        with open(Fixtures.file('job_counter_metrics'), 'r') as f:
+            body = f.read()
+        return MockResponse(body, 200)
+
+    elif args[0] == MR_TASKS_URL:
+        with open(Fixtures.file('task_metrics'), 'r') as f:
+            body = f.read()
+        return MockResponse(body, 200)
+
+
+class MapReduceCheckTest(AgentCheckTest):
+    CHECK_NAME = 'mapreduce'
+
+    MR_CONFIG = {
+        'resourcemanager_uri': 'http://localhost:8088'
+    }
+
+    INIT_CONFIG = {
+        'general_counters': [
+            {
+                'counter_group_name': 'org.apache.hadoop.mapreduce.FileSystemCounter',
+                'counters': [
+                    {'counter_name': 'FILE_BYTES_READ'},
+                    {'counter_name': 'FILE_BYTES_WRITTEN'}
+                ]
+            }
+        ],
+        'job_specific_counters': [
+            {
+                'job_name': 'WordCount',
+                'metrics': [
+                    {
+                        'counter_group_name': 'org.apache.hadoop.mapreduce.FileSystemCounter',
+                        'counters': [
+                            {'counter_name': 'FILE_BYTES_WRITTEN'}
+                        ]
+                    }, {
+                        'counter_group_name': 'org.apache.hadoop.mapreduce.TaskCounter',
+                        'counters': [
+                            {'counter_name': 'MAP_OUTPUT_RECORDS'}
+                        ]
+                    }
+                ]
+            }
+        ]
+    }
+
+    MAPREDUCE_JOB_METRIC_VALUES = {
+        'mapreduce.job.elapsed_time.max': 99221829,
+        'mapreduce.job.maps_total.max': 1,
+        'mapreduce.job.maps_completed.max': 0,
+        'mapreduce.job.reduces_total.max': 1,
+        'mapreduce.job.reduces_completed.max': 0,
+        'mapreduce.job.maps_pending.max': 0,
+        'mapreduce.job.maps_running.max': 1,
+        'mapreduce.job.reduces_pending.max': 1,
+        'mapreduce.job.reduces_running.max': 0,
+        'mapreduce.job.new_reduce_attempts.max': 1,
+        'mapreduce.job.running_reduce_attempts.max': 0,
+        'mapreduce.job.failed_reduce_attempts.max': 0,
+        'mapreduce.job.killed_reduce_attempts.max': 0,
+        'mapreduce.job.successful_reduce_attempts.max': 0,
+        'mapreduce.job.new_map_attempts.max': 0,
+        'mapreduce.job.running_map_attempts.max': 1,
+        'mapreduce.job.failed_map_attempts.max': 1,
+        'mapreduce.job.killed_map_attempts.max': 0,
+        'mapreduce.job.successful_map_attempts.max': 0,
+    }
+
+    MAPREDUCE_JOB_METRIC_TAGS = [
+        'cluster_id:' + CLUSTER_ID,
+        'app_name:' + APP_NAME,
+        'job_name:' + JOB_NAME,
+        'user_name:' + USER_NAME
+    ]
+
+    MAPREDUCE_MAP_TASK_METRIC_VALUES = {
+        'mapreduce.job.map.task.progress.max': 49.11076,
+        'mapreduce.job.map.task.elapsed_time.max': 99869037
+    }
+
+    MAPREDUCE_MAP_TASK_METRIC_TAGS = [
+        'cluster_id:' + CLUSTER_ID,
+        'app_name:' + APP_NAME,
+        'job_name:' + JOB_NAME,
+        'user_name:' + USER_NAME,
+        'task_type:map'
+    ]
+
+    MAPREDUCE_REDUCE_TASK_METRIC_VALUES = {
+        'mapreduce.job.reduce.task.progress.max': 32.42940,
+        'mapreduce.job.reduce.task.elapsed_time.max': 123456
+    }
+
+    MAPREDUCE_REDUCE_TASK_METRIC_TAGS = [
+        'cluster_id:' + CLUSTER_ID,
+        'app_name:' + APP_NAME,
+        'job_name:' + JOB_NAME,
+        'user_name:' + USER_NAME,
+        'task_type:reduce'
+    ]
+
+    # A dict would silently drop entries that share a metric name, so the
+    # expected counter values are kept as (metric_name, attributes) pairs
+    MAPREDUCE_JOB_COUNTER_METRIC_VALUES = [
+        ('mapreduce.job.counter.total_counter_value.max', {'value': 0, 'tags': ['counter_name:file_bytes_read']}),
+        ('mapreduce.job.counter.map_counter_value.max', {'value': 1, 'tags': ['counter_name:file_bytes_read']}),
+        ('mapreduce.job.counter.reduce_counter_value.max', {'value': 2, 'tags': ['counter_name:file_bytes_read']}),
+        ('mapreduce.job.counter.total_counter_value.max', {'value': 3, 'tags': ['counter_name:file_bytes_written']}),
+        ('mapreduce.job.counter.map_counter_value.max', {'value': 4, 'tags': ['counter_name:file_bytes_written']}),
+        ('mapreduce.job.counter.reduce_counter_value.max', {'value': 5, 'tags': ['counter_name:file_bytes_written']}),
+        ('mapreduce.job.counter.total_counter_value.max', {'value': 9, 'tags': ['counter_name:map_output_records']}),
+        ('mapreduce.job.counter.map_counter_value.max', {'value': 10, 'tags': ['counter_name:map_output_records']}),
+        ('mapreduce.job.counter.reduce_counter_value.max', {'value': 11, 'tags': ['counter_name:map_output_records']}),
+    ]
+
+    MAPREDUCE_JOB_COUNTER_METRIC_TAGS = [
+        'cluster_id:' + CLUSTER_ID,
+        'app_name:' + APP_NAME,
+        'job_name:' + JOB_NAME,
+        'user_name:' + USER_NAME
+    ]
+
+    @mock.patch('requests.get', side_effect=requests_get_mock)
+    def test_check(self, mock_requests):
+        config = {
+            'instances': [self.MR_CONFIG],
+            'init_config': self.INIT_CONFIG
+        }
+
+        self.run_check(config)
+
+        # Check the MapReduce job metrics
+        for metric, value in self.MAPREDUCE_JOB_METRIC_VALUES.iteritems():
+            self.assertMetric(metric,
+                              value=value,
+                              tags=self.MAPREDUCE_JOB_METRIC_TAGS)
+
+        # Check the map task metrics
+        for metric, value in self.MAPREDUCE_MAP_TASK_METRIC_VALUES.iteritems():
+            self.assertMetric(metric,
+                              value=value,
+                              tags=self.MAPREDUCE_MAP_TASK_METRIC_TAGS)
+
+        # Check the reduce task metrics
+        for metric, value in self.MAPREDUCE_REDUCE_TASK_METRIC_VALUES.iteritems():
+            self.assertMetric(metric,
+                              value=value,
+                              tags=self.MAPREDUCE_REDUCE_TASK_METRIC_TAGS)
+
+        # Check the MapReduce job counter metrics
+        for metric, attributes in self.MAPREDUCE_JOB_COUNTER_METRIC_VALUES:
+            tags = attributes['tags'] + self.MAPREDUCE_JOB_COUNTER_METRIC_TAGS
+            self.assertMetric(metric,
+                              value=attributes['value'],
+                              tags=tags)
+
+        # Check the service checks
+        self.assertServiceCheckOK(YARN_SERVICE_CHECK,
+                                  tags=['url:http://localhost:8088'])
+        self.assertServiceCheckOK(MAPREDUCE_SERVICE_CHECK,
+                                  tags=['url:http://localhost:8088'])