Here is how the task of “adding a hash to an existing DataFrame” went from taking a couple of days to consuming almost an entire sprint.
In Q2 of 2022, I started working on a data pipeline that fetches market data from a REST service and stores it in a BigQuery table. That is the high-level picture. The interesting part is how the data is queried, converted to a DataFrame, and then uploaded to BigQuery tables using Airflow's GCSToBigQueryOperator.
Initially, it seemed simple to write, but Airflow's "idempotency" principle added a bit of a challenge. What to fetch from the REST service was determined by another table, so even if the job itself was idempotent, that reference table could change between two runs. After spending additional time and talking to the data engineers, the pipeline was ready by the end of Q3 of 2022.
Fast forward to Q1 of 2024. By this time, we had more users accessing the data, and we realized our query pattern was not using partitions properly. Or, rather, we wanted to access the data based on a string column, but it’s not possible to partition on a string column in BigQuery. This led to scanning large amounts of data and frequently hitting the daily quota.
This led us to consider how to partition data based on string columns. Our data engineer suggested converting that string column into an integer using FarmHash with an additional modulo operation. In the proof of concept, this reduced scanning by almost 90%, and query performance increased by 3-5x. We decided to proceed with this as the final solution. All we needed was to:
- Create a table with a FarmHash fingerprint column (see the sketch after this list)
- Change the pipeline to compute the fingerprint
- Upload the data.
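For the first step, the new table needs integer-range partitioning on the fingerprint column. Below is a minimal sketch of how such a table could be created with the google-cloud-bigquery client; the project, dataset, table, and column names, as well as the 4000-bucket modulus, are illustrative assumptions rather than the actual production values.

```python
from google.cloud import bigquery

client = bigquery.Client()

# Illustrative schema: the original string column plus its FarmHash-derived bucket
schema = [
    bigquery.SchemaField("Col", "STRING"),
    bigquery.SchemaField("hash", "INTEGER"),
]

table = bigquery.Table("my-project.market_data.prices_hashed", schema=schema)

# Integer-range partitioning on the hash column; the 0-4000 range assumes
# the modulo step maps every fingerprint into 4000 buckets
table.range_partitioning = bigquery.RangePartitioning(
    field="hash",
    range_=bigquery.PartitionRange(start=0, end=4000, interval=1),
)

client.create_table(table)
```

Queries that filter on the hash column then scan only the matching partitions instead of the whole table, which is where the reduction in scanned data comes from.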
To compute FarmHash fingerprints in Python, there is a pyfarmhash module. I installed the module and used the code below to compute the hash, and locally it all worked as desired.
```python
import pyfarmhash

def get_hash(val: str) -> int:
    # additional_logic applies the extra modulo step described above
    return additional_logic(pyfarmhash.fingerprint64(val))

df['hash'] = df['Col'].apply(get_hash)
```
With all the tests passing, it was now time to push the code to Airflow and run it. I was not expecting anything to go wrong at this stage. In fact, I was happy that everything worked as planned and within the estimated time.
With a happy mind and full of confidence, I pushed my changes, started the job, and then waited for 10-15 minutes for it to finish. Meanwhile, I switched to another task. Soon, I received an unexpected failure email from Airflow. I looked at the logs and was surprised to see that it failed while installing the pyfarmhash module!
To help you understand the problem, I need to explain the structure of the job. The job has the following steps:
- Download the data in parquet format
- Upload to GCS bucket
- Delete existing data, if any (to avoid duplicates)
- Upload the data to BQ tables (see the sketch after this list).
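For context, the last step used the GCSToBigQueryOperator mentioned earlier. A rough sketch of that task is below; the bucket, object path, and table names are made-up placeholders, not the real configuration.

```python
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_to_bq = GCSToBigQueryOperator(
    task_id="load_market_data_to_bq",
    bucket="my-market-data-bucket",                    # placeholder bucket
    source_objects=["exports/market_data/*.parquet"],  # placeholder path
    destination_project_dataset_table="my-project.market_data.prices_hashed",
    source_format="PARQUET",
    write_disposition="WRITE_APPEND",
)
```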
In this process, task-1, which downloads the data, is a separate Python module. To run it, I used the PythonVirtualenvOperator from Airflow. This operator allows you to specify packages as requirements and then installs them in a newly created virtual environment. Once the package is installed, all its dependencies are also installed, and you are ready to roll.
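As a simplified sketch (not the actual DAG code, and the module and package names are hypothetical), the download task looked roughly like this before my change:

```python
from airflow.operators.python import PythonVirtualenvOperator

def run_download():
    # Imports happen inside the callable because it executes in the
    # freshly created virtual environment, not in the scheduler process
    from market_data.download import download_market_data  # hypothetical module
    download_market_data()

download_task = PythonVirtualenvOperator(
    task_id="download_market_data",
    python_callable=run_download,
    requirements=["market-data-pipeline"],  # illustrative package name
    system_site_packages=False,
)
```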
I added pyfarmhash as a dependency to the module that downloads the data, and everything else remained unchanged. And it failed! Why?
pyfarmhash is a hashing library implemented in C/C++. Upon installation, it requires GCC to compile the package, and that was not present on the Airflow host. It made sense not to have GCC on the Airflow host, but unfortunately, this was a blocker for me.
I looked for a pure Python implementation of the pyfarmhash package, but there were none. Then I looked for wheel packages, but again, there were none. I considered building wheel packages and pushing them, but that would have led to a long-term responsibility of providing wheel packages internally. I wanted to avoid additional, workaround-like steps. I explored all the options and discussed them with the team maintaining Airflow. They suggested creating a Docker image and running it with the KubernetesPodOperator. This was a good option, as I could control the environment and include whatever was required without relying on an external environment. Additionally, this solution had no workarounds. The only short-term drawback was that it needed more time to implement.
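A rough sketch of what the replacement task could look like; the image name, command, and task id are placeholders, and the exact import path depends on the cncf.kubernetes provider version.

```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

download_task = KubernetesPodOperator(
    task_id="download_market_data",
    name="download-market-data",
    image="europe-docker.pkg.dev/my-project/pipelines/market-data:latest",  # placeholder image
    cmds=["download-data"],  # command provided by the image (entry points discussed below)
    get_logs=True,
)
```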
Before starting with a Docker-based solution, I had already spent about 16-20 hours on this task. For the Docker-based solution, I additionally needed to:
- Change the Python package to have entry points to start download and purging logic.
- Create a Docker package and test it (this was my second Docker image).
Since I was no longer going to use the PythonVirtualenvOperator in Airflow, I decided to remove it completely and improve the workflow. As part of that, the Python package needed entry points to start the download and purging logic.
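Something along these lines would provide those entry points; this is a hypothetical setup.py sketch, and the package, module, and command names are not the real ones.

```python
from setuptools import setup, find_packages

setup(
    name="market-data-pipeline",  # hypothetical package name
    version="1.0.0",
    packages=find_packages(),
    install_requires=["pyfarmhash", "pandas"],
    entry_points={
        "console_scripts": [
            # commands the Docker image can invoke directly
            "download-data=market_data.download:main",
            "purge-data=market_data.purge:main",
        ],
    },
)
```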
It took me an additional 30-36 hours to get the final solution with the Docker image ready. That is 6-7 working days, and with the initial 2 days included, it became a sprint-long task.
I look back on this and wonder: I had to throw away a working solution, change the module structure, create a Docker image, change 10+ Airflow jobs to use the Docker image for their tasks, deal with this reality, and overcome the initial frustration. All of this only because a single Python module required GCC to compile!