Source code for latch_cli.utils.path

import re
from pathlib import Path
from textwrap import dedent
from urllib.parse import urlparse

import click
from latch_sdk_config.user import user_config

from latch_cli.utils import get_auth_header

latch_url_regex = re.compile(r"^(latch)?://")


[docs]def is_remote_path(path: str) -> bool: return latch_url_regex.match(path) is not None
scheme = re.compile( r""" ^( (latch://) | (file://) | (?P<implicit_url>://) | (?P<absolute_path>/) | (?P<relative_path>[^/]) ) """, re.VERBOSE, ) domain = re.compile( r""" (?P<account_relative> ^$| # empty (?P<shared>^shared$) ) | ( ^ ( (shared\.\d+\.account) | (\d+\.account) | ([^/]+\.mount) | (\d+\.node) ) $ ) """, re.VERBOSE, ) # scheme inference rules: # ://domain/a/b/c => latch://domain/a/b/c # /a/b/c => file:///a/b/c # a/b/c => file://${pwd}/a/b/c # # if assume_remote = True (i.e. in the ls case): # /a/b/c => latch:///a/b/c # a/b/c => latch:///a/b/c
[docs]def append_scheme(path: str, *, assume_remote: bool = False) -> str: match = scheme.match(path) if match is None: click.secho( dedent(f""" `{path}` is not in a valid format. See https://docs.latch.bio/basics/latch_urls.html for a list of valid remote path formats. """).strip("\n"), fg="red", ) raise click.exceptions.Exit(1) if match["implicit_url"] is not None: path = f"latch{path}" elif match["absolute_path"] is not None: path = f"latch://{path}" if assume_remote else f"file://{path}" elif match["relative_path"] is not None: path = f"latch:///{path}" if assume_remote else f"file://{Path.cwd()}/{path}" return path
# domain inference rules: # latch:///a/b/c => latch://xxx.account/a/b/c # latch://shared/a/b/c => latch://shared.xxx.account/a/b/c # latch://any_other_domain/a/b/c => unchanged
[docs]def append_domain(path: str) -> str: workspace = user_config.workspace_id parsed = urlparse(path) dom = parsed.netloc if dom == "" and workspace != "": dom = f"{workspace}.account" match = domain.match(dom) if match is None: click.secho( dedent(f""" `{dom}` is not a valid path domain. See https://docs.latch.bio/basics/latch_urls.html for a list of valid domains. """).strip("\n"), fg="red", ) raise click.exceptions.Exit(1) if match["shared"] is not None and workspace != "": dom = f"shared.{workspace}.account" return parsed._replace(netloc=dom).geturl()
strip_domain_expr = re.compile(r"^(latch)?://[^/]*/")
[docs]def strip_domain(path: str) -> str: return strip_domain_expr.sub("", path, 1)
[docs]def is_account_relative(path: str) -> bool: parsed = urlparse(path) dom = parsed.netloc match = domain.match(dom) if match is None: click.secho( dedent(f""" `{dom}` is not a valid path domain. See https://docs.latch.bio/basics/latch_urls.html for a list of valid domains. """).strip("\n"), fg="red", ) raise click.exceptions.Exit(1) return match["account_relative"] is not None
[docs]def normalize_path(path: str, *, assume_remote: bool = False) -> str: path = append_scheme(path, assume_remote=assume_remote) if path.startswith("file://"): return path return append_domain(path)
auth = re.compile( r""" ^( (?P<sdk>Latch-SDK-Token) | (?P<execution>Latch-Execution-Token) )\s.*$ """, re.VERBOSE, )
[docs]def get_path_error(path: str, message: str, acc_id: str) -> str: with_scheme = append_scheme(path) normalized = normalize_path(path) account_relative = is_account_relative(with_scheme) auth_header = get_auth_header() match = auth.match(auth_header) if match is None: auth_type = auth_header elif match["sdk"] is not None: auth_type = "SDK Token" else: auth_type = "Execution Token" auth_str = ( f"{click.style(f'Authorized using:', bold=True, reset=False)} {click.style(auth_type, bold=False, reset=False)}" + "\n" ) ws_id = user_config.workspace_id ws_name = user_config.workspace_name resolve_str = ( f"{click.style(f'Relative path resolved to:', bold=True, reset=False)} {click.style(normalized, bold=False, reset=False)}" + "\n" ) ws_str = ( f"{click.style(f'Using Workspace:', bold=True, reset=False)} {click.style(ws_id, bold=False, reset=False)}" ) if ws_name is not None: ws_str = f"{ws_str} ({ws_name})" ws_str += "\n" return click.style( f""" {click.style(f'{path}: ', bold=True, reset=False)}{click.style(message, bold=False, reset=False)} {resolve_str if account_relative else ""}{ws_str if account_relative else ""} {auth_str} {click.style("Check that:", bold=True, reset=False)} {click.style("1. The target object exists", bold=False, reset=False)} {click.style(f"2. Account ", bold=False, reset=False)}{click.style(acc_id, bold=True, reset=False)}{click.style(" has permission to view the target object", bold=False, reset=False)} {"3. The correct workspace is selected" if account_relative else ""} For privacy reasons, non-viewable objects and non-existent objects are indistinguishable""", fg="red", )
name = re.compile(r"^.*/(?P<name>[^/]+)/?$")
[docs]def get_name_from_path(path: str): match = name.match(path) if match is None: click.secho(f"`{path}` is not a valid path.", fg="red") raise click.exceptions.Exit(1) return match["name"]