Imagine you work with a client system that uploads files to an FTP folder. We’ve got to process each file as soon as it appears in the folder and push it to a database.
A real-time dashboard reads from that database. Therefore, we must update the database without any delay.
You could run a periodic task that checks the folder’s contents. But let’s assume the quicker you update the database, the better for the user. Even the little delay a periodic task introduces is costly, and shortening the period means the task runs more often and consumes more resources.
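For contrast, here’s roughly what that polling approach looks like (a minimal sketch; the folder path and the 60-second period are assumptions made for illustration):

import time
from pathlib import Path

watch_dir = Path("/path/to/ftp/folder")  # assumed path; replace with your own
seen = set(watch_dir.iterdir())

while True:
    # Re-scan the folder and diff against what we saw last time.
    current = set(watch_dir.iterdir())
    for new_file in current - seen:
        print("Found new file:", new_file)  # process it here
    seen = current
    time.sleep(60)  # a shorter period means more frequent scans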
Instead, we need to build a filesystem trigger to accomplish the task.
Monitor new file creations in a folder
We can use a Python script that actively listens to file system events in a folder.
We can start by installing a Python package called Watchdog. It’s available through the PyPI repository.
pip install watchdog
If you're using Poetry instead of Virtualenv:
poetry add watchdog
Here’s an example to start with. The following Python script will watch the current directory and print an event whenever a new file is created.
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class FileCreateHandler(FileSystemEventHandler):
    def on_created(self, event):
        print("Created: " + event.src_path)


if __name__ == "__main__":
    event_handler = FileCreateHandler()

    # Create an observer.
    observer = Observer()

    # Attach the event handler to the observer.
    observer.schedule(event_handler, ".", recursive=True)

    # Start the observer.
    observer.start()

    try:
        # Keep the script alive while the observer thread runs.
        while observer.is_alive():
            observer.join(1)
    finally:
        observer.stop()
        observer.join()
The important part of the above code is the FileCreateHandler class. But before getting into it, we create an observer object and attach our event handler to it using the schedule method. In the example above, we’ve configured it to watch for events in the current directory and, since recursive=True, all its subdirectories.
If you run this code and create a new file in the current directory, you’ll see the Python script print the event in the terminal.
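For instance, assuming you saved the script as watch.py (a name chosen here for illustration) and then created a file named test.txt in the watched directory, you’d see something like:

$ python watch.py
Created: ./test.txt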
We’ve passed the current folder to the observer’s schedule method, but you could use any path of your choice, or take it from a command line argument.
Here’s a modification to the same code that converts our script into a CLI. Now you can pass the path to monitor as a command line argument.
...
import argparse
...

if __name__ == "__main__":
    # Parse the folder path from the command line.
    parser = argparse.ArgumentParser(
        description="Watchdog script to watch for new CSV files and load them into the database"
    )
    parser.add_argument("path", help="Path to watch for new CSV files")

    args = parser.parse_args()
    path = args.path

    # Create the event handler and observer as before.
    event_handler = FileCreateHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
...
You can now run your script like the following in your terminal.
python <YourScript>.py /somewhere/in/your/computer
You can learn more about creating a command line interface with Python in my previous post.
Process file changes in the handler class
In our example, we’ve created an event handler by subclassing the ‘FileSystemEventHandler’ class. Every event handler should subclass it.
The parent class provides placeholder methods for several file system events. We’ve overridden the ‘on_created’ method to handle new file creations. Likewise, you can override on_deleted, on_modified, on_moved, and on_any_event to handle other types of events.
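For instance, a handler that reacts to several event types might look like this (a minimal sketch; the class name and the print statements are my own, not from the original example):

from watchdog.events import FileSystemEventHandler


class AuditHandler(FileSystemEventHandler):
    def on_modified(self, event):
        print("Modified:", event.src_path)

    def on_moved(self, event):
        # Move events carry both the old and the new path.
        print("Moved:", event.src_path, "->", event.dest_path)

    def on_deleted(self, event):
        print("Deleted:", event.src_path)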
Let’s update on_created to process the file and insert its values into a database. Feel free to skip this section if it’s irrelevant to your use case.
import os

import pandas as pd
from sqlalchemy import create_engine
from watchdog.events import FileSystemEventHandler

...

class FileCreateHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Create a SQLAlchemy engine for the Postgres database using an environment variable.
        engine = create_engine(os.environ["DATABASE_URL"])

        with engine.connect() as con:
            # Read the CSV data.
            df = pd.read_csv(event.src_path)
            # Do the required transformations.
            df["date"] = pd.to_datetime(df["date"])
            # Write the data to the database.
            df.to_sql("weather", con, if_exists="append", index=False)

...
This code might look very familiar if you’ve worked with SQLAlchemy and pandas. What’s worth noting is how we get the path of the newly created file: every event method of the ‘FileSystemEventHandler’ class receives the path in its event argument, and we can access it through the src_path attribute, as shown in the code.
If you run the code and let the Python script listen for changes, it’ll push each new file’s contents to the database as soon as the file appears.
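One caveat: on_created fires for directories and for every file type, not just CSVs. In practice, you’d likely want a guard at the top of the handler, something like this sketch (assuming you only care about files ending in .csv):

class FileCreateHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Ignore directories and anything that isn't a CSV file.
        if event.is_directory or not event.src_path.endswith(".csv"):
            return
        # ... process the CSV as shown above ...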
Serve your app in the background
By now, you’d have noticed that our app runs in a live terminal session. But it’s not wise to do this in production; anything that happens to the terminal session can impact the app.
The best way to run such services in the background is through a system service. Windows users can use NSSM (the Non-Sucking Service Manager); it’s pretty straightforward if you skim through its documentation.
But in this post, I’ll cover Linux systems using systemd and its systemctl utility.
You can create a new system service by creating a file with the following content in the ‘/etc/systemd/system’ folder. You can name it anything with a ‘.service’ extension.
[Unit]
Description="Process Data"

[Service]
Restart=always
WorkingDirectory=<PATH_TO_PROJECT_DIRECTORY>
ExecStart=<PATH_TO_PYTHON_EXECUTABLE> <YOUR_SCRIPT>.py

[Install]
WantedBy=multi-user.target
Once done, you can run the following commands in the terminal to activate the service.

# To make your new service available to the systemctl utility
$ sudo systemctl daemon-reload

# To start the service
$ sudo systemctl start <YOUR_SERVICE_FILE_NAME>.service
This will start the process. Now, as new files are created at our FTP destination, this service will process them and upload the data to the database. And our real-time dashboard will get fresh data without any delay.
You can check if your service is running properly with the following command.
systemctl status <YOUR_SERVICE_FILE_NAME>.service
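And since the unit file has an [Install] section, you can also enable the service so it starts automatically at boot.

# To start the service automatically at boot
$ sudo systemctl enable <YOUR_SERVICE_FILE_NAME>.service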
Final thoughts
Processing file system changes is getting rarer these days as the world moves towards more robust integration between systems. But that doesn’t mean filesystem triggers have no use.
There are many instances where we need to monitor new file creations or modifications. Take log stream processing, for example: you could use the technique described here to process each new log line and push it to a data warehouse.
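A rough sketch of that idea, reusing the same handler pattern (the class name and the offset bookkeeping are my own additions, not part of the original example):

import os
from watchdog.events import FileSystemEventHandler


class LogTailHandler(FileSystemEventHandler):
    def __init__(self):
        # Remember how far into each file we've already read.
        self._offsets = {}

    def on_modified(self, event):
        if event.is_directory or not event.src_path.endswith(".log"):
            return
        with open(event.src_path) as f:
            # Resume from where the previous event left off.
            f.seek(self._offsets.get(event.src_path, 0))
            for line in f:
                print("New log line:", line.rstrip())  # push to the warehouse here
            self._offsets[event.src_path] = f.tell()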
I’ve been using this technique for a long time now, and I don’t see its usefulness fading yet.
Did you like what you read? Consider subscribing to my email newsletter because I post more like this frequently.
Thanks for reading, friend! Say Hi to me on LinkedIn, Twitter, and Medium.