Spaces:
Runtime error
Runtime error
| """ | |
| Utility that uses boto to create buckets. | |
| This work is not our own but is entirely written by https://github.com/full-stack-deep-learning. | |
| """ | |
| import hashlib | |
| import json | |
| import boto3 | |
| import botocore | |
| S3_URL_FORMAT = "https://{bucket}.s3.{region}.amazonaws.com/{key}" | |
| S3_URI_FORMAT = "s3://{bucket}/{key}" | |
| s3 = boto3.resource("s3") | |
def get_or_create_bucket(name):
    """Return the S3 bucket called ``name``, creating it first if it doesn't exist."""
    try:  # optimistically attempt creation
        name, _ = _create_bucket(name)
    except botocore.exceptions.ClientError as err:
        # Error handling per https://github.com/boto/boto3/issues/1195#issuecomment-495842252:
        # the HTTP status code identifies the particular failure.
        status = err.response["ResponseMetadata"]["HTTPStatusCode"]
        if status != 409:
            raise err
        # 409 means the bucket already exists -- nothing to create;
        # we presume we have the right permissions on it.
    return s3.Bucket(name)
def _create_bucket(name):
    """Create an S3 bucket with the provided name and return (name, response)."""
    # Sessions hold on to credentials and configuration, which lets us
    # pull the default region and apply it to the new bucket.
    region = boto3.session.Session().region_name
    response = s3.create_bucket(
        Bucket=name,
        CreateBucketConfiguration={"LocationConstraint": region},
    )
    return name, response
def make_key(fileobj, filetype=None):
    """Build a unique key for ``fileobj``, appending ``filetype`` when given."""
    key = make_identifier(fileobj)
    return key if filetype is None else f"{key}.{filetype}"
def make_unique_bucket_name(prefix, seed):
    """Derive a deterministic, collision-resistant bucket name from a prefix and a seed."""
    digest = hashlib.sha256(seed.encode("utf-8")).hexdigest()
    return f"{prefix}-{digest[:10]}"
def get_url_of(bucket, key=None):
    """Return the https URL of a bucket, or of ``key`` inside that bucket."""
    bucket_name = bucket if isinstance(bucket, str) else bucket.name
    region = _get_region(bucket_name)
    return _format_url(bucket_name, region, key or "")
def get_uri_of(bucket, key=None):
    """Return the s3:// URI of a bucket, or of ``key`` inside that bucket."""
    bucket_name = bucket if isinstance(bucket, str) else bucket.name
    return _format_uri(bucket_name, key or "")
def enable_bucket_versioning(bucket):
    """Turn on versioning for the bucket's contents, which avoids deletion."""
    bucket_name = bucket if isinstance(bucket, str) else bucket.name
    return s3.BucketVersioning(bucket_name).enable()
def add_access_policy(bucket):
    """Attach a policy to ``bucket`` that allows the Gantry app to access data."""
    policy_document = _get_policy(bucket.name)
    s3.meta.client.put_bucket_policy(
        Bucket=bucket.name, Policy=json.dumps(policy_document)
    )
| def _get_policy(bucket_name): | |
| """Returns a bucket policy allowing Gantry app access as a JSON-compatible dictionary.""" | |
| return { | |
| "Version": "2012-10-17", | |
| "Statement": [ | |
| { | |
| "Effect": "Allow", | |
| "Principal": { | |
| "AWS": [ | |
| "arn:aws:iam::848836713690:root", | |
| "arn:aws:iam::339325199688:root", | |
| "arn:aws:iam::665957668247:root", | |
| ] | |
| }, | |
| "Action": ["s3:GetObject", "s3:GetObjectVersion"], | |
| "Resource": f"arn:aws:s3:::{bucket_name}/*", | |
| }, | |
| { | |
| "Effect": "Allow", | |
| "Principal": { | |
| "AWS": [ | |
| "arn:aws:iam::848836713690:root", | |
| "arn:aws:iam::339325199688:root", | |
| "arn:aws:iam::665957668247:root", | |
| ] | |
| }, | |
| "Action": "s3:ListBucketVersions", | |
| "Resource": f"arn:aws:s3:::{bucket_name}", | |
| }, | |
| ], | |
| } | |
def make_identifier(byte_data):
    """Create a unique hex identifier for a collection of bytes via hashing."""
    # Security is not critical here, so the (cryptographically weak) SHA-1 is fine.
    return hashlib.sha1(byte_data).hexdigest()  # noqa: S3
def _get_region(bucket):
    """Determine the AWS region of an S3 bucket.

    Args:
        bucket: A bucket name (str) or a boto3 Bucket object.

    Returns:
        The bucket's region name, e.g. ``"us-west-2"``.
    """
    if not isinstance(bucket, str):
        bucket = bucket.name
    s3_client = boto3.client("s3")
    bucket_location_response = s3_client.get_bucket_location(Bucket=bucket)
    # GetBucketLocation reports a null LocationConstraint for buckets in
    # us-east-1; substitute the real region name so callers (e.g. URL
    # formatting) don't produce "None" in the hostname.
    return bucket_location_response["LocationConstraint"] or "us-east-1"
| def _format_url(bucket_name, region, key=None): | |
| key = key or "" | |
| url = S3_URL_FORMAT.format(bucket=bucket_name, region=region, key=key) | |
| return url | |
| def _format_uri(bucket_name, key=None): | |
| key = key or "" | |
| uri = S3_URI_FORMAT.format(bucket=bucket_name, key=key) | |
| return uri | |