DEV Community

suzuki-navi


Add a WHERE clause to the JDBC data source of a Glue Job

This post shows how to make a Glue Job process only a subset of the rows in a JDBC data source by adding a WHERE clause to the query that is sent over JDBC.

This does not seem to be possible with a DynamicFrame, so I use a Spark DataFrame instead.

When reading through a DataFrame, the JDBC connection information apparently has to be specified explicitly. I use glueContext.extract_jdbc_conf to retrieve it from the Glue Connection.

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
logger = glueContext.get_logger()
sparkSession = glueContext.spark_session

CONNECTION_NAME = "test-connection" # Name of Glue Connection
DB_NAME = "testdb" # Name of the PostgreSQL database to connect to

# Retrieve the information from the Glue Connection
jdbc_conf = glueContext.extract_jdbc_conf(connection_name=CONNECTION_NAME)
logger.info(jdbc_conf)
# Outputs as follows:
# {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:postgresql://hogehoge.xxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com:5432', 'customJDBCCertString': '', 'user': 'postgres', 'customJDBCCert': '', 'password': 'xxxxxxxxxxxx', 'vendor': 'postgresql'}

# jdbc_conf["url"] has the form
# jdbc:postgresql://HOST:5432
# and does not include the database name.
# Append it so the Spark JDBC reader connects to DB_NAME; without it,
# the PostgreSQL driver falls back to a database named after the user.
jdbc_url = jdbc_conf["url"] + "/" + DB_NAME

# Wrap the filtering SELECT in parentheses and give it an alias (t)
# so that it can be passed as the "dbtable" option:
query = "(SELECT * FROM public.testtable WHERE id <= 100) t"

# Read through Spark's JDBC source, then convert the result to a DynamicFrame
df = (
    sparkSession.read.format("jdbc")
    .option("url", jdbc_url)
    .option("driver", "org.postgresql.Driver")
    .option("user", jdbc_conf["user"])
    .option("password", jdbc_conf["password"])
    .option("dbtable", query)
    .load()
)
dyf0 = DynamicFrame.fromDF(df, glueContext, "dyf0")

# Do something
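If the same connection options are needed in several jobs, they can be collected into one dictionary and passed in a single call with DataFrameReader.options(**opts). The following is a minimal sketch, assuming the extract_jdbc_conf output contains the url, user and password keys shown in the log output above. build_jdbc_url and jdbc_read_options are hypothetical helper names, and the rstrip("/") guard is an extra assumption in case a URL ever ends with a slash.

```python
# Hypothetical helpers (not part of the awsglue API): build the Spark JDBC
# reader options from the dict returned by glueContext.extract_jdbc_conf.


def build_jdbc_url(base_url: str, db_name: str) -> str:
    """Append the database name to a URL such as jdbc:postgresql://HOST:5432."""
    # Drop any trailing slash so we never produce "HOST:5432//testdb".
    return base_url.rstrip("/") + "/" + db_name


def jdbc_read_options(jdbc_conf: dict, db_name: str, dbtable: str) -> dict:
    """Collect the options passed to spark.read.format("jdbc")."""
    return {
        "url": build_jdbc_url(jdbc_conf["url"], db_name),
        "driver": "org.postgresql.Driver",
        "user": jdbc_conf["user"],
        "password": jdbc_conf["password"],
        "dbtable": dbtable,
    }


if __name__ == "__main__":
    # Sample input shaped like the extract_jdbc_conf output
    # (host and password are placeholders).
    conf = {
        "url": "jdbc:postgresql://example-host:5432",
        "user": "postgres",
        "password": "secret",
        "vendor": "postgresql",
    }
    opts = jdbc_read_options(
        conf, "testdb", "(SELECT * FROM public.testtable WHERE id <= 100) t"
    )
    print(opts["url"])  # jdbc:postgresql://example-host:5432/testdb
    # Inside the Glue Job, the dict could then be used like this:
    # df = sparkSession.read.format("jdbc").options(**opts).load()
```

Keeping the URL handling in one function means the "/" + DB_NAME step cannot be forgotten when the same pattern is copied into another job.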

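When this Glue Job runs, Spark places the dbtable value in the FROM clause, so the WHERE condition is evaluated inside PostgreSQL and only the matching rows are transferred to the job. The query issued to PostgreSQL is: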

SELECT "id","col1","col2","col3" FROM (SELECT * FROM public.testtable WHERE id <= 100) t
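The shape of that statement can be approximated with a few lines of plain Python. This is a rough sketch, not Spark's actual implementation: pushed_down_query and quote_identifier are hypothetical names, and the sketch only assumes that the column list is quoted PostgreSQL-style and that the dbtable value is inserted verbatim after FROM. That verbatim placement is why the subquery needs parentheses and, on PostgreSQL versions before 16, an alias such as t.

```python
from typing import List

# Rough approximation (not Spark's code) of how the JDBC source turns the
# "dbtable" option into the SQL statement sent to the database.


def quote_identifier(name: str) -> str:
    # PostgreSQL-style quoting: wrap in double quotes, double embedded quotes.
    return '"' + name.replace('"', '""') + '"'


def pushed_down_query(columns: List[str], dbtable: str) -> str:
    """Build SELECT <quoted columns> FROM <dbtable>."""
    column_list = ",".join(quote_identifier(c) for c in columns)
    return f"SELECT {column_list} FROM {dbtable}"


if __name__ == "__main__":
    dbtable = "(SELECT * FROM public.testtable WHERE id <= 100) t"
    print(pushed_down_query(["id", "col1", "col2", "col3"], dbtable))
```

Run as a script, this prints the same statement that appears in the PostgreSQL query above.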
