In this example we will see the basic operations that we can perform on S3 buckets and the files stored in them.
Installation
pip install boto3
Connection
from boto3 import client
from os import getenv
# Shared S3 client used by every helper below. The credentials and the
# region are read from the environment when this module is loaded.
clientS3 = client(
    "s3",
    region_name=getenv("REGION_NAME"),
    aws_access_key_id=getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=getenv("AWS_SECRET_ACCESS_KEY"),
)
Methods for buckets
Create bucket
from botocore.exceptions import ClientError
def create_bucket(bucket: str):
    """Create an S3 bucket in the region the shared client is configured for.

    Args:
        bucket: Name of the bucket to create.

    Returns:
        The response returned by ``clientS3.create_bucket`` on success, or
        the ``"Error"`` entry of the ``ClientError`` response when the call
        is rejected.
    """
    params = {"Bucket": bucket}

    # S3 only accepts a request without a location constraint in us-east-1;
    # for every other region the constraint must name the client's region,
    # otherwise the call fails with IllegalLocationConstraintException.
    region = clientS3.meta.region_name
    if region and region != "us-east-1":
        params["CreateBucketConfiguration"] = {"LocationConstraint": region}

    try:
        return clientS3.create_bucket(**params)
    except ClientError as e:
        return e.response["Error"]
Delete bucket
from botocore.exceptions import ClientError
def delete_bucket(bucket: str):
    """Delete the bucket named ``bucket`` through the shared S3 client.

    Returns the response of ``clientS3.delete_bucket`` on success, or the
    ``"Error"`` entry of the ``ClientError`` response on failure.
    """
    try:
        return clientS3.delete_bucket(Bucket=bucket)
    except ClientError as err:
        error_info = err.response["Error"]
        return error_info
List all buckets
from botocore.exceptions import ClientError
def list_buckets():
    """List the buckets visible to the shared S3 client.

    Returns the response of ``clientS3.list_buckets`` on success, or the
    ``"Error"`` entry of the ``ClientError`` response on failure.
    """
    try:
        return clientS3.list_buckets()
    except ClientError as err:
        error_info = err.response["Error"]
        return error_info
Methods for files
Upload file
from typing import BinaryIO
from botocore.exceptions import ClientError
def upload_file(data: BinaryIO, bucket: str, filename: str):
    """Upload a binary file-like object to ``bucket`` under the key ``filename``.

    Args:
        data: File-like object handed to ``clientS3.upload_fileobj``.
        bucket: Name of the destination bucket.
        filename: Object key to store the data under.

    Returns:
        The string ``"success"`` when the upload call completes, or the
        ``"Error"`` entry of the ``ClientError`` response on failure.
    """
    try:
        clientS3.upload_fileobj(Fileobj=data, Bucket=bucket, Key=filename)
    except ClientError as err:
        return err.response["Error"]
    else:
        # upload_fileobj's return value is not used, so report a fixed marker.
        return "success"
Get file
from typing import BinaryIO
from botocore.exceptions import ClientError
def get_file(bucket: str, filename: str):
    """Fetch the object stored under the key ``filename`` in ``bucket``.

    Returns the response of ``clientS3.get_object`` on success, or the
    ``"Error"`` entry of the ``ClientError`` response on failure.
    """
    try:
        return clientS3.get_object(Bucket=bucket, Key=filename)
    except ClientError as err:
        error_info = err.response["Error"]
        return error_info
Delete file
from typing import BinaryIO
from botocore.exceptions import ClientError
def delete_file(bucket: str, filename: str):
    """Delete the object stored under the key ``filename`` in ``bucket``.

    Returns the response of ``clientS3.delete_object`` on success, or the
    ``"Error"`` entry of the ``ClientError`` response on failure.
    """
    try:
        return clientS3.delete_object(Bucket=bucket, Key=filename)
    except ClientError as err:
        error_info = err.response["Error"]
        return error_info
Top comments (0)