import os
import re
from os import PathLike
from pathlib import Path
from typing import Optional, Type, Union
from urllib.parse import urlparse
import gql
from flytekit.core.annotation import FlyteAnnotation
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.type_engine import TypeEngine, TypeTransformer
from flytekit.models.literals import Literal
from flytekit.types.file.file import FlyteFile, FlyteFilePathTransformer
from latch_sdk_gql.execute import execute
from typing_extensions import Annotated
from latch.ldata.path import LPath
from latch.types.utils import format_path, is_absolute_node_path, is_valid_url
from latch_cli.utils.path import normalize_path
[docs]class LatchFile(FlyteFile):
"""Represents a file object in the context of a task execution.
The local path identifies the file object's location on local disk in
the context of a task execution. `LatchFile` inherits implementation of
`__fsopen__` from `FlyteFile`, so methods like `open` can retrieve a string
representation of self.
..
@task
def task(file: LatchFile):
with open(file, "r") as f:
print(f.read())
mypath = Path(file).resolve()
The remote path identifies a remote location. The remote location, either a
latch or s3 url, can be inspected from an object passed to the task to
reveal its remote source.
It can also to deposit the file to a latch path when the object is returned
from a task.
..
@task
def task(file: LatchFile):
path = file.remote_path # inspect remote location
# Returning a different file to LatchData.
return LatchFile("./foobar.txt", "latch:///foobar.txt")
"""
def __init__(
self,
path: Union[str, PathLike],
remote_path: Optional[Union[str, PathLike]] = None,
**kwargs,
):
if path is None:
raise ValueError("Unable to instantiate LatchFile with None")
# Cast PathLike objects so that LatchFile has consistent JSON
# representation.
parsed = urlparse(str(path))
if parsed.scheme == "file":
self.path = parsed.path
elif parsed.scheme == "latch":
self.path = normalize_path(str(path))
else:
self.path = str(path)
self._path_generated = False
if is_valid_url(self.path) and remote_path is None:
self._remote_path = str(path)
else:
self._remote_path = None if remote_path is None else str(remote_path)
if kwargs.get("downloader") is not None:
super().__init__(self.path, kwargs["downloader"], self._remote_path)
else:
def downloader():
ctx = FlyteContextManager.current_context()
if (
ctx is not None
and hasattr(self, "_remote_path")
and self._remote_path is not None
# todo(kenny) is this necessary?
and ctx.inspect_objects_only is False
):
local_path_hint = self._remote_path
if is_absolute_node_path.match(self._remote_path) is not None:
data = execute(
gql.gql("""
query getName($argPath: String!) {
ldataResolvePathData(argPath: $argPath) {
name
}
}
"""),
{"argPath": self._remote_path},
)["ldataResolvePathData"]
if data is not None and data["name"] is not None:
local_path_hint = data["name"]
self._idempotent_set_path(local_path_hint)
return ctx.file_access.get_data(
self._remote_path,
self.path,
is_multipart=False,
)
super().__init__(self.path, downloader, self._remote_path)
[docs] def size(self):
return LPath(self.remote_path).size()
def _idempotent_set_path(self, hint: Optional[str] = None):
if self._path_generated:
return
ctx = FlyteContextManager.current_context()
if ctx is None:
return
self.path = ctx.file_access.get_random_local_path(hint)
self._path_generated = True
def _create_imposters(self):
self._idempotent_set_path()
p = Path(self.path)
p.parent.mkdir(exist_ok=True, parents=True)
p.touch(exist_ok=True)
@property
def local_path(self) -> str:
"""File path local to the environment executing the task."""
# This will manually download object to local disk in the case of a
# user wishing to access the file without using `open`.
self.__fspath__()
return str(self.path)
@property
def remote_path(self) -> Optional[str]:
"""A url referencing in object in LatchData or s3."""
return self._remote_path
def __repr__(self):
if self.remote_path is None:
return f"LatchFile({repr(format_path(self.local_path))})"
return (
f"LatchFile({repr(self.path)},"
f" remote_path={repr(format_path(self.remote_path))})"
)
def __str__(self):
if self.remote_path is None:
return "LatchFile()"
return f"LatchFile({format_path(self.remote_path)})"
LatchOutputFile = Annotated[
LatchFile,
FlyteAnnotation(
{"output": True},
),
]
"""A LatchFile tagged as the output of some workflow.
The Latch Console uses this metadata to avoid checking for existence of the
file at its remote path and displaying an error. This check is normally made to
avoid launching workflows with LatchFiles that point to objects that don't
exist.
"""
TypeEngine.register(LatchFilePathTransformer())