Tanroop Singh for Razorpay

How We Cut Report Generation Costs by 75% by Ditching EMR

If you're running Spark workloads on AWS EMR, you've probably noticed something frustrating about your cloud bill. Those multi-node clusters are expensive, and they stay expensive even when they're barely doing any real work. On Razorpay's Data Products team, we were generating merchant reports using a dual-layer architecture: Kubernetes workers for real-time single-day reports and EMR clusters for everything else. The EMR approach was inefficient, costly, and slow: most reports waited minutes for cluster bootstrapping before any actual data processing began.

Here's what made this particularly painful. We were pulling massive amounts of data from S3 into EMR clusters, filtering it down to what we actually needed, running transformations, and delivering results. However, the filtering happened after data transfer, which meant we were paying to move gigabytes of irrelevant data across the network. Even with optimizations like Z-ordering on Delta Lake tables, we were fundamentally over-provisioning compute resources for workloads that didn't need distributed processing at all.

That's why we rebuilt our report generation system around a different principle: filter data at the query layer before it hits compute, and run transformations on single-node Spark workers in Kubernetes instead of spinning up entire EMR clusters. The result? We cut report generation costs by 75%, reduced P90 latency by 78%, and eliminated the operational complexity of managing hundreds of ephemeral EMR clusters. Here's how we did it, and why the architectural shift mattered more than the technology choices.

The Core Problem: When Distributed Computing Becomes Overhead

Before diving into our solution, let's talk about what made the original architecture so inefficient. The issue wasn't that EMR is bad technology; it's that we were using distributed computing infrastructure for problems that didn't require distribution.

Our merchant reporting platform generates hundreds of different report types: transaction summaries, settlement breakdowns, refund analyses, payment method distributions, and custom merchant-specific reports. Some reports cover single days with thousands of transactions. Others span months with millions of rows. The data lives in Delta Lake tables partitioned by date and optimized with Z-ordering on merchant IDs.

[Figure: old architecture]

The dual-layer architecture created code redundancy because we maintained separate report generation logic for Kubernetes (PHP querying TiDB) and EMR (Scala/Spark querying Delta Lake). Developers had to implement report templates twice, test in two environments, and debug issues across two completely different technology stacks.

The EMR costs were substantial. Each report generation triggered a new cluster launch, bootstrapped Spark, executed the job, and terminated. Even for reports that processed small datasets, we paid for the overhead of distributed coordination, executor initialization, and inter-node communication. A report that only needed to scan 100MB of filtered data would spin up a multi-node cluster capable of processing terabytes.

The cluster bootstrap time killed our SLAs. From the moment a merchant requested a report to when they received it, the timeline looked like this: request queued (5-10 seconds), EMR cluster launch (60-90 seconds), Spark application start (20-30 seconds), data fetch and filter (10-30 seconds), transformations (5-15 seconds), S3 write and delivery (5-10 seconds). The actual data processing was a fraction of total time; most delay came from infrastructure provisioning.

The Inefficient Data Pipeline: Reading Everything, Using Little

Let me walk through exactly what happened during report generation to understand why it was so inefficient. When a merchant requested a transaction report for a specific date range and their merchant ID, here's what the Spark job did:

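import org.apache.spark.sql.functions.col

// `spark`, `dataPath`, `columns`, and the filter values are defined elsewhere in the job.
// Read the Delta table, project the requested columns, and prune date partitions.
var df = spark.read
  .format("delta")
  .load(dataPath)
  .select(columns.map(col): _*)
  .filter(s"$datePartitionColumn >= '$filterStartDate' AND $datePartitionColumn <= '$filterEndDate'")

// Merchant filter, applied as a separate step after the date filter.
df = df.where(col(merchantColumn).isin(merchantList: _*))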

This looks innocent, but let's trace what actually happened at each step. The load(dataPath) operation read Delta Lake transaction logs to understand table structure, file paths, and statistics. No actual data moved yet, just metadata. The select() operation specified column projection, telling Spark which columns mattered. Still no data transfer.

The filter() on date partitions applied partition pruning. This was good; if the table had 365 daily partitions and the filter specified 7 days, Spark only scanned files in those 7 partitions. However, here's the critical inefficiency: Spark pulled all files from those 7 partitions into the EMR cluster, including data for thousands of merchants that the requesting merchant didn't care about.

The where() clause filtering by merchant ID happened after the data transfer. This meant that if a partition contained 1GB of data across 10,000 merchants, but the requesting merchant only had 10MB worth of transactions, we transferred the entire 1GB to the cluster and then filtered it down to 10MB. Multiply this across hundreds of concurrent report requests, and you understand why our data transfer costs and compute requirements were so high.
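To put a number on that example, here is a tiny sketch of the arithmetic. It uses the illustrative figures from the paragraph above and treats 1GB as 1,000MB for simplicity; the `TransferWaste` name is hypothetical.

```scala
// Illustrative arithmetic only: inputs are the example figures from the text,
// with 1GB treated as 1,000MB for simplicity.
object TransferWaste {
  /** Fraction of transferred data that is discarded once the merchant filter runs. */
  def discardedFraction(transferredMb: Double, neededMb: Double): Double = {
    require(transferredMb > 0, "transferredMb must be positive")
    require(neededMb >= 0 && neededMb <= transferredMb, "neededMb must be within the transferred size")
    (transferredMb - neededMb) / transferredMb
  }
}
```

Calling `TransferWaste.discardedFraction(1000.0, 10.0)` shows that roughly 99% of the bytes moved for that partition are thrown away after the filter.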

Z-ordering helped by co-locating rows with similar merchant IDs in the same data files, which improved data skipping. However, it didn't eliminate the fundamental problem: the filtering logic ran on the compute layer after data transfer, not at the storage layer before transfer.

The Solution: Query First, Compute Second

The breakthrough came from reversing the pipeline. Instead of pulling data into Spark and then filtering it, we pushed the filtering logic down to a query engine that operated directly on storage. That engine was Trino.

[Figure: new architecture]

Here's how it works in practice. When a report request arrives, it gets queued to an SQS queue consumed by Scala worker pods running on Kubernetes. Each pod hosts a single-node Spark cluster (just the driver, no executors). The worker constructs a SQL query with all the necessary filters: date range, merchant IDs, column selection, and any aggregations we can push down.
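As a rough illustration of the query-construction step, here is a minimal sketch. The `ReportRequest` shape, the field names, and the choice to bind dates through `CAST(? AS DATE)` are all assumptions made for the example, not the production implementation.

```scala
import java.time.LocalDate

// Hypothetical request shape; every field name here is an assumption.
final case class ReportRequest(
  table: String,          // fully qualified table name as seen by the query engine
  columns: Seq[String],   // column projection
  dateColumn: String,     // date partition column
  merchantColumn: String,
  startDate: LocalDate,
  endDate: LocalDate,
  merchantIds: Seq[String]
)

object ReportQuery {
  /**
   * Builds parameterized SQL plus the values to bind, in order.
   * Identifiers (table and column names) are interpolated directly, so they
   * must come from trusted report templates, never from user input.
   */
  def build(req: ReportRequest): (String, Seq[String]) = {
    require(req.columns.nonEmpty, "at least one column is required")
    require(req.merchantIds.nonEmpty, "at least one merchant ID is required")
    val placeholders = req.merchantIds.map(_ => "?").mkString(", ")
    val sql =
      s"SELECT ${req.columns.mkString(", ")} FROM ${req.table} " +
      s"WHERE ${req.dateColumn} BETWEEN CAST(? AS DATE) AND CAST(? AS DATE) " +
      s"AND ${req.merchantColumn} IN ($placeholders)"
    // LocalDate.toString yields ISO-8601 (yyyy-MM-dd), which casts cleanly to DATE.
    val params = Seq(req.startDate.toString, req.endDate.toString) ++ req.merchantIds
    (sql, params)
  }
}
```

The point of the design is that the date range, the merchant filter, and the column list all travel inside the query, so nothing outside that slice needs to reach the worker.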

This query goes to Trino via JDBC. Trino parses it, plans execution directly against Delta Lake files on S3, applies partition pruning, reads only the necessary columns from Parquet files, filters rows at the storage layer, and returns a compact result set to the Spark worker. Critically, Trino does all the heavy lifting of scanning, filtering, and projecting data before sending anything over the network.
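For readers who haven't used Trino over JDBC, the round trip might look something like the sketch below. It relies only on the standard `java.sql` API and assumes the Trino JDBC driver is on the classpath; the `TrinoFetch` object, its method names, and the connection details are hypothetical.

```scala
import java.sql.{DriverManager, ResultSet}
import java.util.Properties

object TrinoFetch {
  /** Trino JDBC URLs follow the form jdbc:trino://host:port/catalog/schema. */
  def jdbcUrl(host: String, port: Int, catalog: String, schema: String): String =
    s"jdbc:trino://$host:$port/$catalog/$schema"

  /** Copies a (small, pre-filtered) result set into memory as column -> value maps. */
  def readRows(rs: ResultSet): Vector[Map[String, Any]] = {
    val meta = rs.getMetaData
    val labels = (1 to meta.getColumnCount).map(i => meta.getColumnLabel(i))
    val rows = Vector.newBuilder[Map[String, Any]]
    while (rs.next()) {
      rows += labels.map(l => l -> (rs.getObject(l): Any)).toMap
    }
    rows.result()
  }

  /** Runs a parameterized query and returns all rows; resources are closed in reverse order. */
  def fetch(url: String, user: String, sql: String, params: Seq[String]): Vector[Map[String, Any]] = {
    val props = new Properties()
    props.setProperty("user", user)
    val conn = DriverManager.getConnection(url, props)
    try {
      val stmt = conn.prepareStatement(sql)
      try {
        // JDBC parameters are 1-indexed.
        params.zipWithIndex.foreach { case (value, i) => stmt.setString(i + 1, value) }
        val rs = stmt.executeQuery()
        try readRows(rs) finally rs.close()
      } finally stmt.close()
    } finally conn.close()
  }
}
```

Materializing the whole result in memory is only reasonable here because the result set is expected to be small after filtering; a large result would call for streaming instead.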

The Spark worker receives a small, pre-filtered dataset (typically hundreds of kilobytes to a few megabytes) and performs in-memory transformations: pivoting, custom calculations, formatting, and sorting. Because the dataset is small and already filtered, these transformations finish quickly on a single node. The worker then writes the final report to S3 and triggers delivery to the merchant.
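To give a feel for how light this stage is, here is a sketch of a pivot-and-sort over a pre-filtered result. Plain Scala collections stand in for the Spark DataFrame operations so the example runs without a Spark dependency; the `Txn` row shape and the `Pivot` object are hypothetical.

```scala
// Hypothetical row shape for a pre-filtered result set; amount is in minor currency units.
final case class Txn(date: String, method: String, amount: Long)

object Pivot {
  /** Pivots rows into date -> (payment method -> total amount), sorted by date. */
  def totalsByDateAndMethod(rows: Seq[Txn]): Seq[(String, Map[String, Long])] =
    rows
      .groupBy(_.date)
      .map { case (date, txns) =>
        // Within each date, sum amounts per payment method.
        date -> txns.groupBy(_.method).map { case (method, ts) => method -> ts.map(_.amount).sum }
      }
      .toSeq
      .sortBy(_._1) // ISO dates sort correctly as strings
}
```

With only a few megabytes of input, a transformation like this fits comfortably in a single process, which is the property the single-node design depends on.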

This new architecture now serves 80-85% of our report workload. The remaining 15-20% are genuinely heavy reports that either process huge data volumes (multi-gigabyte result sets after filtering) or require complex multi-stage transformations. Those still run on EMR because they legitimately need distributed processing. But for the vast majority of reports, single-node Spark on Kubernetes is more than sufficient.
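The routing decision between the two paths can be sketched as a simple rule. The `ReportProfile` fields and both thresholds below are illustrative assumptions, not the values used in production.

```scala
sealed trait Backend
case object KubernetesWorker extends Backend
case object EmrCluster extends Backend

// Hypothetical profile of a report; field names are assumptions.
final case class ReportProfile(estimatedResultBytes: Long, transformationStages: Int)

object ReportRouter {
  // Illustrative thresholds only.
  val MaxResultBytes: Long = 1L * 1024 * 1024 * 1024 // 1 GiB of filtered data
  val MaxStages: Int = 3

  /** Sends a report to EMR only when it is too large or too complex for a single node. */
  def route(profile: ReportProfile): Backend =
    if (profile.estimatedResultBytes > MaxResultBytes || profile.transformationStages > MaxStages)
      EmrCluster
    else
      KubernetesWorker
}
```

Keeping the rule this explicit makes it easy to revisit the thresholds as profiling data accumulates.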

Why Trino Changes Everything

You might wonder why Trino makes such a dramatic difference. After all, Spark can also read Delta Lake and filter data. The answer lies in architectural differences between query engines and compute engines.

Trino is optimized for query execution, not general computation. It uses a pipelined execution model where data flows through operators without intermediate materialization. Filters, projections, and aggregations happen in a streaming fashion, minimizing memory footprint. Spark, by contrast, often materializes intermediate results, especially when shuffling data across executors. For read-heavy workloads with simple filters and projections, Trino's approach is fundamentally more efficient.

Trino applies predicates at the file scan level. When reading Parquet files, Trino leverages column statistics (min/max values, null counts) to skip entire row groups that don't match filter predicates. This data skipping happens before reading file contents, dramatically reducing I/O. Spark has similar capabilities, but Trino's implementation is more aggressive and efficient for ad-hoc queries.
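The idea behind statistics-based skipping can be shown with a simplified model. This is not Trino's implementation, just a sketch of the min/max check; the `RowGroupStats` shape and the merchant ID values are made up for illustration.

```scala
// Simplified model of per-row-group column statistics for a merchant ID column.
final case class RowGroupStats(id: Int, minMerchantId: String, maxMerchantId: String)

object DataSkipping {
  /**
   * Keeps only the row groups whose [min, max] range could contain the target value.
   * Groups outside the range are skipped without reading their contents.
   */
  def candidates(groups: Seq[RowGroupStats], merchantId: String): Seq[RowGroupStats] =
    groups.filter(g => g.minMerchantId <= merchantId && merchantId <= g.maxMerchantId)
}
```

This is also why Z-ordering on merchant IDs matters: the more tightly each row group's min/max range is clustered, the more groups a check like this can rule out.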

Trino's lightweight execution avoids distributed coordination overhead. Even a single-node Spark deployment carries the machinery built for distribution: driver and executor coordination, task scheduling, shuffle management, and more. Trino's architecture is simpler for single-node operation, resulting in lower CPU and memory usage for equivalent workloads.

The cost equation matters too. We run Trino as a shared in-house cluster using the open-source version. The operational cost is minimal because it's amortized across multiple use cases beyond just reporting. By contrast, every EMR cluster incurs an EMR service charge on top of the underlying EC2 instance costs. Moving 80-85% of reports off EMR removed a massive cost center.

Results: When Architecture Meets Economics

The shift to Kubernetes workers with Trino produced dramatic improvements across multiple dimensions. P90 report generation time dropped by 78%. Reports that previously took 12-13 minutes (including cluster bootstrap) now complete in 3-4 minutes. The elimination of cluster startup time accounted for most of this improvement, but the more efficient data pipeline also contributed.

[Figure: time comparison]

Report generation costs fell by 75%. The cost breakdown shifted from EC2 instances ($$$), EMR service fees ($$), and S3 data transfer costs ($$) to just Kubernetes pod costs ($) and minimal Trino overhead. The savings were substantial enough to justify the engineering investment within the first month of production deployment.

Code complexity decreased because we eliminated the dual-layer architecture. The Scala transformation logic that previously existed only in the EMR layer now serves both Kubernetes workers and the EMR fallback for heavy reports. Developers write report templates once, test in one environment, and deploy with confidence. This reduced development time for new reports by roughly 40%.

Operational overhead dropped significantly. Managing hundreds of ephemeral EMR clusters meant dealing with cluster failures, bootstrap timeouts, version upgrades, and capacity planning. With the Kubernetes approach, operations come down largely to pod scaling, which our existing platform team already handled well. We went from managing a complex distributed system to running stateless workers, which is a massive operational win.

What We Learned: Profiling Before Optimizing

Building this system taught us a critical lesson that applies beyond report generation: profile your workloads before choosing infrastructure. We assumed distributed processing was necessary because we were processing "big data." Profiling revealed that most reports accessed small data subsets after filtering, making distribution overhead counterproductive.

The profiling process involved instrumenting our EMR jobs to track time spent in each phase: cluster bootstrap, data fetch, data filtering, transformations, and delivery. The data was clear; 60-70% of total time went to cluster bootstrap and data fetch, while actual computation consumed less than 20%. This insight drove our architectural redesign.
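A minimal version of that kind of instrumentation might look like the sketch below. The `PhaseProfile` object and its method names are hypothetical, and the real instrumentation was more involved.

```scala
object PhaseProfile {
  /** Runs `body` and returns its result together with the elapsed wall-clock milliseconds. */
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  /** Converts per-phase durations into each phase's share of total time (0.0 to 1.0). */
  def shares(durationsMs: Map[String, Long]): Map[String, Double] = {
    val total = durationsMs.values.sum.toDouble
    require(total > 0, "total duration must be positive")
    durationsMs.map { case (phase, ms) => phase -> ms / total }
  }
}
```

Wrapping each phase (bootstrap, fetch, filtering, transformation, delivery) in `timed` and feeding the durations to `shares` yields the per-phase breakdown that makes a conclusion like the one above visible.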

We also learned that query engines and compute engines solve different problems. Trino excels at reading data, applying filters, and returning compact results. Spark excels at complex multi-stage transformations, iterative algorithms, and distributed aggregations. Using each tool for its strength rather than forcing Spark to do everything produced better outcomes.

The 80/20 split between Kubernetes and EMR proved optimal. We initially considered moving 100% of workloads to Kubernetes, but profiling showed that some reports genuinely needed distributed processing. Rather than over-engineering the Kubernetes solution to handle edge cases, we kept EMR for the small percentage of reports that justified it. This pragmatic approach delivered most of the benefits with less risk.

The Bottom Line

The Scala worker on Kubernetes demonstrates that the right architecture matters more than raw technology power. EMR is excellent technology for genuinely distributed workloads. However, most of our report generation didn't need distribution; it needed efficient data filtering followed by lightweight transformations. By restructuring the pipeline to filter at the query layer (Trino) and transform on single-node compute (Kubernetes Spark workers), we dramatically improved performance and cost.

The 75% cost reduction and 78% latency improvement aren't just numbers; they represent a fundamental shift in how we think about data processing infrastructure. Not every data workload needs distributed computing. Many workloads are better served by efficient query engines that minimize data movement, paired with lightweight compute for transformations. Understanding which category your workload falls into requires profiling, measurement, and a willingness to challenge assumptions.

If you're running expensive distributed compute infrastructure for data workloads, profile your jobs carefully. You might discover that most of your workload is paying for coordination overhead that doesn't deliver value. The solution might not be bigger clusters or more optimization; it might be questioning whether you need clusters at all.

editor: @paaarth96