Source code for valjean.cambronne.commands.run

# Copyright French Alternative Energies and Atomic Energy Commission
# Contributors: valjean developers
# valjean-support@cea.fr
#
# This software is a computer program whose purpose is to analyze and
# post-process numerical simulation results.
#
# This software is governed by the CeCILL license under French law and abiding
# by the rules of distribution of free software. You can use, modify and/ or
# redistribute the software under the terms of the CeCILL license as circulated
# by CEA, CNRS and INRIA at the following URL: http://www.cecill.info.
#
# As a counterpart to the access to the source code and rights to copy, modify
# and redistribute granted by the license, users are provided only with a
# limited warranty and the software's author, the holder of the economic
# rights, and the successive licensors have only limited liability.
#
# In this respect, the user's attention is drawn to the risks associated with
# loading, using, modifying and/or developing or reproducing the software by
# the user in light of its specific status of free software, that may mean that
# it is complicated to manipulate, and that also therefore means that it is
# reserved for developers and experienced professionals having in-depth
# computer knowledge. Users are therefore encouraged to load and test the
# software's suitability as regards their requirements in conditions enabling
# the security of their systems and/or data to be ensured and, more generally,
# to use and operate it in the same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

'''Module for the ``run`` subcommand.'''

import logging
from pathlib import Path

from ..common import JobCommand, read_env, write_env, build_graphs
from ...cosette.backends.queue import QueueScheduling
from ...chrono import Chrono
from ...cosette.scheduler import Scheduler
from ...cosette.task import TaskStatus
from ...path import ensure


LOGGER = logging.getLogger(__name__)


