Source code for valjean.cosette.run

# Copyright French Alternative Energies and Atomic Energy Commission
# Contributors: valjean developers
# valjean-support@cea.fr
#
# This software is a computer program whose purpose is to analyze and
# post-process numerical simulation results.
#
# This software is governed by the CeCILL license under French law and abiding
# by the rules of distribution of free software. You can use, modify and/ or
# redistribute the software under the terms of the CeCILL license as circulated
# by CEA, CNRS and INRIA at the following URL: http://www.cecill.info.
#
# As a counterpart to the access to the source code and rights to copy, modify
# and redistribute granted by the license, users are provided only with a
# limited warranty and the software's author, the holder of the economic
# rights, and the successive licensors have only limited liability.
#
# In this respect, the user's attention is drawn to the risks associated with
# loading, using, modifying and/or developing or reproducing the software by
# the user in light of its specific status of free software, that may mean that
# it is complicated to manipulate, and that also therefore means that it is
# reserved for developers and experienced professionals having in-depth
# computer knowledge. Users are therefore encouraged to load and test the
# software's suitability as regards their requirements in conditions enabling
# the security of their systems and/or data to be ensured and, more generally,
# to use and operate it in the same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

'''This module defines a few useful functions and classes to embed generic
command execution in a :mod:`~.depgraph`.

Spawning external processes
===========================

.. doctest:: code
   :hide:

   >>> from valjean.config import Config
   >>> config = Config()

The :class:`RunTask` class is the basic building block for tasks that spawn
external processes and wait for their completion. It can execute arbitrary
commands. Consider:

   >>> from valjean.cosette.run import RunTask
   >>> task = RunTask.from_cli(name='say', cli=['echo', 'ni!'])
   >>> env_update, status = task.do(env=dict(), config=config)  # output is captured
   >>> print(status)
   TaskStatus.DONE

The task succeeded, but where is the output of our command?! :class:`RunTask`
captures the standard output and standard error streams and redirects them to
files, whose paths appear in the environment update under the ``stdout`` and
``stderr`` keys. If you want to see what was printed, you have to look there:

   >>> def print_stdout(env_up, name):
   ...   """A small function to print the stdout of a task."""
   ...   stdout = env_up[name]['stdout']
   ...   with open(stdout) as stdout_f:
   ...     print(stdout_f.read(), end='')
   >>> print_stdout(env_update, 'say')
   ni!
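
For the curious, this capture boils down to handing open file objects to
:mod:`subprocess`, which then connects the child's streams to those files
instead of the terminal. Here is a minimal standard-library sketch of the idea.
It is not valjean's actual implementation, which also records timing and
return codes. The helper ``run_captured`` is hypothetical, and the Python
interpreter (:data:`sys.executable`) stands in for :command:`echo` to keep the
sketch portable:

.. code-block:: python

    import subprocess
    import sys
    import tempfile
    from pathlib import Path


    def run_captured(cli, out_dir):
        """Run `cli`, capturing its streams in files under `out_dir`.

        Hypothetical helper: returns the return code and the stdout path.
        """
        stdout_path = Path(out_dir) / 'stdout'
        stderr_path = Path(out_dir) / 'stderr'
        with stdout_path.open('w') as stdout, stderr_path.open('w') as stderr:
            # the child process writes to our files, not to the terminal
            code = subprocess.call(cli, stdout=stdout, stderr=stderr)
        return code, stdout_path


    with tempfile.TemporaryDirectory() as tmp:
        code, out = run_captured([sys.executable, '-c', 'print("ni!")'], tmp)
        print(code, out.read_text(), end='')  # prints: 0 ni!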

Note that the command line (the `cli` argument) is not parsed by a shell. So
the following may not do what you expect:

   >>> task = RunTask.from_cli(name='want',
   ...                         cli=['echo', 'We want... ', '&&',
   ...                              'echo', 'a shrubbery!'])
   >>> env_update, status = task.do(env=dict(), config=config)
   >>> print_stdout(env_update, 'want')
   We want...  && echo a shrubbery!
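
The list elements are handed to the program verbatim, as its argument vector.
With no shell in between, nothing interprets ``&&``. The following
standard-library check illustrates this. It is a sketch, not part of valjean,
and it uses the Python interpreter as a stand-in program that prints the
arguments it received:

.. code-block:: python

    import subprocess
    import sys

    # a tiny program that prints its arguments, one per line (chr(10) is a
    # newline)
    SHOW_ARGS = 'import sys; print(*sys.argv[1:], sep=chr(10))'

    cli = [sys.executable, '-c', SHOW_ARGS,
           'We want... ', '&&', 'echo', 'a shrubbery!']
    output = subprocess.run(cli, capture_output=True, text=True,
                            check=True).stdout
    print(output.splitlines())
    # ['We want... ', '&&', 'echo', 'a shrubbery!']

``&&`` arrives as an ordinary argument. Only a shell would treat it as an
operator.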

Indeed, :class:`RunTask` ran :command:`echo` only once.  If you need to execute
several commands, you can wrap them in a shell script and execute it.
Alternatively, you can directly invoke the :meth:`RunTask.from_clis` class
method:

   >>> task = RunTask.from_clis(name='want',
   ...                          clis=[['echo', '-n', 'We want... '],
   ...                                ['echo', 'a shrubbery!']])
   >>> env_update, status = task.do(env=dict(), config=config)
   >>> print_stdout(env_update, 'want')
   We want... a shrubbery!
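
The command lines run one after the other. As documented for :func:`run` in the
API below, execution stops at the first command that returns a non-zero code.
The following stand-alone sketch mimics that policy with
:func:`subprocess.call`. The function name ``run_all`` is hypothetical, and the
sketch omits valjean's timing, logging and stream capture:

.. code-block:: python

    import subprocess
    import sys


    def run_all(clis):
        """Run `clis` in order; stop at the first non-zero return code."""
        codes = []
        for cli in clis:
            code = subprocess.call(cli)
            codes.append(code)
            if code != 0:
                break  # skip the remaining command lines
        return codes


    ok = [sys.executable, '-c', 'pass']
    fail = [sys.executable, '-c', 'raise SystemExit(3)']
    print(run_all([ok, fail, ok]))  # [0, 3]: the third command never runs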


Creating tasks using a factory
==============================

When you want to create multiple :class:`RunTask` objects that use the same
executable, it can be convenient to use :class:`RunTaskFactory`. A factory lets
you specify the executable (and, optionally, some of its arguments) once and
for all, and provide the missing arguments later, when each task is created.

.. doctest:: RunTaskFactory
    :hide:

    >>> import os
    >>> from valjean.config import Config
    >>> config = Config()
    >>> def print_stdout(env_up, name):
    ...   """A small function to print the stdout of a task."""
    ...   stdout = env_up[name]['stdout']
    ...   with open(stdout) as stdout_f:
    ...     print(stdout_f.read(), end='')

The simplest way to create a :class:`RunTaskFactory` is to use one of the
:meth:`RunTaskFactory.from_executable` or :meth:`RunTaskFactory.from_task`
class methods. For example, this will create a factory instance that generates
:class:`RunTask` objects for the :command:`echo` executable:

    >>> factory = RunTaskFactory.from_executable('echo', name='echo')

You can use it to generate tasks by invoking the :meth:`RunTaskFactory.make`
method:

    >>> task = factory.make(name='task', extra_args=['spam'])

This creates a :class:`RunTask` object named `task.echo` (the factory name is
appended to the task name) that executes ``echo spam`` when run:

    >>> env_up, status = task.do(env={}, config=config)
    >>> print_stdout(env_up, 'task.echo')
    spam

You can also leave the `name` parameter out. If you do so,
:class:`RunTaskFactory` will generate a name for you:

    >>> task_sausage = factory.make(extra_args=['sausage'])
    >>> task_sausage.name
    '....echo'
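
Judging from :meth:`RunTaskFactory.make` in the API below, the naming rule works
as follows. The task name is the requested name or, if no name is given, a
deterministic hash of the factory name and of the arguments. In both cases it
is followed by ``.`` and the factory name. The sketch below reproduces that
rule. Valjean uses its own ``det_hash`` function; :mod:`hashlib` applied to a
``repr`` is only an illustrative stand-in, and ``task_name`` is a hypothetical
helper:

.. code-block:: python

    import hashlib


    def task_name(factory_name, name=None, extra_args=(), **kwargs):
        """Mimic the naming rule: '<name or hash>.<factory name>'."""
        if name is None:
            # stand-in for valjean's det_hash: hash a canonical repr
            payload = repr((factory_name, list(extra_args),
                            sorted(kwargs.items())))
            name = hashlib.sha256(payload.encode()).hexdigest()[:16]
        return name + '.' + factory_name


    print(task_name('echo', name='task'))  # task.echo
    print(task_name('echo', extra_args=['sausage']).endswith('.echo'))  # True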

Of course you can generate multiple tasks using the same factory (this is the
whole point of :class:`RunTaskFactory`, really):

    >>> task_spam = factory.make(name='task_spam', extra_args=['spam'])
    >>> task_eggs = factory.make(name='task_eggs', extra_args=['eggs'])
    >>> task_bacon = factory.make(name='task_bacon', extra_args=['bacon'])

You can also specify a few arguments beforehand and provide the rest later:

    >>> factory = RunTaskFactory.from_executable('echo', name='echo',
    ...                                          default_args=['spam'])
    >>> task = factory.make(name='task', extra_args=['eggs'])
    >>> env_up, status = task.do(env={}, config=config)
    >>> print_stdout(env_up, 'task.echo')
    spam eggs
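
The resulting command line is the executable, followed by the (formatted)
default arguments, followed by the extra arguments. This is the order used by
``RunTaskFactory._generic_clis_closure`` below. The stand-alone sketch below
assembles a command line in the same way with a hypothetical ``build_cli``
helper. Here :func:`functools.partial` plays the part of the factory, fixing
the executable and the default arguments once and for all:

.. code-block:: python

    from functools import partial


    def build_cli(executable, default_args, extra_args, **fields):
        """Assemble [executable, *formatted default_args, *extra_args]."""
        cli = [executable]
        cli.extend(arg.format(**fields) for arg in default_args)
        cli.extend(extra_args)  # extra args are appended as-is, unformatted
        return cli


    make_echo = partial(build_cli, 'echo', ['spam'])
    print(make_echo(['eggs']))  # ['echo', 'spam', 'eggs']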

Finally, you can parametrize your arguments with arbitrary keywords, whose
values are provided when the task is created:

    >>> args = ['{food}', 'with', '{side}']
    >>> factory = RunTaskFactory.from_executable('echo', name='echo',
    ...                                          default_args=args)
    >>> task = factory.make(name='task', food='lobster', side='spam')
    >>> env_up, status = task.do(env={}, config=config)
    >>> print_stdout(env_up, 'task.echo')
    lobster with spam

Default values for the parameters may be specified when creating the factory
and can be overridden when the task is created:

    >>> args = ['{food}', 'with', '{side}']
    >>> factory = RunTaskFactory.from_executable('echo', default_args=args,
    ...                                          side='spam', name='echo')
    >>> beans = factory.make(name='baked beans', food='baked beans')
    >>> eggs = factory.make(name='eggs', food='eggs',
    ...                     side='bacon and spam')
    >>> env_up, status = beans.do(env={}, config=config)
    >>> print_stdout(env_up, 'baked beans.echo')
    baked beans with spam
    >>> env_up, status = eggs.do(env={}, config=config)
    >>> print_stdout(env_up, 'eggs.echo')
    eggs with bacon and spam
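
The override mechanism amounts to merging two keyword dictionaries before
calling :meth:`str.format`. The factory's defaults are copied, then updated
with the keywords passed to :meth:`RunTaskFactory.make`; this mirrors
``kwargs_`` in the implementation. The sketch below shows the merge, using a
hypothetical ``format_args`` helper:

.. code-block:: python

    def format_args(default_args, factory_kwargs, make_kwargs):
        """Fill the {placeholders}; make()-time keywords win over defaults."""
        fields = dict(factory_kwargs)  # copy: the factory defaults stay intact
        fields.update(make_kwargs)
        return [arg.format(**fields) for arg in default_args]


    args = ['{food}', 'with', '{side}']
    defaults = {'side': 'spam'}
    print(format_args(args, defaults, {'food': 'baked beans'}))
    # ['baked beans', 'with', 'spam']
    print(format_args(args, defaults, {'food': 'eggs',
                                       'side': 'bacon and spam'}))
    # ['eggs', 'with', 'bacon and spam']

If a placeholder receives no value at all, :meth:`str.format` raises
:exc:`KeyError` when the arguments are formatted.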

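You can also refer to the environment (as ``env``) or to the configuration (as
``config``) in your command-line arguments. These references are resolved when
the task is executed, not when it is created: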

    >>> args = ['{env[side]}']
    >>> factory = RunTaskFactory.from_executable('echo', name='echo',
    ...                                          default_args=args)
    >>> task = factory.make(name='task')
    >>> env_up, status = task.do(env={'side': 'spam'}, config=config)
    >>> print_stdout(env_up, 'task.echo')
    spam
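
This works because :meth:`str.format` supports item lookup (``[key]``) and
attribute lookup (``.name``) inside replacement fields. The implementation
passes the environment and the configuration as the ``env`` and ``config``
fields (see ``_generic_clis_closure``). The stand-alone illustration below shows
both kinds of lookup. The ``SimpleNamespace`` object is only a hypothetical
placeholder for a configuration object and does not reflect valjean's
:class:`~valjean.config.Config` API:

.. code-block:: python

    from types import SimpleNamespace

    env = {'side': 'spam', 'task': {'output_dir': '/tmp/task'}}
    config = SimpleNamespace(flavour='lobster')  # hypothetical stand-in

    print('{env[side]}'.format(env=env, config=config))  # spam
    # lookups can be chained; keys inside [] are written without quotes
    print('{env[task][output_dir]}'.format(env=env, config=config))  # /tmp/task
    print('{config.flavour}'.format(env=env, config=config))  # lobster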


Caching
-------

The :class:`RunTaskFactory` class caches generated tasks under the hood.
Repeated calls to :meth:`RunTaskFactory.make` from the same factory with the
same arguments will result in the same task:

    >>> factory = RunTaskFactory.from_executable('echo', name='echo')
    >>> task_sausage = factory.make(extra_args=['sausage'])
    >>> task_sausage_again = factory.make(extra_args=['sausage'])
    >>> task_sausage is task_sausage_again
    True

The cache belongs to the factory instance, though. A second factory, even one
created with the same arguments, does not share it and generates a new task:

    >>> other_factory = RunTaskFactory.from_executable('echo', name='echo')
    >>> task_sausage_other = other_factory.make(extra_args=['sausage'])
    >>> task_sausage is task_sausage_other
    False
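
In other words, each factory instance owns a plain dictionary keyed by task
name (see :meth:`RunTaskFactory.make`). Identical requests to the same factory
therefore hit the cache, while a fresh factory starts with an empty one. The
stripped-down sketch below illustrates the pattern. ``CachingFactory`` and its
``make`` method are hypothetical, and they build plain dictionaries instead of
tasks:

.. code-block:: python

    class CachingFactory:
        """Toy factory that memoizes its products by name, per instance."""

        def __init__(self, name):
            self.name = name
            self.cache = {}  # one cache per factory instance

        def make(self, extra_args):
            key = repr(extra_args) + '.' + self.name
            if key not in self.cache:  # cache miss: build and remember
                self.cache[key] = {'cli': ['echo', *extra_args]}
            return self.cache[key]


    factory = CachingFactory('echo')
    first = factory.make(['sausage'])
    print(first is factory.make(['sausage']))  # True
    print(first is CachingFactory('echo').make(['sausage']))  # False

In valjean, the cache key is the task name alone. Two calls to the same factory
with the same explicit ``name`` therefore return the same task, even if their
other arguments differ.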


Module API
==========
'''

import os
import shlex
import logging
from functools import partial
from subprocess import call

from ..chrono import Chrono
from ..path import ensure, sanitize_filename
from .task import TaskStatus, det_hash
from .pythontask import PythonTask


LOGGER = logging.getLogger(__name__)


# The following functions are helpers for RunTask, but they may be used
# elsewhere, too
def run(clis, stdout, stderr, **subprocess_args):
    '''Run the given command lines and capture their stdout/stderr.

    Execution stops at the first failure (non-zero return code). Each command
    line is echoed to `stderr`, prefixed with ``$``, before it is executed.

    :param list clis: The list of commands to execute.
    :param stdout: File handle to capture the stdout stream.
    :type stdout: :term:`file object`
    :param stderr: File handle to capture the stderr stream.
    :type stderr: :term:`file object`
    :param dict subprocess_args: Parameters to be passed to
                                 :func:`subprocess.call`.
    :returns: A tuple containing the list of return codes of the executed
              commands, the task status (``DONE`` or ``FAILED``) and the
              elapsed wall-clock time in seconds.
    '''
    results = []  # collect the return codes of each cli
    status = TaskStatus.DONE  # will change to FAILED in case of trouble
    with Chrono() as chrono:
        for cli in clis:
            LOGGER.debug('Running cli: %s', cli)
            print('$ ' + ' '.join(shlex.quote(token) for token in cli),
                  file=stderr, flush=True)
            LOGGER.debug('subprocess_args: %s', subprocess_args)
            result = call(cli, universal_newlines=True, stdout=stdout,
                          stderr=stderr, **subprocess_args)
            LOGGER.debug('Run result: %s', result)
            results.append(result)
            if result != 0:
                LOGGER.warning('A subprocess ended with return code %s; will '
                               'skip the remaining commands', result)
                status = TaskStatus.FAILED
                break
    return results, status, float(chrono)


def make_cap_paths(base_path):
    '''Construct filenames to capture stdout and stderr.'''
    stdout_path = base_path / 'stdout'
    stderr_path = base_path / 'stderr'
    ensure(stdout_path)
    ensure(stderr_path)
    return stdout_path.resolve(), stderr_path.resolve()


class RunTask(PythonTask):
    '''Task that executes the specified external commands (without going
    through a shell) and waits for their completion.'''

    @classmethod
    def from_cli(cls, name, cli, **kwargs):
        '''Create a :class:`RunTask` from a single command line.

        Use the :meth:`RunTask.from_clis` method if you want to run several
        commands in a row.

        :param str name: The name of this task.
        :param list cli: The command line to be executed, as a list of
                         strings. The first element of the list is the command
                         and the following ones are its arguments.
        :param kwargs: Any other keyword arguments will be passed on to the
                       constructor (see :meth:`RunTask.__init__`).
        '''
        if not isinstance(cli, (list, tuple)):
            raise TypeError('Argument `cli` must be a list or a tuple')
        return cls.from_clis(name, [cli], **kwargs)

    @classmethod
    def from_clis(cls, name, clis, **kwargs):
        '''Create a :class:`RunTask` from a list of command lines.

        :param str name: The name of this task.
        :param list clis: The command lines to be executed, as a list of lists
                          of strings. The first element of each sub-list is
                          the command and the following ones are its
                          arguments.
        :param kwargs: Any other keyword arguments will be passed on to the
                       constructor (see :meth:`RunTask.__init__`).
        '''
        if (not isinstance(clis, (list, tuple))
                or not all(isinstance(cli, (list, tuple)) for cli in clis)):
            raise TypeError('Argument `clis` must be a list (or a tuple) '
                            'of lists (or tuples)')
        return cls(name, lambda _env, _config: clis, **kwargs)

    def __init__(self, name, clis_closure, *, deps=None, soft_deps=None,
                 **subprocess_args):
        '''Initialize this task from a list of command lines.

        The `clis_closure` argument must be a closure. It will be invoked at
        execution time as::

            clis_closure(env, config)

        and it must return the command lines to be executed, as a list of
        lists of strings.

        :param str name: The name of this task.
        :param clis_closure: A closure to generate the command lines.
        :type clis_closure: callable
        :param dict subprocess_args: Any remaining options will be passed to
                                     the :class:`.subprocess.Popen`
                                     constructor.
        :param deps: The dependencies for this task (see
                     :meth:`Task.__init__ <valjean.cosette.task.Task.__init__>`
                     for the format), or `None`.
        :type deps: list(Task) or None
        :param soft_deps: The soft dependencies for this task (see
                          :meth:`Task.__init__ <valjean.cosette.task.Task.__init__>`
                          for the format), or `None`.
        :type soft_deps: list(Task) or None
        '''
        super().__init__(name,
                         self.run_task(clis_closure, name, **subprocess_args),
                         deps=deps, soft_deps=soft_deps,
                         env_kwarg='env', config_kwarg='config')
        LOGGER.debug('Created %s task %r', self.__class__.__name__, self.name)
        LOGGER.debug(' - deps = %s', deps)
        LOGGER.debug(' - soft_deps = %s', soft_deps)
        LOGGER.debug(' - subprocess_args = %s', subprocess_args)

    def run_task(self, clis_closure, name, **subprocess_args):
        # pylint: disable=unused-argument
        '''Build the function that executes the command lines and waits for
        their completion.

        The returned function must be called with the `env` and `config`
        keyword arguments (the task environment and the configuration
        object). On completion, it returns the task status and proposes the
        following updates to the environment::

            env[task.name]['clis'] = clis
            env[task.name]['output_dir'] = output_dir
            env[task.name]['return_codes'] = return_codes
            env[task.name]['elapsed_time'] = wallclock_time
            env[task.name]['result'] = stdout
            env[task.name]['stdout'] = stdout
            env[task.name]['stderr'] = stderr

        Here ``clis`` is the list of command lines that were executed,
        ``output_dir`` is the directory in which the commands were run,
        ``return_codes`` is the list of return codes of the executed commands,
        and ``elapsed_time`` is the time the whole list took. The keys
        ``stdout`` and ``stderr`` (as well as ``result``, which duplicates
        ``stdout``) hold the paths to the files containing respectively the
        captured standard output and standard error streams.

        :param clis_closure: A closure to generate the command lines (see
                             :meth:`RunTask.__init__`).
        :param str name: The name of the task, used to name its output
                         directory.
        :param subprocess_args: Extra arguments for :func:`subprocess.call`.
        :returns: A function returning the proposed environment update and
                  the task status.
        '''
        def runner(*, env, config, name, clis_closure, subprocess_args):
            # pylint: disable=too-many-locals
            '''Actually run the command lines.'''
            from pathlib import Path
            clis = clis_closure(env, config)
            output_dir = Path(config.query('path', 'output-root'),
                              sanitize_filename(name))
            stdout_path, stderr_path = make_cap_paths(output_dir)
            with stdout_path.open('w') as stdout:
                with stderr_path.open('w') as stderr:
                    results, status, elapsed = run(clis, stdout, stderr,
                                                   cwd=str(output_dir),
                                                   **subprocess_args)
            env_up = {self.name: {'clis': clis,
                                  'output_dir': str(output_dir),
                                  'return_codes': results,
                                  'elapsed_time': elapsed,
                                  'result': str(stdout_path),
                                  'stdout': str(stdout_path),
                                  'stderr': str(stderr_path)}}
            LOGGER.debug('RunTask %s ends, elapsed time: %f s', name, elapsed)
            return env_up, status

        # bind name, clis_closure and subprocess_args to the values from the
        # enclosing scope
        return partial(runner, name=name, clis_closure=clis_closure,
                       subprocess_args=subprocess_args)


class RunTaskFactory:
    '''Create multiple tasks from the same executable without even breaking a
    sweat.'''

    @classmethod
    def _generic_clis_closure(cls, executable, *, env, config, default_args,
                              extra_args, **kwargs):
        # pylint: disable=too-many-arguments
        cli = [executable]
        # default arguments may contain {placeholders}; fill them in
        cli.extend(arg.format(env=env, config=config, **kwargs)
                   for arg in default_args)
        # extra arguments are appended verbatim
        cli.extend(extra_args)
        return [cli]

    @classmethod
    def from_task(cls, task, *, relative_path, name=None, default_args=None,
                  **kwargs):
        '''This class method creates a :class:`RunTaskFactory` from a
        :class:`~valjean.cosette.task.Task`. The command to be executed must
        appear in the `output_dir` directory of the given task; its relative
        location must be specified with the `relative_path` argument.

        This method can be used, among other things, to create a
        :class:`RunTaskFactory` from a
        :class:`~valjean.cosette.code.CheckoutTask` or a
        :class:`~valjean.cosette.code.BuildTask`.

        :param task: The :class:`~valjean.cosette.task.Task` object.
        :type task: :class:`~valjean.cosette.task.Task`
        :param str relative_path: The path to the executable, relative to the
                                  output directory (`output_dir`) of the task.
        :param name: a unique identifier for this factory. The factory name is
                     used to produce unique names for the generated tasks. If
                     `None` is given, the factory name will be constructed by
                     hashing the task name, `relative_path` and
                     `default_args`.
        :type name: str or None
        :param default_args: see :meth:`RunTaskFactory.from_executable`.
        :param kwargs: Any remaining arguments will be passed to the
                       :meth:`__init__`.
        '''
        d_args = [] if default_args is None else default_args
        LOGGER.debug('relative_path=%r', relative_path)
        LOGGER.debug('d_args=%s', d_args)
        closure = partial(cls._clis_closure_from_task, task.name,
                          relative_path, d_args)
        name = (name if name is not None
                else det_hash(task.name, relative_path, d_args))
        return cls(closure, deps=[task], name=name, **kwargs)

    @classmethod
    def _clis_closure_from_task(cls, job_name, relative_path, default_args,
                                extra_args, **kwargs):
        '''This class method generates the closure consumed by
        :meth:`RunTaskFactory.from_task`.'''
        def clis_closure(env, config):
            # the executable lives in the output directory of the task
            output_dir = env[job_name]['output_dir']
            executable = os.path.join(output_dir, relative_path)
            return cls._generic_clis_closure(executable, env=env,
                                             config=config,
                                             default_args=default_args,
                                             extra_args=extra_args, **kwargs)
        return clis_closure

    @classmethod
    def from_executable(cls, path, name=None, default_args=None, **kwargs):
        '''This class method creates a :class:`RunTaskFactory` from the path to
        an existing executable.

        :param str path: the path to the executable.
        :param name: a unique identifier for this factory. If `None` is given,
                     the factory will use a hash of `path` and `default_args`.
        :type name: str or None
        :param default_args: The list of arguments that will be passed to the
                             executed command by :class:`RunTask`. It may
                             contain expressions understood by Python's format
                             mini-language, such as ``{foo}``. These
                             expressions can be filled when the task is
                             generated by passing appropriate keyword
                             arguments to the :meth:`make` method.
        :type default_args: list(str) or None
        :param kwargs: Any remaining arguments will be passed to the
                       :meth:`__init__` (in particular `deps`, `soft_deps` and
                       default values for the expressions in
                       `default_args`).
        '''
        d_args = [] if default_args is None else default_args
        LOGGER.debug('path=%r', path)
        LOGGER.debug('d_args=%s', d_args)
        name = name if name is not None else det_hash(path, d_args)
        return cls(partial(cls._clis_closure_from_executable, path, d_args),
                   name=name, **kwargs)

    @classmethod
    def _clis_closure_from_executable(cls, executable, default_args,
                                      extra_args, **kwargs):
        '''This class method generates the closure consumed by
        :meth:`RunTaskFactory.from_executable`.'''
        def clis_closure(env, config):
            return cls._generic_clis_closure(executable, env=env,
                                             config=config,
                                             default_args=default_args,
                                             extra_args=extra_args, **kwargs)
        return clis_closure

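    def __init__(self, make_closure, *, deps=None, soft_deps=None, name,
                 **kwargs):
        '''Initialize the factory.

        The :meth:`from_executable` and :meth:`from_task` class methods are
        usually more convenient than calling this constructor directly.

        :param make_closure: A callable invoked by :meth:`make` as
                             ``make_closure(extra_args, **kwargs)``; it must
                             return the closure that generates the command
                             lines (see :meth:`RunTask.__init__`).
        :param deps: Dependencies injected in every generated task.
        :type deps: list(Task) or None
        :param soft_deps: Soft dependencies injected in every generated task.
        :type soft_deps: list(Task) or None
        :param str name: The name of the factory; it is appended to the names
                         of the generated tasks.
        :param kwargs: Default values for the keywords used to format the
                       command line; they can be overridden in :meth:`make`.
        '''
        self.make_closure = make_closure
        self.deps = [] if deps is None else deps
        self.soft_deps = [] if soft_deps is None else soft_deps
        self.kwargs = kwargs
        self.cache = {}
        self.name = name
        LOGGER.debug('creating factory with name: %s', self.name)
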
    def make(self, *, name=None, extra_args=None, subprocess_args=None,
             deps=None, soft_deps=None, **kwargs):
        '''Create a :class:`RunTask` object.

        :param name: the name of the task to be generated, as a string. If
                     `None` is passed, :class:`RunTaskFactory` will generate a
                     name by hashing the factory name, `extra_args` and the
                     formatting keywords. In both cases, the factory name is
                     appended to the task name (``<name>.<factory name>``).
        :type name: str or None
        :param extra_args: A list of additional arguments that will be appended
                           at the end of the command line.
        :type extra_args: list or None
        :param subprocess_args: A dictionary of arguments for the call to the
                                :class:`.subprocess.Popen` constructor.
        :type subprocess_args: dict or None
        :param deps: A list of dependencies for the generated task. Note that
                     factories built with :meth:`from_task` automatically
                     inject dependencies on the given task.
        :type deps: list(Task) or None
        :param soft_deps: A list of soft dependencies for the generated task.
        :type soft_deps: list(Task) or None
        :param kwargs: Any remaining keyword arguments will be used to format
                       the command line before execution. The environment and
                       the configuration are available at formatting time as
                       ``env`` and ``config``, respectively.
        :returns: The generated task. Tasks are cached by name: a call that
                  yields an already-generated task name returns the cached
                  object.
        '''
        LOGGER.debug('RunTaskFactory.make(name=%r, extra_args=%r, '
                     'subprocess_args=%r, deps=%r, soft_deps=%r, **kwargs=%r)',
                     name, extra_args, subprocess_args, deps, soft_deps, kwargs)
        extra_args = [] if extra_args is None else extra_args
        subprocess_args = {} if subprocess_args is None else subprocess_args
        deps = [] if deps is None else deps
        soft_deps = [] if soft_deps is None else soft_deps
        kwargs_ = self.kwargs.copy()
        kwargs_.update(kwargs)
        cli_closure = self.make_closure(extra_args, **kwargs_)

        # handle caching
        if name is None:
            task_name = str(det_hash(self.name, extra_args, kwargs_))
        else:
            task_name = name
        task_name += '.' + self.name
        if task_name in self.cache:
            LOGGER.debug('cache hit for task name %r', task_name)
            cached_task = self.cache[task_name]
            return cached_task
        LOGGER.debug('cache miss for task name %r', task_name)
        task = RunTask(task_name, cli_closure, deps=self.deps + deps,
                       soft_deps=self.soft_deps + soft_deps, **subprocess_args)
        self.cache[task_name] = task
        return task

    def copy(self):
        '''Return a copy of this object. The copy starts with an empty task
        cache.'''
        return self.__class__(self.make_closure, deps=self.deps.copy(),
                              soft_deps=self.soft_deps.copy(), name=self.name,
                              **self.kwargs)