Skip to content

Commit

Permalink
Merge pull request #49 from ralphbean/feature/multiprocessing
Browse files Browse the repository at this point in the history
Feature/multiprocessing
  • Loading branch information
ralphbean committed Feb 11, 2013
2 parents b03a125 + 2a80de2 commit 3d0c8f4
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 49 deletions.
7 changes: 7 additions & 0 deletions bugwarrior/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ Create a ``~/.bugwarriorrc`` file with the following contents.
#bitly.api_user = YOUR_USERNAME
#bitly.api_key = YOUR_API_KEY

# This is an experimental mode where bugwarrior will query all of your
# online sources simultaneously. It works as far as I've tested it, so
# give it a try. Please backup your ~/.task/ directory first and report
# any problems if you encounter them at
# https://github.com/ralphbean/bugwarrior/issues
#multiprocessing = False

# This is a github example. It says, "scrape every issue from every repository
# on http://github.com/ralphbean. It doesn't matter if ralphbean owns the issue
# or not."
Expand Down
2 changes: 1 addition & 1 deletion bugwarrior/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ def pull():
# Stuff them in the taskwarrior db as necessary
synchronize(issues)
except:
log.trace('error').critical('oh noes')
log.name('command').trace('error').critical('oh noes')
9 changes: 8 additions & 1 deletion bugwarrior/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
from ConfigParser import ConfigParser


def asbool(some_value):
""" Cast config values to boolean. """
return str(some_value).lower() in ['y', 'yes', 't', 'true', '1', 'on']


def load_example_rc():
root = '/'.join(__file__.split('/')[:-1])
fname = root + '/README.rst'
Expand Down Expand Up @@ -66,7 +71,9 @@ def validate_config(config):

for option in ['bitly.api_user', 'bitly.api_key']:
if not config.has_option('general', option):
log.warning("URLs will not be shortened with bit.ly")
log.name('config').warning(
"URLs will not be shortened with bit.ly"
)

# Validate each target one by one.
for target in targets:
Expand Down
26 changes: 18 additions & 8 deletions bugwarrior/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ def synchronize(issues):

# Load info about the task database
tasks = tw.load_tasks()
is_bugwarrior_task = lambda task: task.get('description', '').startswith(MARKUP)
is_bugwarrior_task = lambda task: \
task.get('description', '').startswith(MARKUP)

# Prune down to only tasks managed by bugwarrior
for key in tasks.keys():
tasks[key] = filter(is_bugwarrior_task, tasks[key])

# Build a list of only the descriptions of those local bugwarrior tasks
local_descs = [t['description'] for t in sum(tasks.values(), []) \
if t['status'] not in ('deleted')]
local_descs = [t['description'] for t in sum(tasks.values(), [])
if t['status'] not in ('deleted')]

# Now for the remote data.
# Build a list of only the descriptions of those remote issues
Expand All @@ -34,11 +35,14 @@ def synchronize(issues):
is_done = lambda task: task['description'] not in remote_descs
done_tasks = filter(is_done, tasks['pending'])

log.struct(new=len(new_issues), completed=len(done_tasks))
log.name('db').struct(new=len(new_issues), completed=len(done_tasks))

# Add new issues
for issue in new_issues:
log.info("Adding task {0}", issue['description'].encode("utf-8"))
log.name('db').info(
"Adding task {0}",
issue['description'].encode("utf-8")
)
tw.task_add(**issue)

# Update any issues that may have had new properties added. These are
Expand All @@ -51,12 +55,18 @@ def synchronize(issues):
id, task = tw.get_task(description=upstream_issue['description'])
for key in upstream_issue:
if key not in task:
log.info("Updating {0} on {1}",
key, upstream_issue['description'].encode("utf-8"))
log.name('db').info(
"Updating {0} on {1}",
key,
upstream_issue['description'].encode("utf-8"),
)
task[key] = upstream_issue[key]
id, task = tw.task_update(task)

# Delete old issues
for task in done_tasks:
log.info("Completing task {0}", task['description'].encode("utf-8"))
log.name('db').info(
"Completing task {0}",
task['description'].encode("utf-8"),
)
tw.task_done(uuid=task['uuid'])
76 changes: 57 additions & 19 deletions bugwarrior/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import bitlyapi
import time

from bugwarrior.config import die
from bugwarrior.config import die, asbool
from bugwarrior.db import MARKUP


Expand All @@ -15,7 +15,7 @@ def __init__(self, config, target, shorten):
self.target = target
self.shorten = shorten

