scheduler: Task scheduling and dependency handling
This module provides classes to schedule the execution of several tasks, possibly dependent on each other.
Example usage:
>>> from valjean.cosette.task import DelayTask
>>> spam = DelayTask('spam', 0.1)
>>> eggs = DelayTask('eggs', 0.2)
>>> bacon = DelayTask('bacon', 0.2)
>>> from valjean.cosette.depgraph import DepGraph
>>> g = DepGraph.from_dependency_dictionary({
...     spam: [],
...     bacon: [spam],
...     eggs: []
... })
>>> from valjean.cosette.scheduler import Scheduler
>>> s = Scheduler(hard_graph=g)
>>> env = s.schedule() # executes the tasks in the correct order
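To illustrate what "the correct order" means for the dependency dictionary above, here is a minimal sketch that uses the standard-library graphlib module instead of valjean. It is not how Scheduler is implemented; plain strings stand in for the DelayTask objects, and the only point is that a task never appears before the tasks it depends on.

```python
from graphlib import TopologicalSorter

# Task names stand in for the DelayTask objects of the example above;
# each key maps to the list of tasks it depends on.
dependencies = {
    'spam': [],
    'bacon': ['spam'],
    'eggs': [],
}

# static_order() yields each task only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Any order in which spam precedes bacon is valid; eggs has no dependencies and may appear anywhere.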
- exception valjean.cosette.scheduler.SchedulerError
An error that may be raised by the Scheduler class.
- class valjean.cosette.scheduler.Scheduler(*, hard_graph, soft_graph=None, backend=None)
Schedule a number of tasks.
The Scheduler class has the responsibility of scheduling and executing a number of tasks. Here hard_graph is a DepGraph describing the hard dependencies among the tasks to be executed (see task for details about hard vs. soft dependencies); soft_graph is a DepGraph describing the soft dependencies alone; finally, backend should be an instance of a *Scheduling class such as QueueScheduling, or at any rate a class that exhibits an execute_tasks method with the correct signature (see QueueScheduling.execute_tasks). If backend is None, the default backend will be used.
- Parameters:
hard_graph (DepGraph) – The dependency graph for hard dependencies.
soft_graph (DepGraph or None) – The dependency graph for soft dependencies.
backend (None or QueueScheduling) – The scheduling backend.
- Raises:
ValueError – if depgraph is not an instance of DepGraph.
SchedulerError – if the tasks do not have any do() method.
- __init__(*, hard_graph, soft_graph=None, backend=None)
Initialize the scheduler with a graph.
Scheduling backends
queue: Producer-consumer-queue backend
This module contains an implementation of a scheduling backend that leverages Python “threads” (threading module) and producer-consumer queues (queue module).
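The producer-consumer pattern mentioned here can be sketched with the standard library alone. The code below is a generic illustration, not the QueueScheduling implementation: the run_all helper, its n_workers argument and the None sentinel are assumptions made for this example, and dependency handling is left out entirely.

```python
import queue
import threading


def run_all(jobs, n_workers=4):
    """Run zero-argument callables on worker threads; return results in order."""
    todo = queue.Queue()
    results = {}
    lock = threading.Lock()

    def worker():
        # Consumer side: take jobs off the queue until a sentinel arrives.
        while True:
            item = todo.get()
            if item is None:  # sentinel: no more work for this worker
                return
            index, job = item
            outcome = job()
            with lock:
                results[index] = outcome

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for thread in threads:
        thread.start()
    # Producer side: enqueue the jobs, then one sentinel per worker.
    for index, job in enumerate(jobs):
        todo.put((index, job))
    for _ in threads:
        todo.put(None)
    for thread in threads:
        thread.join()
    return [results[i] for i in range(len(jobs))]


squares = run_all([lambda n=n: n * n for n in range(5)])
print(squares)
```

A real backend additionally has to hold back tasks whose dependencies have not finished yet, which is what the state-decision methods documented for QueueScheduling are for.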
- class valjean.cosette.backends.queue.QueueScheduling(n_workers=10)
The default scheduling backend.
Uses threading and queue to handle concurrent execution of tasks.
- __init__(n_workers=10)
Initialize the queue backend.
- Parameters:
n_workers (int) – The number of worker threads to use.
- classmethod decide_new_state(task, deps, hard_deps, env)
Look at the dependencies of the current task and make a decision about what we should do with this task.
This function returns a TaskStatus that represents the suggested new state for this task, or None in case the task should be ignored and removed from the queue.
- classmethod last_end_time(tasks, env)
Return the latest end_clock time of the given tasks, or None if some of the tasks do not have an end_clock.
- classmethod decide_new_state_waiting(task, deps, hard_deps, env)
Decide what to do with a task that is in WAITING state.
- class WorkerThread(queue, env, config, cond_var)
Workhorse class for QueueScheduling. This class consumes (i.e. executes) tasks passed to it through the queue.
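The behaviour documented for last_end_time (latest end_clock, or None when any task lacks one) can be sketched as follows. This is only an illustration under stated assumptions: the helper name latest_end_clock is hypothetical, env is assumed to be a dictionary mapping a task name to a dictionary of per-task information, and returning None for an empty task list is a choice made for this sketch. The actual layout of env in valjean may differ.

```python
def latest_end_clock(task_names, env):
    """Return the latest 'end_clock' among task_names, or None if any is missing.

    Assumption: env maps a task name to a dict of per-task information.
    """
    end_times = []
    for name in task_names:
        end_clock = env.get(name, {}).get('end_clock')
        if end_clock is None:
            # At least one task has no recorded end time yet.
            return None
        end_times.append(end_clock)
    # An empty list of tasks yields None (a choice made for this sketch).
    return max(end_times, default=None)


# Hypothetical environment: 'eggs' has not finished, so it has no end_clock.
env = {
    'spam': {'end_clock': 1.5},
    'bacon': {'end_clock': 2.0},
    'eggs': {},
}
print(latest_end_clock(['spam', 'bacon'], env))
print(latest_end_clock(['spam', 'eggs'], env))
```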