Module dewret.tasks

Abstraction layer for task operations.

Access dask or other backends consistently through this module. It provides decorators and execution calls that manage tasks. Note that task is a decorator factory: it should be called (normally with no arguments, as @task()) and returns the appropriate decorator for the current backend.

Typical usage example:

>>> @task()
... def increment(num: int) -> int:
...     return num + 1

Variables

DEFAULT_BACKEND
Param
RetType
construct

Functions

ensure_lazy

def ensure_lazy(
    task: Any
) -> dewret.workflow.Lazy | None

Check whether a suspected task is lazy-evaluable and, if so, cast it back to a Lazy.

As we mask our lazy-evaluable functions to appear as their original types to the type system (see dewret.tasks.task), we must cast them back, to allow the type-checker to comb the remainder of the code.

Parameters:

Name Type Description Default
task None the suspected task to check. None

Returns:

Type Description
None Original task, cast to a Lazy, or None.
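
The cast-back idea can be sketched as follows. This is a minimal illustration rather than dewret's implementation: the `Lazy` alias, the `mark_lazy` helper and the `__lazy__` marker attribute are all hypothetical stand-ins, since the source does not say how a lazy function is recognised.

```python
from typing import Any, Callable, Optional, cast

# Hypothetical stand-in for dewret.workflow.Lazy; here, any callable.
Lazy = Callable[..., Any]


def mark_lazy(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Tag a function as lazy. The __lazy__ marker is an assumption of this sketch."""
    fn.__lazy__ = True  # type: ignore[attr-defined]
    return fn


def ensure_lazy_sketch(task: Any) -> Optional[Lazy]:
    """Return the task cast to Lazy if it carries the marker, else None."""
    if callable(task) and getattr(task, "__lazy__", False):
        # cast() changes only the static type; the object is returned unchanged.
        return cast(Lazy, task)
    return None


@mark_lazy
def increment(num: int) -> int:
    return num + 1


lazy_increment = ensure_lazy_sketch(increment)  # same object, now typed as Lazy
not_a_task = ensure_lazy_sketch(42)  # a plain int is not a task
```

Returning None for non-tasks lets the caller branch on the result, and the type-checker then narrows the value to a Lazy inside that branch.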

evaluate

def evaluate(
    task: dewret.workflow.Lazy,
    __workflow__: dewret.workflow.Workflow,
    **kwargs: Any
) -> Any

Evaluate a single task for a known workflow.

Parameters:

Name Type Description Default
task None the task to evaluate. None
__workflow__ None workflow within which this exists. None
**kwargs None any arguments to pass to the task. None

lazy

def lazy(
    
) -> collections.abc.Callable[[collections.abc.Callable[..., typing.Any]], dewret.workflow.Lazy]

Get the lazy decorator for this backend.

Returns:

Type Description
None Real decorator for this backend.

nested_task

def nested_task(
    
) -> collections.abc.Callable[[collections.abc.Callable[..., typing.Any]], collections.abc.Callable[..., dewret.workflow.Lazy]]

Shortcut for marking a task as nested.

A nested task is one that calls other tasks and does nothing else of importance. It will not actually get called at runtime, but should map entirely into the graph. As such, arithmetic operations on results and similar manipulations will cause errors at render-time. Combining tasks is acceptable, and intended. The effect of the nested task is considered equivalent to reaching whatever step reference is returned at the end.

>>> @task()
... def increment(num: int) -> int:
...     return num + 1

>>> @nested_task()
... def double_increment(num: int) -> int:
...     return increment(increment(num=num))

Returns:

Type Description
None Task that runs at render, not execution, time.
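
To see why combining tasks works while arithmetic on results does not, consider a toy recorder. It is only a sketch under stated assumptions: `StepRef`, `Graph` and the plain functions standing in for `@task()` and `@nested_task()` are hypothetical and do not reflect dewret's internals.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class StepRef:
    """Hypothetical reference to a recorded step; it defines no arithmetic."""

    index: int


@dataclass
class Graph:
    """Hypothetical graph that stores (task name, arguments) pairs."""

    steps: list[tuple[str, dict[str, Any]]] = field(default_factory=list)

    def record(self, name: str, **kwargs: Any) -> StepRef:
        self.steps.append((name, kwargs))
        return StepRef(len(self.steps) - 1)


graph = Graph()


def increment(num: Any) -> StepRef:
    # Stand-in for a @task(): records a step instead of computing num + 1.
    return graph.record("increment", num=num)


def double_increment(num: Any) -> StepRef:
    # Stand-in for a @nested_task(): runs at render time and only combines tasks.
    return increment(num=increment(num=num))


result = double_increment(num=3)  # reference to the last recorded step
```

Here `graph.steps` ends up with two `increment` entries, the second taking the first step's reference as its argument. An expression such as `result + 1` raises `TypeError`, because a step reference is not a number at render time.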

set_backend

def set_backend(
    backend: dewret.tasks.Backend
) -> None

Choose a backend.

Will raise an error if a backend is already chosen.

Parameters:

Name Type Description Default
backend None chosen backend to use from here-on in. None

task

def task(
    nested: bool = False
) -> collections.abc.Callable[[collections.abc.Callable[~Param, ~RetType]], collections.abc.Callable[~Param, ~RetType]]

Decorator factory abstracting backend's own task decorator.

For example:

>>> @task()
... def increment(num: int) -> int:
...     return num + 1

If the backend is dask (the default), it will evaluate this as a dask.delayed. Note that, with any backend, dewret will hijack the decorator to record the attempted evaluation rather than actually evaluating the lazy function. Nonetheless, this hijacking will still be executed with the backend's lazy executor, so dask.delayed will still be called, for example, in the dask case.

Parameters:

Name Type Description Default
nested None whether this should be executed to find other tasks. None

Returns:

Type Description
None Decorator for the current backend to mark lazy-executable tasks.

Raises:

Type Description
TypeError if arguments are missing or incorrect, in line with usual Python behaviour.
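
The recording behaviour and the TypeError can be illustrated with a small decorator factory. This is a sketch under assumptions, not dewret's code: `task_sketch`, the `recorded` list and the string placeholder it returns are hypothetical, and the step in which the backend's lazy executor (such as dask.delayed) wraps the call is left out.

```python
import functools
import inspect
from typing import Any, Callable

recorded: list[tuple[str, dict[str, Any]]] = []  # hypothetical call log


def task_sketch() -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator factory: calling it returns the real decorator."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        signature = inspect.signature(fn)

        @functools.wraps(fn)
        def recorder(*args: Any, **kwargs: Any) -> str:
            # bind() raises TypeError for missing or unexpected arguments,
            # as an ordinary call to fn would.
            bound = signature.bind(*args, **kwargs)
            # Record the attempted evaluation; fn itself is never called.
            recorded.append((fn.__name__, dict(bound.arguments)))
            return f"{fn.__name__}-{len(recorded) - 1}"  # placeholder reference

        return recorder

    return decorator


@task_sketch()
def increment(num: int) -> int:
    return num + 1


reference = increment(num=1)  # logged, not evaluated
```

Calling `increment()` with no arguments raises `TypeError` before anything is logged, which matches the Raises entry above.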

unwrap

def unwrap(
    task: dewret.workflow.Lazy
) -> collections.abc.Callable[..., typing.Any]

Unwraps a lazy-evaluated function to get the function.

Ideally, we could use the __wrapped__ property, but not all workflow engines support it; most importantly, dask has only done so as of 2024.03.

Parameters:

Name Type Description Default
task None task to be unwrapped. None

Returns:

Type Description
None Original target.

Raises:

Type Description
RuntimeError if the task is not a wrapped function.
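
For comparison, the `__wrapped__` route that the text calls ideal might look like the sketch below. The names `lazy_sketch` and `unwrap_sketch` are hypothetical; the real function delegates to the backend precisely because this attribute is not always available.

```python
import functools
from typing import Any, Callable


def lazy_sketch(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical lazy wrapper: calling it defers the real call."""

    @functools.wraps(fn)  # functools.wraps sets wrapper.__wrapped__ = fn
    def wrapper(*args: Any, **kwargs: Any) -> Callable[[], Any]:
        return lambda: fn(*args, **kwargs)

    return wrapper


def unwrap_sketch(task: Any) -> Callable[..., Any]:
    """Return the original function, or raise if nothing was wrapped."""
    target = getattr(task, "__wrapped__", None)
    if not callable(target):
        raise RuntimeError("Task is not a wrapped function.")
    return target


def increment(num: int) -> int:
    return num + 1


lazy_increment = lazy_sketch(increment)
original = unwrap_sketch(lazy_increment)  # the undecorated increment
```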

Classes

Backend

class Backend(
    *args,
    **kwds
)

Stringy enum representing available backends.

Ancestors (in MRO)

  • enum.Enum

Class variables

DASK
name
value

TaskException

class TaskException(
    task: dewret.workflow.Task | collections.abc.Callable[..., typing.Any],
    dec_tb: traceback | None,
    tb: traceback | None,
    message: str,
    *args: Any,
    **kwargs: Any
)

Exception tied to a specific task.

Primarily aimed at parsing issues, but this will ensure that a message is shown with useful debug information for the workflow writer.

Ancestors (in MRO)

  • builtins.Exception
  • builtins.BaseException

Class variables

args

Methods

add_note

def add_note(
    ...
)

Exception.add_note(note) -- add a note to the exception

with_traceback

def with_traceback(
    ...
)

Exception.with_traceback(tb) -- set self.__traceback__ to tb and return self.

TaskManager

class TaskManager(
    /,
    *args,
    **kwargs
)

Overarching backend-agnostic task manager.

Gatekeeps the specific backend implementation. This can be instantiated without choosing a backend, but the first call to any of its methods will concretize that choice - either as the default, or the backend set via TaskManager.set_backend. It cannot be changed after this point.
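
One way to get this "choose late, then lock" behaviour is a cached property. The sketch below is not the actual class: `ManagerSketch` is hypothetical, the enum value `"dask"` is assumed, and a `SimpleNamespace` stands in for the imported backend module so that the example runs without dask installed.

```python
from enum import Enum
from functools import cached_property
from types import SimpleNamespace
from typing import Optional


class Backend(Enum):
    DASK = "dask"  # assumed value for this sketch


DEFAULT_BACKEND = Backend.DASK


class ManagerSketch:
    """Hypothetical manager that fixes its backend on first use."""

    def __init__(self) -> None:
        self._backend: Optional[Backend] = None

    def set_backend(self, backend: Backend) -> Backend:
        # cached_property stores its result in the instance __dict__,
        # so its presence there means the backend was already loaded.
        if "backend" in self.__dict__:
            raise RuntimeError("Backend has already been loaded.")
        self._backend = backend
        return backend

    @cached_property
    def backend(self) -> SimpleNamespace:
        chosen = self._backend or DEFAULT_BACKEND
        # Placeholder for importing the real backend module.
        return SimpleNamespace(name=chosen.value)


manager = ManagerSketch()
chosen = manager.set_backend(Backend.DASK)  # allowed: nothing loaded yet
loaded_name = manager.backend.name  # first access concretizes the choice
```

After `manager.backend` has been read once, a further `manager.set_backend(...)` raises `RuntimeError`, while a fresh `ManagerSketch()` that never calls `set_backend` falls back to `DEFAULT_BACKEND`.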

Methods

backend

def backend(
    ...
)

Import backend module.

Cached property to load the backend module, if it has not been already.

Returns:

Type Description
None Backend module for the specific choice of backend.

ensure_lazy

def ensure_lazy(
    self,
    task: Any
) -> dewret.workflow.Lazy | None

Check whether a suspected task is lazy-evaluable and, if so, cast it back to a Lazy.

As we mask our lazy-evaluable functions to appear as their original types to the type system (see dewret.tasks.task), we must cast them back, to allow the type-checker to comb the remainder of the code.

Parameters:

Name Type Description Default
task None the suspected task to check. None

Returns:

Type Description
None Original task, cast to a Lazy, or None.

evaluate

def evaluate(
    self,
    task: dewret.workflow.Lazy,
    __workflow__: dewret.workflow.Workflow,
    **kwargs: Any
) -> Any

Evaluate a single task for a known workflow.

Parameters:

Name Type Description Default
task None the task to evaluate. None
__workflow__ None workflow within which this exists. None
**kwargs None any arguments to pass to the task. None

make_lazy

def make_lazy(
    self
) -> collections.abc.Callable[[collections.abc.Callable[..., typing.Any]], dewret.workflow.Lazy]

Get the lazy decorator for this backend.

Returns:

Type Description
None Real decorator for this backend.

set_backend

def set_backend(
    self,
    backend: dewret.tasks.Backend
) -> dewret.tasks.Backend

Choose a backend.

Sets the backend, provided it has not already been loaded.

Parameters:

Name Type Description Default
backend None chosen backend, to override the default. None

Returns:

Type Description
None Backend that was set.

Raises:

Type Description
RuntimeError when a backend has already been loaded.

unwrap

def unwrap(
    self,
    task: dewret.workflow.Lazy
) -> collections.abc.Callable[..., typing.Any]

Unwraps a lazy-evaluated function to get the function.

Ideally, we could use the __wrapped__ property, but not all workflow engines support it; most importantly, dask has only done so as of 2024.03.

Parameters:

Name Type Description Default
task None task to be unwrapped. None

Returns:

Type Description
None Original target.

Raises:

Type Description
RuntimeError if the task is not a wrapped function.