<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: 罗梦婷</title>
    <description>The latest articles on DEV Community by 罗梦婷 (@metaluo).</description>
    <link>https://dev.to/metaluo</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F2228032%2F01cc77ab-2ca4-4b18-93bc-526690221fdb.jpg</url>
      <title>DEV Community: 罗梦婷</title>
      <link>https://dev.to/metaluo</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/metaluo"/>
    <language>en</language>
    <item>
      <title>How to Sync Data from Elasticsearch to Elasticsearch</title>
      <dc:creator>罗梦婷</dc:creator>
      <pubDate>Sat, 09 May 2026 05:50:32 +0000</pubDate>
      <link>https://dev.to/metaluo/how-to-sync-data-from-elasticsearch-to-elasticsearch-16pp</link>
      <guid>https://dev.to/metaluo/how-to-sync-data-from-elasticsearch-to-elasticsearch-16pp</guid>
      <description>&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;Elasticsearch is a popular search engine that forms part of the modern data stack alongside relational databases, caching, real-time data warehouses, and message-oriented middleware.&lt;/p&gt;

&lt;p&gt;While writing data to Elasticsearch is relatively straightforward, real-time data synchronization can be more challenging.&lt;/p&gt;

&lt;p&gt;This article describes how to migrate and sync data from Elasticsearch to Elasticsearch using &lt;a href="https://www.bladepipe.com" rel="noopener noreferrer"&gt;BladePipe&lt;/a&gt; and the &lt;strong&gt;Elasticsearch incremental data capture plugin&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Highlights
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Elasticsearch Plugin
&lt;/h3&gt;

&lt;p&gt;Elasticsearch does not natively provide a mechanism for real-time change data capture. However, its plugin API &lt;strong&gt;IndexingOperationListener&lt;/strong&gt; can track &lt;strong&gt;INDEX&lt;/strong&gt; and &lt;strong&gt;DELETE&lt;/strong&gt; events. An &lt;strong&gt;INDEX&lt;/strong&gt; event corresponds to an INSERT or UPDATE operation (Elasticsearch does not distinguish between the two at this level), while a &lt;strong&gt;DELETE&lt;/strong&gt; event corresponds to a traditional DELETE operation.&lt;/p&gt;

&lt;p&gt;Once the mechanism for capturing incremental data is established, the next challenge is how to make this data available in downstream tools.&lt;/p&gt;

&lt;p&gt;We use a dedicated index, &lt;code&gt;cc_es_trigger_idx&lt;/code&gt;, as a container for incremental data.&lt;/p&gt;

&lt;p&gt;This approach has several benefits:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;No dependency on third-party components (e.g., message-oriented middleware).&lt;/li&gt;
&lt;li&gt;Easy management of Elasticsearch indices.&lt;/li&gt;
&lt;li&gt;Consistency with the incremental data capture method of other BladePipe data sources, allowing for code reuse.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F28c7dm6z1oxs0fi02odj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F28c7dm6z1oxs0fi02odj.png" width="800" height="451"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The structure of the &lt;code&gt;cc_es_trigger_idx&lt;/code&gt; index is as follows, where &lt;code&gt;row_data&lt;/code&gt; holds the data after the INDEX operations, and &lt;code&gt;pk&lt;/code&gt; stores the document &lt;strong&gt;_id&lt;/strong&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"mappings"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"_doc"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"properties"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"create_time"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"date"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"format"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"yyyy-MM-dd'T'HH:mm:ssSSS"&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"event_type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"text"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"analyzer"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"standard"&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"idx_name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"text"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"analyzer"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"standard"&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"pk"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"text"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"analyzer"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"standard"&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"row_data"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"text"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"index"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"scn"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"long"&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
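&lt;p&gt;To make the structure concrete, the following sketch (Python, with hypothetical field values; an illustration of the record layout, not the plugin's actual Java code) builds the kind of record written to &lt;code&gt;cc_es_trigger_idx&lt;/code&gt; for each captured event:&lt;/p&gt;

```python
from datetime import datetime, timezone
import json

def to_trigger_record(idx_name, op_type, doc_id, source, scn):
    """Model a captured change as a cc_es_trigger_idx record.

    INDEX covers both inserts and updates; DELETE carries no row data,
    since removing the target document only requires the pk (_id).
    """
    if op_type not in ("INDEX", "DELETE"):
        raise ValueError("unsupported operation: " + op_type)
    return {
        # Matches the mapping's date format yyyy-MM-dd'T'HH:mm:ssSSS
        "create_time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S%f")[:-3],
        "event_type": op_type,
        "idx_name": idx_name,
        "pk": doc_id,
        # row_data holds the full document after an INDEX operation
        "row_data": json.dumps(source) if op_type == "INDEX" else None,
        # Monotonically increasing sequence number used for ordered scanning
        "scn": scn,
    }

record = to_trigger_record("product", "INDEX", "1001", {"name": "laptop"}, 42)
```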



&lt;h3&gt;
  
  
  Trigger Data Scanning
&lt;/h3&gt;

&lt;p&gt;To consume the incremental data captured by the Elasticsearch plugin, BladePipe performs batch scans of the &lt;code&gt;cc_es_trigger_idx&lt;/code&gt; index in the order of the &lt;code&gt;scn&lt;/code&gt; field.&lt;/p&gt;
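&lt;p&gt;The consumption loop can be sketched as follows (a minimal in-memory Python simulation; in the real pipeline each batch would presumably come from an Elasticsearch search with a range filter on &lt;code&gt;scn&lt;/code&gt; and an ascending sort rather than an in-memory list):&lt;/p&gt;

```python
def scan_trigger_index(trigger_docs, last_scn, batch_size=2):
    """Consume trigger records in scn order, starting after last_scn.

    trigger_docs stands in for the cc_es_trigger_idx index; a real
    implementation would issue a query such as
    {"range": {"scn": {"gt": last_scn}}} sorted by scn ascending.
    """
    pending = sorted(
        (d for d in trigger_docs if d["scn"] > last_scn),
        key=lambda d: d["scn"],
    )
    for start in range(0, len(pending), batch_size):
        batch = pending[start:start + batch_size]
        yield batch
        # Persisting this position lets a restarted task resume correctly
        last_scn = batch[-1]["scn"]

docs = [{"scn": n, "event_type": "INDEX", "pk": str(n)} for n in (3, 1, 2, 5, 4)]
batches = list(scan_trigger_index(docs, last_scn=1))
```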

&lt;p&gt;The coding style for data consumption is consistent with that used for SAP HANA as a source.&lt;/p&gt;

&lt;h3&gt;
  
  
  Open-source Plugin
&lt;/h3&gt;

&lt;p&gt;Elasticsearch strictly checks the third-party packages that plugins depend on. If they conflict with or mismatch the versions of Elasticsearch's own dependencies, the plugin cannot be loaded. Therefore, the plugin must be built for the exact version of Elasticsearch, including the minor version.&lt;/p&gt;
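&lt;p&gt;This version check is driven by the plugin descriptor: every Elasticsearch plugin ships a &lt;code&gt;plugin-descriptor.properties&lt;/code&gt; file whose &lt;code&gt;elasticsearch.version&lt;/code&gt; must match the running node. A hypothetical descriptor for the capture plugin might look like this (the name, classname, and version numbers are illustrative):&lt;/p&gt;

```properties
description=Incremental data capture trigger plugin
version=1.0.0
name=cloudcanal-es-trigger
classname=com.clougence.es.TriggerPlugin
java.version=1.8
elasticsearch.version=7.10.2
```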

&lt;p&gt;Since releasing pre-compiled packages for every Elasticsearch version is impractical, and to encourage widespread use, the plugin is open-sourced on &lt;a href="https://github.com/ClouGence/cloudcanal-es-trigger" rel="noopener noreferrer"&gt;GitHub&lt;/a&gt;, so users can build it against their own Elasticsearch version.&lt;/p&gt;

&lt;h2&gt;
  
  
  Procedure
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Step 1: Install the Plugin on Source Elasticsearch
&lt;/h3&gt;

&lt;p&gt;Follow the instructions in &lt;strong&gt;&lt;a href="https://www.bladepipe.com/docs/dataMigrationAndSync/datasource_func/ElasticSearch/prepare_for_es_as_src/" rel="noopener noreferrer"&gt;Preparation for Elasticsearch CDC&lt;/a&gt;&lt;/strong&gt; to install the incremental data capture plugin.&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 2: Install BladePipe
&lt;/h3&gt;

&lt;p&gt;Follow the instructions in &lt;a href="https://www.bladepipe.com/docs/productOP/byoc/installation/install_worker_docker/" rel="noopener noreferrer"&gt;Install Worker (Docker)&lt;/a&gt; or &lt;a href="https://www.bladepipe.com/docs/productOP/byoc/installation/install_worker_binary/" rel="noopener noreferrer"&gt;Install Worker (Binary)&lt;/a&gt; to download and install a BladePipe Worker.&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 3: Add DataSources
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;Log in to the &lt;a href="https://cloud.bladepipe.com" rel="noopener noreferrer"&gt;BladePipe Cloud&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;Click &lt;strong&gt;DataSource&lt;/strong&gt; &amp;gt; &lt;strong&gt;Add DataSource&lt;/strong&gt;, and add two DataSources (the source and the target Elasticsearch instances).&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Step 4: Create a DataJob
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;Click &lt;strong&gt;DataJob&lt;/strong&gt; &amp;gt; &lt;a href="https://www.bladepipe.com/docs/operation/job_manage/create_job/create_full_incre_task/" rel="noopener noreferrer"&gt;&lt;strong&gt;Create DataJob&lt;/strong&gt;&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;Select the source and target DataSources, and click &lt;strong&gt;Test Connection&lt;/strong&gt; to ensure that the connections to both the source and target DataSources are successful.&lt;/li&gt;
&lt;li&gt;Select &lt;strong&gt;Incremental&lt;/strong&gt; for DataJob Type, together with the &lt;strong&gt;Full Data&lt;/strong&gt; option.&lt;/li&gt;
&lt;/ol&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note&lt;/strong&gt;&lt;br&gt;
In the &lt;strong&gt;Specification&lt;/strong&gt; settings, make sure that you select a specification of at least &lt;strong&gt;1 GB&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Allocating too little memory may result in Out of Memory (OOM) errors during DataJob execution.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;ol start="4"&gt;
&lt;li&gt;Select the indices to be replicated.&lt;/li&gt;
&lt;li&gt;Select the fields to be replicated.&lt;/li&gt;
&lt;/ol&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note&lt;/strong&gt;&lt;br&gt;
If you need to select specific fields for synchronization, you can first create the index on the target Elasticsearch instance. This allows you to define the schemas and fields that you want to synchronize.&lt;/p&gt;
&lt;/blockquote&gt;
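&lt;p&gt;For example, if the source &lt;code&gt;product&lt;/code&gt; index contains a field that should not be replicated, the target index can be created beforehand with only the desired fields. A hypothetical request body (index and field names are illustrative; depending on the Elasticsearch version, a type name such as &lt;code&gt;_doc&lt;/code&gt; may also be required under &lt;code&gt;mappings&lt;/code&gt;):&lt;/p&gt;

```python
import json

# Hypothetical subset of the source mapping: only name and price are kept,
# so synchronization will carry just these fields to the target.
target_index_body = {
    "mappings": {
        "properties": {
            "name": {"type": "text"},
            "price": {"type": "double"},
        }
    }
}

# This body would be sent as a create-index request (PUT /product)
# against the target cluster before starting the DataJob.
request_json = json.dumps(target_index_body)
```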

&lt;ol start="6"&gt;
&lt;li&gt;Confirm the DataJob creation.&lt;/li&gt;
&lt;/ol&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note&lt;/strong&gt;&lt;br&gt;
The DataJob creation process involves several steps. Click &lt;strong&gt;Sync Settings&lt;/strong&gt; &amp;gt; &lt;a href="https://www.bladepipe.com/docs/operation/job_setting/console_job_manage/" rel="noopener noreferrer"&gt;&lt;strong&gt;ConsoleJob&lt;/strong&gt;&lt;/a&gt;, find the DataJob creation record, and click &lt;strong&gt;Details&lt;/strong&gt; to view it.&lt;/p&gt;

&lt;p&gt;The DataJob creation with a source Elasticsearch instance includes the following steps:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Schema Migration&lt;/li&gt;
&lt;li&gt;Initialization of Elasticsearch Triggers and Offsets&lt;/li&gt;
&lt;li&gt;Allocation of DataJobs to BladePipe Workers&lt;/li&gt;
&lt;li&gt;Creation of DataJob FSM (Finite State Machine)&lt;/li&gt;
&lt;li&gt;Completion of DataJob Creation&lt;/li&gt;
&lt;/ul&gt;
&lt;/blockquote&gt;

&lt;ol start="7"&gt;
&lt;li&gt;Wait for the DataJob to automatically run.&lt;/li&gt;
&lt;/ol&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note&lt;/strong&gt;&lt;br&gt;
Once the DataJob is created and started, BladePipe will automatically run the following DataTasks:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Schema Migration&lt;/strong&gt;: The index mapping definition in the source Elasticsearch instance will be migrated to the Target. If an index with the same name already exists in the Target, it will be ignored.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Full Data Migration&lt;/strong&gt;: All existing data in the Source will be fully migrated to the Target.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Incremental Synchronization&lt;/strong&gt;: Ongoing data changes will be continuously synchronized to the target instance.&lt;/li&gt;
&lt;/ul&gt;
&lt;/blockquote&gt;

</description>
      <category>elasticsearch</category>
      <category>database</category>
      <category>tutorial</category>
      <category>devops</category>
    </item>
  </channel>
</rss>
