Source code for pyperunner.task

import inspect
from typing import Any, Optional, Dict, List, Union, Callable, Iterator
import hashlib
import json
import logging
import os
from abc import abstractmethod, ABCMeta
from dataclasses import dataclass
from enum import Enum
import traceback

import joblib
import yaml

from pyperunner.dag import Node


class TaskError(Exception):
    pass


def get_exception_str(exc: BaseException) -> str:
    """
    Format the traceback of an exception

    Args:
        exc: Exception

    Returns:  Exception traceback

    """
    s = traceback.format_exception(etype=None, value=exc, tb=exc.__traceback__)
    return "".join(s)


class Result:
    """
    Generic task result (data that is stored on disk)
    """

    pass


[docs]class Task(Node, metaclass=ABCMeta): """ A task is a single work unit that is run as part of a :py:class:`~pyperunner.Pipeline`. All tasks that are run by a pipeline must subclass this class. There are two main ways to accomplish this: 1. Using the :py:func:`~pyperunner.task` function decorator: .. code-block:: python from pyperunner import task @task("Hello") def hello(): print("in hello()") return "Hello" Note that in this case you need to explicitly state the name of the task as a parameter to the :py:func:`~pyperunner.task` function decorator (here: "Hello"). 2. Directly subclassing this class and then using the :py:func:`~pyperunner.run` method decorator on the run() function. Note that the abstract :py:meth:`~pyperunner.Task.run` function must be implemented when subclassing. .. code-block:: python from pyperunner import run class World(Task): @run def run(self, data): return f"{data} world" Note that in contrast to the :py:func:`~pyperunner.task` function decorator, you don't need to specify the task name here. Instead, the class name (here "World") will be used as the task name. Args: tag: Tag of the task; can be used to use the same task multiple times in a single pipeline (every instance of the task needs to have a different tag then to ensure unique task names) reload: Set True if the Task should be run regardless of whether cached results already exist **kwargs: Additional task-specific parameters """
[docs] class Status(Enum): """ Encodes current Status of a :py:class:`~pyperunner.Task` """ NOT_STARTED = 0 # Task hasn't been started yet FAILURE = -1 # Task has failed SUCCESS = 1 # Successfully executed RUNNING = 2 # Currently running CANT_RUN = 3 # Task can't run because a predecessor task failed SKIPPED = 4 # Task is completely skipped from execution
[docs] @dataclass class TaskResult: """ Result of a task Args: status: Status code output: Data returned by the task exception: Exception if one was raised traceback: Traceback of an exception, if one was raised """ status: "Task.Status" output: Any exception: Optional[Exception] = None traceback: str = ""
_run_signature = None def __init__(self, tag: str = "", reload: bool = False, **kwargs: Dict) -> None: super().__init__(self.__class__.__name__ + f"({tag})") self.tag = tag self.task_name = self.__class__.__name__ self.params: Dict = kwargs self.data_path: str = "" self.output: Any = None self.status: Task.Status = Task.Status.NOT_STARTED self.result: Task.TaskResult = Task.TaskResult( status=Task.Status.NOT_STARTED, output=None ) self.reload: bool = reload self.logger = logging.getLogger(self.name) self.assert_run_decorated() try: self.assert_params_complete() except TypeError as e: raise TaskError(f"Could not create task {self.task_name}({self.tag}): {e}") def assert_run_decorated(self) -> None: if ( not hasattr(self.run, "__decorated__") or "run" not in self.run.__decorated__ # type: ignore ): raise TaskError(f"{self.task_name}.run() method not decorated with @run")
[docs] def assert_params_complete(self) -> None: """ Asserts that the parameter provided in the constructor match those required by the run function. This is used to raise TypeError already at task creation time (i.e. early and in main thread), not during task execution time. Returns: None """ if self._run_signature is None: self._run_signature = inspect.signature(self.run) params = self.params.copy() if "data" in self._run_signature.parameters: params["data"] = None self._run_signature.bind(**params)
def _single_node_hash(self) -> str: """ Generates the hash of a single node (i.e. task), constructed from the name and params of the node. Returns: Single node hash """ s = json.dumps( { "class": self.__class__.__name__, "name": self.name, "params": self.params, }, sort_keys=True, ) return hashlib.md5(s.encode("utf-8")).hexdigest() def _hash(self) -> List[str]: """ Generate the task's hash components The hash is constructed from the hash of the specific node and the hashes of *all* parent (predecessor) nodes. It is therefore dependent on the pipeline the task is part of. Returns: List of this task's hash and all predecessor tasks' hashes. """ hash = [self._single_node_hash() + "_" + self.name] for parent in self._parents_generator(): hash += parent._hash() return sorted(hash)
[docs] def hash(self) -> str: """ Generate the task's hash The hash is constructed from the hash of the specific node and the hashes of *all* parent (predecessor) nodes. It is therefore dependent on the pipeline the task is part of. This is used to precisely identify a task in a given context (e.g. when saving and loading pipelines using the :py:meth:`pyperunner.Pipeline.to_file` and :py:meth:`pyperunner.Pipeline.from_file` methods to store/load a pipeline from file). It ensures reproducibility of single pipeline runs. Returns: Task hash """ return hashlib.md5("/".join(self._hash()).encode("utf-8")).hexdigest()
[docs] def description(self) -> Dict[str, Union[str, Dict, List]]: """ Return a complete description of the task. The description contains the following properties of the task: - name - module - tag - hash (see :py:meth:`~pyperunner.Task.hash`) - parameter dictionary Returns: Dictionary with information describing the task's configuration """ return { "name": self.task_name, "module": self.__module__, "tag": self.tag, "hash": self.hash(), "params": self.params, }
[docs] def should_run(self) -> bool: """ Determine if the task should run based on the availability of cached data. If the task was run previously with the same configuration (see :py:meth:`~pyperunner.Task.description`), the output was stored and can be reloaded. In that case, the task does not need to run and this function returns False. The task can be forced to run by setting the `reload` parameter to True when instantiating the task (see :py:meth:`~pyperunner.Task()`). Returns: If the task should be run by the runner """ return not self.output_exists() or self.reload
[docs] @abstractmethod def run(self, **kwargs: Dict) -> Any: """ Worker method that must be implemented when subclassing. This method contains the Tasks execution logic. Note: The Args: **kwargs: Task-specific parameters Returns: The result of the task that is passed onto the successor task(s) (e.g. a pandas DataFrame or anything else) """ pass
def run_wrapper( self, func: Callable, data: Any, static: bool = False, receives_input: bool = True, ) -> TaskResult: self.logger.info("Starting") if self.output_exists() and not self.reload: self.logger.info("Loading output from disk, skipping processing") output = self.load_output() else: try: self.store_params() args = [] kwargs = self.params.copy() if not static: args.append(self) if receives_input: kwargs["data"] = data # TODO check what happens when the first task is instantiated with "receives_input=True" output = func(*args, **kwargs) if inspect.isgenerator(output): while True: try: result = next(output) self.store_result(result) except StopIteration as e: output = e.value self.store_output(output) except Exception as e: self.logger.error(str(e)) self.result = Task.TaskResult( status=Task.Status.FAILURE, output=None, exception=e, traceback=get_exception_str(e), ) raise e self.result = Task.TaskResult(status=Task.Status.SUCCESS, output=output) self.logger.info(f"Finished: {self.result.status}") return self.result def _parents_generator(self) -> Iterator["Task"]: """ Yields every parent task of this task. Returns: Parent task """ for p in self.parents: if isinstance(p, Task): yield p
[docs] def set_data_path(self, path: str) -> None: """ Set the task's data path. This is done by the :py:class:`~pyperunner.Runner` when starting the task. Args: path: Data path Returns: None """ self.data_path = path
[docs] def set_output(self, output: Any) -> None: """ Assigns the output generated by the task. Args: output: Output generated by the task (may be load from disk if it's cached). Returns: None """ self.output = output
[docs] def set_status(self, status: "Task.Status") -> None: """ Set the task's status (see :py:class:`~pyperunner.Task.Status`) Args: status: Task status Returns: None """ self.status = status
[docs] def set_reload(self, reload: bool) -> None: """ Sets the reload parameter that specifies whether this task is forced to run even if its output already exists. Args: reload: If the task should forcibly run even if its output already exists Returns: None """ self.reload = reload
[docs] def output_filename(self, filename: str = "result.dump.gz") -> str: """ Get the full path of the file to which the task's output is written. The output path is build from the data path (set by the pipeline runner), the task's name and it's hash and is therefore specific to a certain configuration of the task (via it's :py:meth:`~pyperunner.Task.hash`). Args: filename: Filename of the file to which the task's output is written Returns: Full path of the file to which the task's output is written """ path = os.path.realpath(os.path.join(self.data_path, self.name, self.hash())) if not os.path.exists(path): os.makedirs(path) return os.path.join(path, filename)
[docs] def output_exists(self) -> bool: """ Returns whether an output of this task already exists (i.e. was cached from a previous run). Returns: If the output of this task already exists """ return os.path.exists(self.output_filename())
[docs] def store_output(self, output: Any) -> None: """ Store the output of this task. Args: output: Output of this task Returns: None """ filename = self.output_filename() joblib.dump(output, filename)
[docs] def store_result(self, result: Result) -> None: """ Store an intermediate result Args: result: result to store Returns: None """ raise NotImplementedError()
def _build_caller_dict(self) -> Dict: params = { "task": self.description(), "parents": { parent.name: parent.description() for parent in self._parents_generator() }, } return params
[docs] def store_params(self) -> None: """ Store the parameters of this task to a params.yaml file Returns: None """ filename = self.output_filename("params.yaml") params = self._build_caller_dict() with open(filename, "w") as f: yaml.dump(params, f, default_flow_style=False)
[docs] def load_output(self) -> Any: """ Get the output generated by this task Returns: The output generated by this task """ filename = self.output_filename() return joblib.load(filename)
def __str__(self) -> str: return f"{self.task_name}({self.tag})#{self.hash()}#{hash(self)}"