Skip to content

Commit

Permalink
Make multiprocessing working on Windows #787 #685 #357
Browse files Browse the repository at this point in the history
Signed-off-by: Philippe Ombredanne <[email protected]>
  • Loading branch information
pombredanne committed Jan 26, 2018
1 parent 3639ac9 commit 1d8a43d
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 42 deletions.
52 changes: 33 additions & 19 deletions src/scancode/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from scancode import validate_option_dependencies
from scancode.api import get_file_info
from scancode.interrupt import DEFAULT_TIMEOUT
from scancode.interrupt import fake_interruptible
from scancode.interrupt import interruptible
from scancode.resource import Codebase
from scancode.utils import BaseCommand
Expand All @@ -86,11 +87,11 @@
# Python 2
unicode
str_orig = str
bytes = str # @ReservedAssignment
str = unicode # @ReservedAssignment
bytes = str # NOQA
str = unicode # NOQA
except NameError:
# Python 3
unicode = str # @ReservedAssignment
unicode = str # NOQA


# Tracing flags
Expand Down Expand Up @@ -257,7 +258,7 @@ class ScanCommand(BaseCommand):
Try 'scancode --help' for help on options and arguments.'''

def __init__(self, name, context_settings=None, callback=None, params=None,
help=None, # @ReservedAssignment
help=None, # NOQA
epilog=None, short_help=None,
options_metavar='[OPTIONS]', add_help_option=True,
plugin_options=()):
Expand Down Expand Up @@ -336,8 +337,8 @@ def print_plugins(ctx, param, value):
name = option.name
opts = ', '.join(option.opts)
help_group = option.help_group
help = option.help # noqa
click.echo(' help_group: {help_group!s}, name: {name!s}: {opts}\n help: {help!s}'.format(**locals()))
help_txt = option.help # noqa
click.echo(' help_group: {help_group!s}, name: {name!s}: {opts}\n help: {help_txt!s}'.format(**locals()))
click.echo('')
ctx.exit()

Expand Down Expand Up @@ -378,7 +379,7 @@ def print_plugins(ctx, param, value):
type=int, default=1,
metavar='INT',
help='Set the number of parallel processes to use. '
'Disable parallel processing if 0. [default: 1]',
'Disable parallel processing if 0. Disable also threading if -1 [default: 1]',
help_group=CORE_GROUP, sort_order=10, cls=CommandLineOption)

@click.option('--timeout',
Expand Down Expand Up @@ -481,7 +482,7 @@ def print_plugins(ctx, param, value):
help='Run ScanCode in a special "test mode". Only for testing.',
help_group=MISC_GROUP, sort_order=1000, cls=CommandLineOption)

def scancode(ctx, input,
def scancode(ctx, input, #NOQA
info,
strip_root, full_root,
processes, timeout,
Expand Down Expand Up @@ -538,7 +539,8 @@ def scancode(ctx, input,
These options are mutually exclusive.
- `processes`: int: run the scan using up to this number of processes in
parallel. If 0, disable the multiprocessing machinery.
parallel. If 0, disable the multiprocessing machinery. if -1 also
disable the multithreading machinery.
- `timeout`: float: intterup the scan of a file if it does not finish within
`timeout` seconds. This applied to each file and scan individually (e.g.
Expand Down Expand Up @@ -589,7 +591,9 @@ def scancode(ctx, input,
try:

if not processes and not quiet:
echo_stderr('Disabling multi-processing.', fg='yellow')
echo_stderr('Disabling multi-processing for debugging.', fg='yellow')
if processes == -1 and not quiet:
echo_stderr('Disabling multi-processing and multi-threading for debugging.', fg='yellow')

########################################################################
# 1. get command options and create all plugin instances
Expand Down Expand Up @@ -934,7 +938,8 @@ def scan_codebase(codebase, scanners, processes=1, timeout=DEFAULT_TIMEOUT,
resources = ((r.get_path(absolute=True), r.rid) for r in codebase.walk())

runner = partial(scan_resource, scanners=scanners,
timeout=timeout, with_timing=with_timing)
timeout=timeout, with_timing=with_timing,
with_threading=processes >= 0)

if TRACE:
logger_debug('scan_codebase: scanners:', '\n'.join(repr(s) for s in scanners))
Expand All @@ -945,7 +950,7 @@ def scan_codebase(codebase, scanners, processes=1, timeout=DEFAULT_TIMEOUT,
pool = None
scans = None
try:
if processes:
if processes >= 1:
# maxtasksperchild helps with recycling processes in case of leaks
pool = get_pool(processes=processes, maxtasksperchild=1000)
# Using chunksize is documented as much more efficient in the Python doc.
Expand All @@ -954,7 +959,7 @@ def scan_codebase(codebase, scanners, processes=1, timeout=DEFAULT_TIMEOUT,
scans = pool.imap_unordered(runner, resources, chunksize=1)
pool.close()
else:
# no multiprocessing with processes=0
# no multiprocessing with processes=0 or -1
scans = imap(runner, resources)

if progress_manager:
Expand Down Expand Up @@ -1021,29 +1026,38 @@ def scan_codebase(codebase, scanners, processes=1, timeout=DEFAULT_TIMEOUT,
return success


def scan_resource(location_rid, scanners,
timeout=DEFAULT_TIMEOUT,
with_timing=False):
def scan_resource(location_rid, scanners, timeout=DEFAULT_TIMEOUT,
with_timing=False, with_threading=True):
"""
Return a tuple of (location, rid, errors, scan_time, scan_results, timings)
by running the `scanners` Scanner objects for the file or directory resource
with id `rid` at `location` provided as a `location_rid` tuple of (location,
rid) for up to `timeout` seconds.
In the returned tuple:
If `with_threading` is False, threading is disabled.
The returned tuple has these values (:
- `location` and `rid` are the orginal arguments.
- `errors` is a list of error strings.
- `scan_results` is a mapping of scan results keyed by scanner.key.
- `scan_time` is the duration in seconds to run all scans for this resource.
- `timings` is a mapping of scan {scanner.key: execution time in seconds}
tracking the execution duration each each scan individually.
`timings` is empty unless `with_timing` is True.
All these values MUST be serializable/pickable because of the way multi-
processing/threading works.
"""
scan_time = time()

timings = None
if with_timing:
timings = OrderedDict((scanner.key, 0) for scanner in scanners)

if not with_threading:
interruptor= fake_interruptible
else:
interruptor = interruptible

location, rid = location_rid
errors = []
results = OrderedDict((scanner.key, []) for scanner in scanners)
Expand All @@ -1054,8 +1068,8 @@ def scan_resource(location_rid, scanners,
start = time()

try:
error, value = interruptible(
partial(scanner.function, location), timeout=timeout)
runner = partial(scanner.function, location)
error, value = interruptor(runner, timeout=timeout)
if error:
msg = 'ERROR: for scanner: ' + scanner.key + ':\n' + error
errors.append(msg)
Expand Down
78 changes: 55 additions & 23 deletions src/scancode/interrupt.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
# specific language governing permissions and limitations under the License.
#

from __future__ import print_function
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

from traceback import format_exc as traceback_format_exc

from commoncode.system import on_windows


Expand Down Expand Up @@ -72,7 +74,11 @@ class TimeoutError(Exception):
permissions and limitations under the License.
"""

import signal
from signal import ITIMER_REAL
from signal import SIGALRM
from signal import setitimer
from signal import signal as create_signal


def interruptible(func, args=None, kwargs=None, timeout=DEFAULT_TIMEOUT):
"""
Expand All @@ -83,19 +89,18 @@ def handler(signum, frame):
raise TimeoutError

try:
signal.signal(signal.SIGALRM, handler)
signal.setitimer(signal.ITIMER_REAL, timeout)
create_signal(SIGALRM, handler)
setitimer(ITIMER_REAL, timeout)
return NO_ERROR, func(*(args or ()), **(kwargs or {}))

except TimeoutError:
return TIMEOUT_MSG % locals(), NO_VALUE

except Exception:
import traceback
return ERROR_MSG + traceback.format_exc(), NO_VALUE
return ERROR_MSG + traceback_format_exc(), NO_VALUE

finally:
signal.setitimer(signal.ITIMER_REAL, 0)
setitimer(ITIMER_REAL, 0)

else:
"""
Expand All @@ -105,13 +110,16 @@ def handler(signum, frame):
But not code has been reused from this post.
"""

import ctypes
import multiprocessing
import Queue
from ctypes import c_long
from ctypes import py_object
from ctypes import pythonapi
from multiprocessing import TimeoutError as MpTimeoutError
from Queue import Empty as Queue_Empty
from Queue import Queue
try:
import thread
from thread import start_new_thread
except ImportError:
import _thread as thread
from _thread import start_new_thread


def interruptible(func, args=None, kwargs=None, timeout=DEFAULT_TIMEOUT):
Expand All @@ -120,20 +128,31 @@ def interruptible(func, args=None, kwargs=None, timeout=DEFAULT_TIMEOUT):
POSIX, but is not reliable and works only if everything is pickable.
"""
# We run `func` in a thread and block on a queue until timeout
results = Queue.Queue()
results = Queue()

def runner():
results.put(func(*(args or ()), **(kwargs or {})))
try:
_res = func(*(args or ()), **(kwargs or {}))
results.put((NO_ERROR, _res,))
except Exception:
results.put((ERROR_MSG + traceback_format_exc(), NO_VALUE,))

tid = thread.start_new_thread(runner, ())
tid = start_new_thread(runner, ())

try:
return NO_ERROR, results.get(timeout=timeout)
except (Queue.Empty, multiprocessing.TimeoutError):
err_res = results.get(timeout=timeout)

if not err_res:
return ERROR_MSG, NO_VALUE

return err_res

except (Queue_Empty, MpTimeoutError):
return TIMEOUT_MSG % locals(), NO_VALUE

except Exception:
import traceback
return ERROR_MSG + traceback.format_exc(), NO_VALUE
return ERROR_MSG + traceback_format_exc(), NO_VALUE

finally:
try:
async_raise(tid, Exception)
Expand All @@ -152,13 +171,26 @@ def async_raise(tid, exctype=Exception):
"""
assert isinstance(tid, int), 'Invalid thread id: must an integer'

tid = ctypes.c_long(tid)
exception = ctypes.py_object(Exception)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, exception)
tid = c_long(tid)
exception = py_object(Exception)
res = pythonapi.PyThreadState_SetAsyncExc(tid, exception)
if res == 0:
raise ValueError('Invalid thread id.')
elif res != 1:
# if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
pythonapi.PyThreadState_SetAsyncExc(tid, 0)
raise SystemError('PyThreadState_SetAsyncExc failed.')


def fake_interruptible(func, args=None, kwargs=None, timeout=DEFAULT_TIMEOUT):
"""
Fake, non-interruptible, using no threads and no signals
implementation used for debugging. This ignores the timeout and just
the function as-is.
"""

try:
return NO_ERROR, func(*(args or ()), **(kwargs or {}))
except Exception:
return ERROR_MSG + traceback_format_exc(), NO_VALUE

0 comments on commit 1d8a43d

Please sign in to comment.