Module dewret.tasks
Abstraction layer for task operations.
Access dask, or other, backends consistently using this module. It provides
decorators and execution calls that manage tasks. Note that the task
decorator should be called with no arguments, and will return the appropriate
decorator for the current backend.
Typical usage example:
>>> @task()
... def increment(num: int) -> int:
... return num + 1
Variables
DEFAULT_BACKEND
Param
RetType
construct
Functions
ensure_lazy
def ensure_lazy(
task: Any
) -> dewret.workflow.Lazy | None
Evaluate a single task for a known workflow.
As we mask our lazy-evaluable functions to appear as their original
types to the type system (see dewret.tasks.task
), we must cast them
back, to allow the type-checker to comb the remainder of the code.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | None | the suspected task to check. | None |
Returns:
Type | Description |
---|---|
None | Original task, cast to a Lazy, or None. |
evaluate
def evaluate(
task: dewret.workflow.Lazy,
__workflow__: dewret.workflow.Workflow,
**kwargs: Any
) -> Any
Evaluate a single task for a known workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | None | the task to evaluate. | None |
workflow | None | workflow within which this exists. | None |
**kwargs | None | any arguments to pass to the task. | None |
lazy
def lazy(
) -> collections.abc.Callable[[collections.abc.Callable[..., typing.Any]], dewret.workflow.Lazy]
Get the lazy decorator for this backend.
Returns:
Type | Description |
---|---|
None | Real decorator for this backend. |
nested_task
def nested_task(
) -> collections.abc.Callable[[collections.abc.Callable[..., typing.Any]], collections.abc.Callable[..., dewret.workflow.Lazy]]
Shortcut for marking a task as nested.
A nested task is one which calls other tasks and does not do anything else important. It will not actually get called at runtime, but should map entirely into the graph. As such, arithmetic operations on results, etc. will cause errors at render-time. Combining tasks is acceptable, and intended. The effect of the nested task will be considered equivalent to whatever reaching whatever step reference is returned at the end.
>>> @task()
... def increment(num: int) -> int:
... return num + 1
>>> @nested_task()
... def double_increment(num: int) -> int:
... return increment(increment(num=num))
Returns:
Type | Description |
---|---|
None | Task that runs at render, not execution, time. |
set_backend
def set_backend(
backend: dewret.tasks.Backend
) -> None
Choose a backend.
Will raise an error if a backend is already chosen.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
backend | None | chosen backend to use from here-on in. | None |
task
def task(
nested: bool = False
) -> collections.abc.Callable[[collections.abc.Callable[~Param, ~RetType]], collections.abc.Callable[~Param, ~RetType]]
Decorator factory abstracting backend's own task decorator.
For example:
>>> @task()
... def increment(num: int) -> int:
... return num + 1
If the backend is dask
(the default), it is will evaluate this
as a dask.delayed
. Note that, with any backend, dewret will
hijack the decorator to record the attempted evalution rather than
actually evaluating the lazy function. Nonetheless, this hijacking
will still be executed with the backend's lazy executor, so
dask.delayed
will still be called, for example, in the dask case.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nested | None | whether this should be executed to find other tasks. | None |
Returns:
Type | Description |
---|---|
None | Decorator for the current backend to mark lazy-executable tasks. |
Raises:
Type | Description |
---|---|
TypeError | if arguments are missing or incorrect, in line with usual Python behaviour. |
unwrap
def unwrap(
task: dewret.workflow.Lazy
) -> collections.abc.Callable[..., typing.Any]
Unwraps a lazy-evaluated function to get the function.
Ideally, we could use the __wrapped__
property but not all
workflow engines support this, and most importantly, dask has
only done so as of 2024.03.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | None | task to be unwrapped. | None |
Returns:
Type | Description |
---|---|
None | Original target. |
Raises:
Type | Description |
---|---|
RuntimeError | if the task is not a wrapped function. |
Classes
Backend
class Backend(
*args,
**kwds
)
Stringy enum representing available backends.
Ancestors (in MRO)
- enum.Enum
Class variables
DASK
name
value
TaskException
class TaskException(
task: dewret.workflow.Task | collections.abc.Callable[..., typing.Any],
dec_tb: traceback | None,
tb: traceback | None,
message: str,
*args: Any,
**kwargs: Any
)
Exception tied to a specific task.
Primarily aimed at parsing issues, but this will ensure that a message is shown with useful debug information for the workflow writer.
Ancestors (in MRO)
- builtins.Exception
- builtins.BaseException
Class variables
args
Methods
add_note
def add_note(
...
)
Exception.add_note(note) --
add a note to the exception
with_traceback
def with_traceback(
...
)
Exception.with_traceback(tb) --
set self.traceback to tb and return self.
TaskManager
class TaskManager(
/,
*args,
**kwargs
)
Overarching backend-agnostic task manager.
Gatekeeps the specific backend implementation. This can be
instantiated without choosing a backend, but the first call to
any of its methods will concretize that choice - either as
the default, or the backend set via TaskManager.set_backend
.
It cannot be changed after this point.
Methods
backend
def backend(
...
)
Import backend module.
Cached property to load the backend module, if it has not been already.
Returns:
Type | Description |
---|---|
None | Backend module for the specific choice of backend. |
ensure_lazy
def ensure_lazy(
self,
task: Any
) -> dewret.workflow.Lazy | None
Evaluate a single task for a known workflow.
As we mask our lazy-evaluable functions to appear as their original
types to the type system (see dewret.tasks.task
), we must cast them
back, to allow the type-checker to comb the remainder of the code.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | None | the suspected task to check. | None |
Returns:
Type | Description |
---|---|
None | Original task, cast to a Lazy, or None. |
evaluate
def evaluate(
self,
task: dewret.workflow.Lazy,
__workflow__: dewret.workflow.Workflow,
**kwargs: Any
) -> Any
Evaluate a single task for a known workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | None | the task to evaluate. | None |
workflow | None | workflow within which this exists. | None |
**kwargs | None | any arguments to pass to the task. | None |
make_lazy
def make_lazy(
self
) -> collections.abc.Callable[[collections.abc.Callable[..., typing.Any]], dewret.workflow.Lazy]
Get the lazy decorator for this backend.
Returns:
Type | Description |
---|---|
None | Real decorator for this backend. |
set_backend
def set_backend(
self,
backend: dewret.tasks.Backend
) -> dewret.tasks.Backend
Choose a backend.
Sets the backend, provided it has not already been loaded.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
backend | None | chosen backend, to override the default. | None |
Returns:
Type | Description |
---|---|
None | Backend that was set. |
Raises:
Type | Description |
---|---|
RuntimeError | when a backend has already been loaded. |
unwrap
def unwrap(
self,
task: dewret.workflow.Lazy
) -> collections.abc.Callable[..., typing.Any]
Unwraps a lazy-evaluated function to get the function.
Ideally, we could use the __wrapped__
property but not all
workflow engines support this, and most importantly, dask has
only done so as of 2024.03.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | None | task to be unwrapped. | None |
Returns:
Type | Description |
---|---|
None | Original target. |
Raises:
Type | Description |
---|---|
RuntimeError | if the task is not a wrapped function. |