DEV Community

SESHA GONABOYINA
The RAN Generates Millions of Records a Day

Building a resilient data pipeline for 5G Network Operations using Python and Snowflake

The Invisible Data Lake

Here is the thing that surprises people when they first work on network analytics: The data exists. All of it.

Every cell in a national 5G network generates performance counters every 15 minutes, session records for every user connection, and measurement reports from devices in the field.

It has been collected for years, but most of it sits in files nobody queries because nobody built the pipeline to make it queryable. That is the actual problem. Not the AI. Not the models. The pipeline.

I spent years building one of these at national scale. This post covers ingestion — getting raw RAN counter files parsed, cleaned, and loaded into Snowflake in a way that does not come apart at 3 AM.

Note: Anomaly detection comes in the next post. You need the foundation first.

What the raw data actually looks like

A 5G base station reports performance counters in XML or JSON, depending on the vendor. Files land in object storage on a 15-minute schedule. The structure looks roughly like this:

{
  "measCollecFile": {
    "fileHeader": {
      "vendorName": "Ericsson",
      "collectionBeginTime": "2026-03-15T14:00:00Z"
    },
    "measData": {
      "managedElement": {
        "localDn": "MeContext=SITE_4421,ManagedElement=1",
        "measInfo": [
          {
            "measType": ["RRCConnEstabSucc", "RRCConnEstabAtt", "DRBThpVolDl"],
            "measValue": {
              "measObjLdn": "ManagedElement=1,ENodeBFunction=1,EUtranCellFDD=SITE_4421_1",
              "r": [1842, 1847, 48291044]
            }
          }
        ]
      }
    }
  }
}

Simple enough on paper. Now multiply that by 50,000 sites, three or four vendors, multiple schema versions per vendor, and files that sometimes arrive 40 minutes late.

That is your actual working environment.
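To put a rough number on "millions of records a day", here is a back-of-the-envelope sketch. The site count comes from above; the cells-per-site average is an assumption for illustration, not a measurement:

```python
# Rough volume estimate. 'cells_per_site' is an assumed average; real
# deployments vary by site type and vendor.
sites = 50_000
windows_per_day = 24 * 4                   # one counter file per 15-minute window
files_per_day = sites * windows_per_day    # files landing in object storage daily

cells_per_site = 3                         # assumption for illustration
records_per_day = files_per_day * cells_per_site

print(f"{files_per_day:,} files, {records_per_day:,} records per day")
```

Even with conservative assumptions, you are parsing millions of files a day, which is why the parser below logs and skips rather than raising on bad input.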

The Parser

import json
import boto3
from datetime import datetime, timezone
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class RANCounterParser:

    # Map vendor names to internal schema identifiers.
    # Add new entries here when a vendor pushes a schema-breaking update.
    VENDOR_SCHEMA_MAP = {
        "Ericsson": "ericsson_v3",
        "Nokia":    "nokia_v2",
        "Samsung":  "samsung_v1",
    }

    def parse_file(self, s3_key: str, bucket: str) -> Optional[list[dict]]:
        s3 = boto3.client("s3")
        try:
            obj = s3.get_object(Bucket=bucket, Key=s3_key)
            raw = json.loads(obj["Body"].read().decode("utf-8"))
        except Exception as e:
            logger.error("fetch_failed key=%s error=%s", s3_key, e)
            return None

        vendor = self._detect_vendor(raw)
        if vendor is None:
            logger.warning("unknown_vendor key=%s", s3_key)
            return None

        return self._extract_records(raw, vendor, s3_key)

    def _detect_vendor(self, raw: dict) -> Optional[str]:
        try:
            name = raw["measCollecFile"]["fileHeader"]["vendorName"]
            return self.VENDOR_SCHEMA_MAP.get(name)
        except KeyError:
            return None

    def _extract_records(
        self, raw: dict, vendor: str, source_key: str
    ) -> list[dict]:
        records = []
        ingested_at = datetime.now(timezone.utc).isoformat()

        try:
            collection_time = (
                raw["measCollecFile"]["fileHeader"]["collectionBeginTime"]
            )
            meas_data = raw["measCollecFile"]["measData"]["managedElement"]
            site_id = (
                meas_data["localDn"].split(",")[0].replace("MeContext=", "")
            )

            for meas_info in meas_data["measInfo"]:
                counter_names = meas_info["measType"]
                cell_ldn      = meas_info["measValue"]["measObjLdn"]
                values        = meas_info["measValue"]["r"]

                # counter_names and values are parallel lists — zip is safe
                # only if the file is well-formed; log and skip if lengths differ
                if len(counter_names) != len(values):
                    logger.warning(
                        "counter_length_mismatch cell=%s key=%s",
                        cell_ldn, source_key
                    )
                    continue

                record = {
                    "collection_time": collection_time,
                    "ingested_at":     ingested_at,
                    "site_id":         site_id,
                    "cell_ldn":        cell_ldn,
                    "vendor":          vendor,
                    "source_file":     source_key,
                    "is_schema_gap":   False,
                }
                for counter, value in zip(counter_names, values):
                    record[counter.lower()] = value

                records.append(record)

        except KeyError as e:
            logger.error(
                "schema_mismatch key=%s missing_field=%s", source_key, e
            )

        return records

Two things worth noting:

  1. ingested_at is stamped inside the parser, not at load time — that way it reflects when the file was actually processed.

  2. The length check on counter_names and values is not theoretical. I have seen vendor files where a counter was removed mid-release cycle and the value array shrank without the name array following.
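To see what the parser emits, here is the flattening step in isolation, applied to the sample file from earlier. This is a standalone sketch of the logic inside `_extract_records`, not the class itself:

```python
# Standalone sketch of the record-flattening logic, using the sample JSON above.
sample = {
    "measCollecFile": {
        "fileHeader": {
            "vendorName": "Ericsson",
            "collectionBeginTime": "2026-03-15T14:00:00Z",
        },
        "measData": {
            "managedElement": {
                "localDn": "MeContext=SITE_4421,ManagedElement=1",
                "measInfo": [{
                    "measType": ["RRCConnEstabSucc", "RRCConnEstabAtt", "DRBThpVolDl"],
                    "measValue": {
                        "measObjLdn": "ManagedElement=1,ENodeBFunction=1,EUtranCellFDD=SITE_4421_1",
                        "r": [1842, 1847, 48291044],
                    },
                }],
            }
        },
    }
}

me = sample["measCollecFile"]["measData"]["managedElement"]
site_id = me["localDn"].split(",")[0].replace("MeContext=", "")

info = me["measInfo"][0]
record = {"site_id": site_id}
# Counter names and values are parallel lists; lowercase names become columns.
record.update({c.lower(): v for c, v in zip(info["measType"], info["measValue"]["r"])})

print(record)
# {'site_id': 'SITE_4421', 'rrcconnestabsucc': 1842, 'rrcconnestabatt': 1847, 'drbthpvoldl': 48291044}
```

One flat dict per cell per window, with counter names lowercased so they line up with the Snowflake column names in the next section.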

Database Schema

Clustering by day and site is not optional at national scale:


CREATE TABLE ran_performance_counters (
    collection_time       TIMESTAMP_TZ  NOT NULL,
    ingested_at           TIMESTAMP_TZ  NOT NULL,
    site_id               VARCHAR(50)   NOT NULL,
    cell_ldn              VARCHAR(200)  NOT NULL,
    vendor                VARCHAR(30)   NOT NULL,
    source_file           VARCHAR(500),

    -- RRC
    rrcconnestabsucc      NUMBER,
    rrcconnestabatt       NUMBER,

    -- Throughput (values in bits)
    drbthpvoldl           NUMBER,
    drbthpvolul           NUMBER,

    -- Handover
    interfreqhoexesucc    NUMBER,
    interfreqhoexeatt     NUMBER,

    -- Availability
    cellunavailtime       NUMBER,

    -- Data quality — set during load, checked during analysis
    is_late_arrival       BOOLEAN DEFAULT FALSE,
    is_schema_gap         BOOLEAN DEFAULT FALSE
)
CLUSTER BY (DATE_TRUNC('day', collection_time), site_id);

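One drift to guard against: the parser builds column names from vendor counter strings, so a new counter silently produces a record key with no matching table column. A small pre-load check catches this (the column set is copied from the DDL above; `unknown_counters` and the sample record are illustrative names, not part of the pipeline):

```python
# Counter columns as defined in the DDL above; extend alongside the schema.
EXPECTED_COUNTER_COLUMNS = {
    "rrcconnestabsucc", "rrcconnestabatt",
    "drbthpvoldl", "drbthpvolul",
    "interfreqhoexesucc", "interfreqhoexeatt",
    "cellunavailtime",
}

METADATA_COLUMNS = {
    "collection_time", "ingested_at", "site_id", "cell_ldn",
    "vendor", "source_file", "is_late_arrival", "is_schema_gap",
}

def unknown_counters(record: dict) -> set[str]:
    """Return keys in a parsed record that have no column in the table."""
    return set(record) - METADATA_COLUMNS - EXPECTED_COUNTER_COLUMNS

# A vendor update adds a counter the table does not know about yet:
record = {"site_id": "SITE_4421", "rrcconnestabsucc": 1842, "nrcellcqidist": 7}
print(unknown_counters(record))  # {'nrcellcqidist'}
```

Run this before `write_pandas` and you get a log line instead of a load failure when a vendor ships new counters.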

The Loader

Connection cleanup goes in finally so it runs whether the write succeeds or not.


import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

# Late-arrival threshold: 45 minutes after collection window close
LATE_THRESHOLD_SECONDS = 2700


@dataclass
class SnowflakeConfig:
    account:   str
    user:      str
    password:  str
    warehouse: str
    database:  str
    schema:    str


def load_records(
    records: list[dict],
    config: SnowflakeConfig,
    table: str = "RAN_PERFORMANCE_COUNTERS",
) -> int:
    if not records:
        return 0

    df = pd.DataFrame(records)

    df["collection_time"] = pd.to_datetime(df["collection_time"], utc=True)
    df["ingested_at"]     = pd.to_datetime(df["ingested_at"],     utc=True)

    lag_seconds = (
        df["ingested_at"] - df["collection_time"]
    ).dt.total_seconds()

    df["is_late_arrival"] = lag_seconds > LATE_THRESHOLD_SECONDS
    df["is_schema_gap"]   = df.get("is_schema_gap", False)

    conn = snowflake.connector.connect(
        account=config.account,
        user=config.user,
        password=config.password,
        warehouse=config.warehouse,
        database=config.database,
        schema=config.schema,
    )

    try:
        success, _, nrows, _ = write_pandas(
            conn, df, table, auto_create_table=False
        )
        if not success:
            logger.error("write_pandas reported failure for table=%s", table)
            return 0
        logger.info("loaded rows=%d table=%s", nrows, table)
        return nrows
    finally:
        conn.close()

The finally block is not optional. If write_pandas raises — and it does, when the warehouse is suspended or a column type does not match — the connection needs to close anyway. Without finally, you accumulate orphaned connections inside a Lambda or a long-running job until something breaks in a way that is very hard to trace.
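If you would rather not hand-write try/finally at every call site, the standard library's contextlib.closing gives the same guarantee for any object with a close() method, which includes Snowflake connections. A minimal sketch (FakeConnection stands in for the real connection so the example runs anywhere):

```python
from contextlib import closing

# Stand-in for a snowflake.connector connection; all closing() requires
# is a close() method.
class FakeConnection:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

conn = FakeConnection()
with closing(conn):
    # write_pandas(conn, df, table) would go here; close() runs even if it raises.
    pass

print(conn.closed)  # True
```

Same semantics as the explicit finally, one less place to forget it.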

Monitoring Pipeline Health

Run this query after your first week of ingestion to track your health metrics:

SELECT
    vendor,
    DATE_TRUNC('day', collection_time)  AS day,
    COUNT(*)                            AS total_records,
    SUM(is_late_arrival::INT)           AS late_count,
    SUM(is_schema_gap::INT)             AS gap_count,
    ROUND(
        100.0 * SUM(is_late_arrival::INT) / COUNT(*), 2
    )                                   AS late_pct
FROM ran_performance_counters
WHERE collection_time >= DATEADD('day', -7, CURRENT_DATE())
GROUP BY 1, 2
ORDER BY 1, 2;


If any vendor shows a late percentage above 5%, the issue is usually the collection job, not the network. If gap count spikes on a specific day, check whether a vendor pushed a software update that night.

These two numbers — late percentage and gap percentage, per vendor per day — are the health metrics for your pipeline. Track them before you build anything on top.
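Turning those two numbers into an alert is only a few lines. In this sketch the query results are assumed to arrive as a list of dicts (one per vendor per day), and `vendors_to_investigate` is a hypothetical helper name; wire the output into whatever alerting you already have:

```python
# Threshold from the discussion above: >5% late arrivals usually means
# the collection job, not the network.
LATE_PCT_THRESHOLD = 5.0

def vendors_to_investigate(daily_stats: list[dict]) -> list[str]:
    """Return vendors whose late-arrival percentage breaches the threshold."""
    return sorted({
        row["vendor"]
        for row in daily_stats
        if row["late_pct"] > LATE_PCT_THRESHOLD
    })

# Shaped like rows from the monitoring query above.
stats = [
    {"vendor": "Ericsson", "day": "2026-03-15", "late_pct": 1.2},
    {"vendor": "Nokia",    "day": "2026-03-15", "late_pct": 7.9},
]
print(vendors_to_investigate(stats))  # ['Nokia']
```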

What Comes Next

The next post builds cell-specific anomaly detection on top of this foundation—how to learn what "normal" looks like for 50,000 different cells.

Follow me to get a notification when Part 2 drops!

I have been building these systems at national scale for several years. If something here does not match your environment—different vendor schemas or a different cloud stack—I'm happy to dig into it in the comments!
