🚀 Building a Real-Time Stock Market Analytics Pipeline on AWS
Author: Muhammad Awais | DevSecOps Engineer | AWS | Kubernetes | Terraform
Overview ☁️
In this project, I built a fully serverless, event-driven Real-Time Stock Market Data Analytics Pipeline on AWS. The pipeline ingests live stock data, processes it, stores it, analyzes trends, and sends smart alerts — all with minimal cost and zero servers to manage.
Here is what the pipeline does end to end:
- Streams real-time stock data from yfinance into Amazon Kinesis Data Streams
- Triggers AWS Lambda to process, clean, and store incoming data
- Stores raw data in Amazon S3 for historical analysis
- Stores processed data in Amazon DynamoDB for low-latency querying
- Queries historical data using Amazon Athena via AWS Glue Catalog
- Detects stock trends using moving averages (SMA-5 and SMA-20)
- Sends real-time buy/sell alerts via Amazon SNS (Email/SMS)
Architecture at a Glance 🏗️
```
Python Script (yfinance)
          ↓
Amazon Kinesis Data Streams
          ↓
AWS Lambda (Process & Clean Data)
     ↓                  ↓
Amazon DynamoDB     Amazon S3
     ↓                  ↓
AWS Lambda          AWS Glue Catalog
(Trend Analysis)        ↓
     ↓              Amazon Athena
Amazon SNS              ↓
(Email/SMS Alerts)  Amazon S3 (Query Results)
```
Steps to Build This Pipeline 👩💻
Step 1 — Setting Up Data Streaming with Amazon Kinesis
The first step is creating a Kinesis Data Stream to act as the real-time data pipeline.
Create the Kinesis Stream:
- Go to AWS Console → Kinesis → Data Streams
- Click Create data stream
- Name it `stock-market-stream`
- Choose On-demand capacity mode (cost-efficient for variable workloads)
- Click Create
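If you prefer the CLI, the same stream can be created with a single command (a sketch; adjust the region to match your setup):

```bash
# Create an on-demand Kinesis data stream (no shard capacity planning needed)
aws kinesis create-stream \
  --stream-name stock-market-stream \
  --stream-mode-details StreamMode=ON_DEMAND \
  --region us-east-1
```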
Python Producer Script:
This script continuously fetches stock data from yfinance and pushes it to Kinesis every 30 seconds.
```python
import boto3
import json
import time
import yfinance as yf

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

STREAM_NAME = "stock-market-stream"
STOCK_SYMBOL = "AAPL"
DELAY_TIME = 30  # seconds between fetches

def get_stock_data(symbol):
    stock = yf.Ticker(symbol)
    data = stock.history(period="2d")  # last two daily bars
    return {
        "symbol": symbol,
        "open": float(round(data.iloc[-1]["Open"], 2)),
        "high": float(round(data.iloc[-1]["High"], 2)),
        "low": float(round(data.iloc[-1]["Low"], 2)),
        "price": float(round(data.iloc[-1]["Close"], 2)),
        "previous_close": float(round(data.iloc[-2]["Close"], 2)),
        "change": float(round(data.iloc[-1]["Close"] - data.iloc[-2]["Close"], 2)),
        "change_percent": float(round(
            ((data.iloc[-1]["Close"] - data.iloc[-2]["Close"]) / data.iloc[-2]["Close"]) * 100, 2
        )),
        "volume": int(data.iloc[-1]["Volume"]),
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    }

def send_to_kinesis():
    while True:
        stock_data = get_stock_data(STOCK_SYMBOL)
        print(f"Sending: {stock_data}")
        kinesis_client.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(stock_data),
            PartitionKey=STOCK_SYMBOL  # one symbol keeps records ordered on one shard
        )
        time.sleep(DELAY_TIME)

send_to_kinesis()
```
💡 Tip: Always convert `np.float64` values to plain `float` before sending to Kinesis to avoid serialization issues downstream.
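As a quick sanity check of the change math used in the producer, here is the same calculation on two hypothetical closing prices (the values are made up for illustration):

```python
# Hypothetical closes to verify the change/change_percent formulas
last_close, prev_close = 191.24, 189.00

change = round(last_close - prev_close, 2)
change_percent = round((last_close - prev_close) / prev_close * 100, 2)

print(change)          # 2.24
print(change_percent)  # 1.19
```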
Step 2 — Processing Data with AWS Lambda
Two Lambda functions handle the data processing:
Lambda 1 — Kinesis Consumer (Store to S3 + DynamoDB)
This function is triggered automatically by Kinesis, decodes the data, computes metrics, and stores it in both S3 and DynamoDB.
Setup:
- Go to AWS Lambda → Create Function
- Name it `stock-market-processor`
- Runtime: Python 3.12
- Add trigger: Kinesis → select `stock-market-stream`
- Attach an IAM role with permissions for DynamoDB, S3, and Kinesis
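A minimal IAM policy for this role might look like the following (a sketch; tighten the `Resource` ARNs to your actual account, table, and bucket):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/stock-market-stream"
    },
    {
      "Effect": "Allow",
      "Action": ["dynamodb:PutItem"],
      "Resource": "arn:aws:dynamodb:us-east-1:YOUR_ACCOUNT_ID:table/stock-market-data"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject"],
      "Resource": "arn:aws:s3:::your-s3-bucket-name/raw-data/*"
    }
  ]
}
```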
```python
import json
import boto3
import base64
from decimal import Decimal

dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")

DYNAMO_TABLE = "stock-market-data"
S3_BUCKET = "your-s3-bucket-name"

table = dynamodb.Table(DYNAMO_TABLE)

def lambda_handler(event, context):
    for record in event['Records']:
        raw_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        payload = json.loads(raw_data)

        # Save raw data to S3
        s3_key = f"raw-data/{payload['symbol']}/{payload['timestamp'].replace(':', '-')}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=s3_key, Body=json.dumps(payload), ContentType='application/json')

        # Compute metrics
        price_change = round(payload["price"] - payload["previous_close"], 2)
        price_change_percent = round((price_change / payload["previous_close"]) * 100, 2)
        is_anomaly = "Yes" if abs(price_change_percent) > 5 else "No"
        moving_average = round(
            (payload["open"] + payload["high"] + payload["low"] + payload["price"]) / 4, 2
        )

        # Store in DynamoDB (Decimal, since DynamoDB rejects Python floats)
        table.put_item(Item={
            "symbol": payload["symbol"],
            "timestamp": payload["timestamp"],
            "price": Decimal(str(payload["price"])),
            "previous_close": Decimal(str(payload["previous_close"])),
            "change": Decimal(str(price_change)),
            "change_percent": Decimal(str(price_change_percent)),
            "volume": int(payload["volume"]),
            "moving_average": Decimal(str(moving_average)),
            "anomaly": is_anomaly
        })

    return {"statusCode": 200, "body": "Processing Complete"}
```
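The decode step can be sanity-checked locally by simulating the event envelope Lambda receives from Kinesis (the payload values here are made up):

```python
import base64
import json

# Fake a record the way Kinesis delivers it: base64-encoded JSON
payload = {"symbol": "AAPL", "price": 189.5, "previous_close": 187.0}
encoded = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
event = {"Records": [{"kinesis": {"data": encoded}}]}

# Same decode logic as the handler above
raw_data = base64.b64decode(event["Records"][0]["kinesis"]["data"]).decode("utf-8")
decoded = json.loads(raw_data)

print(decoded["symbol"])  # AAPL
```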
Lambda 2 — Trend Analyzer (SMA Crossover + SNS Alert)
This function runs on a schedule (EventBridge every 5 minutes), queries DynamoDB, computes SMA-5 and SMA-20, detects crossover signals, and sends alerts via SNS.
```python
import boto3
import json
import decimal
from datetime import datetime, timedelta

dynamodb = boto3.resource("dynamodb")
sns = boto3.client("sns")

TABLE_NAME = "stock-market-data"
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:YOUR_ACCOUNT_ID:Stock_Trend_Alerts"

def get_recent_stock_data(symbol, minutes=15):
    # At a 30-second ingest cadence, 15 minutes yields roughly 30 points,
    # enough to fill the SMA-20 window
    table = dynamodb.Table(TABLE_NAME)
    past_time = datetime.utcnow() - timedelta(minutes=minutes)
    response = table.query(
        KeyConditionExpression="symbol = :symbol AND #ts >= :time",
        ExpressionAttributeNames={"#ts": "timestamp"},
        ExpressionAttributeValues={
            ":symbol": symbol,
            # Match the ISO-8601 format the producer writes, so the string
            # comparison in the range condition behaves correctly
            ":time": past_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        },
        ScanIndexForward=True
    )
    return sorted(response.get("Items", []), key=lambda x: x["timestamp"])

def calculate_moving_average(data, period):
    if len(data) < period:
        return decimal.Decimal("0")
    return sum(decimal.Decimal(str(d["price"])) for d in data[-period:]) / period

def lambda_handler(event, context):
    for symbol in ["AAPL"]:
        stock_data = get_recent_stock_data(symbol)
        if len(stock_data) < 20:
            continue  # not enough points yet for the SMA-20 window

        sma_5 = calculate_moving_average(stock_data, 5)
        sma_20 = calculate_moving_average(stock_data, 20)
        sma_5_prev = calculate_moving_average(stock_data[:-1], 5)
        sma_20_prev = calculate_moving_average(stock_data[:-1], 20)

        message = None
        if sma_5_prev < sma_20_prev and sma_5 > sma_20:
            message = f"{symbol} is in an Uptrend! Consider a buy opportunity."
        elif sma_5_prev > sma_20_prev and sma_5 < sma_20:
            message = f"{symbol} is in a Downtrend! Consider selling."

        if message:
            sns.publish(TopicArn=SNS_TOPIC_ARN, Message=message, Subject=f"Stock Alert: {symbol}")
            print(f"Alert sent: {message}")

    return {"statusCode": 200, "body": json.dumps("Trend analysis complete")}
```
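The crossover logic is easy to exercise offline with a synthetic price series (a sketch reusing the `calculate_moving_average` helper from the function above; the prices are made up: a steady decline followed by a sharp jump on the latest tick):

```python
from decimal import Decimal

def calculate_moving_average(data, period):
    if len(data) < period:
        return Decimal("0")
    return sum(Decimal(str(d["price"])) for d in data[-period:]) / period

# 20 declining prices, then one sharp jump on the latest tick
prices = list(range(120, 100, -1)) + [200]
data = [{"price": p} for p in prices]

sma_5 = calculate_moving_average(data, 5)
sma_20 = calculate_moving_average(data, 20)
sma_5_prev = calculate_moving_average(data[:-1], 5)
sma_20_prev = calculate_moving_average(data[:-1], 20)

# Short SMA was below the long SMA, and is now above it: an uptrend signal
is_uptrend = sma_5_prev < sma_20_prev and sma_5 > sma_20
print(is_uptrend)  # True
```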
Step 3 — Query Historical Stock Data using Amazon Athena
Raw JSON files land in S3. AWS Glue Catalog creates a structured schema on top of them so Athena can query them with standard SQL.
Setup Glue Crawler:
- Go to AWS Glue → Crawlers → Create Crawler
- Point it to your S3 bucket path: `s3://your-bucket/raw-data/`
- Create a new database: `stock_market_db`
- Run the crawler — it auto-detects schema and creates a table
Query with Athena:
```sql
-- Average daily price per symbol
-- ("timestamp" is a reserved word and stored as an ISO-8601 string,
--  so it is quoted and parsed before extracting the date)
SELECT symbol,
       DATE(from_iso8601_timestamp("timestamp")) AS date,
       ROUND(AVG(price), 2) AS avg_price,
       MAX(high) AS day_high,
       MIN(low) AS day_low
FROM stock_market_db.raw_data
GROUP BY symbol, DATE(from_iso8601_timestamp("timestamp"))
ORDER BY date DESC;

-- Detect anomalies (moves of more than 5% vs the previous close)
SELECT symbol, "timestamp", price, change_percent
FROM stock_market_db.raw_data
WHERE ABS(change_percent) > 5
ORDER BY "timestamp" DESC;
```
💡 Tip: Set your Athena query results S3 bucket in Settings before running queries.
Step 4 — Stock Trend Alerts using SNS
Setup SNS Topic:
- Go to AWS SNS → Topics → Create Topic
- Type: Standard
- Name: `Stock_Trend_Alerts`
- Click Create
Add Subscription:
- Click your topic → Create Subscription
- Protocol: Email or SMS
- Endpoint: your email or phone number
- Confirm the subscription from your inbox
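The same topic and subscription can be set up from the CLI (a sketch; replace the account ID and email with your own):

```bash
# Create the topic, then subscribe an email endpoint to it
aws sns create-topic --name Stock_Trend_Alerts
aws sns subscribe \
  --topic-arn arn:aws:sns:us-east-1:YOUR_ACCOUNT_ID:Stock_Trend_Alerts \
  --protocol email \
  --notification-endpoint you@example.com
```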
Once the trend analyzer Lambda detects a crossover, SNS fires an alert like:
Subject: Stock Alert: AAPL
Message: AAPL is in an Uptrend! Consider a buy opportunity.
Key Learnings 🧠
- Kinesis is perfect for real-time event streaming — On-demand mode saves cost for unpredictable workloads
- Lambda + Kinesis trigger gives you a fully serverless processing layer with automatic scaling
- Always convert numpy types to native Python before serializing to JSON or sending to AWS services
- DynamoDB works great for low-latency lookups, but design your partition key carefully — `symbol` + `timestamp` as a composite key works well here
- Glue + Athena is a powerful combo for ad-hoc SQL queries on raw S3 data without spinning up any infrastructure
- SMA crossover is a simple but effective signal for trend detection in real-time pipelines
Resources 🔗
- Amazon Kinesis Data Streams Docs
- AWS Lambda Developer Guide
- Amazon Athena Getting Started
- Amazon SNS Documentation
- yfinance Python Library
Written by **Muhammad Awais** — DevSecOps Engineer | CKA | CKS
Connect on LinkedIn | GitHub