Source code for latch_cli.services.sync

import os
import stat
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import click
import dateutil.parser as dp
import gql
from gql.transport.exceptions import TransportQueryError
from latch_sdk_gql.execute import JsonValue, execute

import latch.ldata._transfer.upload as _upl
from latch_cli.utils.path import is_remote_path


def upload_file(src: Path, dest: str):
    """Upload the local file `src` to the remote path `dest` in parts.

    Starts an upload through the `_upl` helpers, sends one chunk per URL
    returned by `start_upload` (in order), and then finalizes the upload.
    Does nothing further when `start_upload` returns `None`.
    """
    started = _upl.start_upload(src, dest)
    if started is None:
        return

    # Chunks are uploaded sequentially; the part index is the URL's position.
    completed: List[_upl.CompletedPart] = [
        _upl.upload_file_chunk(src, part_url, part_idx, started.part_size)
        for part_idx, part_url in enumerate(started.urls)
    ]

    _upl.end_upload(dest, started.upload_id, completed)
def check_src(p: Path, *, indent: str = "") -> Optional[Tuple[Path, os.stat_result]]:
    """Check that `p` can be used as a sync source.

    Args:
        p: Local path to inspect. `os.stat` is used, so symlinks are followed.
        indent: Prefix prepended to any error message that is printed.

    Returns:
        `(p, stat_result)` when `p` is a regular file or a directory, otherwise
        `None` after printing an error message describing why it was rejected.
    """
    try:
        p_stat = os.stat(p)
    except FileNotFoundError:
        click.secho(indent + f"`{p}`: no such file or directory", fg="red", bold=True)
        return None
    except OSError as e:
        # e.g. PermissionError, NotADirectoryError or a symlink loop: report it
        # like any other unusable source instead of crashing with a traceback.
        click.secho(indent + f"`{p}`: {e.strerror or e}", fg="red", bold=True)
        return None

    if not stat.S_ISREG(p_stat.st_mode) and not stat.S_ISDIR(p_stat.st_mode):
        click.secho(indent + f"`{p}`: not a regular file", fg="red", bold=True)
        return None

    return (p, p_stat)
def _join_remote(parent: str, name: str) -> str:
    """Append `name` to the remote path `parent` using exactly one `/`."""
    if parent.endswith("/"):
        return parent + name
    return f"{parent}/{name}"


def _resolve_sync_dest(names: List[str], dest: str, *, list_all_children: bool):
    """Resolve `dest` remotely and return its `finalLinkTarget` data or `None`.

    When `list_all_children` is false, only children whose name is in `names`
    are requested; otherwise every child is listed (needed to find extraneous
    children to delete). Transport errors propagate to the caller unchanged.
    """
    query = """
        query LatchCLISync($argPath: String! ${name_filter_arg}) {
            ldataResolvePathData(argPath: $argPath) {
                finalLinkTarget {
                    type
                    childLdataTreeEdges(
                        filter: {
                            child: {
                                removed: {equalTo: false},
                                pending: {equalTo: false},
                                copiedFrom: {isNull: true}
                                ${name_filter}
                            }
                        }
                    ) {
                        nodes {
                            child {
                                id
                                name
                                finalLinkTarget {
                                    type
                                    ldataNodeEvents(
                                        condition: {type: INGRESS},
                                        orderBy: TIME_DESC,
                                        first: 1
                                    ) {
                                        nodes {
                                            time
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    """

    args: JsonValue = {"argPath": dest, "nameFilter": []}
    if list_all_children:
        query = query.replace("${name_filter_arg}", "")
        query = query.replace("${name_filter}", "")
    else:
        query = query.replace("${name_filter_arg}", ", $nameFilter: [String!]")
        query = query.replace("${name_filter}", ", name: {in: $nameFilter}")
        args["nameFilter"] = names

    resolve_data = execute(gql.gql(query), args)["ldataResolvePathData"]
    if resolve_data is None:
        return None
    return resolve_data["finalLinkTarget"]


def _children_by_name(dest_data) -> dict:
    """Index the child nodes listed in `dest_data` by their name."""
    if dest_data is None:
        return {}
    return {
        child["name"]: child
        for child in (edge["child"] for edge in dest_data["childLdataTreeEdges"]["nodes"])
    }


