latch.resources package#

Submodules#

latch.resources.conditional module#

latch.resources.conditional.create_conditional_section(name: str) ConditionalSection[source]#

Creates a new conditional section in a workflow, allowing a task to be executed conditionally based on the value of an earlier task's result.

Conditional sections are akin to ternary operators: they return the output of whichever branch is taken. Unlike a ternary, they can be n-ary, with as many elif clauses as desired.

The outputs of a conditional section can be consumed by downstream tasks, and outputs from other tasks can be passed into it.

The boolean expressions in a condition use & and | as the and / or operators. Unary expressions are not allowed, so if a task returns a boolean and we wish to use it as a condition, we must use the built-in truth checks result.is_true() or result.is_false().

Parameters:

name – The name of the conditional section, to be shown in Latch Console

Returns:

A conditional section

Intended Use:

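# Assumed imports; `double` and `square` are tasks defined elsewhere.
from latch.resources.conditional import create_conditional_section
from latch.resources.workflow import workflow

@workflow
def multiplier(my_input: float) -> float:
    result_1 = double(n=my_input)
    # Branch on the sign of result_1; a value of exactly zero fails the workflow.
    result_2 = (
        create_conditional_section("fractions")
        .if_((result_1 < 0.0)).then(double(n=result_1))
        .elif_((result_1 > 0.0)).then(square(n=result_1))
        .else_().fail("Only nonzero values allowed")
    )
    result_3 = double(n=result_2)
    return result_3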
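
Python does not allow `and`, `or`, or `not` to be overloaded, while `&` and `|` can be. The following is a minimal sketch of how a deferred condition could behave under that constraint. The `Expr` class is a hypothetical stand-in written for illustration; it is not part of latch, and the real condition objects may be implemented differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Expr:
    """Hypothetical stand-in for a deferred comparison on a task output."""

    text: str

    def __and__(self, other: "Expr") -> "Expr":
        return Expr(f"({self.text} and {other.text})")

    def __or__(self, other: "Expr") -> "Expr":
        return Expr(f"({self.text} or {other.text})")

    def __bool__(self) -> bool:
        # The value is unknown while the workflow graph is being built,
        # so truth-testing (and therefore `and`, `or`, `not`) is rejected.
        raise TypeError("use & / | instead of and / or / not")


is_negative = Expr("x < 0")
is_large = Expr("x > 100")

# `|` builds a new deferred expression instead of evaluating anything.
combined = is_negative | is_large
print(combined.text)  # (x < 0 or x > 100)

try:
    is_negative and is_large  # `and` calls __bool__ on the left operand
except TypeError as err:
    rejected = str(err)
```

In this sketch, a bare boolean result has no comparison to defer, which is why explicit checks such as result.is_true() are needed in the real API.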

latch.resources.launch_plan module#

class latch.resources.launch_plan.LaunchPlan(workflow: Callable, name: str, default_params: Dict[str, Any])[source]#

Bases: object

Construct named groups of default parameters for your workflows.

Pass a workflow function and a dictionary mapping parameter names to default Python values; a set of “test data” will then be populated on the console upon registration.

Parameters:
  • workflow – The workflow function to which the values apply.

  • name – A semantic identifier for the parameter values (e.g. ‘Small Data’)

  • default_params – A mapping of parameter names to values
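
Because default_params is keyed by parameter name, a misspelled key is easy to introduce. The sketch below checks the keys against a function signature before the mapping is used. `check_default_params` is a hypothetical helper, not part of latch, and `multiplier` is a plain undecorated function standing in for a workflow.

```python
import inspect
from typing import Any, Callable, Dict


def check_default_params(workflow: Callable, default_params: Dict[str, Any]) -> None:
    """Hypothetical helper: raise if a key is not a parameter of `workflow`."""
    known = set(inspect.signature(workflow).parameters)
    unknown = sorted(set(default_params) - known)
    if unknown:
        raise ValueError(f"unknown parameters: {unknown}")


def multiplier(my_input: float) -> float:
    # Plain function standing in for a workflow.
    return my_input * 2


small_data = {"my_input": 1.5}
check_default_params(multiplier, small_data)  # passes silently
```

With latch installed, the checked mapping would then be passed following the signature above, for example `LaunchPlan(multiplier, "Small Data", small_data)`, where `multiplier` is the registered workflow function.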

latch.resources.map_tasks module#

A map task lets you run a pod task or a regular task over a list of inputs within a single workflow node. This means you can run thousands of instances of the task without creating a node for every instance, which can improve performance considerably.

Some use cases of map tasks include:
  • Several inputs must run through the same code logic

  • Multiple data batches need to be processed in parallel

  • Hyperparameter optimization

Parameters:

task_function – The task to be mapped

Returns:

A mapped version of the task that is called with a list of inputs

Intended Use:

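import typing

# Assumed imports: `task` and `Resources` are taken from flytekit here.
from flytekit import Resources, task
from latch.resources.map_tasks import map_task
from latch.resources.workflow import workflow

@task
def a_mappable_task(a: int) -> str:
    inc = a + 2
    stringified = str(inc)
    return stringified

@task
def coalesce(b: typing.List[str]) -> str:
    coalesced = "".join(b)
    return coalesced

@workflow
def my_map_workflow(a: typing.List[int]) -> str:
    # Run a_mappable_task once per element of `a` inside a single node.
    mapped_out = map_task(a_mappable_task)(a=a).with_overrides(
        requests=Resources(mem="300Mi"),
        limits=Resources(mem="500Mi"),
        retries=1,
    )
    coalesced = coalesce(b=mapped_out)
    return coalesced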
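
To make the data flow of the example above concrete, the sketch below reproduces it with plain Python functions and no decorators. It is a local stand-in only: it assumes the mapped task is applied once per list element and that results come back in input order, and it ignores resources, retries, and parallelism.

```python
from typing import List


def a_mappable_task(a: int) -> str:
    return str(a + 2)


def coalesce(b: List[str]) -> str:
    return "".join(b)


def my_map_workflow_local(a: List[int]) -> str:
    # Stand-in for map_task(a_mappable_task)(a=a): one call per list element,
    # with results collected in input order.
    mapped_out = [a_mappable_task(a=item) for item in a]
    return coalesce(b=mapped_out)


print(my_map_workflow_local([1, 2, 3]))  # 345
```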

latch.resources.reference_workflow module#

latch.resources.reference_workflow.workflow_reference(name: str, version: str)[source]#

latch.resources.tasks module#

Latch tasks are decorators that turn functions into workflow ‘nodes’.

Each task is containerized, versioned and registered with Flyte when a workflow is uploaded to Latch. Containerized tasks are then executed on arbitrary instances as Kubernetes Pods, scheduled using flytepropeller.

The type of instance that a task executes on (e.g. the amount of available resources, the presence of a GPU) can be controlled by using one of the exported decorators below.

latch.resources.tasks.large_gpu_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask#

This task will get scheduled on a large GPU-enabled node.

This node is not necessarily dedicated to the task, but the node itself will be on-demand.

Type      CPU   RAM     GPU   On-Demand
Request   31    120Gi   1     True
Limit     64    256Gi   1     True

latch.resources.tasks.small_gpu_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask#

This task will get scheduled on a small GPU-enabled node.

This node will be dedicated to the task. No other tasks will be allowed to run on it.

Type      CPU   RAM     GPU   On-Demand
Request   7     30Gi    1     True
Limit     8     32Gi    1     True

latch.resources.tasks.large_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask#

This task will get scheduled on a large node.

This node will be dedicated to the task. No other tasks will be allowed to run on it.

Type      CPU   RAM     GPU   On-Demand
Request   90    176Gi   0     True
Limit     96    196Gi   0     True

latch.resources.tasks.medium_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask#

This task will get scheduled on a medium node.

This node will be dedicated to the task. No other tasks will be allowed to run on it.

Type      CPU   RAM     GPU   On-Demand
Request   8     32Gi    0     True
Limit     12    64Gi    0     True

latch.resources.tasks.small_task(_task_function: ~typing.Callable | None = None, *, task_config: ~typing.Any | None = <flytekitplugins.pod.task.Pod object>, cache: bool = False, cache_serialize: bool = False, cache_version: str = '', retries: int = 0, interruptible: bool | None = None, deprecated: str = '', timeout: ~datetime.timedelta | int = 0, container_image: str | None = None, environment: ~typing.Dict[str, str] | None = None, requests: ~flytekit.core.resources.Resources | None = None, limits: ~flytekit.core.resources.Resources | None = None, secret_requests: ~typing.List[~flytekit.models.security.Secret] | None = None, execution_mode: ~flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior | None = ExecutionBehavior.DEFAULT, dockerfile: ~pathlib.Path | None = None, task_resolver: ~flytekit.core.base_task.TaskResolverMixin | None = None) Callable | PythonFunctionTask#

This task will get scheduled on a small node.

Type      CPU   RAM     GPU   On-Demand
Request   2     4Gi     0     False
Limit     4     8Gi     0     False
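
The request values listed for the five preset decorators can be compared programmatically when choosing one. The following is a minimal sketch under stated assumptions: `Preset`, `PRESETS`, and `pick_preset` are hypothetical names introduced here, the numbers are the Request rows from this section, and latch itself provides no such helper as far as this page shows.

```python
from typing import NamedTuple, Optional


class Preset(NamedTuple):
    name: str
    cpu: int
    ram_gib: int
    gpu: int


# Request values copied from this section, ordered smallest first
# within the CPU-only group and within the GPU group.
PRESETS = [
    Preset("small_task", 2, 4, 0),
    Preset("medium_task", 8, 32, 0),
    Preset("large_task", 90, 176, 0),
    Preset("small_gpu_task", 7, 30, 1),
    Preset("large_gpu_task", 31, 120, 1),
]


def pick_preset(cpu: int, ram_gib: int, gpu: int = 0) -> Optional[str]:
    """Hypothetical helper: first preset whose request covers the need."""
    for preset in PRESETS:
        # Only consider GPU presets when a GPU is actually needed.
        if (preset.gpu > 0) != (gpu > 0):
            continue
        if preset.cpu >= cpu and preset.ram_gib >= ram_gib and preset.gpu >= gpu:
            return preset.name
    return None


print(pick_preset(cpu=4, ram_gib=16))         # medium_task
print(pick_preset(cpu=4, ram_gib=16, gpu=1))  # small_gpu_task
```

A result of `None` means no preset request covers the need, in which case a custom task configuration is the remaining option.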

latch.resources.tasks.custom_memory_optimized_task(cpu: int, memory: int)[source]#

Returns a custom task configuration requesting the specified CPU/RAM allocations. This task can utilize fewer CPU cores (62) than `custom_task`s (95) but can use more RAM (up to 490 GiB) than `custom_task`s (up to 179 GiB). This is ideal for processes which utilize a lot of memory per thread.

Parameters:
  • cpu – An integer number of cores to request, up to 63 cores

  • memory – An integer number of Gibibytes of RAM to request, up to 511 GiB

latch.resources.tasks.custom_task(cpu: int, memory: int, *, storage_gib: int = 500)[source]#

Returns a custom task configuration requesting the specified CPU, RAM, and storage allocations.

Parameters:
  • cpu – An integer number of cores to request, up to 95 cores

  • memory – An integer number of Gibibytes of RAM to request, up to 490 GiB

  • storage_gib – An integer number of Gibibytes of storage to request, up to 4949 GiB (default 500)
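
The documented maxima for custom_task can be checked before a workflow is registered. The sketch below is a hypothetical pre-check, not part of latch: `check_custom_task_request` is an invented name, the upper bounds are the ones listed above, and the lower bound of 1 is an assumption.

```python
def check_custom_task_request(cpu: int, memory: int, storage_gib: int = 500) -> None:
    """Hypothetical pre-check against the documented custom_task maxima."""
    requested = {
        "cpu": (cpu, 95),
        "memory": (memory, 490),
        "storage_gib": (storage_gib, 4949),
    }
    for name, (value, maximum) in requested.items():
        # The lower bound of 1 is an assumption; the maxima come from the docs.
        if not 1 <= value <= maximum:
            raise ValueError(f"{name}={value} is outside 1..{maximum}")


check_custom_task_request(cpu=16, memory=64)  # within bounds, passes silently
```

The same arguments would then be passed on following the signature above, for example `custom_task(cpu=16, memory=64, storage_gib=500)`.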

latch.resources.workflow module#

latch.resources.workflow.workflow(metadata: LatchMetadata | Callable) PythonFunctionWorkflow | Callable[source]#

Module contents#