<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Mangesh Walimbe</title>
    <description>The latest articles on DEV Community by Mangesh Walimbe (@mangesh_walimbe).</description>
    <link>https://dev.to/mangesh_walimbe</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3282959%2F573f7e1d-7157-4534-85e9-4574db7c6934.jpg</url>
      <title>DEV Community: Mangesh Walimbe</title>
      <link>https://dev.to/mangesh_walimbe</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/mangesh_walimbe"/>
    <language>en</language>
    <item>
      <title>Event-Driven Architecture (EDA) — An Overview</title>
      <dc:creator>Mangesh Walimbe</dc:creator>
      <pubDate>Sun, 05 Apr 2026 16:15:47 +0000</pubDate>
      <link>https://dev.to/mangesh_walimbe/event-driven-architectureeda-an-overview-2a1p</link>
      <guid>https://dev.to/mangesh_walimbe/event-driven-architectureeda-an-overview-2a1p</guid>
      <description>&lt;h3&gt;
  
  
  What is Event-Driven Architecture (EDA) ?
&lt;/h3&gt;

&lt;p&gt;Event-Driven Architecture (EDA) is a software design pattern in which system components communicate by producing and consuming events. Decoupling components through events improves the scalability, modularity, and responsiveness of the system, making it well suited to real-time processing of user activity in a distributed environment.&lt;/p&gt;

&lt;h3&gt;
  
  
  Architectural Style:
&lt;/h3&gt;

&lt;p&gt;An event-driven architecture typically uses a publish-subscribe model, an event streaming model, or both.&lt;/p&gt;

&lt;p&gt;Components communicate asynchronously through an event broker, so applications do not need to know where events are published from or who consumes them. Events represent a change in state or a user action (e.g., an order being placed, a user signing up).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvu1oi1h69hs7sa0g85n5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvu1oi1h69hs7sa0g85n5.png" alt="Event Driven Architecture" width="800" height="356"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;There are three main components:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Event Producer:&lt;/strong&gt;&lt;br&gt;
An application, service, or component where data is generated and an event is published. The producer doesn’t know who will subscribe to the event or how the data will be processed, so it can emit high volumes of events without being blocked by consumers.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Event Broker:&lt;/strong&gt;&lt;br&gt;
The middleware between producers and consumers. Its key role is to receive events from producers, store them, and route them to consumers. This communication is asynchronous, and routing is based on topics, queues, or filters.&lt;/p&gt;

&lt;p&gt;Event brokers support multiple delivery models, such as publish-subscribe and point-to-point. They ensure events are not lost through techniques like retries, message persistence, and dead-letter queues, and they can handle high volumes of events and scale horizontally. Examples include Apache Kafka, RabbitMQ, and AWS EventBridge.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Event Consumer:&lt;/strong&gt;&lt;br&gt;
A component, application, or service that subscribes to events published by producers through the event broker. It processes each event asynchronously as soon as it receives it. Each consumer handles specific types of events and does not know who published them. Multiple consumer instances can consume events in parallel.&lt;/p&gt;
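&lt;p&gt;To make the interaction concrete, here is a minimal in-memory sketch of the three components in Python (illustrative only; a real system would use a broker such as Kafka or RabbitMQ, and all names here are made up):&lt;/p&gt;

```python
from collections import defaultdict

class EventBroker:
    """Minimal in-memory broker: routes events by topic to subscribers."""
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self.subscribers[topic].append(handler)

    def publish(self, topic, event):
        # The producer only knows the topic, never the consumers.
        for handler in self.subscribers[topic]:
            handler(event)

broker = EventBroker()
received = []
broker.subscribe("OrderPlaced", lambda e: received.append(e))  # consumer
broker.publish("OrderPlaced", {"order_id": 1, "item": "pizza"})  # producer
print(received)  # [{'order_id': 1, 'item': 'pizza'}]
```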

&lt;h3&gt;
  
  
  Communication Models:
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Publish-Subscribe model:&lt;/strong&gt;&lt;br&gt;
In this model, the event producer/publisher publishes events without knowing who will consume them. Subscribers/consumers register interest in specific events and receive them as soon as they are available. Because of this decoupled structure, producers and consumers work independently.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Event Streaming model:&lt;/strong&gt;&lt;br&gt;
In this model, event data is continuously generated and processed in real time. Events form a stream of records that are processed, stored, and analyzed as soon as they are created. Applications that need real-time analysis and monitoring can make use of this model.&lt;/p&gt;
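&lt;p&gt;The streaming model can be sketched with a toy append-only log in Python (an illustration of the idea, not a real streaming engine): each consumer keeps its own offset into the log and can replay earlier records.&lt;/p&gt;

```python
class EventLog:
    """Append-only log; consumers pull records from their own offset."""
    def __init__(self):
        self.records = []

    def append(self, record):
        self.records.append(record)

    def read_from(self, offset):
        # Records are retained, so a consumer can re-read from any offset.
        return self.records[offset:]

log = EventLog()
for i in range(3):
    log.append({"seq": i})

offset = 0
batch = log.read_from(offset)
offset += len(batch)       # the consumer advances its own offset
print(len(batch), offset)  # 3 3
print(log.read_from(1))    # replay from an earlier offset
```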

&lt;h3&gt;
  
  
  When to use Event-Driven Architecture (EDA)?
&lt;/h3&gt;

&lt;p&gt;Event-driven architecture is a great choice when your application needs to respond to user actions instantly and handle a high volume of events without slowing down. Each component can send and listen to events instead of having all parts of the application talk to each other directly. This decoupling of services makes it easier to update components or add new features later without affecting the rest of the system, and it helps your application stay reliable and flexible even when traffic spikes or something goes down temporarily.&lt;/p&gt;

&lt;p&gt;Using events makes your application more scalable, more responsive, and simpler to grow over time. Overall, if you need a system that reacts in real time, handles a high volume of traffic gracefully, and lets you build and change services without breaking everything else, event-driven architecture is a great choice.&lt;/p&gt;

&lt;h3&gt;
  
  
  Use Case:
&lt;/h3&gt;

&lt;p&gt;Let’s take the example of an online food delivery application like UberEats, in which customers, restaurants, and delivery drivers all need real-time updates at the same time. The event-driven design pattern works perfectly here: it allows the different components of the system to work independently while still staying in sync.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fw9e50gps6igdkufrrfor.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fw9e50gps6igdkufrrfor.png" alt="Flow Diagram for Online Food Delivery system" width="800" height="394"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here are the main components:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Order Service:&lt;/strong&gt;&lt;br&gt;
A customer places an order in the application. The order service publishes an ‘OrderPlaced’ event containing information about the customer, payment, and order.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Restaurant Service:&lt;/strong&gt;&lt;br&gt;
The restaurant service is responsible for accepting orders and forwarding them to the restaurant. It listens for the ‘OrderPlaced’ event; as soon as it receives one, it notifies the restaurant to prepare the order and updates the restaurant dashboard for real-time tracking. Once the order is ready, it publishes an ‘OrderReady’ event.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Payment Service:&lt;/strong&gt;&lt;br&gt;
The payment service also listens for the ‘OrderPlaced’ event. Once it receives one, it processes the payment and publishes a new ‘PaymentSuccessful’ event.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Delivery Service:&lt;/strong&gt;&lt;br&gt;
This service listens for the ‘PaymentSuccessful’ event. It finds the available drivers, sends them a delivery request, and publishes a ‘DriverAssigned’ event.&lt;/p&gt;

&lt;p&gt;Once the restaurant service publishes the ‘OrderReady’ event, the delivery service picks it up and publishes a new ‘OrderOutForDelivery’ event.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Notification Service:&lt;/strong&gt;&lt;br&gt;
This service listens to all the events ‘OrderPlaced’, ‘PaymentSuccessful’, ‘DriverAssigned’, ‘OrderReady’, ‘OrderOutForDelivery’ and sends real-time updates to the customer about the order.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Tracking Service:&lt;/strong&gt;&lt;br&gt;
This service provides real-time tracking. It consumes all of the events above and feeds updates to the notification service.&lt;/p&gt;
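&lt;p&gt;The event flow above can be sketched in a few lines of Python (a toy in-process version; the event names follow the flow, everything else is hypothetical):&lt;/p&gt;

```python
from collections import defaultdict

handlers = defaultdict(list)
emitted = []

def on(topic, fn):
    handlers[topic].append(fn)

def publish(topic, payload):
    emitted.append(topic)
    for fn in handlers[topic]:
        fn(payload)

# Each service only subscribes to the events it cares about.
on("OrderPlaced", lambda e: publish("PaymentSuccessful", e))    # Payment Service
on("OrderPlaced", lambda e: publish("OrderReady", e))           # Restaurant Service
on("PaymentSuccessful", lambda e: publish("DriverAssigned", e)) # Delivery Service
on("OrderReady", lambda e: publish("OrderOutForDelivery", e))   # Delivery Service

publish("OrderPlaced", {"order_id": 42})
print(emitted)
# ['OrderPlaced', 'PaymentSuccessful', 'DriverAssigned', 'OrderReady', 'OrderOutForDelivery']
```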

&lt;h3&gt;
  
  
  Challenges:
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Debugging and Monitoring:&lt;/strong&gt;&lt;br&gt;
In an event-driven architecture, components communicate with each other through events. Because this communication is asynchronous, it is difficult to trace the data flow, which makes testing and debugging complex.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Consistency:&lt;/strong&gt;&lt;br&gt;
Events can arrive out of order, which can cause consistency issues if they are not handled properly.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Duplication:&lt;/strong&gt;&lt;br&gt;
Because of network issues or retries, the same event may be delivered multiple times. The application must be designed to handle duplicate events gracefully, typically by making consumers idempotent.&lt;/p&gt;
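&lt;p&gt;One common way to handle this is an idempotent consumer that remembers which event IDs it has already processed. A minimal Python sketch (the ID field and payload are hypothetical):&lt;/p&gt;

```python
processed_ids = set()
results = []

def handle_once(event):
    """Idempotent consumer: skips events whose ID was already processed."""
    if event["event_id"] in processed_ids:
        return  # duplicate delivery, safe to ignore
    processed_ids.add(event["event_id"])
    results.append(event["payload"])

handle_once({"event_id": "e-1", "payload": "charge card"})
handle_once({"event_id": "e-1", "payload": "charge card"})  # redelivered
print(results)  # ['charge card'] -- processed exactly once
```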

&lt;p&gt;&lt;strong&gt;Operational Overhead:&lt;/strong&gt;&lt;br&gt;
There is always operational overhead, since you need to set up and maintain brokers (e.g., Apache Kafka, RabbitMQ, AWS EventBridge) and tooling to keep track of how data moves. If the system is not scaled properly, performance can degrade and operational costs can rise.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Error Handling:&lt;/strong&gt;&lt;br&gt;
Since event-driven architecture relies mainly on asynchronous communication, error handling is a challenge: if something goes wrong while processing an event, you need a mechanism to retry or fix it without affecting other services.&lt;/p&gt;
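&lt;p&gt;A typical pattern is to retry a failing handler a few times and then park the event in a dead-letter queue so it doesn’t block other events. A minimal sketch in Python (the retry count and event shape are arbitrary; real systems would also log and back off between attempts):&lt;/p&gt;

```python
dead_letter_queue = []

def process_with_retry(event, handler, max_attempts=3):
    """Retry a failing handler; park the event in a DLQ after max_attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            handler(event)
            return True
        except Exception:
            pass  # a real system would log and back off here
    dead_letter_queue.append(event)  # isolate the bad event for later inspection
    return False

def always_fails(event):
    raise RuntimeError("downstream unavailable")

ok = process_with_retry({"id": 7}, always_fails)
print(ok, dead_letter_queue)  # False [{'id': 7}]
```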

&lt;p&gt;&lt;strong&gt;Security and Access:&lt;/strong&gt;&lt;br&gt;
Events can carry sensitive data such as payment details or PII, and could be consumed by services that are not authorized to use them, so access control is essential.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Event Loss:&lt;/strong&gt;&lt;br&gt;
Because of broker failures, network issues, or incorrect configuration, events can be lost.&lt;/p&gt;

</description>
      <category>eventdrivenarchitecture</category>
      <category>dataengineering</category>
      <category>designpatterns</category>
      <category>realtimedatastreaming</category>
    </item>
    <item>
      <title>Yuniql: Plain SQL Database Migrations</title>
      <dc:creator>Mangesh Walimbe</dc:creator>
      <pubDate>Fri, 05 Dec 2025 22:12:18 +0000</pubDate>
      <link>https://dev.to/mangesh_walimbe/yuniql-plain-sql-database-migrations-50ad</link>
      <guid>https://dev.to/mangesh_walimbe/yuniql-plain-sql-database-migrations-50ad</guid>
      <description>&lt;p&gt;&lt;strong&gt;Yuniql&lt;/strong&gt; is a lightweight, open-source migration engine through which we can maintain version and deploy database changes using plain SQL with CI/CD pipelines. It uses migration-based and database-first delivery models.&lt;/p&gt;

&lt;h3&gt;
  
  
  Why Yuniql?
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;1. Plain SQL and folder-based migrations:&lt;/strong&gt; Yuniql uses a simple, intuitive approach to db versioning. Each migration version has a dedicated directory (v0.00, v0.01, etc.) containing plain .sql files and optional .csv files for data used to bulk-seed master tables.&lt;br&gt;
&lt;strong&gt;2. No runtime dependencies:&lt;/strong&gt; The CLI is distributed as a fully self-contained executable, so there is no need to install the .NET CLR or any additional framework on target machines.&lt;br&gt;
&lt;strong&gt;3. Cross-platform and cloud ready:&lt;/strong&gt; It is fully tested across major cloud providers such as Azure SQL, Amazon RDS, and Google Cloud SQL. It supports db engines including SQL Server, PostgreSQL, MySQL, and MariaDB, with preview drivers available for Snowflake, Redshift, and Oracle. This ensures flexibility for hybrid and multi-cloud environments.&lt;br&gt;
&lt;strong&gt;4. DevOps friendly:&lt;/strong&gt; Yuniql integrates seamlessly with CI/CD pipelines. It offers first-class support for Azure Pipelines tasks and provides official Docker images, making automated deployments and containerized environments straightforward and reliable.&lt;/p&gt;
&lt;h3&gt;
  
  
  Yuniql Installation:
&lt;/h3&gt;

&lt;p&gt;Yuniql can be installed in various ways.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Download yuniql.exe directly from &lt;a href="https://github.com/rdagumampan/yuniql/releases/download/latest/yuniql-cli-win-x64-latest.zip" rel="noopener noreferrer"&gt;Github&lt;/a&gt;. (Windows)&lt;/li&gt;
&lt;li&gt;Choco package on Windows (x64): Get the latest Yuniql CLI from the &lt;a href="https://chocolatey.org/" rel="noopener noreferrer"&gt;Chocolatey&lt;/a&gt; package manager. Run the commands below with admin access.
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;choco install yuniql -y
yuniql version
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;ul&gt;
&lt;li&gt;.NET Global Tool: Install the latest Yuniql CLI as a .NET global tool. It requires the .NET Core 3.0 SDK.
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;dotnet tool install -g yuniql.cli
yuniql version
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;ul&gt;
&lt;li&gt;Azure DevOps Extension: The free extension is available on the &lt;a href="https://marketplace.visualstudio.com/items?itemName=rdagumampan.yuniql-azdevops-extensions" rel="noopener noreferrer"&gt;Azure Marketplace&lt;/a&gt;. Install it into your organization.&lt;/li&gt;
&lt;li&gt;Docker Image:
Get the official Docker image for Yuniql and run the migration from a containerized environment.
For more information, visit: &lt;a href="https://yuniql.io/docs/migrate-via-docker-container/" rel="noopener noreferrer"&gt;https://yuniql.io/docs/migrate-via-docker-container/&lt;/a&gt;
Docker Hub: &lt;a href="https://hub.docker.com/repository/docker/yuniql/yuniql" rel="noopener noreferrer"&gt;https://hub.docker.com/repository/docker/yuniql/yuniql&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For other methods, check this: &lt;a href="https://yuniql.io/docs/install-yuniql/" rel="noopener noreferrer"&gt;https://yuniql.io/docs/install-yuniql/&lt;/a&gt;&lt;/p&gt;
&lt;h3&gt;
  
  
  Directory Structure and Execution Flow in Yuniql:
&lt;/h3&gt;

&lt;p&gt;After installation, running the command below scaffolds the directory structure used to stage migration and utility scripts.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;yuniql init
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here is the default directory structure:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Db-name/
├─ _init/      -- It executes only once during first migration. It sets up objects and configuration.
├─ _pre/       -- It runs before every versioned migration for pre-check or environment setup.
├─ v0.00/      -- baseline version folder (initial schema and seed data)
├─ v1.00/      -- next version folder 
├─ _draft/     -- run every time after latest version. Typically for work in-progress scripts or temporary fixes.
├─ _post/      -- Runs at the end of migration. For cleanup or post deployment scripts.
└─ _erase/     -- Runs manual cleanup scripts for yuniql erase command for controlled db cleanup.

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
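&lt;p&gt;For illustration, a version folder simply holds plain SQL files; a hypothetical v1.00 script might look like this (table and column names are made up):&lt;/p&gt;

```sql
-- v1.00/01-create-employee.sql
CREATE TABLE employee (
    id         SERIAL PRIMARY KEY,
    first_name VARCHAR(100) NOT NULL,
    updated_at TIMESTAMP DEFAULT NOW()
);
```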



&lt;h3&gt;
  
  
  Azure DevOps YAML Pipeline using Yuniql
&lt;/h3&gt;

&lt;p&gt;Below are the steps to set up an Azure DevOps YAML pipeline.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;1. Yuniql workspace:&lt;/strong&gt; Run the command yuniql init in the directory the db migration will run from. This creates the Yuniql workspace (the folder structure above).&lt;br&gt;
&lt;strong&gt;2. Azure DevOps setup:&lt;/strong&gt; As mentioned above, acquire the free Yuniql extension from the Azure Marketplace and install it into your organization. &lt;/p&gt;
&lt;h4&gt;
  
  
  Pipeline:
&lt;/h4&gt;

&lt;p&gt;Here is a sample pipeline for a PostgreSQL db migration.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;- task: UseYuniqlCLI@1
    inputs:
      version: "latest"

- task: RunYuniqlCLI@1
    inputs:
      version: "latest"
      connectionString: "$(PostgresDBConnectionString)"
      workspacePath: "$(Build.SourcesDirectory)"
      targetPlatform: "postgresql"
      additionalArguments: "--meta-schema public --meta-table -yuniql_version --debug"

- task: VerifyYUNIQLCLI@1
    displayName: ‘Run verify task (no commit)'
    inputs:
      version: 'latest'
      connectionString: "$(PostgresDBConnectionString)"
      workspacePath: "$(Build.SourcesDirectory)"
      targetPlatform: “postgresql”
      additionalArguments: '--debug'

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here are the properties used in this pipeline:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;version&lt;/code&gt;: The Yuniql CLI version to use. If not mentioned, the latest version of the Yuniql CLI is installed.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;connectionString&lt;/code&gt;: The database connection string.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;workspacePath&lt;/code&gt;: The Yuniql workspace location from which the db migration will take place.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;targetPlatform&lt;/code&gt;: The target db platform; here it is PostgreSQL.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;additionalArguments&lt;/code&gt;: Any additional CLI arguments. Here three arguments are used.
&lt;code&gt;--meta-schema:&lt;/code&gt; The Postgres schema where the db migration version is logged.
&lt;code&gt;--meta-table:&lt;/code&gt; The table in which the db version is stored.
&lt;code&gt;--debug:&lt;/code&gt; Enables detailed trace messages.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here are the additional properties that can be used:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;autoCreateDatabase&lt;/code&gt;: Yuniql will create and configure the database if this property is set to true.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;tokenKeyValuePair&lt;/code&gt;: Passes custom tokens and values into migration scripts at runtime. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;One more task is added here for verification. It performs an uncommitted migration simply to check that the migration scripts succeed before changes are applied.&lt;br&gt;
&lt;strong&gt;Note:&lt;/strong&gt; Instead of exposing the db connection string publicly, use Azure Key Vault to fetch the connection string and other required values.&lt;/p&gt;
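&lt;p&gt;As a sketch, the secret could be pulled with the Azure Key Vault task before the Yuniql steps run (the service connection and vault names below are placeholders):&lt;/p&gt;

```yaml
# Fetches secrets into pipeline variables before the migration tasks run,
# so $(PostgresDBConnectionString) resolves from Key Vault rather than a
# plain pipeline variable.
- task: AzureKeyVault@2
  inputs:
    azureSubscription: "my-service-connection"   # placeholder
    KeyVaultName: "my-key-vault"                 # placeholder
    SecretsFilter: "PostgresDBConnectionString"
```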

&lt;h4&gt;
  
  
  Practical considerations:
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;Keep v0.00 as a clean baseline. Write forward-only scripts.&lt;/li&gt;
&lt;li&gt;Use tokens (&lt;code&gt;tokenKeyValuePair&lt;/code&gt;) for environment-specific names.&lt;/li&gt;
&lt;li&gt;Bulk upload: use .csv files to seed master data. The separator and batch size can be configured. &lt;/li&gt;
&lt;li&gt;Tracing: Use the meta schema/table to log and trace migrations.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Rollback in Yuniql:
&lt;/h3&gt;

&lt;p&gt;Yuniql is a forward-only migration tool, so rollback is handled differently.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;There is no automatic rollback in Yuniql. The version is logged in a table in the database, so it must be deleted manually.&lt;/li&gt;
&lt;li&gt;Yuniql provides the &lt;code&gt;_erase&lt;/code&gt; folder. Place cleanup scripts in this folder and run the command below.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;yuniql erase --platform postgres \
-c "&amp;lt;connection-string&amp;gt;" \
--debug
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This approach is meant for non-production environments.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;For production environments, create a reverse script for each version and run it manually. A common approach is to maintain a separate folder such as rollback/v1.00 with undo scripts that are executed by hand.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Limitations:
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Plain SQL only:&lt;/strong&gt; Yuniql is intentionally designed around plain SQL in organized, versioned folders. Migrations driven by ORMs or native code require more manual effort. &lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Platform differences:&lt;/strong&gt; Yuniql supports multiple engines, but scripts must account for engine-specific syntax and batch behavior; they are not portable across engines.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Manual rollback:&lt;/strong&gt; As mentioned earlier, Yuniql doesn’t support automatic rollback. Rollback strategies must be engineered during design.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;No operational features:&lt;/strong&gt; Operational features such as health checks, pre-deployment validations, and tests are outside Yuniql’s scope; additional scripting is needed. &lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  My take on Yuniql:
&lt;/h3&gt;

&lt;p&gt;Even with these drawbacks, Yuniql is a great fit for plain-SQL migrations: its self-contained CLI means zero runtime dependencies, and integration with Azure Pipelines and Docker is straightforward thanks to the ready-made tasks and images.&lt;/p&gt;

</description>
      <category>devops</category>
      <category>postgressql</category>
      <category>automation</category>
      <category>azurepipeline</category>
    </item>
    <item>
      <title>Data Ingestion using Logstash: PostgreSql to Elastic</title>
      <dc:creator>Mangesh Walimbe</dc:creator>
      <pubDate>Thu, 26 Jun 2025 17:36:02 +0000</pubDate>
      <link>https://dev.to/mangesh_walimbe/data-ingestion-using-logstash-postgresql-to-elastic-9im</link>
      <guid>https://dev.to/mangesh_walimbe/data-ingestion-using-logstash-postgresql-to-elastic-9im</guid>
      <description>&lt;h3&gt;
  
  
  What is Logstash?
&lt;/h3&gt;

&lt;p&gt;Logstash is an open-source data processing pipeline from Elastic. It is used to ingest, transform, and ship data to different destinations including Elasticsearch, Kafka, and flat files.&lt;/p&gt;

&lt;p&gt;A Logstash pipeline consists of three stages:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Input: The data source from which data is collected for ingestion.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Filter: Transforms the data (cleanup, aggregation, etc.) using plugins like Grok, Mutate, and Date.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Output: The destination for the ingested data (Elasticsearch, flat files, a database, etc.).&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Below are the prerequisites for sending data to Elastic using Logstash:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Logstash installed on the system, with the JDBC driver for Postgres.&lt;/li&gt;
&lt;li&gt;A Postgres database with a table or function to sync.&lt;/li&gt;
&lt;li&gt;A running Elasticsearch instance.
&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Logstash Setup (for Windows):
&lt;/h3&gt;

&lt;p&gt;Below are the steps in brief to install and run Logstash locally. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;1. Install Java:&lt;/strong&gt; &lt;br&gt;
Download a JDK package (Java 8 or later) from the &lt;a href="https://www.oracle.com/java/technologies/downloads/" rel="noopener noreferrer"&gt;official Oracle website&lt;/a&gt;. Once the download is complete, extract the files to a preferred location. &lt;br&gt;
Then an environment variable needs to be added so the system recognizes Java commands.&lt;br&gt;
Go to environment variables, add a new variable named JAVA_HOME pointing to the directory where the Java files are located, and append &lt;code&gt;%JAVA_HOME%\bin&lt;/code&gt; to the path.&lt;br&gt;
To verify the installation, open a command prompt and run the command below.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;java -version
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If everything is set up correctly, it will show the Java version.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;2. Install Logstash:&lt;/strong&gt;&lt;br&gt;
To install Logstash, download the package from the &lt;a href="https://www.elastic.co/downloads/logstash" rel="noopener noreferrer"&gt;official Elastic website&lt;/a&gt; and extract it to a preferred location.&lt;br&gt;
To test it locally, open a command prompt, go to the bin folder inside the Logstash folder, and run the command below.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;logstash -e "input { stdin {} } output { stdout {} }" 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Logstash Ingestion Pipeline:
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;1. Install the required JDBC driver:&lt;/strong&gt;&lt;br&gt;
Download the Postgres driver from the &lt;a href="https://jdbc.postgresql.org/" rel="noopener noreferrer"&gt;official PostgreSQL website&lt;/a&gt;.&lt;br&gt;
Place the jar file in an accessible location. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;2. Create the Logstash pipeline:&lt;/strong&gt;&lt;br&gt;
Here is the sample pipeline.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;input {
    jdbc {
        jdbc_driver_library =&amp;gt; "c:/logstash/jdbc/postgresql.jar"
        jdbc_driver_class =&amp;gt; "org.postgresql.Driver"
        jdbc_connection_string =&amp;gt; "${JDBC_HOST}"
        jdbc_user =&amp;gt; "${DB_USER}"
        jdbc_password =&amp;gt; "${DB_PWD}"
        jdbc_paging_enabled =&amp;gt; true
        jdbc_page_size =&amp;gt; 1000
        schedule =&amp;gt; "* * * * *"  # schedule to run every minute
        statement =&amp;gt; "SELECT * FROM employee WHERE updated_at &amp;gt; :sql_last_value"
        use_column_value =&amp;gt; true
        tracking_column =&amp;gt; "updated_at"
        tracking_column_type =&amp;gt; "timestamp"
        last_run_metadata_path =&amp;gt; "c:/logstash/employee.tracker"
    }
}

filter {
    mutate {
        remove_field =&amp;gt; ["date", "@timestamp", "host"]
        # rename the first_name column to the name field in the destination
        rename =&amp;gt; { "first_name" =&amp;gt; "name" }
    }
}

output {
    stdout { codec =&amp;gt; json_lines }
    elasticsearch {
        hosts =&amp;gt; ["http://localhost:9200"]
        index =&amp;gt; "my_table_index"
        custom_headers =&amp;gt; {
                "Authorization" =&amp;gt; "${AUTH_KEY}"
            }
        document_id =&amp;gt; "%{table_id}" # Unique identifier from the table
        timeout =&amp;gt; 120
    }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The pipeline above performs incremental ingestion: it tracks the last run and, on each scheduled run, ingests only the records changed since then.&lt;/p&gt;

&lt;p&gt;Here are the key concepts used:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Input:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;jdbc_driver_library&lt;/code&gt;: location where the jdbc driver file(.jar) is stored.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;jdbc_driver_class&lt;/code&gt;: the driver class being used.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;jdbc_connection_string&lt;/code&gt;: postgres db connection string&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;jdbc_user&lt;/code&gt;: Database username&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;jdbc_password&lt;/code&gt;: database password for the user&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;paging&lt;/code&gt;: In this pipeline, the data is sent in multiple pages with a page size of 1000. This improves the performance of the pipeline and helps track the number of records sent to Elasticsearch.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;schedule&lt;/code&gt;: The above pipeline is scheduled to run every minute. &lt;br&gt;
Here is the format for schedule.&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;* * * * *
│ │ │ │ │
│ │ │ │ └─── Day of the week (0 - 7) (Sunday is both 0 and 7)
│ │ │ └───── Month (1 - 12)
│ │ └─────── Day of the month (1 - 31)
│ └───────── Hour (0 - 23)
└─────────── Minute (0 - 59)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;statement&lt;/code&gt;: The SQL statement the pipeline will execute. A complex statement can be saved in a separate .sql file whose path is given in &lt;code&gt;statement_filepath&lt;/code&gt; instead of using statement.
It is better to use a view or materialized view than a query with complex joins.&lt;/li&gt;
&lt;li&gt;The last section is for incremental ingestion.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;use_column_value =&amp;gt; true
tracking_column =&amp;gt; "updated_dt"
tracking_column_type =&amp;gt; "timestamp"
last_run_metadata_path =&amp;gt; "c:/project/logstash/date.tracker"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;use_column_value&lt;/code&gt; is set to true. It tells Logstash to track the actual value of the &lt;code&gt;updated_at&lt;/code&gt; column named in &lt;code&gt;tracking_column&lt;/code&gt;, instead of the time the query was last run. In this case, &lt;code&gt;:sql_last_value&lt;/code&gt; will hold the last &lt;code&gt;updated_at&lt;/code&gt; value.&lt;/p&gt;

&lt;p&gt;If it is set to false, Logstash will use the last query execution time for &lt;code&gt;:sql_last_value&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The last run's value is saved in the file given in &lt;code&gt;last_run_metadata_path&lt;/code&gt;; it is used to track where the previous pipeline run left off. &lt;/p&gt;
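&lt;p&gt;The tracking behaviour can be simulated in plain Python (illustrative only — Logstash handles this internally; the file and column names mirror the pipeline above):&lt;/p&gt;

```python
import json
import os
import tempfile

# Simulate :sql_last_value tracking with a metadata file.
tracker = os.path.join(tempfile.gettempdir(), "employee.tracker")

rows = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-03-01"},
]

def load_last_value():
    if os.path.exists(tracker):
        with open(tracker) as f:
            return json.load(f)["last_value"]
    return ""  # first run: take everything

def run_once():
    last = load_last_value()
    # Equivalent of: WHERE updated_at > :sql_last_value
    batch = [r for r in rows if r["updated_at"] > last]
    if batch:
        with open(tracker, "w") as f:
            json.dump({"last_value": max(r["updated_at"] for r in batch)}, f)
    return batch

if os.path.exists(tracker):
    os.remove(tracker)  # start from a clean first run
first = run_once()
second = run_once()  # nothing new since the last run
print(len(first), len(second))  # 2 0
```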

&lt;p&gt;&lt;strong&gt;Filter&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This is an optional section to manipulate the data before sending it to destination.&lt;/p&gt;

&lt;p&gt;In the pipeline above, the date field is removed before ingestion, and the first_name field from the source is mapped to the name field in the destination.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Output&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This section defines the destination for the data: in this case the Elasticsearch endpoint, an authorization key if any, the Elastic index, and the &lt;code&gt;document_id&lt;/code&gt;. &lt;code&gt;document_id&lt;/code&gt; is the unique identifier of a document in the index. If this field is not set, Elasticsearch automatically assigns a unique identifier to each document. &lt;/p&gt;

&lt;p&gt;For incremental ingestion, it is recommended to define this field. During ingestion, Elasticsearch looks up this field in the index; if it matches an existing document, that document is updated in place. &lt;/p&gt;

&lt;p&gt;If the field is not defined, each run creates new documents in the index, resulting in duplicate records.&lt;/p&gt;
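&lt;p&gt;The effect of &lt;code&gt;document_id&lt;/code&gt; can be sketched with a dict standing in for the index (illustrative Python, not the Elasticsearch API):&lt;/p&gt;

```python
index = {}

def ingest(doc, document_id=None):
    """With an explicit _id, re-ingestion updates in place; without one,
    auto-assigned IDs create a new document every time."""
    key = document_id if document_id is not None else f"auto-{len(index)}"
    index[key] = doc

ingest({"name": "James"}, document_id="1")
ingest({"name": "James R."}, document_id="1")  # same id: updated, not duplicated
print(len(index))  # 1

ingest({"name": "Kate"})
ingest({"name": "Kate"})                       # no id: two separate documents
print(len(index))  # 3
```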

&lt;h3&gt;
  
  
  Run the pipeline
&lt;/h3&gt;

&lt;p&gt;To run this pipeline, open a command prompt, go to the Logstash folder, and run the command below.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/logstash -f c:/logstash/sample_pipeline.conf
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here is the output of the pipeline.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbxubzpsd2ia0ftnkvabo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbxubzpsd2ia0ftnkvabo.png" alt="Logstash Output window" width="800" height="193"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Output from the Elasticsearch index:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 3,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "testing",
                "_id": "1",
                "_score": 1.0,
                "_source": {
                    "name": "James",
                    "id": 1,
                    "last_name": "Smith",
                    "updated_dt": "2024-12-12T16:10:57.349Z",
                    "@version": "1",
                    "@timestamp": "2025-06-25T20:41:02.167442600Z"
                }
            },
            {
                "_index": "testing",
                "_id": "2",
                "_score": 1.0,
                "_source": {
                    "name": "John",
                    "id": 2,
                    "last_name": "Doe",
                    "updated_dt": "2024-12-12T16:10:57.349Z",
                    "@version": "1",
                    "@timestamp": "2025-06-25T20:41:02.169021400Z"
                }
            },
            {
                "_index": "testing",
                "_id": "3",
                "_score": 1.0,
                "_source": {
                    "name": "Kate",
                    "id": 3,
                    "last_name": "Williams",
                    "updated_dt": "2024-12-12T16:10:57.349Z",                    
                    "@version": "1",
                    "@timestamp": "2025-06-25T20:41:02.170098800Z"
                }
            }
        ]
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There are a few advantages to this method.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Logstash is an open-source tool and easy to implement.&lt;/li&gt;
&lt;li&gt;There are over 200 plugins available for data transformation; using them, data can be parsed and transformed in filters.&lt;/li&gt;
&lt;li&gt;The architecture decouples the data source from Elasticsearch.&lt;/li&gt;
&lt;li&gt;It integrates seamlessly with Elasticsearch.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Although this is an open-source and simple method to implement, there are some disadvantages.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Latency: It is not ideal where very low latency or real-time data is required. As the pipeline grows, loading, transforming/filtering, and sending the data takes time.&lt;/li&gt;
&lt;li&gt;Error handling: Unless it is explicitly monitored, errors are difficult to track down and can result in dropped data.&lt;/li&gt;
&lt;li&gt;It can create duplicates if the pipeline is not defined properly.&lt;/li&gt;
&lt;li&gt;It takes longer to start than comparable tools.&lt;/li&gt;
&lt;li&gt;Its pipeline configuration files can become complex and difficult to maintain as they grow.&lt;/li&gt;
&lt;li&gt;Resource utilization: It can consume significant resources under heavy loads and with complex pipelines.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The pipeline above suits teams looking for a robust, centralized data-streaming pipeline; it is not ideal for real-time data shipping.&lt;/p&gt;

</description>
      <category>logstash</category>
      <category>elasticsearch</category>
      <category>dataengineering</category>
    </item>
  </channel>
</rss>
