Overview
This section describes pyperunner’s API.
pyperunner.Task | A task is a single work unit that is run as part of a pipeline.
pyperunner.Runner.run | Run a pipeline.
pyperunner.Pipeline.from_file | Create a pipeline from a stored YAML file.
pyperunner.PipelineResult | Accessor of task results of a pipeline run.
Task
class pyperunner.Task(tag='', reload=False, **kwargs)
A task is a single work unit that is run as part of a Pipeline. All tasks that are run by a pipeline must subclass this class. There are two main ways to accomplish this:

1. Using the task() function decorator:

```python
from pyperunner import task

@task("Hello")
def hello():
    print("in hello()")
    return "Hello"
```
Note that in this case you need to explicitly state the name of the task as a parameter to the task() function decorator (here: “Hello”).

2. Directly subclassing this class and applying the run() method decorator to the run() method. Note that the abstract run() method must be implemented when subclassing.

```python
# Task must be imported as well, since World subclasses it
from pyperunner import Task, run

class World(Task):
    @run
    def run(self, data):
        return f"{data} world"
```
Note that, in contrast to the task() function decorator, you don’t need to specify the task name here. Instead, the class name (here: “World”) is used as the task name.

Parameters:
- tag (str) – Tag of the task. It allows the same task to be used multiple times in a single pipeline; each instance then needs a different tag so that task names remain unique.
- reload (bool) – Set to True if the task should run regardless of whether cached results already exist.
- **kwargs – Additional task-specific parameters.
class TaskResult(status, output, exception=None, traceback='')
Result of a task.
Attributes:
- exception: Optional[Exception] = None
assert_params_complete()
Asserts that the parameters provided in the constructor match those required by the run() method.
This is used to raise a TypeError at task creation time (i.e. early and in the main thread) rather than during task execution.
Returns: None
Return type: None
description()
Return a complete description of the task.
The description contains the following properties of the task:
- name
- module
- tag
- hash (see hash())
- parameter dictionary
Returns: Dictionary with information describing the task’s configuration
Return type: Dict[str, Union[str, Dict, List]]
hash()
Generate the task’s hash.
The hash is constructed from the hash of the specific node and the hashes of all parent (predecessor) nodes. It therefore depends on the pipeline the task is part of. This is used to identify a task precisely in a given context (e.g. when saving and loading pipelines with the pyperunner.Pipeline.to_file() and pyperunner.Pipeline.from_file() methods). It ensures reproducibility of individual pipeline runs.
Returns: Task hash
Return type: str
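The idea of a hash that depends on the node itself and on its predecessors can be illustrated with a small, self-contained sketch. The helpers node_hash() and task_hash() are hypothetical, and the use of JSON plus SHA-256 is an assumption for illustration, not necessarily what pyperunner does internally. Because each parent hash already includes that parent's own ancestors, a change anywhere upstream changes every downstream hash.

```python
import hashlib
import json
from typing import Any, Dict, List


def node_hash(name: str, params: Dict[str, Any]) -> str:
    """Hash of a single node's own configuration (hypothetical helper)."""
    # sort_keys makes the hash independent of dict insertion order
    payload = json.dumps({"name": name, "params": params}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def task_hash(own_hash: str, parent_hashes: List[str]) -> str:
    """Combine a node's own hash with its parents' hashes, in the given order."""
    h = hashlib.sha256(own_hash.encode("utf-8"))
    for parent in parent_hashes:
        h.update(parent.encode("utf-8"))
    return h.hexdigest()


# Same "World" task, but with a differently configured parent
hello = task_hash(node_hash("Hello", {}), [])
world = task_hash(node_hash("World", {}), [hello])
hello_v2 = task_hash(node_hash("Hello", {"greeting": "Hi"}), [])
world_v2 = task_hash(node_hash("World", {}), [hello_v2])
```

Here world and world_v2 differ even though the World node itself is configured identically, which is the context dependence the description above refers to.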
load_output()
Get the output generated by this task.
Returns: The output generated by this task
Return type: Any
output_exists()
Returns whether an output of this task already exists (i.e. was cached by a previous run).
Returns: True if the output of this task already exists
Return type: bool
output_filename(filename='result.dump.gz')
Get the full path of the file to which the task’s output is written.
The output path is built from the data path (set by the pipeline runner), the task’s name and its hash, and is therefore specific to a certain configuration of the task (via its hash()).
Parameters:
- filename (str) – Name of the file to which the task’s output is written
Returns: Full path of the file to which the task’s output is written
Return type: str
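A minimal sketch of how such a configuration-specific path could be assembled. The directory layout <data_path>/<task_name>/<task_hash>/<filename> is an assumption; the text above only states which components the path is built from.

```python
import os


def output_filename(data_path: str, task_name: str, task_hash: str,
                    filename: str = "result.dump.gz") -> str:
    """Hypothetical layout: <data_path>/<task_name>/<task_hash>/<filename>."""
    return os.path.join(data_path, task_name, task_hash, filename)


path = output_filename("data", "Hello", "abc123")
```

Since the hash is part of the path, two configurations of the same task write to different locations instead of overwriting each other's cached output.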
abstract run(**kwargs)
Worker method that must be implemented when subclassing.
This method contains the task’s execution logic. Note: The
Parameters:
- **kwargs – Task-specific parameters
Returns: The result of the task that is passed on to the successor task(s) (e.g. a pandas DataFrame or anything else)
Return type: Any
set_data_path(path)
Set the task’s data path.
This is done by the Runner when starting the task.
Parameters:
- path (str) – Data path
Returns: None
Return type: None
set_output(output)
Assigns the output generated by the task.
Parameters:
- output (Any) – Output generated by the task (may be loaded from disk if it is cached)
Returns: None
Return type: None
set_reload(reload)
Sets the reload parameter, which specifies whether this task is forced to run even if its output already exists.
Parameters:
- reload (bool) – If True, the task is forced to run even if its output already exists
Returns: None
Return type: None
should_run()
Determine whether the task should run based on the availability of cached data.
If the task was run previously with the same configuration (see description()), its output was stored and can be reloaded. In that case, the task does not need to run and this function returns False. The task can be forced to run by setting the reload parameter to True when instantiating the task (see Task()).
Returns: True if the task should be run by the runner
Return type: bool
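The decision described above reduces to a single boolean expression. The following sketch uses a hypothetical stand-in function, and it assumes that "cached output exists" means that a file exists at the task's configuration-specific output path.

```python
import os
import tempfile


def should_run(reload: bool, output_path: str) -> bool:
    """Hypothetical stand-in for Task.should_run().

    A task runs if it is forced to (reload=True) or if no cached
    output exists at its configuration-specific output path.
    """
    return reload or not os.path.exists(output_path)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "result.dump.gz")
    before = should_run(reload=False, output_path=path)  # no cache yet
    open(path, "wb").close()                              # simulate a cached output
    after = should_run(reload=False, output_path=path)   # cache present
    forced = should_run(reload=True, output_path=path)   # reload overrides the cache
```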
store_output(output)
Store the output of this task.
Parameters:
- output (Any) – Output of this task
Returns: None
Return type: None
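The default filename result.dump.gz suggests a gzip-compressed dump, but the serializer pyperunner actually uses is not stated in this section. The sketch below assumes pickle inside gzip purely for illustration, and shows the store/load round trip that makes cached outputs reusable across runs. Both functions are hypothetical stand-ins for store_output() and load_output().

```python
import gzip
import os
import pickle
import tempfile
from typing import Any


def store_output(output: Any, path: str) -> None:
    """Write a task output to a gzip-compressed pickle (assumed format)."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with gzip.open(path, "wb") as f:
        pickle.dump(output, f)


def load_output(path: str) -> Any:
    """Read back an output written by store_output()."""
    with gzip.open(path, "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "Hello", "abc123", "result.dump.gz")
    store_output({"greeting": "Hello"}, path)
    restored = load_output(path)
```

Note that pickle should only be used to load files you trust, since unpickling can execute arbitrary code.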
Pipeline
class pyperunner.Pipeline(name, tasks=None)

add(task)
Add a single task as a primary task of the pipeline (“root task”), i.e. a task at which the pipeline starts.
Returns: None
Return type: None
static from_dict(pipeline_dict, compare_hashes=True)
Create a pipeline from a dictionary (usually the parameter.yaml file saved by a previous pipeline run).
Parameters:
- pipeline_dict (Dict) – Dictionary from a saved pipeline run (parameter.yaml)
- compare_hashes (bool) – Set to True if the hashes stored in the pipeline_dict dictionary should be compared with the hashes of the task objects created by this function. This ensures that tasks are created in accordance with the saved pipeline run.
Returns: Pipeline with all tasks as defined by the dictionary
static from_file(filename, compare_hashes=True)
Create a pipeline from a stored YAML file.
Parameters:
- filename (str) – Name of the YAML file in which the pipeline is stored
- compare_hashes (bool) – Set to True if the hashes stored in the YAML file should be compared with the hashes of the task objects created by this function. This ensures that tasks are created in accordance with the saved pipeline run.
Returns: Pipeline with all tasks as defined by the file
set_results(results)
Stores the results of a pipeline run.
Parameters:
- results (PipelineResult) – The pipeline results
Returns: None
Return type: None
set_tasks(tasks)
Sets the supplied tasks as primary tasks of the pipeline (“root tasks”), i.e. tasks at which the pipeline will start (the tasks are not connected to each other).
Returns: None
Return type: None
class pyperunner.Sequential(name, tasks=None)
A purely sequential pipeline with no bifurcations (linear pipeline).
add(task)
Add a task to the current end of the pipeline.
Note: The add() method of the Sequential pipeline behaves differently from the Pipeline.add() method.
Returns: None
Return type: None
set_tasks(tasks)
Set the tasks of the current pipeline.
Note: The tasks are connected in the same order as they appear in the supplied list, so make sure they are ordered correctly.
Note: The set_tasks() method of the Sequential pipeline behaves differently from the Pipeline.set_tasks() method.
Parameters:
- tasks (List[Task]) – Ordered list of tasks, with the first element (tasks[0]) being the first task to run (root task)
Returns: None
Return type: None
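The difference between Pipeline.add() (every task becomes an unconnected root) and Sequential.add() (every task is chained after the current last one) can be modelled with a few toy classes. Node, RootsPipeline and LinearPipeline are hypothetical names used only for this sketch; they are not part of pyperunner.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Node:
    """Hypothetical minimal task node with links to its children."""
    name: str
    children: List["Node"] = field(default_factory=list)


class RootsPipeline:
    """Mimics Pipeline.add(): every added task becomes a separate root."""

    def __init__(self) -> None:
        self.roots: List[Node] = []

    def add(self, task: Node) -> None:
        self.roots.append(task)


class LinearPipeline(RootsPipeline):
    """Mimics Sequential.add(): each task is appended after the current last one."""

    def __init__(self) -> None:
        super().__init__()
        self.last: Optional[Node] = None

    def add(self, task: Node) -> None:
        if self.last is None:
            self.roots.append(task)          # first task becomes the single root
        else:
            self.last.children.append(task)  # chain onto the current end
        self.last = task


fan_out = RootsPipeline()
chain = LinearPipeline()
for name in ["a", "b", "c"]:
    fan_out.add(Node(name))
    chain.add(Node(name))
```

After the loop, fan_out has three independent roots, while chain has a single root a with the path a -> b -> c.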
Runner
class pyperunner.Runner(data_path, log_path, process_limit=None)
Pipeline runner with multiprocessing.
This runner executes all tasks of a given pipeline in order. Each task is started as a separate process, forked from the main process (via multiprocessing.Process). If possible, tasks are run in parallel; this is generally possible for tasks that do not depend on each other in any way (i.e. that are neither predecessors nor successors of one another). The maximum number of concurrent processes is limited via the process_limit parameter.
Parameters:
- data_path (str) – Path where data will be stored
- log_path (str) – Path where log files will be stored
- process_limit (Optional[int]) – Maximum number of concurrent worker processes
analyze_pipeline(force_reload=False)
Marks all nodes (tasks) in the pipeline that need to be run.
A node is required to run if:
- force_reload is True,
- the node itself is marked to be run (i.e. the node was created using reload=True in Task() or there is no cached result for the node), or
- any predecessor node is required to run.
Parameters:
- force_reload (bool) – If True, all nodes are marked to be run
Returns: None
Return type: None
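The three rules above can be applied in a single pass if nodes are visited in topological order, so that every predecessor has already been decided. The sketch below uses the standard-library graphlib module (Python 3.9+) on a plain dict graph; mark_nodes_to_run() and the dict representation are assumptions for illustration, not pyperunner's internal data structures.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set


def mark_nodes_to_run(parents: Dict[str, Set[str]],
                      self_marked: Set[str],
                      force_reload: bool = False) -> Set[str]:
    """Return the names of all nodes that must run (hypothetical helper).

    parents maps every node to the set of its predecessor nodes;
    self_marked holds nodes created with reload=True or lacking a cached result.
    """
    if force_reload:
        return set(parents)
    must_run: Set[str] = set()
    # static_order() yields predecessors before their successors
    for node in TopologicalSorter(parents).static_order():
        if node in self_marked or parents.get(node, set()) & must_run:
            must_run.add(node)
    return must_run


# a -> b -> c, plus an independent node d; only b is marked by itself
graph = {"a": set(), "b": {"a"}, "c": {"b"}, "d": set()}
marked = mark_nodes_to_run(graph, self_marked={"b"})
```

Here c is marked only because its predecessor b must run, while a and d keep their cached results.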
assert_valid_pipeline()
Asserts that the pipeline attribute of this object is set to a valid pipeline object.
Returns: None
Return type: None
dequeue()
Get a runnable task from the queue.
Loops through the task queue and evaluates, for each task,
- whether any of its predecessor tasks failed, and
- whether all of its predecessor tasks have finished.
In the former case (failure of a predecessor), the task is removed from the task list and added to the failed task list itself (required to propagate the error). In the latter case (all predecessors finished), the task is ready to run; it is removed from the queue and returned by the function. Otherwise the function returns None.
Returns: Runnable task, if any, else None
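The dequeue logic can be sketched on task names and plain sets. dequeue() below is a hypothetical helper that mirrors the description: it propagates predecessor failures while scanning and returns the first task whose predecessors have all finished. Because the queue is topologically sorted, a failure propagates down a whole chain within one pass.

```python
from typing import Dict, List, Optional, Set


def dequeue(queue: List[str], parents: Dict[str, Set[str]],
            finished: Set[str], failed: Set[str]) -> Optional[str]:
    """Return the next runnable task name, or None (hypothetical helper).

    Mutates queue and failed in place, as described above.
    """
    for task in list(queue):  # iterate over a copy because items are removed
        preds = parents.get(task, set())
        if preds & failed:
            # a predecessor failed: propagate the failure to this task
            queue.remove(task)
            failed.add(task)
        elif preds <= finished:
            # all predecessors finished: the task is ready to run
            queue.remove(task)
            return task
    return None


parents = {"a": set(), "b": {"a"}, "c": {"b"}, "d": set()}
queue = ["b", "c", "d"]  # "a" has already run and failed
failed = {"a"}
next_task = dequeue(queue, parents, finished=set(), failed=failed)
```

In this example, b and c are moved to the failed set and d, which has no predecessors, is returned as the next runnable task.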
execution_plan_summary(print_fn=None)
Print an execution summary of the underlying pipeline DAG in ASCII format.
Note that this may take up a lot of vertical space if multiple tasks are run in parallel, and the output may then no longer be readable.
Parameters:
- print_fn (Optional[Callable]) – Function that prints the execution summary (default: print)
Returns: None
Return type: None
finish_tasks()
Clean up after tasks have finished.
Loops through processes that have been started but not yet removed from the list of running processes. For each of them, the following steps are performed:
- If the process is dead, it is removed from the list of running tasks. If the process exited with an error exit code (e.g. an out-of-memory error), that exception is logged.
- The runner tries to read data from the result queue (regardless of whether the process is dead or alive).
  - If data could be read from the queue, the Runner.process_task_result() function is called.
  - If no data could be read: if the process is still alive, nothing happens; if the process is dead, an exception is raised.
Returns: None
Return type: None
generate_run_name()
Get a unique name for the current run of the pipeline.
Used to create the run-specific log path.
Returns: Pipeline name + current timestamp, e.g. “my-pipeline_210119T222719”
Return type: str
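The example name “my-pipeline_210119T222719” is consistent with the strftime pattern %y%m%dT%H%M%S appended to the pipeline name. The sketch below infers that format from the example; generate_run_name() here is a hypothetical stand-in, and the optional now parameter exists only to make the output reproducible.

```python
from datetime import datetime
from typing import Optional


def generate_run_name(pipeline_name: str, now: Optional[datetime] = None) -> str:
    """Build '<pipeline name>_<YYMMDD>T<HHMMSS>' (format inferred from the example)."""
    now = now or datetime.now()
    return f"{pipeline_name}_{now.strftime('%y%m%dT%H%M%S')}"


example = generate_run_name("my-pipeline", datetime(2021, 1, 19, 22, 27, 19))
```

With one-second resolution, two runs of the same pipeline started within the same second would receive the same name in this sketch.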
get_predecessor_outputs(task)
Get the results of all predecessor (parent) tasks of a task.
Used to feed the outputs (return values) of the parent tasks to a child task when running it.
Returns: List of all parent task outputs
Return type: List[Any]
log_exception(result)
Provide exception info from a task result to the logger.
Called when processing task results (Runner.process_task_result()).
Parameters:
- result (TaskResult) – Task result
Returns: None
Return type: None
pipeline_params_filename()
Get the filename of the pipeline parameter YAML file.
See Runner.save_pipeline_params().
Returns: Filename of the pipeline parameter YAML file
Return type: str
process_task_result(task, result)
Process the result object of a task.
When a task has finished running or does not need to be run at all, this function performs the required internal status maintenance:
- Set the task result and status to those of the result object
- Add the task to the finished or the failed tasks and log exceptions, if necessary
Parameters:
- task (Task) – Task whose result should be processed
- result (TaskResult) – Result of the given task
Returns: None
Return type: None
queue_tasks(force_reload=False)
Fills the lists of tasks that need to be executed.
Note that there are two lists:
- tasks_queue: a list of all tasks in order (topological sorting of the underlying DAG)
- tasks_execute: a list of tasks that need to be executed
Tasks do not need to be executed when their result is cached and the result is not required in a direct child task.
Parameters:
- force_reload (bool) – If True, all nodes are marked to be run
Returns: None
Return type: None
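The rule above implies three kinds of tasks: those that must run, those that only have to provide a cached result to a direct child that runs, and those that can be bypassed entirely. classify_tasks() is a hypothetical helper that sketches this split on a plain dict of child links, taking the set of tasks that must run (for example, the result of the marking step in analyze_pipeline()) as input.

```python
from typing import Dict, Set, Tuple


def classify_tasks(children: Dict[str, Set[str]],
                   must_run: Set[str]) -> Tuple[Set[str], Set[str], Set[str]]:
    """Split tasks into (run, load, skip) sets (hypothetical helper).

    children maps every task to its direct child tasks.
    """
    to_run = set(must_run)
    # a cached task is still needed if one of its direct children has to run
    to_load = {t for t in children if t not in to_run and children[t] & to_run}
    to_skip = set(children) - to_run - to_load
    return to_run, to_load, to_skip


# a -> b -> c where only c must run
children = {"a": {"b"}, "b": {"c"}, "c": set()}
to_run, to_load, to_skip = classify_tasks(children, must_run={"c"})
```

In this a -> b -> c example, only b's cached output is needed by c, so a ends up in the skip set.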
results()
Get the PipelineResult object for the current run.
Returns: PipelineResult object for the current run
Return type: PipelineResult
run(pipeline=None, force_reload=False, show_execution_plan=False)
Run a pipeline.
Main function of the runner. Evaluates which tasks to run and executes them in order. Each task is run in a separate process. Results from each task are collected using multiprocessing.Queue.
Call Runner.results() after the run has finished to access the results, or alternatively create a pyperunner.PipelineResult object explicitly from the parameter.yaml file of the run.
Parameters:
- pipeline (Optional[Pipeline]) – Pipeline to be run (or None, if already set via Runner.set_pipeline())
- force_reload (bool) – Set to True if all tasks should run, even if they would not need to
- show_execution_plan (bool) – Set to True if an execution summary of the underlying pipeline DAG should be displayed
Returns: None
Return type: None
save_pipeline_params()
Stores a complete description of the current pipeline run to a YAML file.
The parameter file contains:
- run-specific information (paths, environment; see pyperunner.environment.get_environment_info())
- the name, parameters and parents of each task
Returns: None
Return type: None
set_pipeline(pipeline)
Sets the pipeline to be run by the runner.
Returns: None
Return type: None
skip_task(task)
Skips execution of a task.
Used for tasks that are neither required to run nor required to provide their cached output (they are bypassed completely). For example, in a pipeline of tasks a -> b -> c, if only c needs to be run (because it has not been run so far, or because it is forced to run by setting reload=True in Task()), then a is skipped, while b only needs to provide its cached output to c.
Returns: None
Return type: None
start_task(task)
Run a task.
Initiates the Process object for the task and starts it by forking (see multiprocessing.Process.run()).
Returns: Process object of the started task
Return type: Process
tasks_execute: List[pyperunner.task.Task]
A list of tasks that need to be executed.
tasks_queue: List[pyperunner.task.Task]
A list of all tasks in order (topological sorting of the underlying DAG).
static validate_pipeline(pipeline)
Asserts that a pipeline conforms to the following requirements:
- Every node in the pipeline must have a unique name.
- The pipeline must be acyclic.
Raises an exception if the supplied pipeline does not meet one of these requirements.
Returns: None
Return type: None
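Both requirements can be checked with the standard library: collections.Counter finds duplicate names, and graphlib.TopologicalSorter.prepare() (Python 3.9+) raises CycleError if the graph contains a cycle. validate() and the plain dict graph are hypothetical stand-ins for illustration; raising ValueError is an assumption, since the text above does not name the exception types pyperunner uses.

```python
from collections import Counter
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


def validate(names: List[str], parents: Dict[str, Set[str]]) -> None:
    """Raise ValueError if names are not unique or the graph has a cycle (hypothetical helper)."""
    duplicates = [name for name, count in Counter(names).items() if count > 1]
    if duplicates:
        raise ValueError(f"duplicate task names: {duplicates}")
    try:
        # prepare() raises CycleError if the dependency graph contains a cycle
        TopologicalSorter(parents).prepare()
    except CycleError as err:
        raise ValueError("pipeline contains a cycle") from err
```

A valid pipeline such as {"a": set(), "b": {"a"}} passes silently, while duplicate names or a graph like {"a": {"b"}, "b": {"a"}} raise ValueError.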
PipelineResult
class pyperunner.PipelineResult(conf, data_path='')
Accessor of task results of a pipeline run.
Instantiate it using the pipeline run configuration, or use the from_file() method pointing to the parameter.yaml file saved during the pipeline run. Then iterate over the individual task results, or access a single task result using indexing:
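```python
from pyperunner import PipelineResult

# fname: path to the parameter.yaml file saved in the pipeline run's log directory
results = PipelineResult.from_file(fname)

# loop through individual task results
for task_name in results:
    print(results[task_name])
```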
Parameters:
- conf (Dict) – Configuration (pipeline.yaml saved in the pipeline run log dir)
- data_path (str) – Path where the results are stored
static from_file(filename)
Create a PipelineResult object from the parameter.yaml file saved in the pipeline run log path.
Parameters:
- filename (str) – parameter.yaml file saved in the pipeline run log path
Returns: PipelineResult object
- Return type