Skip to content
This repository has been archived by the owner on Jul 8, 2021. It is now read-only.

Commit

Permalink
task remote init: fix strange tempfile issue
Browse files Browse the repository at this point in the history
Users are experiencing issues on one of the platforms where we deploy Cylc:
remote submission fails intermittently with a "file not found" error
complaining that a temporary file cannot be found. (The file is used for
piping service files via STDIN to the SSH command.)

In this change, we pass the temporary file handle directly instead of
passing the name of the temporary file to the process pool. This appears
to fix the issue. (Note: Before cylc#2590, we were unable to pass the file
handle because the context needs to be serialised for the
multiprocessing.Pool. This is no longer a requirement in our own
subprocess pool, so it is now safer to pass the handle.)

Add basic unit tests for running command with STDIN in subprocess pool.
Rename SuiteProcPool to SubProcPool.
  • Loading branch information
matthewrmshin committed Mar 15, 2019
1 parent c3d669e commit 430b880
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 28 deletions.
4 changes: 2 additions & 2 deletions bin/cylc-check-versions
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import cylc.flags
from cylc.option_parsers import CylcOptionParser as COP
from cylc import __version__ as CYLC_VERSION
from cylc.config import SuiteConfig
from cylc.subprocpool import SuiteProcPool
from cylc.subprocpool import SubProcPool
from cylc.suite_srv_files_mgr import SuiteSrvFilesManager
from cylc.task_remote_mgr import TaskRemoteMgr
from cylc.templatevars import load_template_vars
Expand Down Expand Up @@ -73,7 +73,7 @@ def main():
config.get_config(['runtime', name, 'remote', 'owner']),
config.get_config(['runtime', name, 'remote', 'host'])))
task_remote_mgr = TaskRemoteMgr(
suite, SuiteProcPool(), suite_srv_files_mgr)
suite, SubProcPool(), suite_srv_files_mgr)
for _, host_str in account_set:
task_remote_mgr.remote_host_select(host_str)
accounts = []
Expand Down
4 changes: 2 additions & 2 deletions bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ from cylc.cfgspec.glbl_cfg import glbl_cfg
from cylc.config import SuiteConfig
from cylc.cycling.loader import get_point
import cylc.flags
from cylc.subprocpool import SuiteProcPool
from cylc.subprocpool import SubProcPool
from cylc.option_parsers import CylcOptionParser as COP
from cylc.suite_db_mgr import SuiteDatabaseManager
from cylc.broadcast_mgr import BroadcastMgr
Expand Down Expand Up @@ -104,7 +104,7 @@ def main():

