Source code for latch_cli.main

"""Entrypoints to service functions through a latch_cli."""

import os
import sys
from pathlib import Path
from textwrap import dedent
from typing import Callable, List, Optional, Tuple, TypeVar, Union

import click
from packaging.version import parse as parse_version
from typing_extensions import ParamSpec

import latch_cli.click_utils
from latch.ldata._transfer.progress import Progress as _Progress
from latch_cli.click_utils import EnumChoice
from latch_cli.exceptions.handler import CrashHandler
from latch_cli.services.cp.autocomplete import complete as cp_complete
from latch_cli.services.cp.autocomplete import remote_complete
from latch_cli.services.init.init import template_flag_to_option
from latch_cli.services.local_dev import TaskSize
from latch_cli.utils import (
    AuthenticationError,
    WorkflowType,
    get_auth_header,
    get_latest_package_version,
    get_local_package_version,
)
from latch_cli.workflow_config import BaseImageOptions

# Apply latch_cli's click patches before any command below is declared.
# NOTE(review): what `patch()` actually changes is defined in
# latch_cli.click_utils and is not visible from this file -- confirm there.
latch_cli.click_utils.patch()

# Single module-wide crash handler. `main` calls `init()` on it and the
# individual commands set its `message` / `pkg_root` attributes before doing
# their work.
crash_handler = CrashHandler()

# Generic placeholders used to type `requires_login`: P stands for the wrapped
# callable's parameters and T for its return type.
P = ParamSpec("P")
T = TypeVar("T")


def requires_login(f: Callable[P, T]) -> Callable[P, T]:
    """Decorate ``f`` so that it only runs when authentication succeeds.

    Before every call the wrapper invokes ``get_auth_header()``. If that raises
    ``AuthenticationError``, login instructions are printed in red and the
    process exits with status 1 via ``click.exceptions.Exit``; otherwise ``f``
    is called with the original arguments and its result is returned.

    Args:
        f: The callable to guard (typically a click command callback).

    Returns:
        A wrapper with the same call signature as ``f``. The wrapper carries
        ``f``'s metadata (``__name__``, ``__doc__``, ``__dict__``, ...), so
        click still picks up the docstring as the command's help text.

    Raises:
        click.exceptions.Exit: With exit code 1 when authentication fails.
    """
    # Imported locally so this block does not depend on the file's import header.
    from functools import wraps

    # `wraps` copies the docstring (as the previous manual `__doc__` assignment
    # did) plus the name, qualname, module and attribute dict, and records the
    # original callable as `__wrapped__`.
    @wraps(f)
    def decorated(*args: P.args, **kwargs: P.kwargs) -> T:
        try:
            get_auth_header()
        except AuthenticationError as e:
            click.secho(
                dedent("""
                    Unable to authenticate with Latch.

                    If you are on a machine with a browser, run `latch login`.

                    If not, navigate to `https://console.latch.bio/settings/developer` on a different machine, select `Access Tokens`, and copy your `API Key` to `~/.latch/token` on this machine.
                    If you do not see this value in the console, make sure you are logged into your personal workspace.
                    """).strip("\n"),
                fg="red",
            )
            # Chain the original error so it remains visible when debugging.
            raise click.exceptions.Exit(1) from e

        return f(*args, **kwargs)

    return decorated
# NOTE: the docstrings of the click commands below are rendered by click as
# `--help` output, so they are runtime text. Explanatory notes for maintainers
# are therefore written as `#` comments rather than added to the docstrings.


@click.group(
    "latch",
    context_settings={
        "max_content_width": 160,
    },
)
@click.version_option(package_name="latch")
def main():
    """
    Collection of command line tools for using the Latch SDK and
    interacting with the Latch platform.
    """
    # NOTE(review): returning here also skips `crash_handler.init()` below --
    # confirm that is intended when the version check is disabled.
    if os.environ.get("LATCH_SKIP_VERSION_CHECK") is not None:
        return

    local_ver = parse_version(get_local_package_version())
    latest_ver = parse_version(get_latest_package_version())
    if local_ver < latest_ver:
        click.secho(
            dedent(f"""
                WARN: Your local version of latch ({local_ver}) is out of date. This may result in unexpected behavior.
                Please upgrade to the latest version ({latest_ver}) using `python3 -m pip install --upgrade latch`.
                """).strip("\n"),
            fg="yellow",
        )

    crash_handler.init()


