Source code for latch_cli.services.test_data.remove

"""Service to remove test objects from a managed bucket."""

import boto3
import botocore

from latch_cli.services.test_data.utils import _retrieve_creds

BUCKET = "latch-public"


[docs]def remove(object_url: str): """Removes an object from a managed bucket. Args: object_url: The url of the object to remove. Example: >>> remove("s3://latch-public/1/foo.txt") # Removes the object at this path from your managed bucket prefix. """ session_token, access_key, secret_key, account_id = _retrieve_creds() s3_resource = boto3.resource( "s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key, aws_session_token=session_token, ) object_key = object_url[len(f"s3://{BUCKET}/") :] try: s3_resource.Object(BUCKET, object_key).load() except botocore.exceptions.ClientError: raise ValueError(f"{object_url} does not exist") s3_resource.Object(BUCKET, object_key).delete()