What is BigQuery?
BigQuery is a fully managed enterprise data warehouse that provides built-in features such as machine learning, geospatial analytics, and business intelligence to help you manage and analyze your data. Federated queries let you read data from external sources, while streaming allows for continuous data updates. The data can then be analyzed and understood using powerful tools such as BigQuery ML and BI Engine. BigQuery stores data in a columnar format that is optimized for analytical queries, presents it as tables, rows, and columns, and supports full database transactional semantics.
Setting up the environment
Sign up to Google Cloud Platform and create a project, then select the project you want to use in the Google Cloud console; this is typically a drop-down menu in the console navigation. Enable the BigQuery API for the selected project. Create a service account and an IAM policy that allows access to BigQuery within your project, and generate the keys, preferably in JSON format.
Accessing BigQuery from Python
To access BigQuery, use the google-cloud-bigquery
Python library. Create a client, pass in the service account credentials, and connect to BigQuery.
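A minimal sketch, assuming the service account key has been saved as data/introduction-to-gcp.json (the same path used in the DAG later in this post):

import os
from google.cloud import bigquery

# point the client library at the service account key (assumed path)
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"
client = bigquery.Client()  # picks up credentials from the environment variable
print(client.project)       # quick check that the client is bound to the right project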
BigQuery dataset
Before loading data into BigQuery, a create_dataset
method is used to create a dataset in BigQuery where the tables/data will be stored.
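A minimal sketch of what this step does, assuming the dataset ID 'dataset' and region 'europe-west2' used later in the DAG:

from google.cloud import bigquery

client = bigquery.Client()
dataset = bigquery.Dataset(f"{client.project}.dataset")  # fully qualified dataset ID
dataset.location = 'europe-west2'
client.create_dataset(dataset, exists_ok=True)  # no error if the dataset already exists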
Extracting data from the local DB (MySQL)
MySQL contains the classicmodels database, a sample database for a car retail company. From this database we can extract the following:
- customers who made the most orders
- products that have the highest number of purchases
- customers who have spent the most.
This data is then extracted and transformed using pandas.
airflow.hooks.mysql_hook import MySqlHook
is used to connect to the local MySQL database from Airflow. Setting the host to host.docker.internal
enables access to the local database from inside the container. The connection string/properties should be set in the Airflow web UI under the Admin > Connections menu.
# connecting to local db to query classicmodels db
mysql_hook = MySqlHook(mysql_conn_id='mysql_default', schema='classicmodels')
connection = mysql_hook.get_conn()
Inserting into BigQuery
client.load_table_from_dataframe(df,'table_name')
is the method used to insert data into BigQuery tables; it takes a DataFrame created from a query and the name of the target table in BigQuery.
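The load runs as an asynchronous job, so it is worth waiting for it to finish. A minimal sketch, assuming a placeholder DataFrame and the dataset.product_dem table used in the DAG below:

from google.cloud import bigquery
import pandas as pd

client = bigquery.Client()
df = pd.DataFrame({'productName': ['example'], 'quantity_ordered': [1]})  # placeholder frame
job = client.load_table_from_dataframe(df, 'dataset.product_dem')  # returns a LoadJob
job.result()  # block until the load job completes (raises if the load failed)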
Automating with Airflow
The job runs every 20 minutes. The ETL is separated into three tasks, creating_dataset >> truncating_tables >> inserting_data,
which are executed using the PythonVirtualenvOperator
in Airflow.
Airflow is run in Docker. A volume ./data:/opt/airflow/data
is added to the docker-compose file to store the classicmodels database
and the JSON file that contains the Google application credentials.
Entire code for the DAG
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator  # for executing python functions
from airflow.operators.python import PythonVirtualenvOperator  # for working with venvs in airflow
from airflow.hooks.mysql_hook import MySqlHook  # for connecting to the local db
from datetime import timedelta


# function to create a dataset in bigquery to store data
def create_dataset():
    """
    Create a dataset in Google BigQuery if it does not exist.
    :param dataset_id: Name of dataset
    :param region_name: Region name for data center, i.e. europe-west2 for London
    """
    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound
    import os

    # setting application credentials to access bigquery
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"
    client = bigquery.Client()
    dataset_id = 'dataset'
    region_name = 'europe-west2'
    reference = client.dataset(dataset_id)
    try:
        client.get_dataset(reference)
    except NotFound:
        dataset = bigquery.Dataset(reference)
        dataset.location = region_name
        dataset = client.create_dataset(dataset)


# function to truncate tables before inserting data
def truncate():
    from google.cloud import bigquery
    import os

    # setting application credentials to access bigquery
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"
    # tables to truncate in bigquery (*this service task is billed)
    table1 = 'dataset.product_dem'
    table2 = 'dataset.toporders'
    table3 = 'dataset.customer_spe'
    # truncate the Google BigQuery tables
    client = bigquery.Client()
    query1 = "DELETE FROM " + table1 + " WHERE 1=1"
    query2 = "DELETE FROM " + table2 + " WHERE 1=1"
    query3 = "DELETE FROM " + table3 + " WHERE 1=1"
    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    query_job1 = client.query(query1, job_config=job_config)
    query_job2 = client.query(query2, job_config=job_config)
    query_job3 = client.query(query3, job_config=job_config)
    # wait for the delete jobs to finish before the insert task runs
    query_job1.result()
    query_job2.result()
    query_job3.result()


# function to insert data from a pandas df to bigquery
def insert():
    from google.cloud import bigquery
    from airflow.hooks.mysql_hook import MySqlHook  # for connecting to the local db
    import pandas as pd
    import os

    # setting application credentials to access bigquery
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"
    # connecting to the local db to query the classicmodels db
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default', schema='classicmodels')
    connection = mysql_hook.get_conn()
    # querying the source db
    # products with the highest number of purchases
    query1 = """
        SELECT productName, SUM(quantityOrdered) AS quantity_ordered
        FROM products, orderdetails
        WHERE products.productCode = orderdetails.productCode
        GROUP BY productName
        ORDER BY quantity_ordered DESC
        LIMIT 20;
    """
    # customers that have made the most orders
    query2 = """
        SELECT contactFirstName, contactLastName, COUNT(*) AS number_of_orders
        FROM customers, orders
        WHERE customers.customerNumber = orders.customerNumber
        GROUP BY customerName
        ORDER BY number_of_orders DESC
        LIMIT 20;
    """
    # customers that have spent the most
    query3 = """
        SELECT contactFirstName, contactLastName, SUM(quantityOrdered*priceEach) AS total_amount_spent
        FROM customers, orders, orderdetails
        WHERE customers.customerNumber = orders.customerNumber AND orderdetails.orderNumber = orders.orderNumber
        GROUP BY customerName
        ORDER BY total_amount_spent DESC
        LIMIT 10;
    """
    df1 = pd.read_sql_query(query1, connection)
    df2 = pd.read_sql_query(query2, connection)
    df3 = pd.read_sql_query(query3, connection)
    client = bigquery.Client()
    # load the data into bigquery tables and wait for each load job to complete
    client.load_table_from_dataframe(df1, 'dataset.product_dem').result()
    client.load_table_from_dataframe(df2, 'dataset.toporders').result()
    client.load_table_from_dataframe(df3, 'dataset.customer_spe').result()


def message():
    print("data successfully loaded into gc-bigquery")


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(7),
}

mysql_to_gcp = DAG(
    'mysql-to-gcp',  # name of the dag
    default_args=default_args,
    schedule_interval=timedelta(minutes=20),
    catchup=False,
)

creating_dataset = PythonVirtualenvOperator(
    task_id='creating-a-dataset-in-bigquery',
    python_callable=create_dataset,
    requirements=["google-cloud-bigquery", "google-cloud-bigquery-storage"],
    system_site_packages=False,
    dag=mysql_to_gcp,
)

truncating_tables = PythonVirtualenvOperator(
    task_id='truncating-tables-in-bigquery',
    python_callable=truncate,
    requirements=["google-cloud-bigquery", "google-cloud-bigquery-storage"],
    dag=mysql_to_gcp,
)

inserting_data = PythonVirtualenvOperator(
    task_id='inserting-data-into-bigquery',
    python_callable=insert,
    requirements=["google-cloud-bigquery", "google-cloud-bigquery-storage"],
    dag=mysql_to_gcp,
)

message_out = PythonOperator(
    task_id='task-complete-message',
    python_callable=message,
    dag=mysql_to_gcp,
)

creating_dataset >> truncating_tables >> inserting_data >> message_out
Airflow-BigQuery integration
Airflow provides a variety of operators, such as the Airflow BigQuery Operators, to assist you in managing your data. These operators are widely used because they aid in data management by analyzing and extracting meaningful insights. You can use Airflow BigQuery Operators to do the following (see the sketch after this list):
- Manage datasets
- Manage tables
- Run BigQuery jobs
- Validate data
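For instance, a minimal sketch using operators from the apache-airflow-providers-google package (the DAG name, dataset ID, and query below are illustrative only, not the ones used in the DAG above):

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryInsertJobOperator,
)
from airflow.utils.dates import days_ago

with DAG('bigquery-operators-example', start_date=days_ago(1), schedule_interval=None) as dag:
    # create a dataset without writing a Python callable
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='create-dataset',
        dataset_id='dataset',
    )
    # run an arbitrary SQL job in BigQuery
    run_query = BigQueryInsertJobOperator(
        task_id='run-query',
        configuration={'query': {'query': 'SELECT 1', 'useLegacySql': False}},
    )
    create_dataset >> run_query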