# Initialise job submit environment
glbl_cfg().create_cylc_run_tree(suite)
pool = SuiteProcPool()
pool = SubProcPool()
db_mgr = SuiteDatabaseManager()
task_job_mgr = TaskJobManager(
suite, pool, db_mgr, suite_srv_mgr,
Expand Down
4 changes: 2 additions & 2 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from cylc.network.server import SuiteRuntimeServer
from cylc.profiler import Profiler
from cylc.state_summary_mgr import StateSummaryMgr
from cylc.subprocpool import SuiteProcPool
from cylc.subprocpool import SubProcPool
from cylc.suite_db_mgr import SuiteDatabaseManager
from cylc.suite_events import (
SuiteEventContext, SuiteEventError, SuiteEventHandler)
Expand Down Expand Up @@ -351,7 +351,7 @@ def configure(self):
self.profiler.log_memory("scheduler.py: start configure")

# Start up essential services
self.proc_pool = SuiteProcPool()
self.proc_pool = SubProcPool()
self.state_summary_mgr = StateSummaryMgr()
self.command_queue = Queue()
self.message_queue = Queue()
Expand Down
7 changes: 4 additions & 3 deletions lib/cylc/subprocctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ class SubProcContext(object):
Default return code.
shell (boolean):
Launch command with "/bin/sh"?
stdin_file_paths (list):
stdin_files (list):
Files with content to send to command's STDIN.
Can be file paths or opened file handles.
stdin_str (str):
Content to send to command's STDIN.
.err (str):
Expand Down Expand Up @@ -84,9 +85,9 @@ def __str__(self):
value = getattr(self, attr, None)
if value is not None and str(value).strip():
mesg = ''
if attr == 'cmd' and self.cmd_kwargs.get('stdin_file_paths'):
if attr == 'cmd' and self.cmd_kwargs.get('stdin_files'):
mesg += 'cat'
for file_path in self.cmd_kwargs.get('stdin_file_paths'):
for file_path in self.cmd_kwargs.get('stdin_files'):
mesg += ' ' + quote(file_path)
mesg += ' | '
if attr == 'cmd' and isinstance(value, list):
Expand Down
21 changes: 13 additions & 8 deletions lib/cylc/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,16 @@ def run_function(func_name, json_args, json_kwargs, src_dir):
sys.stdout.write(json.dumps(res))


class SuiteProcPool(object):
class SubProcPool(object):
"""Manage queueing and pooling of subprocesses.
This is mainly used by the main loop of the suite server program, although
the SuiteProcPool.run_command can be used as a standalone utility function
the SubProcPool.run_command can be used as a standalone utility function
to run the command in a cylc.subprocctx.SubProcContext.
A command to run under a subprocess in the pool is expected to be wrapped
using a cylc.subprocctx.SubProcContext object. The caller will add the
context object using the SuiteProcPool.put_command method. A callback can
context object using the SubProcPool.put_command method. A callback can
be specified to notify the caller on exit of the subprocess.
A command launched by the pool is expected to write to STDOUT and STDERR.
Expand Down Expand Up @@ -311,15 +311,20 @@ def _poll_proc_pipes(self, proc, ctx):
def _run_command_init(cls, ctx, callback=None, callback_args=None):
"""Prepare and launch shell command in ctx."""
try:
if ctx.cmd_kwargs.get('stdin_file_paths'):
if len(ctx.cmd_kwargs['stdin_file_paths']) > 1:
if ctx.cmd_kwargs.get('stdin_files'):
if len(ctx.cmd_kwargs['stdin_files']) > 1:
stdin_file = TemporaryFile()
for file_path in ctx.cmd_kwargs['stdin_file_paths']:
stdin_file.write(open(file_path, 'rb').read())
for file_ in ctx.cmd_kwargs['stdin_files']:
if hasattr(file_, 'read'):
stdin_file.write(file_.read())
else:
stdin_file.write(open(file_, 'rb').read())
stdin_file.seek(0)
elif hasattr(ctx.cmd_kwargs['stdin_files'][0], 'read'):
stdin_file = ctx.cmd_kwargs['stdin_files'][0]
else:
stdin_file = open(
ctx.cmd_kwargs['stdin_file_paths'][0], 'rb')
ctx.cmd_kwargs['stdin_files'][0], 'rb')
elif ctx.cmd_kwargs.get('stdin_str'):
stdin_file = TemporaryFile('bw+')
stdin_file.write(ctx.cmd_kwargs.get('stdin_str').encode())
Expand Down
12 changes: 6 additions & 6 deletions lib/cylc/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from cylc.task_job_logs import (
JOB_LOG_JOB, get_task_job_log, get_task_job_job_log,
get_task_job_activity_log, get_task_job_id, NN)
from cylc.subprocpool import SuiteProcPool
from cylc.subprocpool import SubProcPool
from cylc.subprocctx import SubProcContext
from cylc.task_action_timer import TaskActionTimer
from cylc.task_events_mgr import TaskEventsManager, log_task_job_activity
Expand Down Expand Up @@ -74,7 +74,7 @@ class TaskJobManager(object):

JOBS_KILL = 'jobs-kill'
JOBS_POLL = 'jobs-poll'
JOBS_SUBMIT = SuiteProcPool.JOBS_SUBMIT
JOBS_SUBMIT = SubProcPool.JOBS_SUBMIT
POLL_FAIL = 'poll failed'
REMOTE_SELECT_MSG = 'waiting for remote host selection'
REMOTE_INIT_MSG = 'remote host initialising'
Expand Down Expand Up @@ -296,11 +296,11 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False):
'%s ... # will invoke in batches, sizes=%s',
cmd, [len(b) for b in itasks_batches])
for i, itasks_batch in enumerate(itasks_batches):
stdin_file_paths = []
stdin_files = []
job_log_dirs = []
for itask in itasks_batch:
if remote_mode:
stdin_file_paths.append(
stdin_files.append(
get_task_job_job_log(
suite, itask.point, itask.tdef.name,
itask.submit_num))
Expand All @@ -317,7 +317,7 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False):
SubProcContext(
self.JOBS_SUBMIT,
cmd + job_log_dirs,
stdin_file_paths=stdin_file_paths,
stdin_files=stdin_files,
job_log_dirs=job_log_dirs,
**kwargs
),
Expand Down Expand Up @@ -713,7 +713,7 @@ def _submit_task_job_callback(self, suite, itask, cmd_ctx, line):
ctx.cmd = cmd_ctx.cmd # print original command on failure
log_task_job_activity(ctx, suite, itask.point, itask.tdef.name)

if ctx.ret_code == SuiteProcPool.RET_CODE_SUITE_STOPPING:
if ctx.ret_code == SubProcPool.RET_CODE_SUITE_STOPPING:
return

try:
Expand Down
10 changes: 5 additions & 5 deletions lib/cylc/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import re
from subprocess import Popen, PIPE
import tarfile
from tempfile import NamedTemporaryFile
from tempfile import TemporaryFile
from time import time

from cylc import LOG
Expand Down Expand Up @@ -195,8 +195,9 @@ def remote_init(self, host, owner):
self.remote_init_map[(host, owner)] = REMOTE_INIT_NOT_REQUIRED
return self.remote_init_map[(host, owner)]

# Create "stdin_file_paths" file, with "items" in it.
tmphandle = NamedTemporaryFile()
# Create a TAR archive with the service files,
# so they can be sent later via SSH's STDIN to the task remote.
tmphandle = TemporaryFile()
tarhandle = tarfile.open(fileobj=tmphandle, mode='w')
for path, arcname in items:
tarhandle.add(path, arcname=arcname)
Expand All @@ -222,8 +223,7 @@ def remote_init(self, host, owner):
cmd.append(glbl_cfg().get_derived_host_item(
self.suite, 'suite run directory', host, owner))
self.proc_pool.put_command(
SubProcContext(
'remote-init', cmd, stdin_file_paths=[tmphandle.name]),
SubProcContext('remote-init', cmd, stdin_files=[tmphandle]),
self._remote_init_callback,
[host, owner, tmphandle])
# None status: Waiting for command to finish
Expand Down
128 changes: 128 additions & 0 deletions lib/cylc/tests/test_subprocpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#!/usr/bin/env python2

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from tempfile import NamedTemporaryFile, TemporaryFile
import unittest

from cylc.subprocctx import SubProcContext
from cylc.subprocpool import SubProcPool


class TestSubProcPool(unittest.TestCase):
    """Unit tests for SubProcPool.run_command.

    Each test wraps a simple command in a SubProcContext, runs it with
    SubProcPool.run_command and checks the STDERR, STDOUT and return code
    recorded on the context.
    """

    def _run_and_check(self, ctx, err='', out='', ret_code=0):
        """Run ctx via SubProcPool.run_command and check its results.

        Args:
            ctx (SubProcContext): Context of the command to run.
            err (str): Expected content of ctx.err.
            out (str): Expected content of ctx.out.
            ret_code (int): Expected value of ctx.ret_code.
        """
        SubProcPool.run_command(ctx)
        self.assertEqual(ctx.err, err)
        self.assertEqual(ctx.out, out)
        self.assertEqual(ctx.ret_code, ret_code)

    def test_run_command_returns_0(self):
        """Test basic usage, command returns 0"""
        self._run_and_check(SubProcContext('truth', ['true']))

    def test_run_command_returns_1(self):
        """Test basic usage, command returns 1"""
        self._run_and_check(SubProcContext('lies', ['false']), ret_code=1)

    def test_run_command_writes_to_out(self):
        """Test basic usage, command writes to STDOUT"""
        ctx = SubProcContext('parrot', ['echo', 'pirate', 'urrrr'])
        self._run_and_check(ctx, out='pirate urrrr\n')

    def test_run_command_writes_to_err(self):
        """Test basic usage, command writes to STDERR"""
        ctx = SubProcContext(
            'parrot2', ['bash', '-c', 'echo pirate errrr >&2'])
        self._run_and_check(ctx, err='pirate errrr\n')

    def test_run_command_with_stdin_from_str(self):
        """Test STDIN from string"""
        ctx = SubProcContext('meow', ['cat'], stdin_str='catches mice.\n')
        self._run_and_check(ctx, out='catches mice.\n')

    def test_run_command_with_stdin_from_handle(self):
        """Test STDIN from a single opened file handle"""
        # The "with" block closes the handle even if an assertion fails.
        with TemporaryFile() as handle:
            handle.write('catches mice.\n'.encode('UTF-8'))
            handle.seek(0)
            ctx = SubProcContext('meow', ['cat'], stdin_files=[handle])
            self._run_and_check(ctx, out='catches mice.\n')

    def test_run_command_with_stdin_from_path(self):
        """Test STDIN from a single file path"""
        # The named file must stay open until the command has run; the
        # "with" block closes it afterwards, even if an assertion fails.
        with NamedTemporaryFile() as handle:
            handle.write('catches mice.\n'.encode('UTF-8'))
            handle.seek(0)
            ctx = SubProcContext('meow', ['cat'], stdin_files=[handle.name])
            self._run_and_check(ctx, out='catches mice.\n')

    def test_run_command_with_stdin_from_handles(self):
        """Test STDIN from multiple file handles"""
        handles = []
        try:
            for txt in ['catches mice.\n', 'eat fish.\n']:
                handle = TemporaryFile()
                # Register immediately so the handle is closed on any error.
                handles.append(handle)
                handle.write(txt.encode('UTF-8'))
                handle.seek(0)
            ctx = SubProcContext('meow', ['cat'], stdin_files=handles)
            self._run_and_check(ctx, out='catches mice.\neat fish.\n')
        finally:
            for handle in handles:
                handle.close()

    def test_run_command_with_stdin_from_paths(self):
        """Test STDIN from multiple file paths"""
        handles = []
        try:
            for txt in ['catches mice.\n', 'eat fish.\n']:
                handle = NamedTemporaryFile()
                # Register immediately so the handle is closed on any error.
                handles.append(handle)
                handle.write(txt.encode('UTF-8'))
                handle.seek(0)
            ctx = SubProcContext(
                'meow', ['cat'],
                stdin_files=[handle.name for handle in handles])
            self._run_and_check(ctx, out='catches mice.\neat fish.\n')
        finally:
            for handle in handles:
                handle.close()


# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
unittest.main()

0 comments on commit 430b880

Please sign in to comment.