🚀 Executive Summary
TL;DR: The article details a cost-effective method to stream Cloudflare logs to ElasticSearch, addressing the limitations of manual analysis and Cloudflare’s dashboard. This integration enables real-time anomaly detection, advanced querying, and visualization in Kibana for enhanced security and troubleshooting.
🎯 Key Takeaways
- Cloudflare Logpush requires a Business or Enterprise plan and an API Token with specific permissions (Zone.Logpush:edit, Zone.Logpush:read) to send logs to a destination.
- AWS S3 serves as an efficient intermediary for Cloudflare Logpush, necessitating an IAM user/role with write-only access for Cloudflare and read-only access for the ingestion script.
- A Python script using the boto3 and elasticsearch libraries is crucial for polling S3, decompressing gzipped newline-delimited JSON logs, converting Cloudflare's nanosecond EdgeStartTimestamp to ElasticSearch's @timestamp, and bulk indexing into daily indices.
- Implementing an ElasticSearch Index Template with appropriate field mappings (e.g., ip, keyword, text) is vital for optimal querying, analysis, and integration with Index Lifecycle Management (ILM).
- A LAST_PROCESSED_KEY_FILE mechanism helps the Python script track processed S3 objects, preventing redundant ingestion of old log files on subsequent runs.
Detecting Anomaly Traffic: Cloudflare Logs to ElasticSearch Tutorial
As a Senior DevOps Engineer, you understand the critical importance of maintaining a robust and secure online presence. For many organizations, Cloudflare stands as the first line of defense, protecting web assets from malicious traffic and ensuring performance. However, merely having Cloudflare in place isn’t enough; you need to understand the traffic flowing through it to detect anomalies, identify threats, and troubleshoot issues proactively.
Introduction
Manually sifting through Cloudflare logs, often delivered in massive files, is a tedious, time-consuming, and frankly, error-prone endeavor. Relying solely on Cloudflare’s dashboard for deep historical analysis or complex correlations can also be limiting. While various expensive SaaS solutions promise anomaly detection, they might not always align with your specific needs or budget constraints. The problem is clear: how do we gain comprehensive, real-time insights into Cloudflare traffic without manual toil or exorbitant costs?
The solution we propose at TechResolve is a powerful, cost-effective, and highly customizable integration: streaming Cloudflare logs to ElasticSearch. By centralizing your Cloudflare logs within an ElasticSearch cluster, you unlock unparalleled capabilities for advanced querying, visualization with Kibana, and sophisticated anomaly detection. This tutorial will guide you step-by-step through setting up this critical data pipeline, empowering your team with the tools to detect unusual traffic patterns and respond swiftly.
Prerequisites
Before we dive into the integration, ensure you have the following:
- Cloudflare Account: A Business or Enterprise plan is required to utilize Cloudflare Logpush.
- Cloudflare API Token: Create an API Token with specific permissions for Logpush (at minimum, Zone.Logpush:edit and Zone.Logpush:read for the relevant zones).
- AWS Account: We’ll use an S3 bucket as an intermediary destination for Cloudflare Logpush.
- ElasticSearch Cluster: Access to a running ElasticSearch cluster (self-managed, Elastic Cloud, or local development setup).
- Python 3.x: Installed on your local machine or a server where the ingestion script will run, along with pip.
- Basic Linux/Bash Knowledge: For executing commands and scripts.
- Familiarity with ElasticSearch: Basic understanding of indices, documents, and data types.
Step-by-Step Guide: Integrating Cloudflare Logs with ElasticSearch
Step 1: Configure Cloudflare Logpush to an AWS S3 Bucket
Cloudflare Logpush allows you to send logs to various destinations. We’ll use AWS S3 as an efficient and scalable intermediary. You can set this up via the Cloudflare dashboard, but for a DevOps approach, we’ll demonstrate using the Cloudflare API.
First, create an S3 bucket where Cloudflare will push logs. Let’s assume your bucket is named your-cloudflare-logs-bucket in the us-east-1 region.
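If you prefer the command line, a quick way to create the bucket (assuming the AWS CLI is installed and configured with sufficient permissions) is:

aws s3 mb s3://your-cloudflare-logs-bucket --region us-east-1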
Now, use the Cloudflare API to configure Logpush. Replace placeholders with your actual values:
curl -X POST "https://api.cloudflare.com/client/v4/zones/{ZONE_ID}/logpush/jobs" \
-H "Authorization: Bearer {CLOUDFLARE_API_TOKEN}" \
-H "Content-Type: application/json" \
--data '{
"dataset": "http_requests",
"frequency": "low",
"enabled": true,
"destination_conf": "s3://your-cloudflare-logs-bucket.s3.amazonaws.com/{LOG_PREFIX}?region=us-east-1&access_key_id={AWS_ACCESS_KEY_ID}&secret_access_key={AWS_SECRET_ACCESS_KEY}&bucket_path_format=logs/{DATE_Y}/{DATE_M}/{DATE_D}/{TIME_H}Z",
"logpull_options": "fields=ClientIP,ClientRequestHost,ClientRequestMethod,ClientRequestURI,ClientRequestUserAgent,EdgeResponseBytes,EdgeResponseStatus,OriginIP,OriginResponseStatus,RayID,ZoneID,ZoneName,EdgeStartTimestamp,ClientRequestPath,ClientRequestProtocol",
"name": "TechResolve_Logpush_S3"
}'
Logic Explained:
- {ZONE_ID}: The ID of your Cloudflare zone.
- {CLOUDFLARE_API_TOKEN}: The API Token you generated.
- dataset: Specifies the type of logs; http_requests is standard for web traffic.
- destination_conf: This is crucial. It defines the S3 bucket URL, region, and, importantly, the AWS credentials Cloudflare will use to write to your bucket. For security, it's recommended to create a dedicated IAM user with limited write-only access to this specific bucket.
- bucket_path_format: Defines how logs are organized within S3.
- logpull_options: Defines which fields you want to include in your logs. Customize this based on your analytical needs to avoid unnecessary data transfer.
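To confirm the job was created and is enabled, you can list the Logpush jobs for the zone with a simple GET request (same placeholders as above):

curl -X GET "https://api.cloudflare.com/client/v4/zones/{ZONE_ID}/logpush/jobs" \
  -H "Authorization: Bearer {CLOUDFLARE_API_TOKEN}" \
  -H "Content-Type: application/json"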
Step 2: Create AWS S3 Bucket and IAM Permissions
If you haven’t already, create an S3 bucket. Ensure you configure an IAM policy that allows Cloudflare to write to it and, more importantly, an IAM user/role that your Python script (in the next step) can use to read from it.
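For the dedicated write-only IAM user whose access keys you embedded in destination_conf in Step 1, a minimal policy sketch could look like the following. The logs/* prefix is an assumption here; match it to your bucket_path_format, and consult Cloudflare's Logpush documentation for the exact permissions it requires:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject"],
      "Resource": "arn:aws:s3:::your-cloudflare-logs-bucket/logs/*"
    }
  ]
}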
For the Python script’s IAM user/role (let’s call it LogIngesterRole), attach a policy similar to this:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::your-cloudflare-logs-bucket",
"arn:aws:s3:::your-cloudflare-logs-bucket/*"
]
}
]
}
Logic Explained:
- This policy grants read access (s3:GetObject) to objects within your-cloudflare-logs-bucket and the ability to list its contents (s3:ListBucket).
- Adhering to the principle of least privilege, this role only has the necessary permissions to read the logs, not delete or modify them.
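Before wiring up the script, it can save time to verify that the read-side credentials actually work, for example with the AWS CLI (the log-ingester profile name is just an assumption for a profile tied to this role):

aws s3 ls s3://your-cloudflare-logs-bucket/logs/ --recursive --profile log-ingester | head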
Step 3: Develop a Python Script to Ingest Logs from S3 to ElasticSearch
This Python script will periodically poll your S3 bucket, download new Cloudflare log files, parse them, and bulk index them into your ElasticSearch cluster.
Install necessary libraries:
pip install boto3 elasticsearch
Here’s a sample Python script:
import os
import io
import json
import gzip
import datetime

import boto3
from elasticsearch import Elasticsearch, helpers

# --- Configuration ---
AWS_REGION = "us-east-1"
S3_BUCKET_NAME = "your-cloudflare-logs-bucket"
ES_HOSTS = ["https://your-elasticsearch-host:9243"]  # Or "http://localhost:9200"
ES_API_KEY = ("your_api_key_id", "your_api_key_secret")  # For Elastic Cloud or API Key auth
ES_INDEX_PREFIX = "cloudflare-logs"
LAST_PROCESSED_KEY_FILE = "last_processed_s3_key.txt"  # To track processed files

# --- S3 Client ---
s3_client = boto3.client("s3", region_name=AWS_REGION)

# --- ElasticSearch Client ---
# If using API Key auth (recommended for Elastic Cloud):
es_client = Elasticsearch(ES_HOSTS, api_key=ES_API_KEY)
# If using username/password:
# es_client = Elasticsearch(ES_HOSTS, http_auth=("username", "password"))
# If no auth (local dev):
# es_client = Elasticsearch(ES_HOSTS)


def get_last_processed_key():
    """Return the key of the last S3 object successfully ingested, if any."""
    if os.path.exists(LAST_PROCESSED_KEY_FILE):
        with open(LAST_PROCESSED_KEY_FILE, "r") as f:
            return f.read().strip()
    return ""


def set_last_processed_key(key):
    """Persist the key of the last S3 object successfully ingested."""
    with open(LAST_PROCESSED_KEY_FILE, "w") as f:
        f.write(key)


def process_s3_object(bucket, key):
    """Download one gzipped log file from S3 and bulk index its lines into ElasticSearch."""
    print(f"Processing s3://{bucket}/{key}")
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        compressed_data = response['Body'].read()

        # Cloudflare logs are gzipped, newline-delimited JSON
        with gzip.GzipFile(fileobj=io.BytesIO(compressed_data)) as f:
            log_data = f.read().decode('utf-8')

        actions = []
        for line in log_data.strip().split('\n'):
            if not line:
                continue
            try:
                log_entry = json.loads(line)

                # Cloudflare's EdgeStartTimestamp is in nanoseconds;
                # convert it to an ISO-8601 UTC @timestamp for ElasticSearch
                if 'EdgeStartTimestamp' in log_entry:
                    log_entry['@timestamp'] = datetime.datetime.fromtimestamp(
                        log_entry['EdgeStartTimestamp'] / 1_000_000_000,
                        tz=datetime.timezone.utc
                    ).isoformat()

                # Determine the daily index name from the log date
                if '@timestamp' in log_entry:
                    timestamp_dt = datetime.datetime.fromisoformat(log_entry['@timestamp'].replace('Z', '+00:00'))
                else:
                    timestamp_dt = datetime.datetime.now(datetime.timezone.utc)
                index_name = f"{ES_INDEX_PREFIX}-{timestamp_dt.strftime('%Y.%m.%d')}"

                actions.append({
                    "_index": index_name,
                    "_source": log_entry
                })
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON from line: {line}. Error: {e}")
            except Exception as e:
                print(f"Error preparing log entry: {e}")

        if actions:
            # Bulk index into ElasticSearch
            success, failed = helpers.bulk(es_client, actions, chunk_size=500, request_timeout=60)
            print(f"Indexed {success} documents, {len(failed)} failed.")
            if failed:
                print(f"Failed items: {failed}")

        return True  # Indicate successful processing
    except Exception as e:
        print(f"Error processing S3 object {key}: {e}")
        return False


def main():
    last_key = get_last_processed_key()
    processed_this_run = []

    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=S3_BUCKET_NAME)

    all_keys = []
    for page in pages:
        if "Contents" in page:
            all_keys.extend(sorted(obj['Key'] for obj in page['Contents'] if obj['Key'].endswith('.gz')))

    # Only process keys that come after the last processed one
    start_processing = False
    if not last_key:
        start_processing = True  # Process all if there is no last_key

    keys_to_process = []
    for key in all_keys:
        if key == last_key:
            start_processing = True
            continue  # Skip the last processed key itself
        if start_processing:
            keys_to_process.append(key)

    for key in keys_to_process:
        if process_s3_object(S3_BUCKET_NAME, key):
            processed_this_run.append(key)
        else:
            # If processing fails for a key, stop to avoid a partial processing state
            print(f"Stopping processing due to failure on key: {key}")
            break

    if processed_this_run:
        set_last_processed_key(processed_this_run[-1])
        print(f"Updated last processed key to: {processed_this_run[-1]}")
    else:
        print("No new files processed or all files failed.")

    print("Script finished.")


if __name__ == "__main__":
    main()
Logic Explained:
- Configuration: Set your AWS region, S3 bucket name, ElasticSearch hosts, and API keys.
- LAST_PROCESSED_KEY_FILE: This simple mechanism ensures that the script only processes new log files and avoids re-ingesting old data on subsequent runs.
- S3 Interaction: boto3 is used to list objects in the S3 bucket and download .gz files, which is Cloudflare's default log compression format.
- ElasticSearch Client: Initializes the ElasticSearch client. Always use API keys or username/password for production environments.
- Log Parsing: Cloudflare logs are newline-delimited JSON. The script decompresses each file with gzip and parses every line as JSON.
- Timestamp Handling: Cloudflare's EdgeStartTimestamp is in nanoseconds. We convert it to an ISO-8601 UTC timestamp and assign it to @timestamp, which is standard for ElasticSearch and Kibana.
- Index Naming: We use daily indices (e.g., cloudflare-logs-2023.10.27) for better data management and lifecycle policies.
- Bulk Indexing: elasticsearch.helpers.bulk is highly efficient for sending many documents to ElasticSearch in a single request.
- Error Handling: Includes basic error handling for JSON decoding and ElasticSearch indexing; see the note below for collecting bulk failures instead of raising on them.
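One caveat: by default, elasticsearch.helpers.bulk raises a BulkIndexError as soon as any document is rejected, so the failed list printed by the script is normally empty. If you would rather collect per-document failures and keep going, a hedged variant of the bulk call inside process_s3_object looks like this:

# Drop-in replacement for the helpers.bulk call in process_s3_object.
# raise_on_error=False makes bulk() return (success_count, list_of_errors)
# instead of raising, so failures can be logged without aborting the batch.
success, errors = helpers.bulk(
    es_client,
    actions,
    chunk_size=500,
    request_timeout=60,
    raise_on_error=False,
)
if errors:
    print(f"{len(errors)} documents failed to index; first error: {errors[0]}")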
You would typically run this script periodically using a cron job or a dedicated worker like AWS Lambda for serverless execution.
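For example, a crontab entry that runs the ingester every five minutes might look like this (the paths are placeholders for wherever you deploy the script and want its output logged):

*/5 * * * * /usr/bin/python3 /opt/cloudflare-ingest/ingest_logs.py >> /var/log/cloudflare-ingest.log 2>&1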
Step 4: Configure ElasticSearch for Anomaly Detection (Post-Ingestion)
While this tutorial focuses on ingestion, it’s vital to prepare ElasticSearch for analysis. Define an Index Template to ensure your Cloudflare logs have consistent mappings.
Example Index Template (apply via Kibana’s Dev Tools or ElasticSearch API):
PUT /_index_template/cloudflare_logs_template
{
"index_patterns": ["cloudflare-logs-*"],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"index.lifecycle.name": "cloudflare_policy",
"index.lifecycle.rollover_alias": "cloudflare-logs"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"ClientIP": { "type": "ip" },
"ClientRequestHost": { "type": "keyword" },
"ClientRequestMethod": { "type": "keyword" },
"ClientRequestURI": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 }}},
"ClientRequestUserAgent": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 }}},
"EdgeResponseBytes": { "type": "long" },
"EdgeResponseStatus": { "type": "integer" },
"OriginIP": { "type": "ip" },
"OriginResponseStatus": { "type": "integer" },
"RayID": { "type": "keyword" },
"ZoneName": { "type": "keyword" },
"ClientRequestPath": { "type": "keyword" },
"ClientRequestProtocol": { "type": "keyword" }
// Add other fields from your Cloudflare logs as needed
}
}
},
"priority": 500,
"version": 1,
"_meta": {
"description": "Template for Cloudflare HTTP request logs"
}
}
Logic Explained:
- index_patterns: This template will apply to all indices starting with cloudflare-logs-.
- settings: Define shard/replica counts and integrate with Index Lifecycle Management (ILM) for automated data retention (e.g., move old indices to colder storage or delete them); a sketch of the referenced cloudflare_policy follows below.
- mappings: Crucially, this defines the data types for your fields. Using ip for IP addresses, keyword for exact string matches (like hostnames and methods), and text for searchable strings (like user agents) ensures optimal querying and analysis.
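The template above references an ILM policy named cloudflare_policy that is not defined in this tutorial. A minimal sketch (apply via Kibana's Dev Tools) that rolls over daily and deletes data after 30 days might look like the following; since the ingestion script already creates date-based indices, you could also rely on the delete phase alone. Adjust retention to your needs:

PUT _ilm/policy/cloudflare_policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": { "max_age": "1d", "max_primary_shard_size": "50gb" }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": { "delete": {} }
      }
    }
  }
}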
With data flowing and properly mapped, you can now use Kibana to build visualizations, dashboards, and set up Elastic’s Machine Learning features to automatically detect unusual spikes, drops, or patterns in your Cloudflare traffic.
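As a concrete starting point, the following Dev Tools query (field names taken from the template above; the 15-minute window and result sizes are arbitrary) surfaces the client IPs generating the most 5xx responses, a common first signal of abusive or broken traffic:

GET cloudflare-logs-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "range": { "@timestamp": { "gte": "now-15m" } } },
        { "range": { "EdgeResponseStatus": { "gte": 500 } } }
      ]
    }
  },
  "aggs": {
    "top_client_ips": {
      "terms": { "field": "ClientIP", "size": 10 }
    }
  }
}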
Common Pitfalls
- Cloudflare API Rate Limits: When setting up Logpush via the API, be mindful of Cloudflare's API rate limits. Excessive requests can lead to 429 Too Many Requests errors.
- AWS IAM Permissions: Incorrect S3 bucket policies or IAM user/role permissions are a frequent source of errors. Double-check that Cloudflare has write access and your ingestion script has read access, following the principle of least privilege.
- ElasticSearch Indexing Errors: If log entries are malformed or if the data type in a log entry conflicts with your ElasticSearch index mapping, documents might fail to index. Monitor your ingestion script logs for these errors. Use the index template effectively.
- Cloudflare Logpush Latency: Cloudflare Logpush is near real-time but not instantaneous. There can be a delay of a few minutes (typically 1-5 minutes, sometimes more) before logs appear in your S3 bucket. Account for this in your anomaly detection expectations.
Conclusion
By following this tutorial, you’ve established a robust pipeline for ingesting Cloudflare logs into ElasticSearch. You’ve transformed raw traffic data into a searchable, centralized, and highly analyzable resource. This setup provides your SysAdmins, Developers, and DevOps Engineers with immediate access to critical information, enabling them to:
- Quickly search and filter traffic for specific events or IPs.
- Build custom dashboards in Kibana to visualize traffic patterns and identify trends.
- Leverage ElasticSearch’s powerful query language and Machine Learning capabilities for sophisticated anomaly detection, protecting your infrastructure from emerging threats.
What’s Next?
With your logs flowing, consider these next steps:
- Advanced Anomaly Detection: Explore Elastic’s Machine Learning features to automatically identify unusual patterns (e.g., sudden spikes in error rates, atypical geographic traffic, or unusual user agent strings).
- Alerting: Integrate ElasticSearch alerts with your existing notification systems (Slack, PagerDuty, email) to ensure your team is immediately notified of detected anomalies.
- Security Information and Event Management (SIEM): Extend your Elastic Stack to function as a full-fledged SIEM by integrating other security-related logs.
- Index Lifecycle Management (ILM): Implement comprehensive ILM policies to automate data retention, move older indices to cheaper storage tiers, and optimize cluster performance.
Empowering your team with this level of visibility and control over your Cloudflare traffic is a significant step towards a more secure and resilient infrastructure. Happy hunting for those anomalies!