Event Driven Shared Drive in AWS

This blog outlines the architecture and setup for network file sharing and event-driven processing using AWS services. The primary components include an Amazon EC2 instance configured as an FTP server with Amazon EFS mounted for shared storage, network file sharing enabled via Samba, and event-driven processing handled through AWS Lambda functions triggered by AWS EventBridge. The objective is to create a scalable, secure, and automated environment for file storage, sharing, and processing.


Fig: AWS Architecture Diagram

Create an EC2 instance and mount an NFS drive

**Mount EFS on the EC2 Instance**
  1. SSH into the EC2 instance and install the Amazon EFS mount helper:

    `sudo yum install -y amazon-efs-utils`
    
  2. Create a directory for mounting the EFS:

    `sudo mkdir -p /mnt/efs`
    
  3. Mount the EFS using the file system ID:

    `sudo mount -t efs -o tls fs-XXXXXXXX:/ /mnt/efs`
    
  4. Add an entry to /etc/fstab to ensure EFS is remounted on reboot:

    `echo "fs-XXXXXXXX:/ /mnt/efs efs _netdev,tls 0 0" | sudo tee -a /etc/fstab`
    
  5. Check if the EFS is successfully mounted:

    `df -h`
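
The steps above assume the EFS file system and a mount target already exist and that the instance can reach them over NFS (TCP 2049). If not, a rough provisioning sketch with the AWS CLI looks like this (all IDs below are placeholders for your own VPC resources):

# Create the EFS file system; the returned FileSystemId replaces fs-XXXXXXXX above
aws efs create-file-system \
    --performance-mode generalPurpose \
    --encrypted \
    --tags Key=Name,Value=shared-drive-efs

# Create a mount target in the subnet where the EC2 instance runs,
# attached to a security group dedicated to EFS
aws efs create-mount-target \
    --file-system-id fs-XXXXXXXX \
    --subnet-id subnet-XXXXXXXX \
    --security-groups sg-XXXXXXXX

# Allow NFS (TCP 2049) from the EC2 instance's security group to the EFS security group
aws ec2 authorize-security-group-ingress \
    --group-id sg-XXXXXXXX \
    --protocol tcp \
    --port 2049 \
    --source-group sg-YYYYYYYY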
    

Then set up Samba so that Windows devices can map the share as a network drive

  1. Install SAMBA on the EC2 Instance:

    sudo yum install -y samba samba-client samba-common

  2. Backup the default SAMBA configuration file:

    sudo cp /etc/samba/smb.conf /etc/samba/smb.conf.bak
    
  3. Edit the SAMBA Configuration:

    sudo nano /etc/samba/smb.conf

The following settings are configured under the [global] section:

[global]
workgroup = WORKGROUP
server string = Samba Server
netbios name = ftp-server
security = user
map to guest = bad user

The following share configuration is added to allow Windows clients to access the EFS directory:

[EFS_Share]
path = /mnt/efs
browseable = yes
writable = yes
guest ok = yes
create mask = 0755
directory mask = 0755
  4. Start the SAMBA services to apply the configuration:

    sudo systemctl start smb
    sudo systemctl start nmb

  5. Enable SAMBA services to start on boot:

    sudo systemctl enable smb
    sudo systemctl enable nmb
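
Before mapping the drive from Windows, it helps to validate the configuration and open the SMB port in the instance's security group. A quick sanity check (the security-group ID and CIDR below are placeholders; restrict access to your own network):

# Validate smb.conf and print the effective share definitions
testparm -s

# List the shares locally as a guest user
smbclient -L localhost -N

# Allow SMB (TCP 445) from a trusted CIDR range only
aws ec2 authorize-security-group-ingress \
    --group-id sg-XXXXXXXX \
    --protocol tcp \
    --port 445 \
    --cidr 203.0.113.0/24

On Windows, the share can then be mapped from File Explorer as `\\<instance-ip>\EFS_Share`.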
    

Now, when a file is uploaded to the FTP server or over the Samba share, we need a watcher script that detects changes and sends the new files for processing. The script below watches the mount with inotifywait and uploads files to S3 once they stop changing:

#!/bin/bash

# Set variables
SOURCE_DIR="/mnt/efs/fs1"
S3_BUCKET="s3://backup-efs-ftp-bucketffa/"
LOG_FILE="/home/ec2-user/upload_to_s3.log"
DEBOUNCE_DELAY=30  # Delay in seconds for file stability check

# Function to log messages
log_message() {
   echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}

# Function to check if file size is stable and not locked
is_file_stable() {
   local file="$1"
   local prev_size=$(stat -c%s "$file")
   log_message "Initial size of '$file': $prev_size bytes"


   # Sleep for the debounce delay
   sleep "$DEBOUNCE_DELAY"


   local new_size=$(stat -c%s "$file")
   log_message "Size of '$file' after sleep: $new_size bytes"


   # Check if the file size is stable
   if [ "$prev_size" -eq "$new_size" ]; then
       log_message "Size of '$file' after sleep didn't changed."
       # Now check if the file is locked
       if lsof "$file" &>/dev/null; then
           log_message "File '$file' is locked after stability check."
           return 1  # File is locked
       else
           log_message "File '$file' is stable and not locked."
           return 0  # File is stable and not locked
       fi
   else
       log_message "File '$file' size changed during stability check."
       return 1  # File is still changing
   fi
}

# Function to upload file to S3
upload_to_s3() {
   local file="$1"
   local full_path="$SOURCE_DIR/$file"


   # Check if the file exists
   if [ ! -f "$full_path" ]; then
       log_message "File '$full_path' does not exist. Skipping upload."
       return
   fi


   # Ensure the file size is stable and not locked
   if ! is_file_stable "$full_path"; then
       log_message "File '$full_path' is still changing or locked. Delaying processing."
       return
   fi


   # Create destination path for S3
   local s3_path="${S3_BUCKET}${file}"

   # Upload file to S3
   log_message "Attempting to upload '$full_path' to S3 path '$s3_path'..."
   if aws s3 cp "$full_path" "$s3_path" --acl bucket-owner-full-control; then
       log_message "Successfully uploaded '$file' to S3"
   else
       log_message "Failed to upload '$file' to S3. Error code: $?"
   fi
}

# Main loop to monitor directory recursively
log_message "Starting to monitor '$SOURCE_DIR' for new files..."
inotifywait -m -r --format '%w%f' -e close_write -e moved_to "$SOURCE_DIR" |
while read -r full_path; do
   # Clean up the filename to remove unwanted characters
   clean_filename=$(basename "$full_path")

   # Debugging information
   echo "Detected full path: '$full_path'"
   echo "Cleaned filename: '$clean_filename'"

   # Log detected file
   log_message "Detected new file: '$full_path'"

   # Ignore temporary or partial files
   if [[ "$clean_filename" != .* ]] && [[ "$clean_filename" != *.part ]] && [[ "$clean_filename" != *.tmp ]]; then
       # Wait for the debounce delay before uploading
       if is_file_stable "$full_path"; then
           upload_to_s3 "${full_path#$SOURCE_DIR/}"  # Remove SOURCE_DIR from the path
       else
           log_message "File '$full_path' is still locked or changing. Ignoring this upload attempt."
       fi
   else
       log_message "Ignoring temporary or partial file: '$full_path'"
   fi
done
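
The watcher relies on inotifywait (from the inotify-tools package) and the AWS CLI, so both must be installed on the instance. To keep it running across reboots, one option is to wrap it in a small systemd unit; the following is a sketch that assumes the script is saved as /home/ec2-user/watch_and_upload.sh (a hypothetical path):

# Install the inotify dependency (on Amazon Linux 2 this may require enabling EPEL first)
sudo yum install -y inotify-tools

# Create a systemd unit so the watcher starts on boot and restarts on failure
sudo tee /etc/systemd/system/efs-watcher.service > /dev/null <<'EOF'
[Unit]
Description=Watch the EFS share and upload new files to S3
After=network-online.target remote-fs.target

[Service]
Type=simple
User=ec2-user
ExecStart=/home/ec2-user/watch_and_upload.sh
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now efs-watcher.service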

Now let’s move on to the event-driven architecture.

In our case let’s unzip uploaded files if they are zipped.


This is the code for the AWS Lambda that is triggered on each file upload. It reads the source bucket and object key from the event and launches an ECS task to process the file (unzipping it if it is a ZIP archive).

import json
import boto3
from handler_run_task import run_ecs_task

s3 = boto3.client('s3')

def lambda_handler(event, context):
    try:
        # Get bucket name and object key from the event
        source_bucket = event['Records'][0]['s3']['bucket']['name']
        object_key = event['Records'][0]['s3']['object']['key']

        # Define ECS cluster and task details
        cluster_name = 'unzip-test-cluster'  # Replace with your ECS cluster name
        task_family = 'ple-family'  # Replace with your ECS task family
        container_name = 'test-container'  # Replace with your container name

        # Define the overrides for the ECS task
        overrides = {
            'environment': [
                {
                    "name": "BUCKET_NAME",
                    "value": source_bucket
                },
                {
                    "name": "KEY",
                    "value": object_key
                }
            ]
        }

        # Run ECS Task
        ecs_response = run_ecs_task(
            cluster_name,
            task_family,
            container_name,
            overrides,
            source_bucket,
            object_key
        )

        return {
            'statusCode': 200,
            'body': json.dumps('ECS task triggered successfully!')
        }

    except Exception as e:
        print(f"Error triggering ECS task: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps(f"Error triggering ECS task: {str(e)}")
        }
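
The handler above reads the bucket and key from the Records structure of an S3 event notification, so the source bucket needs a notification configuration that invokes the Lambda on object creation (if the event is routed through EventBridge instead, the payload shape changes to event['detail']). A rough sketch with the AWS CLI, using a hypothetical function name:

# Allow S3 to invoke the Lambda function
aws lambda add-permission \
    --function-name trigger-unzip-task \
    --statement-id s3-invoke \
    --action lambda:InvokeFunction \
    --principal s3.amazonaws.com \
    --source-arn arn:aws:s3:::backup-efs-ftp-bucketffa

# Invoke the Lambda whenever a new object lands in the source bucket
cat > notification.json <<'EOF'
{
  "LambdaFunctionConfigurations": [
    {
      "LambdaFunctionArn": "arn:aws:lambda:<aws_region>:<account_id>:function:trigger-unzip-task",
      "Events": ["s3:ObjectCreated:*"]
    }
  ]
}
EOF

aws s3api put-bucket-notification-configuration \
    --bucket backup-efs-ftp-bucketffa \
    --notification-configuration file://notification.json
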
**Let’s create the handler that the previous Lambda calls with a JSON payload describing what to process: the object key, the bucket to unzip from, and the container image that performs the unzipping.**

The ECS task is launched by the Lambda function and handles the unzipping of files. If a ZIP file is found, the ECS task extracts it and uploads the extracted contents to a target S3 bucket. If the upload fails, the event is routed to a Dead Letter Queue (DLQ). Non-ZIP files are copied directly to the target bucket.

ECS Configuration:

  • Cluster Name: unzip-test-cluster
  • Task Family: ple-family
  • Container: test-container
  • Launch Type: Fargate

Task Execution:

  • Retrieves the latest task definition for the given family.
  • Executes the task with environment variables passed by the Lambda function.

handler_run_task.py

import boto3

def run_ecs_task(cluster_name, task_family, container_name, overrides, source_bucket, object_key):
    ecs_client = boto3.client('ecs')

    # Get the latest task definition for the given family
    response = ecs_client.list_task_definitions(
        familyPrefix=task_family,
        sort='DESC',
        maxResults=1
    )

    latest_task_definition = response['taskDefinitionArns'][0]
    print("Printing Latest task def")
    print(latest_task_definition)

    # Run the ECS task with the latest task definition
    response = ecs_client.run_task(
        cluster=cluster_name,
        taskDefinition=latest_task_definition,
        overrides={
            'containerOverrides': [
                {
                    'name': container_name,
                    # 'cpu': overrides.get('cpu', 512),  # Default CPU to 512
                    # 'memory': overrides.get('memory', 1024),  # Default memory to 1024 MiB
                    'environment': overrides.get('environment', [])
                }
            ]
        },
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': ['subnet-089f9162bd2913570', 'subnet-05591da28974513ee', 'subnet-0732585a95fcd1b64'],  # Replace with your subnet ID
                'assignPublicIp': 'ENABLED'  # or 'DISABLED' depending on your network setup
            }
        },
        launchType='FARGATE',  # Or 'EC2', depending on your setup
        count=1
    )

    return response
**Now let’s create an IAM role for the Lambda function and ECS tasks.**

An IAM role is created to grant the Lambda function and ECS tasks the necessary permissions to access other AWS resources, such as S3, ECS, and CloudWatch Logs.

  • Key Policies:

    • Log creation and event publishing to CloudWatch
    • S3 bucket access (GetObject, ListBucket, PutObject)
    • ECS task execution and description
    • IAM role passing

      {
      "Version": "2012-10-17",
      "Statement": [
      {
          "Effect": "Allow",
          "Action": "logs:CreateLogGroup",
          "Resource": "arn:aws:logs:<aws_region>:<account_id>:*"
      },
      {
          "Effect": "Allow",
          "Action": [
              "logs:CreateLogStream",
              "logs:PutLogEvents"
          ],
          "Resource": "arn:aws:logs:<aws_region>:<account_id>:log-group:/aws/lambda/<lambda_function_name>:*"
      },
      {
          "Effect": "Allow",
          "Action": [
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws:s3:::<source_bucket_name>",
              "arn:aws:s3:::<source_bucket_name>/*"
          ]
      },
      {
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::<target_bucket_name>/*"
      },
      {
          "Effect": "Allow",
          "Action": "iam:PassRole",
          "Resource": "arn:aws:iam::<account_id>:role/*"
      },
      {
          "Effect": "Allow",
          "Action": [
              "ecs:ListTaskDefinitions",
              "ecs:DescribeTaskDefinition",
              "ecs:RegisterTaskDefinition",
              "ecs:RunTask"
          ],
          "Resource": "*"
      }
      ]
      }
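
For this role to be assumable by both the Lambda function and the ECS task, it also needs a trust policy. A standard one (the role and file names below are hypothetical) can be created like this:

cat > trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": ["lambda.amazonaws.com", "ecs-tasks.amazonaws.com"]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

# Create the role and attach the permissions policy shown above (saved as permissions.json)
aws iam create-role \
    --role-name unzip-pipeline-role \
    --assume-role-policy-document file://trust-policy.json

aws iam put-role-policy \
    --role-name unzip-pipeline-role \
    --policy-name unzip-pipeline-permissions \
    --policy-document file://permissions.json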
      
**Dead-letter Queue (DLQ) Mechanism**

The DLQ captures and logs events where the ECS task fails to process the uploaded file correctly. This mechanism ensures that errors are captured and stored for subsequent analysis or reprocessing.

Failure Handling

  • Condition: A failure is identified if the ECS task returns an HTTP status code other than 200.
  • DLQ Logging: The failed event, including file details and error messages, is sent to the DLQ. The DLQ serves as a reliable storage for these failed events, ensuring no data is lost.
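
The DLQ itself is a standard SQS queue that the ECS task writes to with aws sqs send-message (see the unzip script below). Creating and inspecting it might look like this; the queue name matches the DLQ_URL used later, while the retention value is just an example:

# Create the queue used as a DLQ and keep failed events for 14 days
aws sqs create-queue \
    --queue-name unzip-failed-processing-queue \
    --attributes MessageRetentionPeriod=1209600

# Pull failed events later for analysis or reprocessing
aws sqs receive-message \
    --queue-url https://sqs.us-east-1.amazonaws.com/<account_id>/unzip-failed-processing-queue \
    --max-number-of-messages 10 \
    --wait-time-seconds 5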

Now let’s create the ECS container image. Here is the unzip script (unzip.sh) that runs inside the container:

#!/bin/bash

# Environment variables
SRC_BUCKET_NAME="$BUCKET_NAME"
SRC_BUCKET_KEY="$KEY"
DEST_BUCKET_NAME="backup-efs-ftp-ple-unzipped"
DLQ_URL="https://sqs.us-east-1.amazonaws.com/654654543848/unzip-failed-processing-queue"
OUTPUT_DIR="./output"
LOCAL_FILE_PATH="./$(basename "$SRC_BUCKET_KEY")"

# Function to log messages
log_message() {
   echo "$(date '+%Y-%m-%d %H:%M:%S') - $1"
}

# Function to download a file from S3
download_file() {
   log_message "Downloading $SRC_BUCKET_KEY from $SRC_BUCKET_NAME"
   aws s3 cp "s3://$SRC_BUCKET_NAME/$SRC_BUCKET_KEY" "$LOCAL_FILE_PATH"
   if [ $? -ne 0 ]; then
       log_message "Error downloading file from S3"
       send_to_dlq "Error downloading file from S3"
       exit 1
   fi
}

# Function to upload a file to S3
upload_to_s3() {
   local file_path="$1"
   local s3_key="$2"
   log_message "Uploading $file_path to s3://$DEST_BUCKET_NAME/$s3_key"
   aws s3 cp "$file_path" "s3://$DEST_BUCKET_NAME/$s3_key" --acl bucket-owner-full-control
   if [ $? -ne 0 ]; then
       log_message "Failed to upload $file_path to S3"
       send_to_dlq "Failed to upload $file_path to S3"
       exit 1
   fi
   invoke_load_balancer "$s3_key"
}

# Function to send a message to the DLQ
send_to_dlq() {
   local message="$1"
   log_message "Sending message to DLQ: $message"
   aws sqs send-message --queue-url "$DLQ_URL" --message-body "$message"
   if [ $? -ne 0 ]; then
       log_message "Failed to send message to DLQ"
       exit 1
   fi
}

# Function to invoke load balancer
invoke_load_balancer() {
   local s3_key="$1"
   log_message "Invoking load balancer for $s3_key"
   local payload=$(jq -n \
       --arg bucket "$DEST_BUCKET_NAME" \
       --arg key "$s3_key" \
       '{bucket: $bucket, key: $key, filePath: $key}')
   local response=$(curl -s -X POST "https://asdlb.mydomain.com/companyx/gateway/listenFTPWebhook" \
       -H "Content-Type: application/json" \
       -d "$payload")

   local status_code=$(echo "$response" | jq -r '.status')
   if [ "$status_code" != "200" ]; then
       log_message "Load balancer invocation failed with status code $status_code"
       send_to_dlq "Load balancer invocation failed with status code $status_code"
   else
       log_message "Load balancer invocation successful"
   fi
}

# Function to extract ZIP files
extract_and_process_files() {
   log_message "Extracting ZIP file $LOCAL_FILE_PATH"
   mkdir -p "$OUTPUT_DIR"
   unzip -o "$LOCAL_FILE_PATH" -d "$OUTPUT_DIR"
   if [ $? -ne 0 ]; then
       log_message "Failed to extract ZIP file"
       send_to_dlq "Failed to extract ZIP file"
       exit 1
   fi

   for file in "$OUTPUT_DIR"/*; do
       local s3_key=$(dirname "$SRC_BUCKET_KEY")/$(basename "$file")
       log_message "Processing file $file"
       upload_to_s3 "$file" "$s3_key"
   done
}

# Main process
main() {
   log_message "Starting processing for $SRC_BUCKET_KEY from $SRC_BUCKET_NAME"
   download_file

   if [[ "$LOCAL_FILE_PATH" == *.zip ]]; then
       extract_and_process_files
   else
       local s3_key=$(dirname "$SRC_BUCKET_KEY")/$(basename "$LOCAL_FILE_PATH")
       upload_to_s3 "$LOCAL_FILE_PATH" "$s3_key"
   fi

   log_message "Processing complete"
}

main

And here’s the Dockerfile for the same image:

# Use the official lightweight Debian image as the base

FROM debian:bookworm-slim

# Set the working directory
WORKDIR /usr/src/app

# Set non-interactive mode to avoid prompts during package installation
ENV DEBIAN_FRONTEND=noninteractive

# Install necessary tools with cache cleanup
RUN apt-get update && \
   apt-get install -y --no-install-recommends \
   bash \
   curl \
   unzip \
   jq \
   awscli && \
   apt-get clean && \
   rm -rf /var/lib/apt/lists/*

# Copy your shell script into the container
COPY unzip.sh /usr/src/app/

# Make the shell script executable
RUN chmod +x /usr/src/app/unzip.sh

# BUCKET_NAME and KEY are supplied at runtime via the ECS task's container overrides
ENV BUCKET_NAME=""
ENV KEY=""

# Command to run your shell script
CMD ["/usr/src/app/unzip.sh"]

Finally, build and push the Docker image to ECR, then update the ECS task definition to reference the new image, as sketched below.
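
A typical build-and-push sequence looks like the following; the region, account ID, and repository name are placeholders for your own values.

REGION=us-east-1
ACCOUNT_ID=<account_id>
REPO=unzip-task   # assumed ECR repository name

# Authenticate Docker to ECR
aws ecr get-login-password --region "$REGION" \
    | docker login --username AWS --password-stdin "$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com"

# Build, tag, and push the image
docker build -t "$REPO" .
docker tag "$REPO:latest" "$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/$REPO:latest"
docker push "$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/$REPO:latest"

# Optional local smoke test (needs AWS credentials available in the environment)
docker run --rm -e BUCKET_NAME=<source_bucket_name> -e KEY=<object_key> "$REPO"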

Conclusion

We now have a shared Samba server for Windows users, with event-driven, on-demand unzip functionality whenever a user uploads a file to the drive.

#aws #awscommunitybuilder #awscommunitynepal #communitybuilder
