Building a Resilient Data Pipeline for 5G Network Operations Using Python and Snowflake
The Invisible Data Lake
Here is the thing that surprises people when they first work on network analytics: The data exists. All of it.
Every cell in a national 5G network generates performance counters every 15 minutes, session records for every user connection, and measurement reports from devices in the field.
It has been collected for years, but most of it sits in files nobody queries because nobody built the pipeline to make it queryable. That is the actual problem. Not the AI. Not the models. The pipeline.
I spent years building one of these at national scale. This post covers ingestion — getting raw RAN counter files parsed, cleaned, and loaded into Snowflake in a way that does not come apart at 3 AM.
Note: Anomaly detection comes in the next post. You need the foundation first.
What the raw data actually looks like
A 5G base station reports performance counters in XML or JSON, depending on the vendor. Files land in object storage on a 15-minute schedule. The structure looks roughly like this:
{
"measCollecFile": {
"fileHeader": {
"vendorName": "Ericsson",
"collectionBeginTime": "2026-03-15T14:00:00Z"
},
"measData": {
"managedElement": {
"localDn": "MeContext=SITE_4421,ManagedElement=1",
"measInfo": [
{
"measType": ["RRCConnEstabSucc", "RRCConnEstabAtt", "DRBThpVolDl"],
"measValue": {
"measObjLdn": "ManagedElement=1,ENodeBFunction=1,EUtranCellFDD=SITE_4421_1",
"r": [1842, 1847, 48291044]
}
}
]
}
}
}
}
Simple enough on paper. Now multiply that by 50,000 sites, three or four vendors, multiple schema versions per vendor, and files that sometimes arrive 40 minutes late.
That is your actual working environment.
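A back-of-the-envelope sketch makes the scale concrete. The figures below are illustrative assumptions (one file per site per 15-minute window, a few cells per site, a couple hundred counters per cell), not numbers from any specific deployment:

```python
# Rough volume estimate for a national 5G counter feed.
# All constants are illustrative assumptions, not real deployment figures.
SITES = 50_000
WINDOWS_PER_DAY = 24 * 60 // 15          # 96 reporting windows per day
CELLS_PER_SITE = 3
COUNTERS_PER_CELL = 200

files_per_day = SITES * WINDOWS_PER_DAY
counter_values_per_day = files_per_day * CELLS_PER_SITE * COUNTERS_PER_CELL

print(f"{files_per_day:,} files/day")            # 4,800,000 files/day
print(f"{counter_values_per_day:,} values/day")  # 2,880,000,000 values/day
```

Millions of small files a day is exactly the regime where "just parse it when someone asks" stops being an option.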
The Parser
import json
import boto3
from datetime import datetime, timezone
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class RANCounterParser:
# Map vendor names to internal schema identifiers.
# Add new entries here when a vendor pushes a schema-breaking update.
VENDOR_SCHEMA_MAP = {
"Ericsson": "ericsson_v3",
"Nokia": "nokia_v2",
"Samsung": "samsung_v1",
}
def parse_file(self, s3_key: str, bucket: str) -> Optional[list[dict]]:
s3 = boto3.client("s3")
try:
obj = s3.get_object(Bucket=bucket, Key=s3_key)
raw = json.loads(obj["Body"].read().decode("utf-8"))
except Exception as e:
logger.error("fetch_failed key=%s error=%s", s3_key, e)
return None
vendor = self._detect_vendor(raw)
if vendor is None:
logger.warning("unknown_vendor key=%s", s3_key)
return None
return self._extract_records(raw, vendor, s3_key)
def _detect_vendor(self, raw: dict) -> Optional[str]:
try:
name = raw["measCollecFile"]["fileHeader"]["vendorName"]
return self.VENDOR_SCHEMA_MAP.get(name)
except KeyError:
return None
def _extract_records(
self, raw: dict, vendor: str, source_key: str
) -> list[dict]:
records = []
ingested_at = datetime.now(timezone.utc).isoformat()
try:
collection_time = (
raw["measCollecFile"]["fileHeader"]["collectionBeginTime"]
)
meas_data = raw["measCollecFile"]["measData"]["managedElement"]
site_id = (
meas_data["localDn"].split(",")[0].replace("MeContext=", "")
)
for meas_info in meas_data["measInfo"]:
counter_names = meas_info["measType"]
cell_ldn = meas_info["measValue"]["measObjLdn"]
values = meas_info["measValue"]["r"]
# counter_names and values are parallel lists — zip is safe
# only if the file is well-formed; log and skip if lengths differ
if len(counter_names) != len(values):
logger.warning(
"counter_length_mismatch cell=%s key=%s",
cell_ldn, source_key
)
continue
record = {
"collection_time": collection_time,
"ingested_at": ingested_at,
"site_id": site_id,
"cell_ldn": cell_ldn,
"vendor": vendor,
"source_file": source_key,
"is_schema_gap": False,
}
for counter, value in zip(counter_names, values):
record[counter.lower()] = value
records.append(record)
except KeyError as e:
logger.error(
"schema_mismatch key=%s missing_field=%s", source_key, e
)
return records
Two things worth noting:

- ingested_at is stamped inside the parser, not at load time, so it reflects when the file was actually processed.
- The length check on counter_names and values is not theoretical. I have seen vendor files where a counter was removed mid-release cycle and the value array shrank without the name array following.
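To see the output shape concretely, here is a minimal standalone sketch of the same extraction logic (no S3, no class machinery) run against the sample payload from earlier:

```python
from datetime import datetime, timezone

# Sample payload from the start of the post.
raw = {
    "measCollecFile": {
        "fileHeader": {
            "vendorName": "Ericsson",
            "collectionBeginTime": "2026-03-15T14:00:00Z",
        },
        "measData": {
            "managedElement": {
                "localDn": "MeContext=SITE_4421,ManagedElement=1",
                "measInfo": [
                    {
                        "measType": ["RRCConnEstabSucc", "RRCConnEstabAtt", "DRBThpVolDl"],
                        "measValue": {
                            "measObjLdn": "ManagedElement=1,ENodeBFunction=1,EUtranCellFDD=SITE_4421_1",
                            "r": [1842, 1847, 48291044],
                        },
                    }
                ],
            }
        },
    }
}

me = raw["measCollecFile"]["measData"]["managedElement"]
site_id = me["localDn"].split(",")[0].replace("MeContext=", "")

records = []
for mi in me["measInfo"]:
    names, values = mi["measType"], mi["measValue"]["r"]
    if len(names) != len(values):
        continue  # length mismatch: skip the block, as the real parser does
    rec = {
        "site_id": site_id,
        "cell_ldn": mi["measValue"]["measObjLdn"],
        "ingested_at": datetime.now(timezone.utc).isoformat(),
    }
    # Counter names become lowercase record keys, zipped with their values.
    rec.update({n.lower(): v for n, v in zip(names, values)})
    records.append(rec)

print(records[0]["site_id"])          # SITE_4421
print(records[0]["rrcconnestabatt"])  # 1847
```

One flat dict per cell per window, with the lowercased counter names as keys, is what lands in Snowflake downstream.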
Database Schema
Clustering by day and site is not optional at national scale:
CREATE TABLE ran_performance_counters (
collection_time TIMESTAMP_TZ NOT NULL,
ingested_at TIMESTAMP_TZ NOT NULL,
site_id VARCHAR(50) NOT NULL,
cell_ldn VARCHAR(200) NOT NULL,
vendor VARCHAR(30) NOT NULL,
source_file VARCHAR(500),
-- RRC
rrcconnestabsucc NUMBER,
rrcconnestabatt NUMBER,
-- Throughput (values in bits)
drbthpvoldl NUMBER,
drbthpvolul NUMBER,
-- Handover
interfreqhoexesucc NUMBER,
interfreqhoexeatt NUMBER,
-- Availability
cellunavailtime NUMBER,
-- Data quality — set during load, checked during analysis
is_late_arrival BOOLEAN DEFAULT FALSE,
is_schema_gap BOOLEAN DEFAULT FALSE
)
CLUSTER BY (DATE_TRUNC('day', collection_time), site_id);
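The column names are just the parser's lowercased measType strings, and that rule is worth pinning down in one place so the schema and parser cannot drift apart. A small sketch of that check — EXPECTED_COLUMNS here is illustrative, not the full production column list:

```python
# Counter columns from the table DDL above (illustrative subset).
EXPECTED_COLUMNS = {
    "rrcconnestabsucc", "rrcconnestabatt",
    "drbthpvoldl", "drbthpvolul",
    "interfreqhoexesucc", "interfreqhoexeatt",
    "cellunavailtime",
}

def counter_to_column(counter_name: str) -> str:
    """Mirror the parser's record-key rule: lowercase the raw counter name."""
    return counter_name.lower()

# Fail loudly if a counter the parser emits has no column to land in.
for name in ["RRCConnEstabAtt", "DRBThpVolDl", "CellUnavailTime"]:
    col = counter_to_column(name)
    assert col in EXPECTED_COLUMNS, f"schema drift: {col} not in table"
```

A check like this in CI catches the mismatch at build time instead of as a silent column-type failure in write_pandas.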
The Loader
Connection cleanup goes in finally so it runs whether the write succeeds or not.
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
# Late-arrival threshold: 45 minutes after collection window close
LATE_THRESHOLD_SECONDS = 2700
@dataclass
class SnowflakeConfig:
account: str
user: str
password: str
warehouse: str
database: str
schema: str
def load_records(
records: list[dict],
config: SnowflakeConfig,
table: str = "RAN_PERFORMANCE_COUNTERS",
) -> int:
if not records:
return 0
df = pd.DataFrame(records)
df["collection_time"] = pd.to_datetime(df["collection_time"], utc=True)
df["ingested_at"] = pd.to_datetime(df["ingested_at"], utc=True)
lag_seconds = (
df["ingested_at"] - df["collection_time"]
).dt.total_seconds()
df["is_late_arrival"] = lag_seconds > LATE_THRESHOLD_SECONDS
df["is_schema_gap"] = df.get("is_schema_gap", False)
conn = snowflake.connector.connect(
account=config.account,
user=config.user,
password=config.password,
warehouse=config.warehouse,
database=config.database,
schema=config.schema,
)
try:
success, _, nrows, _ = write_pandas(
conn, df, table, auto_create_table=False
)
if not success:
logger.error("write_pandas reported failure for table=%s", table)
return 0
logger.info("loaded rows=%d table=%s", nrows, table)
return nrows
finally:
conn.close()
The finally block is not optional. If write_pandas raises — and it does, when the warehouse is suspended or a column type does not match — the connection needs to close anyway. Without finally, you accumulate orphaned connections inside a Lambda or a long-running job until something breaks in a way that is very hard to trace.
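The late-arrival flag itself is simple enough to sketch in isolation: a file is late when the gap between the collection timestamp and the ingestion timestamp exceeds the 45-minute threshold. A minimal pure-Python version of the check the loader does vectorized in pandas:

```python
from datetime import datetime, timezone

LATE_THRESHOLD_SECONDS = 2700  # 45 minutes, matching the loader

def is_late(collection_time_iso: str, ingested_at: datetime) -> bool:
    """True when ingestion happened more than 45 minutes after collection."""
    # The "Z" suffix is normalized for fromisoformat on older Pythons.
    collected = datetime.fromisoformat(collection_time_iso.replace("Z", "+00:00"))
    return (ingested_at - collected).total_seconds() > LATE_THRESHOLD_SECONDS

collected = "2026-03-15T14:00:00Z"
on_time = datetime(2026, 3, 15, 14, 20, tzinfo=timezone.utc)
late = datetime(2026, 3, 15, 14, 50, tzinfo=timezone.utc)

print(is_late(collected, on_time))  # False (20-minute lag)
print(is_late(collected, late))     # True  (50-minute lag)
```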
Monitoring Pipeline Health
Run this query after your first week of ingestion to track your health metrics:
SELECT
vendor,
DATE_TRUNC('day', collection_time) AS day,
COUNT(*) AS total_records,
SUM(is_late_arrival::INT) AS late_count,
SUM(is_schema_gap::INT) AS gap_count,
ROUND(
100.0 * SUM(is_late_arrival::INT) / COUNT(*), 2
) AS late_pct
FROM ran_performance_counters
WHERE collection_time >= DATEADD('day', -7, CURRENT_DATE())
GROUP BY 1, 2
ORDER BY 1, 2;
If any vendor shows a late percentage above 5%, the issue is usually the collection job, not the network. If gap count spikes on a specific day, check whether a vendor pushed a software update that night.
These two numbers — late percentage and gap percentage, per vendor per day — are the health metrics for your pipeline. Track them before you build anything on top.
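If you would rather compute the same two numbers in Python — say, inside the load job itself, before the rows ever reach Snowflake — the aggregation is a few lines. A sketch over already-flagged records; the field names match the loader's DataFrame columns:

```python
from collections import defaultdict

def health_by_vendor(records: list[dict]) -> dict[str, dict]:
    """Aggregate late-arrival and schema-gap counts per vendor."""
    stats = defaultdict(lambda: {"total": 0, "late": 0, "gap": 0})
    for r in records:
        s = stats[r["vendor"]]
        s["total"] += 1
        s["late"] += int(r.get("is_late_arrival", False))
        s["gap"] += int(r.get("is_schema_gap", False))
    for s in stats.values():
        s["late_pct"] = round(100.0 * s["late"] / s["total"], 2)
    return dict(stats)

# Hypothetical records to show the output shape.
sample = [
    {"vendor": "Ericsson", "is_late_arrival": False, "is_schema_gap": False},
    {"vendor": "Ericsson", "is_late_arrival": True,  "is_schema_gap": False},
    {"vendor": "Nokia",    "is_late_arrival": False, "is_schema_gap": True},
]
print(health_by_vendor(sample)["Ericsson"]["late_pct"])  # 50.0
```

Emitting these per-batch lets you alert on a bad vendor feed within one collection window instead of waiting for the daily SQL rollup.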
What Comes Next
The next post builds cell-specific anomaly detection on top of this foundation—how to learn what "normal" looks like for 50,000 different cells.
Follow me to get a notification when Part 2 drops!
I have been building these systems at national scale for several years. If something here does not match your environment—different vendor schemas or a different cloud stack—I'm happy to dig into it in the comments!