Source code for valjean.cosette.use

# Copyright French Alternative Energies and Atomic Energy Commission
# Contributors: valjean developers
# valjean-support@cea.fr
#
# This software is a computer program whose purpose is to analyze and
# post-process numerical simulation results.
#
# This software is governed by the CeCILL license under French law and abiding
# by the rules of distribution of free software. You can use, modify and/ or
# redistribute the software under the terms of the CeCILL license as circulated
# by CEA, CNRS and INRIA at the following URL: http://www.cecill.info.
#
# As a counterpart to the access to the source code and rights to copy, modify
# and redistribute granted by the license, users are provided only with a
# limited warranty and the software's author, the holder of the economic
# rights, and the successive licensors have only limited liability.
#
# In this respect, the user's attention is drawn to the risks associated with
# loading, using, modifying and/or developing or reproducing the software by
# the user in light of its specific status of free software, that may mean that
# it is complicated to manipulate, and that also therefore means that it is
# reserved for developers and experienced professionals having in-depth
# computer knowledge. Users are therefore encouraged to load and test the
# software's suitability as regards their requirements in conditions enabling
# the security of their systems and/or data to be ensured and, more generally,
# to use and operate it in the same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

r'''This module provides a decorator that simplifies the integration of free
Python functions in the :mod:`valjean` dependency graph.


An example
==========

Let us consider the following simple function, that counts the number of
characters in a file:

    >>> def how_many_chars(filename):
    ...     return len(open(filename).read())

Suppose that we have a task that produces some text and we want to count the
number of characters in the output. For this example, we will take the task to
be:

    >>> from valjean.cosette.run import RunTask
    >>> cmd = ['echo', 'spam']
    >>> run_task = RunTask.from_cli('run_task', cmd)

This task will produce exactly five characters of standard output [#]_, but
assume we want to check this with our `how_many_chars` function. If the task is
part of a dependency graph, we cannot call `how_many_chars` until the task has
been executed, simply because its output (which is the input to
`how_many_chars`) does not exist until then.

The solution is to bridge the gap using a :class:`Use` object:

    >>> from valjean.cosette.use import Use
    >>> use_arg_echo = Use.from_func(func=how_many_chars, task=run_task)

:class:`Use` lifts the naked function `how_many_chars` into the world of
:mod:`valjean` dependency graphs. The `how_many_chars` function is wrapped in a
:class:`~.PythonTask` that can be retrieved with the :meth:`~.get_task`
method:

    >>> how_many_task = use_arg_echo.get_task()
    >>> from valjean.cosette.pythontask import PythonTask
    >>> isinstance(how_many_task, PythonTask)
    True

The `how_many_task` object is wired in such a way that it will retrieve the
value of its argument from the ``'stdout'`` key of `run_task`'s environment
section on execution. If `run_task` has not been executed at that point or it
has failed, then of course `how_many_task` will fail, too.  The dependency of
`how_many_task` on `run_task` is made explicit:

    >>> run_task in how_many_task.depends_on
    True

.. note::

    If you want to introduce a **soft** dependency on `run_task` (instead of a
    hard one), you can use the `deps_type` constructor argument:

    >>> use_arg_echo_soft = Use.from_func(func=how_many_chars, task=run_task,
    ...                                   deps_type='soft')
    >>> how_many_task_soft = use_arg_echo_soft.get_task()
    >>> run_task in how_many_task_soft.soft_depends_on
    True

This allows :mod:`valjean` to seamlessly integrate the execution of
`how_many_task` in the dependency graph.  We can illustrate the relations
between the objects that we have created with the following diagram:

.. digraph:: depgraph
    :align: center

    compound=true;
    subgraph cluster_use {
        label="use_arg_echo: Use";
        func [label="how_many_chars: func" shape=diamond];
    }
    run_task [label="run_task: Task"];
    how_many_task [label="how_many_task: Task"]
    { rank=same run_task how_many_task }
    func -> how_many_task [label=" generates" style=dotted ltail=cluster_use];
    how_many_task -> run_task [label="depends on"];

Here we see that `use_arg_echo` has type :class:`Use` and contains the
`how_many_chars` function. Additionally, it generates `how_many_task`, which is
a :class:`~.Task` object that depends on `run_task`.


.. doctest:: cosette_use
    :hide:

    >>> from valjean.config import Config
    >>> config = Config()

We can test that `how_many_task` works by manually executing `run_task` first
and passing the resulting environment to `how_many_task`:

    >>> from valjean.cosette.env import Env
    >>> env, status = run_task.do(env=Env(), config=config)
    >>> print(status)
    TaskStatus.DONE

Note that the environment now contains the name of the file written by
`run_task` in the ``'stdout'`` key:

    >>> env[run_task.name]['stdout']
    '/.../run_task/stdout'

We can now execute `how_many_task`:

    >>> env_up, status = how_many_task.do(env=env, config=config)
    >>> print(status)
    TaskStatus.DONE

The ``'result'`` key now holds the return value of `how_many_chars`:

    >>> env_up[how_many_task.name]['result']
    5
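
The lookups used in this walkthrough (``env[task_name][key]``) can be pictured
with plain dictionaries. The following is only a minimal sketch modelled on the
`from_env` helper of this module; it assumes that the environment behaves like a
nested dictionary, and the names `lookup` and `env_sketch` as well as the file
name are hypothetical placeholders, not part of :mod:`valjean`:

```python
def lookup(env, task_name, key):
    """Hypothetical stand-in for from_env, operating on a plain dict."""
    task_env = env[task_name]          # raises KeyError if the task is missing
    if key is None:
        # no key requested: hand back the whole (name, section) pair
        return task_name, task_env
    return task_env[key]               # raises KeyError if the key is missing


# Placeholder environment: one section per task name.
env_sketch = {
    'run_task': {'stdout': 'placeholder/stdout'},  # hypothetical file name
    'how_many_task': {'result': 5},
}

count = lookup(env_sketch, 'how_many_task', 'result')
section = lookup(env_sketch, 'run_task', None)
```

Passing ``None`` as the key returns the task name together with its whole
section, which mirrors the behaviour documented for `from_env`.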


:class:`Use` objects are wrappers
---------------------------------

:class:`Use` also behaves as a wrapper of `how_many_chars`.  You can still call
the underlying wrapped function with explicit parameters if you wish:

    >>> with open('test.txt', 'w') as f:
    ...     _ = f.write('lobster Thermidor\n')
    >>> use_arg_echo('test.txt')
    18
    >>> len('lobster Thermidor\n')
    18

This simplifies testing.
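
The forwarding behaviour can be illustrated without any :mod:`valjean`
machinery. The sketch below is a stripped-down analogue, not the actual
:class:`Use` implementation: the names `CallableWrapper` and `count_chars` are
hypothetical, and only the call forwarding and the metadata copy are
reproduced:

```python
from functools import update_wrapper


class CallableWrapper:
    """Hypothetical, minimal analogue of a call-forwarding wrapper."""

    def __init__(self, wrapped):
        self.wrapped = wrapped
        # copy __name__, __doc__ and friends from the wrapped function
        update_wrapper(self, wrapped)

    def __call__(self, *args, **kwargs):
        # redirect the call to the wrapped function
        return self.wrapped(*args, **kwargs)


def count_chars(text):
    """Return the number of characters in `text`."""
    return len(text)


wrapped_count = CallableWrapper(count_chars)
n_chars = wrapped_count('lobster Thermidor\n')
```

Because the wrapper is callable and carries the metadata of the original
function, it can be used in unit tests exactly like the function it wraps.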


Argument injection via the :func:`using` decorator
--------------------------------------------------

A practical way to create :class:`Use` objects is to call the :func:`using`
decorator:

    >>> @using(task=run_task)
    ... def how_many_chars(filename):
    ...     return len(open(filename).read())

Now `how_many_chars` is itself a :class:`Use` object:

    >>> isinstance(how_many_chars, Use)
    True

This is exactly equivalent to what we did earlier, except that the
:class:`Use` object now goes by the same name as the function it wraps:

.. digraph:: depgraph
    :align: center

    compound=true;
    subgraph cluster_use {
        label="how_many_chars: Use";
        fontcolor=red;
        func [label="how_many_chars: func" shape=diamond];
    }
    run_task [label="run_task: Task"];
    how_many_task [label="how_many_task: Task"]
    { rank=same run_task how_many_task }
    func -> how_many_task [label=" generates" style=dotted ltail=cluster_use];
    how_many_task -> run_task [label="depends on"];

You can still call the underlying `how_many_chars` by calling the :class:`Use`
object as if it were a normal function, as discussed above. Just like before,
we can generate a task from the :class:`Use` object:

    >>> how_many_task = how_many_chars.get_task()
    >>> isinstance(how_many_task, PythonTask)
    True


Injecting multiple arguments
----------------------------

If your function expects multiple arguments to be fed from previous tasks,
you can stack multiple :func:`using` decorators:

    >>> name_task = RunTask.from_cli('name_task', ['echo', 'Arthur'])
    >>> job_task = RunTask.from_cli('job_task',
    ...                             ['echo', 'King of the Britons'])
    >>> @using(task=name_task)
    ... @using(task=job_task)
    ... def introduce(name_filename, job_filename):
    ...     with open(name_filename) as name_file:
    ...         name = name_file.read().rstrip()
    ...     with open(job_filename) as job_file:
    ...         job = job_file.read().rstrip()
    ...     return 'I am ' + name + ', ' + job + '!'

The resulting task has the correct dependencies, i.e. it depends on both
`name_task` and `job_task`:

    >>> intro_task = introduce.get_task()
    >>> name_task in intro_task.depends_on
    True
    >>> job_task in intro_task.depends_on
    True

We can represent the dependency structure of the generated tasks with the
following graph:

.. digraph:: depgraph
    :align: center

    compound=true;
    subgraph cluster_intro {
      label="introduce: Use";
      introduce [label="introduce: func" shape=diamond];
    }
    name [label="name_task: Task"];
    job [label="job_task: Task"];
    "intro_task: Task" -> name [label="depends on"];
    "intro_task: Task" -> job [label="depends on"];
    introduce -> "intro_task: Task"
      [label=" generates" style=dotted ltail=cluster_intro];


If we manually execute `name_task` and `job_task`, we can then execute
`intro_task`:

    >>> env = Env()
    >>> for task in [name_task, job_task, intro_task]:
    ...     env_up, _ = task.do(env=env, config=config)
    ...     env.apply(env_up)
    >>> print(env[intro_task.name]['result'])
    I am Arthur, King of the Britons!

Keyword arguments
-----------------

In the previous example, note that the order of the decorators is important!
The outer decorator (`name_task`) injects the first argument (`name_filename`),
and the inner one (`job_task`) injects the second argument (`job_filename`). If
we nest the decorators the other way around, we invert the order of the
arguments:

    >>> @using(task=job_task)
    ... @using(task=name_task)
    ... def introduce_inv(name_filename, job_filename):
    ...     with open(name_filename) as name_file:
    ...         name = name_file.read().rstrip()
    ...     with open(job_filename) as job_file:
    ...         job = job_file.read().rstrip()
    ...     return 'I am ' + name + ', ' + job + '!'
    >>> intro_inv_task = introduce_inv.get_task()
    >>> for task in [name_task, job_task, intro_inv_task]:
    ...     env_up, _ = task.do(env=env, config=config)
    ...     env.apply(env_up)
    >>> print(env[intro_inv_task.name]['result'])
    I am King of the Britons, Arthur!
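
The reason for this ordering is that Python applies stacked decorators from the
inside out. The sketch below reproduces the effect with plain values instead of
tasks; `using_sketch` and `introduce_sketch` are hypothetical names, and the
attributes stored on the function are an assumption made for illustration
only. The injected values are appended in decoration order and reversed at call
time, so that the outermost decorator feeds the first positional argument:

```python
def using_sketch(source):
    """Hypothetical stand-in for a stackable injection decorator."""
    def decorator(func):
        # the innermost decorator runs first, so its source is appended first
        sources = list(getattr(func, 'sources', []))
        sources.append(source)
        # always keep a reference to the undecorated function
        target = getattr(func, 'target', func)

        def call_with_injection():
            # reverse, so that the outermost source comes first
            return target(*reversed(sources))

        call_with_injection.sources = sources
        call_with_injection.target = target
        return call_with_injection
    return decorator


@using_sketch('Arthur')
@using_sketch('King of the Britons')
def introduce_sketch(name, job):
    return 'I am ' + name + ', ' + job + '!'


greeting = introduce_sketch()
```

Swapping the two decorators in this sketch swaps the two arguments, just as in
the `introduce_inv` example.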

If you are injecting many arguments, it may be a good idea to use the `kwarg`
argument to :func:`using`, which allows you to inject values into arguments by
name instead of by position:

    >>> @using(kwarg='name_filename', task=name_task)
    ... @using(kwarg='job_filename', task=job_task)
    ... def introduce_kwargs(name_filename, job_filename):
    ...     return introduce(name_filename, job_filename)
    >>> @using(kwarg='job_filename', task=job_task)
    ... @using(kwarg='name_filename', task=name_task)
    ... def introduce_kwargs_swap(name_filename, job_filename):
    ...     return introduce(name_filename, job_filename)

Note that the `introduce_kwargs` and `introduce_kwargs_swap` functions are
identical, but the `@using` decorators appear in reverse order. In the previous
example, this mattered and we ended up with ``I am King of the Britons,
Arthur!``. But since now we are using the `kwarg` argument, the order of the
decorators does not matter any more.  Both `introduce_kwargs` and
`introduce_kwargs_swap` will inject the arguments as expected:

    >>> intro_kwargs_task = introduce_kwargs.get_task()
    >>> intro_kwargs_swap_task = introduce_kwargs_swap.get_task()
    >>> env_up, _ = intro_kwargs_task.do(env=env, config=config)
    >>> print(env_up[intro_kwargs_task.name]['result'])
    I am Arthur, King of the Britons!
    >>> env_up, _ = intro_kwargs_swap_task.do(env=env, config=config)
    >>> print(env_up[intro_kwargs_swap_task.name]['result'])
    I am Arthur, King of the Britons!


Modifying an existing :class:`Use` decorator
--------------------------------------------

You can pipe the result of the task generated by a :class:`Use` decorator into
another function by using the :meth:`Use.map` method:

    >>> intro_stars = introduce.map(lambda x: '★★★' + x + '★★★')

Here `intro_stars` is a new :class:`Use` object that will apply the specified
function (here, a lambda that surrounds its argument with ``★★★`` cute little
stars ``★★★``) to the result of `intro_task` (see above). The task associated
with `intro_stars` depends on `intro_task`:

    >>> intro_stars_task = intro_stars.get_task()
    >>> intro_task in intro_stars_task.depends_on
    True

In our schematic representation, `intro_stars` and `intro_stars_task` would
appear like this:

.. digraph:: depgraph
    :align: center

    compound=true;
    subgraph cluster_intro {
        label="introduce: Use";
        introduce [label="introduce: func" shape=diamond];
    }
    name [label="name_task: Task"];
    job [label="job_task: Task"];
    intro_task [label="intro_task: Task"];
    intro_task -> name [label="depends on"];
    intro_task -> job [label="depends on"];
    introduce -> intro_task
      [label=" generates" style=dotted ltail=cluster_intro];

    subgraph cluster_stars {
        label="intro_stars: Use";
        color=red;
        fontcolor=red;
        stars [label=": <lambda>" shape=diamond color=red fontcolor=red];
    }
    intro_stars_task
      [label="intro_stars_task: Task" color=red fontcolor=red];
    stars -> intro_stars_task
      [label=" generates" style=dotted ltail=cluster_stars];
    intro_stars_task -> intro_task [label="depends on"];

    { rank=same intro_task intro_stars_task }


If we execute `intro_stars_task`, we find that our string now has stars!

    >>> env_up, _ = intro_stars_task.do(env=env, config=config)
    >>> env.apply(env_up)
    >>> print(env[intro_stars_task.name]['result'])
    ★★★I am Arthur, King of the Britons!★★★


.. rubric:: Footnotes

.. [#] The proof is left to the reader as an exercise.


Injecting arguments into tasks created by a :class:`~.RunTaskFactory`
=====================================================================

A typical use case for :class:`Use` consists in injecting the result of a
:class:`~.RunTask` generated by a :class:`~.RunTaskFactory` as an argument to
a subsequent task. Since this use case is common, there is a special
:class:`UseRun` class that helps reduce the boilerplate.

First, we create a :class:`~.RunTaskFactory` object:

    >>> from valjean.cosette.run import RunTaskFactory
    >>> factory = RunTaskFactory.from_executable('echo')

Instead of creating :class:`~.RunTask` objects and wrapping them in
:class:`Use` by hand, we instantiate a :class:`UseRun` object:

    >>> using_run = UseRun.from_factory(factory)

The :class:`UseRun` object will create the :class:`~.RunTask` objects on the
fly, when invoked as a decorator:

    >>> @using_run(extra_args=['spam'])
    ... def read_and_check_text(filename):
    ...     with open(filename) as f:
    ...         return f.read() == 'spam\n'

The decorated function is simply an instance of the :class:`Use` class that we
all know and love:

    >>> isinstance(read_and_check_text, Use)
    True

so we can call :meth:`Use.get_task` to generate a task:

    >>> read_and_check_task = read_and_check_text.get_task()
    >>> run_task = next(iter(read_and_check_task.depends_on))

Since a picture is worth a thousand words, the objects have the following
structure:

.. digraph:: depgraph
    :align: center

    compound=true;
    using_run [label="using_run: UseRun" shape=box];
    subgraph cluster_read {
        label="read_and_check_text: Use";
        read [label="read_and_check_text: func" shape=diamond];
        factory [label="factory: RunTaskFactory"];
    }
    using_run -> read [label=" generates" lhead=cluster_read style=dotted];
    factory -> run_task [label="generates" style=dotted];
    read_task [label="read_and_check_task: Task"];
    run_task [label="run_task: Task"];
    read_task -> run_task [label="depends on"];
    read -> read_task [label=" generates" style=dotted ltail=cluster_read];
    { rank=same read_task run_task }

Let's check that our tasks work as expected:

    >>> env, _ = run_task.do(env=Env(), config=config)
    >>> env, _ = read_and_check_task.do(env=env, config=config)
    >>> print(env[read_and_check_task.name]['result'])
    True


Pipelines
---------

This is already nice! However, if you often want to perform the same additional
actions after creating your :class:`~.RunTask`, you can cut the boilerplate
further with the :meth:`UseRun.map` method. In our example, we might want to
factor out the ``with open(filename) as f`` bit and focus on the text
comparison:

    >>> def slurp(filename):
    ...     with open(filename) as f:
    ...         return f.read()
    >>> using_run_as_text = using_run.map(slurp)

The :meth:`UseRun.map` method creates a new :class:`UseRun` object which will
enqueue the specified function (as a task) to be executed on the result of the
:class:`~.RunTask`. We can simplify `read_and_check_text` from above as
follows:

    >>> @using_run_as_text(extra_args=['spam'])
    ... def check_text(text):
    ...     return text == 'spam\n'

The diagram now looks like this:

.. digraph:: depgraph
    :align: center

    compound=true;
    using_run [label="using_run: UseRun" shape=box];
    using_run_as_text [label="using_run_as_text: UseRun" shape=box];
    using_run -> using_run_as_text [label=" map(slurp)" style=dotted];
    subgraph cluster_check {
        label="check_text: Use";
        slurp [label="slurp: func" shape=diamond];
        check [label="check_text: func" shape=diamond];
        factory [label="factory: RunTaskFactory"];
        { rank=same slurp check }
    }
    using_run_as_text -> slurp
      [label=" generates" lhead=cluster_check style=dotted];
    factory -> run_task [label=" generates" style=dotted];
    check_task [label="check_task: Task"];
    slurp_task [label="slurp_task: Task"];
    run_task [label="run_task: Task"];
    check_task -> slurp_task [label="depends on"];
    slurp_task -> run_task [label="depends on"];
    slurp -> slurp_task [label=" generates" style=dotted ltail=cluster_check];
    check -> check_task [label=" generates" style=dotted ltail=cluster_check];
    { rank=same slurp_task check_task run_task }

Checking that this works is a bit more laborious, because we now have one extra
task to run; in the real world, of course, :mod:`valjean` would take care of
running the tasks via a dependency graph and we wouldn't need to worry about
any of this:

    >>> check_task = check_text.get_task()
    >>> slurp_task = next(iter(check_task.depends_on))
    >>> run_task = next(iter(slurp_task.depends_on))
    >>> env = Env()
    >>> for task in [run_task, slurp_task, check_task]:
    ...     env_up, _ = task.do(env=env, config=config)
    ...     env.apply(env_up)
    >>> print(env[check_task.name]['result'])
    True

You can also chain :meth:`UseRun.map` calls:

    >>> using_run_as_stripped_text = using_run.map(slurp) \
    ...                                       .map(lambda x: x.strip())
    >>> @using_run_as_stripped_text(extra_args=['spam'])
    ... def check_stripped_text(text):
    ...     return text == 'spam'  # note that the \n is gone

Proof that it works:

    >>> check_stripped_task = check_stripped_text.get_task()
    >>> strip_task = next(iter(check_stripped_task.depends_on))
    >>> slurp_task = next(iter(strip_task.depends_on))
    >>> run_task = next(iter(slurp_task.depends_on))
    >>> env = Env()
    >>> for task in [run_task, slurp_task, strip_task, check_stripped_task]:
    ...     env_up, _ = task.do(env=env, config=config)
    ...     env.apply(env_up)
    >>> print(env[check_stripped_task.name]['result'])
    True

and a final diagram to show the dependencies among the objects:

.. digraph:: depgraph
    :align: center

    compound=true;
    using_run [label="using_run: UseRun" shape=box];
    using_run_anonymous
      [label="<anonymous>: UseRun" shape=box];
    using_run_as_stripped_text
      [label="using_run_as_stripped_text: UseRun" shape=box];
    using_run -> using_run_anonymous [label=" map(slurp)" style=dotted];
    using_run_anonymous -> using_run_as_stripped_text
      [label=" map(strip)" style=dotted];
    subgraph cluster_check_stripped {
        label="check_stripped_text: Use";
        slurp [label="slurp: func" shape=diamond];
        strip [label="strip: func" shape=diamond];
        check_stripped [label="check_stripped_text: func" shape=diamond];
        factory [label="factory: RunTaskFactory"];
        { rank=same slurp check_stripped strip }
    }
    using_run_as_stripped_text -> slurp
      [label=" generates" lhead=cluster_check_stripped style=dotted];
    factory -> run_task [label=" generates" style=dotted];
    check_stripped_task [label="check_stripped_task: Task"];
    strip_task [label="strip_task: Task"];
    slurp_task [label="slurp_task: Task"];
    run_task [label="run_task: Task"];
    check_stripped_task -> strip_task [label="depends on"];
    strip_task -> slurp_task [label="depends on"];
    slurp_task -> run_task [label="depends on"];
    strip -> strip_task
      [label=" generates" style=dotted ltail=cluster_check_stripped];
    slurp -> slurp_task
      [label=" generates" style=dotted ltail=cluster_check_stripped];
    check_stripped -> check_stripped_task
      [label=" generates" style=dotted ltail=cluster_check_stripped];
    { rank=same slurp_task check_stripped_task run_task strip_task }
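
The chaining behaviour of :meth:`UseRun.map` can be summarised by a small
stand-alone sketch. It is not the real implementation: `PostPipeline` is a
hypothetical class, and it applies the functions directly to a value, whereas
:class:`UseRun` turns each function into a separate task. What the sketch does
share with the description above is that every call to ``map`` returns a new
object and leaves the original untouched:

```python
class PostPipeline:
    """Hypothetical analogue of a list of post-processing functions."""

    def __init__(self, posts=()):
        self.posts = list(posts)

    def map(self, func):
        # build a new pipeline; the original one is not modified
        return PostPipeline(self.posts + [func])

    def apply(self, value):
        # apply the functions sequentially, each on the previous result
        for post in self.posts:
            value = post(value)
        return value


base = PostPipeline()
stripped_upper = base.map(str.strip).map(str.upper)
processed = stripped_upper.apply('spam\n')
```

Since ``base`` keeps an empty list of functions, it can be reused to derive
other pipelines, in the same way that `using_run` is reused above.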


Module API
==========
'''

import logging
from pathlib import Path
from functools import update_wrapper

from .task import TaskStatus
from ..path import ensure, sanitize_filename
from .pythontask import PythonTask


LOGGER = logging.getLogger(__name__)


def from_env(*, env, task_name, key):
    '''Helper function to extract a value from the environment for argument
    injection.

    :param env: the environment.
    :type env: :class:`~.Env`
    :param str task_name: the name of the task to look up.
    :param key: the key to look up. If `None` is given, return the whole
        key-value pair associated to `task_name`.
    :type key: str or None
    :raises KeyError: if the required task_name or key are not available.
    '''
    try:
        task_env = env[task_name]
    except KeyError:
        LOGGER.error('Task %r is required for argument injection, but the '
                     'task is not in the environment', task_name)
        raise
    if key is None:
        return task_name, task_env
    try:
        task_result = task_env[key]
    except KeyError:
        LOGGER.error('Task %r is required for argument injection, but I '
                     'could not find the %r key in the result environment',
                     task_name, key)
        raise
    return task_result


class Use:
    '''A function wrapper around a free Python function. Lifts the function
    into a :mod:`~.valjean` :class:`~.Task`.'''

    _CACHE = {}
    _USE_CACHING = True

    @classmethod
    def from_func(cls, *, func, task, key='result', kwarg=None,
                  deps_type='hard', serialize=False):
        # pylint: disable=too-many-arguments
        '''Create a :class:`Use` from a function.

        :param func: a function or a callable object.
        :param task: the task whose result should be injected as an argument
            to `func`.
        :type task: :class:`~.Task`
        :param str key: the name of the key that contains the task result in
            the environment.
        :param kwarg: the name of the keyword argument to `func` that must be
            fed with the `task` result. If None, the result of `task` will be
            passed as a positional argument.
        :type kwarg: None or str
        :param str deps_type: whether the created task should have a hard or a
            soft dependency towards the injected tasks. Possible values are
            ``'hard'`` and ``'soft'``.
        :param bool serialize: if `True`, the result of this task will be
            written to the output directory.
        '''
        if isinstance(func, cls):
            # when we decorate another Use object, we just extend the decorated
            # object's inj_args list and inj_kwargs dictionary
            LOGGER.debug('decorating a "%s" object', cls.__name__)
            inj_args = func.inj_args.copy()
            inj_kwargs = func.inj_kwargs.copy()
        else:
            LOGGER.debug('decorating a free function')
            inj_args = []
            inj_kwargs = {}
        if kwarg is None:
            inj_args.append((task, key))
        else:
            inj_kwargs[kwarg] = (task, key)
        LOGGER.debug('inj_args = %s', inj_args)
        LOGGER.debug('inj_kwargs = %s', inj_kwargs)
        return cls(inj_args=inj_args, inj_kwargs=inj_kwargs, wrapped=func,
                   deps_type=deps_type, serialize=serialize)

    def __init__(self, *, inj_args=None, inj_kwargs=None, wrapped,
                 deps_type='hard', serialize=False):
        '''Instantiate a :class:`Use` by providing all the necessary
        information.

        :param inj_args: an iterable of `(task, key)` pairs specifying which
            part of what task result should be injected as a positional
            argument. See :class:`Use` for the meaning of `key`.
        :type inj_args: tuple((Task, str or None)) or
            list((Task, str or None))
        :param inj_kwargs: a dictionary associating any kwarg names to a
            `(task, key)` pair. The specified task result will be injected as
            the given kwarg.
        :type inj_kwargs: dict(str, (Task, str or None))
        :param wrapped: the function to wrap.
        :param str deps_type: whether the created task should have a hard or a
            soft dependency towards the injected tasks. Possible values are
            ``'hard'`` and ``'soft'``.
        :param bool serialize: if `True`, the result of this task will be
            written to the output directory.
        '''
        self.inj_args = () if inj_args is None else inj_args
        self.inj_kwargs = {} if inj_kwargs is None else inj_kwargs
        self.wrapped = wrapped
        if isinstance(wrapped, self.__class__):
            func_name = wrapped.func_name
        else:
            func_name = (wrapped.__qualname__ if wrapped.__name__ is None
                         else wrapped.__name__)
            update_wrapper(self, wrapped)
        self.func_name = func_name
        expected_deps_type = ['hard', 'soft']
        if deps_type not in expected_deps_type:
            raise ValueError('deps_type argument to Use must be one of '
                             f'{expected_deps_type}')
        self.deps_type = deps_type
        self.serialize = bool(serialize)

    def get_task(self):
        '''Return the task that executes the decorated function.'''
        from itertools import chain
        if self.deps_type == 'soft':
            soft_deps = set(value[0] for value in
                            chain(self.inj_kwargs.values(), self.inj_args))
            deps = set()
        else:
            deps = set(value[0] for value in
                       chain(self.inj_kwargs.values(), self.inj_args))
            soft_deps = set()

        if deps:
            pytask_name = (','.join(sorted(dep.name for dep in deps))
                           + '.' + self.func_name)
        else:
            pytask_name = self.func_name

        if self._USE_CACHING and pytask_name in self._CACHE:
            LOGGER.debug('returning cached task %s', pytask_name)
            return self._CACHE[pytask_name]

        LOGGER.debug('creating %s task', pytask_name)
        LOGGER.debug(' args: %s', self.inj_args)
        LOGGER.debug(' kwargs: %s', self.inj_kwargs)
        LOGGER.debug(' deps_type: %s', self.deps_type)
        LOGGER.debug(' will depend on %s', deps)
        LOGGER.debug(' will soft-depend on %s', soft_deps)

        def inject_from_env(*, env, config):
            # Prepare the args for the function; we loop over self.inj_args in
            # reverse because stacked decorators are invoked from the inside
            # out. If we do not reverse the list, the outermost decorator will
            # provide the last positional argument.
            args = []
            for task, key in reversed(self.inj_args):
                task_result = from_env(env=env, task_name=task.name, key=key)
                args.append(task_result)

            # prepare the kwargs for the function
            kwargs = {}
            for kwarg, (task, key) in self.inj_kwargs.items():
                task_result = from_env(env=env, task_name=task.name, key=key)
                kwargs[kwarg] = task_result

            LOGGER.debug('injecting args: %s', args)
            LOGGER.debug('injecting kwargs: %s', kwargs)
            result = self.wrapped(*args, **kwargs)
            env_up = {pytask_name: {'result': result}}
            if self.serialize:
                output_dir = Path(config.query('path', 'output-root'),
                                  sanitize_filename(pytask_name))
                ensure(output_dir, is_dir=True)
                env_up[pytask_name]['output_dir'] = str(output_dir)
            return env_up, TaskStatus.DONE

        task = PythonTask(pytask_name, inject_from_env, deps=deps,
                          soft_deps=soft_deps,
                          env_kwarg='env', config_kwarg='config')
        self._CACHE[pytask_name] = task
        return task

    def __call__(self, *args, **kwargs):
        '''Redirect the call to the wrapped function.'''
        return self.wrapped(*args, **kwargs)

    def map(self, func):
        '''Create a new :class:`Use` object that applies `func` to the result
        of the task defined by `self`.'''
        return Use.from_func(func=func, task=self.get_task())


def using(*, key='result', task, kwarg=None):
    '''Make it possible to instantiate :class:`Use` as a decorator. See
    :meth:`Use.from_func` for a description of the parameters.
    '''
    def decorator(wrapped):
        LOGGER.debug('wrapping %s into a Use object', wrapped)
        return Use.from_func(task=task, key=key, kwarg=kwarg, func=wrapped)
    return decorator


class UseRun:
    '''Produce :class:`Use` decorators from a :class:`~.RunTaskFactory`.'''

    @classmethod
    def from_factory(cls, factory):
        '''Create a :class:`UseRun` from a :class:`~.RunTaskFactory`.

        Given a :class:`~.RunTaskFactory`, the :class:`UseRun` class can be
        used to construct :class:`~.RunTask` objects on demand and inject
        their results into the decorated function.

        :param RunTaskFactory factory: a :class:`~.RunTaskFactory` object.
        '''
        return cls(factory, [])

    def __init__(self, factory, posts):
        '''Instantiate a :class:`UseRun` object.

        Given a :class:`~.RunTaskFactory`, the :class:`UseRun` class can be
        used to construct :class:`~.RunTask` objects on demand and inject
        their results into the decorated function.

        The `posts` parameter is a list of functions that will be sequentially
        applied to the result of the generated :class:`~.RunTask`. Each
        function becomes a new :class:`~.PythonTask` object depending on the
        result of the previous one.

        :param RunTaskFactory factory: a :class:`~.RunTaskFactory` object.
        :param list posts: a collection of post-processing functions. Each
            function will be converted to a :class:`~.PythonTask` object.
        '''
        self.factory = factory
        self.posts = posts.copy()

    def __call__(self, kwarg=None, **kwargs):
        task = self.factory.make(**kwargs)
        LOGGER.debug('mapping %s functions on the output of %s',
                     len(self.posts), task.name)
        for post in self.posts:
            use = Use.from_func(task=task, key='result', func=post)
            task = use.get_task()
        return using(kwarg=kwarg, task=task, key='result')

    def copy(self):
        '''Return a copy of this object.'''
        return UseRun(self.factory.copy(), self.posts.copy())

    def map(self, func):
        '''Create a new instance of :class:`UseRun` by extending the list of
        post-processing functions with a new one.

        :param func: a function with one argument to be applied to the result
            of `self`.
        '''
        copy = self.copy()
        copy.posts.append(func)
        return copy