log.info("Working on [{0}]", self.target)
log.name(target).info("Working on [{0}]", self.target)

@classmethod
def validate_config(cls, config, target):
Expand Down Expand Up @@ -146,29 +146,67 @@ def get_owner(self, issue):
except ImportError:
pass

WORKER_FAILURE = "__this signifies a worker failure__"


def _aggregate_issues(args):
""" This worker function is separated out from the main
:func:`aggregate_issues` func only so that we can use multiprocessing
on it for speed reasons.
"""

# Unpack arguments
conf, target = args

try:
# By default, we don't shorten URLs
shorten = lambda url: url

# Setup bitly shortening callback if creds are specified
bitly_opts = ['bitly.api_user', 'bitly.api_key']
if all([conf.has_option('general', opt) for opt in bitly_opts]):
get_opt = lambda option: conf.get('general', option)
bitly = bitlyapi.BitLy(
get_opt('bitly.api_user'),
get_opt('bitly.api_key')
)
shorten = lambda url: bitly.shorten(longUrl=url)['url']

service = SERVICES[conf.get(target, 'service')](conf, target, shorten)
return service.issues()
except Exception as e:
log.name(target).trace('error').critical("worker failure")
return WORKER_FAILURE
finally:
log.name(target).info("Done with [%s]" % target)

# Import here so that mproc knows about _aggregate_issues
import multiprocessing


def aggregate_issues(conf):
""" Return all issues from every target.
Takes a config object and a callable which returns a shortened url.
"""

# By default, we don't shorten URLs
shorten = lambda url: url

# Setup bitly shortening callback if creds are specified
bitly_opts = ['bitly.api_user', 'bitly.api_key']
if all([conf.has_option('general', opt) for opt in bitly_opts]):
get_opt = lambda option: conf.get('general', option)
bitly = bitlyapi.BitLy(
get_opt('bitly.api_user'),
get_opt('bitly.api_key')
)
shorten = lambda url: bitly.shorten(longUrl=url)['url']
log.name('bugwarrior').info("Starting to aggregate remote issues.")

# Create and call service objects for every target in the config
targets = [t.strip() for t in conf.get('general', 'targets').split(',')]
return sum([
SERVICES[conf.get(t, 'service')](conf, t, shorten).issues()
for t in targets
], [])

# This multiprocessing stuff is kind of experimental.
map_function = map
if asbool(conf.get('general', 'multiprocessing', 'True')):
log.name('bugwarrior').info("Spawning %i workers." % len(targets))
pool = multiprocessing.Pool(processes=len(targets))
map_function = pool.map

issues_by_target = map_function(
_aggregate_issues,
zip([conf] * len(targets), targets)
)
log.name('bugwarrior').info("Done aggregating remove issues.")
if WORKER_FAILURE in issues_by_target:
log.name('bugwarrior').critical("A worker failed. Aborting.")
raise RuntimeError('Worker failure')
return sum(issues_by_target, [])
14 changes: 7 additions & 7 deletions bugwarrior/services/activecollab2.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ def find_issues(self, user_id=None, project_id=None, project_name=None):
assigned_task['due'] = self.format_date(task[u'due_on'])

if assigned_task:
log.debug(" Adding '" + assigned_task['description'] + "' to task list.")
log.name(self.target).debug(" Adding '" + assigned_task['description'] + "' to task list.")
assigned_tasks.append(assigned_task)
except:
log.debug(' No user tasks loaded for "%s".' % project_name)
log.name(self.target).debug(' No user tasks loaded for "%s".' % project_name)

return assigned_tasks

Expand Down Expand Up @@ -180,14 +180,14 @@ def issues(self):
# @todo Implement threading here.
for project in projects:
for project_id, project_name in project.iteritems():
log.debug(" Getting tasks for #" + project_id + " " + project_name + '"')
log.name(self.target).debug(" Getting tasks for #" + project_id + " " + project_name + '"')
issues += self.client.find_issues(self.user_id, project_id, project_name)

log.debug(" Found {0} total.", len(issues))
log.name(self.target).debug(" Found {0} total.", len(issues))
global api_count
log.debug(" {0} API calls", api_count)
log.debug(" {0} tasks and tickets analyzed", task_count)
log.debug(" Elapsed Time: %s" % (time.time() - start))
log.name(self.target).debug(" {0} API calls", api_count)
log.name(self.target).debug(" {0} tasks and tickets analyzed", task_count)
log.name(self.target).debug(" Elapsed Time: %s" % (time.time() - start))

formatted_issues = []

