Stock Market Real-Time Data Analytics Pipeline on AWS


Author: Muhammad Awais | DevSecOps Engineer | AWS | Kubernetes | Terraform


Overview ☁️

In this project, I built a fully serverless, event-driven Real-Time Stock Market Data Analytics Pipeline on AWS. The pipeline ingests live stock data, processes it, stores it, analyzes trends, and sends smart alerts — all with minimal cost and zero servers to manage.

Here is what the pipeline does end to end:

  • Streams real-time stock data from yfinance into Amazon Kinesis Data Streams
  • Triggers AWS Lambda to process, clean, and store incoming data
  • Stores raw data in Amazon S3 for historical analysis
  • Stores processed data in Amazon DynamoDB for low-latency querying
  • Queries historical data using Amazon Athena via AWS Glue Catalog
  • Detects stock trends using moving averages (SMA-5 and SMA-20)
  • Sends real-time buy/sell alerts via Amazon SNS (Email/SMS)



Architecture at a Glance 🏗️

Python Script (yfinance)
        ↓
Amazon Kinesis Data Streams
        ↓
AWS Lambda (Process & Clean Data)
        ↓                    ↓
Amazon DynamoDB          Amazon S3
        ↓                    ↓
AWS Lambda            AWS Glue Catalog
(Trend Analysis)             ↓
        ↓             Amazon Athena
Amazon SNS                   ↓
(Email/SMS Alerts)       Amazon S3
                       (Query Results)

Steps to Build This Pipeline 👩‍💻

Step 1 — Setting Up Data Streaming with Amazon Kinesis

The first step is creating a Kinesis Data Stream to act as the real-time data pipeline.

Create the Kinesis Stream:

  1. Go to AWS Console → Kinesis → Data Streams
  2. Click Create data stream
  3. Name it stock-market-stream
  4. Choose On-demand capacity mode (cost-efficient for variable workloads)
  5. Click Create

Python Producer Script:

This script continuously fetches stock data from yfinance and pushes it to Kinesis every 30 seconds.

import boto3
import json
import time
import yfinance as yf

kinesis_client = boto3.client('kinesis', region_name='us-east-1')
STREAM_NAME = "stock-market-stream"
STOCK_SYMBOL = "AAPL"
DELAY_TIME = 30

def get_stock_data(symbol):
    stock = yf.Ticker(symbol)
    data = stock.history(period="2d")

    if len(data) < 2:
        # Off-hours, yfinance can return fewer than two rows, which would
        # break the previous-close lookups below
        raise ValueError(f"Not enough history for {symbol}")

    return {
        "symbol": symbol,
        "open": float(round(data.iloc[-1]["Open"], 2)),
        "high": float(round(data.iloc[-1]["High"], 2)),
        "low": float(round(data.iloc[-1]["Low"], 2)),
        "price": float(round(data.iloc[-1]["Close"], 2)),
        "previous_close": float(round(data.iloc[-2]["Close"], 2)),
        "change": float(round(data.iloc[-1]["Close"] - data.iloc[-2]["Close"], 2)),
        "change_percent": float(round(
            ((data.iloc[-1]["Close"] - data.iloc[-2]["Close"]) / data.iloc[-2]["Close"]) * 100, 2
        )),
        "volume": int(data.iloc[-1]["Volume"]),
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    }

def send_to_kinesis():
    while True:
        stock_data = get_stock_data(STOCK_SYMBOL)
        print(f"Sending: {stock_data}")

        kinesis_client.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(stock_data),
            PartitionKey=STOCK_SYMBOL
        )
        time.sleep(DELAY_TIME)

if __name__ == "__main__":
    send_to_kinesis()

💡 Tip: Always convert np.float64 values to plain float before sending to Kinesis to avoid serialization issues downstream.
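
One way to apply that tip systematically is a small sanitizer that converts any numpy scalar (anything exposing `.item()`) to a native Python type before serializing. This helper is a sketch, not part of the original producer:

```python
def to_native(value):
    """Recursively convert numpy scalars (anything with .item()) to plain Python types."""
    if hasattr(value, "item"):  # np.float64, np.int64, ...
        return value.item()
    if isinstance(value, dict):
        return {k: to_native(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_native(v) for v in value]
    return value

# json.dumps(to_native(stock_data)) then never hits a numpy serialization error
```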


Step 2 — Processing Data with AWS Lambda

Two Lambda functions handle the data processing:

Lambda 1 — Kinesis Consumer (Store to S3 + DynamoDB)

This function is triggered automatically by Kinesis, decodes the data, computes metrics, and stores it in both S3 and DynamoDB.

Setup:

  1. Go to AWS Lambda → Create Function
  2. Name it stock-market-processor
  3. Runtime: Python 3.12
  4. Add trigger: Kinesis → select stock-market-stream
  5. Attach IAM role with permissions for: DynamoDB, S3, Kinesis
import json
import boto3
import base64
from decimal import Decimal

dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")

DYNAMO_TABLE = "stock-market-data"
S3_BUCKET = "your-s3-bucket-name"
table = dynamodb.Table(DYNAMO_TABLE)

def lambda_handler(event, context):
    for record in event['Records']:
        raw_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        payload = json.loads(raw_data)

        # Save raw data to S3
        s3_key = f"raw-data/{payload['symbol']}/{payload['timestamp'].replace(':', '-')}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=s3_key, Body=json.dumps(payload), ContentType='application/json')

        # Compute metrics
        price_change = round(payload["price"] - payload["previous_close"], 2)
        price_change_percent = round((price_change / payload["previous_close"]) * 100, 2)
        is_anomaly = "Yes" if abs(price_change_percent) > 5 else "No"
        # Note: this is the average of this record's OHLC values,
        # not a moving average over time
        moving_average = round(
            (payload["open"] + payload["high"] + payload["low"] + payload["price"]) / 4, 2
        )

        # Store in DynamoDB
        table.put_item(Item={
            "symbol": payload["symbol"],
            "timestamp": payload["timestamp"],
            "price": Decimal(str(payload["price"])),
            "previous_close": Decimal(str(payload["previous_close"])),
            "change": Decimal(str(price_change)),
            "change_percent": Decimal(str(price_change_percent)),
            "volume": int(payload["volume"]),
            "moving_average": Decimal(str(moving_average)),
            "anomaly": is_anomaly
        })

    return {"statusCode": 200, "body": "Processing Complete"}
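Because Kinesis base64-encodes record payloads, the decoding step is easy to get wrong. You can exercise it locally with a handcrafted event (a sketch for local testing, with a sample payload shaped like the producer's output):

```python
import base64
import json

# A sample payload shaped like what the producer sends
sample = {"symbol": "AAPL", "price": 189.5, "previous_close": 188.0, "open": 188.2,
          "high": 190.0, "low": 187.9, "volume": 1000,
          "timestamp": "2024-01-01T00:00:00Z"}

# Kinesis delivers record data base64-encoded; this mimics the Lambda's input event
event = {"Records": [{"kinesis": {
    "data": base64.b64encode(json.dumps(sample).encode("utf-8")).decode("ascii")}}]}

# The same decode step the Lambda performs
raw = base64.b64decode(event["Records"][0]["kinesis"]["data"]).decode("utf-8")
payload = json.loads(raw)
assert payload == sample
```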

Lambda 2 — Trend Analyzer (SMA Crossover + SNS Alert)

This function runs on a schedule (EventBridge every 5 minutes), queries DynamoDB, computes SMA-5 and SMA-20, detects crossover signals, and sends alerts via SNS.

import boto3
import json
import decimal
from datetime import datetime, timedelta

dynamodb = boto3.resource("dynamodb")
sns = boto3.client("sns")

TABLE_NAME = "stock-market-data"
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:YOUR_ACCOUNT_ID:Stock_Trend_Alerts"

def get_recent_stock_data(symbol, minutes=15):
    # At one record every 30 seconds, a 15-minute window yields ~30 points,
    # enough for the 20-period SMA below
    table = dynamodb.Table(TABLE_NAME)
    past_time = datetime.utcnow() - timedelta(minutes=minutes)

    response = table.query(
        KeyConditionExpression="symbol = :symbol AND #ts >= :time",
        ExpressionAttributeNames={"#ts": "timestamp"},
        ExpressionAttributeValues={
            ":symbol": symbol,
            # Must match the producer's timestamp format exactly:
            # DynamoDB compares sort keys as strings
            ":time": past_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        },
        ScanIndexForward=True
    )
    # Items are already in ascending timestamp order (ScanIndexForward=True)
    return response.get("Items", [])

def calculate_moving_average(data, period):
    if len(data) < period:
        return decimal.Decimal("0")
    return sum(decimal.Decimal(str(d["price"])) for d in data[-period:]) / period

def lambda_handler(event, context):
    for symbol in ["AAPL"]:
        stock_data = get_recent_stock_data(symbol)

        if len(stock_data) < 20:
            continue

        sma_5 = calculate_moving_average(stock_data, 5)
        sma_20 = calculate_moving_average(stock_data, 20)
        sma_5_prev = calculate_moving_average(stock_data[:-1], 5)
        sma_20_prev = calculate_moving_average(stock_data[:-1], 20)

        message = None
        if sma_5_prev < sma_20_prev and sma_5 > sma_20:
            message = f"{symbol} is in an Uptrend! Consider a buy opportunity."
        elif sma_5_prev > sma_20_prev and sma_5 < sma_20:
            message = f"{symbol} is in a Downtrend! Consider selling."

        if message:
            sns.publish(TopicArn=SNS_TOPIC_ARN, Message=message, Subject=f"Stock Alert: {symbol}")
            print(f"Alert sent: {message}")

    return {"statusCode": 200, "body": json.dumps("Trend analysis complete")}
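The crossover logic can be checked in isolation with plain price lists before wiring it to DynamoDB. A minimal sketch mirroring the Lambda's SMA comparison (function names are illustrative):

```python
def sma(prices, period):
    """Simple moving average of the last `period` prices (0 if not enough data)."""
    if len(prices) < period:
        return 0.0
    return sum(prices[-period:]) / period

def crossover_signal(prices, fast=5, slow=20):
    """Return 'buy', 'sell', or None based on a fast/slow SMA crossover."""
    if len(prices) <= slow:
        return None
    fast_now, slow_now = sma(prices, fast), sma(prices, slow)
    fast_prev, slow_prev = sma(prices[:-1], fast), sma(prices[:-1], slow)
    if fast_prev < slow_prev and fast_now > slow_now:
        return "buy"
    if fast_prev > slow_prev and fast_now < slow_now:
        return "sell"
    return None

# A falling series followed by a sharp jump triggers a bullish crossover
prices = [float(p) for p in range(120, 100, -1)] + [200.0]
crossover_signal(prices)  # → 'buy'
```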

Step 3 — Query Historical Stock Data using Amazon Athena

Raw JSON files land in S3. AWS Glue Catalog creates a structured schema on top of them so Athena can query them with standard SQL.

Setup Glue Crawler:

  1. Go to AWS Glue → Crawlers → Create Crawler
  2. Point it to your S3 bucket path: s3://your-bucket/raw-data/
  3. Create a new database: stock_market_db
  4. Run the crawler — it auto-detects schema and creates a table

Query with Athena:

-- Average daily price per symbol
-- ("timestamp" is a reserved word in Athena and the crawler stores it as an
--  ISO-8601 string, so it must be quoted and parsed)
SELECT symbol,
       date(from_iso8601_timestamp("timestamp")) AS trade_date,
       ROUND(AVG(price), 2) AS avg_price,
       MAX(high) AS day_high,
       MIN(low) AS day_low
FROM stock_market_db.raw_data
GROUP BY symbol, date(from_iso8601_timestamp("timestamp"))
ORDER BY trade_date DESC;

-- Detect anomalies (moves larger than 5%)
SELECT symbol, "timestamp", price, change_percent
FROM stock_market_db.raw_data
WHERE ABS(change_percent) > 5
ORDER BY "timestamp" DESC;

💡 Tip: Set your Athena query results S3 bucket in Settings before running queries.


Step 4 — Stock Trend Alerts using SNS

Setup SNS Topic:

  1. Go to AWS SNS → Topics → Create Topic
  2. Type: Standard
  3. Name: Stock_Trend_Alerts
  4. Click Create

Add Subscription:

  1. Click your topic → Create Subscription
  2. Protocol: Email or SMS
  3. Endpoint: your email or phone number
  4. Confirm the subscription from your inbox

Once the trend analyzer Lambda detects a crossover, SNS fires an alert like:

AAPL is in an Uptrend! Consider a buy opportunity.

Key Learnings 🧠

  • Kinesis is perfect for real-time event streaming — On-demand mode saves cost for unpredictable workloads
  • Lambda + Kinesis trigger gives you a fully serverless processing layer with automatic scaling
  • Always convert numpy types to native Python before serializing to JSON or sending to AWS services
  • DynamoDB works great for low-latency lookups but design your partition key carefully — symbol + timestamp as composite key works well here
  • Glue + Athena is a powerful combo for ad-hoc SQL queries on raw S3 data without spinning up any infrastructure
  • SMA crossover is a simple but effective signal for trend detection in real-time pipelines


Written by *Muhammad Awais — DevSecOps Engineer | CKA | CKS*
Connect on LinkedIn | GitHub
