Overview

This section describes pyperunner’s API.

Task([tag, reload])

A task is a single work unit that is run as part of a Pipeline.

Runner.run([pipeline, force_reload, …])

Run a pipeline.

Pipeline.from_file(filename[, compare_hashes])

Create a pipeline from a stored YAML file.

PipelineResult(conf[, data_path])

Accessor of task results of a pipeline run.

Task

class pyperunner.Task(tag='', reload=False, **kwargs)[source]

A task is a single work unit that is run as part of a Pipeline.

All tasks that are run by a pipeline must subclass this class. There are two main ways to accomplish this:

  1. Using the task() function decorator:

from pyperunner import task

@task("Hello")
def hello():
    print("in hello()")
    return "Hello"

Note that in this case you need to explicitly state the name of the task as a parameter to the task() function decorator (here: “Hello”).

  2. Directly subclassing this class and then using the run() method decorator on the run() method. Note that the abstract run() method must be implemented when subclassing.

from pyperunner import Task, run

class World(Task):
    @run
    def run(self, data):
        return f"{data} world"

Note that in contrast to the task() function decorator, you don’t need to specify the task name here. Instead, the class name (here “World”) will be used as the task name.

Parameters
  • tag (str) – Tag of the task; can be used to use the same task multiple times in a single pipeline (each instance of the task then needs a different tag to ensure unique task names)

  • reload (bool) – Set True if the Task should be run regardless of whether cached results already exist

  • **kwargs – Additional task-specific parameters

class Status(value)[source]

Encodes current Status of a Task

class TaskResult(status, output, exception=None, traceback='')[source]

Result of a task

Parameters
  • status (Status) – Status code

  • output (Any) – Data returned by the task

  • exception (Optional[Exception]) – Exception if one was raised

  • traceback (str) – Traceback of an exception, if one was raised

exception: Optional[Exception] = None
assert_params_complete()[source]

Asserts that the parameters provided in the constructor match those required by the run function.

This raises a TypeError at task creation time (i.e. early and in the main thread) rather than at task execution time.

Returns: None

Return type

None
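
The early check described above can be approximated with the standard library. The following is a minimal sketch, not pyperunner's actual implementation: the helper and the `scale` function are hypothetical, and it assumes that binding the constructor parameters against the run function's signature is a sufficient completeness test.

```python
import inspect


def assert_params_complete(run_fn, params):
    """Raise TypeError if `params` cannot be bound to the signature of `run_fn`."""
    # Signature.bind() raises TypeError for missing or unexpected arguments
    inspect.signature(run_fn).bind(**params)


def scale(factor, offset=0):  # hypothetical task function
    return 2 * factor + offset


assert_params_complete(scale, {"factor": 3})  # complete: no error

try:
    assert_params_complete(scale, {"offset": 1})  # "factor" is missing
except TypeError as err:
    message = str(err)
```

Failing at construction time surfaces the error in the main process, before any worker process has been started.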

description()[source]

Return a complete description of the task.

The description contains the following properties of the task:

  • name

  • module

  • tag

  • hash (see hash())

  • parameter dictionary

Returns: Dictionary with information describing the task’s configuration

Return type

Dict[str, Union[str, Dict, List]]

hash()[source]

Generate the task’s hash

The hash is constructed from the hash of the specific node and the hashes of all parent (predecessor) nodes. It is therefore dependent on the pipeline the task is part of. This is used to precisely identify a task in a given context (e.g. when saving and loading pipelines using the pyperunner.Pipeline.to_file() and pyperunner.Pipeline.from_file() methods to store/load a pipeline from file). It ensures reproducibility of single pipeline runs.

Returns: Task hash

Return type

str
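
To illustrate why such a hash depends on the surrounding pipeline, here is a minimal sketch of a context-dependent hash. It is an assumption-laden illustration, not pyperunner's algorithm: the function names, the JSON serialization and the choice of SHA-256 are all hypothetical.

```python
import hashlib
import json


def node_hash(name, params):
    """Hash of a single node, based on its name and parameters only."""
    payload = json.dumps({"name": name, "params": params}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def context_hash(node, parents_of, own_hash):
    """Combine a node's own hash with the context hashes of all its predecessors."""
    parts = [own_hash[node]]
    parts += sorted(
        context_hash(parent, parents_of, own_hash)
        for parent in parents_of.get(node, [])
    )
    return hashlib.sha256("".join(parts).encode("utf-8")).hexdigest()


parents_of = {"clean": ["load"]}  # load -> clean
own = {"load": node_hash("load", {"path": "a.csv"}), "clean": node_hash("clean", {})}
own_alt = dict(own, load=node_hash("load", {"path": "b.csv"}))

h_original = context_hash("clean", parents_of, own)
h_changed_parent = context_hash("clean", parents_of, own_alt)
```

In this sketch, changing a parameter of `load` changes the hash of `clean` even though `clean` itself is unchanged, which is the property that ties a cached result to a specific pipeline configuration.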

load_output()[source]

Get the output generated by this task

Returns: The output generated by this task

Return type

Any

output_exists()[source]

Returns whether an output of this task already exists (i.e. was cached from a previous run).

Returns: If the output of this task already exists

Return type

bool

output_filename(filename='result.dump.gz')[source]

Get the full path of the file to which the task’s output is written.

The output path is built from the data path (set by the pipeline runner), the task’s name and its hash, and is therefore specific to a certain configuration of the task (via its hash()).

Parameters

filename (str) – Filename of the file to which the task’s output is written

Returns: Full path of the file to which the task’s output is written

Return type

str

abstract run(**kwargs)[source]

Worker method that must be implemented when subclassing.

This method contains the task’s execution logic. Note: The

Parameters

**kwargs – Task-specific parameters

Returns: The result of the task that is passed on to the successor task(s) (e.g. a pandas DataFrame or anything else)

Return type

Any

set_data_path(path)[source]

Set the task’s data path.

This is done by the Runner when starting the task.

Parameters

path (str) – Data path

Returns: None

Return type

None

set_output(output)[source]

Assigns the output generated by the task.

Parameters

output (Any) – Output generated by the task (may be loaded from disk if it is cached).

Returns: None

Return type

None

set_reload(reload)[source]

Sets the reload parameter that specifies whether this task is forced to run even if its output already exists.

Parameters

reload (bool) – If the task should forcibly run even if its output already exists

Returns: None

Return type

None

set_status(status)[source]

Set the task’s status (see Status)

Parameters

status (Status) – Task status

Returns: None

Return type

None

should_run()[source]

Determine if the task should run based on the availability of cached data.

If the task was run previously with the same configuration (see description()), the output was stored and can be reloaded. In that case, the task does not need to run and this function returns False. The task can be forced to run by setting the reload parameter to True when instantiating the task (see Task()).

Returns: If the task should be run by the runner

Return type

bool

store_output(output)[source]

Store the output of this task.

Parameters

output (Any) – Output of this task

Returns: None

Return type

None
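
As a rough illustration of storing and reloading a cached output, the sketch below writes a gzip-compressed pickle and reads it back. The serialization format is an assumption suggested only by the default filename result.dump.gz; the helper functions and the directory layout (data path, task name, hash) are hypothetical and may differ from what pyperunner does.

```python
import gzip
import os
import pickle
import tempfile


def store_output(path, output):
    """Write `output` as a gzip-compressed pickle, creating parent directories."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with gzip.open(path, "wb") as fh:
        pickle.dump(output, fh)


def load_output(path):
    """Read back an object written by store_output()."""
    with gzip.open(path, "rb") as fh:
        return pickle.load(fh)


with tempfile.TemporaryDirectory() as data_path:
    # Hypothetical layout: <data path>/<task name>/<task hash>/<filename>
    path = os.path.join(data_path, "Hello", "abc123", "result.dump.gz")
    store_output(path, {"greeting": "Hello"})
    exists = os.path.exists(path)  # analogous to output_exists()
    restored = load_output(path)
```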

store_params()[source]

Store the parameters of this task to a params.yaml file

Returns: None

Return type

None

store_result(result)[source]

Store an intermediate result

Parameters

result (Result) – result to store

Returns: None

Return type

None

class pyperunner.TaskResult(name, data_path, conf)[source]

Access to the output of a single task

Parameters
  • name (str) – Task name

  • data_path (str) – Path where the results are stored

  • conf (Dict) – Configuration as saved by the task during running

Pipeline

class pyperunner.Pipeline(name, tasks=None)[source]
add(task)[source]

Add a single task as a primary task of the pipeline (“root task”), i.e. a task at which the pipeline starts.

Parameters

task (Task) – Task to set as primary (root) task

Returns: None

Return type

None

static from_dict(pipeline_dict, compare_hashes=True)[source]

Create a pipeline from a dictionary (usually loaded from the parameter.yaml file saved during a previous pipeline run).

Parameters
  • pipeline_dict (Dict) – Dictionary from saved pipeline run (parameter.yaml)

  • compare_hashes (bool) – Set True if the hashes stored in the pipeline_dict dictionary should be compared with the hashes of the task objects created by this function. Used to ensure that Tasks are created in accordance with the saved pipeline run.

Returns: Pipeline with all tasks as defined by dictionary

Return type

Pipeline

static from_file(filename, compare_hashes=True)[source]

Create a pipeline from a stored YAML file.

Parameters
  • filename (str) – Filename of the YAML file where the pipeline is stored

  • compare_hashes (bool) – Set True if the hashes stored in the file should be compared with the hashes of the task objects created by this function. Used to ensure that Tasks are created in accordance with the saved pipeline run.

Returns: Pipeline with all tasks as defined by the file

Return type

Pipeline

set_results(results)[source]

Stores results of a pipeline run

Parameters

results (PipelineResult) – The pipeline results

Returns: None

Return type

None

set_tasks(tasks)[source]

Sets the supplied tasks as primary tasks of the pipeline (“root tasks”), i.e. tasks at which the pipeline will start (there will be no connection between these tasks)

Parameters

tasks (List[Task]) – Tasks to be set as primary tasks (root tasks)

Returns: None

Return type

None

to_dict()[source]

Get a dictionary representation of the directed acyclic graph underlying the pipeline.

Returns: Dictionary representation of the directed acyclic graph underlying the pipeline.

Return type

Dict

to_file(filename)[source]

Save a representation of the pipeline to file (yaml format)

Parameters

filename (str) – Filename to store the representation to

Returns: None

Return type

None

class pyperunner.Sequential(name, tasks=None)[source]

A purely sequential pipeline with no bifurcations (linear pipeline).

add(task)[source]

Add a task to the current end of the pipeline.

Note: The add() method of the Sequential pipeline behaves differently from the Pipeline.add() method.

Parameters

task (Task) – Task to add as a successor of the current end of the pipeline.

Returns: None

Return type

None

set_tasks(tasks)[source]

Set the tasks of the current pipeline.

Note: The tasks are connected in the same order as they are in the supplied list, so make sure these are ordered correctly.

Note: The set_tasks() method of the Sequential pipeline behaves differently from the Pipeline.set_tasks() method.

Parameters

tasks (List[Task]) – Ordered list of tasks, with the first element (tasks[0]) being the first task to run (root task)

Returns: None

Return type

None
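
The ordering rule for a sequential pipeline can be pictured as chaining each task to the next one in the list. The following sketch uses plain strings in place of Task objects and a hypothetical helper name; it only illustrates the resulting connections, not pyperunner's internals.

```python
def connect_sequentially(tasks):
    """Return the (predecessor, successor) edges of a linear pipeline."""
    # Pair every task with the one that follows it in the list
    return list(zip(tasks, tasks[1:]))


edges = connect_sequentially(["load", "clean", "train"])
```

Reordering the input list changes which task feeds which, which is why the order of the supplied list matters.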

Runner

class pyperunner.Runner(data_path, log_path, process_limit=None)[source]

Pipeline runner with multiprocessing.

This runner executes all tasks of a given pipeline in order. Each task is started as a separate process, forked from the main process (via multiprocessing.Process, see Process). If possible, tasks are run in parallel. This is generally possible for tasks that do not depend on each other in any way (i.e. are not predecessors or successors of one another). The maximum number of concurrent processes is limited via the process_limit parameter.

Parameters
  • data_path (str) – Path where data will be stored

  • log_path (str) – Path where log files will be stored

  • process_limit (Optional[int]) – Maximum number of concurrent worker processes

analyze_pipeline(force_reload=False)[source]

Marks all nodes (tasks) in the pipeline that need to be run.

A node is required to run if:

  • force_reload is True

  • the node itself is marked to be run (i.e., the node was created using reload=True in Task() or there is no cached result for the node)

  • any predecessor node is required to run

Parameters

force_reload (bool) – If true, all nodes are marked to be run

Returns: None

Return type

None
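
The three conditions above can be sketched as a single pass over the DAG in topological order. This is an illustrative sketch rather than the Runner's code: the function name and the dictionary-based graph representation are hypothetical, and it relies on graphlib, which requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter


def mark_required(parents_of, needs_run, force_reload=False):
    """Return {node: bool} telling which nodes are required to run."""
    required = {}
    # static_order() yields every node after all of its predecessors
    for node in TopologicalSorter(parents_of).static_order():
        required[node] = (
            force_reload
            or needs_run.get(node, False)  # reload=True or no cached result
            or any(required[parent] for parent in parents_of.get(node, ()))
        )
    return required


# a -> b -> c and a -> d; only b is marked to run on its own
parents_of = {"a": [], "b": ["a"], "c": ["b"], "d": ["a"]}
required = mark_required(parents_of, needs_run={"b": True})
required_forced = mark_required(parents_of, needs_run={}, force_reload=True)
```

Processing nodes in topological order means each node's predecessors have already been decided, so the requirement propagates downstream in one pass.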

assert_valid_pipeline()[source]

Asserts that the pipeline attribute of this object is set to a valid pipeline object.

Returns: None

Return type

None

dequeue()[source]

Get a runnable Task from the queue.

Loops through the task queue and evaluates for each task:

  • if any of the predecessor tasks failed

  • if all of the predecessor tasks have finished

In the former case (failure of a predecessor), the task is removed from the task list and added to the failed task list itself (required to propagate the error). In the latter case (all predecessors finished), the task is ready to run and as such is removed from the queue and returned by the function. Otherwise the function returns None.

Returns: Runnable task, if any, else None

Return type

Optional[Task]
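
The dequeue logic can be sketched with plain Python containers. This is a simplified illustration under stated assumptions: tasks are represented by names, and the queue, finished and failed collections are hypothetical stand-ins for the Runner's internal state.

```python
def dequeue(queue, parents_of, finished, failed):
    """Return the next runnable task, or None if no task is ready yet."""
    for task in list(queue):  # iterate over a copy, since the queue is modified
        parents = parents_of.get(task, [])
        if any(parent in failed for parent in parents):
            # A predecessor failed: propagate the failure to this task
            queue.remove(task)
            failed.add(task)
        elif all(parent in finished for parent in parents):
            # All predecessors finished: the task is ready to run
            queue.remove(task)
            return task
    return None


parents_of = {"b": ["a"], "c": ["b"]}  # a -> b -> c
queue, finished, failed = ["a", "b", "c"], set(), set()

first = dequeue(queue, parents_of, finished, failed)       # root task is ready
blocked = dequeue(queue, parents_of, finished, failed)     # "a" still running
finished.add("a")
second = dequeue(queue, parents_of, finished, failed)      # "b" becomes ready
failed.add("b")                                            # pretend "b" failed
after_failure = dequeue(queue, parents_of, finished, failed)
```

After the last call, the queue is empty and `c` sits in the failed set without ever having been started.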

execution_plan_summary(print_fn=None)[source]

Print execution summary of the underlying pipeline DAG in ASCII format.

Note that this may easily take a lot of vertical space if multiple tasks are run in parallel and may not be readable anymore.

Parameters

print_fn (Optional[Callable]) – Function that prints the execution summary (default: print)

Returns: None

Return type

None

finish_tasks()[source]

Clean up after tasks have finished.

Loops through processes that have started but have not yet been removed from the running processes list. The following steps are then performed:

  • If the process is dead, it is removed from the list of running tasks
    • If the process exited with an error exit code (e.g. out of memory error), that exception is logged

  • An attempt is made to read data from the result queue (regardless of whether the process is dead or alive)
    • If data could be read from the queue, the Runner.process_task_result() function is called

    • If no data could be read
      • if the process is still alive, nothing happens

      • if the process is dead, an exception is raised

Returns: None

Return type

None

generate_run_name()[source]

Get a unique name for the current run of the pipeline.

Used to create the run-specific log path

Returns: pipeline name + current timestamp

e.g. “<my-pipeline_210119T222719”

Return type

str
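
A run name of this shape can be produced with strftime. The sketch below is hypothetical: the timestamp format is inferred from the example above (two-digit year, month, day, then "T" and the time), and the function signature is not pyperunner's.

```python
from datetime import datetime


def generate_run_name(pipeline_name, now=None):
    """Build '<pipeline name>_<timestamp>' from the given or current time."""
    now = now or datetime.now()
    # Assumed format: yymmdd, a literal "T", then HHMMSS
    return f"{pipeline_name}_{now.strftime('%y%m%dT%H%M%S')}"


run_name = generate_run_name("my-pipeline", datetime(2021, 1, 19, 22, 27, 19))
```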

get_predecessor_outputs(task)[source]

Get the results of all predecessor (parent) tasks of a task.

Used to feed the outputs (return values) of the parent tasks to a child task when running it.

Parameters

task (Task) – Task for which the predecessors’ outputs are returned.

Returns: List of all parent task outputs

Return type

List[Any]

log_exception(result)[source]

Provide exception info from a task result to the logger.

Called when processing task results (Runner.process_task_result())

Parameters

result (TaskResult) – Task result

Returns: None

Return type

None

pipeline_params_filename()[source]

Get filename of pipeline parameter yaml file.

See Runner.save_pipeline_params()

Returns: Filename of pipeline parameter yaml file

Return type

str

process_task_result(task, result)[source]

Process the result object of a task.

When a task has finished running or does not need to be run at all, this function performs the required internal status maintenance:

  • Set the task result and status to those of the result object

  • Add the task to the finished or the failed tasks and log exceptions, if necessary

Parameters

Returns: None

Return type

None

queue_tasks(force_reload=False)[source]

Fills lists of tasks that need to be executed.

Note that there are two lists:

  • tasks_queue: A list of all tasks in order (topological sorting of the underlying DAG)

  • tasks_execute: A list of tasks that need to be executed

Tasks do not need to be executed when their result is cached and the result is not required in a direct child task.

Parameters

force_reload (bool) – If true, all nodes are marked to be run

Returns: None

Return type

None
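
The relationship between the two lists can be sketched as follows. This is an interpretation of the description above, not the Runner's code: the function name, the name-based graph and the precomputed `required` mapping are hypothetical, and graphlib requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter


def queue_tasks(parents_of, required):
    """Return (tasks_queue, tasks_execute) for a DAG given as {node: parents}."""
    tasks_queue = list(TopologicalSorter(parents_of).static_order())

    # Invert the parent mapping to find the direct children of each node
    children_of = {}
    for node, parents in parents_of.items():
        for parent in parents:
            children_of.setdefault(parent, []).append(node)

    # Keep a task if it must run, or if a direct child needs its (cached) result
    tasks_execute = [
        task
        for task in tasks_queue
        if required[task] or any(required[child] for child in children_of.get(task, []))
    ]
    return tasks_queue, tasks_execute


parents_of = {"a": [], "b": ["a"], "c": ["b"]}  # a -> b -> c
tasks_queue, tasks_execute = queue_tasks(
    parents_of, required={"a": False, "b": False, "c": True}
)
```

In this example only `c` has to run, `b` stays in the list because its cached result feeds `c`, and `a` drops out entirely.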

results()[source]

Get the PipelineResult object for the current run.

Returns: PipelineResult object for current run

Return type

PipelineResult

run(pipeline=None, force_reload=False, show_execution_plan=False)[source]

Run a pipeline.

Main function of the runner. Evaluates which tasks to run and executes them in order. Each task is run in a separate process. Results from each task are collected using multiprocessing.Queue.

Call Runner.results() after finishing the run to access the results or alternatively create a pyperunner.PipelineResult object explicitly from the parameter.yaml file of the run.

Parameters
  • pipeline (Optional[Pipeline]) – Pipeline to be run (or None, if already set via Runner.set_pipeline())

  • force_reload (bool) – Set true if all tasks should run even if they would not need to

  • show_execution_plan (bool) – Set true if an execution summary of the underlying pipeline DAG should be displayed.

Returns: None

Return type

None

save_pipeline_params()[source]

Stores a complete description of the current pipeline run to a yaml file.

The parameter file contains:

  • run-specific information (paths, environment) (see pyperunner.environment.get_environment_info())

  • name, parameters and parents of each task

Returns: None

Return type

None

set_pipeline(pipeline)[source]

Sets the pipeline to be run by the runner.

Parameters

pipeline (Pipeline) – Pipeline to be run

Returns: None

Return type

None

skip_task(task)[source]

Skips execution of a task.

Used for tasks that are neither required to run nor required to load data from (they are completely bypassed). E.g. in a pipeline of tasks a -> b -> c, if only c needs to be run (because it wasn’t run so far or because it is forced to run by setting reload=True in Task()), then a is skipped: the cached output of b is loaded as input for c, while a is neither run nor loaded.

Parameters

task (Task) – Task to skip

Returns: None

Return type

None

start_task(task)[source]

Run a task.

Creates the Process object for the task and starts it by forking (see Process.run()).

Parameters

task (Task) – Task to start

Returns: Process object of the started task

Return type

Process

tasks_execute: List[pyperunner.task.Task]

A list of tasks that need to be executed

tasks_queue: List[pyperunner.task.Task]

A list of all tasks in order (topological sorting of the underlying DAG)

static validate_pipeline(pipeline)[source]

Asserts a pipeline conforms to requirements.

Requirements:

  • Every node in the pipeline must have a unique name

  • The pipeline must be acyclic

Raises exceptions if one of the requirements isn’t met by the supplied pipeline.

Parameters

pipeline (Pipeline) – Pipeline to check

Returns: None

Return type

None
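
Both requirements can be checked with the standard library. The sketch below is illustrative only: the function signature, the name-based graph and the use of ValueError are assumptions, and the exception types raised by pyperunner itself are not documented here. It relies on graphlib (Python 3.9 or later).

```python
from collections import Counter
from graphlib import CycleError, TopologicalSorter


def validate_pipeline(task_names, parents_of):
    """Raise ValueError on duplicate task names or on a cyclic graph."""
    duplicates = [name for name, count in Counter(task_names).items() if count > 1]
    if duplicates:
        raise ValueError(f"duplicate task names: {duplicates}")
    try:
        # prepare() raises CycleError if the graph contains a cycle
        TopologicalSorter(parents_of).prepare()
    except CycleError as err:
        raise ValueError("pipeline contains a cycle") from err


validate_pipeline(["a", "b"], {"b": ["a"]})  # valid: no error

errors = []
for names, graph in [
    (["a", "a"], {}),                          # duplicate name
    (["a", "b"], {"a": ["b"], "b": ["a"]}),    # cycle a <-> b
]:
    try:
        validate_pipeline(names, graph)
    except ValueError as err:
        errors.append(str(err))
```

Unique names matter because a name-keyed graph would otherwise silently merge two tasks into one node; using a distinct tag per task instance is what keeps names unique.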

write_status_image(fname='status.png')[source]

Write an image of the pipeline’s DAG to disk.

Creates a PNG image of the nodes and connections of the DAG associated with the pipeline and writes it to disk.

Parameters

fname (str) – Filename to write to (PNG)

Return type

None

PipelineResult

class pyperunner.PipelineResult(conf, data_path='')[source]

Accessor of task results of a pipeline run.

Instantiate using the pipeline run configuration or using the from_file method pointing to the parameter.yaml file saved during the pipeline run. Then iterate over the individual task results or access a single task result using array indexing:

results = PipelineResult.from_file(fname)

# loop through individual task results
for task_name in results:
    print(results[task_name])

Parameters
  • conf (Dict) – Configuration (pipeline.yaml saved in the pipeline run log dir)

  • data_path (str) – Path where the results are stored

static from_file(filename)[source]

Create a PipelineResult object from the parameter.yaml file saved in the pipeline run log path

Parameters

filename (str) – parameter.yaml file saved in the pipeline run log path

Returns: PipelineResult object

Return type

PipelineResult

set_data_path(data_path)[source]

Sets the data path of the task

Parameters

data_path (str) – Path where the results are stored

Returns: None

Return type

None

task_result(task)[source]

Return the results of a certain task.

Parameters

task (str) – Task name

Returns: Results of the task

Return type

Any