"""
LOGIN COMMANDS
"""


@main.command("login")
@click.option(
    "--connection",
    type=str,
    default=None,
    help="Specific Auth0 connection name e.g. for SSO.",
)
def login(connection: Optional[str]):
    """Manually login to Latch."""
    crash_handler.message = "Unable to log in"
    crash_handler.pkg_root = str(Path.cwd())

    # Service modules are imported inside each command rather than at module
    # import time; the local name intentionally shadows this command function.
    from latch_cli.services.login import login

    login(connection)
    click.secho("Successfully logged into LatchBio.", fg="green")


@main.command("workspace")
@requires_login
def workspace():
    """Spawns an interactive terminal prompt allowing users to choose what workspace they want to work in."""
    crash_handler.message = "Unable to fetch workspaces"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.workspace import workspace

    workspace()


"""
WORKFLOW COMMANDS
"""


@main.command("init")
@click.argument("pkg_name", nargs=1)
@click.option(
    "--template",
    "-t",
    type=click.Choice(
        list(template_flag_to_option.keys()),
        case_sensitive=False,
    ),
)
@click.option(
    "--dockerfile",
    "-d",
    help="Create a user editable Dockerfile for this workflow.",
    is_flag=True,
    default=False,
)
@click.option(
    "--base-image",
    "-b",
    help="Which base image to use for the Dockerfile.",
    type=click.Choice(
        list(BaseImageOptions._member_names_),
        case_sensitive=False,
    ),
    default="default",
)
def init(
    pkg_name: str,
    template: Optional[str] = None,
    dockerfile: bool = False,
    base_image: str = "default",
):
    """Initialize boilerplate for local workflow code."""
    crash_handler.message = f"Unable to initialize {pkg_name}"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.init import init

    created = init(pkg_name, template, dockerfile, base_image)
    if created:
        click.secho(f"Created a latch workflow in `{pkg_name}`", fg="green")
        click.secho("Run", fg="green")
        click.secho(f"\t$ latch register {pkg_name}", fg="green")
        click.secho("To register the workflow with console.latch.bio.", fg="green")
        return

    click.secho("No workflow created.", fg="yellow")


@main.command("dockerfile")
@click.argument("pkg_root", type=click.Path(exists=True, file_okay=False))
@click.option(
    "-s",
    "--snakemake",
    is_flag=True,
    default=False,
    type=bool,
    help="Generate a Dockerfile with arguments needed for Snakemake compatibility",
)
def dockerfile(pkg_root: str, snakemake: bool = False):
    """Generates a user editable dockerfile for a workflow and saves under `pkg_root/Dockerfile`.

    Visit docs.latch.bio to learn more.
    """
    crash_handler.message = "Failed to generate Dockerfile."
    crash_handler.pkg_root = pkg_root

    from latch_cli.docker_utils import generate_dockerfile

    source = Path(pkg_root)
    dest = source / "Dockerfile"
    # Only prompt when there is something to overwrite; declining aborts quietly.
    if dest.exists() and not click.confirm(
        f"Dockerfile already exists at `{dest}`. Overwrite?"
    ):
        return

    workflow_type = WorkflowType.latchbiosdk
    if snakemake is True:
        workflow_type = WorkflowType.snakemake

    generate_dockerfile(source, dest, wf_type=workflow_type)

    click.secho(f"Successfully generated dockerfile `{dest}`", fg="green")


@main.command("generate-metadata")
@click.argument("config_file", nargs=1, type=click.Path(exists=True, path_type=Path))
@click.option(
    "--yes",
    "-y",
    is_flag=True,
    default=False,
    help=(
        "Overwrite an existing `latch_metadata/parameters.py` file without confirming."
    ),
)
@click.option(
    "--no-infer-files",
    "-I",
    is_flag=True,
    default=False,
    help="Don't parse strings with common file extensions as file parameters.",
)
@click.option(
    "--no-defaults",
    "-D",
    is_flag=True,
    default=False,
    help="Don't generate defaults for parameters.",
)
def generate_metadata(
    config_file: Path, yes: bool, no_infer_files: bool, no_defaults: bool
):
    """Generate a `latch_metadata.py` file from a Snakemake config file"""

    from latch_cli.snakemake.config.parser import generate_metadata

    # The CLI exposes negative flags; the service takes the positive form.
    generate_metadata(
        config_file,
        skip_confirmation=yes,
        infer_files=not no_infer_files,
        generate_defaults=not no_defaults,
    )


