Source code for latch_cli.services.register.utils

import base64
import contextlib
import importlib.machinery as im
import importlib.util as iu
import io
import os
import sys
import typing
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    TypedDict,
    Union,
)

import boto3
import docker
import requests
from latch_sdk_config.latch import config

from latch.utils import current_workspace

if TYPE_CHECKING:
    from ...centromere.ctx import _CentromereCtx
else:
    _CentromereCtx = ""


# todo(maximsmol): only login if the credentials are expired
def _docker_login(ctx: _CentromereCtx):
    assert ctx.dkr_client is not None

    headers = {"Authorization": f"Bearer {ctx.token}"}
    data = {"pkg_name": ctx.image, "ws_account_id": current_workspace()}
    response = requests.post(ctx.latch_image_api_url, headers=headers, json=data)

    try:
        response = response.json()
        access_key = response["tmp_access_key"]
        secret_key = response["tmp_secret_key"]
        session_token = response["tmp_session_token"]
    except KeyError as err:
        raise ValueError(f"malformed response on image upload: {response}") from err

    try:
        client = boto3.session.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            aws_session_token=session_token,
            region_name="us-west-2",
        ).client("ecr")
        token = client.get_authorization_token()["authorizationData"][0][
            "authorizationToken"
        ]
    except Exception as err:
        raise ValueError(
            f"unable to retreive an ecr login token for user {ctx.account_id}"
        ) from err

    auth = ctx.dkr_client._auth_configs
    store_name = auth.get_credential_store(ctx.dkr_repo)
    if store_name is not None:
        store = auth._get_store_instance(store_name)
        try:
            store.erase(ctx.dkr_repo)
        # To handle: "Credentials store docker-credential-osxkeychain exited
        # with "The specified item could not be found in the keychain.""
        except docker.credentials.errors.StoreError:
            pass

    user, password = base64.b64decode(token).decode("utf-8").split(":")
    res = ctx.dkr_client.login(
        username=user, password=password, registry=ctx.dkr_repo, reauth=True
    )
    assert res["Status"] == "Login Succeeded"


[docs]class DockerBuildLogItem(TypedDict): message: Optional[str] error: Optional[str] stream: Optional[str]
[docs]def build_image( ctx: _CentromereCtx, image_name: str, context_path: Path, dockerfile: Optional[Path] = None, ) -> Iterable[DockerBuildLogItem]: assert ctx.dkr_client is not None _docker_login(ctx) build_logs = ctx.dkr_client.build( path=str(context_path), dockerfile=str(dockerfile) if dockerfile is not None else None, buildargs={"tag": f"{ctx.dkr_repo}/{image_name}"}, tag=f"{ctx.dkr_repo}/{image_name}", decode=True, ) return build_logs
[docs]def upload_image(ctx: _CentromereCtx, image_name: str) -> List[str]: assert ctx.dkr_client is not None return ctx.dkr_client.push( repository=f"{ctx.dkr_repo}/{image_name}", stream=True, decode=True, )
[docs]def serialize_pkg_in_container( ctx: _CentromereCtx, image_name: str, serialize_dir: str ) -> Tuple[List[str], str]: assert ctx.dkr_client is not None _serialize_cmd = ["make", "serialize"] container = ctx.dkr_client.create_container( f"{ctx.dkr_repo}/{image_name}", command=_serialize_cmd, volumes=[serialize_dir], environment={"LATCH_DKR_REPO": ctx.dkr_repo, "LATCH_VERSION": ctx.version}, host_config=ctx.dkr_client.create_host_config( binds={ serialize_dir: { "bind": "/tmp/output", "mode": "rw", }, } ), ) container_id = typing.cast(str, container.get("Id")) ctx.dkr_client.start(container_id) logs = typing.cast(Iterable[bytes], ctx.dkr_client.logs(container_id, stream=True)) return [x.decode("utf-8") for x in logs], container_id
[docs]def register_serialized_pkg( files: List[Path], token: Optional[str], version: str, workspace_id: str, latch_register_url: str = config.api.workflow.register, ) -> object: if token is None: token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", "") if token != "": headers = {"Authorization": f"Latch-Execution-Token {token}"} else: raise OSError( "The environment variable FLYTE_INTERNAL_EXECUTION_ID does not exist" ) else: headers = {"Authorization": f"Bearer {token}"} serialize_files: Dict[str, Union[bytes, io.BufferedReader]] = { "version": version.encode("utf-8"), ".latch_ws": workspace_id.encode("utf-8"), } with contextlib.ExitStack() as stack: for file in files: fh = open(file, "rb") stack.enter_context(fh) serialize_files[fh.name] = fh response = requests.post( latch_register_url, headers=headers, files=serialize_files, ) response.raise_for_status() return response.json()
[docs]def import_module_by_path(x: Path, *, module_name: str = "latch_metadata"): spec = iu.spec_from_file_location(module_name, x) assert spec is not None assert spec.loader is not None module = iu.module_from_spec(spec) sys.modules[module_name] = module spec.loader.exec_module(module) return module