
Big Data Fundamentals: data sharding example

Data Sharding with Partitioned Tables in Iceberg: A Production Deep Dive

1. Introduction

The relentless growth of data volume and velocity presents a constant challenge for data platform engineers. We recently faced a scenario with a financial services client ingesting over 50TB of transaction data daily from a change data capture (CDC) stream. Queries requiring aggregations across this data, particularly for risk reporting, were experiencing unacceptable latency – exceeding 15 minutes for complex calculations. Traditional Hive tables were proving insufficient, leading to full table scans and resource contention. The core problem wasn’t just volume, but the inability to efficiently prune data during query execution. This necessitated a move towards a more sophisticated data organization strategy: data sharding via partitioned tables leveraging the Iceberg table format. This post details our implementation, focusing on architectural decisions, performance tuning, and operational considerations. We’ll cover how this approach integrates with our existing Spark-based ETL pipelines, Kafka ingestion, and Presto query engine.

2. What is Data Sharding in Big Data Systems?

Data sharding, in the context of Big Data, isn’t about physically splitting data across multiple databases like in traditional relational systems. Instead, it’s about logically partitioning a large dataset into smaller, more manageable segments based on a chosen key. In Hive-style tables, these segments are directories in object storage (S3, GCS, Azure Blob Storage). Iceberg instead records each data file's partition values in its metadata, so query engines do not depend on the directory layout. In both cases, the segments should align with query patterns.

Iceberg, a modern table format, provides a robust mechanism for implementing this. It manages metadata about data files, including partitioning information, allowing query engines like Presto and Spark to efficiently prune irrelevant data during query execution. Iceberg’s snapshot isolation and schema evolution capabilities are crucial for maintaining data consistency and accommodating changing business requirements. Under the hood, Iceberg uses a layered metadata architecture (table metadata files, manifest lists, manifest files, and data files) to track the location and properties of data. The partition spec is stored in the table metadata alongside the schema, but separately from it, and Iceberg applies it during data writes. Data is typically stored in columnar formats like Parquet or ORC for efficient compression and query performance.
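
To make the pruning idea concrete, here is a minimal sketch in plain Python. It is not Iceberg's implementation: `DataFile` is a hypothetical, heavily simplified stand-in for a manifest entry, and the file paths and dates are invented for illustration. The point is only that a planner can discard files by looking at recorded partition values, without opening the files themselves.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class DataFile:
    """Hypothetical, simplified stand-in for one manifest entry."""
    path: str
    event_day: date      # partition value recorded for the file
    record_count: int


def prune(files, start, end):
    """Keep only files whose partition value falls in [start, end]."""
    return [f for f in files if start <= f.event_day <= end]


# Invented example metadata: one file per day.
manifest = [
    DataFile("data/00001.parquet", date(2024, 1, 1), 1_000),
    DataFile("data/00002.parquet", date(2024, 1, 2), 1_200),
    DataFile("data/00003.parquet", date(2024, 1, 3), 900),
]

# A query filtering on Jan 2-3 only needs to read two of the three files.
selected = prune(manifest, date(2024, 1, 2), date(2024, 1, 3))
print([f.path for f in selected])
```

A real planner also uses per-column min/max statistics stored in the manifests, which this sketch leaves out.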

3. Real-World Use Cases

  • CDC Ingestion with Time-Based Partitioning: Ingesting CDC events and partitioning by event timestamp (e.g., event_time) allows for efficient querying of recent changes.
  • Log Analytics with Geo-Based Partitioning: Partitioning logs by geographic region (e.g., country_code) enables faster analysis of regional trends and troubleshooting.
  • Large-Scale Joins with Hash Partitioning: Partitioning related tables using a consistent hash function on a join key (e.g., user_id) can significantly reduce shuffle during join operations.
  • ML Feature Pipelines with Categorical Partitioning: Partitioning feature data by categorical variables (e.g., product_category) can improve the performance of model training and inference.
  • Financial Transaction Analysis with Date and Account ID Partitioning: Combining date partitioning with partitioning on account ID allows for efficient querying of transaction history for specific accounts within a specific time range.

4. System Design & Architecture

Our architecture centers around a Kafka-based ingestion pipeline, a Spark-based ETL process, and a Presto query engine. Iceberg tables are used for storing the transformed data.

graph LR
    A[Kafka Topic] --> B(Spark Streaming Job);
    B --> C{"Iceberg Table (Partitioned)"};
    C --> D[Presto Query Engine];
    subgraph S3["Data Lake (S3)"]
        C
    end
    style C fill:#f9f,stroke:#333,stroke-width:2px

The Spark Streaming job consumes CDC events from Kafka, performs basic transformations (e.g., schema validation, data cleansing), and writes the data to the Iceberg table. The Iceberg table is partitioned by event_time (daily) and account_id (using bucketing for even distribution). Presto then queries the Iceberg table, leveraging the partitioning metadata to prune irrelevant data.
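
The following sketch shows how a record could be mapped to a (day, bucket) partition tuple under this scheme. Two things here are assumptions rather than details of our setup: the bucket count of 16 is arbitrary, and `zlib.crc32` is used only as a readily available stand-in hash. Iceberg's own bucket transform uses a 32-bit Murmur3 hash, so the bucket numbers below will not match what Iceberg computes.

```python
import zlib
from datetime import datetime, timezone

NUM_BUCKETS = 16  # assumed bucket count, chosen for illustration only
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def day_transform(event_time: datetime) -> int:
    """Whole days since 1970-01-01 (UTC), as the day transform defines it."""
    return (event_time - EPOCH).days


def bucket_transform(account_id: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Stand-in hash bucket (crc32); Iceberg itself uses Murmur3."""
    return zlib.crc32(account_id.encode("utf-8")) % num_buckets


def partition_key(event_time: datetime, account_id: str):
    """Partition tuple for one record: (day ordinal, account bucket)."""
    return (day_transform(event_time), bucket_transform(account_id))


# Hypothetical CDC event.
event_time = datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc)
print(partition_key(event_time, "acct-42"))
```

Because the bucket is derived from a hash rather than the raw value, accounts spread across a fixed number of partitions per day instead of creating one partition per account.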

We deploy this on AWS EMR using Spark 3.3 and Presto 350. The Iceberg catalog is backed by AWS Glue. The table schema is defined using a schema registry (Confluent Schema Registry) to ensure data consistency.

5. Performance Tuning & Resource Management

Performance tuning focused on optimizing Spark write performance and Presto query performance.

  • Spark Configuration:
    • spark.sql.shuffle.partitions: Set to 200 to balance parallelism and overhead.
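    • spark.sql.files.maxPartitionBytes: Set to 128MB. Note that this setting caps the bytes packed into a single partition when reading files; the size of written Parquet files is governed by Iceberg's write.target-file-size-bytes table property.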
    • spark.executor.memory: Increased to 8GB to accommodate larger datasets.
    • spark.driver.memory: Increased to 4GB to handle metadata operations.
  • Iceberg Configuration:
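    • Partition spec: Defined at table creation with PARTITIONED BY (days(event_time), bucket(N, account_id)), where N is the bucket count. It is part of the table definition rather than a write property.
    • write.format.default: Set to parquet, with write.parquet.compression-codec set to snappy.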
  • Presto Configuration:
    • query.max-memory-per-node: Set to 16GB to allow for larger in-memory aggregations.
    • query.max-concurrent-queries: Adjusted based on cluster capacity.
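
One way to keep such settings reviewable is to hold them in a single mapping and render them into `spark-submit --conf` arguments. The sketch below does that for the Spark values listed above. The dictionary layout, the helper name `to_submit_args`, and the script name `etl_job.py` are illustrative assumptions, not part of our actual deployment.

```python
# Spark values from the list above; 128MB is written out in bytes.
SPARK_CONF = {
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.files.maxPartitionBytes": str(128 * 1024 * 1024),
    "spark.executor.memory": "8g",
    "spark.driver.memory": "4g",
}


def to_submit_args(conf):
    """Render a config mapping as a flat list of --conf key=value args."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


# etl_job.py is a hypothetical entry point used only for this example.
command = ["spark-submit", *to_submit_args(SPARK_CONF), "etl_job.py"]
print(" ".join(command))
```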

File compaction is crucial. We implemented a daily compaction job using Spark to merge small Parquet files into larger ones, improving read performance. We observed a 30% reduction in query latency after implementing file compaction. Monitoring S3 request costs revealed that smaller files resulted in significantly higher costs, because every additional file adds its own requests and metadata operations.
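
The planning half of compaction can be sketched as a bin-packing problem: group small files so that each rewritten output file approaches a target size. The example below is a simplified illustration, not our production job; the 128 MB target, the file names, and the sizes are assumptions. In practice Iceberg's `rewrite_data_files` Spark procedure handles both planning and rewriting.

```python
TARGET_MB = 128  # assumed target output size, for illustration


def plan_compaction(file_sizes_mb, target_mb=TARGET_MB):
    """Group small files (first-fit decreasing) into rewrite groups.

    Files already at or above the target are left alone, and groups
    containing a single file are dropped since rewriting gains nothing.
    """
    small = sorted(
        ((path, size) for path, size in file_sizes_mb.items() if size < target_mb),
        key=lambda item: item[1],
        reverse=True,
    )
    groups = []  # each group: {"files": [...], "mb": total size}
    for path, size in small:
        for group in groups:
            if group["mb"] + size <= target_mb:
                group["files"].append(path)
                group["mb"] += size
                break
        else:
            groups.append({"files": [path], "mb": size})
    return [g for g in groups if len(g["files"]) > 1]


# Invented file sizes in MB.
files = {"a.parquet": 100, "b.parquet": 60, "c.parquet": 50,
         "d.parquet": 20, "e.parquet": 200}
for group in plan_compaction(files):
    print(group["files"], group["mb"])
```

First-fit decreasing is a simple heuristic; it does not guarantee the minimum number of output files, but it is easy to reason about and usually close enough for a daily job.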

6. Failure Modes & Debugging

  • Data Skew: Uneven distribution of account_id values led to data skew during Spark writes, causing some executors to become overloaded. Mitigation involved adding a salt to the account_id to distribute the data more evenly.
  • Out-of-Memory Errors: Large aggregations in Presto caused out-of-memory errors. Mitigation involved increasing query.max-memory-per-node and optimizing the query plan.
  • Job Retries: Transient network errors during Spark writes resulted in job retries. We implemented exponential backoff with jitter to reduce the impact of these errors.
  • Debugging Tools:
    • Spark UI: Used to monitor executor performance, identify data skew, and analyze shuffle statistics.
    • Presto Web UI: Used to analyze query plans, identify bottlenecks, and monitor resource usage.
    • Datadog: Used to monitor system metrics (CPU, memory, disk I/O) and set up alerts.
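
The retry mitigation mentioned above can be sketched as follows. This is a generic illustration of exponential backoff with full jitter, not the code from our pipeline: the function name, default delays, and the choice of which exceptions count as transient are all assumptions. The `sleep` parameter is injectable so the example runs instantly.

```python
import random
import time


def retry_with_backoff(operation, max_attempts=5, base_delay=0.5,
                       max_delay=30.0,
                       retry_on=(ConnectionError, TimeoutError),
                       sleep=time.sleep, rng=random.random):
    """Call operation(), retrying transient errors with jittered backoff."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential cap, then a uniformly random delay below it
            # ("full jitter") so retrying writers do not synchronize.
            cap = min(max_delay, base_delay * 2 ** attempt)
            sleep(rng() * cap)


# Hypothetical write that fails twice before succeeding.
attempts = {"count": 0}


def flaky_write():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient network error")
    return "committed"


delays = []  # record delays instead of actually sleeping
result = retry_with_backoff(flaky_write, sleep=delays.append)
print(result, attempts["count"], len(delays))
```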

7. Data Governance & Schema Management

Iceberg’s schema evolution capabilities are critical. We use the Confluent Schema Registry to manage schema changes. When a schema change is required, we update the schema registry and then use Iceberg’s ALTER TABLE command to propagate the change to the table metadata. Backward compatibility is maintained by restricting changes to ones that older readers can tolerate, for example adding optional columns rather than removing or retyping existing ones. The AWS Glue Data Catalog, which exposes a Hive Metastore-compatible interface, serves as the metadata catalog, providing a unified view of the data.
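
A pre-deployment compatibility check along these lines might look like the sketch below. The rules are a deliberately conservative policy assumed for illustration, not Iceberg's or the Schema Registry's exact rule set: existing columns may not be removed, new columns must be optional, and the only type changes accepted are the int-to-long and float-to-double widenings. The column names are invented.

```python
# Type widenings accepted by this (assumed) policy.
ALLOWED_PROMOTIONS = {("int", "long"), ("float", "double")}


def incompatibilities(old, new):
    """Compare two schemas given as {column: (type, required)}.

    Returns a list of human-readable problems; empty means compatible.
    """
    problems = []
    for name, (old_type, _) in old.items():
        if name not in new:
            problems.append(f"column removed: {name}")
            continue
        new_type, _ = new[name]
        if new_type != old_type and (old_type, new_type) not in ALLOWED_PROMOTIONS:
            problems.append(
                f"type change not allowed: {name} {old_type} -> {new_type}")
    for name, (_, required) in new.items():
        if name not in old and required:
            problems.append(f"new column must be optional: {name}")
    return problems


current = {"account_id": ("string", True), "amount": ("int", True)}
# Widening amount and adding an optional column passes.
good = {"account_id": ("string", True), "amount": ("long", True),
        "channel": ("string", False)}
# Retyping account_id and dropping amount does not.
bad = {"account_id": ("int", True)}

print(incompatibilities(current, good))
print(incompatibilities(current, bad))
```

Running a check like this in CI before issuing ALTER TABLE catches breaking changes before they reach the table metadata.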

8. Security and Access Control

We leverage AWS Lake Formation to manage access control to the Iceberg tables. Lake Formation allows us to define granular permissions based on IAM roles and policies. Data is encrypted at rest using KMS keys. Audit logging is enabled to track data access and modifications.

9. Testing & CI/CD Integration

We use Great Expectations to validate data quality during the ETL process. DBT tests are used to validate data transformations. Our CI/CD pipeline includes automated regression tests that verify the correctness of the data and the performance of the queries. We use Terraform to manage the infrastructure and deploy the code.

10. Common Pitfalls & Operational Misconceptions

  • Incorrect Partitioning Key: Choosing a partitioning key that doesn’t align with query patterns can negate the benefits of partitioning. Symptom: Full table scans despite partitioning. Mitigation: Analyze query patterns and choose a partitioning key accordingly.
  • Small File Problem: Writing too many small files can degrade read performance. Symptom: High S3 request costs, slow query performance. Mitigation: Implement file compaction.
  • Data Skew: Uneven data distribution can lead to performance bottlenecks. Symptom: Some executors are overloaded while others are idle. Mitigation: Add a salt to the partitioning key.
  • Schema Evolution Issues: Incompatible schema changes can break existing queries. Symptom: Query failures, data corruption. Mitigation: Use backward-compatible schema changes and thoroughly test the changes before deploying them.
  • Ignoring Metadata Management: Failing to properly manage Iceberg metadata can lead to inconsistencies and data corruption. Symptom: Query errors, data loss. Mitigation: Use a reliable metadata catalog (e.g., AWS Glue, Hive Metastore) and regularly back up the metadata.
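
The salting mitigation for data skew can be illustrated with a short sketch. The salt count of 8, the `#` separator, and the account name are assumptions made for the example. The idea is that a single hot key is spread across several derived keys at write time, and readers strip the salt (or aggregate across all salts) to recover the original key.

```python
import random
from collections import Counter

NUM_SALTS = 8  # assumed number of salt values, for illustration


def salted_key(account_id, rng, num_salts=NUM_SALTS):
    """Append a random salt so one hot key maps to several shuffle keys."""
    return f"{account_id}#{rng.randrange(num_salts)}"


def unsalt(key):
    """Recover the original key from a salted one."""
    return key.rsplit("#", 1)[0]


# A hypothetical hot account producing 10,000 events.
rng = random.Random(0)  # seeded so the example is reproducible
counts = Counter(salted_key("acct-1", rng) for _ in range(10_000))

# Without salting, all 10,000 rows would land on one key.
print(len(counts), max(counts.values()))
```

The trade-off is on the read side: any query or join on the original key has to account for every salt value, so the salt count should stay small.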

11. Enterprise Patterns & Best Practices

  • Data Lakehouse vs. Warehouse: Iceberg enables a data lakehouse architecture, combining the flexibility of a data lake with the reliability and performance of a data warehouse.
  • Batch vs. Micro-Batch vs. Streaming: Choose the appropriate ingestion pattern based on the data velocity and latency requirements.
  • File Format Decisions: Parquet is generally the preferred file format for analytical workloads due to its columnar storage and efficient compression.
  • Storage Tiering: Use storage tiering to reduce costs by moving infrequently accessed data to cheaper storage tiers.
  • Workflow Orchestration: Use a workflow orchestration tool (e.g., Airflow, Dagster) to manage the ETL pipeline and ensure data consistency.

12. Conclusion

Data sharding with partitioned tables using Iceberg is a powerful technique for building scalable and performant Big Data systems. By carefully considering the partitioning scheme, optimizing the configuration, and implementing robust monitoring and alerting, we were able to significantly improve query latency and reduce infrastructure costs for our client. Next steps include benchmarking different compaction strategies and introducing schema enforcement to further improve data quality and reliability. We also plan to explore the use of Iceberg’s hidden partitioning feature to simplify table management.
