# Copyright French Alternative Energies and Atomic Energy Commission
# Contributors: valjean developers
# valjean-support@cea.fr
#
# This software is a computer program whose purpose is to analyze and
# post-process numerical simulation results.
#
# This software is governed by the CeCILL license under French law and abiding
# by the rules of distribution of free software. You can use, modify and/ or
# redistribute the software under the terms of the CeCILL license as circulated
# by CEA, CNRS and INRIA at the following URL: http://www.cecill.info.
#
# As a counterpart to the access to the source code and rights to copy, modify
# and redistribute granted by the license, users are provided only with a
# limited warranty and the software's author, the holder of the economic
# rights, and the successive licensors have only limited liability.
#
# In this respect, the user's attention is drawn to the risks associated with
# loading, using, modifying and/or developing or reproducing the software by
# the user in light of its specific status of free software, that may mean that
# it is complicated to manipulate, and that also therefore means that it is
# reserved for developers and experienced professionals having in-depth
# computer knowledge. Users are therefore encouraged to load and test the
# software's suitability as regards their requirements in conditions enabling
# the security of their systems and/or data to be ensured and, more generally,
# to use and operate it in the same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
#
# -*- coding: utf-8 -*-
'''This module collects a few task classes that can be used with the
:mod:`~.scheduler` and :mod:`~.depgraph` modules.

This module defines a dummy :class:`Task` class that may be used as a base
class and extended.

The :meth:`.Task.do()` method takes two arguments:

* `env` is an environment for task execution. The idea of the environment is
  that tasks may use it to store information about their execution. For
  instance, a task may create a file and store its location in the
  environment, so that later tasks may be able to retrieve it. The type of
  `env` is really immaterial, but it is probably natural to use a key-value
  mapping of some kind. Note, however, that most of the tasks defined in the
  :mod:`~.task` module hierarchy expect `env` to be an :class:`~.Env` object.
* `config` is a :class:`~.config.Config` object describing the configuration
  for the current run. Tasks may look up global configuration values here.

The :class:`Task` class models two types of inter-task dependencies. **Hard
dependencies** represent dependencies that are crucial for the execution of
the task at hand. If task `A` has a hard dependency on task `B`, it means that
`A` cannot run unless `B` has successfully completed. If `B` fails, then it
makes no sense to run `A`. On the other hand, if task `A` has a **soft
dependency** on task `B`, it means that `A` will not start before `B`'s
termination, but it makes sense to run `A` even if `B` fails.
'''
import enum
import json
from abc import ABC, abstractmethod
import logging
LOGGER = logging.getLogger(__name__)


class TaskStatus(enum.Enum):
    '''Enumeration for the task status. The possible values are:

    * ``WAITING`` (the task is waiting to be scheduled)
    * ``PENDING`` (the task is under execution)
    * ``DONE`` (the task was executed and it succeeded)
    * ``FAILED`` (the task was executed and it failed)
    * ``SKIPPED`` (the task was skipped by the scheduler; this may happen, for
      instance, if one of the task dependencies was not successful)
    '''
    # Values are numbered from 1, in declaration order.
    WAITING = 1
    PENDING = 2
    DONE = 3
    FAILED = 4
    SKIPPED = 5
class TaskError(Exception):
    '''Error type that :class:`~Task` classes may raise to signal a failure.'''
class Task(ABC):
    '''Base class for other task classes.

    Subclasses must implement :meth:`do`. Hard dependencies are stored in the
    :attr:`depends_on` set and soft dependencies in the
    :attr:`soft_depends_on` set.
    '''

    def __init__(self, name, *, deps=None, soft_deps=None):
        '''Initialize the task.

        :param str name: The name of the task. Task names **must** be unique!
        :param deps: The (hard) dependencies for this task. It must be
                     either `None` (i.e. no dependencies) or a collection
                     (tuple, list, set or frozenset) of :class:`Task`
                     objects.
        :type deps: list(Task) or None
        :param soft_deps: The soft dependencies for this task. It must be
                          either `None` (i.e. no dependencies) or a
                          collection (tuple, list, set or frozenset) of
                          :class:`Task` objects.
        :type soft_deps: list(Task) or None
        :raises TypeError: if `deps` or `soft_deps` is neither `None` nor one
                           of the accepted collection types.
        '''
        LOGGER.debug('creating task %s', name)
        self.name = name
        self.depends_on = self._dep_set(deps, 'deps')
        self.soft_depends_on = self._dep_set(soft_deps, 'soft_deps')

    @staticmethod
    def _dep_set(deps, argname):
        '''Validate a dependency argument and return it as a new set.

        :param deps: `None` or a collection of tasks.
        :param str argname: The name of the argument being validated; only
                            used to build the error message.
        :returns: A fresh set containing the dependencies (empty for `None`).
        :rtype: set
        :raises TypeError: if `deps` is neither `None` nor a tuple, list, set
                           or frozenset.
        '''
        if deps is None:
            return set()
        # frozenset is not a subclass of set, so it must be listed explicitly.
        if not isinstance(deps, (tuple, list, set, frozenset)):
            errmsg = (f'The `{argname}` task argument must '
                      'be either a collection of tasks or None; '
                      f'type {type(deps)} found')
            raise TypeError(errmsg)
        return set(deps)

    @abstractmethod
    def do(self, env, config):
        '''Perform a task.

        :param env: The environment for this task.
        :param config: The configuration for the current run.
        '''
        raise NotImplementedError('do() not implemented for Task')

    def __str__(self):
        return repr(self.name)  # use repr() to add quotes around the task name

    def __repr__(self):
        return f"Task('{self.name}')"

    def add_dependency(self, dep):
        '''Add a (hard) dependency to the set of dependencies of this task.'''
        self.depends_on.add(dep)

    def depends(self, other):
        '''Return `True` if `self` has a hard dependency on `other`.'''
        return other in self.depends_on

    def soft_depends(self, other):
        '''Return `True` if `self` has a soft dependency on `other`.'''
        return other in self.soft_depends_on
