scheduler — Task scheduling and dependency handling

This module provides classes to schedule the execution of several tasks, possibly dependent on each other.

Example usage:

>>> from valjean.cosette.task import DelayTask
>>> spam = DelayTask('spam', 0.1)
>>> eggs = DelayTask('eggs', 0.2)
>>> bacon = DelayTask('bacon', 0.2)
>>> from valjean.cosette.depgraph import DepGraph
>>> g = DepGraph.from_dependency_dictionary({
...         spam: [],
...         bacon: [spam],
...         eggs: []
...     })
>>> from valjean.cosette.scheduler import Scheduler
>>> s = Scheduler(hard_graph=g)
>>> env = s.schedule()  # executes the tasks in the correct order
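
The ordering guarantee in the example above amounts to a topological sort of the dependency dictionary. As a rough, self-contained illustration, an order compatible with the same dependencies can be computed as shown below. The sketch makes two substitutions: the standard-library graphlib module stands in for DepGraph, and plain strings stand in for the DelayTask objects.

```python
from graphlib import TopologicalSorter

# Plain-string stand-ins for the DelayTask objects above; each key maps a
# task to the list of tasks it depends on (the same shape as the dictionary
# passed to DepGraph.from_dependency_dictionary).
deps = {
    'spam': [],
    'bacon': ['spam'],
    'eggs': [],
}

# static_order() yields every node only after all of its predecessors.
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Any valid order runs spam before bacon. The position of eggs is unconstrained, so it may appear anywhere in the output.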

exception valjean.cosette.scheduler.SchedulerError

An error that may be raised by the Scheduler class.

class valjean.cosette.scheduler.Scheduler(*, hard_graph, soft_graph=None, backend=None)

Schedule a number of tasks.

The Scheduler class is responsible for scheduling and executing a number of tasks. Here hard_graph is a DepGraph describing the hard dependencies among the tasks to be executed (see task for details about hard vs. soft dependencies), and soft_graph is a DepGraph describing the soft dependencies alone. Finally, backend should be an instance of a scheduling-backend class (one whose name ends in Scheduling, such as QueueScheduling). More generally, it can be any object that provides an execute_tasks method with the same signature (see QueueScheduling.execute_tasks). If backend is None, the default backend (QueueScheduling) is used.
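
Because the backend only needs to expose an execute_tasks method with the right keyword-only signature, a custom backend can be very small. The sketch below is hypothetical: SequentialScheduling is not part of valjean. It also makes two further assumptions. The graphs are plain dictionaries mapping each task name to its dependencies, rather than DepGraph objects. The tasks are callables that take (env, config) and return a dict of environment updates.

```python
from graphlib import TopologicalSorter


class SequentialScheduling:
    """Hypothetical backend that runs tasks one at a time, in dependency order.

    Assumptions made for this sketch: graphs are dicts mapping a task name to
    the names it depends on, and ``tasks`` maps each name to a callable that
    takes ``(env, config)`` and returns a dict of environment updates.
    """

    def __init__(self, tasks):
        self.tasks = tasks

    def execute_tasks(self, *, full_graph, hard_graph, env, config):
        # Same keyword-only signature as QueueScheduling.execute_tasks.
        # Ordering only needs the full (hard + soft) graph; this simplified
        # sketch ignores hard_graph and does no failure handling.
        for name in TopologicalSorter(full_graph).static_order():
            env.update(self.tasks[name](env, config))
        return env


tasks = {
    'build': lambda env, config: {'build_dir': '/tmp/build'},
    'test': lambda env, config: {'tested': env['build_dir']},
}
graph = {'build': [], 'test': ['build']}
env = SequentialScheduling(tasks).execute_tasks(
    full_graph=graph, hard_graph=graph, env={}, config=None)
print(env)  # {'build_dir': '/tmp/build', 'tested': '/tmp/build'}
```

This only illustrates the duck-typing contract and how a completed task can pass information to a dependent one through the environment. It would not work unchanged with the real Scheduler, which passes DepGraph objects rather than dictionaries.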

Parameters:
  • hard_graph (DepGraph) – The dependency graph for hard dependencies.

  • soft_graph (DepGraph or None) – The dependency graph for soft dependencies.

  • backend (None or QueueScheduling) – The scheduling backend.

Raises:

SchedulerError

__init__(*, hard_graph, soft_graph=None, backend=None)

Initialize the scheduler with a graph.

schedule(*, config=None, env=None)

Schedule and execute the tasks.

Parameters:
  • env (Env) – An initial environment for the scheduled tasks. This allows completed tasks to inform dependent tasks of, e.g., the location of interesting files.

  • config (Config) – The configuration object.

Returns:

The modified environment.

Scheduling backends

queue — Producer-consumer-queue backend

This module contains an implementation of a scheduling backend that leverages Python “threads” (threading module) and producer-consumer queues (queue module).

class valjean.cosette.backends.queue.QueueScheduling(n_workers=10)

The default scheduling backend.

Uses threading and queue to handle concurrent execution of tasks.

__init__(n_workers=10)

Initialize the queue backend.

Parameters:

n_workers (int) – The number of worker threads to use.
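
For readers unfamiliar with the pattern, the sketch below is a minimal, generic producer-consumer example built on the same two standard-library modules. It is not QueueScheduling's actual code. The job functions, the sentinel-based shutdown, and the shared results list are illustrative choices, and N_WORKERS merely plays the role of n_workers.

```python
import queue
import threading

N_WORKERS = 4          # plays the role of n_workers
STOP = object()        # sentinel telling a worker to exit


def worker(jobs, results, lock):
    """Consume jobs until the STOP sentinel is received."""
    while True:
        job = jobs.get()
        try:
            if job is STOP:
                return
            value = job()              # "execute" the task
            with lock:
                results.append(value)
        finally:
            jobs.task_done()           # runs for the sentinel too


jobs = queue.Queue()
results = []
lock = threading.Lock()
threads = [threading.Thread(target=worker, args=(jobs, results, lock))
           for _ in range(N_WORKERS)]
for t in threads:
    t.start()

# Producer: enqueue ten small jobs, then one sentinel per worker.
for i in range(10):
    jobs.put(lambda i=i: i * i)
for _ in threads:
    jobs.put(STOP)

jobs.join()                            # wait until every item is processed
for t in threads:
    t.join()
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Results arrive in whatever order the workers finish, which is why they are sorted before printing.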

classmethod decide_new_state(task, deps, hard_deps, env)

Inspect the dependencies of the current task and decide what to do with it.

This function returns a TaskStatus that represents the suggested new state for this task, or None in case the task should be ignored and removed from the queue.

Parameters:
  • task (Task) – the task

  • deps (list(Task)) – the dependencies of task

  • hard_deps (list(Task)) – the hard dependencies of task

  • env (Env) – the environment.

Return type:

TaskStatus or None
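
The decision rule can be pictured roughly as follows. This is a hypothetical reconstruction, not valjean's code. Several parts of it are assumptions consistent with the description above:

- the TaskStatus members;
- the status dictionary that stands in for the env argument;
- the rules themselves: ignore tasks that are already finished, skip when a hard dependency did not succeed, and wait while any dependency is unfinished.

```python
from enum import Enum


class TaskStatus(Enum):
    """Hypothetical stand-in; valjean's own TaskStatus may differ."""
    WAITING = 'waiting'
    PENDING = 'pending'
    DONE = 'done'
    FAILED = 'failed'
    SKIPPED = 'skipped'


FINISHED = {TaskStatus.DONE, TaskStatus.FAILED, TaskStatus.SKIPPED}
UNSUCCESSFUL = {TaskStatus.FAILED, TaskStatus.SKIPPED}


def decide_new_state(task, deps, hard_deps, status):
    """Suggest a new state for ``task``, or None to drop it from the queue.

    ``status`` maps task names to a TaskStatus; missing names count as WAITING.
    """
    def state(name):
        return status.get(name, TaskStatus.WAITING)

    if state(task) in FINISHED:
        return None                    # already handled: remove from the queue
    if any(state(dep) in UNSUCCESSFUL for dep in hard_deps):
        return TaskStatus.SKIPPED      # a hard requirement can no longer be met
    if all(state(dep) in FINISHED for dep in deps):
        return TaskStatus.PENDING      # every dependency has finished: run it
    return TaskStatus.WAITING          # some dependency is still outstanding
```

Under these assumptions, a failed soft dependency only delays a task until it has finished, whereas a failed hard dependency causes the task to be skipped.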

classmethod last_end_time(tasks, env)

Return the latest end_clock time of the given tasks, or None if any of the tasks does not have an end_clock.
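
The rule is simple enough to state in a few lines. In this sketch, the environment is assumed (hypothetically) to be a dict mapping each task name to a dict of per-task information that may contain an 'end_clock' entry. valjean's actual Env layout may differ.

```python
def last_end_time(tasks, env):
    """Latest 'end_clock' among ``tasks``, or None if any task lacks one."""
    end_clocks = []
    for name in tasks:
        clock = env.get(name, {}).get('end_clock')
        if clock is None:
            return None        # at least one task has not recorded an end time
        end_clocks.append(clock)
    # Assumption: an empty list of tasks also yields None.
    return max(end_clocks, default=None)


env = {'a': {'end_clock': 3.0}, 'b': {'end_clock': 5.5}, 'c': {}}
print(last_end_time(['a', 'b'], env))  # 5.5
print(last_end_time(['a', 'c'], env))  # None
```

Returning None as soon as one clock is missing matches the documented behaviour. The maximum is only meaningful once every task has recorded an end time.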

classmethod decide_new_state_waiting(task, deps, hard_deps, env)

Decide what to do with a task that is in WAITING state.

execute_tasks(*, full_graph, hard_graph, env, config)

Execute the tasks.

Parameters:
  • full_graph (DepGraph) – Full dependency graph for the tasks, i.e. including both hard and soft dependencies.

  • hard_graph (DepGraph) – Hard-dependency graph for the tasks.

  • env (Env) – An initial environment for the scheduled tasks.

  • config (Config) – The configuration object (for things like paths, etc.).

class WorkerThread(queue, env, config, cond_var)

Workhorse class for QueueScheduling. This class consumes (i.e. executes) tasks passed to it through the queue.

__init__(queue, env, config, cond_var)

Initialize the thread.

Parameters:
  • queue – The producer-consumer task queue.

  • env – The execution environment for the tasks.

  • config – The configuration for the tasks.

  • cond_var – A condition variable to notify when the thread finishes running a task.

run()

Main method: run this thread.
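
A worker of this kind can be sketched as a threading.Thread subclass with three jobs: pull tasks from the queue, run them, and notify a condition variable so that the scheduling side can re-examine waiting tasks. The sketch is hypothetical and is not valjean's WorkerThread. It assumes:

- a None sentinel for shutdown;
- tasks with an (env, config) signature;
- a shared 'done' list in env.

The queue parameter is renamed task_queue to avoid shadowing the queue module.

```python
import queue
import threading


class WorkerThread(threading.Thread):
    """Hypothetical worker: consume callables from a queue until a None sentinel."""

    def __init__(self, task_queue, env, config, cond_var):
        super().__init__(daemon=True)
        self.task_queue = task_queue
        self.env = env
        self.config = config
        self.cond_var = cond_var

    def run(self):
        while True:
            task = self.task_queue.get()
            if task is None:           # sentinel: no more work
                return
            result = task(self.env, self.config)
            with self.cond_var:        # the lock must be held to notify
                self.env['done'].append(result)
                self.cond_var.notify_all()


cond = threading.Condition()
env = {'done': []}
jobs = queue.Queue()
worker = WorkerThread(jobs, env, None, cond)
worker.start()

for i in range(3):
    jobs.put(lambda _env, _config, i=i: i)

# Scheduling side: sleep on the condition until all three tasks are reported.
with cond:
    cond.wait_for(lambda: len(env['done']) == 3)
jobs.put(None)
worker.join()
print(env['done'])  # [0, 1, 2]
```

Waiting on a condition variable, rather than polling, lets the scheduling side sleep until some task has actually finished.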