@main.command("develop")
@click.argument("pkg_root", nargs=1, type=click.Path(exists=True, path_type=Path))
@click.option(
    "--yes",
    "-y",
    is_flag=True,
    default=False,
    type=bool,
    help="Skip the confirmation dialog.",
)
@click.option(
    "--image",
    "-i",
    type=str,
    help="Image to use for develop session.",
)
@click.option(
    "-s",
    "--snakemake",
    is_flag=True,
    default=False,
    type=bool,
    help="Start a develop session for a Snakemake workflow.",
)
@requires_login
def local_development(
    pkg_root: Path,
    yes: bool,
    image: Optional[str],
    snakemake: bool,
):
    """Develop workflows "locally"

    Visit docs.latch.bio to learn more.
    """
    crash_handler.message = "Error during local development session"
    crash_handler.pkg_root = str(pkg_root)

    # Setting LATCH_DEVELOP_BETA (to any value) selects the newer
    # implementation. `--yes` / `--image` are only forwarded to that one and
    # `--snakemake` only to the old one.
    if os.environ.get("LATCH_DEVELOP_BETA") is not None:
        from latch_cli.services.local_dev import local_development

        local_development(
            pkg_root.resolve(),
            skip_confirm_dialog=yes,
            size=TaskSize.small_task,
            image=image,
        )
    else:
        from latch_cli.services.local_dev_old import local_development

        local_development(pkg_root.resolve(), snakemake)


@main.command("exec")
@click.option(
    "--execution-id", "-e", type=str, help="Optional execution ID to inspect."
)
@click.option("--egn-id", "-g", type=str, help="Optional task execution ID to inspect.")
@click.option(
    "--container-index",
    "-c",
    type=int,
    help="Optional container index to inspect (only used for Map Tasks)",
)
@requires_login
def execute(
    execution_id: Optional[str], egn_id: Optional[str], container_index: Optional[int]
):
    """Drops the user into an interactive shell from within a task."""

    # This is the latch service function, not the Python builtin `exec`.
    from latch_cli.services.execute.main import exec

    exec(execution_id=execution_id, egn_id=egn_id, container_index=container_index)


# NOTE(review): the help text of `--disable-auto-version` below describes the
# opposite of what the flag name suggests -- confirm the wording with the owner.
@main.command("register")
@click.argument("pkg_root", type=click.Path(exists=True, file_okay=False))
@click.option(
    "-d",
    "--disable-auto-version",
    is_flag=True,
    default=False,
    type=bool,
    help=(
        "Whether to automatically bump the version of the workflow each time register"
        " is called."
    ),
)
@click.option(
    "--remote/--no-remote",
    is_flag=True,
    default=True,
    type=bool,
    help="Use a remote server to build workflow.",
)
@click.option(
    "--docker-progress",
    type=click.Choice(["plain", "tty", "auto"], case_sensitive=False),
    default="auto",
    help=(
        "`tty` shows only the last N lines of the build log. `plain` does no special"
        " handling. `auto` chooses `tty` when stdout is a terminal and `plain`"
        " otherwise. Equivalent to Docker's `--progress` flag."
    ),
)
@click.option(
    "-y",
    "--yes",
    is_flag=True,
    default=False,
    type=bool,
    help="Skip the confirmation dialog.",
)
@click.option(
    "--open",
    "-o",
    is_flag=True,
    default=False,
    type=bool,
    help="Open the registered workflow in the browser.",
)
@click.option(
    "--snakefile",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    default=None,
    help="Path to a Snakefile to register.",
)
@click.option(
    "--cache-tasks/--no-cache-tasks",
    "-c/-C",
    is_flag=True,
    default=False,
    type=bool,
    help=(
        "Whether or not to cache snakemake tasks. Ignored if --snakefile is not"
        " provided."
    ),
)
@requires_login
def register(
    pkg_root: str,
    disable_auto_version: bool,
    remote: bool,
    docker_progress: str,
    yes: bool,
    open: bool,
    snakefile: Optional[Path],
    cache_tasks: bool,
):
    """Register local workflow code to Latch.

    Visit docs.latch.bio to learn more.
    """

    # Setting LATCH_REGISTER_BETA (to any value) opts into the newer path.
    use_new_centromere = os.environ.get("LATCH_REGISTER_BETA") is not None

    crash_handler.message = "Unable to register workflow."
    crash_handler.pkg_root = pkg_root

    from latch_cli.services.register import register

    register(
        pkg_root,
        disable_auto_version=disable_auto_version,
        remote=remote,
        skip_confirmation=yes,
        open=open,
        snakefile=snakefile,
        # `auto` resolves to plain output whenever stdout is not a terminal.
        progress_plain=(docker_progress == "auto" and not sys.stdout.isatty())
        or docker_progress == "plain",
        use_new_centromere=use_new_centromere,
        cache_tasks=cache_tasks,
    )


