# Source code for latch.executions
import os
import re
from dataclasses import dataclass
from typing import Optional
import click
import gql
from latch_sdk_gql.execute import execute
# Compiled pattern for the pod names parsed by `get_task_identifier`.
#
# The whole name must match (the pattern is anchored with ^ ... $) and has
# the layout:
#
#   <token>-<node_name>-<retry>[-<arr_index>][-<arr_retry>][-preexec]
#
# - token:     one or more ASCII letters or digits.
# - node_name: zero or more "n<digits>-<digits>-" segments followed by a
#              final "n<digits>", e.g. "n0" or "n1-0-n2".
# - retry:     a run of decimal digits.
# - arr_index / arr_retry: optional "-<digits>" suffixes. Both groups use the
#   same sub-pattern, so when only one numeric suffix is present it is
#   captured as arr_index and arr_retry is left unmatched (None).
# - "-preexec": optional literal suffix, matched but not captured by name.
#
# re.VERBOSE makes whitespace and newlines inside the pattern insignificant,
# which is why the pattern can be spread over one element per line.
pod_name_regex = re.compile(
r"""
^(
(?P<token>[a-zA-Z0-9]+)
-
(?P<node_name>
(
n
[0-9]+
-
[0-9]+
-
)*
n
[0-9]+
)
-
(?P<retry>[0-9]+)
(
-
(?P<arr_index>[0-9]+)
)?
(
-
(?P<arr_retry>[0-9]+)
)?
(-preexec)?
)$
""",
re.VERBOSE,
)
@dataclass(frozen=True)
class TaskIdentifier:
    """Immutable record of the components parsed out of a pod name.

    The fields correspond one-to-one to the named groups of
    ``pod_name_regex``; ``get_task_identifier`` fills them in, converting the
    numeric groups to ``int``.

    Attributes:
        token: The leading alphanumeric component of the pod name.
        node_name: The ``n<digits>``-style node path component.
        retry: The numeric retry component.
        arr_index: The first optional numeric suffix, or ``None`` when the
            pod name does not carry one.
        arr_retry: The second optional numeric suffix, or ``None`` when the
            pod name does not carry one.
    """

    token: str
    node_name: str
    retry: int
    arr_index: Optional[int]
    arr_retry: Optional[int]
def get_task_identifier() -> Optional[TaskIdentifier]:
    """Parse the current pod name into a ``TaskIdentifier``.

    The pod name is read from ``/etc/hostname`` (surrounding whitespace is
    stripped) and matched against ``pod_name_regex``.

    Returns:
        A ``TaskIdentifier`` built from the pattern's named groups, with
        ``retry``, ``arr_index`` and ``arr_retry`` converted to ``int``
        (the two array fields stay ``None`` when their group did not match).
        ``None`` if ``/etc/hostname`` does not exist or its contents do not
        match the pattern.
    """
    try:
        with open("/etc/hostname", "r") as f:
            pod_name = f.read().strip()
    except FileNotFoundError:
        # No hostname file: nothing to parse.
        return None

    match = pod_name_regex.match(pod_name)
    if match is None:
        return None

    # Optional groups yield None when absent; look each one up only once.
    arr_index = match.group("arr_index")
    arr_retry = match.group("arr_retry")

    return TaskIdentifier(
        token=match.group("token"),
        node_name=match.group("node_name"),
        retry=int(match.group("retry")),
        arr_index=None if arr_index is None else int(arr_index),
        arr_retry=None if arr_retry is None else int(arr_retry),
    )
def rename_current_execution(name: str) -> None:
    """Rename the current execution.

    Useful for naming executions based on inputs. If this function is called
    outside of an execution context, it is a noop.

    The execution is identified by the ``FLYTE_INTERNAL_EXECUTION_ID``
    environment variable. When that variable is unset, a dimmed notice is
    printed and no request is made.

    Args:
        name: The new name, sent as the ``argName`` variable of the
            ``renameExecutionByToken`` mutation.
    """
    token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", None)
    if token is None:
        # noop during local execution / testing
        click.secho(
            "Running in local execution context - skipping rename.",
            dim=True,
            italic=True,
        )
        return

    # The GraphQL document below is kept verbatim (flush-left inside the
    # literal) so the text sent to the server is unchanged.
    execute(
        gql.gql("""
mutation RenameExecution($argName: String!, $argToken: String!) {
renameExecutionByToken(input: {argToken: $argToken, argName: $argName}) {
clientMutationId
}
}
"""),
        {"argName": name, "argToken": token},
    )