<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: anhcodes</title>
    <description>The latest articles on DEV Community by anhcodes (@anhcodes).</description>
    <link>https://dev.to/anhcodes</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F789092%2F2532b51b-741a-4f3c-84e5-38a54915ce38.jpeg</url>
      <title>DEV Community: anhcodes</title>
      <link>https://dev.to/anhcodes</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/anhcodes"/>
    <language>en</language>
    <item>
      <title>Debug long running Spark job</title>
      <dc:creator>anhcodes</dc:creator>
      <pubDate>Wed, 31 May 2023 00:41:11 +0000</pubDate>
      <link>https://dev.to/anhcodes/debug-long-running-spark-job-5go4</link>
      <guid>https://dev.to/anhcodes/debug-long-running-spark-job-5go4</guid>
      <description>&lt;p&gt;You Spark job is running for a long time, what to do? Generally, long-running Spark jobs can be due to various factors. We like to call them the 5S - Spill, Skew, Shuffle, Storage, and Serialization. So, how do we identify the main culprit?&lt;/p&gt;

&lt;p&gt;🔎 Look for Skew: Are some of the tasks taking longer than others? Do you have a join operation?&lt;/p&gt;

&lt;p&gt;🔎 Look for Spill: Any out-of-memory errors? Do the executors have enough memory to finish their tasks? Got any disk spills?&lt;/p&gt;

&lt;p&gt;🔎 Look for Shuffle: Large amounts of data being shuffled across executors? Do you have a join operation?&lt;/p&gt;

&lt;p&gt;🔎 Look for Storage Issues: Do you have small files or highly nested directory structure?&lt;/p&gt;

&lt;p&gt;🔎 Look for inefficient Spark code, or serialization issues.&lt;/p&gt;

&lt;p&gt;Now, the solution may vary depending on the root cause. Most of the time, the root cause indicators will show up in the SparkUI, so it's important that you understand how to read it. &lt;/p&gt;

&lt;p&gt;In general, always try to cut down the time Spark takes to load data into memory, parallelize tasks across executors, and scale the memory according to data size. &lt;/p&gt;

&lt;h2&gt;
  
  
  Skew [imbalanced partitions]
&lt;/h2&gt;

&lt;p&gt;&lt;em&gt;TL;DR - Skew is caused by imbalanced partitions. To fix skew, try to repartition the data, or set a skew hint when reading data from disk into memory. If skew is caused by partition imbalance after the shuffle stage, enable the skew join option with Adaptive Query Execution and set the correct shuffle partition size. In most cases, turning on Adaptive Query Execution can help mitigate skew issues.&lt;/em&gt;  &lt;/p&gt;

&lt;p&gt;Let's talk about one of the most common issues you might encounter in Spark - "skew". Skew refers to an imbalance in partition sizes, which can also lead to a spill. When you read in data in Spark, it's typically divided into partitions of 128 MB, distributed evenly according to the &lt;code&gt;maxPartitionBytes&lt;/code&gt; setting. However, during transformations, Spark will need to shuffle your data, and some partitions may end up having significantly more data than others, creating skew in your data.&lt;/p&gt;

&lt;p&gt;When a partition is bigger than the others, the executor will take longer to process that partition and need more memory, which might eventually result in a spill to disk or Out Of Memory (OOM) errors. &lt;/p&gt;

&lt;h3&gt;
  
  
  How do you identify Skew in your SparkUI?
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;If you see long-running tasks and/or an uneven distribution of tasks in the Event Timeline of a long-running stage&lt;/li&gt;
&lt;li&gt;If you see uneven Shuffle Read/Write Size in a stage’s Summary Metrics&lt;/li&gt;
&lt;li&gt;Skew can cause Spill, so sometimes you will see Disk or Memory Spill as well. If the Spill is caused by Skew, you have to fix Skew as the root cause.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--SL-3UqqH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--SL-3UqqH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/1.png" alt="image" width="800" height="507"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--6rzLNwAw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--6rzLNwAw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/2.png" alt="image" width="800" height="309"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  How do you fix Skew?
&lt;/h3&gt;

&lt;p&gt;If your disk spills or OOM errors are caused by skew, instead of solving for a RAM problem, solve for the uneven distribution of records across partitions.&lt;/p&gt;

&lt;p&gt;Option 1: If you run Spark on Databricks, use a skew hint (refer to &lt;a href="https://docs.databricks.com/optimizations/skew-join.html"&gt;Skew Join optimization&lt;/a&gt;). For example, assuming that you know the column used in the join is skewed, set the skew hint for that column. With this skew hint information, Spark can hopefully construct a better query plan for the join.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## Set skew hint when loading the table
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"delta"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;trxPath&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;hint&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"skew"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s"&gt;"&amp;lt;join_column&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Option 2: If you use Spark 3.x, utilize Adaptive Query Execution (AQE). AQE can adapt query plans at runtime based on more accurate metrics, leading to more optimized execution. After a shuffle, AQE can automatically split skewed partitions into smaller ones, so that executors are not burdened by a few oversized partitions. This can lead to faster query execution and a more efficient use of cluster resources. &lt;strong&gt;Using AQE is highly recommended, and generally more effective than other options.&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## Turn AQE on
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;conf&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'spark.sql.adaptive.enabled'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;conf&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'spark.sql.adaptive.skewedJoin.enabled'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Option 3: Another option is to salt the skewed column with a random number to create more evenly distributed partitions, at the cost of extra processing. This is a more complex operation, which we can discuss in another post; a minimal sketch is shown below. &lt;/p&gt;
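
&lt;p&gt;Here is a rough salting sketch, assuming two existing DataFrames, a large skewed &lt;code&gt;transactions&lt;/code&gt; side and a small &lt;code&gt;customers&lt;/code&gt; side joined on &lt;code&gt;customer_id&lt;/code&gt;; the names and the salt factor are illustrative, not from the original example.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from pyspark.sql import functions as F

SALT_BUCKETS = 8  # illustrative salt factor, tune to the observed skew

## Add a random salt to the large, skewed side
transactions_salted = transactions.withColumn(
    'salt', (F.rand() * SALT_BUCKETS).cast('int'))

## Explode the small side so every salt value has a matching row
customers_salted = customers.withColumn(
    'salt', F.explode(F.array(*[F.lit(i) for i in range(SALT_BUCKETS)])))

## Join on the original key plus the salt, then drop the helper column
joined = (transactions_salted
          .join(customers_salted, on=['customer_id', 'salt'], how='inner')
          .drop('salt'))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;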

&lt;h2&gt;
  
  
  Spill [lack of memory]
&lt;/h2&gt;

&lt;p&gt;&lt;em&gt;TL;DR - Spill is caused by executors lacking memory to process their partitions. To fix spill, think about how you can add more memory to the executors or manage the partition sizes.&lt;/em&gt;  &lt;/p&gt;

&lt;p&gt;If your Spark executors don’t have enough local memory to process their allocated partitions, Spark has to spill the data to disk. Spill is Spark’s mechanism for moving data from local RAM to disk and then back into the executor’s RAM for further processing, with the goal of avoiding an out-of-memory (OOM) error. However, this can lead to expensive disk reads and writes, and significantly slow down the entire job. &lt;/p&gt;

&lt;p&gt;This process occurs when a partition becomes too large to fit into RAM, and it may be a result of skew in the data. Some potential causes of spill include setting &lt;code&gt;spark.sql.files.maxPartitionBytes&lt;/code&gt; too high, using &lt;code&gt;explode()&lt;/code&gt; on an array, performing a &lt;code&gt;join&lt;/code&gt; or &lt;code&gt;crossJoin&lt;/code&gt; of two tables, or aggregating results by a skewed column.&lt;/p&gt;

&lt;h3&gt;
  
  
  How do you identify Spill in your SparkUI?
&lt;/h3&gt;

&lt;p&gt;You can find Spill indicators in the SparkUI under each stage’s detail tab or in Aggregated Metrics by Executor. When data is spilled, both the size of the data in memory and on disk will be provided. Typically, the size on disk will be smaller due to compression that occurs when serializing data before it is written to disk.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--fE46srLK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--fE46srLK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/3.png" alt="image" width="485" height="149"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--RndX0pXo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--RndX0pXo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/4.png" alt="image" width="800" height="92"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  How do you fix Spill?
&lt;/h3&gt;

&lt;p&gt;To mitigate Spill issues, there are several actions you can take.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;First, make sure to fix the root cause of skew if this is the underlying issue behind the spill. To decrease partition sizes, increase the number of partitions or use explicit repartitioning.&lt;/li&gt;
&lt;li&gt;If the above doesn’t work, allocate a cluster with more memory per worker if each worker needs to process bigger partitions of data.&lt;/li&gt;
&lt;li&gt;Finally, you can adjust specific configuration settings using &lt;code&gt;spark.conf.set()&lt;/code&gt; to manage the size and number of partitions (see the sketch after this list).

&lt;ul&gt;
&lt;li&gt;manage &lt;code&gt;spark.conf.set('spark.sql.shuffle.partitions', {num_partitions})&lt;/code&gt; to reduce the amount of data in each partition shuffled across executors&lt;/li&gt;
&lt;li&gt;manage &lt;code&gt;spark.conf.set('spark.sql.files.maxPartitionBytes', {MB}*1024*1024)&lt;/code&gt; to reduce the size of partitions when Spark reads from disk into memory&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
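
&lt;p&gt;As a rough sketch of those configuration knobs (the partition counts, sizes, and column name below are illustrative, not prescriptive):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;## Increase the number of shuffle partitions so each one is smaller
spark.conf.set('spark.sql.shuffle.partitions', 400)

## Read smaller partitions from disk (64MB instead of the default 128MB)
spark.conf.set('spark.sql.files.maxPartitionBytes', 64 * 1024 * 1024)

## Or explicitly repartition a DataFrame before an expensive wide transformation
## ('join_key' is a hypothetical column name)
df = df.repartition(400, 'join_key')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;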

&lt;p&gt;It's worth noting that ignoring spill may not always be a good idea, as even a small percentage of spilling tasks can delay an entire job. Therefore, it's important to take note of spills and manage them proactively to enhance the performance of your Spark jobs.&lt;/p&gt;

&lt;h2&gt;
  
  
  Shuffles [data transfer]
&lt;/h2&gt;

&lt;p&gt;&lt;em&gt;TL;DR - Shuffle refers to the movement of data across executors, and it's inevitable with wide transformations. To tune shuffle operations, think about how you can reduce the amount of data that gets shuffled across the cluster network.&lt;/em&gt;  &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--X1-XAVg8--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--X1-XAVg8--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/5.png" alt="image" width="800" height="435"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Shuffle is the act of moving data between executors. If you have multiple data partitions in different executors on a Spark cluster, shuffle is necessary and inevitable. Most of the time, shuffle operations are actually quite fast. However, there are situations when shuffle can become the culprit slowing down your Spark job. Moving data across the network is slow, and the more data you have to move, the slower it gets. Moreover, incorrect shuffles can cause skew, and potentially spill. &lt;/p&gt;

&lt;h3&gt;
  
  
  How to identify Shuffle?
&lt;/h3&gt;

&lt;p&gt;If you use wide transformation (&lt;code&gt;distinct&lt;/code&gt;, &lt;code&gt;join&lt;/code&gt;, &lt;code&gt;groupBy/count&lt;/code&gt;, &lt;code&gt;orderBy&lt;/code&gt;, &lt;code&gt;crossJoin&lt;/code&gt;) in your Spark job and you have multiple executors in the Spark cluster, shuffle will most likely happen. &lt;/p&gt;

&lt;p&gt;In the SparkUI, you can find the Shuffle Read Size and Shuffle Write Size numbers in each stage's details.&lt;/p&gt;

&lt;h3&gt;
  
  
  How to mitigate Shuffle?
&lt;/h3&gt;

&lt;p&gt;To reduce the impact of shuffle on your Spark job, try to reduce the amount of data you have to shuffle across the network.&lt;/p&gt;

&lt;h4&gt;
  
  
  1. Tune the Spark Cluster
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;Use fewer and larger workers: You normally pay the same unit price for the same total number of cores and memory in your cluster, no matter the number of executors. So if you have jobs that require a lot of wide transformations, choose bigger instances and fewer workers. That way, fewer executors need to exchange data. &lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  2. Limit shuffled data and tune the shuffle partitions
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;Use predicate push down and/or narrow the columns to reduce the amount of data being shuffled&lt;/li&gt;
&lt;li&gt;Turn on Adaptive Query Execution (AQE) to dynamically coalesce shuffle partitions at runtime and avoid empty partitions
&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--sqSbMFk0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/debug-spark/aqe-shuffle.png" alt="image" width="800" height="504"&gt;
&lt;/li&gt;
&lt;li&gt;set &lt;code&gt;spark.conf.set('spark.sql.shuffle.partitions', {num_partitions})&lt;/code&gt; to control the number of partitions to be shuffled (see the sketch after this list)&lt;/li&gt;
&lt;/ul&gt;
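
&lt;p&gt;A minimal sketch of these ideas, assuming a hypothetical &lt;code&gt;events&lt;/code&gt; Delta table with &lt;code&gt;user_id&lt;/code&gt;, &lt;code&gt;date&lt;/code&gt;, and &lt;code&gt;amount&lt;/code&gt; columns (the names and values are illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;## Select only the columns you need and filter early so the predicate can be
## pushed down to the file scan, reducing the data that reaches the shuffle
df = (spark.read.format('delta').load('path/to/events')
          .select('user_id', 'date', 'amount')
          .filter("date &amp;gt;= '2023-01-01'"))

## Let AQE coalesce small shuffle partitions at runtime
spark.conf.set('spark.sql.adaptive.enabled', True)
spark.conf.set('spark.sql.adaptive.coalescePartitions.enabled', True)

## Or set an explicit shuffle partition count yourself
spark.conf.set('spark.sql.shuffle.partitions', 200)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;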

&lt;h4&gt;
  
  
  3. Try BroadcastHashJoin if possible
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;Use BroadcastHashJoin if one table is less than 10MB. With BHJ, the smaller table will be broadcast to all executors, which can eliminate the shuffle of data (see the sketch after this list). 

&lt;ul&gt;
&lt;li&gt;Step 1: each executor will read in their assigned partitions from the bigger table&lt;/li&gt;
&lt;li&gt;Step 2: every partition of the broadcasted table is sent to the driver (therefore you want to make sure the broadcasted table is small enough to fit into the driver memory)&lt;/li&gt;
&lt;li&gt;Step 3: a copy of the entire broadcasted table is sent to each executor&lt;/li&gt;
&lt;li&gt;Step 4: each executor can do a standard join between the tables because it has a full copy of the broadcasted table, therefore the shuffle can be avoided in the join&lt;/li&gt;
&lt;li&gt;There are a few considerations when it comes to BroadcastHashJoin that you should be aware of, which we can discuss in another post&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;With Adaptive Query Execution in Spark 3.x, Spark will try to do a BHJ at runtime if one of the tables is small enough&lt;/li&gt;
&lt;/ul&gt;
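
&lt;p&gt;A minimal sketch of a broadcast join, assuming a large &lt;code&gt;transactions&lt;/code&gt; DataFrame and a small &lt;code&gt;countries&lt;/code&gt; dimension DataFrame (illustrative names):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from pyspark.sql.functions import broadcast

## Explicitly hint that the small table should be broadcast to every executor
joined = transactions.join(broadcast(countries), on='country_code', how='left')

## Spark can also broadcast automatically when a table is below this threshold
## (the default is 10MB)
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 10 * 1024 * 1024)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;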

&lt;h4&gt;
  
  
  4. Bucketing
&lt;/h4&gt;

&lt;p&gt;Bucketing can be a useful technique to improve the efficiency of data processing by pre-sorting and aggregating data in the Join operation. The process involves dividing the data into N buckets, with both tables requiring the same number of buckets. This technique can be particularly effective for large datasets of several TBs, where data is queried and joined repeatedly.&lt;/p&gt;

&lt;p&gt;Bucketing should be performed by a skilled data engineer as part of the data preparation process. It's worth noting that bucketing is only worthwhile if the data is frequently queried and joined, and filtering does not improve the join operation.&lt;/p&gt;

&lt;p&gt;By using bucketing in the Join operation, the process of shuffling and exchanging data can be eliminated, resulting in a more efficient join. &lt;/p&gt;
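
&lt;p&gt;A minimal sketch of pre-bucketing two tables on the join key (the table and column names are illustrative; both sides must use the same number of buckets):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;## Write both tables bucketed (and optionally sorted) by the join key
(df_orders.write
    .bucketBy(64, 'customer_id')
    .sortBy('customer_id')
    .mode('overwrite')
    .saveAsTable('orders_bucketed'))

(df_customers.write
    .bucketBy(64, 'customer_id')
    .sortBy('customer_id')
    .mode('overwrite')
    .saveAsTable('customers_bucketed'))

## Later joins on customer_id can avoid the shuffle-and-sort step
joined = spark.table('orders_bucketed').join(
    spark.table('customers_bucketed'), 'customer_id')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;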

&lt;h2&gt;
  
  
  Storage [small files, scanning, inferring schema/schema evolution]
&lt;/h2&gt;

&lt;p&gt;&lt;em&gt;TL;DR - Storage issues are problems related to how data is stored on disk, which can lead to high overhead when ingesting data due to file open, read, and close operations. To fix storage issues, think about how you can reduce the number of files Spark has to open, read, and ingest from disk.&lt;/em&gt;  &lt;/p&gt;

&lt;h3&gt;
  
  
  How to Identify storage issue?
&lt;/h3&gt;

&lt;p&gt;There are a few storage-related potential issues that can slow down operations in Spark data processing. One is the overhead of opening, reading, and closing many &lt;strong&gt;small files&lt;/strong&gt;. To address this, it's recommended to aim for files with a size of 1GB or larger, which can significantly reduce the time spent on these operations.&lt;/p&gt;

&lt;p&gt;Another is the &lt;strong&gt;directory scanning issue&lt;/strong&gt;, which can arise with &lt;strong&gt;highly partitioned&lt;/strong&gt; datasets that have multiple directories for each partition, requiring the driver to scan all of them on disk. For example, files in your storage are partitioned at a fine-grained level such as year-month-date-minute. &lt;/p&gt;

&lt;p&gt;A third issue involves &lt;strong&gt;schema operations&lt;/strong&gt;, such as inferring the schema for JSON and CSV files. This can be very costly, requiring a full read of the files to determine data types, even if you only need a subset of the data. By contrast, reading Parquet files typically only requires a one-time read of the schema, which is much faster. Therefore, it's recommended to use Parquet as the file storage format for Spark, since Parquet stores schema information in the file.&lt;/p&gt;

&lt;p&gt;However, schema evolution support, even with Parquet files, can also be expensive, as it requires reading all the files and merging the schema collectively. For this reason, starting from Spark 1.5, schema merging is turned off by default and needs to be turned on by setting the &lt;code&gt;spark.sql.parquet.mergeSchema&lt;/code&gt; option.&lt;/p&gt;
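
&lt;p&gt;For example, to turn schema merging back on, either per read or for the whole session (a small sketch):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;## Merge schemas for a single read
df = spark.read.option('mergeSchema', 'true').parquet('path/to/parquet_files')

## Or enable schema merging globally for the session
spark.conf.set('spark.sql.parquet.mergeSchema', 'true')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;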

&lt;h3&gt;
  
  
  How to fix storage issue?
&lt;/h3&gt;

&lt;p&gt;To address the issue of tiny files in a storage location, if you are using Delta Lake, consider using &lt;code&gt;autoOptimize&lt;/code&gt; with the &lt;code&gt;optimizeWrite&lt;/code&gt; and &lt;code&gt;autoCompact&lt;/code&gt; features. These will automatically coalesce small files into larger ones; however, they have some subtle differences (see the sketch after the list below):&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Auto compaction occurs after a write to a table has succeeded and runs synchronously on the cluster that has performed the write. Auto compaction only compacts files that haven’t been compacted previously.&lt;/li&gt;
&lt;li&gt;Optimized writes improve file size as data is written and benefit subsequent reads on the table. Optimized writes are most effective for partitioned tables, as they reduce the number of small files written to each partition.&lt;/li&gt;
&lt;/ul&gt;
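
&lt;p&gt;On Databricks, these features are typically enabled as Delta table properties; a small sketch (the table name is illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;## Enable optimized writes and auto compaction for an existing Delta table
spark.sql("""
  ALTER TABLE my_table SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
  )
""")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;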

&lt;p&gt;To address the issue of slow performance when scanning directories, consider registering your data as tables in order to leverage the metastore to track the files in the storage location. This may have some initial overhead, but will benefit you in the long run by avoiding repeated directory scans.&lt;/p&gt;

&lt;p&gt;To address issues with schema operations, one option is to specify the schema when reading in non-Parquet file types, though this can be a time-consuming process. Alternatively, you can register tables so that the metastore can track the table schema. The best option is to use Delta Lake, which offers zero reads of the schema with a metastore and at most one read for schema evolution. &lt;a href="https://delta.io/"&gt;Delta Lake&lt;/a&gt; also provides other benefits such as ACID transactions, time travel, DML operations, schema evolution, etc. &lt;/p&gt;

&lt;h2&gt;
  
  
  Serialization [API, programming]
&lt;/h2&gt;

&lt;p&gt;&lt;em&gt;TL;DR - Bad code can also slow Spark down. Always try to use Spark SQL built-in functions and avoid UDFs when developing your Spark code. If UDFs are needed, try vectorized UDFs for Python and typed transformations for Scala.&lt;/em&gt;  &lt;/p&gt;

&lt;p&gt;Refer to my post about &lt;a href="https://anhcodes.dev/blog/spark-sql-programming/"&gt;SparkSQL Programming&lt;/a&gt; to understand the difference between built-in functions, UDFs and vectorized UDFs&lt;/p&gt;

&lt;p&gt;Slower Spark Jobs can sometimes occur as a result of suboptimal code. One example of this would be code segments that have not been reworked to support more efficient Spark operations.&lt;/p&gt;

&lt;p&gt;As a rule of thumb, always use &lt;code&gt;spark.sql.functions&lt;/code&gt; whenever possible, regardless of which language you're using. You can expect similar performance in both Python and Scala with these functions.&lt;/p&gt;

&lt;p&gt;In Scala, if you have to use user-defined functions that are not supported by the standard Spark functions, it's more efficient to use typed transformations instead of standard Scala UDFs. In general, Scala is more efficient than Python with UDFs and typed transformations.&lt;/p&gt;

&lt;p&gt;If you're working with Python, avoid Spark UDFs (even vectorized ones) if possible. But if you have to use a UDF, always use a &lt;a href="https://www.databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html"&gt;vectorized UDF&lt;/a&gt;. &lt;/p&gt;
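
&lt;p&gt;To illustrate the difference, here is a small sketch comparing a built-in expression with a vectorized (pandas) UDF for the same squaring operation, assuming a DataFrame &lt;code&gt;df&lt;/code&gt; with a numeric column &lt;code&gt;x&lt;/code&gt; (illustrative names):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

## Preferred: a built-in expression, fully optimized by Catalyst
df_builtin = df.withColumn('x_squared', F.col('x') * F.col('x'))

## If a UDF is unavoidable, use a vectorized (pandas) UDF that operates
## on whole batches of rows instead of one row at a time
@pandas_udf('double')
def square(x: pd.Series) -&amp;gt; pd.Series:
    return x * x

df_vectorized = df.withColumn('x_squared', square(F.col('x')))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;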

&lt;h2&gt;
  
  
  Reference
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://spark.apache.org/docs/latest/sql-performance-tuning.html"&gt;Spark SQL Performance Tuning&lt;/a&gt;&lt;/p&gt;

</description>
      <category>spark</category>
    </item>
    <item>
      <title>Spark working internals, and why should you care?</title>
      <dc:creator>anhcodes</dc:creator>
      <pubDate>Wed, 31 May 2023 00:40:06 +0000</pubDate>
      <link>https://dev.to/anhcodes/spark-working-internals-and-why-should-you-care-19bp</link>
      <guid>https://dev.to/anhcodes/spark-working-internals-and-why-should-you-care-19bp</guid>
      <description>&lt;p&gt;Most Big Data developers and Data Engineers start learning Spark by writing SparkSQL codes to perform ETL on DataFrame (I know I did). I also wrote a post about &lt;a href="https://anhcodes.dev/blog/spark-sql-programming/"&gt;SparkSQL Programming&lt;/a&gt;. However, we quickly learn that there’s more knowlege required to go from processing a few GBs of data to dealing with TBs and PBs of data, which is a challenge for big enterprises. Learning to write correct Spark codes is only a small part of the battle, you will need to understand the Spark Architecture and Spark working internals to correct tune Spark to handle true big data, and it’s the focus of this post. &lt;/p&gt;

&lt;h2&gt;
  
  
  Spark Architecture
&lt;/h2&gt;

&lt;p&gt;First, let this sink in: Spark is an &lt;strong&gt;in-memory&lt;/strong&gt;, &lt;strong&gt;parallel processing engine&lt;/strong&gt; that is very &lt;strong&gt;scalable.&lt;/strong&gt; The more data you have, the more powerful Spark becomes, which sets it apart from other processing engines. Spark is faster than the MapReduce paradigm because it processes data in memory, which means it can reduce the disk IO that normally slows down MapReduce jobs. Spark is also fast because of its ability to process data in parallel. &lt;/p&gt;

&lt;p&gt;Parallelism is a key enabler of Spark’s efficiency. The Spark architecture is designed so that you can add new computers to process a growing amount of big data in parallel. &lt;/p&gt;

&lt;h3&gt;
  
  
  Spark Cluster Components
&lt;/h3&gt;

&lt;p&gt;A Spark cluster has a driver and multiple workers (think computers). &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The Spark driver (a JVM) is responsible for instantiating the Spark Session, turning Spark operations into DAGs, and scheduling and distributing tasks to the workers.&lt;/li&gt;
&lt;li&gt;Each worker has multiple cores (think threads) that can run multiple tasks. Each task is a single unit of work, each task maps to a single core and works on a single partition of data at a given time (1 task, 1 partition, 1 slot, 1 core)&lt;/li&gt;
&lt;li&gt;Besides these, we also have a &lt;strong&gt;cluster manager&lt;/strong&gt; and a &lt;strong&gt;Spark Session&lt;/strong&gt; that runs Spark applications.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--XTO21sku--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--XTO21sku--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/1.png" alt="image" width="800" height="386"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Spark Session
&lt;/h3&gt;

&lt;p&gt;SparkSession is the single point of entry to all DataFrame API functionality. SparkSession has been available since Spark 2.0; before that, SparkContext was used, with a limitation of only one SparkContext per JVM. SparkSession can unify numerous Spark contexts.&lt;/p&gt;

&lt;p&gt;A SparkSession is automatically created in a Databricks notebook as the variable &lt;code&gt;spark&lt;/code&gt;.&lt;/p&gt;
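
&lt;p&gt;Outside Databricks, you create it yourself; a minimal sketch (the application name is illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from pyspark.sql import SparkSession

## Create (or reuse) a SparkSession as the entry point to the DataFrame API
spark = (SparkSession.builder
         .appName('my-app')
         .getOrCreate())
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;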

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--tcLvsfg4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--tcLvsfg4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/2.png" alt="image" width="800" height="349"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# In below code, the `spark` variable specifies a sparkSession
# spark.table reads a table to a dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'&amp;lt;table&amp;gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'a'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'b'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;where&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'a&amp;gt;1'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;orderBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'b'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# spark.read reads files to a dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;parquet&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'path/to/parquet'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# spark.sql execute sql queries on a table and save the result set to df
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'select * from &amp;lt;table&amp;gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Spark APIs
&lt;/h3&gt;

&lt;p&gt;The Spark ecosystem has 4 APIs: SparkSQL, Spark Structured Streaming, SparkML, and GraphX (I haven’t used this one before, so I’m not sure if it’s deprecated or not). Most Spark developers start with the SparkSQL API, doing ingestion and transformations on Spark DataFrames. However, Spark Structured Streaming and SparkML are pretty popular too, and we can discuss them in later posts. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--qAyhXcey--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--qAyhXcey--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/1.png" alt="image" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Computation in Spark
&lt;/h2&gt;

&lt;p&gt;Earlier I mentioned that the driver is responsible for turning operations into jobs or DAGs. DAGs are Directed Acyclic Graphs (a fancy term for graphs that have direction and no cycles). In a Spark execution plan, each &lt;strong&gt;job&lt;/strong&gt; is a DAG, each DAG can have one or multiple &lt;strong&gt;stages&lt;/strong&gt;, and each stage can have multiple &lt;strong&gt;tasks&lt;/strong&gt; (clear?)&lt;/p&gt;

&lt;p&gt;Spark parallelizes at 2 levels:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;splitting work among executors (or workers), which run the Spark code on the data partitions they hold&lt;/li&gt;
&lt;li&gt;each executor has a number of slots/cores, and each slot can execute a task on a data partition.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Another characteristic of Spark is lazy execution. When you specify transformations on a Spark DataFrame, Spark records the lineage and only starts the computation when an action is triggered (refer to my previous post about &lt;a href="https://anhcodes.dev/blog/spark-sql-programming/"&gt;SparkSQL programming&lt;/a&gt; for more information on transformations and actions)&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ZJtMtLfB--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ZJtMtLfB--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/3.png" alt="image" width="800" height="411"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Under the hood, SparkSQL uses the Spark Catalyst Optimizer to optimize query performance, similar to how a relational database or a data warehouse plans its queries. &lt;/p&gt;

&lt;h3&gt;
  
  
  Spark Catalyst Optimizer
&lt;/h3&gt;

&lt;p&gt;The Catalyst Optimizer is a component of Spark SQL that performs optimization on a query through 4 stages:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;analysis: create an abstract syntax tree of the query&lt;/li&gt;
&lt;li&gt;logical optimization: create logical plans and use a cost-based optimizer to assign costs to each plan&lt;/li&gt;
&lt;li&gt;physical planning: generate a physical plan based on the logical plan&lt;/li&gt;
&lt;li&gt;code generation: generate Java &lt;strong&gt;bytecode&lt;/strong&gt; to run on each machine; Spark SQL acts as a &lt;strong&gt;compiler&lt;/strong&gt;, and the Project Tungsten engine generates RDD code&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The Catalyst Optimizer is a rule-based engine that takes the Logical Plan and rewrites it as an optimized Physical Plan. The Physical Plan is developed BEFORE a query is executed.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--M0_hs1Mk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--M0_hs1Mk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/4.png" alt="image" width="800" height="356"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To view the Catalyst Optimizer in action, use &lt;code&gt;df.explain(True)&lt;/code&gt; to view the Logical and Physical Execution plans of a query. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ZRVvOn0H--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ZRVvOn0H--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/5.png" alt="image" width="800" height="126"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Adaptive Query Execution
&lt;/h3&gt;

&lt;p&gt;In Spark 3.0, Adaptive Query Execution (AQE) was introduced. One difference between AQE and the Catalyst Optimizer is that AQE modifies the Physical Plan based on runtime statistics, so AQE can tune your queries further on the fly. You can think of AQE as complementary to the Catalyst Optimizer.&lt;/p&gt;

&lt;p&gt;For example, during runtime, based on new information that was not available during planning, AQE can decide to change your join strategy from Sort Merge Join to Broadcast Hash Join to reduce data shuffling. AQE can also coalesce your partitions to an optimal size during the shuffle stage, or help improve skew joins. &lt;/p&gt;

&lt;p&gt;This option is not turned on by default in Spark; you can enable it with &lt;code&gt;spark.conf.set('spark.sql.adaptive.enabled', True)&lt;/code&gt;, and it’s recommended to turn it on. However, if you run Spark on a later version of the Databricks Runtime, AQE is enabled by default. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--75XiwhOb--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--75XiwhOb--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/6.png" alt="image" width="800" height="385"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Shuffle, Partitioning and Caching in Spark
&lt;/h2&gt;

&lt;p&gt;We established that Spark processes data in parallel by splitting up data into &lt;strong&gt;partitions&lt;/strong&gt; and &lt;strong&gt;moving (shuffling)&lt;/strong&gt; them to executors so that each can run a task on a small subset of data in &lt;strong&gt;memory.&lt;/strong&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Shuffling, partitioning,&lt;/strong&gt; and &lt;strong&gt;memory&lt;/strong&gt; can potentially dictate Spark performance. So if you understand these terms in depth, debugging Spark becomes much easier, which I explain further in another post about &lt;a href="https://anhcodes.dev/blog/debug-spark/"&gt;Debugging Long Running Spark Job&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To process your data, Spark will first have to ingest files from disk into memory, and by default it reads data into partitions of 128MB. If there’s any wide transformation on the DataFrame, Spark needs to repartition the data and move partitions to cores for processing. The implication of this is that each partition has to fit into the core’s memory, or you will have spill or OOM errors. If partitions are not evenly distributed, you can have skew (which means some executors have more work than others). Correctly tuning partitions at ingestion and at the shuffle stage can help improve your Spark jobs. &lt;/p&gt;

&lt;h3&gt;
  
  
  Partitioning
&lt;/h3&gt;

&lt;p&gt;There are 2 types of partitioning:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Spark Partition: partition in flight (in RAM)&lt;/li&gt;
&lt;li&gt;Disk Partition (Hive partition): partition at rest (on disk)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;When Spark reads data from disk into memory (a dataframe), the initial partitioning of the dataframe (in MEMORY) is determined by the number of cores (the default level of parallelism), the dataset size, the &lt;code&gt;spark.sql.files.maxPartitionBytes&lt;/code&gt; config, and &lt;code&gt;spark.sql.files.openCostInBytes&lt;/code&gt; (default 4MB, the overhead of opening a file). Remember that this is the size of the partition in memory, unrelated to what it is on disk. &lt;/p&gt;

&lt;p&gt;Check the number of partitions in a DataFrame ingested from disk into memory with &lt;code&gt;df.rdd.getNumPartitions()&lt;/code&gt;. We can estimate the size of the dataframe in memory by multiplying the number of partitions in memory by the partition size. &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;By default, each partition has a size of 128MB, but you can change this with &lt;code&gt;spark.sql.files.maxPartitionBytes&lt;/code&gt;. A situation where setting this config can be beneficial is writing data to 1GB part files.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--r46OwSda--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--r46OwSda--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/7.png" alt="image" width="800" height="110"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Don’t allow partition size to increase beyond 200MB per 8GB of total core memory; if it does, increase the number of partitions. It’s better to have many small partitions than too few large partitions.&lt;/li&gt;
&lt;li&gt;It’s best to tune the number of partitions so it is at least a multiple of the number of cores in your cluster. This allows for better parallelism. Run &lt;code&gt;df.rdd.getNumPartitions()&lt;/code&gt; to check the number of partitions in memory.&lt;/li&gt;
&lt;li&gt;If you want to change the partition count at runtime, you can run &lt;code&gt;coalesce()&lt;/code&gt; and &lt;code&gt;repartition()&lt;/code&gt; (see the sketch after this list). Coalesce can only reduce the number of partitions and increase partition size, but as a &lt;strong&gt;narrow transformation&lt;/strong&gt; with no shuffling, coalesce is more efficient than repartition. Repartition returns a new DataFrame with exactly N partitions of even size. It can increase or decrease your partition count, but it requires expensive data shuffling&lt;/li&gt;
&lt;/ul&gt;
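
&lt;p&gt;A small sketch of both calls (the partition counts are illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;## Check the current number of in-memory partitions
print(df.rdd.getNumPartitions())

## Reduce the partition count without a shuffle (narrow transformation)
df_small = df.coalesce(8)

## Increase (or evenly rebalance) the partition count with a full shuffle
df_even = df.repartition(64)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;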

&lt;h3&gt;
  
  
  Shuffle
&lt;/h3&gt;

&lt;p&gt;Shuffle is one of the most expensive operations in Spark. In every wide transformation (for example a &lt;code&gt;groupBy&lt;/code&gt;), a shuffle creates multiple stages: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The first stage creates shuffle files (shuffle write)&lt;/li&gt;
&lt;li&gt;Subsequent stages reuse those shuffle files (shuffle read)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Y5t-WtJf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Y5t-WtJf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/8.png" alt="image" width="800" height="403"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;If cache is used, the first stage can create shuffle files and cache the results, and later stages can read from the cache, which improves performance&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--GpuvamEt--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--GpuvamEt--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/9.png" alt="image" width="800" height="403"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--BNk1lTCL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/10.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--BNk1lTCL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/spark/10.png" alt="image" width="800" height="403"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The issues with shuffle partitions are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Too many partitions and you may have empty or very small partitions, which puts pressure on the driver. This issue can be solved by enabling Adaptive Query Execution (AQE) as explained above&lt;/li&gt;
&lt;li&gt;Too few partitions and big partitions can cause spill or OOM. Correctly setting &lt;code&gt;spark.sql.shuffle.partitions&lt;/code&gt; based on the rule in the partitioning section can help. This setting indicates how many partitions Spark will create for the next stage, and it MUST be managed by the user for every job.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Besides these, there are a few techniques to mitigate excessive shuffles in my previous post &lt;a href="https://anhcodes.dev/blog/debug-spark/"&gt;Debugging Long Running Spark Job&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Cache in Spark
&lt;/h3&gt;

&lt;p&gt;By default, data in a DataFrame is only present on the Spark cluster while it is being processed during a query; it won’t be persisted on the cluster afterwards. However, you can explicitly request Spark to persist a DataFrame on the cluster by invoking &lt;code&gt;df.cache&lt;/code&gt;. Cache can store as many partitions of the dataframe as the cluster memory allows.&lt;/p&gt;

&lt;p&gt;Note that cache is another type of persist: &lt;code&gt;df.cache&lt;/code&gt; is &lt;code&gt;df.persist(StorageLevel.MEMORY_AND_DISK)&lt;/code&gt;. This stores partitions in memory and spills excess to disk.&lt;/p&gt;

&lt;p&gt;Cache should be used with care because caching consumes cluster resources that could otherwise be used for other executions, and it can prevent Spark from performing query optimization. You should only use cache in the situations below:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;DataFrames frequently used during Exploratory Data Analysis, iterative machine learning training in a Spark session&lt;/li&gt;
&lt;li&gt;DataFrames accessed commonly for doing frequent transformations during ETL or building data pipelines&lt;/li&gt;
&lt;li&gt;Don’t use when data is too big to fit in memory, or only need infrequent transformation&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;When you use cache() or persist(), the DataFrame is not fully cached until you invoke an action that goes through every record (e.g., count()). If you use an action like take(1), only one partition will be cached, because Catalyst realizes that you do not need to compute all the partitions just to retrieve one record.&lt;/p&gt;

&lt;p&gt;Don’t forget to cleanup with &lt;code&gt;df.unpersist&lt;/code&gt; to evict the dataframe from cache when you no longer need it.&lt;/p&gt;
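
&lt;p&gt;Putting it together, a small sketch of the cache lifecycle:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;## Mark the DataFrame for caching (lazy - nothing is stored yet)
df.cache()

## Trigger an action that touches every record so all partitions are materialized
df.count()

## ... reuse df in several queries or transformations ...

## Release the cached partitions when you no longer need them
df.unpersist()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;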

</description>
      <category>spark</category>
    </item>
    <item>
      <title>Spark SQL Programming Primer</title>
      <dc:creator>anhcodes</dc:creator>
      <pubDate>Tue, 30 May 2023 19:30:38 +0000</pubDate>
      <link>https://dev.to/anhcodes/spark-sql-programming-primer-4k4h</link>
      <guid>https://dev.to/anhcodes/spark-sql-programming-primer-4k4h</guid>
      <description>&lt;p&gt;&lt;em&gt;TL,DR - SparkSQL is a huge component of Spark Programming. This post introduces programming in SparkSQL through Spark DataFrame API. It's important to be aware of Spark SQL built-in functions to be a more efficient Spark programmer&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  What is SparkSQL
&lt;/h2&gt;

&lt;p&gt;SparkSQL is one of the 4 APIs in the Spark ecosystem. SparkSQL provides structured data processing with interfaces such as SQL or the DataFrame API, using the Python, Scala, R, and Java programming languages&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--qAyhXcey--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--qAyhXcey--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/1.png" alt="image" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The same SparkSQL query can be expressed with SQL and with the DataFrame API. SQL queries, Python DataFrame queries, and Scala DataFrame queries are all executed on the same engine. The queries go through query plans, then RDDs, then execution. SparkSQL always optimizes the queries before execution using the &lt;strong&gt;Catalyst Optimizer&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- sql&lt;/span&gt;
&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="n"&gt;a&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;table&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;a&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="k"&gt;order&lt;/span&gt; &lt;span class="k"&gt;by&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## python
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'&amp;lt;table&amp;gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'a'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'b'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;where&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'a&amp;gt;1'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;orderBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'b'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--dTHgpiwP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--dTHgpiwP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/2.png" alt="image" width="800" height="349"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  DataFrame API in SparkSQL
&lt;/h2&gt;

&lt;p&gt;A DataFrame is an immutable collection of data grouped into named columns. A schema defines the column names and data types of a DataFrame. &lt;/p&gt;

&lt;h3&gt;
  
  
  Read and write data with Spark DataFrame
&lt;/h3&gt;

&lt;p&gt;You can read data from almost all file formats, such as CSV, JSON, Parquet, Delta, etc., into a Spark DataFrame. &lt;/p&gt;

&lt;p&gt;You can either choose to infer the schema from the files (expensive with JSON and CSV), or specify the schema explicitly (more efficient)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## Read data from parquet files to dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;parquet&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'path/to/parquet_files'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'inferSchema'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;## read data from csv specifying separator, header and schema
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'path/to/csv_files'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sep&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'t'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;header&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;inferSchema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;## Read data from json files to dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'path/to/json_files'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;inferSchema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## Option 1: read data from file with schema specified as StructType
&lt;/span&gt;&lt;span class="n"&gt;sparkSchema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;StructType&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;StructField&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'col1'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;StringType&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;StructField&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'col2'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;IntegerType&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'path/to/csv_files'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sep&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'t'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;header&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sparkSchema&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;## Option 2: read data from file with schema specified as DDL syntax
&lt;/span&gt;&lt;span class="n"&gt;ddlSchema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"col1 string, col2 integer"&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'path/to/csv_files'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sep&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'t'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;header&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;ddlSchema&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## Write dataframe to parquet files
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'compression'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'snappy'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'overwrite'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;parquet&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'path/to/storage'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;## Write data to table
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'overwrite'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;saveAsTable&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'&amp;lt;table_name&amp;gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## write dataframe to Delta, default Parquet format
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'delta'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'overwrite'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;save&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'outputPath'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Columns in DF
&lt;/h3&gt;

&lt;p&gt;There are many ways to reference a column in a DataFrame, depending on which language API you use:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## Multi ways of extracting columns from Spark DF 
## Python
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'columnName'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;columnName&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;spark.sql.functions&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;
&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'columnName'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'columnName.field'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;##nested column array
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="c1"&gt;// Scala&lt;/span&gt;
&lt;span class="nf"&gt;df&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"columnName"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.sql.functions.col&lt;/span&gt;
&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"columnName"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"columnName"&lt;/span&gt;
&lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"columnName.field"&lt;/span&gt; &lt;span class="c1"&gt;//nested column array&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Column Operators &amp;amp; Methods&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Tnv7KZRq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Tnv7KZRq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/3.png" alt="image" width="800" height="345"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--LzsKMgZH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--LzsKMgZH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/4.png" alt="image" width="800" height="430"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;
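
&lt;p&gt;For example, here is a minimal sketch of a few of these operators and methods in use (the column names &lt;code&gt;price&lt;/code&gt;, &lt;code&gt;quantity&lt;/code&gt;, and &lt;code&gt;device&lt;/code&gt; are made up for illustration):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from pyspark.sql.functions import col

## arithmetic and comparison operators build new Column expressions
df.withColumn("total", col("price") * col("quantity"))
df.filter(col("price") &amp;gt; 100)

## common Column methods: cast, alias, isin, contains
df.select(col("price").cast("double").alias("price_double"))
df.filter(col("device").isin("macOS", "iOS"))
df.filter(col("device").contains("mac"))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;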

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql.functions.col&lt;/span&gt;

&lt;span class="c1"&gt;## These are chained transformations
&lt;/span&gt;&lt;span class="n"&gt;new_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"colA"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;isNotNull&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
                    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"colB"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"colA"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;cast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"int"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
                    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sort&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"colB"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;desc&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

&lt;span class="c1"&gt;## transformations with selectExp
&lt;/span&gt;&lt;span class="n"&gt;appleDF&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;eventsDf&lt;/span&gt;
                    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;selectExpr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user_id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"device in ('macOS', 'iOS') as apple_user"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;## transformation with regular PythonAPI
&lt;/span&gt;&lt;span class="n"&gt;appleDF&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;eventsDF&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user_id"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"apple_user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"device"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;isin&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'macOS'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'iOS'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Rows in DF
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--EFltSKVP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--EFltSKVP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/5.png" alt="image" width="800" height="293"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--GTzLVUld--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--GTzLVUld--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/6.png" alt="image" width="800" height="293"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Data Operations in Spark DataFrame
&lt;/h3&gt;

&lt;p&gt;There are 2 main types of operations you can perform on a Spark DataFrame. &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;transformations&lt;/strong&gt; (&lt;code&gt;select&lt;/code&gt;, &lt;code&gt;where&lt;/code&gt;, &lt;code&gt;orderBy&lt;/code&gt;, &lt;code&gt;groupBy&lt;/code&gt;): Remember that a DataFrame is immutable, so each transformation creates a new DataFrame. Transformations are evaluated lazily: they are not executed immediately but recorded as lineage until an action is invoked or the data is touched. There are 2 types of Spark transformations, listed below.&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;* **narrow transformation**: single input partition computes single output partition (each column are computed separately), **without exchange of data** (such as `filter`, `contains`). 

* **wide transformation**: data from many partitions read, combined and written to disk (`groupBy`, `orderBy`, `count`), which causes **shuffle of data** across partitions 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;actions&lt;/strong&gt; (&lt;code&gt;show&lt;/code&gt;, &lt;code&gt;display&lt;/code&gt;, &lt;code&gt;take&lt;/code&gt;, &lt;code&gt;describe&lt;/code&gt;, &lt;code&gt;summary&lt;/code&gt;, &lt;code&gt;first&lt;/code&gt;, &lt;code&gt;head&lt;/code&gt;, &lt;code&gt;count&lt;/code&gt;, &lt;code&gt;collect&lt;/code&gt;): trigger the lazy evaluation of the recorded transformations&lt;/p&gt;

&lt;p&gt;&lt;code&gt;count&lt;/code&gt; vs &lt;code&gt;collect&lt;/code&gt;: &lt;code&gt;count&lt;/code&gt; returns a single number to the driver, while &lt;code&gt;collect&lt;/code&gt; returns a collection of Row objects to the driver (expensive and can cause out-of-memory errors)&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--QA80zycU--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--QA80zycU--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/7.png" alt="image" width="800" height="293"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Remember that when you specify transformations, your Spark code is not executed until you call an action. Lazy evaluation also provides fault tolerance: Spark records the transformation lineage, so it can restart the job from that lineage if there is a failure.&lt;/p&gt;
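
&lt;p&gt;Here is a minimal sketch of that behavior (the column name is made up for illustration): the transformations below only build up the lineage, and nothing runs on the cluster until the action at the end is called.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from pyspark.sql.functions import col

## transformations only: nothing is executed yet, Spark just records the lineage
filtered = df.filter(col("colA").isNotNull())    ## narrow transformation
grouped = filtered.groupBy("colA").count()       ## wide transformation (shuffle)

## inspect the recorded plan without executing it
grouped.explain()

## the action triggers execution of the whole recorded lineage
grouped.show()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;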

&lt;h2&gt;
  
  
  SparkSQL Built-in Functions
&lt;/h2&gt;

&lt;p&gt;You can use the built-in functions from &lt;code&gt;pyspark.sql.functions&lt;/code&gt; for Python and &lt;code&gt;org.apache.spark.sql.functions&lt;/code&gt; for Scala. Refer to the &lt;a href="https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html"&gt;spark sql built-in functions&lt;/a&gt; reference. Built-in functions are highly efficient and a best practice for Spark programming. It's highly recommended to use built-in functions before attempting to create your own UDFs (User Defined Functions).&lt;/p&gt;
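
&lt;p&gt;As a quick illustration of why (a minimal sketch; the &lt;code&gt;name&lt;/code&gt; column is made up), the same result can often be produced either with a Python UDF or with a built-in function, and the built-in version stays entirely inside the optimized engine:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import pyspark.sql.functions as F
from pyspark.sql.functions import udf

## UDF version: the Python function runs row by row, outside the Catalyst Optimizer
upper_udf = udf(lambda s: s.upper() if s is not None else None, "string")
df.withColumn("name_upper", upper_udf(F.col("name")))

## built-in version: F.upper is optimized by Catalyst, no Python serialization needed
df.withColumn("name_upper", F.upper(F.col("name")))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;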

&lt;h3&gt;
  
  
  1. Aggregation functions
&lt;/h3&gt;

&lt;p&gt;All aggregation methods require a &lt;code&gt;groupBy&lt;/code&gt; call, which returns a GroupedData object&lt;/p&gt;

&lt;p&gt;Use the grouped data method &lt;code&gt;agg&lt;/code&gt; to apply these built-in aggregate functions&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--dkO_D1da--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--dkO_D1da--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/8.png" alt="image" width="762" height="481"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;For example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"col1"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="n"&gt;display&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"col1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"col2"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"val1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"val2"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;display&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'col1'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;agg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt; &lt;span class="nb"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'val1'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'total1'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
                &lt;span class="n"&gt;avg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'val2'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'average2'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;display&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'col1'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;agg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt; &lt;span class="n"&gt;sumDistinct&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'val1'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'total1'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
                &lt;span class="n"&gt;approx_count_distinct&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'val2'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'count2'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;display&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  2. Datetime functions
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--9y25tr9M--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--9y25tr9M--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/9.png" alt="image" width="800" height="385"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Reformat the timestamp column to string representation
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"date_string"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;date_format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"MMMM dd, yyyy"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"time_string"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;date_format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"HH:mm:ss.SSSSSS"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Extract date time parts from timestamp
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'year'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;year&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'timestamp'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'month'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;month&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'timestamp'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'dayofweek'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dayofweek&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'timestamp'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'minute'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;minute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'timestamp'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'second'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;second&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'timestamp'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Convert timestamp to date
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"date"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;to_date&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Manipulate datetimes
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumns&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"add_2_day"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;date_add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  3. Complex Data Types functions
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--imS920ym--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/10.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--imS920ym--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/10.png" alt="image" width="800" height="350"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--RTmpMKC2--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/11.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--RTmpMKC2--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://raw.githubusercontent.com/anhhchu/anhcodes/master/static/images/inpost/sparksql/11.png" alt="image" width="800" height="273"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Assume we have a DataFrame with an &lt;code&gt;items&lt;/code&gt; column that is a nested array. For example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## For example
&lt;/span&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt;
&lt;span class="c1"&gt;## explode the items field to create a new row for each element in the array
&lt;/span&gt;    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"items"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;explode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"items"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="c1"&gt;## split column item_name by " " to array
&lt;/span&gt;    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"details"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;split&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"item_name"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;" "&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
&lt;span class="c1"&gt;## extract the element from details
&lt;/span&gt;    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"size"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;element_at&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"details"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
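
&lt;p&gt;For context, a DataFrame with a nested array column could be constructed like this minimal sketch (the schema and values are made up for illustration):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;## hypothetical sample data with a nested "items" array column
sample = [
    ("order1", [("item A large", 10.0), ("item B small", 5.0)]),
    ("order2", [("item C medium", 7.5)]),
]

df = spark.createDataFrame(
    sample,
    "order_id string, items array&amp;lt;struct&amp;lt;item_name:string, price:double&amp;gt;&amp;gt;"
)
df.printSchema()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;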



&lt;h3&gt;
  
  
  4. Join functions
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## inner join 
&lt;/span&gt;&lt;span class="n"&gt;df1&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'name'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;## inner join with 2 columns
&lt;/span&gt;&lt;span class="n"&gt;df1&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'name'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'age'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

&lt;span class="c1"&gt;## specify join
&lt;/span&gt;&lt;span class="n"&gt;df1&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'name'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'left'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;df1&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'name'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'right'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;df1&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'name'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'outer'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;## specify explicit column expressiion
&lt;/span&gt;&lt;span class="n"&gt;df1&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;df1&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'customer_name'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'user_name'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="s"&gt;'left_outer'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  User Defined Functions (UDF) in Spark
&lt;/h2&gt;

&lt;p&gt;If built-in functions are not enough to cover your needs, you can write your own custom functions, at an efficiency cost. &lt;/p&gt;

&lt;p&gt;User-defined functions can't be optimized by the Catalyst Optimizer, and they must be serialized and sent to the executors. Moreover, row data is deserialized from Spark's binary format to be passed to the UDF, and the results are then serialized back into Spark's native format. For Python, UDFs also add overhead from the Python interpreter running on each worker node. &lt;/p&gt;

&lt;p&gt;Using UDFs can cause &lt;a href="https://anhcodes.dev/blog/mitigate-skew-spark/##serialization-api-coding-style"&gt;serialization&lt;/a&gt; issues and long-running Spark jobs. &lt;/p&gt;

&lt;p&gt;A way to mitigate this is to use &lt;a href="https://www.databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html"&gt;Pandas UDFs, aka Vectorized UDFs&lt;/a&gt;, which use Apache Arrow, in Spark 3.x. &lt;/p&gt;

&lt;p&gt;To create a UDF, you can follow the steps below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## Step 1: Create a function
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;calProfit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sales&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cost&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;sales&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;cost&lt;/span&gt;

&lt;span class="c1"&gt;## Step 2: Register function -&amp;gt; serialize the function and send to executors
&lt;/span&gt;&lt;span class="n"&gt;calProfitUDF&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;udf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;calProfit&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;## Step 3: Apply the udf to the dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"profit"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;calProfitUDF&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sales"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"cost"&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;

&lt;span class="c1"&gt;## Register UDF to use in SQL
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;createOrReplaceTempView&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'sales'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;calProfitUDF&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;park&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;udf&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;register&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sql_udf"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;calProfit&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- Use the UDF in sql&lt;/span&gt;
&lt;span class="o"&gt;%&lt;/span&gt;&lt;span class="n"&gt;sq&lt;/span&gt;
&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="n"&gt;sql_udf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sales&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cost&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;profit&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sales&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Alternatively, you can use decorator syntax (only applicable in Python)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;## Use Decorator Syntax for Python
## Our input/output is float
&lt;/span&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;udf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"float"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;calProfitUDF&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sales&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cost&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;sales&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;cost&lt;/span&gt;

&lt;span class="c1"&gt;## use the UDF
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"profit"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;calProfitUDF&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sales"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"cost"&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It's recommended to use Pandas/Vectorized UDFs; notice the difference in syntax:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pandas_udf&lt;/span&gt;

&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;pandas_udf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"float"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;vectorizedUDF&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sales&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Series&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cost&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Series&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Series&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;sales&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;cost&lt;/span&gt;

&lt;span class="c1"&gt;## use the UDF
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"profit"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;vectorizedUDF&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sales"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"cost"&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;

&lt;span class="c1"&gt;## register the UDF for sql 
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;udf&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;register&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sql_vectorized_udf"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;vectorizedUDF&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="n"&gt;sql_vectorized_udf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sales&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cost&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;profit&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sales&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>spark</category>
    </item>
  </channel>
</rss>
