<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Damon P. Cortesi</title>
    <description>The latest articles on DEV Community by Damon P. Cortesi (@dacort).</description>
    <link>https://dev.to/dacort</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F615165%2F37725df3-d1c1-4f7b-bad2-8b17f9ad7526.jpeg</url>
      <title>DEV Community: Damon P. Cortesi</title>
      <link>https://dev.to/dacort</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/dacort"/>
    <language>en</language>
    <item>
      <title>An Introduction to Modern Data Lake Storage Layers</title>
      <dc:creator>Damon P. Cortesi</dc:creator>
      <pubDate>Wed, 02 Feb 2022 22:22:22 +0000</pubDate>
      <link>https://dev.to/dacort/an-introduction-to-modern-data-lake-storage-layers-3iid</link>
      <guid>https://dev.to/dacort/an-introduction-to-modern-data-lake-storage-layers-3iid</guid>
      <description>&lt;p&gt;In recent years we’ve seen a rise in new storage layers for data lakes. In 2017, &lt;a href="https://eng.uber.com/hoodie/" rel="noopener noreferrer"&gt;Uber announced Hudi&lt;/a&gt; - an incremental processing framework for data pipelines. In 2018, &lt;a href="https://conferences.oreilly.com/strata/strata-ny-2018/public/schedule/detail/69503.html" rel="noopener noreferrer"&gt;Netflix introduced Iceberg&lt;/a&gt; - a new table format for managing extremely large cloud datasets. And in 2019, &lt;a href="https://techcrunch.com/2019/04/24/databricks-open-sources-delta-lake-to-make-data-lakes-more-reliable/" rel="noopener noreferrer"&gt;Databricks open-sourced Delta Lake&lt;/a&gt; - originally intended to bring ACID transactions to data lakes.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;📹 &lt;em&gt;If you’d like to watch a video that discusses the content of this post, I’ve also recorded &lt;a href="https://www.youtube.com/watch?v=fryfx0Zg7KA" rel="noopener noreferrer"&gt;an overview here&lt;/a&gt;. Each relevant section below will also link to individual timestamps.&lt;/em&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;This post aims to introduce each of these engines and give some insight into how they function under the hood and some of the differences in each. While I’ll summarize the findings here, you can also view my Jupyter notebooks for each in my &lt;a href="https://github.com/dacort/modern-data-lake-storage-layers/tree/main/notebooks" rel="noopener noreferrer"&gt;modern-data-lake-storage-layers&lt;/a&gt; repository. We begin with basic operations of writing and updating datasets.&lt;/p&gt;

&lt;p&gt;One thing to note about all of these frameworks is that each began with a different challenge to solve, but over time they have converged on a common set of functionality. I should &lt;strong&gt;also&lt;/strong&gt; note that I am still learning about these frameworks myself - the comments here are neither authoritative nor comprehensive. 🤗&lt;/p&gt;

&lt;h2&gt;
  
  
  Apache Hudi
&lt;/h2&gt;

&lt;blockquote&gt;
&lt;p&gt;📹 &lt;a href="https://www.youtube.com/watch?v=fryfx0Zg7KA&amp;amp;t=323s" rel="noopener noreferrer"&gt;Intro to Apache Hudi video&lt;/a&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Apache Hudi (Hadoop Upsert Delete and Incremental) was originally designed as an incremental stream processing framework and was built to combine the benefits of stream and batch processing. Hudi can be used with Spark, Flink, Presto, Trino, and Hive, but much of the original work was focused on Spark, and that’s what I use for these examples. One of the other huge benefits of Hudi is the concept of a self-managed data layer. For example, Hudi can automatically perform &lt;a href="https://hudi.apache.org/docs/compaction" rel="noopener noreferrer"&gt;asynchronous compaction&lt;/a&gt; to optimize data lakes and also supports &lt;a href="https://hudi.apache.org/docs/concurrency_control" rel="noopener noreferrer"&gt;multi-writer guarantees&lt;/a&gt;. Hudi also offers flexibility in storage formats depending on read/write requirements and data size.&lt;/p&gt;

&lt;p&gt;For Hudi, we create a simple Spark DataFrame partitioned by &lt;code&gt;creation_date&lt;/code&gt; and write that to S3.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"],
)

# Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
    "hoodie.table.name": "my_hudi_table",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": "creation_date",
    "hoodie.datasource.write.precombine.field": "last_update_time",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": "my_hudi_table",
    "hoodie.datasource.hive_sync.partition_fields": "creation_date",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.index.type": "GLOBAL_BLOOM", # This is required if we want to ensure we upsert a record, even if the partition changes
    "hoodie.bloom.index.update.partition.path": "true", # This is required to write the data into the new partition (defaults to false in 0.8.0, true in 0.9.0)
}

# Write a DataFrame as a Hudi dataset
inputDF.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "insert"
).options(**hudiOptions).mode("overwrite").save(f"s3://{S3_BUCKET_NAME}/tmp/hudi/")

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When we look at the file structure on S3, we see a few things:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;A &lt;code&gt;hoodie.properties&lt;/code&gt; file
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-14 00:33:46 503 tmp/hudi/.hoodie/hoodie.properties

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This file contains certain metadata about the Hudi dataset:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;#Properties saved on Fri Jan 14 00:33:45 UTC 2022
#Fri Jan 14 00:33:45 UTC 2022
hoodie.table.precombine.field=last_update_time
hoodie.table.partition.fields=creation_date
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.timeline.layout.version=1
hoodie.table.version=2
hoodie.table.recordkey.fields=id
hoodie.table.base.file.format=PARQUET
hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.table.name=my_hudi_table

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;A set of commit-related files
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-14 00:33:57 2706 tmp/hudi/.hoodie/20220114003341.commit
2022-01-14 00:33:48 0 tmp/hudi/.hoodie/20220114003341.commit.requested
2022-01-14 00:33:52 1842 tmp/hudi/.hoodie/20220114003341.inflight

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;The actual &lt;code&gt;.parquet&lt;/code&gt; data files and associated metadata organized into date-based partitions.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-14 00:33:54 93 tmp/hudi/2015-01-01/.hoodie_partition_metadata
2022-01-14 00:33:54 434974 tmp/hudi/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet
2022-01-14 00:33:55 93 tmp/hudi/2015-01-02/.hoodie_partition_metadata
2022-01-14 00:33:55 434943 tmp/hudi/2015-01-02/43051d12-87e7-4dfb-8201-6ce293cf0df7-0_1-6-99_20220114003341.parquet

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We then update the &lt;code&gt;creation_date&lt;/code&gt; of one row in this dataset.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pyspark.sql.functions import lit

# Create a new DataFrame from the first row of inputDF with a different creation_date value
updateDF = inputDF.where("id = 100").withColumn("creation_date", lit("2022-01-11"))

updateDF.show()

# Update by using the "upsert" operation
updateDF.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "upsert"
).options(**hudiOptions).mode("append").save(f"s3://{S3_BUCKET_NAME}/tmp/hudi/")

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;One thing to note here is that since we’re updating a partition value ( &lt;strong&gt;DANGER!&lt;/strong&gt; ), we had to set the &lt;code&gt;hoodie.index.type&lt;/code&gt; to &lt;code&gt;GLOBAL_BLOOM&lt;/code&gt; as well as set &lt;code&gt;hoodie.bloom.index.update.partition.path&lt;/code&gt; to &lt;code&gt;true&lt;/code&gt;. This can have a large impact on performance, so normally we would try not to change a partition value in a production environment, but it’s useful here to see the impact it has. You can find more details in the Hudi FAQ about &lt;a href="https://hudi.apache.org/learn/faq/#how-does-the-hudi-indexing-work--what-are-its-benefits" rel="noopener noreferrer"&gt;Hudi indexing&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;After this write, we have a new set of commit-related files on S3:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-14 00:34:15 2706 tmp/hudi/.hoodie/20220114003401.commit
2022-01-14 00:34:03 0 tmp/hudi/.hoodie/20220114003401.commit.requested
2022-01-14 00:34:08 2560 tmp/hudi/.hoodie/20220114003401.inflight

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And we actually have &lt;strong&gt;2&lt;/strong&gt; new &lt;code&gt;.parquet&lt;/code&gt; files:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-14 00:34:12 434925 tmp/hudi/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-37-13680_20220114003401.parquet
...
2022-01-14 00:34:13 93 tmp/hudi/2022-01-11/.hoodie_partition_metadata
2022-01-14 00:34:14 434979 tmp/hudi/2022-01-11/0c210872-484e-428b-a9ca-90a26e42125c-0_1-43-13681_20220114003401.parquet

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;So what happened with the update is that the old partition (&lt;code&gt;2015-01-01&lt;/code&gt;) had its data overwritten and the new partition (&lt;code&gt;2022-01-11&lt;/code&gt;) &lt;em&gt;also&lt;/em&gt; had data written to it. You can now see why the global bloom index could have such a large impact on write performance, as there is significant potential for write amplification.&lt;/p&gt;

&lt;p&gt;If we query the data and add the source filename for each row, we can also see that data for the old partition now comes from the new parquet file (notice the commit ID &lt;code&gt;20220114003401&lt;/code&gt; shows up in the filename):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pyspark.sql.functions import input_file_name

snapshotQueryDF = spark.read \
    .format('org.apache.hudi') \
    .load(f"s3://{S3_BUCKET_NAME}/tmp/hudi/") \
    .select('id', 'creation_date') \
    .withColumn("filename", input_file_name())

snapshotQueryDF.show(truncate=False)


+---+-------------+------------------------------------------------------------------------------------------------------------------------------+
|id |creation_date|filename |
+---+-------------+------------------------------------------------------------------------------------------------------------------------------+
|100|2022-01-11 |/hudi/2022-01-11/0c210872-484e-428b-a9ca-90a26e42125c-0_1-43-13681_20220114003401.parquet |
|105|2015-01-02 |/hudi/2015-01-02/43051d12-87e7-4dfb-8201-6ce293cf0df7-0_1-6-99_20220114003341.parquet |
|104|2015-01-02 |/hudi/2015-01-02/43051d12-87e7-4dfb-8201-6ce293cf0df7-0_1-6-99_20220114003341.parquet |
|102|2015-01-01 |/hudi/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-37-13680_20220114003401.parquet |
|103|2015-01-01 |/hudi/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-37-13680_20220114003401.parquet |
|101|2015-01-01 |/hudi/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-37-13680_20220114003401.parquet |
+---+-------------+------------------------------------------------------------------------------------------------------------------------------+

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;One other thing to note is that Hudi adds quite a bit of metadata to your Parquet files. This data helps enable record-level change streams - more detail can be found in this &lt;a href="https://hudi.apache.org/blog/2021/07/21/streaming-data-lake-platform/#writers" rel="noopener noreferrer"&gt;comprehensive blog post about the Hudi platform&lt;/a&gt;. If we use native Spark to read one of the Parquet files and show it, we see that there’s various &lt;code&gt;_hoodie&lt;/code&gt;-prefixed keys.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pyspark.sql.functions import split

rawDF = (
    spark.read.parquet(f"s3://{S3_BUCKET_NAME}/tmp/hudi/*/*.parquet")
    .withColumn("filename", split(input_file_name(), "tmp/hudi").getItem(1))
    .sort("_hoodie_commit_time", "_hoodie_commit_seqno")
)
rawDF.show(truncate=False)


+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+-------------+---------------------------+------------------------------------------------------------------------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |id |creation_date|last_update_time |filename |
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+-------------+---------------------------+------------------------------------------------------------------------------------+
|20220114003341 |20220114003341_0_1 |100 |2015-01-01 |57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |100|2015-01-01 |2015-01-01T13:51:39.340396Z|/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |
|20220114003341 |20220114003341_0_2 |102 |2015-01-01 |57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |102|2015-01-01 |2015-01-01T13:51:40.417052Z|/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-37-13680_20220114003401.parquet|
|20220114003341 |20220114003341_0_2 |102 |2015-01-01 |57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |102|2015-01-01 |2015-01-01T13:51:40.417052Z|/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |
|20220114003341 |20220114003341_0_3 |103 |2015-01-01 |57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |103|2015-01-01 |2015-01-01T13:51:40.519832Z|/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-37-13680_20220114003401.parquet|
|20220114003341 |20220114003341_0_3 |103 |2015-01-01 |57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |103|2015-01-01 |2015-01-01T13:51:40.519832Z|/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |
|20220114003341 |20220114003341_0_4 |101 |2015-01-01 |57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |101|2015-01-01 |2015-01-01T12:14:58.597216Z|/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-37-13680_20220114003401.parquet|
|20220114003341 |20220114003341_0_4 |101 |2015-01-01 |57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |101|2015-01-01 |2015-01-01T12:14:58.597216Z|/2015-01-01/57f66198-5303-4922-9323-91737ec40d25-0_0-4-98_20220114003341.parquet |
|20220114003341 |20220114003341_1_5 |105 |2015-01-02 |43051d12-87e7-4dfb-8201-6ce293cf0df7-0_1-6-99_20220114003341.parquet |105|2015-01-02 |2015-01-01T13:51:42.248818Z|/2015-01-02/43051d12-87e7-4dfb-8201-6ce293cf0df7-0_1-6-99_20220114003341.parquet |
|20220114003341 |20220114003341_1_6 |104 |2015-01-02 |43051d12-87e7-4dfb-8201-6ce293cf0df7-0_1-6-99_20220114003341.parquet |104|2015-01-02 |2015-01-01T12:15:00.512679Z|/2015-01-02/43051d12-87e7-4dfb-8201-6ce293cf0df7-0_1-6-99_20220114003341.parquet |
|20220114003401 |20220114003401_1_1 |100 |2022-01-11 |0c210872-484e-428b-a9ca-90a26e42125c-0_1-43-13681_20220114003401.parquet|100|2022-01-11 |2015-01-01T13:51:39.340396Z|/2022-01-11/0c210872-484e-428b-a9ca-90a26e42125c-0_1-43-13681_20220114003401.parquet|
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+-------------+---------------------------+------------------------------------------------------------------------------------+

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the background, Hudi figures out which commits and values to show based on the commit files and metadata in the parquet files.&lt;/p&gt;
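
&lt;p&gt;If you want to see those commits without digging through S3, one quick check (just a sketch, reusing the same table path from above) is to look at the distinct &lt;code&gt;_hoodie_commit_time&lt;/code&gt; values Hudi stores with every record.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Sketch: list the commit times Hudi has recorded for this table,
# using the _hoodie_commit_time metadata column it adds to every record
commitsDF = (
    spark.read.format("org.apache.hudi")
    .load(f"s3://{S3_BUCKET_NAME}/tmp/hudi/")
    .select("_hoodie_commit_time")
    .distinct()
    .orderBy("_hoodie_commit_time")
)

commitsDF.show()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
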

&lt;h2&gt;
  
  
  Apache Iceberg
&lt;/h2&gt;

&lt;blockquote&gt;
&lt;p&gt;📹 &lt;a href="https://www.youtube.com/watch?v=fryfx0Zg7KA&amp;amp;t=1039s" rel="noopener noreferrer"&gt;Intro to Apache Iceberg video&lt;/a&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;When I first heard about Iceberg, the phrase “table format for storing large, slow-moving tabular data” didn’t really make sense to me. But after working with data lakes at scale, it became quite clear. Apache Hive is a popular data warehouse project that provides a SQL-like interface to large datasets. Built on top of Hadoop, it originally used HDFS as its data store. With cloud migrations, object stores like Amazon S3 made it possible to store even more data, particularly without the operational concerns of a large Hadoop cluster, but with some limitations when compared to HDFS. Specifically, directory listings are slower (simple physics here, network calls are slower), renames are not atomic (by design), and results were previously only eventually consistent.&lt;/p&gt;

&lt;p&gt;So imagine you are Netflix, you have &lt;a href="https://netflixtechblog.com/optimizing-data-warehouse-storage-7b94a48fdcbe" rel="noopener noreferrer"&gt;hundreds of petabytes of data&lt;/a&gt; stored on S3, and you need a way for your organization to efficiently query this. You need a data storage layer that reduces or removes directory listings, you want atomic changes, and you want to ensure that when you’re reading your data you get consistent results. &lt;em&gt;There is more to Iceberg, but I’m simplifying because this helped me understand. :)&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;These were some of the original goals for Iceberg, so let’s dive in and see how it works. Similar to Hudi, we’ll create a simple Spark DataFrame and write that to S3 in Iceberg format.&lt;/p&gt;

&lt;p&gt;I should note that much of Iceberg is focused around Spark SQL, so I will switch to that below for certain operations.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"],
)

# Write a DataFrame as an Iceberg dataset
inputDF.write.format("iceberg").mode("overwrite").partitionBy("creation_date").option(
    "path", f"s3://{S3_BUCKET_NAME}/tmp/iceberg/"
).saveAsTable(ICEBERG_TABLE_NAME)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There are two main differences here - there is not as much “configuration” as we needed with Hudi, and we explicitly use &lt;code&gt;saveAsTable&lt;/code&gt;. With Iceberg, much of the metadata is stored in a data catalog, so creating the table is necessary.&lt;/p&gt;
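
&lt;p&gt;Because the table is registered in a catalog, we can also read it back by name with Spark SQL rather than by S3 path. Here’s a minimal sketch, assuming the same &lt;code&gt;ICEBERG_TABLE_NAME&lt;/code&gt; used above:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Sketch: read the Iceberg table back through the catalog by name
spark.sql(f"SELECT * FROM {ICEBERG_TABLE_NAME}").show()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Now let’s see what happened on S3.&lt;/p&gt;
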

&lt;ol&gt;
&lt;li&gt;First, we have a &lt;code&gt;metadata.json&lt;/code&gt; file
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-28 06:03:50 2457 tmp/iceberg/metadata/00000-bb1d38a9-af77-42c4-a7b7-69416fe36d9c.metadata.json

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;Then a snapshot &lt;strong&gt;manifest list&lt;/strong&gt; file
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-28 06:03:50 3785 tmp/iceberg/metadata/snap-7934053180928033536-1-e79c79ba-c7f0-45ad-8f2e-fd1bc349db55.avro

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;And a manifest file
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-28 06:03:50 6244 tmp/iceberg/metadata/e79c79ba-c7f0-45ad-8f2e-fd1bc349db55-m0.avro

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;And finally, we’ve got our Parquet data files
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-28 06:03:49 1197 tmp/iceberg/data/creation_date=2015-01-01/00000-4-fa9a18fd-abc4-4e04-91b4-e2ac4c9531be-00001.parquet
2022-01-28 06:03:49 1171 tmp/iceberg/data/creation_date=2015-01-01/00001-5-eab30115-a1d6-4918-abb4-a198ac12b262-00001.parquet
2022-01-28 06:03:50 1182 tmp/iceberg/data/creation_date=2015-01-02/00001-5-eab30115-a1d6-4918-abb4-a198ac12b262-00002.parquet

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There are a lot of moving pieces here, but the image from the Iceberg spec illustrates it quite well.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Tq70LNET--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dacort.dev/posts/modern-data-lake-storage-layers/iceberg-metadata.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Tq70LNET--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dacort.dev/posts/modern-data-lake-storage-layers/iceberg-metadata.png" alt="Iceberg Metadata Diagram" width="800" height="827"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Similar to Hudi, our data is written to Parquet files in each partition, although Hive-style partitioning is used by default. Hudi can also do this by setting the &lt;a href="https://hudi.apache.org/docs/configurations/#hoodiedatasourcewritehive_style_partitioning" rel="noopener noreferrer"&gt;&lt;code&gt;hoodie.datasource.write.hive_style_partitioning&lt;/code&gt;&lt;/a&gt; parameter.&lt;/p&gt;

&lt;p&gt;Different from Hudi, though, is the default use of the data catalog to identify the current metadata file. (&lt;a href="https://hudi.apache.org/releases/release-0.7.0/#metadata-table" rel="noopener noreferrer"&gt;Hudi 0.7.0&lt;/a&gt; introduced support for a metadata table to reduce the performance impact of file listings.) That metadata file references a list of manifest files that determine which data files make up the dataset for a particular version, also known as a snapshot. The snapshot data also includes quite a bit of additional information. Let’s update our dataset, then take a look at S3 again and at the snapshot portion of the metadata file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;spark.sql(f"UPDATE {ICEBERG_TABLE_NAME} SET creation_date = '2022-01-11' WHERE id = 100")

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can see that we have:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;2 new .parquet data files
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-28 06:07:07 1180 tmp/iceberg/data/creation_date=2015-01-01/00000-16-033354bd-7b02-44f4-95e2-7045e10706fc-00001.parquet
2022-01-28 06:07:08 1171 tmp/iceberg/data/creation_date=2022-01-11/00000-16-033354bd-7b02-44f4-95e2-7045e10706fc-00002.parquet

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As well as:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;1 new metadata.json file&lt;/li&gt;
&lt;li&gt;2 new .avro metadata listings&lt;/li&gt;
&lt;li&gt;1 new snap-*.avro snapshot file&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let’s look at the snapshot portion of the &lt;code&gt;metadata.json&lt;/code&gt; file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;"snapshots": [
    {
        "manifest-list": "s3://&amp;lt;BUCKET&amp;gt;/tmp/iceberg/metadata/snap-7934053180928033536-1-e79c79ba-c7f0-45ad-8f2e-fd1bc349db55.avro",
        "schema-id": 0,
        "snapshot-id": 7934053180928033536,
        "summary": {
            "added-data-files": "3",
            "added-files-size": "3550",
            "added-records": "6",
            "changed-partition-count": "2",
            "operation": "append",
            "spark.app.id": "application_1643153254969_0029",
            "total-data-files": "3",
            "total-delete-files": "0",
            "total-equality-deletes": "0",
            "total-files-size": "3550",
            "total-position-deletes": "0",
            "total-records": "6"
        },
        "timestamp-ms": 1643349829278
    },
    {
        "manifest-list": "s3://&amp;lt;BUCKET&amp;gt;/tmp/iceberg/metadata/snap-5441092870212826638-1-605de48f-8ccf-450c-935e-bbd4194ee8cc.avro",
        "parent-snapshot-id": 7934053180928033536,
        "schema-id": 0,
        "snapshot-id": 5441092870212826638,
        "summary": {
            "added-data-files": "2",
            "added-files-size": "2351",
            "added-records": "3",
            "changed-partition-count": "2",
            "deleted-data-files": "1",
            "deleted-records": "3",
            "operation": "overwrite",
            "removed-files-size": "1197",
            "spark.app.id": "application_1643153254969_0029",
            "total-data-files": "4",
            "total-delete-files": "0",
            "total-equality-deletes": "0",
            "total-files-size": "4704",
            "total-position-deletes": "0",
            "total-records": "6"
        },
        "timestamp-ms": 1643350027635
    }
]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is pretty amazing - we see how many files &lt;strong&gt;and records&lt;/strong&gt; were added or deleted, what the file sizes were, and even what the Spark &lt;code&gt;app_id&lt;/code&gt; was! 🤯 Some of this data is in the &lt;code&gt;manifest-list&lt;/code&gt; files as well, but you can begin to see &lt;em&gt;just&lt;/em&gt; how much you could potentially optimize your queries using this data.&lt;/p&gt;
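
&lt;p&gt;You also don’t have to read the &lt;code&gt;metadata.json&lt;/code&gt; file by hand - Iceberg exposes snapshot information as queryable metadata tables. A minimal sketch, again assuming the table is registered as &lt;code&gt;ICEBERG_TABLE_NAME&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Sketch: query the Iceberg "snapshots" metadata table with Spark SQL
spark.sql(f"""
    SELECT committed_at, snapshot_id, parent_id, operation
    FROM {ICEBERG_TABLE_NAME}.snapshots
""").show(truncate=False)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
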

&lt;h2&gt;
  
  
  Delta Lake
&lt;/h2&gt;

&lt;blockquote&gt;
&lt;p&gt;📹 &lt;a href="https://www.youtube.com/watch?v=fryfx0Zg7KA&amp;amp;t=1814s" rel="noopener noreferrer"&gt;Intro to Delta Lake video&lt;/a&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Delta Lake was introduced by Databricks as a way to address many of the challenges of data lakes. Similar to Hudi and Iceberg, its goals include unifying batch and stream processing, ACID transactions, and scalable metadata handling, among others.&lt;/p&gt;

&lt;p&gt;Again, we’ll create a simple Spark DataFrame and write it to S3 in Delta format.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"],
)

# Write a DataFrame as a Delta dataset
inputDF.write.format("delta").mode("overwrite").option(
    "overwriteSchema", "true"
).partitionBy("creation_date").save(f"s3://{S3_BUCKET_NAME}/tmp/delta/")

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;On S3, we now see the following files:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;a &lt;code&gt;00000000000000000000.json&lt;/code&gt; file
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-24 22:57:54 2120 tmp/delta/_delta_log/00000000000000000000.json

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;Several &lt;code&gt;.snappy.parquet&lt;/code&gt; files
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-24 22:57:52 875 tmp/delta/creation_date=2015-01-01/part-00005-2e09dbe4-469e-40dc-9b36-833480f6d375.c000.snappy.parquet
2022-01-24 22:57:52 875 tmp/delta/creation_date=2015-01-01/part-00010-848c69e1-71fb-4f8f-a19a-dd74e0ef1b8a.c000.snappy.parquet
2022-01-24 22:57:53 875 tmp/delta/creation_date=2015-01-01/part-00015-937d1837-0f03-4306-9b4e-4366207e688d.c000.snappy.parquet
2022-01-24 22:57:54 875 tmp/delta/creation_date=2015-01-01/part-00021-978a808e-4c36-4646-b7b1-ef5a21e706d8.c000.snappy.parquet
2022-01-24 22:57:54 875 tmp/delta/creation_date=2015-01-02/part-00026-538e1ac6-055e-4e72-9177-63daaaae1f98.c000.snappy.parquet
2022-01-24 22:57:52 875 tmp/delta/creation_date=2015-01-02/part-00031-8a03451a-0297-4c43-b64d-56db25807d02.c000.snappy.parquet

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;OK, so what’s in that &lt;code&gt;_delta_log&lt;/code&gt; file? Similar to Iceberg, quite a bit of information about this initial write to S3 including the number of files written, the schema of the dataset, and even the individual &lt;code&gt;add&lt;/code&gt; operations for each file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "commitInfo": {
    "timestamp": 1643065073634,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Overwrite",
      "partitionBy": "[\"creation_date\"]"
    },
    "isBlindAppend": false,
    "operationMetrics": {
      "numFiles": "6",
      "numOutputBytes": "5250",
      "numOutputRows": "6"
    }
  }
}
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  }
}
{
  "metaData": {
    "id": "a7f4b1d1-09f6-4475-894a-0eec90d1aab5",
    "format": {
      "provider": "parquet",
      "options": {}
    },
    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"creation_date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_update_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
    "partitionColumns": [
      "creation_date"
    ],
    "configuration": {},
    "createdTime": 1643065064066
  }
}
{
  "add": {
    "path": "creation_date=2015-01-01/part-00005-2e09dbe4-469e-40dc-9b36-833480f6d375.c000.snappy.parquet",
    "partitionValues": {
      "creation_date": "2015-01-01"
    },
    "size": 875,
    "modificationTime": 1643065072000,
    "dataChange": true
  }
}
{
  "add": {
    "path": "creation_date=2015-01-01/part-00010-848c69e1-71fb-4f8f-a19a-dd74e0ef1b8a.c000.snappy.parquet",
    "partitionValues": {
      "creation_date": "2015-01-01"
    },
    "size": 875,
    "modificationTime": 1643065072000,
    "dataChange": true
  }
}
{
  "add": {
    "path": "creation_date=2015-01-01/part-00015-937d1837-0f03-4306-9b4e-4366207e688d.c000.snappy.parquet",
    "partitionValues": {
      "creation_date": "2015-01-01"
    },
    "size": 875,
    "modificationTime": 1643065073000,
    "dataChange": true
  }
}
{
  "add": {
    "path": "creation_date=2015-01-01/part-00021-978a808e-4c36-4646-b7b1-ef5a21e706d8.c000.snappy.parquet",
    "partitionValues": {
      "creation_date": "2015-01-01"
    },
    "size": 875,
    "modificationTime": 1643065074000,
    "dataChange": true
  }
}
{
  "add": {
    "path": "creation_date=2015-01-02/part-00026-538e1ac6-055e-4e72-9177-63daaaae1f98.c000.snappy.parquet",
    "partitionValues": {
      "creation_date": "2015-01-02"
    },
    "size": 875,
    "modificationTime": 1643065074000,
    "dataChange": true
  }
}
{
  "add": {
    "path": "creation_date=2015-01-02/part-00031-8a03451a-0297-4c43-b64d-56db25807d02.c000.snappy.parquet",
    "partitionValues": {
      "creation_date": "2015-01-02"
    },
    "size": 875,
    "modificationTime": 1643065072000,
    "dataChange": true
  }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Alright, let’s go ahead and update one of our rows. Delta Lake provides a merge operation that we can use. We’ll use the syntax &lt;a href="https://docs.delta.io/latest/quick-start.html#update-table-data" rel="noopener noreferrer"&gt;from the docs&lt;/a&gt; that’s slightly different from native Spark as it creates a DeltaTable object.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pyspark.sql.functions import lit

# Create a new DataFrame from the first row of inputDF with a different creation_date value
updateDF = inputDF.where("id = 100").withColumn("creation_date", lit("2022-01-11"))

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, f"s3://{S3_BUCKET_NAME}/tmp/delta/")

deltaTable.alias("oldData") \
  .merge(
    updateDF.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "creation_date": col("newData.creation_date") }) \
  .execute()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Interestingly, now when we look at S3 we see 1 new &lt;code&gt;json&lt;/code&gt; file and only 1 new &lt;code&gt;parquet&lt;/code&gt; file (Remember Hudi and Iceberg both had 2 new &lt;code&gt;parquet&lt;/code&gt; files).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2022-01-24 23:05:46 1018 tmp/delta/_delta_log/00000000000000000001.json
2022-01-24 23:05:46 875 tmp/delta/creation_date=2022-01-11/part-00000-3f3fd83a-b876-4b6f-8f64-d8a4189392ae.c000.snappy.parquet

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we look at that new JSON file we see something really interesting:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "commitInfo": {
    "timestamp": 1643065545396,
    "operation": "MERGE",
    "operationParameters": {
      "predicate": "(oldData.`id` = newData.`id`)",
      "matchedPredicates": "[{\"actionType\":\"update\"}]",
      "notMatchedPredicates": "[]"
    },
    "readVersion": 0,
    "isBlindAppend": false,
    "operationMetrics": {
      "numTargetRowsCopied": "0",
      "numTargetRowsDeleted": "0",
      "numTargetFilesAdded": "1",
      "executionTimeMs": "4705",
      "numTargetRowsInserted": "0",
      "scanTimeMs": "3399",
      "numTargetRowsUpdated": "1",
      "numOutputRows": "1",
      "numSourceRows": "1",
      "numTargetFilesRemoved": "1",
      "rewriteTimeMs": "1265"
    }
  }
}
{
  "remove": {
    "path": "creation_date=2015-01-01/part-00005-2e09dbe4-469e-40dc-9b36-833480f6d375.c000.snappy.parquet",
    "deletionTimestamp": 1643065545378,
    "dataChange": true,
    "extendedFileMetadata": true,
    "partitionValues": {
      "creation_date": "2015-01-01"
    },
    "size": 875
  }
}
{
  "add": {
    "path": "creation_date=2022-01-11/part-00000-3f3fd83a-b876-4b6f-8f64-d8a4189392ae.c000.snappy.parquet",
    "partitionValues": {
      "creation_date": "2022-01-11"
    },
    "size": 875,
    "modificationTime": 1643065546000,
    "dataChange": true
  }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In addition to the &lt;code&gt;operationMetrics&lt;/code&gt; that gives us insight into how the data changed on “disk”, we also now see both a &lt;code&gt;remove&lt;/code&gt; and an &lt;code&gt;add&lt;/code&gt; operation. In Delta Lake (and I’m not quite sure why this happened yet…), each row was written to an individual &lt;code&gt;.parquet&lt;/code&gt; file! So for this second version of the data, the fact that that row was updated simply lives in the metadata because it was the only row stored in that Parquet file. I’m guessing this is simply because my dataset is so small that the default number of partitions in Spark/Delta Lake resulted in this write configuration.&lt;/p&gt;
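
&lt;p&gt;If reading the JSON log files by hand gets tedious, Delta Lake also exposes this commit history through the &lt;code&gt;DeltaTable&lt;/code&gt; API. A minimal sketch, reusing the &lt;code&gt;deltaTable&lt;/code&gt; object from the merge above:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Sketch: view the Delta Lake commit history (one row per version)
deltaTable.history().select(
    "version", "timestamp", "operation", "operationMetrics"
).show(truncate=False)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
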

&lt;h2&gt;
  
  
  Snapshots
&lt;/h2&gt;

&lt;p&gt;So now we’ve got a good idea of the semantics of each of these storage layers. Let’s take one more look at an important component of all of them: snapshots!&lt;/p&gt;

&lt;h3&gt;
  
  
  Hudi
&lt;/h3&gt;

&lt;p&gt;Hudi has a concept of “point-in-time” queries where you provide a begin and end commit timestamp and it shows you what the data looked like at that point in time.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Query data from the first version of the table
readOptions = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': '0',
  'hoodie.datasource.read.end.instanttime': '20220114003341',
}

incQueryDF = spark.read \
    .format('org.apache.hudi') \
    .options(**readOptions) \
    .load(f"s3://{S3_BUCKET_NAME}/tmp/hudi")

incQueryDF.show()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Iceberg
&lt;/h3&gt;

&lt;p&gt;Iceberg supports a similar mechanism called time travel, where you can use either a &lt;code&gt;snapshot-id&lt;/code&gt; or &lt;code&gt;as-of-timestamp&lt;/code&gt;, similar to Hudi.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# time travel to 2022-01-27 22:04:00 -0800
df = spark.read \
    .option("as-of-timestamp", "1643349840000") \
    .format("iceberg") \
    .load(ICEBERG_TABLE_NAME)

df.show()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
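
&lt;p&gt;You can also pin an exact snapshot rather than a timestamp. A quick sketch using the first &lt;code&gt;snapshot-id&lt;/code&gt; from the metadata above:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Sketch: time travel by snapshot-id instead of timestamp
# (7934053180928033536 is the first snapshot-id shown in metadata.json above)
df = spark.read \
    .option("snapshot-id", "7934053180928033536") \
    .format("iceberg") \
    .load(ICEBERG_TABLE_NAME)

df.show()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
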



&lt;h3&gt;
  
  
  Delta Lake
&lt;/h3&gt;

&lt;p&gt;And, of course, Delta Lake supports this as well using either &lt;a href="https://docs.delta.io/latest/delta-batch.html#-deltatimetravel" rel="noopener noreferrer"&gt;Spark SQL or DataFrames&lt;/a&gt;. Similar to Iceberg, you can use &lt;code&gt;versionAsOf&lt;/code&gt; or &lt;code&gt;timestampAsOf&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# time travel to 2022-01-24 23:00
df1 = (
    spark.read.format("delta")
    .option("timestampAsOf", "2022-01-24 23:00")
    .load(f"s3://{S3_BUCKET_NAME}/tmp/delta/")
)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
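
&lt;p&gt;And the equivalent with &lt;code&gt;versionAsOf&lt;/code&gt; - a small sketch assuming version &lt;code&gt;0&lt;/code&gt; is the initial write (the &lt;code&gt;00000000000000000000.json&lt;/code&gt; commit above):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Sketch: time travel to the initial write by version number
df0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load(f"s3://{S3_BUCKET_NAME}/tmp/delta/")
)

df0.show()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
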



&lt;h2&gt;
  
  
  Deletes
&lt;/h2&gt;

&lt;p&gt;I bet you’re surprised I haven’t mentioned deletes or GDPR yet. Don’t worry…I will. 😀 But first I just wanted to understand exactly how these different systems work.&lt;/p&gt;

&lt;h2&gt;
  
  
  Wrapup
&lt;/h2&gt;

&lt;p&gt;In this post, we reviewed the basics of Apache Hudi, Apache Iceberg, and Delta Lake - modern data lake storage layers. All these frameworks enable a set of functionality that optimize working with data in cloud-based object stores, albeit with slightly different approaches.&lt;/p&gt;

</description>
      <category>aws</category>
      <category>analytics</category>
    </item>
    <item>
      <title>SSH to EC2 Instances with Session Manager</title>
      <dc:creator>Damon P. Cortesi</dc:creator>
      <pubDate>Wed, 29 Sep 2021 16:49:03 +0000</pubDate>
      <link>https://dev.to/dacort/ssh-to-ec2-instances-with-session-manager-1494</link>
      <guid>https://dev.to/dacort/ssh-to-ec2-instances-with-session-manager-1494</guid>
      <description>&lt;p&gt;I’m kind of an old-school sys admin (aka, managed NT4 in the 90’s) so I’m really used to SSH’ing into hosts. More often than not, however, I’m working with AWS EC2 instances in a private subnet.&lt;/p&gt;

&lt;p&gt;If you’re not familiar with it, &lt;a href="https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager.html" rel="noopener noreferrer"&gt;AWS Systems Manager Session Manager&lt;/a&gt; is a pretty sweet feature that allows you to connect remotely to EC2 instances with the AWS CLI, without needing to open up ports for SSH or utilize a bastion host.&lt;/p&gt;

&lt;p&gt;I’ve been using it in my browser occasionally, which is pretty handy, but I wanted to use it from my terminal. It required a couple steps to get working.&lt;/p&gt;

&lt;h2&gt;
  
  
  Set up Session Manager with AWS CLI
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Install the &lt;a href="https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-working-with-install-plugin.html" rel="noopener noreferrer"&gt;Session Manager plugin&lt;/a&gt; for the AWS CLI&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;I’m on a Mac, so I just installed the plugin with the signed installer&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl "https://s3.amazonaws.com/session-manager-downloads/plugin/latest/mac/session-manager-plugin.pkg" -o ~/Downloads/session-manager-plugin.pkg

sudo installer -pkg ~/Downloads/session-manager-plugin.pkg -target /
sudo ln -s /usr/local/sessionmanagerplugin/bin/session-manager-plugin /usr/local/bin/session-manager-plugin


➜ session-manager-plugin

The Session Manager plugin was installed successfully. Use the AWS CLI to start a session.

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Sweet, good to go there!&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Now use the AWS CLI to connect to an instance!&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;em&gt;You may need to specify the region your instance is in&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws ssm start-session --target i-abcdefgh123456789 --region us-west-2

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Awesome! You’re good to go!&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--aXWnUebP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dacort.dev/posts/ssh-to-ec2-instances-with-session-manager/session-output.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--aXWnUebP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dacort.dev/posts/ssh-to-ec2-instances-with-session-manager/session-output.png" alt="Session Output" width="505" height="334"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Enable Logging
&lt;/h2&gt;

&lt;p&gt;The other nice thing is that if your memory is as bad as mine (or you want auditing, which is a more legitimate reason), you can also enable logging of your sessions to S3 or CloudWatch.&lt;/p&gt;

&lt;p&gt;This is what the default config looks like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws ssm get-document \
    --region us-west-2 \
    --name "SSM-SessionManagerRunShell" \
    --document-version "\$LATEST" \
    | jq '.Content | fromjson'


{
  "schemaVersion": "1.0",
  "description": "Document to hold regional settings for Session Manager",
  "sessionType": "Standard_Stream",
  "inputs": {
    "s3BucketName": "",
    "s3KeyPrefix": "",
    "s3EncryptionEnabled": true,
    "cloudWatchLogGroupName": "",
    "cloudWatchEncryptionEnabled": true,
    "cloudWatchStreamingEnabled": true,
    "idleSessionTimeout": "20",
    "kmsKeyId": "",
    "runAsEnabled": false,
    "runAsDefaultUser": "",
    "shellProfile": {
      "windows": "",
      "linux": ""
    }
  }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;So we’ll just update that to add in the S3 configuration.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Update Session Manager preferences&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;em&gt;Note that I &lt;strong&gt;do not&lt;/strong&gt; enable encryption here.&lt;/em&gt; This setting needs to match your bucket setting and you need to make sure your VPC has the proper endpoints and access to write to S3. Check &lt;a href="https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-troubleshooting.html#session-manager-troubleshooting-start-blank-screen" rel="noopener noreferrer"&gt;troubleshooting&lt;/a&gt; if you get a blank screen when trying to start a session.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;BUCKET=&amp;lt;BUCKET_NAME&amp;gt;
PREFIX=logs/session_manager/

aws ssm update-document \
    --region us-west-2 \
    --name "SSM-SessionManagerRunShell" \
    --document-version "\$LATEST" \
    --content '{
  "schemaVersion": "1.0",
  "description": "Document to hold regional settings for Session Manager",
  "sessionType": "Standard_Stream",
  "inputs": {
    "s3BucketName": "'${BUCKET}'",
    "s3KeyPrefix": "'${PREFIX}'",
    "s3EncryptionEnabled": false,
    "cloudWatchLogGroupName": "",
    "cloudWatchEncryptionEnabled": true,
    "cloudWatchStreamingEnabled": true,
    "idleSessionTimeout": "20",
    "kmsKeyId": "",
    "runAsEnabled": false,
    "runAsDefaultUser": "",
    "shellProfile": {
      "windows": "",
      "linux": ""
    }
  }
}'

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;Create another session!
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws ssm start-session --target i-abcdefgh123456789 --region us-west-2

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once you’re done with your session and exit, you should have a log file in your S3 bucket.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;View logs
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws s3 ls s3://&amp;lt;BUCKET_NAME&amp;gt;/logs/session_manager/


2021-09-29 10:21:24 4177 your-aws-username-abcdefgh123456789.log

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And that log file will have the full contents of your session.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws s3 cp s3://&amp;lt;BUCKET_NAME&amp;gt;/logs/session_manager/your-aws-username-abcdefgh123456789.log -

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--tMlZLCMs--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dacort.dev/posts/ssh-to-ec2-instances-with-session-manager/log-output.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--tMlZLCMs--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dacort.dev/posts/ssh-to-ec2-instances-with-session-manager/log-output.png" alt="SSM Log Output" width="800" height="469"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;And yes, the &lt;em&gt;FULL CONTENTS&lt;/em&gt;. So if you enter a password or sensitive info, you should follow the steps &lt;a href="https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-logging.html" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;stty -echo; read passwd; stty echo;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>aws</category>
    </item>
    <item>
      <title>Updating Partition Values With Apache Hudi</title>
      <dc:creator>Damon P. Cortesi</dc:creator>
      <pubDate>Thu, 23 Sep 2021 19:21:15 +0000</pubDate>
      <link>https://dev.to/dacort/updating-partition-values-with-apache-hudi-3hbg</link>
      <guid>https://dev.to/dacort/updating-partition-values-with-apache-hudi-3hbg</guid>
      <description>&lt;p&gt;If you're not familiar with &lt;a href="https://hudi.apache.org/" rel="noopener noreferrer"&gt;Apache Hudi&lt;/a&gt;, it's a pretty awesome piece of software that brings transactions and record-level updates/deletes to data lakes.&lt;/p&gt;

&lt;p&gt;More specifically, if you're doing Analytics with S3, Hudi provides a way for you to &lt;em&gt;consistently&lt;/em&gt; update records in your data lake, which historically has been pretty challenging. It can also optimize file sizes, allow for rollbacks, and makes &lt;a href="https://aws.amazon.com/blogs/big-data/new-features-from-apache-hudi-available-in-amazon-emr/" rel="noopener noreferrer"&gt;streaming CDC data impressively easy&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Updating Partition Values
&lt;/h2&gt;

&lt;p&gt;I'm learning more about Hudi and was following this &lt;a href="https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-work-with-dataset.html" rel="noopener noreferrer"&gt;EMR guide to working with a Hudi dataset&lt;/a&gt;, but the "Upsert" operation didn't quite work as I expected. Instead of overwriting the desired record, it added a second one with the same ID. 🤔&lt;/p&gt;

&lt;p&gt;After some furious searching, I finally came across this post about &lt;a href="https://medium.com/apache-hudi-blogs/employing-the-right-indexes-for-fast-updates-deletes-in-apache-hudi-814d863635f6" rel="noopener noreferrer"&gt;employing the right indexes in Apache Hudi&lt;/a&gt;. Specifically, this line caught my attention:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Global indexes enforce uniqueness of keys across all partitions of a table i.e guarantees that exactly one record exists in the table for a given record key.&lt;/strong&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Ah-ha! In the example, we're updating a partition value. &lt;em&gt;BY DEFAULT&lt;/em&gt;, the &lt;code&gt;hoodie.index.type&lt;/code&gt; is &lt;code&gt;BLOOM&lt;/code&gt;. I tried changing it to &lt;code&gt;GLOBAL_BLOOM&lt;/code&gt;, and when updating the record, it wrote it into the old partition. It turns out that there is &lt;em&gt;also&lt;/em&gt; a &lt;code&gt;hoodie.bloom.index.update.partition.path&lt;/code&gt; setting that will also update the partition path. This defaults to &lt;code&gt;true&lt;/code&gt; in Hudi v0.9.0, but I'm using v0.8.0 where it defaults to &lt;code&gt;false&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Note that there is a performance/storage impact to enabling global indexes&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;So flipping that, I got the expected behavior. Using the example from the EMR docs, my code now looks like this:&lt;/p&gt;

&lt;h3&gt;
  
  
  Writing initial dataset
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Create a DataFrame
&lt;/span&gt;&lt;span class="n"&gt;inputDF&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;createDataFrame&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;100&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01T13:51:39.340396Z&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;101&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01T12:14:58.597216Z&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;102&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01T13:51:40.417052Z&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;103&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01T13:51:40.519832Z&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;104&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-02&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01T12:15:00.512679Z&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;105&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-02&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2015-01-01T13:51:42.248818Z&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;creation_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;last_update_time&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Specify common DataSourceWriteOptions in the single hudiOptions variable
&lt;/span&gt;&lt;span class="n"&gt;hudiOptions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.table.name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;my_hudi_table&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.datasource.write.recordkey.field&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.datasource.write.partitionpath.field&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;creation_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.datasource.write.precombine.field&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;last_update_time&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.datasource.hive_sync.enable&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.datasource.hive_sync.table&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;my_hudi_table&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.datasource.hive_sync.partition_fields&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;creation_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.datasource.hive_sync.partition_extractor_class&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;org.apache.hudi.hive.MultiPartKeysValueExtractor&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.index.type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GLOBAL_BLOOM&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;                 &lt;span class="c1"&gt;# This is required if we want to ensure we upsert a record, even if the partition changes
&lt;/span&gt;    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.bloom.index.update.partition.path&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# This is required to write the data into the new partition (defaults to false in 0.8.0, true in 0.9.0)
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# Write a DataFrame as a Hudi dataset
&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;inputDF&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;org.apache.hudi&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.datasource.write.operation&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;insert&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;options&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;hudiOptions&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;mode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;overwrite&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;save&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3://&amp;lt;BUCKET&amp;gt;/tmp/myhudidataset_001/&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Updating one &lt;em&gt;partition&lt;/em&gt; row
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;lit&lt;/span&gt;

&lt;span class="n"&gt;updateDF&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;inputDF&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;limit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;creation_date&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;2021-09-22&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;updateDF&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;org.apache.hudi&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hoodie.datasource.write.operation&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;upsert&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;options&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;hudiOptions&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;mode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;save&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3://&amp;lt;BUCKET&amp;gt;/tmp/myhudidataset_001/&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Resulting Parquet Files
&lt;/h3&gt;

&lt;p&gt;Now if we look at the Parquet files on S3, we can see that:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The old partition has a new Parquet file with the record removed&lt;/li&gt;
&lt;li&gt;There is a new partition with the single record
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws s3 &lt;span class="nb"&gt;ls &lt;/span&gt;s3://&amp;lt;BUCKET&amp;gt;/tmp/myhudidataset_001/

2021-09-23 11:45:23     434901 tmp/myhudidataset_001/2015-01-01/cd4b4b74-13f7-4c1e-a7ce-110bba8e16fd-0_0-404-90423_20210923184511.parquet
2021-09-23 11:45:44     434864 tmp/myhudidataset_001/2015-01-01/cd4b4b74-13f7-4c1e-a7ce-110bba8e16fd-0_0-442-103950_20210923184526.parquet
2021-09-23 11:45:23     434863 tmp/myhudidataset_001/2015-01-02/578ea02b-09f0-4952-afe5-94d44d158d29-0_1-404-90424_20210923184511.parquet
2021-09-23 11:45:43     434895 tmp/myhudidataset_001/2021-09-22/d67c9b50-1034-44b2-8ec9-2f3b1dcbf26c-0_1-442-103951_20210923184526.parquet
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Athena Compatibility
&lt;/h3&gt;

&lt;p&gt;We can also query this dataset from Athena and see the updated data!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="nv"&gt;"default"&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nv"&gt;"my_hudi_table"&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9pg4jgfv7empx230ibgl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9pg4jgfv7empx230ibgl.png" alt="Athena Results" width="800" height="206"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Note the different &lt;code&gt;_hoodie_file_name&lt;/code&gt; for record id &lt;code&gt;100&lt;/code&gt;.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Awesome! Now that I understand what's going on, it makes perfect sense. 🙌&lt;/p&gt;
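&lt;p&gt;If you want to verify this yourself, a quick check (a minimal sketch, not part of the original notebook - the exact path glob needed depends on your Hudi version) is to read the dataset back with Spark and look at Hudi's metadata columns for that record:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Read the Hudi dataset back and inspect the metadata columns for record id 100.
# Depending on the Hudi version, you may need to append a glob such as /*/* to the path.
hudi_df = spark.read.format("org.apache.hudi").load("s3://&amp;lt;BUCKET&amp;gt;/tmp/myhudidataset_001/")
(
    hudi_df.where("id = '100'")
    .select("_hoodie_partition_path", "_hoodie_file_name", "id", "creation_date")
    .show(truncate=False)
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;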

</description>
      <category>aws</category>
      <category>hudi</category>
      <category>datalakes</category>
      <category>spark</category>
    </item>
    <item>
      <title>Continuous Deployment of Jupyter Notebooks</title>
      <dc:creator>Damon P. Cortesi</dc:creator>
      <pubDate>Wed, 14 Jul 2021 23:37:28 +0000</pubDate>
      <link>https://dev.to/dacort/continuous-deployment-of-jupyter-notebooks-4adh</link>
      <guid>https://dev.to/dacort/continuous-deployment-of-jupyter-notebooks-4adh</guid>
      <description>&lt;p&gt;This is a guide on how to use AWS CodePipeline to continuously deploy Jupyter notebooks to an S3-backed static website.&lt;/p&gt;

&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;Since I started using &lt;a href="https://aws.amazon.com/emr/features/studio/" rel="noopener noreferrer"&gt;EMR Studio&lt;/a&gt;, I've been making more use of Jupyter notebooks as scratch pads and often want to be able to easily share the results of my research. I hunted around for a few different solutions and while there are some good ones like &lt;a href="https://nbconvert.readthedocs.io/en/latest/" rel="noopener noreferrer"&gt;nbconvert&lt;/a&gt; and &lt;a href="https://github.com/mwouts/jupytext/" rel="noopener noreferrer"&gt;jupytext&lt;/a&gt;, I wanted something a bit simpler and off-the-shelf. This post from &lt;a href="https://www.linkedin.com/in/mikkelhartmann/" rel="noopener noreferrer"&gt;Mikkel Hartmann&lt;/a&gt; about &lt;a href="http://mikkelhartmann.dk/2019/05/14/static-website-from-jupyter-notebooks.html" rel="noopener noreferrer"&gt;making a static website from Jupyter Notebooks&lt;/a&gt; led me to &lt;a href="https://www.mkdocs.org/" rel="noopener noreferrer"&gt;MkDocs&lt;/a&gt; and luckily, I came across &lt;a href="https://github.com/greenape/mknotebooks" rel="noopener noreferrer"&gt;mknotebooks&lt;/a&gt;, which offers a simple plugin for MkDocs. 😅&lt;/p&gt;

&lt;p&gt;So, with a simple static site generator geared toward project documentation, a plugin that renders Jupyter notebooks quite well, and a few fancy code pipelines... I can easily push my notebooks to production. Let's go!&lt;/p&gt;

&lt;h2&gt;
  
  
  Architecture
&lt;/h2&gt;

&lt;p&gt;This is the architecture we'll be implementing. This will all be built using the &lt;a href="https://aws.amazon.com/cdk/" rel="noopener noreferrer"&gt;AWS Cloud Development Kit&lt;/a&gt; (CDK).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1249zn5980hv3pnu67oz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1249zn5980hv3pnu67oz.png" alt="Jupyter Notebook Continuous Deployment Architecture" width="800" height="328"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We'll be creating the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;2 S3 buckets to store our logs and website artifacts&lt;/li&gt;
&lt;li&gt;A CodeCommit repository that holds our site and notebooks&lt;/li&gt;
&lt;li&gt;A CodeBuild project that generates the static site&lt;/li&gt;
&lt;li&gt;A CodePipeline that is triggered by new commits, builds the site, and deploys it to S3&lt;/li&gt;
&lt;li&gt;A CloudFront Distribution that serves the site&lt;/li&gt;
&lt;li&gt;And optionally an ACM certificate if you want an alternate domain name&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I won't go into the details of the entire CDK stack, but instead will show how to deploy the CD pipeline.&lt;/p&gt;

&lt;h2&gt;
  
  
  Deploying
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Pre-requisites
&lt;/h3&gt;

&lt;p&gt;You'll need to have &lt;a href="https://docs.aws.amazon.com/cdk/latest/guide/getting_started.html#getting_started_prerequisites" rel="noopener noreferrer"&gt;CDK installed&lt;/a&gt; (&amp;gt;= v1.107.0) and Python &amp;gt;= 3.9.&lt;/p&gt;

&lt;p&gt;I use &lt;a href="https://github.com/nodenv/nodenv" rel="noopener noreferrer"&gt;nodenv&lt;/a&gt; and &lt;a href="https://virtualenv.pypa.io/en/latest/" rel="noopener noreferrer"&gt;virtualenv&lt;/a&gt; for my respective environments.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# I use node 14.5.0&lt;/span&gt;
nodenv shell 14.5.0

&lt;span class="c"&gt;# And Python3&lt;/span&gt;
virtualenv &lt;span class="nt"&gt;-p&lt;/span&gt; python3.9 .venv
&lt;span class="nb"&gt;source&lt;/span&gt; .venv/bin/activate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Bootstrapping
&lt;/h3&gt;

&lt;p&gt;The source code is available in &lt;a href="https://github.com/dacort/jupyter-static-website" rel="noopener noreferrer"&gt;dacort/jupyter-static-website&lt;/a&gt;. In order to get started, we just need to clone that repo and deploy our CDK stack!&lt;/p&gt;

&lt;p&gt;This project is a two-phase deploy because CloudFront certificates need to be in &lt;code&gt;us-east-1&lt;/code&gt;. If you &lt;em&gt;do not&lt;/em&gt; need a custom domain, you can skip the first part.&lt;/p&gt;

&lt;p&gt;First, clone the project and install the necessary requirements.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;git clone https://github.com/dacort/jupyter-static-website.git
&lt;span class="nb"&gt;cd &lt;/span&gt;jupyter-static-website
pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-r&lt;/span&gt; requirements.txt
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You'll also need to &lt;a href="https://docs.aws.amazon.com/cdk/latest/guide/bootstrapping.html" rel="noopener noreferrer"&gt;bootstrap&lt;/a&gt; your AWS CDK environment in the account and region you want to deploy Part 2 in.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;cdk bootstrap aws://ACCOUNT-NUMBER-1/REGION-1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Part 1 - CloudFront Certificate
&lt;/h3&gt;

&lt;p&gt;&lt;em&gt;If you are not using a custom domain, skip to Part 2&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;This project only supports using the default CloudFront certificate and a DNS-validated CNAME. In order to generate the certificate, you'll need to run the command below, go into the &lt;a href="https://console.aws.amazon.com/acm/home?region=us-east-1#/" rel="noopener noreferrer"&gt;AWS Certificate Manager console&lt;/a&gt; and make sure you follow the validation instructions.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;cdk deploy CloudfrontCertificateStack &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="nv"&gt;domain_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;notebooks.example.com
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once the domain is validated, the stack should finish provisioning. &lt;/p&gt;

&lt;p&gt;One of the outputs from this stack will be &lt;code&gt;CloudfrontCertificateStack.certificatearn&lt;/code&gt; - you'll need the value of this for the next phase.&lt;/p&gt;

&lt;h3&gt;
  
  
  Part 2 - Jupyter CD Pipeline
&lt;/h3&gt;

&lt;p&gt;&lt;em&gt;If you are not using a custom domain, you can omit both of the &lt;code&gt;-c&lt;/code&gt; options below.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;If you want to deploy to a different region, make sure you set the &lt;code&gt;AWS_DEFAULT_REGION&lt;/code&gt; environment variable.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;cdk deploy EmrStudioPublisherStack &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="nv"&gt;domain_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;notebooks.example.com &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="nv"&gt;certificate_arn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;arn:aws:acm:us-east-1:012345678912:certificate/f07b01a4-3e8c-4639-8a22-b7a20a832de3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once this stack finishes, you should have a CodeCommit repository you can make changes to, a CloudFront distribution, and a publicly accessible URL (found in the &lt;code&gt;EmrStudioPublisherStack.cloudfrontendpoint&lt;/code&gt; output) that has a pre-populated example site.&lt;/p&gt;

&lt;p&gt;The site will take a few minutes to deploy - you'll be able to keep an eye on the status in the &lt;a href="https://console.aws.amazon.com/codesuite/codepipeline/pipelines" rel="noopener noreferrer"&gt;CodePipeline console&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Usage
&lt;/h2&gt;

&lt;p&gt;Usage is pretty straightforward. &lt;code&gt;git clone&lt;/code&gt; the repository, add a new notebook, and push it back up! If you're using EMR Studio, you can add your CodeCommit repository and make your changes to your Jupyter notebooks there.&lt;/p&gt;

&lt;p&gt;I made a video about &lt;a href="https://www.youtube.com/watch?v=ZdbUTxBjBIs" rel="noopener noreferrer"&gt;connecting to Git in EMR Studio&lt;/a&gt; that you might find useful.&lt;/p&gt;

&lt;p&gt;Any new notebooks added in the &lt;code&gt;site/docs/notebooks/&lt;/code&gt; directory will automatically be published.&lt;/p&gt;

&lt;p&gt;You can add links to the notebooks by updating the &lt;code&gt;nav&lt;/code&gt; section of the &lt;code&gt;mkdocs.yml&lt;/code&gt; file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;nav&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;Home&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;index.md&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;Notebooks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;Oura Sleep Analysis&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;notebooks/damons_sleep.ipynb&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;Intro to Data Processing on AWS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;notebooks/intro_data_processing_aws.ipynb&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This YAML config will generate a nav dropdown like so.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgdbgpem6y11d9n1a9hut.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgdbgpem6y11d9n1a9hut.png" alt="Navigation example" width="800" height="391"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Advanced Usage
&lt;/h3&gt;

&lt;p&gt;Note that not &lt;em&gt;all&lt;/em&gt; images or libraries render nicely when converting to HTML. This is why, for example, my plotly notebook had to use &lt;code&gt;fig.show(renderer="jupyterlab")&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;In addition, if you paste multiple images into your notebook's Markdown, mknotebooks &lt;a href="https://github.com/greenape/mknotebooks/issues/466" rel="noopener noreferrer"&gt;only renders one of them&lt;/a&gt;. To work around this, I added a pre-build step that uniquifies all the image attachments in Markdown cells - a rough sketch of the idea is below.&lt;/p&gt;
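&lt;p&gt;The actual pre-build script isn't shown here, but a hypothetical sketch of the idea looks something like this (the field names follow the standard &lt;code&gt;.ipynb&lt;/code&gt; JSON format; the renaming scheme is just illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json
import sys


def uniquify_attachments(path):
    """Rename markdown-cell image attachments so every name is unique across the notebook."""
    with open(path) as f:
        nb = json.load(f)

    counter = 0
    for cell in nb.get("cells", []):
        if cell.get("cell_type") != "markdown" or not cell.get("attachments"):
            continue
        renamed = {}
        source = cell["source"] if isinstance(cell["source"], list) else [cell["source"]]
        for old_name, data in cell["attachments"].items():
            new_name = f"img{counter}-{old_name}"
            counter += 1
            renamed[new_name] = data
            # Update references like ![alt](attachment:image.png) in the cell source
            source = [line.replace(f"attachment:{old_name}", f"attachment:{new_name}") for line in source]
        cell["attachments"] = renamed
        cell["source"] = source

    with open(path, "w") as f:
        json.dump(nb, f, indent=1)


if __name__ == "__main__":
    uniquify_attachments(sys.argv[1])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;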

</description>
      <category>aws</category>
      <category>jupyter</category>
      <category>python</category>
      <category>devops</category>
    </item>
    <item>
      <title>Build your own Air Quality Map with OpenAQ and EMR on EKS</title>
      <dc:creator>Damon P. Cortesi</dc:creator>
      <pubDate>Fri, 09 Jul 2021 18:48:52 +0000</pubDate>
      <link>https://dev.to/dacort/build-your-own-air-quality-map-with-openaq-and-emr-on-eks-4nl3</link>
      <guid>https://dev.to/dacort/build-your-own-air-quality-map-with-openaq-and-emr-on-eks-4nl3</guid>
      <description>&lt;p&gt;Fire season is closely approaching and as somebody that spent two weeks last year hunkered down inside with my browser glued to various air quality sites, I wanted to show how to use data from OpenAQ to build your own air quality analysis. &lt;/p&gt;

&lt;p&gt;With Amazon EMR on EKS, you can now &lt;a href="https://aws.amazon.com/blogs/aws/customize-and-package-dependencies-with-your-apache-spark-applications-on-amazon-emr-on-amazon-eks/" rel="noopener noreferrer"&gt;customize and package your own Apache Spark dependencies&lt;/a&gt; and I use that functionality for this post.&lt;/p&gt;

&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;OpenAQ maintains a &lt;a href="https://registry.opendata.aws/openaq/" rel="noopener noreferrer"&gt;publicly accessible dataset of various air quality metrics&lt;/a&gt; that's updated every half hour. &lt;a href="https://docs.bokeh.org/en/latest/index.html" rel="noopener noreferrer"&gt;Bokeh&lt;/a&gt; is a popular library for Python data visualization. While it includes sample data for US county and state boundaries, we're going to use &lt;a href="https://www.census.gov/geographies/mapping-files/time-series/geo/cartographic-boundary.html" rel="noopener noreferrer"&gt;shapefiles from census.gov&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;We'll use an Apache Spark job on EMR on EKS to read the initial dataset from the S3 bucket, filter it for our use case, and then combine it with the boundary data from census.gov in order to draw a map of the current air quality.&lt;/p&gt;

&lt;p&gt;This post also shows how to use the custom containers support in EMR on EKS to build our own container image with the necessary dependencies.&lt;/p&gt;

&lt;h2&gt;
  
  
  Pre-requisites
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;An AWS account with access to Amazon Elastic Container Registry (ECR)&lt;/li&gt;
&lt;li&gt;An EMR on EKS cluster already set up&lt;/li&gt;
&lt;li&gt;Docker&lt;/li&gt;
&lt;li&gt;A container registry to push your image to&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Building the EMR on EKS Container Image
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Download the EMR base image
&lt;/h3&gt;

&lt;p&gt;For this post, we'll be using the &lt;code&gt;us-west-2&lt;/code&gt; region and EMR 6.3.0 release. Each region and release has a different base image URL, and you can find the full list &lt;a href="https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images-tag.html" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws ecr get-login-password &lt;span class="nt"&gt;--region&lt;/span&gt; us-west-2 &lt;span class="se"&gt;\&lt;/span&gt;
    | docker login &lt;span class="nt"&gt;--username&lt;/span&gt; AWS &lt;span class="nt"&gt;--password-stdin&lt;/span&gt; 895885662937.dkr.ecr.us-west-2.amazonaws.com

docker pull 895885662937.dkr.ecr.us-west-2.amazonaws.com/notebook-spark/emr-6.3.0:latest
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Customize the image
&lt;/h3&gt;

&lt;p&gt;EMR on EKS comes with a variety of &lt;a href="https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-install-libraries-and-kernels.html" rel="noopener noreferrer"&gt;default libraries&lt;/a&gt; installed, including plotly and seaborn, but I wanted to try out Bokeh for this illustration as it has a great &lt;a href="https://docs.bokeh.org/en/latest/docs/gallery/texas.html" rel="noopener noreferrer"&gt;choropleth example&lt;/a&gt; and it's a library I've been hearing about occasionally. I was hoping to use Bokeh's &lt;code&gt;sampledata&lt;/code&gt; for US state and county boundaries, but I ended up using &lt;a href="https://geopandas.org/" rel="noopener noreferrer"&gt;GeoPandas&lt;/a&gt; to re-project my map to a conic projection so Michigan wasn't squashed up against Wisconsin. :) GeoPandas makes it easy to read in shapefiles, so I just used the census.gov-provided state and county data.&lt;/p&gt;
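&lt;p&gt;For reference, the re-projection itself is essentially a one-liner once GeoPandas has loaded a shapefile. This is just a hedged sketch - EPSG:5070 is one common Albers projection for the contiguous US, not necessarily the exact projection used later in the job:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import geopandas as gpd

# Load the census.gov county boundaries and re-project from lat/lon to an Albers conic projection
counties = gpd.read_file("https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_county_500k.zip")
counties_albers = counties.to_crs(epsg=5070)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;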

&lt;p&gt;Bokeh also uses Selenium and Chrome for its static image generation, so we go ahead and install Chrome on the container image as well.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight docker"&gt;&lt;code&gt;&lt;span class="k"&gt;FROM&lt;/span&gt;&lt;span class="s"&gt; 895885662937.dkr.ecr.us-west-2.amazonaws.com/notebook-spark/emr-6.3.0:latest&lt;/span&gt;

&lt;span class="k"&gt;USER&lt;/span&gt;&lt;span class="s"&gt; root&lt;/span&gt;

&lt;span class="c"&gt;# Install Chrome&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;curl https://intoli.com/install-google-chrome.sh | bash &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="se"&gt;\
&lt;/span&gt;    &lt;span class="nb"&gt;mv&lt;/span&gt; /usr/bin/google-chrome-stable /usr/bin/chrome

&lt;span class="c"&gt;# We need to upgrade pip in order to install pyproj&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;pip3 &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;--upgrade&lt;/span&gt; pip

&lt;span class="c"&gt;# If you pip install as root, use this&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;pip3 &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="se"&gt;\
&lt;/span&gt;    &lt;span class="nv"&gt;bokeh&lt;/span&gt;&lt;span class="o"&gt;==&lt;/span&gt;2.3.2 &lt;span class="se"&gt;\
&lt;/span&gt;    &lt;span class="nv"&gt;boto3&lt;/span&gt;&lt;span class="o"&gt;==&lt;/span&gt;1.17.93 &lt;span class="se"&gt;\
&lt;/span&gt;    chromedriver-py&lt;span class="o"&gt;==&lt;/span&gt;91.0.4472.19.0 &lt;span class="se"&gt;\
&lt;/span&gt;    &lt;span class="nv"&gt;geopandas&lt;/span&gt;&lt;span class="o"&gt;==&lt;/span&gt;0.9.0 &lt;span class="se"&gt;\
&lt;/span&gt;    &lt;span class="nv"&gt;selenium&lt;/span&gt;&lt;span class="o"&gt;==&lt;/span&gt;3.141.0 &lt;span class="se"&gt;\
&lt;/span&gt;    &lt;span class="nv"&gt;shapely&lt;/span&gt;&lt;span class="o"&gt;==&lt;/span&gt;1.7.1

&lt;span class="k"&gt;RUN &lt;/span&gt;&lt;span class="nb"&gt;ln&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; /usr/local/lib/python3.7/site-packages/chromedriver_py/chromedriver_linux64 /usr/local/bin/chromedriver

&lt;span class="c"&gt;# Install bokeh sample data to /usr/local/share&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;&lt;span class="nb"&gt;mkdir&lt;/span&gt; /root/.bokeh &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="se"&gt;\
&lt;/span&gt;    &lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"sampledata_dir: /usr/local/share/bokeh"&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; /root/.bokeh/config &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="se"&gt;\
&lt;/span&gt;    bokeh sampledata

&lt;span class="c"&gt;# Also install census data into the image :)&lt;/span&gt;
&lt;span class="k"&gt;ADD&lt;/span&gt;&lt;span class="s"&gt; https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_state_500k.zip  /usr/local/share/bokeh/&lt;/span&gt;
&lt;span class="k"&gt;ADD&lt;/span&gt;&lt;span class="s"&gt; https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_county_500k.zip /usr/local/share/bokeh/&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;&lt;span class="nb"&gt;chmod &lt;/span&gt;644 /usr/local/share/bokeh/cb&lt;span class="k"&gt;*&lt;/span&gt;.zip

&lt;span class="c"&gt;# This is a simple test to make sure generating the image works properly&lt;/span&gt;
&lt;span class="k"&gt;COPY&lt;/span&gt;&lt;span class="s"&gt; test /test/&lt;/span&gt;

&lt;span class="k"&gt;USER&lt;/span&gt;&lt;span class="s"&gt; hadoop:hadoop&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Build and push
&lt;/h3&gt;

&lt;p&gt;Great, we've customized our image – now we just need to build and push it to a container registry somewhere! For this post, I chose GitHub, but you can use any container registry like ECR or DockerHub.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;The below commands assume you have a GitHub Personal Access Token that has access to push images in the &lt;code&gt;CR_PAT&lt;/code&gt; environment variable.&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker build &lt;span class="nt"&gt;-t&lt;/span&gt; emr-6.3.0-bokeh:latest &lt;span class="nb"&gt;.&lt;/span&gt;

&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;USERNAME&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;GH_USERNAME
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="nv"&gt;$CR_PAT&lt;/span&gt;| docker login ghcr.io &lt;span class="nt"&gt;-u&lt;/span&gt; &lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;USERNAME&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt; &lt;span class="nt"&gt;--password-stdin&lt;/span&gt;
docker tag emr-6.3.0-bokeh:latest ghcr.io/&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;USERNAME&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;/emr-6.3.0-bokeh:latest
docker push ghcr.io/&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;USERNAME&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;/emr-6.3.0-bokeh:latest
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Great, now your image is ready to go! Let's look at the code we're going to use to generate our air quality map.&lt;/p&gt;

&lt;h2&gt;
  
  
  Code walkthrough
&lt;/h2&gt;

&lt;p&gt;If you already built your image, you can run the code below locally. In order to access S3 data, you'll have to set your &lt;code&gt;AWS_ACCESS_KEY_ID&lt;/code&gt; and &lt;code&gt;AWS_SECRET_ACCESS_KEY&lt;/code&gt; environment variables.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;--rm&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; &lt;span class="nt"&gt;--name&lt;/span&gt; airq-demo &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; AWS_ACCESS_KEY_ID &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; AWS_SECRET_ACCESS_KEY &lt;span class="se"&gt;\&lt;/span&gt;
    emr-6.3.0-bokeh &lt;span class="se"&gt;\&lt;/span&gt;
    pyspark &lt;span class="nt"&gt;--deploy-mode&lt;/span&gt; client &lt;span class="nt"&gt;--master&lt;/span&gt; &lt;span class="s1"&gt;'local[1]'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Reading and filtering OpenAQ Data
&lt;/h3&gt;

&lt;p&gt;The first thing we need to do is read the data for today's date into a Spark dataframe.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;

&lt;span class="n"&gt;date&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;utcnow&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;date&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3://openaq-fetches/realtime-gzipped/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;date&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;show&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+--------------------+---------------+---------+--------------------+-------+--------------------+--------------------+------+---------+-----------------+----------+-----+-----+
|         attribution|averagingPeriod|     city|         coordinates|country|                date|            location|mobile|parameter|       sourceName|sourceType| unit|value|
+--------------------+---------------+---------+--------------------+-------+--------------------+--------------------+------+---------+-----------------+----------+-----+-----+
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-13T22:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 25.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-13T23:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 16.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T00:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 18.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T01:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 23.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T02:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 23.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T03:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 21.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T04:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 20.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T05:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 16.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T06:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 17.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T07:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 18.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T08:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 20.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T09:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 26.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T10:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 29.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T11:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 34.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T12:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 33.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T13:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 40.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T14:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 39.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T15:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 41.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T16:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 50.0|
|[{EPA AirNow DOS,...|   {hours, 1.0}|Abu Dhabi|{24.424399, 54.43...|     AE|{2021-06-14T17:00...|US Diplomatic Pos...| false|     pm25|StateAir_AbuDhabi|government|µg/m³| 56.0|
+--------------------+---------------+---------+--------------------+-------+--------------------+--------------------+------+---------+-----------------+----------+-----+-----+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can quickly see a few things:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Data is provided from all over the globe, but we just want US data&lt;/li&gt;
&lt;li&gt;We have coordinates and country, but that's it for location data&lt;/li&gt;
&lt;li&gt;There are multiple different types of readings&lt;/li&gt;
&lt;li&gt;There are multiple different readings per day per location
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;unit&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;parameter&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;distinct&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;sort&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;parameter&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;show&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+-----+---------+                                                               
| unit|parameter|
+-----+---------+
|µg/m³|       bc|
|µg/m³|       co|
|  ppm|       co|
|  ppm|      no2|
|µg/m³|      no2|
|µg/m³|       o3|
|  ppm|       o3|
|µg/m³|     pm10|
|µg/m³|     pm25|
|  ppm|      so2|
|µg/m³|      so2|
+-----+---------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;So, let's go ahead and filter down to the most recent PM2.5 reading in the United States.&lt;/p&gt;

&lt;p&gt;To do that, we apply a couple of &lt;code&gt;where&lt;/code&gt; filters and then use a window function (&lt;code&gt;last&lt;/code&gt;) to get the last reading.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Filter down to US locations and PM2.5 readings only
&lt;/span&gt;&lt;span class="n"&gt;usdf&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;where&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;country&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;US&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;where&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;parameter&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;pm25&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;coordinates&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;parameter&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;unit&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;location&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Retrieve the most recent pm2.5 reading per county
&lt;/span&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql.window&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Window&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;last&lt;/span&gt;
&lt;span class="n"&gt;windowSpec&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;Window&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;partitionBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;location&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;orderBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;date.utc&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;rangeBetween&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Window&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;unboundedPreceding&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Window&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;unboundedFollowing&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;last_reading_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;usdf&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;last_value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;last&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;over&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowSpec&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;coordinates&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;last_value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;distinct&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;last_reading_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;show&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We also only selected the &lt;code&gt;coordinates&lt;/code&gt; and &lt;code&gt;last_value&lt;/code&gt; columns as these are all we need at this point.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+--------------------+----------+                                               
|         coordinates|last_value|
+--------------------+----------+
|{38.6619, -121.7278}|       2.0|
| {41.9767, -91.6878}|       4.9|
|{39.54092, -119.7...|       8.0|
|{43.629605, -72.3...|       9.0|
|{46.8505, -111.98...|      10.0|
|{39.818715, -75.4...|       8.5|
+--------------------+----------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Mapping coordinates to counties
&lt;/h3&gt;

&lt;p&gt;This was the most "fun" part of this journey. Bokeh provides some sample data and I initially just created a UDF that looked up the first county ID using the Polygon &lt;code&gt;intersects&lt;/code&gt; method. Unfortunately, I then wanted to re-project the map to a conic projection (Albers). Bokeh's geo support isn't very strong, so I ended up using GeoPandas to do the re-projection. That worked well, but the Bokeh county data wasn't in a format I could use with GeoPandas, so I ended up downloading &lt;a href="https://www.census.gov/geographies/mapping-files/time-series/geo/cartographic-boundary.html" rel="noopener noreferrer"&gt;Shapefiles from the Census Bureau&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;So, we've got our &lt;code&gt;last_reading_df&lt;/code&gt; dataframe. Let's map those coordinates to counties. The county data is relatively small (12 MB zipped), so I created a broadcast variable of &lt;code&gt;GEOID&lt;/code&gt; -&amp;gt; &lt;code&gt;Geometry&lt;/code&gt; mappings that can be used in a UDF to figure out if a PM2.5 reading is inside a specific county.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Download the census data and create a broadcast variable
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;geopandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;gpd&lt;/span&gt;

&lt;span class="n"&gt;COUNTY_URL&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_county_500k.zip&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

&lt;span class="n"&gt;countydf&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;gpd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_file&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;COUNTY_URL&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;bc_county&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sc&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;broadcast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;zip&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;countydf&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GEOID&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;countydf&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;geometry&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])))&lt;/span&gt;

&lt;span class="n"&gt;countydf&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;head&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can see we're just mapping the &lt;code&gt;GEOID&lt;/code&gt; column to the &lt;code&gt;geometry&lt;/code&gt; column, which is a polygon object containing the county boundaries.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  STATEFP COUNTYFP  COUNTYNS        AFFGEOID  GEOID       NAME          NAMELSAD STUSPS  STATE_NAME LSAD       ALAND     AWATER                                           geometry
0      21      141  00516917  0500000US21141  21141      Logan      Logan County     KY    Kentucky   06  1430224002   12479211  POLYGON ((-87.06037 36.68085, -87.06002 36.708...
1      36      081  00974139  0500000US36081  36081     Queens     Queens County     NY    New York   06   281594050  188444349  POLYGON ((-73.96262 40.73903, -73.96243 40.739...
2      34      017  00882278  0500000US34017  34017     Hudson     Hudson County     NJ  New Jersey   06   119640822   41836491  MULTIPOLYGON (((-74.04220 40.69997, -74.03900 ...
3      34      019  00882228  0500000US34019  34019  Hunterdon  Hunterdon County     NJ  New Jersey   06  1108086284   24761598  POLYGON ((-75.19511 40.57969, -75.19466 40.581...
4      21      147  00516926  0500000US21147  21147   McCreary   McCreary County     KY    Kentucky   06  1105416696   10730402  POLYGON ((-84.77845 36.60329, -84.73068 36.665...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Create a UDF to find the county a coordinate is in&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This just brute-forces the list of GEOIDs/polygons and returns the first GEOID that intersects. There is likely a more elegant way to do this - one option using a spatial index is sketched after the code below.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;udf&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql.types&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;StringType&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;shapely.geometry&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Point&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;find_first_county_id&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;longitude&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;latitude&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;float&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Point&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;longitude&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;latitude&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;geo&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;bc_county&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;items&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;geo&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;intersects&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;


&lt;span class="n"&gt;find_first_county_id_udf&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;udf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;find_first_county_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringType&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
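
&lt;p&gt;As promised above, here's one more elegant option: use a spatial index so each lookup only tests a handful of candidate counties instead of scanning all of them. This is a hedged sketch using shapely 1.x's &lt;code&gt;STRtree&lt;/code&gt; (the index is built lazily inside the UDF from the broadcast dictionary, since the tree itself doesn't serialize cleanly):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from shapely.strtree import STRtree
from shapely.geometry import Point
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Lazily-built spatial index, cached per worker
_index_cache = {}

def find_first_county_id_indexed(longitude: float, latitude: float):
    if "tree" not in _index_cache:
        _index_cache["tree"] = STRtree(list(bc_county.value.values()))
        # In shapely 1.x, STRtree.query returns geometry objects, so map them back to GEOIDs by id()
        _index_cache["geoid_by_geom"] = {id(g): geoid for geoid, g in bc_county.value.items()}
    p = Point(longitude, latitude)
    for geom in _index_cache["tree"].query(p):
        if geom.intersects(p):
            return _index_cache["geoid_by_geom"][id(geom)]
    return None

find_first_county_id_indexed_udf = udf(find_first_county_id_indexed, StringType())
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;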



&lt;ul&gt;
&lt;li&gt;Now we apply the UDF to our &lt;code&gt;last_reading_df&lt;/code&gt; dataframe
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Find the county that this reading is from
&lt;/span&gt;&lt;span class="n"&gt;mapped_county_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;last_reading_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GEOID&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nf"&gt;find_first_county_id_udf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;last_reading_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;coordinates&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;longitude&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;last_reading_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;coordinates&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;latitude&lt;/span&gt;
    &lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GEOID&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;last_value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;And then finally we calculate the average PM2.5 value per county
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Calculate the average reading per county
&lt;/span&gt;&lt;span class="n"&gt;pm_avg_by_county&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;mapped_county_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GEOID&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;agg&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;last_value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;avg&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;withColumnRenamed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;avg(last_value)&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;avg_value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;pm_avg_by_county&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;show&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+-----+------------------+                                                      
|GEOID|         avg_value|
+-----+------------------+
|31157|              16.0|
|49053|               3.0|
|26153|               6.9|
|36029|               1.1|
|42101|             10.66|
+-----+------------------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Cool! So now we have a &lt;code&gt;GEOID&lt;/code&gt; we can use in our GeoPandas dataframe and an average value of the most recent PM2.5 reading for that county. &lt;/p&gt;

&lt;h3&gt;
  
  
  Generating our Air Quality map
&lt;/h3&gt;

&lt;p&gt;Now that we've got an average PM2.5 value per county, we need to join this with our map data and generate an image!&lt;/p&gt;

&lt;p&gt;The first step is reading in US State and County shapefiles. We fetched these from census.gov while building the image and they're stored in &lt;code&gt;/usr/local/share/bokeh&lt;/code&gt;. We also exclude any state not in the continental US.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;geopandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;gpd&lt;/span&gt;

&lt;span class="n"&gt;STATE_FILE&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;file:///usr/local/share/bokeh/cb_2020_us_state_500k.zip&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;COUNTY_FILE&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;file:///usr/local/share/bokeh/cb_2020_us_county_500k.zip&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;EXCLUDED_STATES&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;AK&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;HI&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;PR&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GU&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;VI&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;MP&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;AS&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

&lt;span class="n"&gt;county_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;gpd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_file&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;COUNTY_FILE&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;STUSPS not in &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;EXCLUDED_STATES&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;state_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;gpd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_file&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;STATE_FILE&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;STUSPS not in &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;EXCLUDED_STATES&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now we just do a simple &lt;code&gt;merge&lt;/code&gt; on the GeoPandas dataframe, convert our maps to the Albers projection, and serialize them as GeoJSON strings.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Merge in our air quality data
&lt;/span&gt;&lt;span class="n"&gt;county_aqi_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;county_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;merge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;pm_avg_by_county&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;toPandas&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;on&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GEOID&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Convert to a "proper" Albers projection :)
&lt;/span&gt;&lt;span class="n"&gt;state_json&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;state_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_crs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ESRI:102003&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;to_json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;county_json&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;county_aqi_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_crs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ESRI:102003&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;to_json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now comes the fun part! Our data is all prepped, we've averaged the most recent data by county, and built GeoJSON for everything we need. Let's map it!&lt;/p&gt;

&lt;p&gt;I won't go into the details of every line, but we'll make use of Bokeh's awesome &lt;code&gt;GeoJSONDataSource&lt;/code&gt; functionality, add a &lt;code&gt;LinearColorMapper&lt;/code&gt; that automatically shades the counties for us by the &lt;code&gt;avg_value&lt;/code&gt; column using the &lt;code&gt;Reds9&lt;/code&gt; palette, and add a &lt;code&gt;ColorBar&lt;/code&gt; on the right-hand side.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bokeh.models&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ColorBar&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;GeoJSONDataSource&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;LinearColorMapper&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bokeh.palettes&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Reds9&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;palette&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bokeh.plotting&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;figure&lt;/span&gt;

&lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;figure&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;title&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;US Air Quality Data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;plot_width&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;plot_height&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;700&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;toolbar_location&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;x_axis_location&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;y_axis_location&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;tooltips&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;County&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;@NAME&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Air Quality Index&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;@avg_value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;grid&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;grid_line_color&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

&lt;span class="c1"&gt;# This just adds our state lines
&lt;/span&gt;&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;patches&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;xs&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ys&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;fill_alpha&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;line_color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;black&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;line_width&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;GeoJSONDataSource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;geojson&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;state_json&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Add our county data and shade them based on "avg_value"
&lt;/span&gt;&lt;span class="n"&gt;color_mapper&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LinearColorMapper&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;palette&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;tuple&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;reversed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;palette&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;
&lt;span class="n"&gt;color_column&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;avg_value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;patches&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;xs&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ys&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;fill_alpha&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.7&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;fill_color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;field&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;color_column&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;transform&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;color_mapper&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;line_color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;black&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;line_width&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;GeoJSONDataSource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;geojson&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;county_json&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Now add a color bar legend on the right-hand side
&lt;/span&gt;&lt;span class="n"&gt;color_bar&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ColorBar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;color_mapper&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;color_mapper&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;label_standoff&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;12&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;width&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_layout&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;color_bar&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;right&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, let's go ahead and export the PNG!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bokeh.io&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;export_png&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;bokeh.io.webdriver&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_chromium_webdriver&lt;/span&gt;

&lt;span class="n"&gt;driver&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_chromium_webdriver&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;--no-sandbox&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;span class="nf"&gt;export_png&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;filename&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;map.png&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;webdriver&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;driver&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, if you're running on a Mac, you can just copy the generated map to your local system and open it up!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;airq-demo:/home/hadoop/map.png &lt;span class="nb"&gt;.&lt;/span&gt;
open map.png
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhk7q3vpf99k5w4su3myo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhk7q3vpf99k5w4su3myo.png" alt="Air Quality map" width="800" height="509"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Running on EMR on EKS
&lt;/h3&gt;

&lt;p&gt;I've bundled this all up into a pyspark script in my &lt;code&gt;demo-code&lt;/code&gt; repo. &lt;/p&gt;

&lt;p&gt;This demo assumes you already have an EMR on EKS virtual cluster up and running, you've built the image in the first part and pushed it to a container registry, and the IAM Role you use to run the job has access to both read and write to an S3 bucket.&lt;/p&gt;
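
&lt;p&gt;If that execution role doesn't have S3 access yet, the sketch below shows one way to attach a minimal read/write policy. The role name and &lt;code&gt;&amp;lt;BUCKET&amp;gt;&lt;/code&gt; are placeholders - substitute your own values.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Sketch: grant the job execution role read/write access to your bucket
# (role name and &amp;lt;BUCKET&amp;gt; are placeholders)
aws iam put-role-policy \
    --role-name emr_eks_default_role \
    --policy-name airq-demo-s3-access \
    --policy-document '{
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": ["arn:aws:s3:::&amp;lt;BUCKET&amp;gt;", "arn:aws:s3:::&amp;lt;BUCKET&amp;gt;/*"]
        }]
    }'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;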

&lt;p&gt;First, download the &lt;code&gt;generate_aqi_map.py&lt;/code&gt; code from the GitHub repo.&lt;/p&gt;
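
&lt;p&gt;For example, assuming the repo lives at &lt;code&gt;github.com/dacort/demo-code&lt;/code&gt; (the exact path to the script inside it may differ), you can clone it and locate the file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Clone the repo and locate the script (repo URL and path are assumptions)
git clone https://github.com/dacort/demo-code.git
cd demo-code
find . -name generate_aqi_map.py
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;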

&lt;p&gt;Then, upload that script to an S3 bucket you have access to.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws s3 &lt;span class="nb"&gt;cp &lt;/span&gt;generate_aqi_map.py s3://&amp;lt;BUCKET&amp;gt;/code/
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, just run your job! The pyspark script takes a few parameters:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;&amp;lt;S3_BUCKET&amp;gt;&lt;/code&gt; - The S3 bucket you want to upload the generated image to&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;&amp;lt;PREFIX&amp;gt;&lt;/code&gt; - The prefix in the bucket where you want the image located&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;--date 2021-01-01&lt;/code&gt; (optional) - The specific date you want to generate data for

&lt;ul&gt;
&lt;li&gt;Defaults to UTC today
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;S3_BUCKET&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&amp;lt;BUCKET_NAME&amp;gt;
&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;EMR_EKS_CLUSTER_ID&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;abcdefghijklmno1234567890
&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;EMR_EKS_EXECUTION_ARN&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;arn:aws:iam::123456789012:role/emr_eks_default_role
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Replace ghcr.io/OWNER/emr-6.3.0-bokeh:latest below with your image URL&lt;/span&gt;
aws emr-containers start-job-run &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--virtual-cluster-id&lt;/span&gt; &lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;EMR_EKS_CLUSTER_ID&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--name&lt;/span&gt; openaq-conus &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--execution-role-arn&lt;/span&gt; &lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;EMR_EKS_EXECUTION_ARN&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--release-label&lt;/span&gt; emr-6.3.0-latest &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--job-driver&lt;/span&gt; &lt;span class="s1"&gt;'{
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://'&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;S3_BUCKET&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s1"&gt;'/code/generate_aqi_map.py",
            "entryPointArguments": ["'&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;S3_BUCKET&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s1"&gt;'", "output/airq/"],
            "sparkSubmitParameters": "--conf spark.kubernetes.container.image=ghcr.io/OWNER/emr-6.3.0-bokeh:latest"
        }
    }'&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--configuration-overrides&lt;/span&gt; &lt;span class="s1"&gt;'{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": { "logUri": "s3://'&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;S3_BUCKET&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s1"&gt;'/logs/" }
        }
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"0000000abcdefg12345"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"openaq-conus"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"arn"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"arn:aws:emr-containers:us-east-2:123456789012:/virtualclusters/abcdefghijklmno1234567890/jobruns/0000000abcdefg12345"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"virtualClusterId"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"abcdefghijklmno1234567890"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
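
&lt;p&gt;The run above generates the map for today (UTC). To render a specific day instead, you could pass the optional &lt;code&gt;--date&lt;/code&gt; flag through &lt;code&gt;entryPointArguments&lt;/code&gt; - a sketch of just the modified &lt;code&gt;--job-driver&lt;/code&gt; argument, with the rest of the command unchanged:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Same start-job-run command as above, with "--date" appended to the arguments
    --job-driver '{
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://'${S3_BUCKET}'/code/generate_aqi_map.py",
            "entryPointArguments": ["'${S3_BUCKET}'", "output/airq/", "--date", "2021-01-01"],
            "sparkSubmitParameters": "--conf spark.kubernetes.container.image=ghcr.io/OWNER/emr-6.3.0-bokeh:latest"
        }
    }' \
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;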



&lt;p&gt;While the job is running, you can fetch the status of the job using the &lt;code&gt;emr-containers describe-job-run&lt;/code&gt; command.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws emr-containers describe-job-run &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--virtual-cluster-id&lt;/span&gt; &lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;EMR_EKS_CLUSTER_ID&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--id&lt;/span&gt; 0000000abcdefg12345
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
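
&lt;p&gt;If you only care about the state itself, you can filter the response with a JMESPath &lt;code&gt;--query&lt;/code&gt; - it should print something like &lt;code&gt;SUBMITTED&lt;/code&gt;, &lt;code&gt;RUNNING&lt;/code&gt;, or &lt;code&gt;COMPLETED&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Print only the current job state
aws emr-containers describe-job-run \
    --virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
    --id 0000000abcdefg12345 \
    --query 'jobRun.state' --output text
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;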



&lt;p&gt;Once the job is in the &lt;code&gt;COMPLETED&lt;/code&gt; state, you should be able to copy the resulting image from your S3 bucket!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws s3 &lt;span class="nb"&gt;cp &lt;/span&gt;s3://&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;S3_BUCKET&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;/output/airq/2021-06-24-latest.png &lt;span class="nb"&gt;.&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And if you open that file, you'll get the most recent PM2.5 readings!&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhk7q3vpf99k5w4su3myo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhk7q3vpf99k5w4su3myo.png" alt="Air Quality map" width="800" height="509"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Wrapup
&lt;/h2&gt;

&lt;p&gt;Be sure to check out the &lt;a href="https://aws.amazon.com/blogs/aws/customize-and-package-dependencies-with-your-apache-spark-applications-on-amazon-emr-on-amazon-eks/" rel="noopener noreferrer"&gt;launch post&lt;/a&gt; for more details, the documentation for &lt;a href="https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images.html" rel="noopener noreferrer"&gt;customizing Docker images for EMR on EKS&lt;/a&gt;, and my &lt;a href="https://youtu.be/0x4DRKmNPfQ" rel="noopener noreferrer"&gt;demo video&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>aws</category>
      <category>kubernetes</category>
      <category>spark</category>
      <category>emr</category>
    </item>
  </channel>
</rss>