# NOTE(review): `params_file` is annotated as Path, but `click.Path` without
# `path_type` hands the callback a `str` -- confirm what the service expects.
@main.command("launch")
@click.argument("params_file", nargs=1, type=click.Path(exists=True))
@click.option(
    "--version",
    default=None,
    help="The version of the workflow to launch. Defaults to latest.",
)
@requires_login
def launch(params_file: Path, version: Union[str, None] = None):
    """Launch a workflow using a python parameter map."""

    crash_handler.message = "Unable to launch workflow"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.launch import launch

    wf_name = launch(params_file, version)
    # Only used for the success message below.
    if version is None:
        version = "latest"

    click.secho(
        f"Successfully launched workflow named {wf_name} with version {version}.",
        fg="green",
    )


@main.command("get-params")
@click.argument("wf_name", nargs=1)
@click.option(
    "--version",
    default=None,
    help="The version of the workflow. Defaults to latest.",
)
@requires_login
def get_params(wf_name: Union[str, None], version: Union[str, None] = None):
    """Generate a python parameter map for a workflow."""
    crash_handler.message = "Unable to generate param map for workflow"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.get_params import get_params

    get_params(wf_name, version)
    # Only used for the success message below.
    if version is None:
        version = "latest"

    click.secho(
        f"Successfully generated python param map named {wf_name}.params.py with"
        f" version {version}\n Run `latch launch {wf_name}.params.py` to launch it.",
        fg="green",
    )


@main.command("get-wf")
@click.option(
    "--name",
    default=None,
    help="The name of the workflow to list. Will display all versions",
)
@requires_login
def get_wf(name: Union[str, None] = None):
    """List workflows."""
    crash_handler.message = "Unable to get workflows"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.get import get_wf

    wfs = get_wf(name)

    # Each workflow is an (id, name, version) triple. Stringify once so widths
    # and output are computed from exactly the text that gets printed.
    headers = ("ID", "Name", "Version")
    rows = [
        (str(wf_id), str(wf_name), str(wf_version))
        for wf_id, wf_name, wf_version in wfs
    ]

    # A column is as wide as its widest cell, header label included, so the
    # header lines up with the rows (and still prints when there are no rows).
    widths = [
        max([len(header), *(len(row[col]) for row in rows)])
        for col, header in enumerate(headers)
    ]

    # TODO(ayush): make this much better
    for line in (headers, *rows):
        click.secho(
            "\t".join(cell.ljust(width) for cell, width in zip(line, widths))
        )


@main.command("preview")
@click.argument("pkg_root", nargs=1, type=click.Path(exists=True, path_type=Path))
@requires_login
def preview(pkg_root: Path):
    """Creates a preview of your workflow interface."""
    crash_handler.message = f"Unable to preview inputs for {pkg_root}"
    crash_handler.pkg_root = str(pkg_root)

    from latch_cli.services.preview import preview

    preview(pkg_root)


