Source code for pyperunner.pipeline

from typing import Dict, List, Type, Optional
import yaml
import networkx as nx
import importlib

from pyperunner.dag import Node, DAG
from pyperunner.task import Task
from pyperunner.util import PipelineResult


class PipelineError(Exception):
    """
    Exception type intended for pipeline-specific errors.

    NOTE(review): nothing in this module section raises it; confirm where it
    is raised (presumably elsewhere in pyperunner) before relying on it.
    """


class Pipeline(DAG):
    """
    A pipeline describes which Tasks should be executed and in what order.

    The pipeline is implemented as a directed, acyclic graph of
    :py:class:`Task` objects.
    """

    #: Fill color per task status, used as the "fillcolor" node attribute when
    #: building the graph representation (see :py:meth:`Pipeline._add_node`).
    colormap = {
        Task.Status.FAILURE: "red",
        Task.Status.SUCCESS: "green",
        Task.Status.NOT_STARTED: "white",
        Task.Status.RUNNING: "yellow",
        Task.Status.CANT_RUN: "gray",
        Task.Status.SKIPPED: "skyblue",
    }

    def __init__(self, name: str, tasks: Optional[List[Task]] = None):
        """
        Create a pipeline.

        Args:
            name: Name of the pipeline (supply any name that helps you later
                identifying what the pipeline was used for)
            tasks: Optional list of *primary* tasks, i.e. tasks at which the
                pipeline will start (there will be no connection between
                these tasks)
        """
        super().__init__()
        self.name = name
        self._results: Optional[PipelineResult] = None
        if tasks is not None:
            # set_tasks may be overridden by subclasses (e.g. Sequential chains
            # the tasks instead of attaching each one to the root).
            self.set_tasks(tasks)

    def set_tasks(self, tasks: List[Task]) -> None:
        """
        Sets the supplied tasks as *primary* tasks of the pipeline ("root
        tasks"), i.e. tasks at which the pipeline will start (there will be no
        connection between these tasks).

        Args:
            tasks: Tasks to be set as primary tasks (root tasks)

        Returns:
            None
        """
        for task in tasks:
            self.root.connect_child(task)

    def add(self, task: Task) -> None:
        """
        Add a single task as a *primary* task of the pipeline ("root task"),
        i.e. a task at which the pipeline starts.

        Args:
            task: Task to set as primary (root) task

        Returns:
            None
        """
        self.root.connect_child(task)

    def _add_node(self, g: nx.DiGraph, node: Task) -> None:  # type: ignore[override]
        """
        Recursively add a node and all of its children to a given graph.

        This function is used when creating the nx.DiGraph representation of
        the pipeline to add all the tasks to the graph. As additional
        information, a "fillcolor" attribute is set according to the task's
        status (see :py:attr:`~Pipeline.colormap`). This is used when creating
        a PNG image of the pipeline via :py:meth:`~DAG.plot_graph`.

        Args:
            g: Graph to add the node to
            node: Node to add

        Returns:
            None
        """
        g.add_node(
            node, style="filled", fillcolor=self.colormap[node.status], label=node.name
        )
        child: Task
        for child in node.children:  # type: ignore
            # Only recurse into children not yet in the graph, so a task with
            # several parents is expanded once; the edge is added regardless.
            if not g.has_node(child):
                self._add_node(g, child)
            g.add_edge(node, child)

    def to_dict(self) -> Dict:
        """
        Get a dictionary representation of the directed acyclic graph
        underlying the pipeline.

        Returns:
            Dictionary with the keys "pipeline" (holding the pipeline name) and
            "tasks" (task name -> task description, including the names of the
            task's parents under "parents").
        """
        g = self.create_graph()
        tasks: Dict = {}
        node: Task
        for node in g.nodes:
            desc = node.description()
            desc["parents"] = [p.name for p in g.predecessors(node)]
            tasks[node.name] = desc
        return {"pipeline": {"name": self.name}, "tasks": tasks}

    @staticmethod
    def __instantiate_task(
        class_name: str, module_name: Optional[str], tag: str, params: Dict
    ) -> Task:
        """
        Helper function for :py:meth:`Pipeline.from_dict` for instantiating
        :py:class:`Task`s from string class names.

        Args:
            class_name: Class name of the task object to instantiate
            module_name: Module name of the class (``None`` means ``__main__``)
            tag: Tag of the task object to instantiate
            params: Parameter of the task object to instantiate

        Returns:
            Instantiated Task

        Raises:
            NameError: If the module has no attribute named ``class_name``
            TypeError: If the resolved object is not a subclass of Task
        """

        def get_class(class_name: str, module_name: Optional[str] = None) -> Type:
            if module_name is None:
                module_name = "__main__"
            # NOTE: importing runs the named module's top-level code, and the
            # module name comes from the loaded file, so only load pipeline
            # files from trusted sources.
            module = importlib.import_module(module_name)
            if not hasattr(module, class_name):
                raise NameError(
                    f"Class '{class_name}' not found in module '{module_name}'"
                )
            return getattr(module, class_name)

        c = get_class(class_name, module_name)
        # Explicit check instead of `assert`, which is removed under `python -O`;
        # isinstance guards against non-class attributes reaching issubclass.
        if not (isinstance(c, type) and issubclass(c, Task)):
            raise TypeError(f"Class '{class_name}' must be subclass of Task")
        return c(tag, **params)

    @staticmethod
    def __instantiate_tasks(tasks_dict: Dict) -> Dict:
        """
        Helper function for :py:meth:`Pipeline.from_dict` for instantiating
        :py:class:`Task`s from string class names.

        Args:
            tasks_dict: Dictionary with Task information from saved
                parameter.yaml file. Requires the keys "name", "tag" and
                "params" per task; "module" is optional.

        Returns:
            Mapping of task name -> instantiated task
        """
        return {
            task_name: Pipeline.__instantiate_task(
                class_name=desc["name"],
                module_name=desc.get("module", None),
                tag=desc["tag"],
                params=desc["params"],
            )
            for task_name, desc in tasks_dict.items()
        }

    def __compare_hashes(self, tasks_dict: Dict) -> None:
        """
        Compare the hashes of a Tasks dictionary (from parameter.yaml file)
        and the instantiated tasks of the pipeline.

        Used to ensure that tasks created using :py:meth:`Pipeline.from_dict`
        were created correctly. Entries without a "hash" key are skipped.

        Args:
            tasks_dict: Tasks dictionary from saved parameter.yaml file

        Returns:
            None

        Raises:
            ValueError: If a stored hash differs from the task's hash
        """
        g = self.create_graph()
        for task in g.nodes:
            if "hash" not in tasks_dict[task.name]:
                continue
            if tasks_dict[task.name]["hash"] != task.hash():
                raise ValueError(
                    "Cannot create pipeline from file: Hashes do not match"
                )

    def __connect_tasks(self, tasks: Dict, tasks_dict: Dict) -> None:
        """
        Connect task objects according to a task dictionary from a saved
        parameter.yaml file.

        Takes the collection of instantiated tasks and the information of how
        tasks are connected to each other from the dictionary (usually read
        from parameter.yaml file) and connects the Tasks accordingly in the
        pipeline.

        Args:
            tasks: Collection of instantiated tasks
            tasks_dict: Task dictionary from saved parameter.yaml file

        Returns:
            None
        """
        # Build a name-level graph only to obtain a topological order, so each
        # task is visited after all of its parents.
        g = nx.DiGraph()
        for task_name, desc in tasks_dict.items():
            g.add_node(task_name)
            for parent_task_name in desc["parents"]:
                g.add_edge(parent_task_name, task_name)

        for task_name in nx.topological_sort(g):
            if not tasks_dict[task_name]["parents"]:
                # Parentless tasks are handed to the pipeline itself
                # (via DAG.__call__, defined in pyperunner.dag).
                self(tasks[task_name])
            else:
                for parent_task_name in tasks_dict[task_name]["parents"]:
                    tasks[parent_task_name](tasks[task_name])

    @staticmethod
    def from_dict(pipeline_dict: Dict, compare_hashes: bool = True) -> "Pipeline":
        """
        Create a pipeline from a dictionary (usually saved parameter.yaml file
        from previous pipeline run).

        Args:
            pipeline_dict: Dictionary from saved pipeline run (parameter.yaml)
            compare_hashes: Set True if the hashes stored in the pipeline_dict
                dictionary should be compared with the hashes of the task
                objects created by this function. Used to ensure that Tasks
                are created in accordance with the saved pipeline run.

        Returns:
            Pipeline with all tasks as defined by dictionary
        """
        tasks_dict = pipeline_dict["tasks"]
        pipeline = Pipeline(**pipeline_dict["pipeline"])
        tasks = pipeline.__instantiate_tasks(tasks_dict)
        pipeline.__connect_tasks(tasks, tasks_dict)
        if compare_hashes:
            pipeline.__compare_hashes(tasks_dict)
        return pipeline

    def to_file(self, filename: str) -> None:
        """
        Save a representation of the pipeline to file (yaml format).

        Args:
            filename: Filename to store the representation to

        Returns:
            None
        """
        # Explicit encoding so the file does not depend on the platform locale.
        with open(filename, "w", encoding="utf-8") as f:
            yaml.dump(self.to_dict(), f)

    @staticmethod
    def from_file(filename: str, compare_hashes: bool = True) -> "Pipeline":
        """
        Create a pipeline from a stored yaml file.

        Args:
            filename: Filename of the yaml file where the pipeline is stored
            compare_hashes: Set True if the hashes stored in the file should
                be compared with the hashes of the task objects created by
                this function. Used to ensure that Tasks are created in
                accordance with the saved pipeline run.

        Returns:
            Pipeline with all tasks as defined by the file
        """
        # UTF-8 matches what to_file writes and is locale-independent;
        # safe_load avoids constructing arbitrary Python objects from YAML.
        with open(filename, "r", encoding="utf-8") as f:
            pipeline_dict = yaml.safe_load(f)
        return Pipeline.from_dict(pipeline_dict, compare_hashes=compare_hashes)

    def set_results(self, results: PipelineResult) -> None:
        """
        Stores results of a pipeline run.

        Args:
            results: The pipeline results

        Returns:
            None
        """
        self._results = results

    def results(self) -> PipelineResult:
        """
        Return the results stored via :py:meth:`Pipeline.set_results`.

        Returns:
            The pipeline results

        Raises:
            AttributeError: If no results have been set yet
        """
        if self._results is None:
            raise AttributeError("Results have not been set. Run the pipeline first!")
        return self._results