class DelayTask(Task):
    '''Task that waits for the specified number of seconds. This task is useful
    to test scheduling algorithms under different load conditions.
    '''

    def __init__(self, name, delay=1., *, deps=None, soft_deps=None):
        '''Initialize the task from a given delay.

        :param str name: The name of the task.
        :param float delay: The amount of time (in seconds) that this task will
                            wait when executed.
        :param deps: The (hard) dependencies for this task; forwarded
                     unchanged to :meth:`Task.__init__`.
        :type deps: list(Task) or None
        :param soft_deps: The soft dependencies for this task; forwarded
                          unchanged to :meth:`Task.__init__`.
        :type soft_deps: list(Task) or None
        '''
        super().__init__(name, deps=deps, soft_deps=soft_deps)
        self.delay = float(delay)

    def do(self, env, config):
        '''Perform the task (i.e. sleep; I wish my life was like that).

        :param env: The environment. Ignored.
        :param config: The configuration. Ignored.
        :returns: An empty dictionary and ``TaskStatus.DONE``.
        '''
        from time import sleep
        LOGGER.info('DelayTask %s sleeping %f seconds...',
                    self, self.delay)
        sleep(self.delay)
        LOGGER.info('DelayTask %s waking up!', self)
        return {}, TaskStatus.DONE
def det_hash(*args):
    '''Return a deterministic SHA-256 hex digest of the given objects.

    Each argument is serialized with :func:`json.dumps` (with sorted keys, so
    that dictionary insertion order does not matter), encoded as UTF-8 and
    fed, in order, to a single SHA-256 hasher.

    Note that the serialized arguments are concatenated without any
    separator, so distinct argument sequences may produce the same hash (for
    instance, ``det_hash(1, 23) == det_hash(12, 3)``).

    :param args: JSON-serializable objects.
    :returns: The hexadecimal digest.
    :rtype: str
    :raises TypeError: if an argument cannot be serialized to JSON.
    '''
    from hashlib import sha256

    def _encoded(obj):
        # Log each object and its JSON form before it is fed to the hasher.
        LOGGER.debug('hashing: %r', obj)
        text = json.dumps(obj, sort_keys=True)
        LOGGER.debug('in json: %s', text)
        return text.encode('utf-8')

    hasher = sha256()
    # map() is lazy, so logging and hashing stay interleaved per argument.
    for chunk in map(_encoded, args):
        hasher.update(chunk)
    hexdigest = hasher.hexdigest()
    LOGGER.debug('resulting hash: %s', hexdigest)
    return hexdigest
def close_dependency_graph(tasks):
    '''Return the tasks along with all their dependencies.

    Both hard (``depends_on``) and soft (``soft_depends_on``) dependencies
    are followed transitively. Each task is visited at most once, so the
    function terminates even if the dependency graph contains cycles.

    :param tasks: An iterable of tasks.
    :returns: The tasks, their dependencies, the dependencies of their
              dependencies and so on, each appearing once (in no particular
              order).
    :rtype: list(Task)
    '''
    all_tasks = set(tasks)
    frontier = set(all_tasks)
    while frontier:
        reached = set()
        for task in frontier:
            reached.update(task.depends_on)
            reached.update(task.soft_depends_on)
        # Only enqueue tasks not seen before; re-enqueueing visited tasks
        # would loop forever on cycles and redo work on shared sub-graphs.
        frontier = reached - all_tasks
        all_tasks |= frontier
    return list(all_tasks)