class RunCommand(JobCommand):
    '''Command class for the ``run`` subcommand.'''

    NAME = 'run'

    HELP = 'Run the tasks defined in a job file.'

    DESC = ('Run the tasks defined by the job() function in JOB_FILE, passing '
            'the given arguments (JOB_ARG) and keyword arguments (-k option) '
            'to job().')

    def register(self, parser):
        '''Register options for this command in the parser.

        Besides the options registered by the parent class, this adds
        ``-j/--workers``, ``--env-filename`` and ``--env-format``, and sets
        :meth:`execute` as the ``func`` default so that it is called when the
        ``run`` subcommand is selected.

        :param parser: the parser for the ``run`` subcommand (presumably an
            :class:`argparse.ArgumentParser` -- confirm against the caller).
        '''
        super().register(parser)
        parser.add_argument('-j', '--workers', action='store', default=4,
                            type=int,
                            help='number of workers to use in parallel')
        parser.add_argument('--env-filename', action='store',
                            default='valjean.env',
                            help='name of the files that contain the '
                            'persistent environment (default: valjean.env)')
        parser.add_argument('--env-format', action='store',
                            choices=('pickle',), default='pickle',
                            help='environment persistency format')
        parser.set_defaults(func=self.execute)

    def execute(self, args, config):
        '''Execute the ``run`` command.

        Build the hard- and soft-dependency graphs from the job file, read
        the persistent environment of the tasks, schedule them, log
        diagnostics about their final status and write the resulting
        environment back to disk.

        :param args: the parsed command-line arguments.
        :param Config config: the configuration object; the parsed arguments
            are stored in it (as a dict) under the ``'args'`` key.
        :returns: the environment returned by the scheduler.
        '''
        with Chrono() as chrono:
            hard_graph, soft_graph = build_graphs(args)
        LOGGER.info('graphs built in %s seconds', chrono)
        LOGGER.info('hard_graph contains %d tasks', len(hard_graph))
        LOGGER.info('soft_graph contains %d tasks', len(soft_graph))
        LOGGER.info('will schedule up to %d tasks in parallel', args.workers)

        # store the args in the config, for later retrieval
        config['args'] = vars(args)
        output_root = config.query('path', 'output-root')

        # we extract the tasks from the hard-dependency graph; both graphs
        # contain the same nodes anyway. The tasks are materialized as a list
        # because they are iterated more than once and measured with len().
        tasks = list(hard_graph.nodes())
        task_names = [task.name for task in tasks]
        env = read_env(root=output_root, names=task_names,
                       filename=args.env_filename, fmt=args.env_format)

        new_env = schedule(hard_graph=hard_graph, soft_graph=soft_graph,
                           env=env, config=config, workers=args.workers)
        self.task_diagnostics(tasks=tasks, env=new_env, config=config)

        # persist the environment produced by the scheduler -- the same one
        # that is diagnosed above and returned below -- not the input one
        write_env(new_env, filename=args.env_filename, fmt=args.env_format)
        return new_env

    @classmethod
    def task_diagnostics(cls, *, tasks, env, config):
        '''Emit diagnostic messages about the status of the tasks.

        Count how many tasks ended up in each status, warn about the tasks
        that are missing from the environment and log a summary table. The
        names of the failed tasks are handed to :meth:`write_failed_tasks`,
        which writes them to a file called ``failed-tasks`` in the log
        directory (only if at least one task failed).

        :param tasks: the tasks that have been scheduled.
        :type tasks: list(Task)
        :param Env env: the environment.
        :param Config config: the configuration object.
        '''
        from collections import Counter

        # accept any iterable: tasks are iterated and then measured with len()
        tasks = list(tasks)
        count_status = Counter()
        missing = []
        failed = []
        for task in tasks:
            task_name = task.name
            if task_name not in env:
                missing.append(task_name)
                continue
            status = env[task_name]['status']
            count_status[status] += 1
            if status == TaskStatus.FAILED:
                failed.append(task_name)

        if missing:
            LOGGER.warning('the following %s tasks are missing from the '
                           'environment: %s', len(missing),
                           '\n '.join(missing))

        # count_status is non-empty only if tasks is non-empty, so the
        # division below is never evaluated with total_graph == 0
        total_graph = len(tasks)
        msgs = (f'{status.name:>7}: {count}/{total_graph} '
                f'({100*count/total_graph:5.1f}%)'
                for status, count in count_status.items())
        LOGGER.info('final environment statistics:\n %s', '\n '.join(msgs))

        cls.write_failed_tasks(failed=failed, config=config)

    @classmethod
    def write_failed_tasks(cls, *, failed, config):
        '''Write the names of the failed tasks in the ``failed-tasks`` file.

        The file is created in the directory given by the ``log-root`` key of
        the ``path`` section of the configuration, with one task name per
        line. Nothing is written if no task failed.

        :param list(str) failed: the names of the failed tasks.
        :param Config config: the configuration object.
        '''
        # NOTE(review): when no task fails, a ``failed-tasks`` file left over
        # from a previous run is not removed -- confirm this is intended.
        if not failed:
            return
        log_root = config.query('path', 'log-root')
        # presumably creates log_root if it does not exist -- see path.ensure
        ensure(log_root, is_dir=True)
        failed_fname = Path(log_root, 'failed-tasks')
        with failed_fname.open('w', encoding='utf-8') as failed_file:
            failed_file.writelines(task + '\n' for task in failed)
def schedule(*, hard_graph, soft_graph, env, config=None, workers=1):
    '''Schedule the tasks of a pair of dependency graphs for execution.

    A :class:`Scheduler` is built on top of a :class:`QueueScheduling`
    backend running `workers` parallel workers, and asked to schedule the
    tasks starting from the environment `env`.

    :param hard_graph: the graph of hard dependencies between the tasks.
    :param soft_graph: the graph of soft dependencies between the tasks.
    :param Env env: the initial environment handed to the scheduler.
    :param config: the configuration object handed to the scheduler.
    :type config: Config or None
    :param int workers: the number of parallel workers of the queue backend.
    :returns: the environment returned by the scheduler.
    '''
    queue_backend = QueueScheduling(workers)
    result_env = Scheduler(
        hard_graph=hard_graph,
        soft_graph=soft_graph,
        backend=queue_backend,
    ).schedule(env=env, config=config)
    LOGGER.debug('resulting environment: %s', result_env)
    return result_env