latch.resources package#
Submodules#
latch.resources.conditional module#
- latch.resources.conditional.create_conditional_section(name: str) ConditionalSection [source]#
This method creates a new conditional section in a workflow, allowing a user to conditionally execute a task based on the value of a task result.
Conditional sections are akin to ternary operators – they return the output of the branch result. However, they can be n-ary with as many _elif_ clauses as desired.
It is possible to consume the outputs from conditional nodes. And to pass in outputs from other tasks to conditional nodes.
The boolean expressions in the condition use & and | as and / or operators. Additionally, unary expressions are not allowed. Thus if a task returns a boolean and we wish to use it in a condition of a conditional block, we must use built in truth checks: result.is_true() or result.is_false()
- Parameters:
name – The name of the conditional section, to be shown in Latch Console
- Returns:
A conditional section
Intended Use:
@workflow def multiplier(my_input: float) -> float: result_1 = double(n=my_input) result_2 = ( create_conditional_section("fractions") .if_((result_1 < 0.0)).then(double(n=result_1)) .elif_((result_1 > 0.0)).then(square(n=result_1)) .else_().fail("Only nonzero values allowed") ) result_3 = double(n=result_2) return result_3
latch.resources.launch_plan module#
- class latch.resources.launch_plan.LaunchPlan(workflow: Callable, name: str, default_params: Dict[str, Any])[source]#
Bases:
object
Construct named groups of default parameters for your workflows.
Pass a workflow function and a dictionary of parameter names mapped to default python values and a set of “test data” will be populated on the console upon registration.
- Parameters:
workflow – The workflow function to which the values apply.
name – A semantic identifier to the parameter values (eg. ‘Small Data’)
default_params – A mapping of paramter names to values
latch.resources.map_tasks module#
A map task lets you run a pod task or a regular task over a list of inputs within a single workflow node. This means you can run thousands of instances of the task without creating a node for every instance, providing valuable performance gains!
- Some use cases of map tasks include:
Several inputs must run through the same code logic
Multiple data batches need to be processed in parallel
Hyperparameter optimization
- param task_function:
The task to be mapped, to be shown in Latch Console
- returns:
A conditional section
Intended Use:
@task
def a_mappable_task(a: int) -> str:
inc = a + 2
stringified = str(inc)
return stringified
@task
def coalesce(b: typing.List[str]) -> str:
coalesced = "".join(b)
return coalesced
@workflow
def my_map_workflow(a: typing.List[int]) -> str:
mapped_out = map_task(a_mappable_task)(a=a).with_overrides(
requests=Resources(mem="300Mi"),
limits=Resources(mem="500Mi"),
retries=1,
)
coalesced = coalesce(b=mapped_out)
return coalesced
latch.resources.reference_workflow module#
latch.resources.tasks module#
Latch tasks are decorators to turn functions into workflow ‘nodes’.
Each task is containerized, versioned and registered with Flyte when a workflow is uploaded to Latch. Containerized tasks are then executed on arbitrary instances as Kubernetes Pods, scheduled using flytepropeller.
The type of instance that the task executes on (eg. number of available resources, presence of GPU) can be controlled by invoking one of the set of exported decorators.
- latch.resources.tasks.large_gpu_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask #
This task will get scheduled on a large GPU-enabled node.
This node is not necessarily dedicated to the task, but the node itself will be on-demand.
# Type
CPU
RAM
GPU
On-Demand
Request
31
120Gi
1
True
Limit
64
256Gi
1
True
- latch.resources.tasks.small_gpu_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask #
This task will get scheduled on a small GPU-enabled node.
This node will be dedicated to the task. No other tasks will be allowed to run on it.
# Type
CPU
RAM
GPU
On-Demand
Request
7
30Gi
1
True
Limit
8
32Gi
1
True
- latch.resources.tasks.large_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask #
This task will get scheduled on a large node.
This node will be dedicated to the task. No other tasks will be allowed to run on it.
# Type
CPU
RAM
GPU
On-Demand
Request
90
176Gi
0
True
Limit
96
196Gi
0
True
- latch.resources.tasks.medium_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask #
This task will get scheduled on a medium node.
This node will be dedicated to the task. No other tasks will be allowed to run on it.
# Type
CPU
RAM
GPU
On-Demand
Request
8
32Gi
0
True
Limit
12
64Gi
0
True
- latch.resources.tasks.small_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask #
This task will get scheduled on a small node.
# Type
CPU
RAM
GPU
On-Demand
Request
2
4Gi
0
False
Limit
4
8Gi
0
False
- latch.resources.tasks.custom_memory_optimized_task(cpu: int, memory: int)[source]#
Returns a custom task configuration requesting the specified CPU/RAM allocations. This task can utilize fewer cpu cores (62) than `custom_task`s (95) but can use more RAM (up to 490 GiB) than `custom_task`s (up to 179 GiB). This is ideal for processes which utilize a lot of memory per thread. :param cpu: An integer number of cores to request, up to 63 cores :param memory: An integer number of Gibibytes of RAM to request, up to 511 GiB
- latch.resources.tasks.custom_task(cpu: Callable | int, memory: Callable | int, *, storage_gib: Callable | int = 500, timeout: timedelta | int = 0)[source]#
Returns a custom task configuration requesting the specified CPU/RAM allocations
- Parameters:
cpu – An integer number of cores to request, up to 95 cores
memory – An integer number of Gibibytes of RAM to request, up to 490 GiB
storage – An integer number of Gibibytes of storage to request, up to 4949 GiB
latch.resources.workflow module#
- latch.resources.workflow.workflow(metadata: LatchMetadata | Callable) PythonFunctionWorkflow | Callable [source]#