@main.command("get-executions")
@requires_login
def get_executions():
    """Spawns an interactive terminal UI that shows all executions in a given workspace"""
    crash_handler.message = "Unable to fetch executions"

    from latch_cli.services.get_executions import get_executions

    get_executions()


"""
LDATA COMMANDS
"""


@main.command("cp")
@click.argument("src", shell_complete=cp_complete, nargs=-1)
@click.argument("dest", shell_complete=cp_complete, nargs=1)
@click.option(
    "--progress",
    help="Type of progress information to show while copying",
    type=EnumChoice(_Progress, case_sensitive=False),
    default="tasks",
    show_default=True,
)
@click.option(
    "--verbose",
    "-v",
    help="Print file names as they are copied",
    is_flag=True,
    default=False,
    show_default=True,
)
@click.option(
    "--no-glob",
    "-G",
    help="Don't expand globs in remote paths",
    is_flag=True,
    default=False,
    show_default=True,
)
@requires_login
def cp(
    src: List[str],
    dest: str,
    progress: _Progress,
    verbose: bool,
    no_glob: bool,
):
    """Copy files between Latch Data and local, or between two Latch Data locations.

    Behaves like `cp -R` in Unix. Directories are copied recursively. If any parents of dest do not exist, the copy will fail.
    """
    crash_handler.message = f"Unable to copy {src} to {dest}"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.cp.main import cp

    cp(
        src,
        dest,
        progress=progress,
        verbose=verbose,
        expand_globs=not no_glob,
    )


@main.command("mv")
@click.argument("src", shell_complete=remote_complete, nargs=1)
@click.argument("dest", shell_complete=remote_complete, nargs=1)
@click.option(
    "--no-glob",
    "-G",
    help="Don't expand globs in remote paths",
    is_flag=True,
    default=False,
    show_default=True,
)
@requires_login
def mv(src: str, dest: str, no_glob: bool):
    """Move remote files in LatchData."""
    crash_handler.message = f"Unable to move {src} to {dest}"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.move import move

    move(src, dest, no_glob=no_glob)


@main.command("ls")
@click.option(
    "--group-directories-first",
    "--gdf",
    help="List directories before files.",
    is_flag=True,
    default=False,
)
@click.argument("paths", nargs=-1, shell_complete=remote_complete)
@requires_login
def ls(paths: Tuple[str], group_directories_first: bool):
    """
    List the contents of a Latch Data directory
    """
    crash_handler.message = f"Unable to display contents of {paths}"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.ls import ls

    # If the user doesn't provide any arguments, default to root
    if len(paths) == 0:
        paths = ("/",)

    # With several paths, each listing gets a `path:` heading and a trailing
    # blank line so the sections can be told apart.
    for path in paths:
        if len(paths) > 1:
            click.echo(f"{path}:")

        ls(
            path,
            group_directories_first=group_directories_first,
        )

        if len(paths) > 1:
            click.echo("")


@main.command("rmr")
@click.argument("remote_path", nargs=1, type=str)
@click.option(
    "-y",
    "--yes",
    is_flag=True,
    default=False,
    type=bool,
    help="Skip the confirmation dialog.",
)
@click.option(
    "--no-glob",
    "-G",
    help="Don't expand globs in remote paths",
    is_flag=True,
    default=False,
    show_default=True,
)
@click.option(
    "--verbose",
    "-v",
    help="Print all files when deleting",
    is_flag=True,
    default=False,
    show_default=True,
)
@requires_login
def rmr(remote_path: str, yes: bool, no_glob: bool, verbose: bool):
    """Deletes a remote entity."""
    crash_handler.message = f"Unable to delete {remote_path}"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.rm import rmr

    rmr(remote_path, skip_confirmation=yes, no_glob=no_glob, verbose=verbose)


@main.command("mkdirp")
@click.argument("remote_directory", nargs=1, type=str)
@requires_login
def mkdir(remote_directory: str):
    """Creates a new remote directory."""
    crash_handler.message = f"Unable to create directory {remote_directory}"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.mkdir import mkdirp

    mkdirp(remote_directory)


