DEV Community

Cover image for Building an Event-Driven System for GeoJSON File Processing and Serving
AissaGeek
AissaGeek

Posted on

Building an Event-Driven System for GeoJSON File Processing and Serving

Introduction

Event-Driven architectures are powerful inn handling asynchronous task, providing scalability, and decoupling various components of a system. In this article we'll build an event-driven system that processes GeoJSON files. if you are interested, see how it is done:

  1. File Watcher: Detects new files in a directory.
  2. Message Broker (RabbitMQ): Receives message about the new files.
  3. Consumer: Processes files and interacts with a database and object storage.
  4. FastAPI Server: Serves the files through streaming.

System Overview

Our system consists of four main component.

Simple Event-Driven file processing and serving

  1. A python script that watches a directory for new GeoJSON files, then publishes information about the new file into the RabbitMQ broker, information such, file name and absolute path.
  2. A message broker that queues this information.
  3. A python script consumer, that processes each file, stores in PostGIS database, and uploads it to MinIO.
  4. A FastAPI API that provides an endpoint to stream the file form MinIO.

Technologies used

  • RabbitMQ: for message queuing.
  • PostGIS: A spatial database extender for PostgresSQL.
  • MinIO: An object storage solution.
  • FastAPI: A modern, fast framework for building APIs. Now for practical implementation of the system.

System Setup

In order to quickly build our system, we will use docker-compose to orchestrate some services, see below the docker-compose.yml file

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
  postgis:
    image: postgis/postgis
    environment:
      POSTGRES_DB: postgis
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
  minio:
    image: minio/minio
    volumes:
      - minio_data:/data
    ports:
      - "9000:9000"
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
    command: server /data
volumes:
  minio_data:
Enter fullscreen mode Exit fullscreen mode

What is next ? Let's put in place the python watcher and consumer so our system will be partially done.

File Watcher Implementation
The file watcher uses watchdog to monitor a directory:

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import pika
import json

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:
            self.send_message(event.src_path)

    def send_message(self, file_path):
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='file_queue')
        message = json.dumps({'file_path': file_path})
        channel.basic_publish(exchange='', routing_key='file_queue', body=message)
        connection.close()

observer = Observer()
observer.schedule(Handler(), path='/path/to/watch', recursive=False)
observer.start()
Enter fullscreen mode Exit fullscreen mode

Python Consumer for Processing Files
This script below consumes messages, processes files, and interacts with PostGIS and MinIO:

import pika
import json
import geopandas as gpd
from minio import Minio

def process_file(file_path):
    # Read GeoJSON file
    gdf = gpd.read_file(file_path)
    # TODO Process and save to PostGIS
    # ...... Here its your turn to complete this project ....
    # Upload to MinIO
    minio_client.fput_object("your-bucket", "file_name.geojson", file_path)

minio_client = Minio('minio:9000', access_key='minio', secret_key='minio123', secure=False)

connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
channel.queue_declare(queue='file_queue')

def callback(ch, method, properties, body):
    file_info = json.loads(body)
    process_file(file_info['file_path'])

channel.basic_consume(queue='file_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Enter fullscreen mode Exit fullscreen mode

FastAPI Endpoint for streaming Files
FastAPI provides an endpoint to stream files form MinIO:

from fastapi import FastAPI
from minio import Minio
from starlette.responses import StreamingResponse

app = FastAPI()
minio_client = Minio('minio:9000', access_key='minio', secret_key='minio123', secure=False)

@app.get("/files/{file_name}")
async def get_file(file_name: str):
    obj = minio_client.get_object("your-bucket", file_name)
    return StreamingResponse(obj.stream(32*1024), media_type="application/octet-stream")

Enter fullscreen mode Exit fullscreen mode

What is next ?

You think we are all done, then you are wrong, try to put all together what we have seen, and see if it works.
Something messing right ? Of course, there are some missing pieces, like the database ORM model to map the database tables. That is not all, how about creating the database with necessary tables.

How about you complete this project and let me know about your precious contribution ;)

Conclusion

We have built an event-driven system for processing and serving GeoJSON files. This setup demonstrates the power of combining several technologies and Python scripts to create a scalable and efficient system.
By leveraging Docker, we ensure that our components are easily deployable and maintainable. This architecture is not only limited to GeoJSON files but can be adapted to various scenarios requiring automated file processing and serving.

Top comments (0)