Module dewret.backends.backend_dask
Dask backend.
Lazy-evaluation via dask.delayed.
Variables
config
Functions
is_lazy
def is_lazy(
task: Any
) -> bool
Checks if a task is really a lazy-evaluated function for this backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Any | Suspected lazy-evaluated function. | required |
Returns:
Type | Description |
---|---|
bool | True if so, False otherwise. |
lazy
def lazy(
obj='__no__default__',
name=None,
pure=None,
nout=None,
traverse=True
)
Wraps a function or object to produce a Delayed.
Delayed objects act as proxies for the object they wrap, but all operations on them are done lazily by building up a dask graph internally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | object | The function or object to wrap. | '__no__default__' |
name | Dask key | The key to use in the underlying graph for the wrapped object. Defaults to hashing content. Note that this only affects the name of the object wrapped by this call to delayed, and not the output of delayed function calls; for that, use dask_key_name=. Note: because this name is used as the key in task graphs, you should ensure that it uniquely identifies obj. If you'd like to provide a descriptive name that is still unique, combine the descriptive name with dask.base.tokenize of the array_like. See the dask documentation on graphs for more. | None |
pure | bool | Indicates whether calling the resulting Delayed object is a pure operation. If True, arguments to the call are hashed to produce deterministic keys. If not provided, the default is to check the global delayed_pure setting, and fall back to False if unset. | None |
nout | int | The number of outputs returned from calling the resulting Delayed object. If provided, the Delayed output of the call can be iterated into nout objects, allowing for unpacking of results. By default, iteration over Delayed objects will error. Note that nout=1 expects obj to return a tuple of length 1, and consequently for nout=0, obj should return an empty tuple. | None |
traverse | bool | By default dask traverses builtin Python collections looking for dask objects passed to delayed. For large collections this can be expensive. If obj doesn't contain any dask objects, set traverse=False to avoid doing this traversal. | True |
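The deferred-evaluation idea described above can be illustrated with a toy stand-in. This is a minimal sketch using only the standard library, not dask's implementation: `ToyDelayed`, `toy_delayed`, `inc` and `double` are hypothetical names introduced for illustration, and features such as `name`, `pure`, `nout` and `traverse` are deliberately left out.

```python
from typing import Any, Callable


class ToyDelayed:
    """Hypothetical stand-in for a delayed value: stores work, runs it later."""

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        self.fn = fn
        self.args = args

    def compute(self) -> Any:
        # Resolve any delayed arguments first, then call the stored function.
        resolved = [
            arg.compute() if isinstance(arg, ToyDelayed) else arg
            for arg in self.args
        ]
        return self.fn(*resolved)


def toy_delayed(fn: Callable[..., Any]) -> Callable[..., ToyDelayed]:
    """Wrap fn so that calling it records the call instead of running it."""

    def build(*args: Any) -> ToyDelayed:
        return ToyDelayed(fn, *args)

    return build


calls: list[str] = []  # records which functions have actually executed


def inc(x: int) -> int:
    calls.append("inc")
    return x + 1


def double(x: int) -> int:
    calls.append("double")
    return x * 2


# Building the graph runs nothing; only compute() triggers execution.
graph = toy_delayed(double)(toy_delayed(inc)(3))
calls_before = list(calls)
result = graph.compute()
```

Nothing is executed while `graph` is being built, so `calls_before` stays empty; the work happens only when `compute()` is called.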
run
def run(
workflow: dewret.workflow.Workflow | None,
task: dewret.workflow.Lazy | list[dewret.workflow.Lazy] | tuple[dewret.workflow.Lazy, ...],
thread_pool: concurrent.futures.thread.ThreadPoolExecutor | None = None,
**kwargs: Any
) -> Any
Execute a task as the output of a workflow.
Runs a task with dask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow | dewret.workflow.Workflow or None | Workflow in which to record the execution. | required |
task | dewret.workflow.Lazy, or a list or tuple of them | dask.delayed function, wrapped by dewret, that we wish to compute. | required |
thread_pool | concurrent.futures.thread.ThreadPoolExecutor or None | Custom thread pool for executing workflows; it copies the correct values for contextvars into each thread before they are accessed by a dask worker. | None |
**kwargs | Any | Any configuration arguments for this backend. | None |
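The thread_pool description mentions copying contextvars values into each worker thread. One way this could be done with the standard library is sketched below; it is an assumption about the general technique, not dewret's actual code, and `setting`, `read_setting` and `make_pool` are hypothetical names.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical context variable standing in for backend configuration.
setting: contextvars.ContextVar[str] = contextvars.ContextVar(
    "setting", default="unset"
)


def read_setting() -> str:
    """Return the value of `setting` as seen by the calling thread."""
    return setting.get()


def make_pool(max_workers: int = 2) -> ThreadPoolExecutor:
    """Build a pool whose workers start with the caller's context values."""
    # Capture the caller's current context variable values.
    snapshot = contextvars.copy_context()

    def init_worker() -> None:
        # Runs once in each new worker thread: re-apply the captured values.
        for var, value in snapshot.items():
            var.set(value)

    return ThreadPoolExecutor(max_workers=max_workers, initializer=init_worker)


setting.set("configured")

# Worker threads in this pool have the captured values applied on start-up.
with make_pool() as pool:
    with_copy = pool.submit(read_setting).result()
```

Without such an initializer, a worker thread may start from an empty context and see only the variable's default, depending on the Python version and build.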
unwrap
def unwrap(
task: dewret.workflow.Lazy
) -> collections.abc.Callable[..., typing.Any]
Unwraps a lazy-evaluated function to get the function.
In recent dask (>=2024.3) this works with inspect.unwrap, but earlier
versions do not have the __wrapped__ property.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | dewret.workflow.Lazy | Task to be unwrapped. | required |
Returns:
Type | Description |
---|---|
collections.abc.Callable[..., typing.Any] | Original target. |
Raises:
Type | Description |
---|---|
RuntimeError | if the task is not a wrapped function. |
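The role of the `__wrapped__` attribute can be shown with the standard library alone. The sketch below is not dewret's or dask's code: `toy_lazy` and `toy_unwrap` are hypothetical helpers, and the only assumption is the documented behaviour of `functools.wraps` (which sets `__wrapped__`) and `inspect.unwrap` (which follows it).

```python
import functools
import inspect
from typing import Any, Callable


def toy_lazy(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical wrapper: defers the call by returning a thunk."""

    @functools.wraps(fn)  # sets wrapper.__wrapped__ = fn
    def wrapper(*args: Any, **kwargs: Any) -> Callable[[], Any]:
        return lambda: fn(*args, **kwargs)

    return wrapper


def toy_unwrap(task: Any) -> Callable[..., Any]:
    """Return the original function, or raise RuntimeError if there is none."""
    if not callable(task) or not hasattr(task, "__wrapped__"):
        raise RuntimeError("Task is not a wrapped function")
    # Follows the chain of __wrapped__ attributes back to the original.
    return inspect.unwrap(task)


def add(x: int, y: int) -> int:
    return x + y


lazy_add = toy_lazy(add)
original = toy_unwrap(lazy_add)
```

Here `original` is the very same object as `add`, while passing the plain `add` function to `toy_unwrap` raises RuntimeError because it carries no `__wrapped__` attribute.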
Classes
Delayed
class Delayed(
*args,
**kwargs
)
Description of a dask delayed.
Since dask.delayed does not have a hintable type, this stands in its place, making sure that all the features of a dask.delayed are available.
More info: https://github.com/dask/dask/issues/7779
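The general pattern of using a `typing.Protocol` as a hintable stand-in for a type that cannot be annotated directly can be sketched as follows. The names `ComputableLike`, `ToyTask` and `evaluate` are hypothetical, and the `compute()` signature is simplified relative to the one documented for this class.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ComputableLike(Protocol):
    """Hypothetical protocol: anything exposing a compute() method."""

    def compute(self) -> Any: ...


class ToyTask:
    """Unrelated class that satisfies the protocol structurally."""

    def __init__(self, value: int) -> None:
        self.value = value

    def compute(self) -> int:
        return self.value * 2


def evaluate(task: ComputableLike) -> Any:
    # Static type checkers accept ToyTask here without any inheritance.
    return task.compute()


result = evaluate(ToyTask(21))
```

Because the protocol is structural, `ToyTask` never needs to inherit from it; the `runtime_checkable` decorator additionally allows `isinstance` checks, which only verify that the method exists.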
Ancestors (in MRO)
- typing.Protocol
- typing.Generic
Methods
compute
def compute(
self,
__workflow__: dewret.workflow.Workflow | None
) -> dewret.workflow.StepReference[typing.Any]
Evaluate this dask.delayed.
Evaluate a delayed (dask lazy-evaluated) function. dewret will have replaced it with a wrapper that expects a Workflow, and all arguments will already be known to the wrapped delayed, so the signature here is simple.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
__workflow__ | dewret.workflow.Workflow or None | Workflow that this is tied to, if applicable. | required |
Returns:
Type | Description |
---|---|
dewret.workflow.StepReference[typing.Any] | Reference to the final output step. |