Source code for latch.resources.workflow
import inspect
from dataclasses import is_dataclass
from textwrap import dedent
from typing import Callable, Union, get_args, get_origin
from flytekit import workflow as _workflow
from flytekit.core.workflow import PythonFunctionWorkflow
from latch.types.metadata import LatchMetadata
# this weird Union thing is to ensure backwards compatibility,
# so that when users call @workflow without any arguments or
# parentheses, the workflow still serializes as expected
[docs]def workflow(
metadata: Union[LatchMetadata, Callable]
) -> Union[PythonFunctionWorkflow, Callable]:
if isinstance(metadata, Callable):
return _workflow(metadata)
else:
def decorator(f: Callable):
if f.__doc__ is None:
f.__doc__ = f"{f.__name__}\n\nSample Description"
short_desc, long_desc = f.__doc__.split("\n", 1)
signature = inspect.signature(f)
wf_params = signature.parameters
in_meta_not_in_wf = []
not_in_meta_in_wf = []
for meta_param in metadata.parameters:
if meta_param not in wf_params:
in_meta_not_in_wf.append(meta_param)
for wf_param in wf_params:
if wf_param not in metadata.parameters:
not_in_meta_in_wf.append(wf_param)
if len(in_meta_not_in_wf) > 0 or len(not_in_meta_in_wf) > 0:
error_str = (
"Inconsistency detected between parameters in your `LatchMetadata`"
" object and parameters in your workflow signature.\n\n"
)
if len(in_meta_not_in_wf) > 0:
error_str += (
"The following parameters appear in your `LatchMetadata` object"
" but not in your workflow signature:\n\n"
)
for meta_param in in_meta_not_in_wf:
error_str += f" \x1b[1m{meta_param}\x1b[22m\n"
error_str += "\n"
if len(not_in_meta_in_wf) > 0:
error_str += (
"The following parameters appear in your workflow signature but"
" not in your `LatchMetadata` object:\n\n"
)
for meta_param in not_in_meta_in_wf:
error_str += f" \x1b[1m{meta_param}\x1b[22m\n"
error_str += "\n"
error_str += (
"Please resolve these inconsistencies and ensure that your"
" `LatchMetadata` object and workflow signature have the same"
" parameters."
)
raise ValueError(error_str)
for name, meta_param in metadata.parameters.items():
if meta_param.samplesheet is not True:
continue
annotation = wf_params[name].annotation
origin = get_origin(annotation)
args = get_args(annotation)
valid = (
origin is not None
and issubclass(origin, list)
and is_dataclass(args[0])
)
if not valid:
raise ValueError(
f"parameter marked as samplesheet is not valid: {name} "
f"in workflow {f.__name__} must be a list of dataclasses"
)
f.__doc__ = f"{short_desc}\n{dedent(long_desc)}\n\n" + str(metadata)
return _workflow(f)
return decorator