| import boto3 | |
| from botocore.exceptions import ClientError | |
| from datetime import datetime | |
| from typing import Optional, List | |
| from .models import User, FileUpload | |
| dynamodb = boto3.resource('dynamodb') | |
| users_table = dynamodb.Table('Users') | |
| files_table = dynamodb.Table('Files') | |
| async def get_user_by_username(username: str) -> Optional[User]: | |
| try: | |
| response = users_table.get_item(Key={'username': username}) | |
| if 'Item' in response: | |
| return User(**response['Item']) | |
| return None | |
| except ClientError: | |
| return None | |
| async def create_user(user: User) -> bool: | |
| try: | |
| users_table.put_item( | |
| Item={ | |
| 'username': user.username, | |
| 'email': user.email, | |
| 'password': user.password, | |
| }, | |
| ConditionExpression='attribute_not_exists(username)' | |
| ) | |
| return True | |
| except ClientError: | |
| return False | |
| async def save_file(username: str, file_upload: FileUpload) -> bool: | |
| try: | |
| files_table.put_item( | |
| Item={ | |
| 'username': username, | |
| 'filename': file_upload.filename, | |
| 'content': file_upload.content, | |
| 'created_at': datetime.utcnow().isoformat(), | |
| 'updated_at': datetime.utcnow().isoformat() | |
| } | |
| ) | |
| return True | |
| except ClientError: | |
| return False | |
| async def get_user_files(username: str) -> List[FileUpload]: | |
| try: | |
| response = files_table.query( | |
| KeyConditionExpression='username = :username', | |
| ExpressionAttributeValues={':username': username} | |
| ) | |
| return [FileUpload(**item) for item in response.get('Items', [])] | |
| except ClientError: | |
| return [] |