<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Anand</title>
    <description>The latest articles on DEV Community by Anand (@anandp86).</description>
    <link>https://dev.to/anandp86</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F127289%2Fd4411b77-a219-4a63-bd86-af4ce1ee1591.JPG</url>
      <title>DEV Community: Anand</title>
      <link>https://dev.to/anandp86</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/anandp86"/>
    <language>en</language>
    <item>
      <title>Connect to AWS Aurora PostgreSQL/Amazon Redshift Database from AWS Lambda</title>
      <dc:creator>Anand</dc:creator>
      <pubDate>Fri, 05 Mar 2021 20:58:45 +0000</pubDate>
      <link>https://dev.to/anandp86/connect-to-aws-aurora-postgresql-amazon-redshift-database-from-aws-lambda-1kne</link>
      <guid>https://dev.to/anandp86/connect-to-aws-aurora-postgresql-amazon-redshift-database-from-aws-lambda-1kne</guid>
      <description>&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Vo4hWGPe--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/rv23ciie5kl5awezfw7e.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Vo4hWGPe--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/rv23ciie5kl5awezfw7e.jpg" alt="Alt Text" width="880" height="880"&gt;&lt;/a&gt;&lt;br&gt;
Image by &lt;a href="https://pixabay.com/users/peggy_marco-1553824/?utm_source=link-attribution&amp;amp;utm_medium=referral&amp;amp;utm_campaign=image&amp;amp;utm_content=1019769"&gt;Peggy und Marco Lachmann-Anke&lt;/a&gt; from &lt;a href="https://pixabay.com/?utm_source=link-attribution&amp;amp;utm_medium=referral&amp;amp;utm_campaign=image&amp;amp;utm_content=1019769"&gt;Pixabay&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this blog post I will discuss the following scenarios for connecting to databases from an AWS Lambda function:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Connecting to Amazon Aurora PostgreSQL database &lt;strong&gt;&lt;em&gt;in private subnet&lt;/em&gt;&lt;/strong&gt; with public accessibility set to No &lt;strong&gt;&lt;em&gt;in same AWS account&lt;/em&gt;&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;Connecting to &lt;strong&gt;&lt;em&gt;cross account&lt;/em&gt;&lt;/strong&gt; Amazon Redshift database &lt;strong&gt;&lt;em&gt;in public subnet&lt;/em&gt;&lt;/strong&gt; with public accessibility set to Yes.&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
&lt;span&gt;Connect to Amazon Aurora PostgreSQL&lt;/span&gt; &lt;span&gt;database in Private subnet with Public accessibility set to No in the same AWS account&lt;/span&gt;
&lt;/h4&gt;

&lt;p&gt;In this setup, the Amazon Aurora PostgreSQL database is running in a private subnet with public accessibility set to No. The connectivity and security details are as follows:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2021/02/rds_db_security.png"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Gcyhh97L--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2021/02/rds_db_security.png%3Fw%3D1024" alt="" width="880" height="403"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To connect to the Aurora PostgreSQL database in a private subnet you need to configure the Lambda function in a Virtual Private Cloud (VPC). Let's go ahead and create the Lambda function - &lt;/p&gt;

&lt;p&gt;&lt;em&gt;AWS Service &amp;gt; Lambda &amp;gt; Functions &amp;gt; Create Function &amp;gt; Author from scratch&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Under Basic Information &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Enter the function name&lt;/li&gt;
&lt;li&gt;Choose your preferred language. Here I selected Python 3.8.&lt;/li&gt;
&lt;li&gt;In the Permissions section you can keep the default - Create a new role with basic Lambda permissions&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Under Advanced settings&lt;/p&gt;

&lt;ul&gt;&lt;li&gt;Enter the details in Network&lt;ul&gt;
&lt;li&gt;Choose the VPC name in which the Aurora PostgreSQL database is running&lt;/li&gt;
&lt;li&gt;Select at least 2 private subnets. To access private Amazon VPC resources, such as a DB instance, you need to associate your Lambda function with one or more private subnets. If you select public subnets instead, the Lambda function will time out, because Lambda network interfaces cannot have public IP addresses.&lt;/li&gt;
&lt;li&gt;Choose the default security group&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;&lt;/ul&gt;

&lt;p&gt;Once you have completed the details, click on Create function. The function creation can take some time, as Lambda creates an ENI (elastic network interface) in each subnet of the VPC configuration. An ENI represents a virtual network card; you can read more &lt;a href="https://docs.aws.amazon.com/vpc/latest/userguide/VPC_ElasticNetworkInterfaces.html" rel="noreferrer noopener"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;In this case the Lambda function is launched with an execution role (IAM role) having 2 managed policies attached by default:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;AWSLambdaBasicExecutionRole&lt;/li&gt;
&lt;li&gt;AWSLambdaVPCAccessExecutionRole&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In case you choose to use an existing role while creating the Lambda function, make sure to attach the &lt;em&gt;AWSLambdaVPCAccessExecutionRole&lt;/em&gt; policy. This managed policy grants the following permissions, which Lambda uses to create and manage network interfaces:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;ec2:CreateNetworkInterface&lt;/li&gt;
&lt;li&gt;ec2:DescribeNetworkInterfaces&lt;/li&gt;
&lt;li&gt;ec2:DeleteNetworkInterface&lt;/li&gt;
&lt;/ul&gt;
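&lt;p&gt;For reference, the ENI-related permissions above can be expressed as a policy document. Below is a minimal sketch built in Python - an illustration of the shape of the statement, not the exact AWS-managed policy document:&lt;/p&gt;

```python
import json

# Minimal sketch of the ENI-management permissions carried by
# AWSLambdaVPCAccessExecutionRole. This is an illustration only,
# not the exact AWS-managed policy document.
eni_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DeleteNetworkInterface",
            ],
            "Resource": "*",
        }
    ],
}

print(json.dumps(eni_policy, indent=2))
```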

&lt;p&gt;Below is the VPC configuration for my Lambda function, with 2 private subnets attached:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2021/01/screen-shot-2021-01-31-at-4.39.48-pm.png"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--jzDp6Jj6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2021/01/screen-shot-2021-01-31-at-4.39.48-pm.png%3Fw%3D1024" alt="" width="880" height="461"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After creating the Lambda function, I used the code below to connect to and execute SQL against the Amazon Aurora PostgreSQL database. The code installs the PostgreSQL interface library &lt;a href="https://pypi.org/project/pg8000/" rel="noreferrer noopener"&gt;pg8000&lt;/a&gt; at runtime to interact with the database. The handler creates a connection to the PostgreSQL database, executes a SELECT statement to fetch the current timestamp into the results variable, and returns the results as a string.&lt;/p&gt;

&lt;pre class="wp-block-syntaxhighlighter-code"&gt;import sys
import boto3
import logging
import urllib.parse
from pip._internal import main

# install pg8000
main(['install', '-I', '-q', 'pg8000', '--target', '/tmp/', '--no-cache-dir', '--disable-pip-version-check'])
sys.path.insert(0,'/tmp/')
    
import pg8000

def lambda_handler(event, context):
    
    sql = """SELECT current_timestamp"""
    
    conn = pg8000.connect(
        database='demodb',
        user='admin',
        password='xxxxxxx',
        host='cluster-demodb.cluster-cijke9kklkrh.us-east-1.rds.amazonaws.com',
        port=8192,
        ssl_context=True
        )
        
    dbcur = conn.cursor()
    dbcur.execute(sql)
    results = dbcur.fetchall()
    dbcur.close()
    
    return str(results)&lt;/pre&gt;
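&lt;p&gt;One note on the snippet above: the credentials are hardcoded only for brevity. A safer pattern is to read them from Lambda environment variables (or AWS Secrets Manager). A minimal sketch, assuming illustrative variable names such as DB_NAME and DB_HOST:&lt;/p&gt;

```python
import os

# Build the connection settings from environment variables with demo defaults.
# The variable names are illustrative assumptions, not a fixed contract.
def db_config_from_env(environ=os.environ):
    return {
        "database": environ.get("DB_NAME", "demodb"),
        "user": environ.get("DB_USER", "admin"),
        "password": environ.get("DB_PASSWORD", ""),
        "host": environ.get("DB_HOST", "localhost"),
        "port": int(environ.get("DB_PORT", "5432")),
    }

cfg = db_config_from_env({"DB_NAME": "demodb", "DB_HOST": "example-host", "DB_PORT": "8192"})
print(cfg)
```

&lt;p&gt;The resulting dict can then be passed straight to the connect call, e.g. pg8000.connect(**cfg).&lt;/p&gt;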

&lt;p&gt;  Output: &lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2021/01/screen-shot-2021-01-31-at-4.54.42-pm.png"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--bzC2tFj_--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2021/01/screen-shot-2021-01-31-at-4.54.42-pm.png%3Fw%3D1024" alt="" width="880" height="152"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;&lt;span&gt;Connect to cross account Amazon Redshift database in Public subnet with Publicly accessible set to Yes &lt;/span&gt;&lt;/h4&gt;

&lt;p&gt;Below are the Amazon Redshift connection details for the cluster running in &lt;strong&gt;&lt;em&gt;Account A&lt;/em&gt;&lt;/strong&gt;. The database is running in a public subnet and is publicly accessible. The security group acts as a virtual firewall for the cluster to control inbound and outbound traffic.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2021/02/screen-shot-2021-01-31-at-9.31.30-pm.png"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--uwc2HG_5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2021/02/screen-shot-2021-01-31-at-9.31.30-pm.png%3Fw%3D1024" alt="" width="880" height="427"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In &lt;strong&gt;&lt;em&gt;Account B&lt;/em&gt;&lt;/strong&gt; I created a new VPC, "Cross-Account-Lambda-VPC", to test this use-case. To create the Lambda function I followed the same steps as in the previous section, except that the VPC selected in this case was "Cross-Account-Lambda-VPC" (the VPC in Account B). The screenshot below is of the Lambda function in Account B, which has 2 private subnets added.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2021/01/screen-shot-2021-01-31-at-9.40.14-pm-1.png"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--MpPOXyt1--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2021/01/screen-shot-2021-01-31-at-9.40.14-pm-1.png%3Fw%3D1024" alt="" width="880" height="498"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The private subnets are attached to a route table. The route table needs a route that targets a NAT gateway. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2021/02/screen-shot-2021-02-28-at-6.33.16-pm-1.png"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Ebc87P_e--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2021/02/screen-shot-2021-02-28-at-6.33.16-pm-1.png%3Fw%3D964" alt="" width="880" height="419"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The NAT gateway resides in a public subnet and has an Elastic IP (EIP) associated with it, which acts as its public IP address and lets it connect to the internet through the VPC's internet gateway. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2021/02/screen-shot-2021-02-28-at-6.44.39-pm.png"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--_eBj6Zxh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2021/02/screen-shot-2021-02-28-at-6.44.39-pm.png%3Fw%3D1024" alt="" width="880" height="364"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This EIP address needs to be added to the inbound rules of the security group attached to the Amazon Redshift database in Account A, as shown below. With this configuration the Lambda function is able to connect to the database in the other account.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2021/02/screen-shot-2021-02-28-at-7.19.20-pm.png"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s---mqr8kO4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2021/02/screen-shot-2021-02-28-at-7.19.20-pm.png%3Fw%3D1024" alt="" width="880" height="440"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To conclude, in this blog post we reviewed step by step how to set up a Lambda function to connect to a database running in a private subnet in the same AWS account, and to connect to a cross-account database running in a public subnet. In the next blog post I will show a few libraries I experimented with to connect to and query the cross-account Redshift database from a Lambda function.&lt;/p&gt;

</description>
      <category>aws</category>
      <category>database</category>
      <category>datascience</category>
    </item>
    <item>
      <title>New Features in Amazon DynamoDB - PartiQL, Export to S3, Integration with Kinesis Data Streams</title>
      <dc:creator>Anand</dc:creator>
      <pubDate>Tue, 15 Dec 2020 06:45:34 +0000</pubDate>
      <link>https://dev.to/anandp86/new-features-in-amazon-dynamodb-partiql-export-to-s3-integration-with-kinesis-data-streams-54eg</link>
      <guid>https://dev.to/anandp86/new-features-in-amazon-dynamodb-partiql-export-to-s3-integration-with-kinesis-data-streams-54eg</guid>
      <description>&lt;p&gt;Around AWS re:Invent each year, AWS releases many new features over the span of a month. In this blog post I will touch on 3 new features introduced for Amazon DynamoDB. DynamoDB is a managed non-relational database with single-digit millisecond performance at any scale.&lt;/p&gt;

&lt;p&gt;New Features in Amazon DynamoDB - &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;PartiQL - SQL-compatible query language for Amazon DynamoDB.&lt;/li&gt;
&lt;li&gt;Export to S3 - Export Amazon DynamoDB table to S3. In this blog I have added a use-case of deserializing the DynamoDB items, writing it to S3 and query using Athena.&lt;/li&gt;
&lt;li&gt;Direct integration of DynamoDB with Kinesis Streams - Stream item-level images of Amazon DynamoDB as a Kinesis Data Stream.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;To start with, let's look at the new Amazon DynamoDB console. I have 2 DynamoDB tables to play around with for this blog. The Books table is partitioned by Author and sorted by Title. The Movies table is partitioned by year and sorted by title.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2020/11/screen-shot-2020-11-23-at-9.18.58-pm.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F11%2Fscreen-shot-2020-11-23-at-9.18.58-pm.png%3Fw%3D1024" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Let's jump on to the features:&lt;/p&gt;

&lt;ol&gt;&lt;li&gt;
&lt;strong&gt;&lt;span&gt;PartiQL&lt;/span&gt;&lt;/strong&gt; - You can use SQL to select, insert, update, and delete items in Amazon DynamoDB. Currently you can use PartiQL for DynamoDB from the Amazon DynamoDB console, the AWS Command Line Interface (AWS CLI), and the DynamoDB APIs. For this blog, I am using the AWS console. &lt;/li&gt;&lt;/ol&gt;

&lt;p&gt;DynamoDB &amp;gt; PartiQL editor&lt;/p&gt;

&lt;p&gt;&lt;span&gt;SELECT SQLs -&lt;/span&gt; &lt;/p&gt;

&lt;p&gt;Simple select SQL&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;SELECT * FROM Books where Author='William Shakespeare'&lt;/code&gt;&lt;/pre&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Title&lt;/td&gt;
&lt;td&gt;Formats&lt;/td&gt;
&lt;td&gt;Author&lt;/td&gt;
&lt;td&gt;Category&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Hamlet&lt;/td&gt;
&lt;td&gt;{ "Hardcover" : { "S" : "GVJZQ7JK" }, "Paperback" : { "S" : "A4TFUR98" }, "Audiobook" : { "S" : "XWMGHW96" } }&lt;/td&gt;
&lt;td&gt;William Shakespeare&lt;/td&gt;
&lt;td&gt;Drama&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;&lt;/table&gt;&lt;/div&gt;





&lt;p&gt;The following SQL returns the title, hardcover format, and category using a key path -&lt;/p&gt;





&lt;pre&gt;&lt;code&gt;SELECT Title, Formats['Hardcover'], category FROM Books where Author='John Grisham'&lt;/code&gt;&lt;/pre&gt;





&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Category&lt;/td&gt;
&lt;td&gt;Title&lt;/td&gt;
&lt;td&gt;Hardcover&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Suspense&lt;/td&gt;
&lt;td&gt;The Firm&lt;/td&gt;
&lt;td&gt;Q7QWE3U2&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Suspense&lt;/td&gt;
&lt;td&gt;The Rainmaker&lt;/td&gt;
&lt;td&gt;J4SUKVGU&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Thriller&lt;/td&gt;
&lt;td&gt;The Reckoning&lt;/td&gt;
&lt;td&gt;null&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;&lt;/table&gt;&lt;/div&gt;





&lt;p&gt;The following SQL uses the "contains" function, which returns TRUE if the Category attribute contains the string 'Suspense' - &lt;/p&gt;





&lt;pre&gt;&lt;code&gt;SELECT Title, Formats['Audiobook'], Category FROM Books where Author='John Grisham' and contains(Category, 'Suspense')&lt;/code&gt;&lt;/pre&gt;





&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;&lt;tr&gt;
&lt;th&gt;&lt;strong&gt;year&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;title&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;release_date&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;rank&lt;/strong&gt;&lt;/th&gt;
&lt;/tr&gt;&lt;/thead&gt;
&lt;tbody&gt;&lt;tr&gt;
&lt;td&gt;2011&lt;/td&gt;
&lt;td&gt;Sherlock Holmes: A Game of Shadows&lt;/td&gt;
&lt;td&gt;2011-12-10T00:00:00Z&lt;/td&gt;
&lt;td&gt;570&lt;/td&gt;
&lt;/tr&gt;&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;&lt;span&gt;INSERT SQL - &lt;/span&gt;&lt;/p&gt;

&lt;p&gt;Insert a single item -&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;&lt;strong&gt;INSERT&lt;/strong&gt; INTO Books value {'Title' : 'A time to kill', 'Author' : 'John Grisham', 'Category' : 'Suspense' }

SELECT * FROM Books WHERE Title='A time to kill'&lt;/code&gt;&lt;/pre&gt;



&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;&lt;tr&gt;
&lt;th&gt;&lt;strong&gt;Author&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;Title&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;Category&lt;/strong&gt;&lt;/th&gt;
&lt;/tr&gt;&lt;/thead&gt;
&lt;tbody&gt;&lt;tr&gt;
&lt;td&gt;John Grisham&lt;/td&gt;
&lt;td&gt;A time to kill&lt;/td&gt;
&lt;td&gt;Suspense&lt;/td&gt;
&lt;/tr&gt;&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;



&lt;p id="block-fd0bc930-3997-46f2-a705-2af32d6f4501"&gt;"INSERT INTO SELECT" SQL fails with ValidationException: Unsupported operation: Inserting  multiple items in a single statement is not supported, use "INSERT INTO  tableName VALUE item" instead&lt;/p&gt;



&lt;p&gt;&lt;span&gt;UPDATE SQL -&lt;/span&gt; &lt;/p&gt;



&lt;p id="block-7de45d7f-d34a-4bc7-ae12-5051e2a7b9d5"&gt;In the previous insert sql, Formats column was null. So lets update the Formats column for the book.&lt;/p&gt;



&lt;pre&gt;&lt;code&gt;&lt;strong&gt;UPDATE&lt;/strong&gt; Books &lt;strong&gt;SET&lt;/strong&gt; Formats={'Hardcover':'J4SUKVGU' ,'Paperback': 'D7YF4FCX'} WHERE Author='John Grisham' and Title='A time to kill'&lt;/code&gt;&lt;/pre&gt;



&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;&lt;tr&gt;
&lt;th&gt;&lt;strong&gt;Title&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;Formats&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;Author&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;Category&lt;/strong&gt;&lt;/th&gt;
&lt;/tr&gt;&lt;/thead&gt;
&lt;tbody&gt;&lt;tr&gt;
&lt;td&gt;A time to kill&lt;/td&gt;
&lt;td&gt;{"Hardcover":{"S":"J4SUKVGU"},"Paperback":{"S":"D7YF4FCX"}}&lt;/td&gt;
&lt;td&gt;John Grisham&lt;/td&gt;
&lt;td&gt;Suspense&lt;/td&gt;
&lt;/tr&gt;&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;



&lt;p&gt;You can also use an UPDATE SQL to remove a key from a map - &lt;/p&gt;



&lt;pre&gt;&lt;code&gt;UPDATE Books REMOVE Formats.Paperback WHERE Author='John Grisham' and Title='A time to kill'&lt;/code&gt;&lt;/pre&gt;



&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;&lt;tr&gt;
&lt;th&gt;&lt;strong&gt;Title&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;Formats&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;Author&lt;/strong&gt;&lt;/th&gt;
&lt;th&gt;&lt;strong&gt;Category&lt;/strong&gt;&lt;/th&gt;
&lt;/tr&gt;&lt;/thead&gt;
&lt;tbody&gt;&lt;tr&gt;
&lt;td&gt;A time to kill&lt;/td&gt;
&lt;td&gt;{"Hardcover":{"S":"J4SUKVGU"}}&lt;/td&gt;
&lt;td&gt;John Grisham&lt;/td&gt;
&lt;td&gt;Suspense&lt;/td&gt;
&lt;/tr&gt;&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;



&lt;p&gt;&lt;span&gt;DELETE SQL - &lt;/span&gt;&lt;/p&gt;



&lt;pre&gt;&lt;code&gt;DELETE FROM Books WHERE Author='John Grisham' and Title='A time to kill'&lt;/code&gt;&lt;/pre&gt;



&lt;p&gt;For more references - &lt;a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.html" rel="noreferrer noopener"&gt;https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.html&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;2. &lt;strong&gt;&lt;span&gt;EXPORT TO S3&lt;/span&gt;&lt;/strong&gt; - Export a full Amazon DynamoDB table to an Amazon S3 bucket. Using DynamoDB table export, you can export data from an Amazon DynamoDB table as of any point in time within your point-in-time recovery window. To do so the table must have point-in-time recovery (PITR) enabled. If PITR is not enabled for the table, Export to S3 will report an error asking for it to be enabled.&lt;/p&gt;



&lt;p&gt;DynamoDB &amp;gt; Exports to S3&lt;/p&gt;



&lt;a href="https://aprakash.files.wordpress.com/2020/11/screen-shot-2020-11-23-at-11.03.16-pm.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F11%2Fscreen-shot-2020-11-23-at-11.03.16-pm.png%3Fw%3D792" alt=""&gt;&lt;/a&gt;



&lt;a href="https://aprakash.files.wordpress.com/2020/12/screen-shot-2020-12-02-at-11.46.36-am-1.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F12%2Fscreen-shot-2020-12-02-at-11.46.36-am-1.png%3Fw%3D819" alt=""&gt;&lt;/a&gt;



&lt;p&gt;After the export is complete it generates a manifest-summary.json file summarizing the export details and a manifest-files.json file listing the S3 locations of the exported data files.&lt;/p&gt;
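&lt;p&gt;As an illustration, the per-file manifest is newline-delimited JSON, one object per exported data file. The sketch below pulls out the S3 keys; the field name dataFileS3Key matches what I saw in my exports, but treat it as an assumption and check your own manifest before relying on it:&lt;/p&gt;

```python
import json

# Parse the newline-delimited per-file manifest and collect the S3 keys of
# the exported data files. "dataFileS3Key" is assumed from observed exports.
def list_export_keys(manifest_text):
    keys = []
    for line in manifest_text.splitlines():
        if line.strip():
            keys.append(json.loads(line)["dataFileS3Key"])
    return keys

sample = (
    '{"itemCount": 4, "dataFileS3Key": "AWSDynamoDB/abc/data/file1.json.gz"}\n'
    '{"itemCount": 2, "dataFileS3Key": "AWSDynamoDB/abc/data/file2.json.gz"}\n'
)
print(list_export_keys(sample))
```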



&lt;p&gt;To extend the Export to S3 feature further, you can create a data pipeline that deserializes the DynamoDB JSON format so that you can perform analytics and complex queries on your data from AWS Athena.&lt;/p&gt;



&lt;a href="https://aprakash.files.wordpress.com/2020/12/ddb_pipeline_1-6.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F12%2Fddb_pipeline_1-6.png%3Fw%3D1024" alt=""&gt;&lt;/a&gt;



&lt;p&gt;The workflow contains the following steps:&lt;/p&gt;



&lt;ul&gt;
&lt;li&gt;A time-based CloudWatch event is triggered.&lt;/li&gt;
&lt;li&gt;The event triggers an AWS Lambda function.&lt;/li&gt;
&lt;li&gt;A DynamoDB Export to S3 is initiated.&lt;/li&gt;
&lt;li&gt;The export writes the data in DynamoDB JSON format to the S3 RAW bucket. The S3 objects are gzipped JSON files.&lt;/li&gt;
&lt;li&gt;For each .json.gz file in the S3 RAW bucket an event notification is set to trigger AWS Lambda. The detail is shown below in the S3 Event to trigger AWS Lambda section.&lt;/li&gt;
&lt;li&gt;AWS Lambda reads the S3 object and deserializes the DynamoDB JSON format data using the DynamoDB TypeDeserializer class, which deserializes DynamoDB types to Python types. The deserialized data is written to the S3 Content bucket in Parquet format. The code is in the Lambda function code section.&lt;/li&gt;
&lt;li&gt;The AWS Lambda function updates the table location in the AWS Glue catalog.&lt;/li&gt;
&lt;li&gt;Query the data using AWS Athena.&lt;/li&gt;
&lt;/ul&gt;
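&lt;p&gt;The "initiate export" step above can be sketched with the ExportTableToPointInTime API. The table ARN, bucket, and prefix below are placeholders, and the DynamoDB client is injected so the function can be exercised with a stub, without AWS access:&lt;/p&gt;

```python
# Hypothetical sketch of kicking off a DynamoDB table export to S3.
# ARN, bucket, and prefix are placeholders.
def start_ddb_export(ddb_client, table_arn, bucket, prefix):
    return ddb_client.export_table_to_point_in_time(
        TableArn=table_arn,
        S3Bucket=bucket,
        S3Prefix=prefix,
        ExportFormat="DYNAMODB_JSON",
    )

# Stub client so the sketch can run without AWS credentials
class _StubDdb:
    def export_table_to_point_in_time(self, **kwargs):
        self.last_call = kwargs
        return {"ExportDescription": {"ExportStatus": "IN_PROGRESS"}}

stub = _StubDdb()
resp = start_ddb_export(stub,
                        "arn:aws:dynamodb:us-west-2:111122223333:table/Movies",
                        "my-raw-bucket", "dynamodb/movies/raw/")
print(resp["ExportDescription"]["ExportStatus"])
```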



&lt;p&gt;&lt;span&gt;S3 Event to trigger AWS Lambda &lt;/span&gt;&lt;/p&gt;



&lt;a href="https://aprakash.files.wordpress.com/2020/12/screen-shot-2020-12-02-at-11.48.08-am.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F12%2Fscreen-shot-2020-12-02-at-11.48.08-am.png%3Fw%3D1024" alt=""&gt;&lt;/a&gt;



&lt;p&gt;&lt;span&gt;Lambda function code&lt;/span&gt; &lt;/p&gt;



&lt;pre class="wp-block-syntaxhighlighter-code"&gt;import io
import gzip
import json
import boto3
import uuid
import pandas as pd
import awswrangler as wr
from datetime import datetime
from urllib.parse import unquote_plus


def update_glue_table(*, database, table_name, new_location, region_name):
    """ Update AWS Glue non-partitioned table location
    """

    glue = boto3.client("glue", region_name=region_name)

    response = glue.get_table(
        DatabaseName=database, Name=table_name)

    table_input = response["Table"]
    current_location = table_input["StorageDescriptor"]["Location"]

    table_input.pop("UpdateTime", None)
    table_input.pop("CreatedBy", None)
    table_input.pop("CreateTime", None)
    table_input.pop("DatabaseName", None)
    table_input.pop("IsRegisteredWithLakeFormation", None)
    table_input.pop("CatalogId", None)

    table_input["StorageDescriptor"]["Location"] = new_location

    response = glue.update_table(
        DatabaseName=database,
        TableInput=table_input
    )

    return response
    

def lambda_handler(event, context): 
    
    """
    Uses class TypeDeserializer which deserializes DynamoDB types to Python types 
    
    Example - 

    raw data format :
        [{'ACTIVE': {'BOOL': True}, 'params': {'M': {'customer': {'S': 'TEST'}, 'index': {'N': '1'}}}}, ]
    deserialized data format:
        [{'ACTIVE': True, 'params': {'customer': 'TEST', 'index': Decimal('1')}}]

    """
        
    
    s3client = boto3.client('s3')
    athena_db = "default"
    athena_table = "movies"
    
    # TypeDeserializer converts DynamoDB-typed values to plain Python types
    deserializer = TypeDeserializer()
    all_data = []
    
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
 
        response = s3client.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read()
        with gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb') as fh:
            data = [json.loads(line) for line in fh]
        
        # Accumulate deserialized items across all records in the event
        for row in data:
            all_data.append({k: deserializer.deserialize(v) for k, v in row['Item'].items()})
    
    data_df = pd.DataFrame(all_data)
    
    dt = datetime.utcnow().strftime("%Y-%m-%d-%H-%M")
    s3_path="s3://%s/dynamodb/%s/content/dt=%s/" % (bucket, athena_table, dt)
    
    wr.s3.to_parquet(
        df=data_df,
        path=s3_path,
        dataset = True,
    )
    

    update_response = update_glue_table(
        database=athena_db, 
        table_name=athena_table, 
        new_location=s3_path,
        region_name="us-west-2")

    if update_response["ResponseMetadata"]["HTTPStatusCode"] == 200:
        return (f"Successfully updated glue table location - {athena_db}.{athena_table}")
    else:
        return (f"Failed updating glue table location - {athena_db}.{athena_table}")

&lt;/pre&gt;
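&lt;p&gt;To make the deserialization step concrete, here is a simplified pure-Python sketch of the mapping TypeDeserializer performs. It handles only the S, N, BOOL, L, and M type tags; the real boto3 class covers every DynamoDB type (binary, sets, NULL, and so on):&lt;/p&gt;

```python
from decimal import Decimal

# Simplified illustration of the DynamoDB JSON to Python mapping.
# Each DynamoDB-typed value is a single-key dict: {"S": ...}, {"N": ...}, etc.
def deserialize(value):
    (tag, inner), = value.items()
    if tag == "S":
        return inner
    if tag == "N":
        return Decimal(inner)      # numbers come back as Decimal, as in boto3
    if tag == "BOOL":
        return inner
    if tag == "L":
        return [deserialize(v) for v in inner]
    if tag == "M":
        return {k: deserialize(v) for k, v in inner.items()}
    raise ValueError(f"unhandled type tag: {tag}")

# Same shape as the raw/deserialized example in the handler's docstring
raw = {"ACTIVE": {"BOOL": True},
       "params": {"M": {"customer": {"S": "TEST"}, "index": {"N": "1"}}}}
flat = {k: deserialize(v) for k, v in raw.items()}
print(flat)
```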

&lt;p&gt;&lt;span&gt;Query data from Athena&lt;/span&gt;&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;SELECT title, info.actors, info.rating, info.release_date, year FROM movies where title='Christmas Vacation'&lt;/code&gt;&lt;/pre&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;&lt;tr&gt;
&lt;th&gt;&lt;/th&gt;
&lt;th&gt;title&lt;/th&gt;
&lt;th&gt;actors&lt;/th&gt;
&lt;th&gt;rating&lt;/th&gt;
&lt;th&gt;release_date&lt;/th&gt;
&lt;th&gt;year&lt;/th&gt;
&lt;/tr&gt;&lt;/thead&gt;
&lt;tbody&gt;&lt;tr&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;Christmas Vacation&lt;/td&gt;
&lt;td&gt;[Chevy Chase, Beverly D'Angelo, Juliette Lewis]&lt;/td&gt;
&lt;td&gt;0.73&lt;/td&gt;
&lt;td&gt;1989-11-30T00:00:00Z&lt;/td&gt;
&lt;td&gt;1989&lt;/td&gt;
&lt;/tr&gt;&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;For more references - &lt;a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataExport.html" rel="noreferrer noopener"&gt;https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataExport.html&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;span&gt;&lt;strong&gt;3. DynamoDB integration with Kinesis Stream.&lt;/strong&gt;&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;DynamoDB Streams captures a time-ordered sequence of item-level modifications in a DynamoDB table. Earlier, one way to publish DynamoDB data to S3 in near real time was to enable DynamoDB Streams and use an AWS Lambda function to forward the data to Kinesis Firehose, which published it to S3. To do so you can use a handy package provided by AWS Labs - &lt;a rel="noreferrer noopener" href="https://github.com/awslabs/lambda-streams-to-firehose"&gt;https://github.com/awslabs/lambda-streams-to-firehose&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2020/12/ddb_pipeline_1-1.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F12%2Fddb_pipeline_1-1.png%3Fw%3D572" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;With several such use cases, AWS has now integrated Amazon DynamoDB directly with Amazon Kinesis Data Streams. Now you can capture item-level changes in your DynamoDB tables as a Kinesis data stream. This enables you to publish the data to S3, as shown in the pipeline below.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2020/12/ddb_pipeline_2.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F12%2Fddb_pipeline_2.png%3Fw%3D502" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To set up the pipeline, create an Amazon Kinesis data stream. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2020/12/screen-shot-2020-12-12-at-12.51.12-pm.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F12%2Fscreen-shot-2020-12-12-at-12.51.12-pm.png%3Fw%3D1024" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After setting up the Amazon Kinesis data stream, create an Amazon Kinesis Data Firehose delivery stream. The source for Kinesis Firehose will be the Amazon Kinesis data stream and the destination will be S3.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2020/12/screen-shot-2020-12-12-at-12.54.37-pm.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F12%2Fscreen-shot-2020-12-12-at-12.54.37-pm.png%3Fw%3D1024" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Next, enable streaming to Kinesis.&lt;/p&gt;

&lt;p&gt;DynamoDB &amp;gt; Table &amp;gt; Kinesis data stream details &amp;gt; Manage streaming to Kinesis&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2020/12/screen-shot-2020-11-29-at-12.16.49-am.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F12%2Fscreen-shot-2020-11-29-at-12.16.49-am.png%3Fw%3D1024" alt=""&gt;&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;&lt;a href="https://aprakash.files.wordpress.com/2020/12/screen-shot-2020-11-29-at-12.43.04-am.png" rel="noopener noreferrer"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F12%2Fscreen-shot-2020-11-29-at-12.43.04-am.png%3Fw%3D1024" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Once the stream is enabled, any item-level change in the table will be captured and written to the Amazon S3 bucket. Below is an example of a record that was updated in DynamoDB using PartiQL. The record contains the approximate creation date-time of the record in the stream, along with the New and Old images of the record. These records can be parsed using AWS Lambda or AWS Glue and stored in a data lake for analytical use-cases. &lt;/p&gt;

&lt;pre&gt;{
   "awsRegion": "us-west-2",
   "dynamodb": {
     "ApproximateCreationDateTime": 1606714671542,
     "Keys": {
       "Author": {
         "S": "James Patterson"
       },
       "Title": {
         "S": "The President Is Missing"
       }
     },
     "NewImage": {
       "Title": {
         "S": "The President Is Missing"
       },
       "Formats": {
         "M": {
           "Hardcover": {
             "S": "JSU4KGVU"
           }
         }
       },
       "Author": {
         "S": "James Patterson"
       },
       "Category": {
         "S": "Mystery"
       }
     },
     "OldImage": {
       "Title": {
         "S": "The President Is Missing"
       },
       "Formats": {
         "M": {
           "Hardcover": {
             "S": "JSU4KGVU"
           },
           "Paperback": {
             "S": "DY7F4CFX"
           }
         }
       },
       "Author": {
         "S": "James Patterson"
       },
       "Category": {
         "S": "Mystery"
       }
     },
     "SizeBytes": 254
   },
   "eventID": "bcaaf073-7e0d-49c2-818e-fe3cf7e5f18a",
   "eventName": "MODIFY",
   "userIdentity": null,
   "recordFormat": "application/json",
   "tableName": "Books",
   "eventSource": "aws:dynamodb"
 }&lt;/pre&gt;
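
&lt;p&gt;As a minimal sketch of the Lambda parsing step mentioned above: Kinesis delivers each record to Lambda base64-encoded, so a handler can decode the payload and pull out the fields of interest. The handler name and the selection of fields here are assumptions for illustration; the record layout follows the example above.&lt;/p&gt;

```python
import base64
import json

def handler(event, context):
    """Sketch of a Lambda handler for DynamoDB change records delivered
    via a Kinesis data stream. Handler name and selected fields are
    hypothetical; the payload layout matches the example record above."""
    changes = []
    for record in event.get("Records", []):
        # Kinesis delivers each payload base64-encoded under kinesis.data
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        if payload.get("eventName") == "MODIFY":
            changes.append({
                "table": payload.get("tableName"),
                "keys": payload["dynamodb"]["Keys"],
                "new_image": payload["dynamodb"].get("NewImage"),
            })
    return changes
```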

&lt;p&gt;To conclude, in this post I introduced you to PartiQL, which provides a SQL-compatible query language for Amazon DynamoDB. We also looked at the Export to S3 feature and how to create an end-to-end pipeline to query Amazon DynamoDB data in an Amazon S3 bucket using Amazon Athena. Finally, we looked at a real-time analytics use-case where you can enable streams to capture item-level changes in an Amazon DynamoDB table as Kinesis data streams.&lt;/p&gt;

</description>
      <category>aws</category>
      <category>dynamodb</category>
      <category>database</category>
      <category>dataengineering</category>
    </item>
    <item>
      <title>Transform AWS CloudTrail data using AWS Data Wrangler</title>
      <dc:creator>Anand</dc:creator>
      <pubDate>Sun, 20 Sep 2020 18:27:01 +0000</pubDate>
      <link>https://dev.to/anandp86/transform-aws-cloudtrail-data-using-aws-data-wrangler-1cmo</link>
      <guid>https://dev.to/anandp86/transform-aws-cloudtrail-data-using-aws-data-wrangler-1cmo</guid>
      <description>&lt;p&gt;AWS CloudTrail service captures actions taken by an IAM user, IAM role, APIs, SDKs and other AWS services. By default, AWS CloudTrail is enabled in your AWS account. You can create "trail" to record ongoing events which will be delivered in JSON format to an Amazon S3 Bucket of your choice.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--6gz-Hrfm--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.03.45-pm.png%3Fw%3D218" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--6gz-Hrfm--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.03.45-pm.png%3Fw%3D218" alt="" width="218" height="264"&gt;&lt;/a&gt;CloudTrail Dashboard&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--XPT5lSLu--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.08.54-pm.png%3Fw%3D1024" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--XPT5lSLu--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.08.54-pm.png%3Fw%3D1024" alt="" width="880" height="477"&gt;&lt;/a&gt;Create Trail&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--9SvyVpAf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.13.30-pm.png%3Fw%3D1024" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--9SvyVpAf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.13.30-pm.png%3Fw%3D1024" alt="" width="880" height="696"&gt;&lt;/a&gt;Choose events to capture&lt;/p&gt;

&lt;p&gt;You can configure the trail to log read-write, read-only, or write-only data events for all current and future S3 buckets.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--2SGfT5ao--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.32.56-pm-1.png%3Fw%3D846" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--2SGfT5ao--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.32.56-pm-1.png%3Fw%3D846" alt="" width="846" height="354"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Also, you have the option to log data events for Lambda functions. You can select all Regions and all functions, or specify a particular Lambda ARN or Region.&lt;/p&gt;



&lt;p&gt;The trail creates small, mostly KB-sized gzipped JSON files in the S3 bucket.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--SORcAx6S--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.22.46-pm.png%3Fw%3D1024" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--SORcAx6S--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-1.22.46-pm.png%3Fw%3D1024" alt="" width="880" height="272"&gt;&lt;/a&gt;Trail Log files in S3 Bucket&lt;/p&gt;

&lt;p&gt;You can select a file and use the "Select from" tab to view its content.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--iX_ApBBY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-17-at-6.20.28-pm.png%3Fw%3D1024" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--iX_ApBBY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-17-at-6.20.28-pm.png%3Fw%3D1024" alt="" width="880" height="833"&gt;&lt;/a&gt;Select from S3 file&lt;/p&gt;

&lt;p&gt;Below is an example of a "PutObject" event on an S3 bucket.&lt;/p&gt;

&lt;pre&gt;{
    "eventVersion": "1.07",
    "userIdentity": {
        "type": "AWSService",
        "invokedBy": "s3.amazonaws.com"
    },
    "eventTime": "2020-09-12T23:53:22Z",
    "eventSource": "s3.amazonaws.com",
    "eventName": "PutObject",
    "awsRegion": "us-east-1",
    "sourceIPAddress": "s3.amazonaws.com",
    "userAgent": "s3.amazonaws.com",
    "requestParameters": {
        "bucketName": "my-data-bucket",
        "Host": "s3.us-east-1.amazonaws.com",
        "key": "mydatabase/mytable/data-content.snappy.parquet"
    },
    "responseElements": null,
    "additionalEventData": {
        "SignatureVersion": "SigV4",
        "CipherSuite": "ECDHE-RSA-AES128-SHA",
        "bytesTransferredIn": 107886,
        "AuthenticationMethod": "AuthHeader",
        "x-amz-id-2": "Dg9gelyiPojDT00UJ+CI7MmmEyUhPRe1EAUtzQSs3kJAZ8JxMe+2IQ4f6wT2Kpd+Czih1Dc2SI8=",
        "bytesTransferredOut": 0
    },
    "requestID": "29C76F4BC75743BF",
    "eventID": "6973f9b1-1a7d-46d4-a48f-f2d91c80b2d3",
    "readOnly": false,
    "resources": [
        {
            "type": "AWS::S3::Object",
            "ARN": "arn:aws:s3:::my-data-bucket/mydatabase/mytable/data-content.snappy.parquet"
        },
        {
            "accountId": "xxxxxxxxxxxx",
            "type": "AWS::S3::Bucket",
            "ARN": "arn:aws:s3:::my-data-bucket"
        }
    ],
    "eventType": "AwsApiCall",
    "managementEvent": false,
    "recipientAccountId": "xxxxxxxxxxxx",
    "sharedEventID": "eb37214b-623b-43e6-876b-7088c7d0e0ee",
    "vpcEndpointId": "vpce-xxxxxxx",
    "eventCategory": "Data"
}&lt;/pre&gt;
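
&lt;p&gt;Since events like the one above are plain JSON, the fields most useful for auditing can be pulled out with a few lines of Python. This is a minimal sketch; the function name and the choice of fields are assumptions, the field paths follow the example event.&lt;/p&gt;

```python
import json

def summarize_s3_event(raw_event):
    """Sketch: extract audit-relevant fields from a CloudTrail S3 data
    event (structure as in the PutObject example above). The function
    name and field selection are hypothetical."""
    event = json.loads(raw_event) if isinstance(raw_event, str) else raw_event
    params = event.get("requestParameters") or {}
    return {
        "event_name": event.get("eventName"),
        "event_time": event.get("eventTime"),
        "bucket": params.get("bucketName"),
        "key": params.get("key"),
        "read_only": event.get("readOnly"),
    }
```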

&lt;p&gt;Under Event history, CloudTrail provides a useful feature to create an Athena table over the trail's Amazon S3 bucket, which you can use to query the data with standard SQL.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--JEytbzRX--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-17-at-5.31.33-pm.png%3Fw%3D1024" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--JEytbzRX--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-17-at-5.31.33-pm.png%3Fw%3D1024" alt="" width="880" height="128"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--3A7M-jxY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-2.18.23-pm.png%3Fw%3D824" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--3A7M-jxY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-2.18.23-pm.png%3Fw%3D824" alt="" width="824" height="828"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now, depending on the duration and the events captured, CloudTrail creates lots of small files in S3, which can increase execution time when the data is queried from Athena. &lt;/p&gt;

&lt;p&gt;Moving ahead, I will show you how you can use &lt;a rel="noreferrer noopener" href="https://aws-data-wrangler.readthedocs.io/en/latest/what.html"&gt;AWS Data Wrangler&lt;/a&gt; and Pandas to perform the following:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Query data from Athena into Pandas dataframe using AWS Data Wrangler.&lt;/li&gt;
&lt;li&gt;Transform the eventtime column from string datatype to datetime datatype.&lt;/li&gt;
&lt;li&gt;Extract year, month, and day columns from eventtime and add them to the dataframe.&lt;/li&gt;
&lt;li&gt;Write the dataframe to S3 in Parquet format with Hive partitions using AWS Data Wrangler.&lt;/li&gt;
&lt;li&gt;While writing the dataframe, create the table in the Glue catalog using AWS Data Wrangler.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;For this example, I have set up a SageMaker notebook with a &lt;a rel="noreferrer noopener" href="https://aws-data-wrangler.readthedocs.io/en/latest/install.html"&gt;Lifecycle configuration&lt;/a&gt;. Once you have the notebook open, you can use the conda_python3 kernel to work with AWS Data Wrangler.&lt;/p&gt;

&lt;p&gt;&lt;span&gt;&lt;u&gt;Import the required libraries&lt;/u&gt;&lt;/span&gt;&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;import awswrangler as wr
import pandas as pd
pd.set_option('display.width', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', None)&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;&lt;span&gt;&lt;u&gt;Python function to execute the SQL in Athena using AWS Data Wrangler&lt;/u&gt;&lt;/span&gt;&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;def execute_sql(sql, database, ctas=False):
    return wr.athena.read_sql_query(sql, database, ctas_approach=ctas)&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;&lt;span&gt;&lt;u&gt;SQL query to get details related to S3 Events&lt;/u&gt;&lt;/span&gt;&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;s3ObjectSql = """
SELECT 
    useridentity.sessioncontext.sessionissuer.username as username,
    useridentity.sessioncontext.sessionissuer.type as type,
    useridentity.principalid as principalid,
    useridentity.invokedby as invokedby,
    eventname as event_name,
    eventtime,
    eventsource as event_source,
    awsregion as aws_region,
    sourceipaddress,
    eventtype as event_type,
    readonly as read_only,
    requestparameters
FROM cloudtrail_logs_cloudtrail_logs_traillogs
WHERE eventname in ('ListObjects', 'PutObject', 'GetObject') and eventtime &amp;gt; '2020-08-23'
"""&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;&lt;span&gt;&lt;u&gt;Execute the SQL and load the results into a Pandas dataframe&lt;/u&gt;&lt;/span&gt;&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;data = execute_sql(sql=s3ObjectSql, database='default')&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ZWkbTdVG--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-2.40.38-pm.png%3Fw%3D384" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ZWkbTdVG--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-2.40.38-pm.png%3Fw%3D384" alt="" width="384" height="370"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;span&gt;&lt;u&gt;Count events per username&lt;/u&gt;&lt;/span&gt; (just for fun)&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;data['username'].value_counts()&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Notice that the eventtime column is of "string" datatype, which makes date transformations difficult. So here we will create a new column with datetime datatype and drop eventtime.&lt;/p&gt;

&lt;p&gt;&lt;span&gt;&lt;u&gt;String to Datetime conversion for eventtime column&lt;/u&gt;&lt;/span&gt;&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;data['event_time'] = pd.to_datetime(data['eventtime'], errors='coerce')&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--p8f4u2gW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-2.45.16-pm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--p8f4u2gW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-2.45.16-pm.png" alt="" width="142" height="306"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--chIXqrqq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-2.45.24-pm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--chIXqrqq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-13-at-2.45.24-pm.png" alt="" width="115" height="305"&gt;&lt;/a&gt;&lt;/p&gt;



&lt;pre&gt;&lt;code&gt;data.drop('eventtime', axis=1, inplace=True)&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Let's extract year, month, and day columns from the event_time column and add them to the dataframe. With this change you can write the data back to S3 as Hive partitions.&lt;/p&gt;

&lt;p&gt;&lt;span&gt;&lt;u&gt;Extract and add new fields to dataframe&lt;/u&gt;&lt;/span&gt;&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;data['year'] = data['event_time'].dt.year
data['month'] = data['event_time'].dt.month
data['day'] = data['event_time'].dt.day&lt;/code&gt;&lt;/pre&gt;
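
&lt;p&gt;Putting the last few steps together, the eventtime transformation can be sketched end to end on a toy dataframe. The sample values below are made up for illustration; the column names follow the snippets above.&lt;/p&gt;

```python
import pandas as pd

# Toy stand-in for the Athena query result (values are illustrative)
data = pd.DataFrame({
    "event_name": ["PutObject", "GetObject"],
    "eventtime": ["2020-08-24T19:28:00Z", "2020-09-12T23:53:22Z"],
})

# String -> datetime, then drop the original string column
data["event_time"] = pd.to_datetime(data["eventtime"], errors="coerce")
data = data.drop("eventtime", axis=1)

# Derive the Hive partition columns
data["year"] = data["event_time"].dt.year
data["month"] = data["event_time"].dt.month
data["day"] = data["event_time"].dt.day
```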

&lt;p&gt;Now, using the AWS Data Wrangler s3.to_parquet API, you can write the data back to S3 in Parquet format, partitioned by year, month, and day. You can also add database and table parameters to write the metadata to the Athena/Glue catalog. Note that the database must already exist for the command to succeed.&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;wr.s3.to_parquet(
    df=data,
    path='s3://my-bucket/s3_access_analyzer/cloudtrail/',
    dataset=True,
    partition_cols=['year', 'month', 'day'],
    database='default',  # Athena/Glue database
    table='cloudtrail' # Athena/Glue table
)&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;You can query Athena to view the results.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--g3I5CZvt--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-17-at-5.56.32-pm.png%3Fw%3D544" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--g3I5CZvt--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://aprakash.files.wordpress.com/2020/09/screen-shot-2020-09-17-at-5.56.32-pm.png%3Fw%3D544" alt="" width="544" height="345"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The query took just 1.74 seconds to complete with 0 KB of data scanned. Now why 0 KB? Well, I will leave that for you to think about and answer :)&lt;/p&gt;

&lt;p&gt;To conclude, with AWS Data Wrangler you can easily and efficiently perform extract, transform, and load (ETL) tasks as shown above. It is well integrated with other AWS services and is actively updated with new features and enhancements. &lt;/p&gt;

</description>
      <category>aws</category>
      <category>bigdata</category>
      <category>cloud</category>
      <category>dataengineering</category>
    </item>
    <item>
      <title>Guide - AWS Glue and PySpark </title>
      <dc:creator>Anand</dc:creator>
      <pubDate>Sat, 19 Sep 2020 17:25:49 +0000</pubDate>
      <link>https://dev.to/anandp86/using-aws-glue-and-pyspark-56fi</link>
      <guid>https://dev.to/anandp86/using-aws-glue-and-pyspark-56fi</guid>
      <description>&lt;p&gt;In this post, I have penned down AWS Glue and PySpark functionalities which can be helpful when thinking of creating AWS pipeline and writing AWS Glue PySpark scripts.&lt;/p&gt;

&lt;p&gt;AWS Glue is a fully managed extract, transform, and load (ETL) service for processing large datasets from various sources for analytics and data processing.&lt;/p&gt;

&lt;p&gt;While creating an AWS Glue job, you can select between Spark, Spark Streaming, and Python shell. These jobs can run a script generated by AWS Glue, an existing script that you provide, or a new script authored by you. You can also select different monitoring options, job execution capacity, timeouts, delayed notification thresholds, and overridable and non-overridable parameters.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F08%2Fscreen-shot-2020-08-21-at-6.15.45-pm.png%3Fw%3D300" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F08%2Fscreen-shot-2020-08-21-at-6.15.45-pm.png%3Fw%3D300" alt=""&gt;&lt;/a&gt;Glue Job Type and Glue Version&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F08%2Fscreen-shot-2020-08-21-at-6.16.07-pm.png%3Fw%3D288" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F08%2Fscreen-shot-2020-08-21-at-6.16.07-pm.png%3Fw%3D288" alt=""&gt;&lt;/a&gt;Script file name and other available options&lt;/p&gt;

&lt;p&gt;AWS recently launched Glue version 2.0, which features 10x faster Spark ETL job start times and reduces the minimum billing duration from 10 minutes to 1 minute.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration" rel="noreferrer noopener"&gt;https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;With AWS Glue you can create a development endpoint and configure SageMaker or Zeppelin notebooks to develop and test your Glue ETL scripts. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F08%2Fscreen-shot-2020-08-21-at-6.56.12-pm-1.png%3Fw%3D110" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F08%2Fscreen-shot-2020-08-21-at-6.56.12-pm-1.png%3Fw%3D110" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;I created a SageMaker notebook connected to the dev endpoint to author and test the ETL scripts. Depending on the language you are comfortable with, you can spin up the notebook accordingly.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F08%2Fscreen-shot-2020-08-21-at-7.06.00-pm-1.png%3Fw%3D1024" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F08%2Fscreen-shot-2020-08-21-at-7.06.00-pm-1.png%3Fw%3D1024" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now, let's talk about some specific features and functionalities in AWS Glue and PySpark that can be helpful.&lt;/p&gt;

&lt;p&gt;1. &lt;strong&gt;&lt;span&gt;Spark DataFrames&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database. You can create a DataFrame from an RDD, or from files in formats such as CSV, JSON, and Parquet.&lt;/p&gt;

&lt;p&gt;With the SageMaker Sparkmagic (PySpark) kernel notebook, a Spark session is created automatically.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F07%2Fscreen-shot-2020-07-04-at-9.43.19-pm.png%3Fw%3D699" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F07%2Fscreen-shot-2020-07-04-at-9.43.19-pm.png%3Fw%3D699" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To create a DataFrame:&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;# from CSV files 
S3_IN = "s3://mybucket/train/training.csv"

csv_df = (
    spark.read.format("org.apache.spark.csv")
    .option("header", True)
    .option("quote", '"')
    .option("escape", '"')
    .option("inferSchema", True)
    .option("ignoreLeadingWhiteSpace", True)
    .option("ignoreTrailingWhiteSpace", True)
    .csv(S3_IN, multiLine=False)
)

# from PARQUET files 
S3_PARQUET="s3://mybucket/folder1/dt=2020-08-24-19-28/"

df = spark.read.parquet(S3_PARQUET)

# from JSON files
df = spark.read.json(S3_JSON)

# from multiline JSON file 
df = spark.read.json(S3_JSON, multiLine=True)&lt;/code&gt;&lt;/pre&gt;



&lt;p&gt;2. &lt;strong&gt;&lt;span&gt;GlueContext&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;GlueContext is the entry point for reading and writing DynamicFrames in AWS Glue. It wraps the Apache SparkSQL SQLContext object providing mechanisms for interacting with the Apache Spark platform.&lt;/p&gt;



&lt;pre&gt;&lt;code&gt;from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())&lt;/code&gt;&lt;/pre&gt;



&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F08%2Fscreen-shot-2020-08-21-at-8.03.09-pm.png%3Fw%3D1024" alt=""&gt;



&lt;p&gt;3. &lt;strong&gt;&lt;span&gt;DynamicFrame&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;AWS Glue DynamicFrames are similar to SparkSQL DataFrames. A DynamicFrame represents a distributed collection of data without requiring you to specify a schema. It can also be used to read and transform data that contains inconsistent values and types. &lt;/p&gt;



&lt;p&gt;A DynamicFrame can be created using the following methods –&lt;/p&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;em&gt;create_dynamic_frame_from_rdd&lt;/em&gt; – created from an Apache Spark Resilient Distributed Dataset (RDD)&lt;/li&gt;
&lt;li&gt;
&lt;em&gt;create_dynamic_frame_from_catalog&lt;/em&gt; – created using a Glue catalog database and table name&lt;/li&gt;
&lt;li&gt;
&lt;em&gt;create_dynamic_frame_from_options&lt;/em&gt; – created with the specified connection and format. Example – The connection type, such as Amazon S3, Amazon Redshift, and JDBC&lt;/li&gt;
&lt;/ul&gt;



&lt;p&gt;DynamicFrames can be converted to and from DataFrames using .toDF() and fromDF().&lt;/p&gt;



&lt;pre&gt;&lt;code&gt;#create DynamicFame from S3 parquet files
datasource0 = glueContext.create_dynamic_frame_from_options(
            connection_type="s3",
            connection_options = {
                "paths": [S3_location]
            },
            format="parquet",
            transformation_ctx="datasource0")

#create DynamicFame from glue catalog 
datasource0 = glueContext.create_dynamic_frame.from_catalog(
           database = "demo",
           table_name = "testtable",
           transformation_ctx = "datasource0")

#convert to spark DataFrame 
df1 = datasource0.toDF()

#convert to Glue DynamicFrame
df2 = DynamicFrame.fromDF(df1, glueContext , "df2")
&lt;/code&gt;&lt;/pre&gt;



&lt;p&gt;Further Read - &lt;a rel="noreferrer noopener" href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog"&gt;https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;4. &lt;strong&gt;&lt;span&gt;AWS Glue Job Bookmark &lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;AWS Glue &lt;em&gt;Job bookmark&lt;/em&gt; helps process incremental data when rerunning the job on a scheduled interval, preventing reprocessing of old data.&lt;/p&gt;



&lt;p&gt;Further Read  - &lt;a href="https://aprakash.wordpress.com/2020/05/07/implementing-glue-etl-job-with-job-bookmarks/" rel="noopener noreferrer"&gt;https://aprakash.wordpress.com/2020/05/07/implementing-glue-etl-job-with-job-bookmarks/&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;&lt;a href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html" rel="noopener noreferrer"&gt;https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;5. &lt;strong&gt;&lt;span&gt;Write out data&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;The DynamicFrame of a transformed dataset can be written out to S3 as non-partitioned (the default) or partitioned. The "&lt;em&gt;partitionKeys&lt;/em&gt;" parameter can be specified in connection_options to write the data to S3 as partitioned. AWS Glue organizes these datasets in Hive-style partitions.&lt;/p&gt;



&lt;p&gt;In the code example below, the AWS Glue DynamicFrame is partitioned by year, month, day, and hour and written in Parquet format in Hive-style partitions to S3, for example:&lt;/p&gt;



&lt;p&gt;s3://bucket_name/table_name/year=2020/month=7/day=13/hour=14/part-000-671c.c000.snappy.parquet&lt;/p&gt;



&lt;pre&gt;&lt;code&gt;S3_location = "s3://bucket_name/table_name"

datasink = glueContext.write_dynamic_frame_from_options(
    frame= data,
    connection_type="s3",
    connection_options={
        "path": S3_location,
        "partitionKeys": ["year", "month", "day", "hour"]
    },
    format="parquet",
    transformation_ctx ="datasink")&lt;/code&gt;&lt;/pre&gt;



&lt;p&gt;Further Read - &lt;a href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_options" rel="noopener noreferrer"&gt;https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_options&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;6. &lt;strong&gt;&lt;span&gt;"glueparquet" format option&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;glueparquet is a performance-optimized Apache Parquet writer type for writing DynamicFrames. It computes and modifies the schema dynamically. &lt;/p&gt;



&lt;pre&gt;&lt;code&gt;datasink = glueContext.write_dynamic_frame_from_options(
               frame=dynamicframe,
               connection_type="s3",
               connection_options={
                  "path": S3_location,
                  "partitionKeys": ["year", "month", "day", "hour"]
               },
               format="glueparquet",
               format_options = {"compression": "snappy"},
               transformation_ctx ="datasink")&lt;/code&gt;&lt;/pre&gt;



&lt;p&gt;Further Read - &lt;a href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html" rel="noreferrer noopener"&gt;https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;7. &lt;strong&gt;&lt;span&gt;S3 Lister and other options for optimizing memory management &lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;AWS Glue provides an optimized mechanism to list files on S3 while reading data into a DynamicFrame, which can be enabled by setting the additional_options parameter "&lt;em&gt;useS3ListImplementation&lt;/em&gt;" to True.&lt;/p&gt;



&lt;p&gt;Further Read - &lt;a href="https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/" rel="noopener noreferrer"&gt;https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;8. &lt;strong&gt;&lt;span&gt;Purge S3 path&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;&lt;em&gt;purge_s3_path&lt;/em&gt; is a nice option available to delete files from a specified S3 path recursively, based on a retention period or other available filters. As an example, suppose you are running an AWS Glue job that fully refreshes a table each day, writing the data to S3 with the naming convention &lt;em&gt;s3://bucket-name/table-name/dt=&amp;lt;date-time&amp;gt;&lt;/em&gt;. Based on the defined retention period, the Glue job itself can delete the old dt=&amp;lt;date-time&amp;gt; S3 folders. Another option is to set an S3 bucket lifecycle policy with a prefix.&lt;/p&gt;



&lt;pre&gt;&lt;code&gt;#purge locations older than 3 days
print("Attempting to purge S3 path with retention set to 3 days.")
glueContext.purge_s3_path(
    s3_path=output_loc, 
    options={"retentionPeriod": 72})&lt;/code&gt;&lt;/pre&gt;



&lt;p&gt;You also have other options like purge_table, transition_table, and transition_s3_path available. The transition_table option transitions the storage class of the files stored on Amazon S3 for the specified catalog database and table.&lt;/p&gt;



&lt;p&gt;Further Read - &lt;a href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path" rel="noopener noreferrer"&gt;https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;9. &lt;strong&gt;&lt;span&gt;Relationalize Class&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;The &lt;em&gt;Relationalize&lt;/em&gt; class can help flatten nested JSON at the outermost level. &lt;/p&gt;



&lt;p&gt;Further read - &lt;a href="https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/" rel="noopener noreferrer"&gt;https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;10. &lt;strong&gt;&lt;span&gt;Unbox Class&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;The &lt;em&gt;Unbox&lt;/em&gt; class unboxes (reformats) a string field in a DynamicFrame into the specified format type, such as JSON.&lt;/p&gt;
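&lt;p&gt;The idea of unboxing a JSON string field can be illustrated with the standard library. unbox_json_field is a hypothetical sketch, not the Glue API:&lt;/p&gt;

```python
import json

def unbox_json_field(record, field):
    # Parse a JSON-encoded string field into structured data,
    # leaving the rest of the record untouched.
    out = dict(record)
    out[field] = json.loads(out[field])
    return out

row = {"id": 7, "payload": '{"status": "ok", "count": 2}'}
unboxed = unbox_json_field(row, "payload")
```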



&lt;p&gt;Further read - &lt;a href="https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/" rel="noopener noreferrer"&gt;https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/&lt;/a&gt;&lt;/p&gt;



&lt;p&gt;11. &lt;strong&gt;&lt;span&gt;Unnest Class &lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;The &lt;em&gt;Unnest&lt;/em&gt; class flattens nested objects to top-level elements in a DynamicFrame.&lt;/p&gt;



&lt;pre&gt;root
|-- id: string
|-- type: string
|-- content: map
|    |-- keyType: string
|    |-- valueType: string&lt;/pre&gt;



&lt;p&gt;With the content attribute/column being a map type, we can use the Unnest class to unnest each key into its own top-level column.&lt;/p&gt;



&lt;pre&gt;&lt;code&gt;unnested = UnnestFrame.apply(frame=data_dynamic_dframe)
unnested.printSchema()&lt;/code&gt;&lt;/pre&gt;



&lt;pre&gt;root
|-- id: string
|-- type: string
|-- content.dateLastUpdated: string
|-- content.creator: string
|-- content.dateCreated: string
|-- content.title: string&lt;/pre&gt;



&lt;p&gt;12. &lt;strong&gt;&lt;span&gt;printSchema()&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;



&lt;p&gt;To print the schema of a Spark DataFrame or Glue DynamicFrame in tree format, use &lt;em&gt;printSchema()&lt;/em&gt;.&lt;/p&gt;



&lt;pre&gt;datasource0.printSchema()

root
|-- ID: int
|-- Name: string
|-- Identity: string
|-- Alignment: string
|-- EyeColor: string
|-- HairColor: string
|-- Gender: string
|-- Status: string
|-- Appearances: int
|-- FirstAppearance: choice
|    |-- int
|    |-- long
|    |-- string
|-- Year: int
|-- Universe: string
&lt;/pre&gt;

&lt;p&gt;13. &lt;strong&gt;&lt;span&gt;Fields Selection&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;select_fields&lt;/em&gt; can be used to select fields from Glue DynamicFrame.&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;# From DynamicFrame

datasource0.select_fields(["Status","HairColor"]).toDF().distinct().show()&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F07%2Fscreen-shot-2020-07-05-at-12.14.07-am.png%3Fw%3D700" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F07%2Fscreen-shot-2020-07-05-at-12.14.07-am.png%3Fw%3D700" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To select fields from Spark Dataframe use "&lt;em&gt;select&lt;/em&gt;" - &lt;/p&gt;

&lt;pre&gt;&lt;code&gt;# From Dataframe

datasource0_df.select(["Status","HairColor"]).distinct().show()&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F07%2Fscreen-shot-2020-07-05-at-12.16.33-am.png%3Fw%3D661" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Faprakash.files.wordpress.com%2F2020%2F07%2Fscreen-shot-2020-07-05-at-12.16.33-am.png%3Fw%3D661" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;14. &lt;strong&gt;&lt;span&gt;Timestamp&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Suppose the application writes data into DynamoDB and has a last_updated attribute/column. DynamoDB does not natively support a date/timestamp data type, so you could store it either as a String or as a Number. If stored as a number, it's usually epoch time - the number of seconds since 00:00:00 UTC on 1 January 1970. You would see something like "1598331963", which is 2020-08-25T05:06:03+00:00 in ISO 8601.&lt;/p&gt;
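&lt;p&gt;You can verify that mapping with just the Python standard library:&lt;/p&gt;

```python
from datetime import datetime, timezone

epoch_seconds = 1598331963                     # as stored in DynamoDB (Number)
as_utc = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
iso8601 = as_utc.isoformat()                   # '2020-08-25T05:06:03+00:00'
```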

&lt;p&gt;&lt;a rel="noreferrer noopener" href="https://www.unixtimestamp.com/index.php"&gt;https://www.unixtimestamp.com/index.php&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;How can you convert it to timestamp?&lt;/p&gt;

&lt;p&gt;When you read the data using AWS Glue DynamicFrame and view the schema, it will show it as "long" data type.&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;root
|-- version: string
|-- item_id: string
|-- status: string
|-- event_type: string
|-- last_updated: long&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;To convert the last_updated long value into a timestamp data type, you can use the below -&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;import pyspark.sql.functions as f
import pyspark.sql.types as t

new_df = (
    df
        # from_unixtime expects seconds; dividing by 1000 assumes the stored
        # value is epoch milliseconds - remove it if the value is in seconds
        .withColumn("last_updated", f.from_unixtime(f.col("last_updated")/1000).cast(t.TimestampType()))
)&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;15. &lt;strong&gt;&lt;span&gt;Temporary View from Spark DataFrame&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you want to store the Spark DataFrame as a table and query it with Spark SQL, you can register it as a temporary view (available only within that Spark session) using &lt;em&gt;createOrReplaceTempView&lt;/em&gt;.&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;df = spark.createDataFrame(
    [
        (1, ['a', 'b', 'c'], 90.00),
        (2, ['x', 'y'], 99.99),
    ],
    ['id', 'event', 'score'] 
)

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- event: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- score: double (nullable = true)

df.createOrReplaceTempView("example")

spark.sql("select * from example").show()

+---+---------+-----+
| id|    event|score|
+---+---------+-----+
|  1|[a, b, c]| 90.0|
|  2|   [x, y]|99.99|
+---+---------+-----+&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;16. &lt;strong&gt;&lt;span&gt;Extract element from ArrayType&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Suppose from the above example, you want to create a new attribute/column to store only the last event. How would you do it? &lt;/p&gt;

&lt;p&gt;Use the &lt;em&gt;element_at&lt;/em&gt; function. If the column is an array, it returns the element at the given index; if the column is a map, it returns the value for the given key.&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;from pyspark.sql.functions import element_at

newdf = df.withColumn("last_event", element_at("event", -1))

newdf.printSchema()
root
 |-- id: long (nullable = true)
 |-- event: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- score: double (nullable = true)
 |-- last_event: string (nullable = true)

newdf.show()
+---+---------+-----+----------+
| id|    event|score|last_event|
+---+---------+-----+----------+
|  1|[a, b, c]| 90.0|         c|
|  2|   [x, y]|99.99|         y|
+---+---------+-----+----------+&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;17. &lt;strong&gt;&lt;span&gt;explode&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The &lt;em&gt;explode&lt;/em&gt; function in PySpark explodes an array or map column into rows. As an example, let's explode the "event" column from the above example.&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;from pyspark.sql.functions import explode

df1 = df.select(df.id,explode(df.event))

df1.printSchema()
root
 |-- id: long (nullable = true)
 |-- col: string (nullable = true)

df1.show()
+---+---+
| id|col|
+---+---+
|  1|  a|
|  1|  b|
|  1|  c|
|  2|  x|
|  2|  y|
+---+---+&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;18. &lt;strong&gt;&lt;span&gt;getField&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In a Struct type, if you want to get a field by name, you can use "&lt;em&gt;getField&lt;/em&gt;".&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;import pyspark.sql.functions as f
from pyspark.sql import Row

df = spark.createDataFrame([Row(attributes=Row(Name='scott', Height=6.0, Hair='black')),
                            Row(attributes=Row(Name='kevin', Height=6.1, Hair='brown'))]
)

df.printSchema()
root
 |-- attributes: struct (nullable = true)
 |    |-- Hair: string (nullable = true)
 |    |-- Height: double (nullable = true)
 |    |-- Name: string (nullable = true)

df.show()
+-------------------+
|         attributes|
+-------------------+
|[black, 6.0, scott]|
|[brown, 6.1, kevin]|
+-------------------+

df1 = (df
      .withColumn("name", f.col("attributes").getField("Name"))
      .withColumn("height", f.col("attributes").getField("Height"))
      .drop("attributes")
      )

df1.show()
+-----+------+
| name|height|
+-----+------+
|scott|   6.0|
|kevin|   6.1|
+-----+------+&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;19. &lt;strong&gt;&lt;span&gt;startswith&lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you want to find records based on a string prefix match, you can use "&lt;em&gt;startswith&lt;/em&gt;".&lt;/p&gt;

&lt;p&gt;In the below example I search for all records where the value of the description column starts with "[{".&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;import pyspark.sql.functions as f

df.filter(f.col("description").startswith("[{")).show()&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;20. &lt;strong&gt;&lt;span&gt;Extract year,  month,  day,  hour &lt;/span&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;One of the common use cases is to write the AWS Glue DynamicFrame or Spark DataFrame to S3 in Hive-style partitions. To do so, you can extract year, month, day and hour from a timestamp column and use them as partition keys when writing the DynamicFrame/DataFrame to S3.&lt;/p&gt;

&lt;pre&gt;&lt;code&gt;import pyspark.sql.functions as f

df2 = (raw_df
        .withColumn('year', f.year(f.col('last_updated')))
        .withColumn('month', f.month(f.col('last_updated')))
        .withColumn('day', f.dayofmonth(f.col('last_updated')))
        .withColumn('hour', f.hour(f.col('last_updated')))            
        )&lt;/code&gt;&lt;/pre&gt;

</description>
      <category>aws</category>
      <category>cloud</category>
      <category>bigdata</category>
      <category>pyspark</category>
    </item>
  </channel>
</rss>
