DEV Community

Eslam Genedy

Processing Streaming Data and Fetching Files with AWS Lambda and Python

AWS Lambda is a powerful serverless compute service that allows you to run code in response to events without managing servers. One of the most common use cases for Lambda is processing streaming data and fetching files from various sources, such as S3 buckets, HTTP endpoints, or databases. In this article, we’ll explore how to use AWS Lambda with Python to process streaming data and fetch different types of files, such as CSV, JSON, and images.


Use Case: Processing Streaming Data and Fetching Files

Imagine a scenario where you have a system that generates a continuous flow of data (e.g., logs, sensor data, or user activity). This data lands in an S3 bucket as files, and you need to process each file in near real time, as soon as it arrives. Additionally, you may need to fetch and process files from external sources, such as APIs or databases.

Key Components:

  1. AWS Lambda: For processing streaming data and fetching files.
  2. Amazon S3: For storing streaming data and processed files.
  3. Python: For writing the Lambda function logic.

Step 1: Set Up an S3 Bucket for Streaming Data

  1. Go to the S3 console in AWS.
  2. Create a new bucket, e.g., streaming-data-bucket.
  3. Plan to enable event notifications for the bucket so that each new upload triggers a Lambda function. The notification itself is configured in Step 3, once the function exists.

Step 2: Create a Lambda Function to Process Streaming Data

  1. Go to the Lambda console.
  2. Create a new function named processStreamingData.
  3. Choose Python 3.x as the runtime.
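  4. Attach an IAM execution role that allows s3:GetObject and s3:PutObject on the bucket, write access to CloudWatch Logs, and permissions for any other services the function calls.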
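
The S3 permissions from the last item can be written down as an IAM policy document. The sketch below builds one as a Python dictionary. The helper name `build_s3_policy` is hypothetical, and limiting the actions to `s3:GetObject` and `s3:PutObject` is an assumption based on what the function in this article does, not a complete policy for every workload.

```python
import json


def build_s3_policy(bucket_name):
    """Return a narrow IAM policy document for one bucket (illustrative only)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # Read uploaded files and write processed results
                "Action": ["s3:GetObject", "s3:PutObject"],
                # Object-level actions apply to keys inside the bucket
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            }
        ],
    }


policy = build_s3_policy("streaming-data-bucket")
print(json.dumps(policy, indent=2))
```

Logging permissions are usually granted separately, for example through the AWSLambdaBasicExecutionRole managed policy.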

Here’s the Python code for the Lambda function:

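import boto3
import csv
import io
import json
from urllib.parse import unquote_plus

s3 = boto3.client('s3')

def lambda_handler(event, context):
    # An S3 event can carry more than one record, so handle each of them
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        # Object keys arrive URL-encoded in S3 events (a space becomes '+')
        object_key = unquote_plus(record['s3']['object']['key'])

        # Skip our own output so the function does not trigger itself again
        if object_key.startswith('resized/'):
            continue

        # Process the file based on its type (case-insensitive extension match)
        key_lower = object_key.lower()
        if key_lower.endswith('.csv'):
            process_csv(read_text(bucket_name, object_key))
        elif key_lower.endswith('.json'):
            process_json(read_text(bucket_name, object_key))
        elif key_lower.endswith(('.jpg', '.png')):
            # Images are binary, so process_image fetches the raw bytes itself
            process_image(bucket_name, object_key)
        else:
            print(f"Unsupported file type: {object_key}")

    return {
        'statusCode': 200,
        'body': json.dumps('File processed successfully')
    }

def read_text(bucket_name, object_key):
    # Fetch the file from S3 and decode it as UTF-8 text
    response = s3.get_object(Bucket=bucket_name, Key=object_key)
    return response['Body'].read().decode('utf-8')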

def process_csv(file_content):
    # Read CSV data
    csv_file = io.StringIO(file_content)
    csv_reader = csv.DictReader(csv_file)

    # Process each row
    for row in csv_reader:
        print(f"Processing CSV row: {row}")

def process_json(file_content):
    # Parse JSON data
    json_data = json.loads(file_content)

    # Process JSON data
    print(f"Processing JSON data: {json_data}")

def process_image(bucket_name, object_key):
    # Pillow is not part of the Lambda Python runtime; package it with the
    # function or provide it through a layer
    from PIL import Image

    # Fetch the image from S3
    response = s3.get_object(Bucket=bucket_name, Key=object_key)
    image_content = response['Body'].read()

    # Example: resize the image to 100x100 pixels (aspect ratio is not preserved)
    image = Image.open(io.BytesIO(image_content))
    # Remember the source format ('JPEG' or 'PNG'); the resized copy loses it,
    # and saving a transparent PNG as JPEG would fail
    image_format = image.format
    resized_image = image.resize((100, 100))

    # Save the resized image back to S3 in its original format
    resized_image_bytes = io.BytesIO()
    resized_image.save(resized_image_bytes, format=image_format)
    resized_image_bytes.seek(0)

    s3.put_object(
        Bucket=bucket_name,
        Key=f"resized/{object_key}",
        Body=resized_image_bytes
    )
    print(f"Resized image saved to S3: resized/{object_key}")
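
The handler reads the bucket name and object key from the event. S3 URL-encodes keys in notifications and can deliver several records in one event, so it can help to isolate that parsing in a small helper. A minimal sketch: `iter_s3_objects` and the sample event are hypothetical, and the event is trimmed to the fields used here.

```python
from urllib.parse import unquote_plus


def iter_s3_objects(event):
    """Yield (bucket, key) for every record in an S3 notification event."""
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys are URL-encoded in the event payload; '+' stands for a space
        key = unquote_plus(record["s3"]["object"]["key"])
        yield bucket, key


# Trimmed sample event with two records, one of them with an encoded key
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "streaming-data-bucket"},
                "object": {"key": "logs/2024+report%281%29.csv"}}},
        {"s3": {"bucket": {"name": "streaming-data-bucket"},
                "object": {"key": "images/cat.png"}}},
    ]
}

objects = list(iter_s3_objects(sample_event))
for bucket, key in objects:
    print(bucket, key)
```

Without the decoding step, a file uploaded as "2024 report(1).csv" would be requested under its encoded name and the `get_object` call would fail with a missing-key error.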

Step 3: Configure S3 Event Notifications

  1. Go to the S3 console.
  2. Select your bucket (streaming-data-bucket).
  3. Go to the Properties tab and click Create event notification.
  4. Set the event name to NewFileUpload.
  5. Choose All object create events as the event type.
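  6. Set the destination to the processStreamingData Lambda function. Because the function writes resized images back to the same bucket under resized/, restrict the notification with a prefix or suffix filter, or write output to a separate bucket, so the function does not trigger itself in a loop.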
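
The same notification can also be set up from code with boto3's `put_bucket_notification_configuration`. The sketch below is one possible shape, not the only one: the helper names, the placeholder function ARN, and the `.csv` suffix filter are assumptions added for illustration.

```python
def build_notification_config(function_arn, suffix):
    """Build the NotificationConfiguration argument for one Lambda target."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "Id": "NewFileUpload",
                "LambdaFunctionArn": function_arn,
                # Equivalent of "All object create events" in the console
                "Events": ["s3:ObjectCreated:*"],
                # Optional filter: only keys ending in the given suffix
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": suffix}]}
                },
            }
        ]
    }


def apply_notification(bucket_name, config):
    """Send the configuration to S3 (requires boto3 and AWS credentials)."""
    # Imported here so the builder above can be used without AWS access
    import boto3

    s3 = boto3.client("s3")
    s3.put_bucket_notification_configuration(
        Bucket=bucket_name, NotificationConfiguration=config
    )


# Placeholder ARN; replace the region and account ID with your own
config = build_notification_config(
    "arn:aws:lambda:us-east-1:123456789012:function:processStreamingData",
    ".csv",
)
print(config)
```

Two things to keep in mind with this approach: the call replaces the bucket's existing notification configuration rather than adding to it, and S3 must already be allowed to invoke the function. The console grants that permission for you; from code it has to be added separately.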

Step 4: Fetching Files from External Sources

Sometimes, you may need to fetch files from external sources, such as APIs or databases. Here’s how you can modify the Lambda function to fetch and process files from an HTTP endpoint.

Example: Fetching a JSON File from an API

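import json

# requests is not included in the Lambda Python runtime;
# bundle it with the function or provide it through a layer
import requests

def fetch_json_from_api(url):
    # A timeout keeps a slow endpoint from using up the whole Lambda timeout
    response = requests.get(url, timeout=10)
    if response.status_code == 200:
        # Return the raw JSON text; process_json parses it with json.loads
        return response.text
    else:
        raise Exception(f"Failed to fetch JSON from {url}: HTTP {response.status_code}")

def lambda_handler(event, context):
    # Example: Fetch JSON data from an API
    api_url = "https://api.example.com/data.json"
    try:
        json_text = fetch_json_from_api(api_url)
        process_json(json_text)
    except Exception as e:
        print(f"Error fetching JSON: {e}")
        # Report the failure instead of returning a success response
        return {
            'statusCode': 500,
            'body': json.dumps('Failed to fetch or process file')
        }

    return {
        'statusCode': 200,
        'body': json.dumps('File fetched and processed successfully')
    }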
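
If you would rather not package an extra dependency, the fetch can be written with Python's standard library instead. This is a sketch of that alternative, not the approach used above: `fetch_json`, the injectable `opener` parameter, and the `FakeResponse` class are hypothetical names, and the fake exists only so the example runs without network access.

```python
import json
import urllib.request


def fetch_json(url, timeout=5.0, opener=urllib.request.urlopen):
    """Fetch a URL and return the parsed JSON body.

    `opener` is injectable so the function can be exercised offline.
    """
    with opener(url, timeout=timeout) as response:
        # urlopen raises on 4xx/5xx by itself; this guards other non-200 codes
        status = getattr(response, "status", 200)
        if status != 200:
            raise RuntimeError(f"Unexpected HTTP status {status} from {url}")
        return json.loads(response.read().decode("utf-8"))


class FakeResponse:
    """Stand-in for an HTTP response, used only for the offline demo below."""

    status = 200

    def __init__(self, payload):
        self._payload = payload

    def read(self):
        return self._payload

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False


def fake_opener(url, timeout):
    # Returns a canned JSON document instead of calling the network
    return FakeResponse(b'{"sensor": "t1", "value": 21.5}')


data = fetch_json("https://api.example.com/data.json", opener=fake_opener)
print(data)
```

In the Lambda function you would call `fetch_json(api_url)` with the default opener; the explicit timeout should stay well below the function's configured timeout.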

Step 5: Testing the Lambda Function

  1. Upload a CSV, JSON, or image file to your S3 bucket.
  2. Check the CloudWatch logs to verify that the Lambda function processed the file correctly.
  3. For external file fetching, ensure the Lambda function has internet access. A function that is not attached to a VPC can reach the internet by default; a function attached to a VPC needs a route through a NAT gateway.
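
Before uploading real files, you can also exercise the handler with a hand-built event. The sketch below generates a minimal S3 test event; `make_s3_event` is a hypothetical helper, and the event contains only the fields the handler in this article reads, not the full notification payload.

```python
import json
from urllib.parse import quote_plus


def make_s3_event(bucket_name, object_key):
    """Build a minimal S3 object-created test event for one object."""
    return {
        "Records": [
            {
                "eventSource": "aws:s3",
                "eventName": "ObjectCreated:Put",
                "s3": {
                    "bucket": {"name": bucket_name},
                    # S3 URL-encodes keys in events, so the test event does too
                    "object": {"key": quote_plus(object_key, safe="/")},
                },
            }
        ]
    }


event = make_s3_event("streaming-data-bucket", "uploads/my data.csv")
print(json.dumps(event, indent=2))
```

The printed JSON can be pasted into a test event in the Lambda console. The object it names still has to exist in the bucket, since the handler downloads it.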

Best Practices for Processing Streaming Data and Files

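  1. Use Environment Variables: Store configuration such as API URLs in environment variables. Keep credentials in a dedicated secret store such as AWS Secrets Manager or Systems Manager Parameter Store rather than in plain environment variables.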
  2. Optimize Memory and Timeout: Adjust the Lambda function’s memory and timeout settings based on the size of the files being processed.
  3. Handle Errors Gracefully: Implement error handling for network issues, invalid file formats, or unexpected data.
  4. Leverage Layers: Use Lambda layers to include external libraries like Pillow for image processing.
  5. Monitor with CloudWatch: Set up CloudWatch alarms and dashboards to monitor the performance of your Lambda function.
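
The first two practices can be combined in a few lines: read settings from environment variables and skip files that are too large to load into memory. A minimal sketch; the variable names `API_URL` and `MAX_FILE_BYTES`, the 10 MB default, and both helper functions are assumptions chosen for illustration.

```python
import os


def load_settings(env=os.environ):
    """Read function settings from environment variables with fallbacks."""
    return {
        "api_url": env.get("API_URL", "https://api.example.com/data.json"),
        # Default limit of 10 MB when the variable is not set
        "max_bytes": int(env.get("MAX_FILE_BYTES", str(10 * 1024 * 1024))),
    }


def should_process(object_size, settings):
    """Return True when the object is small enough to load into memory."""
    return object_size <= settings["max_bytes"]


# A plain dict stands in for os.environ in this demo
settings = load_settings({"MAX_FILE_BYTES": "1024"})
print(settings)
print(should_process(512, settings), should_process(2048, settings))
```

In the handler, the object size is available in each event record under `s3.object.size`, so oversized files can be rejected before they are downloaded.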

Conclusion

AWS Lambda, combined with Python, provides a flexible and scalable solution for processing streaming data and fetching files from various sources. Whether you’re handling CSV files, JSON data, or images, Lambda makes it easy to build event-driven workflows that scale automatically.

By following the steps and best practices outlined in this article, you can build robust serverless applications that process data in real-time and integrate seamlessly with other AWS services. Happy coding! 🚀