Expand Down
4 changes: 2 additions & 2 deletions bugwarrior/services/bitbucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def issues(self):
]

issues = sum([self.pull(user + "/" + repo) for repo in repos], [])
log.debug(" Found {0} total.", len(issues))
log.name(self.target).debug(" Found {0} total.", len(issues))

# Build a url for each issue
for i in range(len(issues)):
Expand All @@ -88,7 +88,7 @@ def issues(self):
not_resolved = lambda tup: tup[1]['status'] not in closed
issues = filter(not_resolved, issues)
issues = filter(self.include, issues)
log.debug(" Pruned down to {0}", len(issues))
log.name(self.target).debug(" Pruned down to {0}", len(issues))

return [dict(
description=self.description(
Expand Down
4 changes: 2 additions & 2 deletions bugwarrior/services/bz.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def issues(self):
]

issues = [(self.target, bug) for bug in bugs]
log.debug(" Found {0} total.", len(issues))
log.name(self.target).debug(" Found {0} total.", len(issues))

# Build a url for each issue
base_url = "https://%s/show_bug.cgi?id=" % \
Expand All @@ -112,7 +112,7 @@ def issues(self):
# can already do a lot of the filtering we want for us.

#issues = filter(self.include, issues)
#log.debug(" Pruned down to {0}", len(issues))
#log.name(self.target).debug(" Pruned down to {0}", len(issues))

return [dict(
description=self.description(
Expand Down
4 changes: 2 additions & 2 deletions bugwarrior/services/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def issues(self):
repo['open_issues_count'] > 0
repos = filter(has_issues, all_repos)
issues = sum([self._issues(user + "/" + r['name']) for r in repos], [])
log.debug(" Found {0} total.", len(issues))
log.name(self.target).debug(" Found {0} total.", len(issues))
issues = filter(self.include, issues)
log.debug(" Pruned down to {0}", len(issues))
log.name(self.target).debug(" Pruned down to {0}", len(issues))

# Next, get all the pull requests (and don't prune)
has_requests = lambda repo: repo['forks'] > 1
Expand Down
2 changes: 1 addition & 1 deletion bugwarrior/services/jira.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def annotations(self, issue):
def issues(self):
cases = self.jira.search_issues(self.query, maxResults=-1)

log.debug(" Found {0} total.", len(cases))
log.name(self.target).debug(" Found {0} total.", len(cases))

return [dict(
description=self.description(
Expand Down
2 changes: 1 addition & 1 deletion bugwarrior/services/mplan.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def get_issue_url(self, issue):

def issues(self):
issues = self.client.get_actual_tasks()
log.debug(" Found {0} total.", len(issues))
log.name(self.target).debug(" Found {0} total.", len(issues))

return [dict(
description=self.description(
Expand Down
2 changes: 1 addition & 1 deletion bugwarrior/services/redmine.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def get_project_name(self, issue):

def issues(self):
issues = self.client.find_issues(self.user_id)
log.debug(" Found {0} total.", len(issues))
log.name(self.target).debug(" Found {0} total.", len(issues))

return [dict(
description=self.description(
Expand Down
4 changes: 2 additions & 2 deletions bugwarrior/services/teamlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ def get_priority(self, issue):

def issues(self):
issues = self.client.get_task_list()
log.debug(" Remote has {0} total issues.", len(issues))
log.name(self.target).debug(" Remote has {0} total issues.", len(issues))
if not issues:
return []

# Filter out closed tasks.
issues = filter(lambda i: i["status"] == 1, issues)
log.debug(" Remote has {0} active issues.", len(issues))
log.name(self.target).debug(" Remote has {0} active issues.", len(issues))

return [dict(
description=self.description(
Expand Down
4 changes: 2 additions & 2 deletions bugwarrior/services/trac.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ def issues(self):
tickets = self.trac.query_tickets('status!=closed&max=0')
tickets = map(self.trac.get_ticket, tickets)
issues = [(self.target, ticket[3]) for ticket in tickets]
log.debug(" Found {0} total.", len(issues))
log.name(self.target).debug(" Found {0} total.", len(issues))

# Build a url for each issue
for i in range(len(issues)):
issues[i][1]['url'] = "%s/ticket/%i" % (base_url, tickets[i][0])
issues[i][1]['number'] = tickets[i][0]

issues = filter(self.include, issues)
log.debug(" Pruned down to {0}", len(issues))
log.name(self.target).debug(" Pruned down to {0}", len(issues))

return [dict(
description=self.description(
Expand Down

0 comments on commit 3d0c8f4

Please sign in to comment.