class Sequential(Pipeline):
    """
    A linear pipeline: tasks are chained one after another, without any
    bifurcations.
    """

    def set_tasks(self, tasks: List[Task]) -> None:
        """
        Chain the supplied tasks, in list order, below the pipeline root.

        Note:
            Unlike :py:meth:`Pipeline.set_tasks`, which attaches every task
            directly to the root, this connects ``tasks[0]`` as the child of
            the root and each ``tasks[i]`` as the child of ``tasks[i - 1]``.
            The list order therefore determines the execution order.

        Args:
            tasks: Ordered tasks; ``tasks[0]`` is the first task to run (root
                task)

        Returns:
            None
        """
        chain: List[Node] = [self.root, *tasks]
        # Walk consecutive (predecessor, successor) pairs along the chain.
        for parent, child in zip(chain, chain[1:]):
            parent.connect_child(child)

    def add(self, task: Task) -> None:
        """
        Append a task after the current end of the pipeline.

        Note:
            Unlike :py:meth:`Pipeline.add`, the task is not attached to the
            root. It is attached to the current tail, which is found by
            following the first child of each node starting from the root.

        Args:
            task: Task to append as successor of the current end of the
                pipeline

        Returns:
            None
        """
        tail: Node = self.root
        while True:
            successors = tail.children
            if not successors:
                break
            tail = successors[0]
        tail.connect_child(task)