Data Analyst Guide: Why Data Analysts Must Master Business Storytelling
Business Problem Statement
As a data analyst, you're tasked with analyzing customer purchase behavior for an e-commerce company. The company wants to increase sales by 15% within the next quarter. To achieve this, you need to identify the most profitable customer segments, analyze their buying patterns, and develop targeted marketing campaigns. The stakes are significant: hitting the target would translate to roughly $1.5 million in additional revenue.
Step-by-Step Technical Solution
Step 1: Data Preparation (pandas/SQL)
First, we need to prepare the data for analysis. We'll use a combination of pandas and SQL to load, clean, and transform the data.
# Import necessary libraries
import sqlite3

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Connect to the SQL database
conn = sqlite3.connect('customer_data.db')

# SQL query to load customer data
sql_query = """
SELECT
    customer_id,
    age,
    gender,
    income,
    purchase_history,
    purchase_amount
FROM
    customer_data
"""

# Execute the query and load the result into a pandas DataFrame
# (pd.read_sql_query manages its own cursor, so none is needed)
customer_data = pd.read_sql_query(sql_query, conn)

# Close the SQL connection
conn.close()

# Print the first few rows of customer data
print(customer_data.head())
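Loading is only half of data preparation. A minimal cleaning pass is sketched below; the specific choices (dropping duplicate customer_id rows, median-imputing missing income, discarding negative purchase amounts, normalizing gender labels) are assumptions to adapt to your actual data.
# A minimal cleaning/transformation pass (the rules below are
# illustrative assumptions, not requirements from the source data)
customer_data = customer_data.drop_duplicates(subset='customer_id')
customer_data['income'] = customer_data['income'].fillna(customer_data['income'].median())
customer_data = customer_data[customer_data['purchase_amount'] >= 0]
# Normalize gender labels so downstream encoding sees consistent categories
customer_data['gender'] = customer_data['gender'].str.strip().str.lower()
print(customer_data.describe())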
Step 2: Analysis Pipeline
Next, we'll develop an analysis pipeline to identify the most profitable customer segments.
# Define a function to calculate customer segment profitability
def calculate_segment_profitability(customer_data):
    # Calculate average purchase amount by income level
    # (in practice you would usually bucket income into bands first,
    # as in the pd.qcut sketch below, rather than group on raw values)
    segment_profitability = customer_data.groupby('income')['purchase_amount'].mean().reset_index()
    # Sort segments by profitability in descending order
    segment_profitability = segment_profitability.sort_values(by='purchase_amount', ascending=False)
    return segment_profitability

# Calculate segment profitability
segment_profitability = calculate_segment_profitability(customer_data)

# Print segment profitability
print(segment_profitability)
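Grouping on raw income gives one row per distinct income value, which rarely reads as a segment. A more interpretable sketch buckets income into bands with pd.qcut; the four-band split and its labels are illustrative assumptions, not part of the original analysis.
# Bucket income into quartile bands so segments are interpretable
# (band count and labels are illustrative assumptions)
customer_data['income_band'] = pd.qcut(
    customer_data['income'],
    q=4,
    labels=['low', 'lower-middle', 'upper-middle', 'high']
)
band_profitability = (
    customer_data.groupby('income_band', observed=True)['purchase_amount']
    .mean()
    .sort_values(ascending=False)
)
print(band_profitability)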
Step 3: Model/Visualization Code
Now, we'll develop a machine learning model to predict customer purchase behavior and create visualizations to communicate insights to stakeholders.
# Define a function to train a random forest classifier
def train_random_forest(customer_data):
    # One-hot encode the categorical gender column; scikit-learn
    # estimators require a numeric feature matrix
    X = pd.get_dummies(customer_data[['age', 'gender', 'income']], columns=['gender'])
    # purchase_history serves as the class label here (assumed categorical,
    # e.g. repeat vs. one-time buyer)
    y = customer_data['purchase_history']
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Train random forest classifier
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X_train, y_train)
    # Make predictions on the testing set
    y_pred = rf.predict(X_test)
    # Evaluate model performance
    accuracy = accuracy_score(y_test, y_pred)
    print('Model Accuracy:', accuracy)
    print('Classification Report:')
    print(classification_report(y_test, y_pred))
    print('Confusion Matrix:')
    print(confusion_matrix(y_test, y_pred))
    return rf

# Train random forest classifier
rf = train_random_forest(customer_data)
# Import plotting libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Define a function to create visualizations
def create_visualizations(customer_data, segment_profitability):
    # Create histogram of customer ages
    plt.figure(figsize=(8, 6))
    sns.histplot(customer_data['age'], bins=10)
    plt.title('Customer Age Distribution')
    plt.xlabel('Age')
    plt.ylabel('Frequency')
    plt.show()
    # Create bar chart of segment profitability
    plt.figure(figsize=(8, 6))
    sns.barplot(x='income', y='purchase_amount', data=segment_profitability)
    plt.title('Segment Profitability')
    plt.xlabel('Income')
    plt.ylabel('Average Purchase Amount')
    plt.show()

# Create visualizations (pass segment_profitability explicitly rather
# than relying on a global variable)
create_visualizations(customer_data, segment_profitability)
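Accuracy numbers alone rarely persuade stakeholders; showing which features drive the model's predictions usually lands better in a business narrative. A minimal sketch using the rf model trained above (the chart title and styling are illustrative):
# Plot feature importances from the trained random forest
# (assumes rf and the one-hot encoding from train_random_forest)
feature_names = pd.get_dummies(customer_data[['age', 'gender', 'income']], columns=['gender']).columns
importances = pd.Series(rf.feature_importances_, index=feature_names).sort_values()
plt.figure(figsize=(8, 6))
importances.plot(kind='barh')
plt.title('What Drives Purchase Behavior?')
plt.xlabel('Feature Importance')
plt.show()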
Step 4: Performance Evaluation
To evaluate the performance of our analysis pipeline, we'll calculate key metrics such as ROI and customer acquisition cost.
# Define a function to calculate ROI
def calculate_roi(customer_data, cost_ratio=0.2):
    # Calculate total revenue
    total_revenue = customer_data['purchase_amount'].sum()
    # Estimate total cost as a fixed share of revenue
    # (the 20% cost ratio is a placeholder; substitute actual cost data)
    total_cost = total_revenue * cost_ratio
    # Calculate ROI
    roi = (total_revenue - total_cost) / total_cost
    return roi

# Calculate ROI
roi = calculate_roi(customer_data)

# Print ROI
print('ROI:', roi)

# Define a function to calculate customer acquisition cost
def calculate_customer_acquisition_cost(customer_data, marketing_ratio=0.1):
    # Estimate total marketing spend as a share of revenue
    # (the 10% ratio is a placeholder; substitute the real marketing budget)
    total_marketing_spend = customer_data['purchase_amount'].sum() * marketing_ratio
    # Count distinct customers (ideally this would count only newly
    # acquired customers, if acquisition dates are available)
    new_customers = customer_data['customer_id'].nunique()
    # Calculate customer acquisition cost
    customer_acquisition_cost = total_marketing_spend / new_customers
    return customer_acquisition_cost

# Calculate customer acquisition cost
customer_acquisition_cost = calculate_customer_acquisition_cost(customer_data)

# Print customer acquisition cost
print('Customer Acquisition Cost:', customer_acquisition_cost)
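To tie these metrics back to the business goal, it helps to state the gap to the 15% quarterly target in dollars. A minimal sketch, assuming customer_data covers one quarter of sales (the 15% target and $1.5 million figure come from the problem statement):
# Project the revenue lift needed to hit the 15% quarterly target
# (assumes customer_data covers one quarter of sales)
baseline_revenue = customer_data['purchase_amount'].sum()
target_lift = baseline_revenue * 0.15
print(f'Baseline quarterly revenue: ${baseline_revenue:,.0f}')
print(f'Revenue lift needed for +15%: ${target_lift:,.0f}')
print('Stated opportunity from the brief: $1,500,000')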
Step 5: Production Deployment
To deploy our analysis pipeline to production, we'll create a Python script that can be run on a schedule.
# Define a function to deploy the analysis pipeline to production
def deploy_to_production(customer_data):
    # Train random forest classifier
    rf = train_random_forest(customer_data)
    # Load new data and encode it the same way as the training data,
    # aligning columns so unseen categories don't break prediction
    new_data = pd.read_csv('new_customer_data.csv')
    train_columns = pd.get_dummies(customer_data[['age', 'gender', 'income']], columns=['gender']).columns
    X_new = pd.get_dummies(new_data[['age', 'gender', 'income']], columns=['gender'])
    X_new = X_new.reindex(columns=train_columns, fill_value=0)
    predictions = rf.predict(X_new)
    # Save predictions to the database (cast to built-in types,
    # since sqlite3 does not accept numpy scalars)
    conn = sqlite3.connect('customer_data.db')
    cursor = conn.cursor()
    cursor.execute('CREATE TABLE IF NOT EXISTS predictions (customer_id INTEGER, prediction TEXT)')
    rows = [(int(cid), str(pred)) for cid, pred in zip(new_data['customer_id'], predictions)]
    cursor.executemany('INSERT INTO predictions VALUES (?, ?)', rows)
    conn.commit()
    conn.close()

# Deploy the analysis pipeline to production
deploy_to_production(customer_data)
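For the scheduled run described above, the script also needs an entry point a scheduler can invoke. A minimal sketch; the run_pipeline.py filename and the cron line are illustrative assumptions:
# run_pipeline.py: entry point for a scheduled run (filename is illustrative)
def main():
    conn = sqlite3.connect('customer_data.db')
    customer_data = pd.read_sql_query(sql_query, conn)
    conn.close()
    deploy_to_production(customer_data)

if __name__ == '__main__':
    main()

# Example crontab entry to run the pipeline nightly at 2 a.m. (assumption):
# 0 2 * * * /usr/bin/python3 /path/to/run_pipeline.py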
Edge Cases
To handle edge cases, we'll implement the following checks:
- Check for missing values in the data
- Check for outliers in the data
- Check for inconsistent data
# Define a function to check for missing values
def check_for_missing_values(customer_data):
    # Count missing values per column
    missing_values = customer_data.isnull().sum()
    # Print missing values
    print('Missing Values:')
    print(missing_values)

# Check for missing values
check_for_missing_values(customer_data)
# Define a function to check for outliers
def check_for_outliers(customer_data):
    # Flag implausible values (the 100-year and $1M thresholds are
    # simple heuristics; an IQR-based rule is a common alternative)
    outliers = customer_data[(customer_data['age'] > 100) | (customer_data['income'] > 1000000)]
    # Print outliers
    print('Outliers:')
    print(outliers)

# Check for outliers
check_for_outliers(customer_data)
# Define a function to check for inconsistent data
def check_for_inconsistent_data(customer_data):
    # Flag logically impossible values (negative age or income)
    inconsistent_data = customer_data[(customer_data['age'] < 0) | (customer_data['income'] < 0)]
    # Print inconsistent data
    print('Inconsistent Data:')
    print(inconsistent_data)

# Check for inconsistent data
check_for_inconsistent_data(customer_data)
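These checks print problems but let a bad run continue. One option is a validation gate the pipeline calls before training, which fails loudly instead; the failure conditions below are illustrative assumptions:
# A simple validation gate: raise before training if the data fails checks
# (the specific failure conditions are illustrative assumptions)
def validate_customer_data(customer_data):
    if customer_data['purchase_amount'].isnull().any():
        raise ValueError('purchase_amount contains missing values')
    if (customer_data['age'] < 0).any() or (customer_data['income'] < 0).any():
        raise ValueError('negative age or income found')
    return True

# Call the gate before any modeling step
validate_customer_data(customer_data)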
Scaling Tips
To scale our analysis pipeline, we'll implement the following strategies:
- Use distributed computing to process large datasets
- Use cloud-based services to deploy the analysis pipeline
- Use automated testing to ensure the pipeline is working correctly
# Define a function to use distributed computing
def use_distributed_computing(customer_data, n_chunks=10):
    # Import necessary libraries
    from joblib import Parallel, delayed

    # Define a function to process one chunk of data
    def process_chunk(chunk):
        # Example transformation: square the numeric columns
        # (applying ** 2 to string columns such as gender would fail)
        return chunk.select_dtypes(include='number') ** 2

    # Split the data into chunks and process them in parallel,
    # rather than processing the full dataset repeatedly
    chunks = np.array_split(customer_data, n_chunks)
    processed_chunks = Parallel(n_jobs=-1)(delayed(process_chunk)(chunk) for chunk in chunks)
    # Reassemble and return the processed chunks
    return pd.concat(processed_chunks)

# Use distributed computing
processed_data = use_distributed_computing(customer_data)
# Define a function to use cloud-based services
def use_cloud_based_services(customer_data):
    # Import necessary libraries
    from google.cloud import storage

    # Upload the data to a Cloud Storage bucket
    # (the bucket name 'customer_data' is a placeholder)
    client = storage.Client()
    bucket = client.get_bucket('customer_data')
    blob = bucket.blob('customer_data.csv')
    blob.upload_from_string(customer_data.to_csv(index=False), 'text/csv')
    return blob.public_url

# Use cloud-based services
public_url = use_cloud_based_services(customer_data)
# Define a function to run automated tests on the analysis pipeline
def use_automated_testing(customer_data):
    # Import necessary libraries
    import unittest

    # Define test cases for the analysis pipeline
    class TestAnalysisPipeline(unittest.TestCase):
        def test_train_random_forest(self):
            rf = train_random_forest(customer_data)
            self.assertIsNotNone(rf)

        def test_make_predictions(self):
            rf = train_random_forest(customer_data)
            X = pd.get_dummies(customer_data[['age', 'gender', 'income']], columns=['gender'])
            predictions = rf.predict(X)
            self.assertEqual(len(predictions), len(customer_data))

    # Run the tests programmatically (unittest.main() would try to parse
    # command-line arguments and look for module-level tests instead)
    suite = unittest.TestLoader().loadTestsFromTestCase(TestAnalysisPipeline)
    unittest.TextTestRunner().run(suite)

# Use automated testing
use_automated_testing(customer_data)
By following these steps and implementing these strategies, we can develop a robust and scalable analysis pipeline that provides valuable insights to stakeholders and drives business growth.