Gonçalo Trincão Cunha

Posted on • Originally published at Medium


Speeding up Stream-Static Joins on Apache Spark

Some time ago I came across a use case where a Spark Structured Streaming job required a join with static data stored in a very large table.

The first approach taken wasn’t really great. Even with small micro-batches, it increased the batch processing time by orders of magnitude.

A (very) simplified example of this case is a stream of sales events that needs to be enriched with additional product information stored in a large products table.

This post is about using mapPartitions to join Spark Structured Streaming data frames with static data.
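
The snippets below assume domain classes roughly like these. This is only a minimal sketch: productId and category appear in the join and lookup code, while the remaining field names are illustrative and may not match the actual project.

// Illustrative domain classes; only productId and category come from the code in this post
case class Sale(saleId: String, productId: String, category: String, quantity: Int)
case class Product(productId: String, category: String, name: String, price: Double)
case class SaleInfo(sale: Sale, product: Option[Product])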

Approach #1 — Stream-Static Join

The first approach involved a join of the sales events data frame with the static products table.

Stream-static join (Image by Author)

Unfortunately, the join caused each micro-batch to do a full scan of the product table, resulting in a high batch duration even when the stream had only a single record to process.

Join performance (Image by Author)

The code went like this:

// streamingDS = … Sales stream initialization …
import spark.implicits._ // encoders for the case classes used below

// Read the static product table
val staticDS = spark.read
  .format("parquet")
  .load("/tmp/prods.parquet")
  .as[Product]

// Join the sales stream with the products table
streamingDS
  .joinWith(staticDS,
    streamingDS("productId") === staticDS("productId") &&
    streamingDS("category") === staticDS("category"))
  .map {
    case (sale, product) => new SaleInfo(sale, Some(product))
  }
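
In the demo the joined stream is presumably written out with writeStream; here is one possible way to wire it up. This is only a sketch: the joinedDS name, the console sink, and the trigger interval are assumptions, not necessarily what the linked test code uses. It does, however, give us the query handle used to read batch metrics below.

import org.apache.spark.sql.streaming.Trigger

// joinedDS = the joinWith/map result above assigned to a val (illustrative name)
val query = joinedDS.writeStream
  .format("console")                             // sink chosen for illustration only
  .trigger(Trigger.ProcessingTime("10 seconds")) // trigger interval is an assumption
  .start()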

A small demo application makes the culprit visible in the DAG:

The partitioning of the static table was ignored, so all rows of all partitions (5 in this case) were read.
The full table scan of the product table added more than a minute to the micro-batch duration, even when the micro-batch contained only a single event.

Join DAG (Image by Author)
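
If you want to confirm this kind of regression in your own job, Structured Streaming exposes per-batch timings through the query's progress object; a quick check, assuming the query handle from the sketch above:

// Print the timing breakdown of the most recent micro-batch (null before the first batch completes)
val progress = query.lastProgress
println(s"batch ${progress.batchId} took ${progress.durationMs.get("triggerExecution")} ms")
println(progress.durationMs) // addBatch, getBatch, queryPlanning, walCommit, ...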

Approach #2 — mapPartitions

The second approach was based on a lookup into a key-value store for each sale event, using Spark's mapPartitions operation, which transforms a data frame/data set one partition at a time: you get an iterator over the partition's rows, so per-partition setup such as a database connection can be reused across rows.

mapPartitions approach (Image by Author)

Neither Parquet nor Delta tables are suitable for individual key lookups, so the prerequisite for this scenario is to have the product information loaded into a key-value store (MongoDB in this example).

The sample code is a bit more complex, but in certain cases it is well worth the effort to keep the batch duration low, especially with small micro-batches.

// streamingDS = … Sales stream initialization …
streamingDS.mapPartitions { partition =>
  // Set up one DB connection per partition, not per record
  val dbService = new ProductService()
  dbService.connect()

  // Look up the product for each sale and merge the two
  partition.map { sale =>
    val product = dbService.findProduct(sale.productId)
    new SaleInfo(sale, Some(product))
  }
}
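
The ProductService used above is not shown in the snippet. Below is a hedged sketch of what it could look like with the MongoDB Java sync driver; the connection URI, database name, collection name, and field names are all assumptions, and the actual implementation lives in the test code linked at the end of the post. In a real job you would also close the client once the partition iterator is exhausted; error handling is omitted for brevity.

import com.mongodb.client.{MongoClient, MongoClients}
import com.mongodb.client.model.Filters

// Illustrative lookup service; names and schema are assumptions
class ProductService(uri: String = "mongodb://localhost:27017") {
  private var client: MongoClient = _

  def connect(): Unit = {
    client = MongoClients.create(uri)
  }

  def findProduct(productId: String): Product = {
    // Assumes a "products" collection keyed by productId in a "demo" database
    val doc = client.getDatabase("demo")
      .getCollection("products")
      .find(Filters.eq("productId", productId))
      .first() // may be null if the product is missing
    Product(doc.getString("productId"), doc.getString("category"),
      doc.getString("name"), doc.getDouble("price"))
  }

  def close(): Unit = if (client != null) client.close()
}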

The new batch duration graph shows that the problem is gone and batch durations are short again.

mapPartitions performance (Image by Author)

Hope you enjoyed reading! Please let me know if you have better approaches to this problem.

Test details: Spark version 3.2.1 running on Ubuntu 20.04 LTS / WSL2.

Test Code: https://github.com/trincaog/spark-mappartitions-test

Photo by Marc Sendra Martorell on Unsplash
