<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: PaulOpu</title>
    <description>The latest articles on DEV Community by PaulOpu (@paulopu).</description>
    <link>https://dev.to/paulopu</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F852233%2Fe14e79d3-8e38-4930-9a8e-9b596f8758df.jpeg</url>
      <title>DEV Community: PaulOpu</title>
      <link>https://dev.to/paulopu</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/paulopu"/>
    <language>en</language>
    <item>
      <title>Nesting Columns like a Pro: A Guide to Mastering Nested Structs in PySpark</title>
      <dc:creator>PaulOpu</dc:creator>
      <pubDate>Sat, 14 Jan 2023 15:10:28 +0000</pubDate>
      <link>https://dev.to/paulopu/nesting-columns-like-a-pro-a-guide-to-mastering-nested-structs-in-pyspark-2p7i</link>
      <guid>https://dev.to/paulopu/nesting-columns-like-a-pro-a-guide-to-mastering-nested-structs-in-pyspark-2p7i</guid>
      <description>&lt;h2&gt;
  
  
  Task
&lt;/h2&gt;

&lt;p&gt;Are you tired of struggling with adding nested columns to already nested structs in PySpark DataFrames? Well, you're in luck because in this blog post, we'll be diving into the solution to this common problem. With the help of PySpark's &lt;strong&gt;&lt;code&gt;withColumn&lt;/code&gt;&lt;/strong&gt; and &lt;strong&gt;&lt;code&gt;struct&lt;/code&gt;&lt;/strong&gt; functions, we'll be showing you how to easily add nested columns to your DataFrames, but only if the column doesn't already exist. So, whether you're a beginner or an experienced PySpark developer, this post is for you. Keep reading to learn how to master nested columns in PySpark DataFrames!&lt;/p&gt;

&lt;h2&gt;
  
  
  Problem
&lt;/h2&gt;

&lt;p&gt;In PySpark, adding columns to a DataFrame is not as simple as selecting them with a SELECT statement. When you want to add a new column that is nested within other structs, you have to rebuild every struct along the path to that column. This can be tricky, because you also have to include all of the other fields that were already in those structs; otherwise they will be lost. It's like a puzzle: you have to keep all the pieces in the right place.&lt;/p&gt;

&lt;h2&gt;
  
  
  Solution
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;typing&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;functions&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Column&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;DataFrame&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;add_column_to_struct&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataTypes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;current_struct_name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;any&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Column&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;current_column_level&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;next_column_level&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

    &lt;span class="n"&gt;should_place_column&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;should_place_column&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;new_column&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;next_column_level&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;dataTypes&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;names&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;new_column&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;functions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;next_column_level&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;functions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;struct&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;functions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;current_column_level&lt;/span&gt;&lt;span class="p"&gt;)[&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;dataTypes&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;names&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;new_column&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;current_struct_name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;current_struct_fields&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;column&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;enumerate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataTypes&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;names&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;

        &lt;span class="n"&gt;is_column_in_next_level&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;column&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;next_column_level&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;is_column_in_next_level&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;grouped_nested_path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;current_column_level&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;.&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;next_column_level&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;:]]&lt;/span&gt;

            &lt;span class="n"&gt;current_struct&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;add_column_to_struct&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;dataTypes&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;fields&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="n"&gt;dataType&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;grouped_nested_path&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;next_column_level&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;value&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;current_struct&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;functions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;current_column_level&lt;/span&gt;&lt;span class="p"&gt;)[&lt;/span&gt;&lt;span class="n"&gt;column&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;column&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="n"&gt;current_struct_fields&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;current_struct&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;functions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;struct&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;current_struct_fields&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;current_struct_name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create_column_if_not_exist&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;any&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

    &lt;span class="n"&gt;current_column_level&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;not_nested&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;not_nested&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;current_column_level&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;columns&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;current_column_level&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;functions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;current_column_level&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nf"&gt;add_column_to_struct&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;current_column_level&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="n"&gt;dataType&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;current_column_level&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;value&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The code above defines two functions, &lt;strong&gt;&lt;code&gt;create_column_if_not_exist&lt;/code&gt;&lt;/strong&gt; and &lt;strong&gt;&lt;code&gt;add_column_to_struct&lt;/code&gt;&lt;/strong&gt;, that add a new column to a nested struct column in a PySpark DataFrame only if the column doesn't already exist.&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;&lt;code&gt;create_column_if_not_exist&lt;/code&gt;&lt;/strong&gt; function takes in three parameters:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;&lt;code&gt;df&lt;/code&gt;&lt;/strong&gt;: a PySpark DataFrame where the column will be added&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;&lt;code&gt;path&lt;/code&gt;&lt;/strong&gt;: a list of strings naming each struct on the way down, ending with the name of the new column. For example, to add a column called &lt;strong&gt;&lt;code&gt;zip&lt;/code&gt;&lt;/strong&gt; to a struct called &lt;strong&gt;&lt;code&gt;address&lt;/code&gt;&lt;/strong&gt; that is nested within another struct column called &lt;strong&gt;&lt;code&gt;user&lt;/code&gt;&lt;/strong&gt;, the path would be &lt;strong&gt;&lt;code&gt;["user", "address", "zip"]&lt;/code&gt;&lt;/strong&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;&lt;code&gt;value&lt;/code&gt;&lt;/strong&gt;: the value that will be assigned to the new column&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The function first checks whether the &lt;strong&gt;&lt;code&gt;path&lt;/code&gt;&lt;/strong&gt; has only one element, meaning that the column is not nested within any other struct. In this case, it checks whether the column already exists in the DataFrame. If it does, the function returns the DataFrame unmodified; otherwise, it adds the new column using the &lt;strong&gt;&lt;code&gt;withColumn&lt;/code&gt;&lt;/strong&gt; method and assigns it the &lt;strong&gt;&lt;code&gt;value&lt;/code&gt;&lt;/strong&gt; passed in.&lt;/p&gt;

&lt;p&gt;If the path has more than one element, the column is nested within other structs. The function then calls &lt;strong&gt;&lt;code&gt;add_column_to_struct&lt;/code&gt;&lt;/strong&gt;, passing the dataType of the top-level struct along with the other parameters.&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;&lt;code&gt;add_column_to_struct&lt;/code&gt;&lt;/strong&gt; function takes four parameters:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;&lt;code&gt;dataTypes&lt;/code&gt;&lt;/strong&gt;: the dataType of the struct column where the new column will be added&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;&lt;code&gt;path&lt;/code&gt;&lt;/strong&gt;: the remaining path, starting at the current struct and ending with the name of the new column&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;&lt;code&gt;current_struct_name&lt;/code&gt;&lt;/strong&gt;: the name of the struct column where the new column will be added&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;&lt;code&gt;value&lt;/code&gt;&lt;/strong&gt;: the value that will be assigned to the new column&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;It starts by extracting the first and second elements of the path. These determine whether the new column should be added at this level of nesting or at a deeper level.&lt;/p&gt;

&lt;p&gt;If the path has only two elements, the function checks whether the new column already exists in the struct. If it does not, the function creates the new column and assigns it the value passed in. It then creates a new struct column containing the existing columns and, if one was created, the new column.&lt;/p&gt;

&lt;p&gt;If the path has more than two elements, the function iterates through all the columns of the struct and checks whether each one is the next struct on the path. If it is, the function calls &lt;strong&gt;&lt;code&gt;add_column_to_struct&lt;/code&gt;&lt;/strong&gt; again, passing the dataType of that nested struct and updating the path and struct name accordingly. Otherwise, it simply reuses the existing column.&lt;/p&gt;

&lt;p&gt;Once all the columns have been processed, the function creates a new struct column containing all the columns and returns it.&lt;/p&gt;
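
&lt;p&gt;The rebuild-every-level recursion can also be sketched without Spark. The following is only an illustration on plain Python dictionaries, not the PySpark code itself: the helper name &lt;code&gt;add_field_if_missing&lt;/code&gt; and the sample record are assumptions made for this example. As in the functions above, the last element of the path is the name of the new field, and nothing is added when a struct on the path does not exist.&lt;/p&gt;

```python
from typing import Any, Dict, List


def add_field_if_missing(struct: Dict[str, Any], path: List[str], value: Any) -> Dict[str, Any]:
    """Return a rebuilt copy of `struct` with `value` placed at `path` if absent."""
    head, rest = path[0], path[1:]
    if not rest:
        # Last path element: this is the field to place, unless it already exists.
        if head in struct:
            return dict(struct)
        return {**struct, head: value}

    rebuilt = {}
    for name, field in struct.items():
        if name == head:
            # Descend into the struct on the path and rebuild it.
            rebuilt[name] = add_field_if_missing(field, rest, value)
        else:
            # Every other field is copied over so nothing is lost.
            rebuilt[name] = field
    return rebuilt


# Made-up sample data for the illustration.
record = {"user": {"name": "Ada", "address": {"city": "Berlin"}}}
updated = add_field_if_missing(record, ["user", "address", "zip"], "10115")
print(updated)
```

&lt;p&gt;The original record is left untouched, and every sibling field is carried over into the rebuilt structure, which is the same guarantee the struct-based version aims for.&lt;/p&gt;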

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In conclusion, working with nested structs in PySpark DataFrames can be like a game of Tetris: it takes a bit of skill to manoeuvre the columns into the right place, but with the right tools, it's totally doable. And let's be real, there's nothing more satisfying than fitting that last piece into the perfect spot. The provided code is like the cheat code to this game. It hopefully gives you a clear and clean way to add new columns to nested structs without losing any existing data. So, whether you're a PySpark pro or a beginner, this solution will have you adding nested columns like a boss in no time.&lt;/p&gt;

</description>
      <category>watercooler</category>
    </item>
    <item>
      <title>ElasticSearch: Switch Index like a Pro</title>
      <dc:creator>PaulOpu</dc:creator>
      <pubDate>Sun, 23 Oct 2022 14:30:08 +0000</pubDate>
      <link>https://dev.to/paulopu/elasticsearch-switch-index-like-a-pro-3eo8</link>
      <guid>https://dev.to/paulopu/elasticsearch-switch-index-like-a-pro-3eo8</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Elasticsearch is one of the most widely used databases for searching large sets of documents. It has specific mechanisms in place to speed up the search significantly. Nevertheless, the configuration must be chosen wisely; otherwise, your search takes much longer than expected.&lt;/p&gt;

&lt;h2&gt;
  
  
  Background (Elasticsearch Concepts)
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s---1dhmxh6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/5t6k7f4n6smtvhe613p1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s---1dhmxh6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/5t6k7f4n6smtvhe613p1.png" alt="Elastic Search Cluster with Index and Shards" width="873" height="371"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Node
&lt;/h3&gt;

&lt;p&gt;The data is distributed across multiple computational instances, called nodes, to parallelize the search and make it quicker. As a result, each instance holds only a fraction of the entire database. &lt;/p&gt;

&lt;h3&gt;
  
  
  Primary Shards
&lt;/h3&gt;

&lt;p&gt;The data is organized in indices, which can be compared to tables in SQL databases. For example, a sports website might have indices for players, teams, and tournaments. The data of each index is split into primary shards so that each shard can live on its own node. &lt;br&gt;
It follows that the number of primary shards should not be higher than the number of nodes; otherwise two separate shards end up on one node, which is less efficient. &lt;/p&gt;
&lt;h3&gt;
  
  
  Replica Shards
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--TKgnBK5c--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qrijf4lbttj4b2y2z9ae.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--TKgnBK5c--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qrijf4lbttj4b2y2z9ae.png" alt="Replica shards add redundancy to the index" width="469" height="242"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Suppose we have a player index with 5 shards in a cluster with 5 nodes, where shard 1 is on node 1, and so on. Assume that node 1 is busy searching in another index. If you now want to search for players, shard 1, which sits on the busy node 1, might respond with a delay. &lt;br&gt;
To overcome this, we introduce replica shards: each shard can have 0 to n replicas, which contain exactly the same data as the corresponding shard. A replica should live on a different node than its shard. In our example, if replica 1 of shard 1 is on node 2, we can query node 2 and skip the busy node 1.&lt;/p&gt;
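
&lt;p&gt;To get a feeling for how many shard copies an index adds to the cluster, a quick back-of-the-envelope calculation helps. The sketch below assumes that every primary shard of the index has the same number of replicas; the function name is made up for this example.&lt;/p&gt;

```python
def total_shard_copies(primary_shards: int, replicas_per_shard: int) -> int:
    """Primaries plus all of their replicas for a single index."""
    return primary_shards * (1 + replicas_per_shard)


# The player index from the example: 5 primary shards with 1 replica each.
print(total_shard_copies(5, 1))  # 10
# Without replicas only the primaries remain.
print(total_shard_copies(5, 0))  # 5
```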
&lt;h2&gt;
  
  
  Problem
&lt;/h2&gt;

&lt;p&gt;We observed that the CPU utilization of our nodes increased drastically during peak times and Elasticsearch was not able to handle all search requests. The time to resolve a search request consists of distributing the request to each node, the actual search in the shards, and collecting the results to aggregate them. &lt;br&gt;
The actual search could not be the problem, as each primary shard contained just a few hundred MB while the desired shard size is at least a few GB. Therefore, we assumed that the splitting and aggregation were the bigger overhead for the system. To test this hypothesis, we needed to reduce the number of nodes and shards. &lt;/p&gt;

&lt;p&gt;Unfortunately, it is not possible to update the number of shards of an existing index.&lt;/p&gt;
&lt;h2&gt;
  
  
  Solution
&lt;/h2&gt;
&lt;h2&gt;
  
  
  Current Setup
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;6 Nodes&lt;/li&gt;
&lt;li&gt;6 indices with &amp;lt; 1 GB of data

&lt;ul&gt;
&lt;li&gt;5 primary shards&lt;/li&gt;
&lt;li&gt;1 replica per shard&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Requirements
&lt;/h2&gt;

&lt;p&gt;Changing the number of shards is not just flipping a switch. Therefore, we need to make sure that the production system stays responsive during the change.&lt;/p&gt;

&lt;p&gt;Besides zero downtime, there is a second requirement: the amount of data will grow in the future, so the cluster should be able to scale up again.&lt;/p&gt;
&lt;h2&gt;
  
  
  Implementation
&lt;/h2&gt;

&lt;p&gt;There are several ways to deal with this situation. In the end, we decided to use the concept of aliases in Elasticsearch.&lt;/p&gt;
&lt;h3&gt;
  
  
  Alias
&lt;/h3&gt;

&lt;p&gt;An alias is a pointer to an existing index. For example, the alias &lt;strong&gt;member&lt;/strong&gt; points toward the &lt;strong&gt;player&lt;/strong&gt; index. Now any request to the &lt;strong&gt;member&lt;/strong&gt; alias is redirected to the &lt;strong&gt;player&lt;/strong&gt; index. Creating and deleting an alias is very easy compared to an index. You cannot create an alias with the same name as an existing index. Nevertheless, creating an alias and deleting an index can happen at the same time. We rely on this behavior.&lt;/p&gt;
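
&lt;p&gt;The rules described above can be played through with a toy in-memory model. This is not an Elasticsearch client and says nothing about how Elasticsearch works internally; the class &lt;code&gt;ToyCluster&lt;/code&gt; and the index name &lt;code&gt;player_v2&lt;/code&gt; are assumptions made purely for illustration. The model applies a list of actions as a whole and only checks the final state, which is why adding an alias and removing the index of the same name succeeds together but not on its own.&lt;/p&gt;

```python
from typing import Dict, List, Set


class ToyCluster:
    """A toy in-memory model of index and alias names, not a real client."""

    def __init__(self, indices: Set[str]) -> None:
        self.indices = set(indices)
        self.aliases: Dict[str, str] = {}  # alias name -> index name

    def update_aliases(self, actions: List[dict]) -> None:
        # Work on copies so that either all actions apply or none do.
        indices, aliases = set(self.indices), dict(self.aliases)
        for action in actions:
            if "remove_index" in action:
                indices.discard(action["remove_index"]["index"])
            elif "add" in action:
                aliases[action["add"]["alias"]] = action["add"]["index"]
        # Validate the final state: names must be unique and targets must exist.
        for alias, target in aliases.items():
            if alias in indices:
                raise ValueError(f"alias {alias!r} clashes with an index name")
            if target not in indices:
                raise ValueError(f"alias {alias!r} points to a missing index")
        self.indices, self.aliases = indices, aliases

    def resolve(self, name: str) -> str:
        # Requests to an alias are redirected to the index behind it.
        return self.aliases.get(name, name)


cluster = ToyCluster({"player", "player_v2"})
cluster.update_aliases([
    {"add": {"index": "player_v2", "alias": "player"}},
    {"remove_index": {"index": "player"}},
])
print(cluster.resolve("player"))  # player_v2
```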
&lt;h3&gt;
  
  
  Process
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--MWZtZJB4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/c96fvpz0jcl8f19kkmf7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--MWZtZJB4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/c96fvpz0jcl8f19kkmf7.png" alt="How to change an index in 3 steps" width="383" height="421"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Create a new index with the name &lt;code&gt;new_index&lt;/code&gt; and the desired number of primary shards and replicas.&lt;br&gt;
&lt;/p&gt;

&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;PUT&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;/new_index&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"settings"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"index"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"number_of_shards"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;  
      &lt;/span&gt;&lt;span class="nl"&gt;"number_of_replicas"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="w"&gt; 
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;




&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;Copy the data from &lt;code&gt;old_index&lt;/code&gt; to &lt;code&gt;new_index&lt;/code&gt;. Use the reindex API call from Elasticsearch.&lt;br&gt;
&lt;/p&gt;

&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;POST&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;_reindex&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"index"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"old_index"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"dest"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"index"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"new_index"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;




&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;Redirection and Cleaning up&lt;/p&gt;

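&lt;p&gt;a. Delete &lt;code&gt;old_index&lt;/code&gt; and create an alias with the name &lt;code&gt;old_index&lt;/code&gt; that points to &lt;code&gt;new_index&lt;/code&gt;.&lt;br&gt;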
&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "new_index",
        "alias": "old_index"
      }
    },
    {
      "remove_index": {
        "index": "old_index"
      }
    }
  ]
}
&lt;/code&gt;&lt;/pre&gt;



&lt;p&gt;b. If you already have an alias, redirect it to &lt;code&gt;new_index&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "new_index",
        "alias": "old_index"
      }
    },
    {
      "remove": {
        "index": "other_index",
        "alias": "old_index"
      }
    },
    {
      "remove_index": {
        "index": "other_index"
      }
    }
  ]
}
&lt;/code&gt;&lt;/pre&gt;




&lt;/li&gt;
&lt;/ol&gt;
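
&lt;p&gt;If you want to script the three steps, the request bodies can be generated from a few small helpers. The sketch below only builds the JSON bodies and leaves sending them to whatever HTTP client you use. All function names are hypothetical. For the case where an alias already exists, it assumes that the index the alias previously pointed to is the one that should be removed.&lt;/p&gt;

```python
import json
from typing import List, Optional


def create_index_body(shards: int, replicas: int) -> dict:
    """Body for step 1 (PUT /new_index)."""
    return {"settings": {"index": {"number_of_shards": shards, "number_of_replicas": replicas}}}


def reindex_body(source: str, dest: str) -> dict:
    """Body for step 2 (POST _reindex)."""
    return {"source": {"index": source}, "dest": {"index": dest}}


def switch_body(new_index: str, name: str, previous_index: Optional[str] = None) -> dict:
    """Body for step 3 (POST /_aliases).

    Without `previous_index`, `name` is still a real index and is removed (3a).
    With `previous_index`, `name` is already an alias that is moved over (3b).
    """
    actions: List[dict] = [{"add": {"index": new_index, "alias": name}}]
    if previous_index is None:
        actions.append({"remove_index": {"index": name}})
    else:
        actions.append({"remove": {"index": previous_index, "alias": name}})
        # Assumption: the index behind the old alias is no longer needed.
        actions.append({"remove_index": {"index": previous_index}})
    return {"actions": actions}


print(json.dumps(create_index_body(3, 2), indent=2))
print(json.dumps(reindex_body("old_index", "new_index"), indent=2))
print(json.dumps(switch_body("new_index", "old_index"), indent=2))
```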

&lt;h3&gt;
  
  
  Flags
&lt;/h3&gt;

&lt;p&gt;To make the approach testable you can add flags to your script making each part of the process optional. Thereby, you can create a new temporary index and switch from that one. That doesn’t affect your production environment, but you can test each step of the process. Here is an example:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Add a new temporary index and reindex from an existing one, but don’t switch (steps 1 and 2 of the process)&lt;/li&gt;
&lt;li&gt;Execute all steps on the temporary index. You can optionally run a load test on that index to check that there is no downtime while switching the indices.&lt;/li&gt;
&lt;li&gt;See if everything worked as expected and if your service experienced any downtime&lt;/li&gt;
&lt;li&gt;Delete the new index, as it was just for testing (just step 3)&lt;/li&gt;
&lt;/ol&gt;
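
&lt;p&gt;One possible way to wire up such flags is sketched below with Python's &lt;code&gt;argparse&lt;/code&gt;. The flag names &lt;code&gt;--create&lt;/code&gt;, &lt;code&gt;--reindex&lt;/code&gt; and &lt;code&gt;--switch&lt;/code&gt; and the function &lt;code&gt;parse_steps&lt;/code&gt; are assumptions for this example, not part of our actual script. The function only decides which steps to run; executing them is left out.&lt;/p&gt;

```python
import argparse
from typing import List, Optional


def parse_steps(argv: Optional[List[str]] = None) -> List[str]:
    """Turn command-line flags into the ordered list of steps to run."""
    parser = argparse.ArgumentParser(description="Switch an index step by step")
    parser.add_argument("--create", action="store_true", help="step 1: create the new index")
    parser.add_argument("--reindex", action="store_true", help="step 2: copy the data")
    parser.add_argument("--switch", action="store_true", help="step 3: move the alias and clean up")
    args = parser.parse_args(argv)
    # Steps always run in process order, whatever order the flags were given in.
    order = ["create", "reindex", "switch"]
    return [step for step in order if getattr(args, step)]


# Dry run: only prepare the temporary index, do not switch yet.
print(parse_steps(["--create", "--reindex"]))  # ['create', 'reindex']
```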

&lt;h3&gt;
  
  
  Switch the index again
&lt;/h3&gt;

&lt;p&gt;You might ask yourself: if I want to change the number of shards again how can I do that? No problem, just use step 3.b, which takes into account that you already created an alias.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;I showed you how to change the settings of an index without interrupting your service. You create a new index, copy all data, and create an alias that points to your new index while deleting the old one. &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;You have no downtime, as switching the index and deleting the old index happen at the same moment.&lt;/li&gt;
&lt;li&gt;Testing is no problem, as you can enable and disable each step and create temporary indices.&lt;/li&gt;
&lt;li&gt;The process can be applied multiple times, as it also works with existing aliases.&lt;/li&gt;
&lt;li&gt;Use the &lt;a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html"&gt;cat endpoint&lt;/a&gt; to monitor the process in real-time&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Don’t be afraid to scale down
&lt;/h3&gt;

&lt;p&gt;We reduced the setup to 1 primary shard with 2 replicas on 3 nodes (previously 5 primary shards with 1 replica on 5 nodes). &lt;strong&gt;The CPU utilization was reduced by 75% and the costs by 20%.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;I hope this helps you get the most out of your Elasticsearch cluster. If you have any questions, let me know and I’m happy to answer them. I would appreciate any feedback that you have, as I like to learn every day. Thanks a lot!&lt;/p&gt;

&lt;h2&gt;
  
  
  Reference
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html"&gt;https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html"&gt;https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://www.elastic.co/guide/en/elasticsearch/reference/8.4/docs-reindex.html"&gt;https://www.elastic.co/guide/en/elasticsearch/reference/8.4/docs-reindex.html&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://medium.com/zumba-tech/elasticsearch-series-rebuilding-indices-with-no-downtime-3498bebbebc"&gt;https://medium.com/zumba-tech/elasticsearch-series-rebuilding-indices-with-no-downtime-3498bebbebc&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://ugosan.org/Elasticsearch-replacing-index-by-alias/"&gt;https://ugosan.org/Elasticsearch-replacing-index-by-alias/&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>elasticsearch</category>
      <category>database</category>
    </item>
  </channel>
</rss>