def _remove_extraneous(srcs, dest: str, children_by_name: dict, indent: str) -> None:
    """Remove every remote child of `dest` that has no same-named source."""
    for name, child in children_by_name.items():
        if name in srcs:
            continue

        click.echo(
            indent
            + click.style("Removing extraneous: ", fg="yellow")
            + _join_remote(dest, name)
        )
        execute(
            gql.gql("""
                mutation LatchCLISyncRemove($argNodeId: BigInt!) {
                    ldataRmr(input: {argNodeId: $argNodeId}) {
                        clientMutationId
                    }
                }
            """),
            {"argNodeId": child["id"]},
        )


def sync_rec(
    srcs: Dict[str, Tuple[Path, os.stat_result]],
    dest: str,
    *,
    delete: bool,
    level: int = 0,
):
    """Recursively sync the local entries in `srcs` into the remote directory `dest`.

    Args:
        srcs: Mapping of entry name to `(local path, stat result)`, as produced
            by `check_src`.
        dest: Remote path that the entries are synced into.
        delete: Whether remote children without a same-named source are removed.
            This never applies at `level == 0`.
        level: Recursion depth; controls message indentation and whether
            deletion is in effect.

    Files are uploaded when they are new remotely or when the local
    modification time is later than the latest remote INGRESS event.
    Directories are synced recursively. Exits the process with status 1 when
    `dest` exists but is not a directory.
    """
    # rsync never deletes from the top level destination
    delete_effective = delete and level > 0
    indent = " " * level

    dest_data = _resolve_sync_dest(
        list(srcs.keys()), dest, list_all_children=delete_effective
    )

    if not srcs:
        if dest_data is None:
            if not dest.endswith("/"):
                dest += "/"

            click.secho(indent + "Creating empty directory", fg="bright_blue")
            execute(
                gql.gql("""
                    mutation LatchCLISyncMkdir($argPath: String!) {
                        ldataMkdirp(input: {argPath: $argPath}) {
                            clientMutationId
                        }
                    }
                """),
                {"argPath": dest},
            )
            return

        if dest_data["type"] != "DIR":
            click.secho(
                indent + f"`{dest}` is in the way of a directory",
                fg="red",
            )
            return

        click.secho(indent + "Empty directory", dim=True)
        # An empty source directory must still empty the remote one when
        # deletion is in effect.
        if delete_effective:
            _remove_extraneous(srcs, dest, _children_by_name(dest_data), indent)
        return

    if dest_data is not None and dest_data["type"] not in {"DIR", "ACCOUNT_ROOT"}:
        first_stat = next(iter(srcs.values()))[1]
        if len(srcs) > 1 or stat.S_ISDIR(first_stat.st_mode):
            click.secho(f"`{dest}` is not a directory", fg="red", bold=True)
            click.secho("\nOnly a single file can be synced with a file", fg="red")
            sys.exit(1)

        # todo(maximsmol): implement
        click.secho(
            "Syncing single files is currently not supported", bold=True, fg="red"
        )
        sys.exit(1)

    dest_children_by_name = _children_by_name(dest_data)

    for name, (p, p_stat) in srcs.items():
        is_dir = stat.S_ISDIR(p_stat.st_mode)

        child = dest_children_by_name.get(name)
        child_dest = _join_remote(dest, name)

        skip = False
        verb = "Uploading"
        reason = "new"
        if child is not None:
            flt = child["finalLinkTarget"]
            if flt["type"] == "DIR" and not is_dir:
                # todo(maximsmol): confirm? pre-check?
                click.secho(
                    indent + f"`{child_dest}` is in the way of a file",
                    fg="red",
                )
                continue

            if flt["type"] != "DIR" and is_dir:
                # todo(maximsmol): confirm? pre-check?
                click.secho(
                    indent + f"`{child_dest}` is in the way of a directory",
                    fg="red",
                )
                continue

            events = flt["ldataNodeEvents"]["nodes"] if flt["type"] == "OBJ" else []
            if events:
                remote_mtime = dp.isoparse(events[0]["time"])
                local_mtime = datetime.fromtimestamp(p_stat.st_mtime).astimezone()
                if remote_mtime == local_mtime:
                    verb = "Skipping"
                    reason = "unmodified"
                    skip = True
                elif remote_mtime > local_mtime:
                    # the local copy is older than what is already uploaded
                    verb = "Skipping"
                    reason = "older"
                    skip = True
                else:
                    verb = "Uploading"
                    reason = "updated"
            else:
                # Not an object, or an object without a recorded ingress time:
                # nothing to compare against, so sync it.
                reason = "existing"

        if verb == "Uploading" and is_dir:
            verb = "Syncing"

        fg = "bright_blue"
        dim = None
        if verb == "Skipping":
            fg = None
            dim = True

        click.echo(
            click.style(
                indent + verb + " ",
                fg=fg,
                dim=dim,
            )
            + click.style(
                reason,
                underline=True,
                fg=fg,
                dim=dim,
            )
            + click.style(
                ": ",
                fg=fg,
                dim=dim,
            )
            + click.style(
                str(p)
                + ("" if not is_dir else "/")
                + ("" if skip else click.style(" -> ", dim=True) + child_dest),
                dim=dim,
            )
        )
        if skip:
            continue

        if is_dir:
            sub_srcs: Dict[str, Tuple[Path, os.stat_result]] = {}
            for x in p.iterdir():
                res = check_src(x, indent=indent + " ")
                if res is None:
                    # todo(maximsmol): pre-check or confirm?
                    continue

                sub_srcs[x.name] = res

            sync_rec(sub_srcs, child_dest, delete=delete, level=level + 1)
            continue

        # todo(maximsmol): upload in parallel?
        upload_file(p, child_dest)

    if delete_effective:
        _remove_extraneous(srcs, dest, dest_children_by_name, indent)
