Module dewret.backends.backend_dask

Dask backend.

Lazy-evaluation via dask.delayed.
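
To illustrate the model this backend builds on, here is a minimal sketch using plain dask.delayed (dask only; no dewret-specific behaviour is assumed):

from dask import delayed

@delayed
def increment(x: int) -> int:
    return x + 1

result = increment(3)    # a Delayed proxy; nothing has executed yet
print(result.compute())  # evaluating the graph prints 4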

Variables

config

Functions

is_lazy

def is_lazy(
    task: Any
) -> bool

Checks if a task is really a lazy-evaluated function for this backend.

Parameters:

    task (Any): suspected lazy-evaluated function.

Returns:

    bool: True if so, False otherwise.
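
For illustration, a hedged sketch of how this check could be used; the import path follows this module, and the truth values are presumed from the description above:

from dewret.backends.backend_dask import is_lazy, lazy

delayed_call = lazy(lambda x: x * 2)(5)  # a dask Delayed proxy
print(is_lazy(delayed_call))             # presumably True
print(is_lazy(10))                       # presumably False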

lazy

def lazy(
    obj='__no__default__',
    name=None,
    pure=None,
    nout=None,
    traverse=True
)

Wraps a function or object to produce a Delayed.

Delayed objects act as proxies for the object they wrap, but all operations on them are done lazily by building up a dask graph internally.

Parameters:

    obj (object): The function or object to wrap.
    name (Dask key, optional): The key to use in the underlying graph for the
        wrapped object. Defaults to hashing content. Note that this only
        affects the name of the object wrapped by this call to delayed, and
        not the output of delayed function calls - for that use dask_key_name=
        as described below.

        Note: because this name is used as the key in task graphs, you should
        ensure that it uniquely identifies obj. If you'd like to provide a
        descriptive name that is still unique, combine the descriptive name
        with dask.base.tokenize of the array_like. See the dask graph
        documentation for more.
    pure (bool, optional): Indicates whether calling the resulting Delayed
        object is a pure operation. If True, arguments to the call are hashed
        to produce deterministic keys. If not provided, the default is to
        check the global delayed_pure setting, and fall back to False if
        unset.
    nout (int, optional): The number of outputs returned from calling the
        resulting Delayed object. If provided, the Delayed output of the call
        can be iterated into nout objects, allowing for unpacking of results.
        By default, iteration over Delayed objects will error. Note that
        nout=1 expects obj to return a tuple of length 1, and consequently
        for nout=0, obj should return an empty tuple.
    traverse (bool, optional): By default dask traverses builtin python
        collections looking for dask objects passed to delayed. For large
        collections this can be expensive. If obj doesn't contain any dask
        objects, set traverse=False to avoid doing this traversal.
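
The parameters above mirror dask.delayed. A short sketch of pure and nout, assuming lazy behaves identically to dask.delayed (as the shared signature suggests):

from dewret.backends.backend_dask import lazy

@lazy(pure=True)
def add(x: int, y: int) -> int:
    # pure=True: keys are derived from the arguments, so identical calls
    # map to the same task in the graph.
    return x + y

@lazy(nout=2)
def div_mod(a: int, b: int) -> tuple[int, int]:
    # nout=2 allows the Delayed result to be unpacked into two objects.
    return a // b, a % b

total = add(1, 2) + add(1, 2)  # operations on Delayed objects stay lazy
q, r = div_mod(7, 3)
print(total.compute(), q.compute(), r.compute())  # 6 2 1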

run

def run(
    workflow: dewret.workflow.Workflow | None,
    task: dewret.workflow.Lazy | list[dewret.workflow.Lazy] | tuple[dewret.workflow.Lazy, ...],
    thread_pool: concurrent.futures.thread.ThreadPoolExecutor | None = None,
    **kwargs: Any
) -> Any

Execute a task as the output of a workflow.

Runs a task with dask.

Parameters:

    workflow (Workflow | None): Workflow in which to record the execution.
    task (Lazy | list[Lazy] | tuple[Lazy, ...]): dask.delayed function,
        wrapped by dewret, that we wish to compute.
    thread_pool (ThreadPoolExecutor, optional): custom thread pool for
        executing workflows; copies the correct contextvars values into each
        thread before they are accessed by a dask worker.
    **kwargs (Any): any configuration arguments for this backend.
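
In normal use dewret invokes run itself while rendering a workflow; the following is only a rough sketch of a direct call, assuming that a bare lazy task is acceptable when no Workflow is being recorded (names and usage here are illustrative, not prescribed by the module):

from concurrent.futures import ThreadPoolExecutor
from dewret.backends.backend_dask import lazy, run

@lazy
def double(x: int) -> int:
    return x * 2

pool = ThreadPoolExecutor()  # optional custom pool, per thread_pool above
# Hypothetical direct invocation with no workflow recorded.
result = run(None, double(21), thread_pool=pool)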

unwrap

def unwrap(
    task: dewret.workflow.Lazy
) -> collections.abc.Callable[..., typing.Any]

Unwraps a lazy-evaluated function to get the function.

In recent dask (>=2024.3) this works via the standard __wrapped__ attribute; earlier versions do not set __wrapped__.

Parameters:

    task (Lazy): task to be unwrapped.

Returns:

    Callable[..., Any]: the original target.

Raises:

    RuntimeError: if the task is not a wrapped function.
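
A hedged sketch of unwrapping, assuming a dask version recent enough to expose __wrapped__ (per the note above):

from dewret.backends.backend_dask import lazy, unwrap

def triple(x: int) -> int:
    return x * 3

task = lazy(triple)
try:
    original = unwrap(task)  # the plain triple function on dask >= 2024.3
except RuntimeError:
    original = triple        # older dask: __wrapped__ is unavailable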

Classes

Delayed

class Delayed(
    *args,
    **kwargs
)

Description of a dask delayed.

Since dask.delayed does not have a hintable type, this stands in its place, making sure that all the features of a dask.delayed are available.

More info: https://github.com/dask/dask/issues/7779

Ancestors (in MRO)

  • typing.Protocol
  • typing.Generic
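
The same technique in miniature: a typing.Protocol stands in for an untyped third-party object so that it can appear in type hints (the names below are illustrative, not part of this module):

from typing import Any, Protocol

class DelayedLike(Protocol):  # hypothetical stand-in mirroring the idea above
    def compute(self, **kwargs: Any) -> Any:
        ...

def first(tasks: list[DelayedLike]) -> Any:
    # Static checkers accept any object that structurally provides compute().
    return tasks[0]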

Methods

compute

def compute(
    self,
    __workflow__: dewret.workflow.Workflow | None
) -> dewret.workflow.StepReference[typing.Any]

Evaluate this dask.delayed.

Evaluate a delayed (dask lazy-evaluated) function. dewret will have replaced it with a wrapper that expects a Workflow; all arguments are already known to the wrapped delayed, so the signature here is simple.

Parameters:

    __workflow__ (Workflow | None): Workflow that this is tied to, if applicable.

Returns:

    StepReference: reference to the final output step.
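
Illustrative only: a minimal stub satisfying this compute signature. It is not dewret's actual wrapper, which records steps on the Workflow and returns a StepReference:

from typing import Any

class StubDelayed:
    # Hypothetical stand-in for a dewret-wrapped delayed.
    def __init__(self, value: Any) -> None:
        self.value = value

    def compute(self, __workflow__: Any = None) -> Any:
        # A real dewret wrapper would record a step on __workflow__ and
        # return a StepReference; this stub just returns the stored value.
        return self.value

print(StubDelayed(42).compute(__workflow__=None))  # 42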