@main.command("sync")
@click.argument("srcs", nargs=-1)
@click.argument("dst", nargs=1)
@click.option(
    "--delete",
    help="Delete extraneous files from destination.",
    is_flag=True,
    default=False,
)
@click.option(
    "--ignore-unsyncable",
    help=(
        "Synchronize even if some source paths do not exist or refer to special files."
    ),
    is_flag=True,
    default=False,
)
@requires_login
def sync(srcs: List[str], dst: str, delete: bool, ignore_unsyncable: bool):
    """
    Update the contents of a remote directory with local data.
    """
    from latch_cli.services.sync import sync

    # todo(maximsmol): remote -> local
    # todo(maximsmol): remote -> remote
    sync(
        srcs,
        dst,
        delete=delete,
        ignore_unsyncable=ignore_unsyncable,
    )


"""
POD COMMANDS
"""


@main.group()
def pods():
    """Manage pods"""
    pass


@pods.command("stop")
@click.argument("pod_id", nargs=1, type=int, required=False)
@requires_login
def stop_pod(pod_id: Optional[int] = None):
    """Stops a pod given a pod_id or the pod from which the command is run"""
    crash_handler.message = "Unable to stop pod"

    from latch_cli.services.stop_pod import stop_pod

    if pod_id is None:
        # No ID on the command line: fall back to the ID file at this fixed path.
        id_path = Path("/root/.latch/id")

        err_str: Optional[str] = None
        try:
            pod_id = int(id_path.read_text().strip("\n"))
        except FileNotFoundError:
            err_str = f"Pod ID not found at `{id_path}`"
        except ValueError:
            err_str = f"Could not parse Pod ID at `{id_path}`"
        except Exception:
            # Deliberate catch-all: any other read failure gets the same
            # "pass the ID explicitly" guidance instead of a crash report.
            err_str = f"Error reading Pod ID from `{id_path}`"

        if err_str is not None:
            click.secho(
                f"{err_str} -- please provide a Pod ID as a command line argument.",
                fg="red",
            )
            return

    stop_pod(pod_id)


"""
TEST DATA COMMANDS
"""


@main.group(invoke_without_command=True)
@click.version_option(package_name="latch")
@click.pass_context
def test_data(ctx: click.Context):
    """Subcommands to upload and delete test data objects."""
    # Invoked bare (no subcommand): show the group's help.
    if ctx.invoked_subcommand is None:
        click.secho(ctx.get_help())


@test_data.command("upload")
@click.argument("src_path", nargs=1, type=click.Path(exists=True))
@click.option(
    "--dont-confirm-overwrite",
    "-d",
    is_flag=True,
    default=False,
    type=bool,
    help="Automatically overwrite any files without asking for confirmation.",
)
@requires_login
def test_data_upload(src_path: str, dont_confirm_overwrite: bool):
    """Upload test data object."""

    crash_handler.message = f"Unable to upload {src_path} to managed bucket"
    crash_handler.pkg_root = str(Path.cwd())

    from latch_cli.services.test_data.upload import upload

    s3_url = upload(src_path, dont_confirm_overwrite)
    click.secho(f"Successfully uploaded to {s3_url}", fg="green")
# NOTE: the docstrings of these click commands are rendered as `--help` text,
# so maintainer notes are written as `#` comments instead.


@test_data.command("remove")
@click.argument("object_url", nargs=1, type=str)
@requires_login
def test_data_remove(object_url: str):
    """Remove test data object."""

    crash_handler.message = f"Unable to remove {object_url} from managed bucket"
    crash_handler.pkg_root = str(Path.cwd())

    # Imported at call time, like every other service in this file.
    from latch_cli.services.test_data.remove import remove

    remove(object_url)
    click.secho(f"Successfully removed {object_url}", fg="green")


@test_data.command("ls")
@requires_login
def test_data_ls():
    """List test data objects."""

    crash_handler.message = "Unable to list objects within managed bucket"
    crash_handler.pkg_root = str(Path.cwd())

    # The local name shadows nothing here: this command function is called
    # `test_data_ls`, not `ls`.
    from latch_cli.services.test_data.ls import ls

    objects = ls()
    click.secho("Listing your managed objects by full S3 path.\n", fg="green")
    for o in objects:
        # `click.echo` keeps output handling consistent with the other commands.
        click.echo(f"\ts3://latch-public/{o}")