def sync(
    srcs_raw: List[str],
    dest: str,
    *,
    delete: bool,
    ignore_unsyncable: bool,
):
    """Sync local source paths into the remote destination `dest`.

    Args:
        srcs_raw: Local paths to sync. Remote paths, paths rejected by
            `check_src`, and paths whose final name component collides with
            an earlier, different source are reported and skipped.
        dest: Remote destination path; must satisfy `is_remote_path`.
        delete: Forwarded to `sync_rec` to control removal of extraneous
            remote children.
        ignore_unsyncable: When true, proceed without asking for confirmation
            if some source paths had to be skipped.

    Raises:
        click.exceptions.Exit: If `dest` is not remote or no usable source
            path remains.
        SystemExit: If the user declines to proceed after errors.
    """
    if not is_remote_path(dest):
        click.secho(
            f"`{dest}`: only local -> remote sync is supported", fg="red", bold=True
        )
        raise click.exceptions.Exit(1)

    srcs: Dict[str, Tuple[Path, os.stat_result]] = {}
    have_errors = False
    for x in srcs_raw:
        if is_remote_path(x):
            click.secho(
                f"`{x}`: only local -> remote sync is supported", fg="red", bold=True
            )
            have_errors = True
            continue

        p = Path(x)
        res = check_src(p)
        if res is None:
            have_errors = True
            continue

        previous = srcs.get(p.name)
        if previous is not None:
            # Sources are keyed by name; a second, different path with the same
            # name would otherwise silently replace the first one.
            if os.path.samestat(previous[1], res[1]):
                # the very same file or directory was listed twice
                continue

            click.secho(
                f"`{x}`: name `{p.name}` conflicts with source `{previous[0]}`",
                fg="red",
                bold=True,
            )
            have_errors = True
            continue

        srcs[p.name] = res

    if len(srcs) == 0:
        click.secho(
            "\nAll source paths were skipped due to errors", fg="red", bold=True
        )
        raise click.exceptions.Exit(1)

    if have_errors:
        # todo(maximsmol): do we want to precheck recursively?
        click.secho("\nSome source paths will be skipped due to errors", fg="red")

        if not ignore_unsyncable:
            if not click.confirm(click.style("Proceed?", fg="red")):
                sys.exit(1)
        else:
            click.secho(
                "Proceeding due to " + click.style("`--ignore-unsyncable`", bold=True),
                fg="yellow",
            )
        click.echo()

    sync_rec(srcs, dest, delete=delete)