<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Aiven</title>
    <description>The latest articles on DEV Community by Aiven (@aiven_io).</description>
    <link>https://dev.to/aiven_io</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Forganization%2Fprofile_image%2F7331%2F5b5d2f91-7f33-42fc-b492-80010d6e4fcc.jpg</url>
      <title>DEV Community: Aiven</title>
      <link>https://dev.to/aiven_io</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/aiven_io"/>
    <language>en</language>
    <item>
      <title>Introducing Aiven's AI Database Optimizer: The First Built-In SQL Optimizer for Enhanced Performance</title>
      <dc:creator>Francesco Tisiot</dc:creator>
      <pubDate>Tue, 28 May 2024 12:00:00 +0000</pubDate>
      <link>https://dev.to/aiven_io/introducing-aivens-ai-database-optimizer-the-first-built-in-sql-optimizer-for-enhanced-performance-402b</link>
      <guid>https://dev.to/aiven_io/introducing-aivens-ai-database-optimizer-the-first-built-in-sql-optimizer-for-enhanced-performance-402b</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgchfpvltsf4zlcic373r.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgchfpvltsf4zlcic373r.png" alt="Hero Image" width="800" height="418"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;An efficient data infrastructure is a vital component in building and operating scalable and performant applications that are widely adopted, satisfy customers, and ultimately drive business growth. Unfortunately, the speed of new feature delivery coupled with a lack of database optimization knowledge is exposing organizations to high-risk performance issues. The new &lt;a href="https://aiven.io/solutions/aiven-ai-database-optimizer?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=DB-Optimizer" rel="noopener noreferrer"&gt;Aiven AI Database Optimizer&lt;/a&gt; helps organizations address performance in both the development and production phases, making it simple to quickly deploy fully optimized, scalable, and cost-efficient applications.&lt;/p&gt;

&lt;p&gt;Fully integrated with &lt;a href="https://aiven.io/postgresql?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=DB-Optimizer" rel="noopener noreferrer"&gt;Aiven for PostgreSQL&lt;/a&gt;®, Aiven AI Database Optimizer offers AI-driven performance insights along with index and SQL rewrite suggestions to maximize database performance, minimize costs, and make the most of your cloud investment.&lt;/p&gt;

&lt;h2&gt;
  
  
  How does AI Database Optimizer work?
&lt;/h2&gt;

&lt;p&gt;Aiven AI Database Optimizer is a non-intrusive solution powered by &lt;a href="https://www.eversql.com/?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=DB-Optimizer" rel="noopener noreferrer"&gt;EverSQL by Aiven&lt;/a&gt; that gathers information about database workloads, metadata, and supporting data structures such as indexes. Information about the number of query executions and average query times is continually processed by a mix of heuristic and AI models to determine which SQL statements can be further optimized. AI Database Optimizer then delivers accurate, secure optimization suggestions that you can trust and adopt to speed up query performance.&lt;/p&gt;

&lt;p&gt;Recommendations from Aiven’s AI Database Optimizer are already trusted by over 120,000 users in organizations ranging from startups to the largest global enterprises, who have optimized more than 2 million queries to date.&lt;/p&gt;

&lt;h2&gt;
  
  
  How does AI Database Optimizer help organizations?
&lt;/h2&gt;

&lt;p&gt;During development, AI Database Optimizer enables early performance testing, allowing easier redesign or refactoring of queries before they impact production. This enables customers to foster a culture of considering performance from the get-go, ensuring it is a priority throughout development rather than an afterthought.&lt;/p&gt;

&lt;p&gt;AI Database Optimizer also helps businesses gain an optimal user experience: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Fast query response times ensure a smooth and responsive user experience, especially in data-intensive applications.&lt;/li&gt;
&lt;li&gt;By identifying and fixing performance bottlenecks, organizations can reduce costs, avoid outages, and deliver continuous service availability.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A fleet of Aiven for PostgreSQL® databases is powering &lt;a href="https://www.youtube.com/watch?v=mPWxizlA3so" rel="noopener noreferrer"&gt;La Redoute&lt;/a&gt;’s marketplace functionality, driving 30% of their business. &lt;br&gt;
Diogo Passadouro - OPS-DBA Team Lead stated &lt;em&gt;"Aiven AI Database Optimizer has revolutionized the way we analyze database performance, providing a simple, clear and highly effective approach and has proven instrumental in enhancing the performance of our databases."&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqwsxlkj5bc36zcc1blpy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqwsxlkj5bc36zcc1blpy.png" alt="Quote from Diogo Passadouro" width="800" height="418"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aiven.io/case-studies/conrad-electronic-expands-e-commerce-platform-with-aiven" rel="noopener noreferrer"&gt;Conrad&lt;/a&gt; is an advanced B2B sourcing platform selling 9 million products from 6,000 brands, powered by Aiven. Janek Wonner - Head of SRE &amp;amp; Cloud Technology stated &lt;em&gt;"Aiven for PostgreSQL is underpinning our fundamental company functionalities, we are looking forward to adopt Aiven AI database optimizer to empower our developers to create scalable code and empower our development teams with better performance insights and improvement suggestions to reduce the time to fix performance issues"&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwkc8t0qbybx2firird70.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwkc8t0qbybx2firird70.png" alt="Quote from Janek Wonner" width="800" height="418"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;More information is available on the &lt;a href="https://aiven.io/solutions/aiven-ai-database-optimizer?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=DB-Optimizer" rel="noopener noreferrer"&gt;Aiven AI Database Optimizer page&lt;/a&gt;. You can experience it for yourself in any &lt;a href="https://aiven.io/postgresql" rel="noopener noreferrer"&gt;Aiven for PostgreSQL&lt;/a&gt; service for free during the Early Availability phase. Simply navigate to the “AI Insights” tab.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://console.aiven.io/signup?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=DB-Optimizer" rel="noopener noreferrer"&gt;Try it now&lt;/a&gt; or &lt;a href="https://aiven.io/book-demo?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=DB-Optimizer" rel="noopener noreferrer"&gt;Contact us&lt;/a&gt; today to check it out!&lt;/p&gt;

</description>
      <category>announcements</category>
      <category>ai</category>
      <category>postgres</category>
    </item>
    <item>
      <title>Managing data drift with Apache Kafka® Connect and a schema registry</title>
      <dc:creator>Francesco Tisiot</dc:creator>
      <pubDate>Mon, 29 Jan 2024 14:00:00 +0000</pubDate>
      <link>https://dev.to/aiven_io/managing-data-drift-with-apache-kafkar-connect-and-a-schema-registry-kop</link>
      <guid>https://dev.to/aiven_io/managing-data-drift-with-apache-kafkar-connect-and-a-schema-registry-kop</guid>
      <description>&lt;p&gt;Data flows across many technologies, teams, and people in today's businesses. Businesses are always growing and changing, so the way we collect and share data changes all the time too. We need to know not only who owns certain data but also what to do if that data changes. This problem is often referred to as "data drift."&lt;/p&gt;

&lt;p&gt;Consider the scenario where a piece of data is modified at its source — what implications does this have for other systems reliant on it? How do we communicate necessary changes to stakeholders? Conversely, how do we prevent changes that could disrupt the system?&lt;/p&gt;

&lt;p&gt;Having a robust plan for managing data drift is imperative. Businesses require data systems that function seamlessly and remain consistent, even amidst changes at the data source. Additionally, mechanisms are needed to assess and decide on changes, ensuring smooth operations for everyone involved with the data.&lt;/p&gt;

&lt;p&gt;This tutorial will show you how tools like Apache Kafka®, Apache Kafka Connect, and the built-in schema registry functionality provided by &lt;a href="https://www.karapace.io/" rel="noopener noreferrer"&gt;Karapace&lt;/a&gt; can help businesses keep an eye on data drift. It will also explain how to either deny or allow changes based on what a business needs.&lt;/p&gt;

&lt;h2&gt;
  
  
  Why Apache Kafka and why a schema registry?
&lt;/h2&gt;

&lt;p&gt;Apache Kafka is widely adopted as a backend data hub, empowering companies to move their data supported by a reliable, fast and scalable technology. Apache Kafka provides the benefit of decoupling data producers and consumers, by allowing the producers to reliably send the data without having to worry about consumers being ready to read, or being fast enough to keep up with throughput. &lt;/p&gt;

&lt;p&gt;By default, Apache Kafka doesn't impose or verify the structure of data. Messages are pushed and retrieved in any format agreed upon by the producer and the consumer. However, in complex systems, where the same information needs to be reused across multiple consumers from different parts of the company, a simple external agreement is often insufficient. Apache Kafka must not only ensure that consumers can retrieve the data but also make sense of it, even if the structure of the messages changes slightly over time.&lt;/p&gt;

&lt;p&gt;This is where the schema registry functionality enabled by Karapace comes into play: a way to decouple the structure of the message from its content and a method to verify that updates in the data structure won't break downstream consumers of the information. With Karapace, we can define the structure of each topic, along with the compatibility level that determines which data structure changes are allowed or rejected.&lt;/p&gt;
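
&lt;p&gt;To make the idea of a compatibility level more concrete, here is a minimal Python sketch of a backward-compatibility check between two record schemas. It is a simplified illustration and not Karapace's actual implementation: it only looks at top-level fields, treats any type change as incompatible, and ignores Avro's type promotion rules. The function name &lt;code&gt;backward_compatible&lt;/code&gt; and the example schemas are hypothetical.&lt;/p&gt;

```python
def backward_compatible(old_schema, new_schema):
    """Return a list of problems; an empty list means the change is accepted.

    Simplified rule set (an assumption for illustration): a consumer using
    new_schema must still be able to read data written with old_schema, so
    every added field needs a default and existing fields keep their type.
    """
    old_fields = {field["name"]: field for field in old_schema["fields"]}
    problems = []
    for field in new_schema["fields"]:
        name = field["name"]
        if name not in old_fields:
            # Old messages do not carry this field, so a default is required.
            if "default" not in field:
                problems.append(f"new field '{name}' has no default")
        elif field["type"] != old_fields[name]["type"]:
            problems.append(f"field '{name}' changed type")
    return problems


# Hypothetical schema versions for a users topic.
users_v1 = {"type": "record", "name": "Value", "fields": [
    {"name": "id", "type": "int"},
    {"name": "username", "type": ["null", "string"], "default": None},
    {"name": "hero", "type": ["null", "boolean"], "default": None},
]}

# Adds an optional field with a default: accepted.
users_v2 = {"type": "record", "name": "Value", "fields": users_v1["fields"] + [
    {"name": "city", "type": ["null", "string"], "default": None},
]}

# Adds a required field without a default: rejected.
users_v3 = {"type": "record", "name": "Value", "fields": users_v1["fields"] + [
    {"name": "age", "type": "int"},
]}

print(backward_compatible(users_v1, users_v2))
print(backward_compatible(users_v1, users_v3))
```

&lt;p&gt;In this sketch the second version passes because old messages can be read by filling in the default, while the third is rejected because a reader would have no value for the new required field.&lt;/p&gt;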

&lt;p&gt;In the following sections, we will explore how the schema registry can be used in conjunction with Apache Kafka® Connect, both as a source and a sink, to check data structure changes and propagate them if they meet compatibility requirements.&lt;/p&gt;

&lt;h2&gt;
  
  
  The overall architecture
&lt;/h2&gt;

&lt;p&gt;To simulate a typical company data flow, we will employ PostgreSQL® as our source, serving as our transactional database. Extracting data from it will involve using Apache Kafka, Apache Kafka Connect, and the Debezium source connector, enabling a real-time change data capture process. Once the data resides in Apache Kafka, we will use the built-in Karapace integration to store the data schema and assess changes for compatibility. Finally, the results of our data changes will manifest in a MySQL database and an Amazon S3 bucket, mirroring two use cases: departmental analytics and long-term data storage.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyv002q5v825tz0ivx1j8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyv002q5v825tz0ivx1j8.png" alt="Overall architecture including PostgreSQL as Source, CDC with Debezium, Apache Kafka and two sinks to S3 and MySQL" width="800" height="221"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We'll use &lt;a href="https://aiven.io/kafka" rel="noopener noreferrer"&gt;Aiven for Apache Kafka®&lt;/a&gt;, &lt;a href="https://aiven.io/postgresql" rel="noopener noreferrer"&gt;Aiven for PostgreSQL®&lt;/a&gt;, &lt;a href="https://aiven.io/mysql" rel="noopener noreferrer"&gt;Aiven for MySQL&lt;/a&gt; and a Debezium Kafka Connector to demonstrate this. &lt;a href="https://console.aiven.io/signup" rel="noopener noreferrer"&gt;Sign up for an Aiven account&lt;/a&gt; to follow along. &lt;/p&gt;

&lt;p&gt;We can create the whole flow using Aiven's &lt;a href="https://docs.aiven.io/docs/tools/cli" rel="noopener noreferrer"&gt;command line interface&lt;/a&gt;. You'll also need to install &lt;code&gt;psql&lt;/code&gt;. Run the following commands:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service create demo-drift-postgresql &lt;span class="nt"&gt;-t&lt;/span&gt; pg &lt;span class="nt"&gt;--cloud&lt;/span&gt; aws-eu-west-1 &lt;span class="nt"&gt;-p&lt;/span&gt; free-1-5gb
avn service create demo-drift-mysqldb &lt;span class="nt"&gt;-t&lt;/span&gt; mysql &lt;span class="nt"&gt;--cloud&lt;/span&gt; aws-eu-west-1 &lt;span class="nt"&gt;-p&lt;/span&gt; free-1-5gb
avn service create demo-drift-kafka         &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-t&lt;/span&gt; kafka                                &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--cloud&lt;/span&gt; aws-eu-west-1                   &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-p&lt;/span&gt; business-4                           &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-c&lt;/span&gt; kafka.auto_create_topics_enable&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;true&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="nv"&gt;kafka_connect&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;true&lt;/span&gt;                   &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="nv"&gt;kafka_rest&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;true&lt;/span&gt;                      &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="nv"&gt;schema_registry&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;true&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above three commands will start:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;An Aiven for PostgreSQL database named &lt;code&gt;demo-drift-postgresql&lt;/code&gt; in the &lt;code&gt;aws-eu-west-1&lt;/code&gt; cloud region using Aiven's free tier&lt;/li&gt;
&lt;li&gt;An Aiven for MySQL database named &lt;code&gt;demo-drift-mysqldb&lt;/code&gt; in the &lt;code&gt;aws-eu-west-1&lt;/code&gt; cloud region using Aiven's free tier&lt;/li&gt;
&lt;li&gt;An Aiven for Apache Kafka® service named &lt;code&gt;demo-drift-kafka&lt;/code&gt; in the &lt;code&gt;aws-eu-west-1&lt;/code&gt; cloud region using Aiven's &lt;code&gt;business-4&lt;/code&gt; plan and enabling:

&lt;ul&gt;
&lt;li&gt;The automatic creation of topics&lt;/li&gt;
&lt;li&gt;Apache Kafka Connect, running on the same nodes as Apache Kafka&lt;/li&gt;
&lt;li&gt;Kafka REST APIs&lt;/li&gt;
&lt;li&gt;Kafka Schema Registry functionality powered by Karapace &lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;We can wait for the above services to be created with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service &lt;span class="nb"&gt;wait &lt;/span&gt;demo-drift-postgresql
avn service &lt;span class="nb"&gt;wait &lt;/span&gt;demo-drift-kafka
avn service &lt;span class="nb"&gt;wait &lt;/span&gt;demo-drift-mysqldb
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Create the source dataset in PostgreSQL
&lt;/h2&gt;

&lt;p&gt;The first step of the data journey will be in the PostgreSQL database, acting as a company transactional backend. In this section we'll connect to the database and include some data. To connect, we can use the prebuilt Aiven CLI command (that requires &lt;code&gt;psql&lt;/code&gt; to be installed locally):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service cli demo-drift-postgresql
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After connecting, we can create a basic &lt;code&gt;USERS&lt;/code&gt; table and include some data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;USERS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ID&lt;/span&gt; &lt;span class="nb"&gt;SERIAL&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;USERNAME&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;HERO&lt;/span&gt; &lt;span class="nb"&gt;BOOLEAN&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;USERS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;USERNAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;HERO&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;VALUES&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'Spiderman'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;TRUE&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'Flash'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;TRUE&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'Joker'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;FALSE&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'Batman'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;TRUE&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Change Data Capture from PostgreSQL to Apache Kafka
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F643ky145a0la7kk60eir.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F643ky145a0la7kk60eir.png" alt="Change Data Capture with PostgreSQL, Debezium Source Connector, Apache Kafka and Schema Registry" width="800" height="318"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After mimicking the OLTP (Online Transaction Processing) system, we can now create the change data capture pipeline allowing us to track the &lt;code&gt;USERS&lt;/code&gt; table in Apache Kafka. We'll set up the CDC flow using a &lt;a href="https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/debezium-source-connector-pg" rel="noopener noreferrer"&gt;Debezium connector&lt;/a&gt; and the following configuration file, which we'll name &lt;code&gt;cdc-deb.json&lt;/code&gt;. Be sure to replace values like &lt;code&gt;&amp;lt;DATABASE_HOST&amp;gt;&lt;/code&gt; in the below example.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;   
    &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"pg-source-users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.connector.postgresql.PostgresConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.server.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"sourcepg"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.hostname"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;DATABASE_HOST&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.port"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;DATABASE_PORT&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.user"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"avnadmin"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.password"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;POSTGRESQL_PASSWORD&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.dbname"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"defaultdb"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"plugin.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"pgoutput"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"slot.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"myslot1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"publication.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"mypub1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"publication.autocreate.mode"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"filtered"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.sslmode"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"require"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"table.include.list"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"public.users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.schema.registry.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.basic.auth.credentials.source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"USER_INFO"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.schema.registry.basic.auth.user.info"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schema.registry.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.basic.auth.credentials.source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"USER_INFO"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schema.registry.basic.auth.user.info"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the above connector we are defining:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The Debezium PostgreSQL connector in the &lt;code&gt;connector.class&lt;/code&gt; parameter&lt;/li&gt;
&lt;li&gt;The PostgreSQL connection settings in the set of &lt;code&gt;database.*&lt;/code&gt; parameters. We can get the list of needed parameters with the following call:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;  avn service get demo-drift-postgresql &lt;span class="nt"&gt;--format&lt;/span&gt; &lt;span class="s1"&gt;'{service_uri_params}'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;p&gt;The PostgreSQL replication plugin name, slot name, publication name and mode. We can either create the slot and publication in PostgreSQL beforehand or have the connector create them for us.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The list of tables to include in the replication (&lt;code&gt;public.users&lt;/code&gt;)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The usage of Avro and Apache Kafka schema registry functionality for both message keys and values. We can fetch the needed connection parameters (&lt;code&gt;&amp;lt;KAFKA_HOST&amp;gt;&lt;/code&gt;, &lt;code&gt;&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;&lt;/code&gt;, &lt;code&gt;&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;&lt;/code&gt;) with the following call:&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;  avn service get demo-drift-kafka &lt;span class="nt"&gt;--json&lt;/span&gt; | jq &lt;span class="nt"&gt;-r&lt;/span&gt; &lt;span class="s1"&gt;'.connection_info.schema_registry_uri'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above command will report the Kafka schema registry URI in the form:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight html"&gt;&lt;code&gt;  https://avnadmin:&lt;span class="nt"&gt;&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;&lt;/span&gt;@&lt;span class="nt"&gt;&amp;lt;KAFKA_HOST&amp;gt;&lt;/span&gt;:&lt;span class="nt"&gt;&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
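
&lt;p&gt;Since the connector configuration needs the host, port and password as separate values, one way to avoid copy and paste mistakes is to split the URI programmatically. The following Python sketch uses the standard library's &lt;code&gt;urllib.parse.urlsplit&lt;/code&gt; to derive the eight converter settings used in &lt;code&gt;cdc-deb.json&lt;/code&gt;. The helper name &lt;code&gt;converter_settings&lt;/code&gt; and the example host, port and password are hypothetical.&lt;/p&gt;

```python
from urllib.parse import urlsplit


def converter_settings(schema_registry_uri):
    """Build the key/value converter settings from a schema registry URI.

    Assumes the URI has the form https://USER:PASSWORD@HOST:PORT.
    The password is used as-is; percent-encoded characters are not decoded.
    """
    parts = urlsplit(schema_registry_uri)
    registry_url = f"{parts.scheme}://{parts.hostname}:{parts.port}"
    user_info = f"{parts.username}:{parts.password}"

    settings = {}
    # The same four settings are needed for message keys and message values.
    for prefix in ("key", "value"):
        settings[f"{prefix}.converter"] = "io.confluent.connect.avro.AvroConverter"
        settings[f"{prefix}.converter.schema.registry.url"] = registry_url
        settings[f"{prefix}.converter.basic.auth.credentials.source"] = "USER_INFO"
        settings[f"{prefix}.converter.schema.registry.basic.auth.user.info"] = user_info
    return settings


# Hypothetical URI; replace it with the output of the avn command above.
example = converter_settings("https://avnadmin:s3cret@kafka.example.com:12345")
for name, value in example.items():
    print(name, "=", value)
```

&lt;p&gt;The resulting dictionary can be merged into the rest of the connector settings before writing the JSON file.&lt;/p&gt;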



&lt;p&gt;Once we've replaced the placeholder values in the file, we can create the connector with the following call where &lt;code&gt;cdc-deb.json&lt;/code&gt; is the file containing the connector settings:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service connector create demo-drift-kafka @cdc-deb.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Check the data in Kafka
&lt;/h2&gt;

&lt;p&gt;Once the connector is working, we can use &lt;a href="https://docs.aiven.io/docs/products/kafka/howto/kcat" rel="noopener noreferrer"&gt;kcat&lt;/a&gt; to check the data in Apache Kafka.&lt;/p&gt;

&lt;p&gt;To get the &lt;code&gt;kcat&lt;/code&gt; command for connecting to our Kafka service and also download the necessary SSL certificates, run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service connection-info kcat demo-drift-kafka &lt;span class="nt"&gt;-W&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, we can get the &lt;code&gt;avnadmin&lt;/code&gt; password with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service user-list &lt;span class="nt"&gt;--format&lt;/span&gt; &lt;span class="s1"&gt;'{password}'&lt;/span&gt; &lt;span class="nt"&gt;--project&lt;/span&gt; devrel-francesco demo-drift-kafka
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, we can take that &lt;code&gt;kcat&lt;/code&gt; command and use it to check the data.&lt;br&gt;
We need to add some parameters to explain what we want to read:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;-C&lt;/code&gt; to tell it to act as a Consumer,&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;-t sourcepg.public.users&lt;/code&gt; to tell it which topic to read from,&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;-s avro&lt;/code&gt; to tell it to use Avro, and&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;-r https://avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;@&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;&lt;/code&gt; to tell it where the schema registry is&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Putting all of that together, the command you run should look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;kcat &lt;span class="nt"&gt;-b&lt;/span&gt; &amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;KAFKA_PORT&amp;gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-X&lt;/span&gt; security.protocol&lt;span class="o"&gt;=&lt;/span&gt;SSL &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-X&lt;/span&gt; ssl.ca.location&lt;span class="o"&gt;=&lt;/span&gt;ca.pem &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-X&lt;/span&gt; ssl.key.location&lt;span class="o"&gt;=&lt;/span&gt;service.key &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-X&lt;/span&gt; ssl.certificate.location&lt;span class="o"&gt;=&lt;/span&gt;service.cert &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-s&lt;/span&gt; avro &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-r&lt;/span&gt; https://avnadmin:&amp;lt;SCHEMA_REGISTRY_PWD&amp;gt;@&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-t&lt;/span&gt; sourcepg.public.users
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
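&lt;p&gt;As a minimal sketch, the same flags could also be assembled in Python, for example to launch &lt;code&gt;kcat&lt;/code&gt; from a script. The function name &lt;code&gt;kcat_consumer_args&lt;/code&gt; and the host, ports and password used below are hypothetical placeholders, not values from this tutorial.&lt;/p&gt;

```python
import shlex


def kcat_consumer_args(kafka_host, kafka_port, registry_port, registry_pwd, topic):
    """Build the kcat argument list described above from caller-supplied values."""
    registry_url = f"https://avnadmin:{registry_pwd}@{kafka_host}:{registry_port}"
    return [
        "kcat",
        "-b", f"{kafka_host}:{kafka_port}",
        "-X", "security.protocol=SSL",
        "-X", "ssl.ca.location=ca.pem",
        "-X", "ssl.key.location=service.key",
        "-X", "ssl.certificate.location=service.cert",
        "-s", "avro",        # decode messages as Avro
        "-r", registry_url,  # where the schema registry is
        "-C",                # act as a consumer
        "-t", topic,         # topic to read from
    ]


# Hypothetical host, ports and password, for illustration only.
args = kcat_consumer_args("kafka.example.com", 12345, 12348, "secret", "sourcepg.public.users")
print(shlex.join(args))
```

&lt;p&gt;Passing the list to &lt;code&gt;subprocess.run(args, check=True)&lt;/code&gt; from the directory holding the downloaded certificates would run the same command as above.&lt;/p&gt;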



&lt;p&gt;We should see the same four rows we inserted previously appearing in the standard Debezium format:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;}{&lt;/span&gt;&lt;span class="nl"&gt;"before"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"after"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"Value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Spiderman"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"boolean"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;}}},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"version"&lt;/span&gt;&lt;span 
class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1.9.7.aiven"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"connector"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"postgresql"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"sourcepg"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"ts_ms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1692017248192&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"snapshot"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"db"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"defaultdb"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"sequence"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"[null,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;235090528&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;]"&lt;/span&gt;&lt;span 
class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"schema"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"public"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"table"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"txId"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1036&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"lsn"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;235090528&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"xmin"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"op"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"r"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"ts_ms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; 
&lt;/span&gt;&lt;span class="mi"&gt;1692017248487&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"transaction"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;}{&lt;/span&gt;&lt;span class="nl"&gt;"before"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"after"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"Value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Flash"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"boolean"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;}}},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"version"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span 
class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1.9.7.aiven"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"connector"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"postgresql"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"sourcepg"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"ts_ms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1692017248192&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"snapshot"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"db"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"defaultdb"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"sequence"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"[null,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;235090528&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;]"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span 
class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"schema"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"public"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"table"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"txId"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1036&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"lsn"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;235090528&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"xmin"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"op"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"r"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"ts_ms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span 
class="mi"&gt;1692017248493&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"transaction"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;}{&lt;/span&gt;&lt;span class="nl"&gt;"before"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"after"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"Value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Joker"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"boolean"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;}}},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"version"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span 
class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1.9.7.aiven"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"connector"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"postgresql"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"sourcepg"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"ts_ms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1692017248192&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"snapshot"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"db"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"defaultdb"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"sequence"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"[null,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;235090528&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;]"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span 
class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"schema"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"public"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"table"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"txId"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1036&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"lsn"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;235090528&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"xmin"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"op"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"r"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"ts_ms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span 
class="mi"&gt;1692017248494&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"transaction"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;}{&lt;/span&gt;&lt;span class="nl"&gt;"before"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"after"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"Value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Batman"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"boolean"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;}}},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"version"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span 
class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1.9.7.aiven"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"connector"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"postgresql"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"sourcepg"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"ts_ms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1692017248192&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"snapshot"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"last"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"db"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"defaultdb"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"sequence"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"[null,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;235090528&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;]"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span 
class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"schema"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"public"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"table"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"txId"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1036&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"lsn"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;235090528&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"xmin"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"op"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"r"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"ts_ms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"long"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span 
class="mi"&gt;1692017248494&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"transaction"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
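&lt;p&gt;The Avro JSON encoding wraps nullable columns in a type tag (for example &lt;code&gt;{"string": "Spiderman"}&lt;/code&gt;) and nests the row under &lt;code&gt;after&lt;/code&gt;. As a rough sketch, assuming we only care about the row state after the change, a consumer could flatten a record like this. The helper names &lt;code&gt;unwrap&lt;/code&gt; and &lt;code&gt;flatten_after&lt;/code&gt; are made up for this example, and the sample is a trimmed copy of the first record above.&lt;/p&gt;

```python
import json

# Trimmed copy of the first record's value from the kcat output above.
record = json.loads("""
{"before": null,
 "after": {"Value": {"id": 1, "username": {"string": "Spiderman"}, "hero": {"boolean": true}}},
 "op": "r",
 "ts_ms": {"long": 1692017248487}}
""")

# Type tags assumed for this sketch; extend the set if other tags appear.
AVRO_TYPE_TAGS = {"string", "boolean", "int", "long", "float", "double", "bytes"}


def unwrap(value):
    """Strip the single-key type wrapper that the Avro JSON encoding adds."""
    if isinstance(value, dict) and len(value) == 1:
        (tag, inner), = value.items()
        if tag in AVRO_TYPE_TAGS:
            return inner
    return value


def flatten_after(envelope):
    """Return the row state after the change as a plain dict, or None if absent."""
    after = envelope.get("after")
    if after is None:
        return None
    row = after.get("Value", after)  # "Value" is the record wrapper seen in the output
    return {column: unwrap(value) for column, value in row.items()}


row = flatten_after(record)
print(row)
```

&lt;p&gt;For the sample record this yields a plain row with the &lt;code&gt;id&lt;/code&gt;, &lt;code&gt;username&lt;/code&gt; and &lt;code&gt;hero&lt;/code&gt; columns.&lt;/p&gt;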



&lt;h2&gt;
  
  
  Check the data definition in Karapace
&lt;/h2&gt;

&lt;p&gt;Because we created the connector with Avro and the Karapace schema registry, we can examine the schema definition for the topic. By default, when Kafka Connect is used with a schema registry, two schemas are generated per topic, named with the suffixes &lt;code&gt;-key&lt;/code&gt; and &lt;code&gt;-value&lt;/code&gt;, to store the schema definition for the key and the value, respectively.&lt;/p&gt;

&lt;p&gt;We can get the list of schemas defined in Karapace with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl https://avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;@&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;/subjects
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Which returns output similar to:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"sourcepg.public.users-key"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s2"&gt;"sourcepg.public.users-value"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;These are the names of the two schemas for the Debezium topic. Each name is the concatenation of the &lt;code&gt;database.server.name&lt;/code&gt; parameter (&lt;code&gt;sourcepg&lt;/code&gt;), the schema and table name (&lt;code&gt;public.users&lt;/code&gt;), and either the &lt;code&gt;key&lt;/code&gt; or &lt;code&gt;value&lt;/code&gt; suffix.&lt;/p&gt;
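&lt;p&gt;The naming rule can be written down as a small helper. This is only an illustration of the pattern seen in the output above; &lt;code&gt;subject_names&lt;/code&gt; is a hypothetical function, not part of Karapace or Debezium.&lt;/p&gt;

```python
def subject_names(server_name, schema, table):
    """Return the key and value subject names for a Debezium topic."""
    topic = f"{server_name}.{schema}.{table}"
    return [f"{topic}-key", f"{topic}-value"]


subjects = subject_names("sourcepg", "public", "users")
print(subjects)
```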

&lt;p&gt;We can check which versions exist for the &lt;code&gt;sourcepg.public.users-key&lt;/code&gt; subject with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; GET https://avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;@&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;/subjects/sourcepg.public.users-key/versions
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output should show version &lt;code&gt;1&lt;/code&gt; being available.&lt;/p&gt;
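&lt;p&gt;The same lookups could be scripted with the Python standard library instead of &lt;code&gt;curl&lt;/code&gt;. The sketch below assumes the registry accepts HTTP basic authentication, which is what &lt;code&gt;curl&lt;/code&gt; sends for credentials embedded in the URL. The helper names and the &lt;code&gt;base_url&lt;/code&gt; argument are hypothetical.&lt;/p&gt;

```python
import base64
import json
import urllib.request
from urllib.parse import quote


def basic_auth_header(user, password):
    """Encode credentials for HTTP basic authentication."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return {"Authorization": f"Basic {token}"}


def versions_path(subject):
    """REST path that lists the registered versions of a subject."""
    return f"/subjects/{quote(subject, safe='')}/versions"


def registry_get(base_url, path, user, password):
    """GET a schema registry path and decode the JSON response."""
    request = urllib.request.Request(base_url + path, headers=basic_auth_header(user, password))
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# No request is sent here; this only prints the path used by the curl call above.
print(versions_path("sourcepg.public.users-key"))
```

&lt;p&gt;Calling &lt;code&gt;registry_get&lt;/code&gt; with the registry's base URL, the path &lt;code&gt;/subjects&lt;/code&gt; and the &lt;code&gt;avnadmin&lt;/code&gt; credentials would be the scripted counterpart of the first &lt;code&gt;curl&lt;/code&gt; command.&lt;/p&gt;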

&lt;p&gt;To check the definition of the schema &lt;code&gt;sourcepg.public.users-key&lt;/code&gt; version &lt;code&gt;1&lt;/code&gt; we can use the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; GET https://avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;@&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;/subjects/sourcepg.public.users-key/versions/1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output shows the fields included in the key, in this case only the &lt;code&gt;id&lt;/code&gt; column we defined in the original PostgreSQL table.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "id": 1,
  "schema": "{\"connect.name\":\"sourcepg.public.users.Key\",\"fields\":[{\"default\":0,\"name\":\"id\",\"type\":{\"connect.default\":0,\"type\":\"int\"}}],\"name\":\"Key\",\"namespace\":\"sourcepg.public.users\",\"type\":\"record\"}",
  "subject": "sourcepg.public.users-key",
  "version": 1
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
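&lt;p&gt;Note that the &lt;code&gt;schema&lt;/code&gt; value in the response is itself a JSON document stored as a string, so it has to be decoded a second time before the fields can be inspected. A minimal sketch, using the response body shown above:&lt;/p&gt;

```python
import json

# Response body copied from the output above.
response = {
    "id": 1,
    "schema": "{\"connect.name\":\"sourcepg.public.users.Key\",\"fields\":[{\"default\":0,\"name\":\"id\",\"type\":{\"connect.default\":0,\"type\":\"int\"}}],\"name\":\"Key\",\"namespace\":\"sourcepg.public.users\",\"type\":\"record\"}",
    "subject": "sourcepg.public.users-key",
    "version": 1,
}

# Decode the embedded schema string, then list the field names it declares.
key_schema = json.loads(response["schema"])
field_names = [field["name"] for field in key_schema["fields"]]
print(field_names)
```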



&lt;h2&gt;
  
  
  Sink the data to MySQL
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fisc45o5fhsldb0w1ll0s.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fisc45o5fhsldb0w1ll0s.png" alt="Sink data to MySQL with Kafka Connect JDBC sink" width="800" height="285"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now that we have the data in Apache Kafka, let's set up a consumer for the data to demonstrate how the solution manages drift. The first consumer will be a MySQL database. We can establish the flow using a dedicated JDBC sink connector and the following configuration, stored in &lt;code&gt;mysql_jdbc_sink.json&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"cdc-sink-mysql"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.aiven.connect.jdbc.JdbcSinkConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"topics"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"sourcepg.public.users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connection.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"jdbc:mysql://&amp;lt;MYSQL_HOST&amp;gt;:&amp;lt;MYSQL_PORT&amp;gt;/&amp;lt;MYSQL_DB_NAME&amp;gt;?ssl-mode=REQUIRED"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connection.user"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"avnadmin"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connection.password"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;MYSQL_PASSWORD&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"table.name.format"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"users_mysql"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"insert.mode"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"upsert"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"pk.mode"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"record_key"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"pk.fields"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"auto.create"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"auto.evolve"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"extract"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.extract.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.transforms.ExtractNewRecordState"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.schema.registry.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.basic.auth.credentials.source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"USER_INFO"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.schema.registry.basic.auth.user.info"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schema.registry.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.basic.auth.credentials.source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"USER_INFO"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schema.registry.basic.auth.user.info"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the above connector we are defining:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The JDBC sink connector in the &lt;code&gt;connector.class&lt;/code&gt; parameter&lt;/li&gt;
&lt;li&gt;The MySQL connection settings in the &lt;code&gt;connection.url&lt;/code&gt; parameter. We can get the parameters to compose the URL and the credentials with the following call:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;  avn service get demo-drift-mysqldb &lt;span class="nt"&gt;--format&lt;/span&gt; &lt;span class="s1"&gt;'{service_uri_params}'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;The target table name will be &lt;code&gt;users_mysql&lt;/code&gt; with upsert mode (see &lt;code&gt;insert.mode&lt;/code&gt;), inserting or updating existing rows based on the &lt;code&gt;id&lt;/code&gt; field (see &lt;code&gt;pk.mode&lt;/code&gt; and &lt;code&gt;pk.fields&lt;/code&gt; parameters)&lt;/li&gt;
&lt;li&gt;The table will be created automatically if it does not exist (&lt;code&gt;"auto.create": "true"&lt;/code&gt;) and evolve following the changes in the Apache Kafka topic (&lt;code&gt;"auto.evolve": "true"&lt;/code&gt;). This will be key to propagating the drift to downstream technologies (MySQL in this case).&lt;/li&gt;
&lt;li&gt;A transformation called &lt;code&gt;extract&lt;/code&gt; to retrieve and propagate the status of the row after the change from the Debezium format&lt;/li&gt;
&lt;li&gt;The usage of Avro and the Apache Kafka schema registry functionality for both message keys and values. We can fetch the needed connection parameters (&lt;code&gt;&amp;lt;KAFKA_HOST&amp;gt;&lt;/code&gt;, &lt;code&gt;&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;&lt;/code&gt;, &lt;code&gt;&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;&lt;/code&gt;) with the following command:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;  avn service get demo-drift-kafka &lt;span class="nt"&gt;--json&lt;/span&gt; | jq &lt;span class="nt"&gt;-r&lt;/span&gt; &lt;span class="s1"&gt;'.connection_info.schema_registry_uri'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above command returns the Kafka schema registry URI in the form:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight html"&gt;&lt;code&gt;  https://avnadmin:&lt;span class="nt"&gt;&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;&lt;/span&gt;@&lt;span class="nt"&gt;&amp;lt;KAFKA_HOST&amp;gt;&lt;/span&gt;:&lt;span class="nt"&gt;&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
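&lt;p&gt;As a rough illustration, the URI can be split into the values the connector file expects with a few lines of Python. This is only a sketch: the helper name &lt;code&gt;split_registry_uri&lt;/code&gt; and the sample URI are made up for the example, and in practice the input would be the output of the &lt;code&gt;avn&lt;/code&gt; command above.&lt;/p&gt;

```python
from urllib.parse import unquote, urlsplit


def split_registry_uri(uri: str) -> dict:
    """Split a schema registry URI into the values the connector file needs.

    Hypothetical helper, not part of the Aiven tooling.
    """
    parts = urlsplit(uri)
    # Credentials may be percent-encoded inside a URI, so decode them.
    user = unquote(parts.username or "")
    password = unquote(parts.password or "")
    return {
        "KAFKA_HOST": parts.hostname,
        "SCHEMA_REGISTRY_PORT": parts.port,
        "SCHEMA_REGISTRY_PASSWORD": password,
        # URL without credentials, as used in *.schema.registry.url
        "registry_url": f"{parts.scheme}://{parts.hostname}:{parts.port}",
        # user:password pair, as used in *.basic.auth.user.info
        "user_info": f"{user}:{password}",
    }


# Made-up sample value; a real URI comes from the avn command.
example = split_registry_uri("https://avnadmin:s3cret@kafka.example.com:12345")
print(example["registry_url"])
print(example["user_info"])
```

&lt;p&gt;Keeping the credentials out of the URL and in the &lt;code&gt;basic.auth.user.info&lt;/code&gt; settings matches how the connector configuration above is laid out.&lt;/p&gt;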



&lt;p&gt;Having replaced the placeholder values, we can create the connector with the following call, where &lt;code&gt;mysql_jdbc_sink.json&lt;/code&gt; is the file containing the connector settings:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service connector create demo-drift-kafka @mysql_jdbc_sink.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
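&lt;p&gt;Before creating the connector, it can be worth checking the settings file for two easy mistakes: placeholders that were never replaced and keys that appear twice (most JSON parsers silently keep only the last value). The following is a minimal sketch under those assumptions; &lt;code&gt;check_connector_config&lt;/code&gt; is a hypothetical helper, and the placeholder pattern simply assumes the &lt;code&gt;&amp;lt;UPPER_CASE&amp;gt;&lt;/code&gt; convention used in this article.&lt;/p&gt;

```python
import json
import re

# Matches placeholders written like <MYSQL_HOST>, as in this article.
PLACEHOLDER = re.compile(r"<[A-Z_]+>")


def check_connector_config(text: str) -> list:
    """Return a list of problems found in a connector JSON document."""
    problems = []

    def record_pairs(pairs):
        # json.loads hands every object to this hook as (key, value) pairs,
        # which lets us spot keys that a plain dict would silently overwrite.
        seen = set()
        for key, _ in pairs:
            if key in seen:
                problems.append(f"duplicate key: {key}")
            seen.add(key)
        return dict(pairs)

    config = json.loads(text, object_pairs_hook=record_pairs)
    for key, value in config.items():
        if isinstance(value, str) and PLACEHOLDER.search(value):
            problems.append(f"unreplaced placeholder in: {key}")
    return problems


# Small made-up document showing both kinds of problem.
sample = """
{
    "name": "cdc-sink-mysql",
    "transforms": "extract",
    "transforms": "extract",
    "connection.password": "<MYSQL_PASSWORD>"
}
"""
for problem in check_connector_config(sample):
    print(problem)
```

&lt;p&gt;To check a real file, read its contents and pass the text to the function; an empty list means neither problem was found.&lt;/p&gt;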



&lt;p&gt;We can verify the status of the connector with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service connector status demo-drift-kafka cdc-sink-mysql
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above command should show the connector in the &lt;code&gt;RUNNING&lt;/code&gt; state.&lt;/p&gt;

&lt;h2&gt;
  
  
  Check the data in MySQL
&lt;/h2&gt;

&lt;p&gt;Once the above connector is running, we can head to MySQL to check the data. To get the connection parameters, we can rerun the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service get demo-drift-mysqldb &lt;span class="nt"&gt;--format&lt;/span&gt; &lt;span class="s1"&gt;'{service_uri_params}'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And then connect with the following command, replacing the placeholders. Note the absence of spaces between the &lt;code&gt;-p&lt;/code&gt; parameter and the password.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;mysql &lt;span class="nt"&gt;-u&lt;/span&gt; avnadmin   &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-P&lt;/span&gt; &amp;lt;MYSQL_PORT&amp;gt;        &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-h&lt;/span&gt; &amp;lt;MYSQL_HOST&amp;gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-D&lt;/span&gt; defaultdb    &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-p&lt;/span&gt;&amp;lt;MYSQL_PASSWORD&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can then check the data with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;users_mysql&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The table is &lt;code&gt;users_mysql&lt;/code&gt; following the &lt;code&gt;table.name.format&lt;/code&gt; in the connector. The data should be in line with what we have in PostgreSQL.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+----+-----------+------+
| id | username  | hero |
+----+-----------+------+
|  1 | Spiderman |    1 |
|  2 | Flash     |    1 |
|  3 | Joker     |    0 |
|  4 | Batman    |    1 |
+----+-----------+------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we check the table structure with &lt;code&gt;describe users_mysql&lt;/code&gt;, we can see that the &lt;code&gt;hero&lt;/code&gt; column has been mapped to a &lt;code&gt;TINYINT&lt;/code&gt; in MySQL.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+----------+--------------+------+-----+---------+-------+
| Field    | Type         | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| id       | int          | NO   | PRI | 0       |       |
| username | varchar(256) | YES  |     | NULL    |       |
| hero     | tinyint      | YES  |     | NULL    |       |
| points   | int          | YES  |     | NULL    |       |
+----------+--------------+------+-----+---------+-------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Let's talk Drift
&lt;/h2&gt;

&lt;p&gt;So far we've built a fairly traditional data pipeline. Now, let's include some changes to the original data structure in PostgreSQL to mimic drift. &lt;/p&gt;

&lt;h3&gt;
  
  
  Adding a column
&lt;/h3&gt;

&lt;p&gt;In the terminal connected to the PostgreSQL database, execute the following command to add a &lt;code&gt;POINTS&lt;/code&gt; integer column:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;USERS&lt;/span&gt; &lt;span class="k"&gt;ADD&lt;/span&gt; &lt;span class="k"&gt;COLUMN&lt;/span&gt; &lt;span class="n"&gt;POINTS&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Nothing happens immediately in the target MySQL table after the DDL execution in PostgreSQL. The structure and the data of &lt;code&gt;users_mysql&lt;/code&gt; are still the same.&lt;/p&gt;

&lt;p&gt;Now change the data in PostgreSQL, using the following update statement:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
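&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;UPDATE&lt;/span&gt; &lt;span class="n"&gt;USERS&lt;/span&gt; &lt;span class="k"&gt;SET&lt;/span&gt; &lt;span class="n"&gt;POINTS&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;CASE&lt;/span&gt; &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;USERNAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'Batman'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt; &lt;span class="k"&gt;ELSE&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt; &lt;span class="k"&gt;END&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;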
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In MySQL, execute:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;users_mysql&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can see the new &lt;code&gt;points&lt;/code&gt; column, with its values, appear in the MySQL table in near real time:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+----+-----------+------+--------+
| id | username  | hero | points |
+----+-----------+------+--------+
|  1 | Spiderman |    1 |     10 |
|  2 | Flash     |    1 |     10 |
|  3 | Joker     |    0 |     10 |
|  4 | Batman    |    1 |      5 |
+----+-----------+------+--------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As mentioned in the sink connector definition, &lt;code&gt;"auto.create": "true"&lt;/code&gt; allows the automatic creation of the table if it doesn't exist, and &lt;code&gt;"auto.evolve": "true"&lt;/code&gt; allows the evolution of the table in cases when &lt;strong&gt;new data columns are included&lt;/strong&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Removing a column
&lt;/h3&gt;

&lt;p&gt;What about removing columns? Let's test it! Let's drop the same &lt;code&gt;points&lt;/code&gt; column we just added from the PostgreSQL terminal with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;USERS&lt;/span&gt; &lt;span class="k"&gt;DROP&lt;/span&gt; &lt;span class="k"&gt;COLUMN&lt;/span&gt; &lt;span class="n"&gt;POINTS&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we execute our previous query in MySQL again:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;users_mysql&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We see that the column is not dropped in MySQL: the structure of &lt;code&gt;users_mysql&lt;/code&gt; is unchanged and the &lt;code&gt;points&lt;/code&gt; column is still filled.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+----+-----------+------+--------+
| id | username  | hero | points |
+----+-----------+------+--------+
|  1 | Spiderman |    1 |     10 |
|  2 | Flash     |    1 |     10 |
|  3 | Joker     |    0 |     10 |
|  4 | Batman    |    1 |      5 |
+----+-----------+------+--------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This makes sense because downstream applications might be using the &lt;code&gt;points&lt;/code&gt; column. An unexpected and unhandled drop of a column could have disastrous effects on the downstream data pipelines. The risk, however, is that those applications now work with stale information: the &lt;code&gt;points&lt;/code&gt; column has been dropped from PostgreSQL, so its values in MySQL can no longer be updated.&lt;/p&gt;

&lt;h3&gt;
  
  
  Changing the column type
&lt;/h3&gt;

&lt;p&gt;What about changing the column type? A change in the column type could be needed in cases, like this example, where we want to migrate from a &lt;code&gt;BOOLEAN&lt;/code&gt; to a &lt;code&gt;VARCHAR&lt;/code&gt; for the &lt;code&gt;HERO&lt;/code&gt; column. Let's execute the following in PostgreSQL:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;USERS&lt;/span&gt; &lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;COLUMN&lt;/span&gt; &lt;span class="n"&gt;HERO&lt;/span&gt; &lt;span class="k"&gt;TYPE&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As before, nothing happens when the DDL statement runs. But when we try to add some data using the new &lt;code&gt;VARCHAR&lt;/code&gt; column type:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;USERS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;USERNAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;HERO&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;VALUES&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'Panda'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'middle'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The insert succeeds in PostgreSQL as expected, but the Debezium source connector crashes with the following error:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ERROR "Caused by: org.apache.kafka.common.config.ConfigException: Failed to access Avro data from topic sourcepg.public.users : Incompatible schema, compatibility_mode=BACKWARD reader union lacking writer type: RECORD; error code: 409"
Backwards compatibility, old schema type is boolean (with null), new schema type is string... incompatible
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is because the schema is stored in Karapace with the &lt;code&gt;BACKWARD&lt;/code&gt; compatibility setting. &lt;code&gt;BACKWARD&lt;/code&gt; compatibility ensures that consumers using the new schema can still read events produced with the previous schema. The change from &lt;code&gt;BOOLEAN&lt;/code&gt; to &lt;code&gt;VARCHAR&lt;/code&gt; breaks that guarantee, since a reader expecting a string cannot parse the existing boolean values, so the new schema is rejected and the connector fails.&lt;/p&gt;
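&lt;p&gt;To make the rejection more concrete, here is a heavily simplified model of a backward compatibility check for a single field. It is a sketch, not Karapace's implementation: it only handles Avro primitive types and unions of primitives, the function names are invented for the example, and the promotion table follows the type promotions listed in the Avro specification. The new schema plays the reader and the old schema the writer, which matches the "reader union lacking writer type" wording of the error.&lt;/p&gt;

```python
# Writer type -> reader types that may read it (Avro type promotions).
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def can_read(reader_type: str, writer_type: str) -> bool:
    """True if data written as writer_type can be read as reader_type."""
    return reader_type == writer_type or reader_type in PROMOTIONS.get(writer_type, set())


def as_branches(field_type) -> list:
    """Treat a plain type as a union with a single branch."""
    return list(field_type) if isinstance(field_type, list) else [field_type]


def backward_compatible(new_type, old_type) -> bool:
    """Every branch the old schema could write must be readable by the new one."""
    readers = as_branches(new_type)
    return all(
        any(can_read(reader, writer) for reader in readers)
        for writer in as_branches(old_type)
    )


# The nullable hero field before and after the ALTER TABLE statement.
old_hero = ["null", "boolean"]
new_hero = ["null", "string"]
print(backward_compatible(new_hero, old_hero))
```

&lt;p&gt;In this model the &lt;code&gt;hero&lt;/code&gt; change is rejected because no branch of the new union can read a boolean, whereas widening a field from &lt;code&gt;int&lt;/code&gt; to &lt;code&gt;long&lt;/code&gt; would pass.&lt;/p&gt;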

&lt;h3&gt;
  
  
  Changing the compatibility level
&lt;/h3&gt;

&lt;p&gt;For the sake of this example, let's relax the &lt;code&gt;BACKWARD&lt;/code&gt; compatibility setting. We'll set compatibility to &lt;code&gt;NONE&lt;/code&gt;, allowing all changes in the source system to propagate to the Apache Kafka topic.&lt;/p&gt;

&lt;p&gt;First, we check the default compatibility level for the Apache Kafka service with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service schema configuration demo-drift-kafka
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This shows &lt;code&gt;BACKWARD&lt;/code&gt; as the default. The same default applies to the &lt;code&gt;sourcepg.public.users-value&lt;/code&gt; subject, which we can check with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service schema subject-configuration demo-drift-kafka &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--subject&lt;/span&gt; sourcepg.public.users-value
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To change the compatibility level to &lt;code&gt;NONE&lt;/code&gt; for both key and value, run the following commands:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service schema subject-configuration-update demo-drift-kafka &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--subject&lt;/span&gt; sourcepg.public.users-value                        &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--compatibility&lt;/span&gt; NONE
avn service schema subject-configuration-update demo-drift-kafka &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--subject&lt;/span&gt; sourcepg.public.users-key                          &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--compatibility&lt;/span&gt; NONE
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, if we restart the Debezium Source connector task &lt;code&gt;0&lt;/code&gt; with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service connector restart-task demo-drift-kafka pg-source-users 0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We see that the source connector restarts correctly. Using &lt;code&gt;avn service connector status demo-drift-kafka pg-source-users&lt;/code&gt; shows the connector in the &lt;code&gt;RUNNING&lt;/code&gt; state:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="w"&gt;    &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"status"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"state"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"RUNNING"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"tasks"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
              &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="nl"&gt;"state"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"RUNNING"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="nl"&gt;"trace"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;""&lt;/span&gt;&lt;span class="w"&gt;
              &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;However, the JDBC sink connector to MySQL now fails. Running &lt;code&gt;avn service connector status demo-drift-kafka cdc-sink-mysql&lt;/code&gt; returns an error:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Incorrect integer value: 'middle' for column 'hero' at row 1
java.sql.SQLException: Incorrect integer value: 'middle' for column 'hero' at row 1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The error indicates that the connector attempted to insert the new value (&lt;code&gt;'middle'&lt;/code&gt;) into an integer column. This implies that the auto-evolution process did not alter the structure of the pre-existing column.&lt;/p&gt;

&lt;p&gt;To confirm this, we can execute &lt;code&gt;describe users_mysql&lt;/code&gt; on the MySQL database and validate that the &lt;code&gt;hero&lt;/code&gt; column remains a &lt;code&gt;tinyint&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;In the JDBC sink connector documentation, the &lt;a href="https://github.com/aiven/jdbc-connector-for-apache-kafka/blob/master/docs/sink-connector.md#auto-evolution" rel="noopener noreferrer"&gt;&lt;code&gt;auto.evolution&lt;/code&gt; section&lt;/a&gt; says:&lt;/p&gt;

&lt;blockquote&gt;
&lt;ul&gt;
&lt;li&gt;The connector does not delete columns.&lt;/li&gt;
&lt;li&gt;The connector does not alter column types.&lt;/li&gt;
&lt;li&gt;The connector does not add primary keys constraints.&lt;/li&gt;
&lt;/ul&gt;
&lt;/blockquote&gt;

&lt;p&gt;We already talked about automatic column deletion being a dangerous action. The same is true for the automatic change of column types, since downstream applications could rely on functions that work only on particular column types. Therefore, modifying a column type should be handled as a breaking change, and it is correct for the sink connector to fail.&lt;/p&gt;
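&lt;p&gt;The behaviour observed across the three experiments can be summarised with a small model. This is only a sketch of the rules quoted above, not the connector's code: &lt;code&gt;plan_evolution&lt;/code&gt; is a hypothetical function, and it assumes the table columns and the record fields are described with the same made-up type names so they can be compared directly.&lt;/p&gt;

```python
def plan_evolution(table_columns: dict, record_fields: dict) -> dict:
    """Compare a target table with an incoming record schema.

    Both arguments map a column or field name to a type name.
    """
    added = [name for name in record_fields if name not in table_columns]
    kept = [name for name in table_columns if name not in record_fields]
    mismatched = [
        name
        for name in record_fields
        if name in table_columns and table_columns[name] != record_fields[name]
    ]
    return {
        "add": added,                 # new columns: the only change applied automatically
        "keep": kept,                 # columns missing upstream: never dropped
        "type_mismatch": mismatched,  # types are never altered, so writes may fail
    }


# The three drift experiments from this article, in order.
table = {"id": "int", "username": "varchar", "hero": "boolean"}

step1 = plan_evolution(table, {**table, "points": "int"})
print(step1["add"])

table_with_points = {**table, "points": "int"}
step2 = plan_evolution(table_with_points, table)
print(step2["keep"])

step3 = plan_evolution(table_with_points, {**table, "hero": "varchar"})
print(step3["type_mismatch"])
```

&lt;p&gt;Only the first case results in a change to the target table. The other two leave it untouched, which is why the dropped column lingers in MySQL and the type change surfaces as a failed insert.&lt;/p&gt;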

&lt;h2&gt;
  
  
  What about non-relational targets? The AWS S3 example
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fg05vzofqt7z0bgpc4bhm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fg05vzofqt7z0bgpc4bhm.png" alt="Sink to AWS S3 with Kafka Connect and the s3 sink" width="800" height="294"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The scenario described above is one of the strictest possible in terms of data evolution: both the source and the target are relational databases with strict column type definitions. In this second example we'll sink the data to an S3 bucket, where the data structure is not defined upfront.&lt;/p&gt;

&lt;p&gt;We can create a sink connector to S3 with the following JSON configuration, stored in a file named &lt;code&gt;s3_sink.json&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"s3sink"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"aws.access.key.id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;AWS_SECRET_ID&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"aws.secret.access.key"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;AWS_SECRET_ACCESS&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"aws.s3.bucket.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;AWS_BUCKET_NAME&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"aws.s3.region"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;AWS_REGION&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"topics"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"sourcepg.public.users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"format.output.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"json"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.schema.registry.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.basic.auth.credentials.source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"USER_INFO"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.schema.registry.basic.auth.user.info"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schema.registry.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;SCHEMA_REGISTRY_PORT&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.basic.auth.credentials.source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"USER_INFO"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schema.registry.basic.auth.user.info"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"avnadmin:&amp;lt;SCHEMA_REGISTRY_PASSWORD&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"extract"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.extract.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.transforms.ExtractNewRecordState"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Where:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The set of &lt;code&gt;aws.*&lt;/code&gt; parameters refers to the S3-related secrets, as detailed in the &lt;a href="https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/s3-sink-prereq.html" rel="noopener noreferrer"&gt;connector prerequisites documentation&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;The &lt;code&gt;topics&lt;/code&gt; parameter defines the source of information (specifically, the &lt;code&gt;sourcepg.public.users&lt;/code&gt; topic).&lt;/li&gt;
&lt;li&gt;The &lt;code&gt;format.output.type&lt;/code&gt; parameter specifies how the data will be stored (in this case, as &lt;code&gt;json&lt;/code&gt;).&lt;/li&gt;
&lt;li&gt;The &lt;code&gt;key.converter&lt;/code&gt; and &lt;code&gt;value.converter&lt;/code&gt; parameters enable the connector to retrieve schema information from Karapace.&lt;/li&gt;
&lt;li&gt;The &lt;code&gt;transforms&lt;/code&gt; section allows the extraction of the value after the change from the Debezium format.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We can start the above connector with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service connector create demo-drift-kafka @s3_sink.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we check the data in S3, we should see a document in the bucket containing all the changes implemented in PostgreSQL.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Spiderman"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;}},&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Flash"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;}},&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Joker"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;}},&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Batman"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;}},&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Spiderman"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"points"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;}},&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Flash"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"points"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;}},&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Joker"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"points"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;}},&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Batman"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"points"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;}},&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Panda"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nl"&gt;"hero"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"middle"&lt;/span&gt;&lt;span class="p"&gt;}}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output from the CDC -&amp;gt; Kafka -&amp;gt; S3 flow encompasses all events. Because the Debezium compatibility mode is set to &lt;code&gt;NONE&lt;/code&gt;, every change was successfully stored in Kafka. Moreover, as S3 does not enforce a specific structure on the data, all changes, whether they involve new or deleted columns, have been written to the target bucket in JSON format.&lt;/p&gt;
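
&lt;p&gt;To make the drift visible, the sketch below (Python, not part of the original pipeline) parses the records copied from the S3 output above and lists the JSON types seen for each field. The helper name &lt;code&gt;summarize_drift&lt;/code&gt; is an assumption made for illustration.&lt;/p&gt;

```python
import json

# Records copied verbatim from the S3 output shown above.
records = json.loads("""
[
{"value":{"id":1,"username":"Spiderman","hero":true}},
{"value":{"id":2,"username":"Flash","hero":true}},
{"value":{"id":3,"username":"Joker","hero":false}},
{"value":{"id":4,"username":"Batman","hero":true}},
{"value":{"id":1,"username":"Spiderman","hero":true,"points":10}},
{"value":{"id":2,"username":"Flash","hero":true,"points":10}},
{"value":{"id":3,"username":"Joker","hero":false,"points":10}},
{"value":{"id":4,"username":"Batman","hero":true,"points":5}},
{"value":{"id":5,"username":"Panda","hero":"middle"}}
]
""")


def summarize_drift(rows):
    """Map each field name to the sorted list of Python types seen for it."""
    seen = {}
    for row in rows:
        for field, val in row["value"].items():
            seen.setdefault(field, set()).add(type(val).__name__)
    return {field: sorted(types) for field, types in seen.items()}


drift = summarize_drift(records)
# Keep only the fields that were written with more than one type.
mixed = {field: types for field, types in drift.items() if len(types) != 1}
print(mixed)
```

&lt;p&gt;Only &lt;code&gt;hero&lt;/code&gt; is reported as mixed, since it holds booleans in the first eight records and the string &lt;code&gt;"middle"&lt;/code&gt; in the last one.&lt;/p&gt;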

&lt;h2&gt;
  
  
  Terminate the services
&lt;/h2&gt;

&lt;p&gt;If you followed this tutorial and want to remove the services used for testing, you can run the commands below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;avn service terminate demo-drift-postgresql &lt;span class="nt"&gt;--force&lt;/span&gt;
avn service terminate demo-drift-kafka &lt;span class="nt"&gt;--force&lt;/span&gt;
avn service terminate demo-drift-mysqldb &lt;span class="nt"&gt;--force&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Summary
&lt;/h2&gt;

&lt;p&gt;Data and Schema drift must be managed in scenarios where multiple consumers want to access the changes happening in a source system. Apache Kafka, and the Karapace schema registry, provide a method to propagate compatible changes and forbid breaking ones by stopping the pipeline. Pay special attention to column drops, since they are not propagated automatically to target systems (specifically if the target is another relational database) and could cause problems with updated data on the deleted columns.&lt;/p&gt;

&lt;p&gt;To summarize how changes are propagated: &lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Action&lt;/th&gt;
&lt;th&gt;Status&lt;/th&gt;
&lt;th&gt;Description&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Add column&lt;/td&gt;
&lt;td&gt;✅&lt;/td&gt;
&lt;td&gt;Propagates downstream if &lt;code&gt;auto.evolve&lt;/code&gt; is set to &lt;code&gt;true&lt;/code&gt;.&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Remove column&lt;/td&gt;
&lt;td&gt;⚠️&lt;/td&gt;
&lt;td&gt;Does not propagate downstream when the sink is a relational database. Stale data may remain in the dropped column.&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Change datatype&lt;/td&gt;
&lt;td&gt;⚠️&lt;/td&gt;
&lt;td&gt;Depends on the change, compatibility settings and target technology. Not propagated in case of JDBC sink.&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;A summary of Schema registry compatibility:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;BACKWARD&lt;/code&gt; allows you to stop the pipeline before ingesting data into Kafka, since breaking changes will not be included in the topic&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;NONE&lt;/code&gt; allows you to continue ingesting, but might break downstream data pipelines if the downstream technology is relational, or has strict column definitions, and schema evolution is not straightforward&lt;/li&gt;
&lt;/ul&gt;
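
&lt;p&gt;A compatibility level is typically set per subject through the schema registry REST API (&lt;code&gt;PUT /config/{subject}&lt;/code&gt;). The Python sketch below only builds such a request and does not send it. The registry URL is a placeholder, the function name is hypothetical, and the subject name assumes the default naming strategy (topic name plus &lt;code&gt;-value&lt;/code&gt;). Note that the API spells the level &lt;code&gt;BACKWARD&lt;/code&gt;.&lt;/p&gt;

```python
import json
import urllib.request


def compatibility_request(registry_url, subject, level):
    """Build (but do not send) a PUT /config/{subject} request."""
    body = json.dumps({"compatibility": level}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url}/config/{subject}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )


# Placeholder URL; replace it with your own schema registry endpoint.
req = compatibility_request(
    "https://registry.example.com:8081",
    "sourcepg.public.users-value",
    "BACKWARD",
)
print(req.get_method(), req.full_url)
```

&lt;p&gt;Sending the request with &lt;code&gt;urllib.request.urlopen(req)&lt;/code&gt; would also require whatever authentication your registry expects, which is left out here.&lt;/p&gt;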

</description>
      <category>apachekafka</category>
      <category>data</category>
    </item>
    <item>
      <title>Consume Apache Kafka® messages via REST APIs</title>
      <dc:creator>Francesco Tisiot</dc:creator>
      <pubDate>Wed, 30 Aug 2023 13:00:00 +0000</pubDate>
      <link>https://dev.to/aiven_io/consume-apache-kafkar-messages-via-rest-apis-10i0</link>
      <guid>https://dev.to/aiven_io/consume-apache-kafkar-messages-via-rest-apis-10i0</guid>
      <description>&lt;p&gt;You have an Apache Kafka topic that you want to consume via REST APIs (think &lt;code&gt;curl&lt;/code&gt; or just a web browser). How do you do it?&lt;/p&gt;

&lt;p&gt;The answer is &lt;a href="https://www.karapace.io/" rel="noopener noreferrer"&gt;Karapace&lt;/a&gt;, an open source (Apache 2.0) tool providing Schema Registry and REST proxy functionality. &lt;/p&gt;

&lt;p&gt;Let's check out how to quickly run it on Docker.&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 1: the Apache Kafka® Cluster
&lt;/h2&gt;

&lt;p&gt;You need an Apache Kafka cluster up and running. If you already have one, you can skip to the next section. &lt;/p&gt;

&lt;p&gt;The &lt;a href="https://github.com/Aiven-Open/karapace/" rel="noopener noreferrer"&gt;Karapace GitHub repo&lt;/a&gt; contains a Docker Compose version of it. Alternatively, you can spin up a Kafka cluster in minutes with &lt;a href="https://aiven.io/" rel="noopener noreferrer"&gt;Aiven&lt;/a&gt; by:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Navigating to the &lt;a href="https://go.aiven.io/francesco-signup" rel="noopener noreferrer"&gt;Aiven Console&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Selecting a new Apache Kafka service&lt;/li&gt;
&lt;li&gt;Selecting the cloud provider and region of your preference&lt;/li&gt;
&lt;li&gt;Selecting the plan&lt;/li&gt;
&lt;li&gt;Providing the service name&lt;/li&gt;
&lt;li&gt;Clicking on &lt;strong&gt;Create service&lt;/strong&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;After a couple of minutes, the Apache Kafka® cluster will be up and running. &lt;/p&gt;

&lt;h2&gt;
  
  
  Step 2: the connection parameters
&lt;/h2&gt;

&lt;p&gt;In order to connect to Apache Kafka, you'll need:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;the &lt;strong&gt;bootstrap URI&lt;/strong&gt; information&lt;/li&gt;
&lt;li&gt;the security information (whether &lt;code&gt;PLAINTEXT&lt;/code&gt;, &lt;code&gt;SSL&lt;/code&gt;, or &lt;code&gt;SASL&lt;/code&gt;) with the required certificates&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If you are using Aiven, you can download the three certificates (&lt;code&gt;ca.pem&lt;/code&gt;, &lt;code&gt;service.crt&lt;/code&gt;, &lt;code&gt;service.key&lt;/code&gt;) in the &lt;a href="https://console.aiven.io/" rel="noopener noreferrer"&gt;Aiven Console&lt;/a&gt;, in the Kafka service overview.&lt;/p&gt;

&lt;p&gt;Create a folder named &lt;code&gt;certs&lt;/code&gt; and place the certificates there.&lt;/p&gt;
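
&lt;p&gt;Before starting the container, it can help to confirm that all three files are in place. The Python sketch below is one way to check; the helper name &lt;code&gt;missing_certs&lt;/code&gt; is hypothetical, and it assumes the &lt;code&gt;certs&lt;/code&gt; folder sits in the current working directory.&lt;/p&gt;

```python
from pathlib import Path

# File names of the three certificates downloaded from the Aiven Console.
REQUIRED = ("ca.pem", "service.crt", "service.key")


def missing_certs(folder):
    """Return the required certificate file names not present in folder."""
    base = Path(folder)
    return [name for name in REQUIRED if not (base / name).is_file()]


# An empty list means every certificate was found.
print(missing_certs("certs"))
```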

&lt;h2&gt;
  
  
  Step 3: Run Karapace REST proxy in Docker
&lt;/h2&gt;

&lt;p&gt;Once you have collected the required information and certificates, it's time to run the REST proxy part of Karapace on Docker with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;-it&lt;/span&gt;  &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KARAPACE_BOOTSTRAP_URI&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'&amp;lt;KAFKA_HOST&amp;gt;:&amp;lt;KAFKA_PORT&amp;gt;'&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KARAPACE_SSL_CAFILE&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"/certs/ca.pem"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KARAPACE_SSL_CERTFILE&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"/certs/service.crt"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KARAPACE_SSL_KEYFILE&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"/certs/service.key"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KARAPACE_SECURITY_PROTOCOL&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"SSL"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KARAPACE_HOST&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'0.0.0.0'&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KARAPACE_PORT&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;8082 &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-v&lt;/span&gt; &lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;pwd&lt;/span&gt;&lt;span class="si"&gt;)&lt;/span&gt;/certs:/certs &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-p&lt;/span&gt; 8082:8082 &lt;span class="se"&gt;\&lt;/span&gt;
    ghcr.io/aiven/karapace:latest ./start.sh rest
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The command above is:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Running the &lt;code&gt;latest&lt;/code&gt; Karapace image&lt;/li&gt;
&lt;li&gt;Starting the &lt;code&gt;rest&lt;/code&gt; part of Karapace with the &lt;code&gt;./start.sh&lt;/code&gt; command&lt;/li&gt;
&lt;li&gt;Passing the parameters:

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;KARAPACE_BOOTSTRAP_URI&lt;/code&gt;: for the Kafka bootstrap URI, you will need to replace the placeholders&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KARAPACE_SECURITY_PROTOCOL&lt;/code&gt;: the protocol to use to connect to Kafka, &lt;code&gt;SSL&lt;/code&gt; in this example&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KARAPACE_SSL_*&lt;/code&gt;: the list of certificates used for the &lt;code&gt;SSL&lt;/code&gt; connection&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KARAPACE_HOST&lt;/code&gt; and &lt;code&gt;KARAPACE_PORT&lt;/code&gt;: the host and port used to start Karapace in the Docker container&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;-v $(pwd)/certs:/certs&lt;/code&gt;: mapping the &lt;code&gt;certs&lt;/code&gt; folder created locally to &lt;code&gt;/certs&lt;/code&gt; in the Docker container&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;-p 8082:8082&lt;/code&gt;: mapping the local port &lt;code&gt;8082&lt;/code&gt; to the Docker port &lt;code&gt;8082&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;If the above command is successful, you should see the following message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;======== Running on http://0.0.0.0:8082 ========
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should then be able to execute:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl http://0.0.0.0:8082 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This returns:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 4: Consume Apache Kafka® messages via REST APIs
&lt;/h2&gt;

&lt;p&gt;The last step is to consume messages via REST APIs. &lt;/p&gt;

&lt;p&gt;We can list the topics in Kafka with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="s2"&gt;"http://localhost:8082/topics"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In my test cluster, the response contains only a &lt;code&gt;test&lt;/code&gt; topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"test"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next we create a consumer &lt;code&gt;my_consumer&lt;/code&gt; with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Create JSON consumer
curl &lt;span class="nt"&gt;-X&lt;/span&gt; POST &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/vnd.kafka.v2+json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept: application/vnd.kafka.v2+json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--data&lt;/span&gt; &lt;span class="s1"&gt;'{"name": "my_consumer",  "format": "json", "auto.offset.reset": "earliest"}'&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    http://localhost:8082/consumers/json_consumers
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now we subscribe &lt;code&gt;my_consumer&lt;/code&gt; to the &lt;code&gt;test&lt;/code&gt; topic with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; POST &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/vnd.kafka.v2+json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--data&lt;/span&gt; &lt;span class="s1"&gt;'{"topics":["test"]}'&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    http://localhost:8082/consumers/json_consumers/instances/my_consumer/subscription
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally we consume the data from the &lt;code&gt;test&lt;/code&gt; topic with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
  "http://localhost:8082/consumers/json_consumers/instances/my_consumer/records?timeout=1000"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output shows the existing data in the topic, for example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"offset"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"partition"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1693386610137&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"topic"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"test"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"francesco"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"offset"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"partition"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1693387327332&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"topic"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"test"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Karapace will sync the consumer offset to a topic in Kafka named &lt;code&gt;__consumer_offsets&lt;/code&gt;. Therefore, the next time we issue the consume command, we'll see only the messages that have appeared in the Kafka topic since the last poll (the output will be empty if no new messages are in the topic).&lt;/p&gt;
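
&lt;p&gt;The same three calls (create the consumer, subscribe, poll) can be scripted. The Python sketch below mirrors the &lt;code&gt;curl&lt;/code&gt; commands above using only the standard library. The function names &lt;code&gt;build_requests&lt;/code&gt; and &lt;code&gt;send&lt;/code&gt; are hypothetical, and the script only prints the requests it would make rather than sending them.&lt;/p&gt;

```python
import json
import urllib.request

BASE = "http://localhost:8082"  # REST proxy address used in the curl examples
GROUP = "json_consumers"
NAME = "my_consumer"
V2 = "application/vnd.kafka.v2+json"


def build_requests(topic):
    """Return the create, subscribe, and poll requests without sending them."""
    create = urllib.request.Request(
        f"{BASE}/consumers/{GROUP}",
        data=json.dumps(
            {"name": NAME, "format": "json", "auto.offset.reset": "earliest"}
        ).encode("utf-8"),
        headers={"Content-Type": V2, "Accept": V2},
        method="POST",
    )
    instance = f"{BASE}/consumers/{GROUP}/instances/{NAME}"
    subscribe = urllib.request.Request(
        f"{instance}/subscription",
        data=json.dumps({"topics": [topic]}).encode("utf-8"),
        headers={"Content-Type": V2},
        method="POST",
    )
    poll = urllib.request.Request(
        f"{instance}/records?timeout=1000",
        headers={"Accept": "application/vnd.kafka.json.v2+json"},
        method="GET",
    )
    return [create, subscribe, poll]


def send(request):
    """Send one request and decode the JSON body (an empty body becomes None)."""
    with urllib.request.urlopen(request) as response:
        raw = response.read()
    return json.loads(raw) if raw else None


# Show what would be sent; call send(req) on each one against a running proxy.
for req in build_requests("test"):
    print(req.get_method(), req.full_url)
```

&lt;p&gt;With the REST proxy running, calling &lt;code&gt;send&lt;/code&gt; on each request in order should reproduce the flow above, and calling it again on the last request polls for newer messages.&lt;/p&gt;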

</description>
      <category>apachekafka</category>
      <category>rest</category>
      <category>http</category>
      <category>opensource</category>
    </item>
    <item>
      <title>2023-07 Edition: What’s Up With DevRel @ Aiven?</title>
      <dc:creator>Team Aiven</dc:creator>
      <pubDate>Fri, 04 Aug 2023 18:56:50 +0000</pubDate>
      <link>https://dev.to/aiven_io/2023-07-edition-whats-up-with-devrel-aiven-4cih</link>
      <guid>https://dev.to/aiven_io/2023-07-edition-whats-up-with-devrel-aiven-4cih</guid>
      <description>&lt;h1&gt;
  
  
  ❓Who are we and what do we do?
&lt;/h1&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--1P2EFHZk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/oro9v66pizqgytfgjt5o.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--1P2EFHZk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/oro9v66pizqgytfgjt5o.jpeg" alt="The Developer Relations team at Aiven" width="800" height="654"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We’re a team that helps our customers (technical ones, but that’s basically all of them with a platform like Aiven) work with Aiven more easily. We work to boost awareness of Aiven among developers, and boost awareness of developers within Aiven.&lt;/p&gt;

&lt;p&gt;More specifically we:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;produce (and review) &lt;a href="https://aiven.io/developer"&gt;technical content&lt;/a&gt; and &lt;a href="https://www.youtube.com/aiven_io"&gt;videos&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;present (and help Aiven present) at &lt;a href="https://aiven.io/events"&gt;tech events&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;provide community spaces at &lt;a href="https://www.meetup.com/pro/open-source-data-infrastructure-community/"&gt;meetups&lt;/a&gt; and our &lt;a href="https://aiven.io/community/forum/"&gt;forum&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;work closely with product teams to provide feedback&lt;/li&gt;
&lt;li&gt;connect with customers and prospects&lt;/li&gt;
&lt;li&gt;represent Aiven in the community and industry (appearing on podcasts, live streams, etc)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To learn more about our team, see &lt;a href="https://aiven.io/community/devrel"&gt;Aiven DevRel team: enabling success in open source data tech&lt;/a&gt;.&lt;/p&gt;




&lt;h1&gt;
  
  
  👀 Month At-A-Glance
&lt;/h1&gt;

&lt;ul&gt;
&lt;li&gt;💕 Read about awesome community members we’re collaborating with in the Community Shout-Outs section.&lt;/li&gt;
&lt;li&gt;📆 We gave talks at Events including EuroPython and FOSSY 2023.&lt;/li&gt;
&lt;li&gt;🌍 Additionally, Meetups were held around the world in Berlin, Amsterdam, Jakarta and New York.&lt;/li&gt;
&lt;li&gt;💻 Our team delivered tons of great Technical Content, like on OPA (Open Policy Agent) and much more!&lt;/li&gt;
&lt;/ul&gt;




&lt;h1&gt;
  
  
  💕 Community Shout-Outs
&lt;/h1&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://www.linkedin.com/in/mjonss/?originalSubdomain=nl"&gt;Mattias Jonsson&lt;/a&gt;, a senior database engineer from &lt;a href="https://www.pingcap.com/"&gt;PingCAP&lt;/a&gt;, gave an introduction to &lt;a href="https://www.pingcap.com/tidb/"&gt;TiDB&lt;/a&gt;, an open source distributed SQL database, at the July &lt;a href="https://www.meetup.com/berlin-open-source-data-infrastructure-meetup/"&gt;Berlin Open Source Data Infrastructure meetup&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;The July &lt;a href="https://www.meetup.com/paris-open-source-data-infrastructure-meetup/"&gt;Paris OSDI meetup&lt;/a&gt; was hosted by our friends at &lt;a href="https://www.scaleway.com/en/"&gt;Scaleway&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;At the July &lt;a href="https://www.meetup.com/new-york-open-source-data-infrastructure-meetup/"&gt;NYC OSDI meetup&lt;/a&gt;, &lt;a class="mentioned-user" href="https://dev.to/brooke_jamieson"&gt;@brooke_jamieson&lt;/a&gt;, Senior Developer Advocate at AWS, spoke about vector databases.&lt;/li&gt;
&lt;/ul&gt;




&lt;h1&gt;
  
  
  💻 Technical Content
&lt;/h1&gt;

&lt;p&gt;This section covers new technical content published on Aiven’s…&lt;/p&gt;

&lt;p&gt;📣 &lt;a href="https://aiven.io/blog"&gt;Blog&lt;/a&gt;: thought-leadership pieces, announcements&lt;/p&gt;

&lt;p&gt;🔧 &lt;a href="https://aiven.io/developer"&gt;Developer Center&lt;/a&gt;: tutorials, example code&lt;/p&gt;

&lt;p&gt;📺 &lt;a href="https://www.youtube.com/c/Aiven_io"&gt;YouTube&lt;/a&gt;: how-tos, conference talks, meetup recordings, livestreams&lt;/p&gt;

&lt;p&gt;💬 &lt;a href="https://aiven.io/community/forum/"&gt;Forum&lt;/a&gt;: frequently asked questions, release announcements&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s---9whVKtP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ietzul177uwh5nzxysfm.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s---9whVKtP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ietzul177uwh5nzxysfm.jpeg" alt="ClickHouse CTO Alexey Milovidov at the Amsterdam meetup" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  🚀 (General)
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;📣 &lt;a class="mentioned-user" href="https://dev.to/dewanahmed"&gt;@dewanahmed&lt;/a&gt; authored a blog: &lt;a href="https://aiven.io/developer/enforce-fine-grained-policy-control-across-your-data-infrastructure-with-opa-terraform"&gt;Enforce fine-grained policy control across your data infrastructure with Open Policy Agent and Terraform&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;📺 The &lt;a href="https://www.youtube.com/watch?v=CIRSQ8BITHg"&gt;video for the OSDI meetup in Paris&lt;/a&gt; hosted at Scaleway is now live. The topic discussed was database testing through simulation.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  🐜 Apache Kafka
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;📺 The &lt;a href="https://www.youtube.com/watch?v=IH__EnuivUQ"&gt;video for the Milan Open Source Data Infrastructure meetup on scaling machine learning and GitOps for Kafka&lt;/a&gt; is now live.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  🏠 ClickHouse®
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;📺 The &lt;a href="https://www.youtube.com/watch?v=cbdeJzJ2hl0"&gt;video recording of the Amsterdam Open Source Data Infrastructure meetup&lt;/a&gt; with ClickHouse CTO &lt;a href="https://www.linkedin.com/in/alexey-milovidov-7b658135/"&gt;Alexey Milovidov&lt;/a&gt; and our &lt;a class="mentioned-user" href="https://dev.to/olena_kutsenko"&gt;@olena_kutsenko&lt;/a&gt; is available for on-demand viewing.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  🐬 MySQL
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;📣 &lt;a href="https://aiven.io/blog/mysql-bug-story-or-why-aiven-works"&gt;Scale up: a MySQL bug story, or why Aiven works&lt;/a&gt; - by Celeste Horgan &amp;amp; Leon Bezuidenhout&lt;/li&gt;
&lt;li&gt;📺 &lt;a href="https://youtu.be/eAGXJio6Zj0"&gt;Getting started with SQL series&lt;/a&gt; - by &lt;a class="mentioned-user" href="https://dev.to/jennjunod"&gt;@jennjunod&lt;/a&gt; &lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  🐘 PostgreSQL®
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;📺 &lt;a href="https://youtu.be/6DzCWzeVFD0"&gt;Creating Databases, Schemas, and Tables on pgAdmin&lt;/a&gt; - by Jenn Junod &lt;/li&gt;
&lt;li&gt;📺 &lt;a href="https://youtu.be/bonsKCHHS3k"&gt;Connecting PostgreSQL to pgAdmin&lt;/a&gt; - by Jenn Junod&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  🏗 Terraform
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;💬 &lt;a href="https://aiven.io/community/forum/t/terraform-provider-v4-8-0-is-out/383/1"&gt;Terraform provider v4.8.0 is out&lt;/a&gt; - Google’s Private Service Connect (PSC) is now supported in Aiven’s Provider for Terraform.&lt;/li&gt;
&lt;/ul&gt;




&lt;h1&gt;
  
  
  📆 Meetups &amp;amp; Events
&lt;/h1&gt;

&lt;p&gt;Each month, we host &lt;a href="https://www.meetup.com/pro/open-source-data-infrastructure-community/"&gt;Open Source Data Infrastructure (OSDI) Meetups&lt;/a&gt; around the world for a developer audience. We also speak at conferences, make podcast appearances, create livestreams, and more!&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--6yGtalPg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/mvomv5n5sbdbri8s00ap.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--6yGtalPg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/mvomv5n5sbdbri8s00ap.jpeg" alt="Jenn at the NYC OSDI meetup" width="800" height="563"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  🌍 EMEA
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;🇪🇺 &lt;a href="https://ep2023.europython.eu/"&gt;EuroPython (Prague)&lt;/a&gt;: Tony Ibbs talked about "&lt;a href="https://ep2023.europython.eu/session/fish-and-chips-and-apache-kafka"&gt;Fish and chips and Apache Kafka®&lt;/a&gt;" to a full room. The slides and demo code can be found at &lt;a href="https://github.com/Aiven-Labs/fish-and-chips-and-kafka"&gt;Aiven Labs&lt;/a&gt;, and will be updated with a link to the video recording soon.&lt;/li&gt;
&lt;li&gt;🇩🇪 &lt;a href="https://www.meetup.com/berlin-open-source-data-infrastructure-meetup/"&gt;Berlin OSDI Meetup&lt;/a&gt;: &lt;a class="mentioned-user" href="https://dev.to/olena_kutsenko"&gt;@olena_kutsenko&lt;/a&gt; spoke about how to correctly distribute data across partitions in Apache Kafka and prevent unbalanced partitions. &lt;a href="https://www.linkedin.com/in/mjonss/?originalSubdomain=nl"&gt;Mattias Jonsson&lt;/a&gt;, a senior database engineer from &lt;a href="https://www.pingcap.com/"&gt;PingCAP&lt;/a&gt;, gave an introduction to the open source distributed SQL database &lt;a href="https://www.pingcap.com/tidb/"&gt;TiDB&lt;/a&gt;. The recording is up: &lt;a href="https://www.youtube.com/watch?v=8t-dvQb_uzc"&gt;https://www.youtube.com/watch?v=8t-dvQb_uzc&lt;/a&gt;. View an event recap at: &lt;a href="https://aiven.io/community/forum/t/summer-edition-of-open-source-data-infrastructure-meetup-in-berlin/370"&gt;https://aiven.io/community/forum/t/summer-edition-of-open-source-data-infrastructure-meetup-in-berlin/370&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--t2z3jN3o--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ke7s97lb72a08f4psbej.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--t2z3jN3o--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ke7s97lb72a08f4psbej.jpeg" alt="Matty Stratton at AWS Summit" width="800" height="601"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  🌎 AMER
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;🇺🇸 &lt;a href="https://2023.fossy.us/"&gt;FOSSY 2023&lt;/a&gt; (Portland, OR): Angie Byron spoke on &lt;a href="https://2023.fossy.us/schedule/presentation/30/"&gt;Lessons Learned From Scaling An Open Source Community By 10,000%&lt;/a&gt;. This was the inaugural year of the conference, organized by the &lt;a href="https://sfconservancy.org/"&gt;Software Freedom Conservancy&lt;/a&gt;.

&lt;ul&gt;
&lt;li&gt;Session Recording: (forthcoming; here are the &lt;a href="https://docs.google.com/presentation/d/1Fc4hdb8Y2mLqjsMa1QJDSpUPn5WSVspTordKsq8zQlA/edit#slide=id.g22ea887a526_0_1238"&gt;slides&lt;/a&gt;)&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;🇺🇸 &lt;a href="https://www.meetup.com/new-york-open-source-data-infrastructure-meetup/events/294326349"&gt;NYC OSDI Meetup&lt;/a&gt; (July 27): run by Jenn Junod, with Brooke Jamieson, Senior Developer Advocate at AWS, speaking about vector databases, following &lt;a href="https://aws.amazon.com/events/summits/new-york/"&gt;AWS Summit&lt;/a&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  🌏 APAC
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;🇮🇩 &lt;a href="https://www.meetup.com/jakarta-open-source-data-infrastructure-meetup/events/294758762/"&gt;Jakarta OSDI Meetup&lt;/a&gt;: Aiven's Budi Kusuma Utama ran the meetup on July 27. 37 people showed up and were asking when the next event would take place!&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Ba8onQtF--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kjju4303hakari1ujf96.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Ba8onQtF--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kjju4303hakari1ujf96.jpeg" alt="The July Jakarta Open Source Data Infrastructure meetup" width="800" height="600"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;h1&gt;
  
  
  📆 Meetups &amp;amp; Events (Upcoming)
&lt;/h1&gt;

&lt;p&gt;Here are the events you will find us at next: &lt;a href="https://aiven.io/events"&gt;https://aiven.io/events&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  🌍 EMEA
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;🇳🇴 &lt;a href="https://2023.javazone.no/"&gt;JavaZone&lt;/a&gt; (September 6-7, Oslo): Make sure you join Olena Kutsenko for her talk “ClickHouse: what is behind the fastest columnar database?”&lt;/li&gt;
&lt;li&gt;🇬🇧 &lt;a href="https://london-2023.devrelcon.dev/"&gt;DevRelCon London&lt;/a&gt; (September 7-8): &lt;a class="mentioned-user" href="https://dev.to/ugbot"&gt;@ugbot&lt;/a&gt; will talk about improving the visibility and buy-in of DevRel work, and &lt;a class="mentioned-user" href="https://dev.to/floord"&gt;@floord&lt;/a&gt; will lead the Unconference. &lt;/li&gt;
&lt;li&gt;🥨 &lt;a href="https://www.aws-community-day.de/"&gt;AWS Community Day DACH&lt;/a&gt; (September 13): Meet the AWS community, and join Olena Kutsenko for her talk “Using OpenSearch and Apache Kafka to explore Mastodon”.&lt;/li&gt;
&lt;li&gt;🇩🇪 &lt;a href="https://www.containerdays.io/"&gt;Containerdays Hamburg&lt;/a&gt; (September 11-12): Floor Drees will be one of the room hosts.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  🌎 AMER
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;🇺🇸 &lt;a href="https://devopsdays.org/events/2023-chicago/welcome/"&gt;Devopsdays Chicago&lt;/a&gt; (August 9-10): Dewan Ahmed will run a workshop titled “Enforce fine-grained policy control across development and production environments”. Join the &lt;a href="https://www.meetup.com/chicago-open-source-data-infrastructure-meetup/events/294067714/"&gt;Chicago Open Source Data Infrastructure meetup&lt;/a&gt; in August, at Discover, for an evening full of inspiring conversations and exciting talks.&lt;/li&gt;
&lt;li&gt;🇺🇸 &lt;a href="https://cloud.withgoogle.com/next/"&gt;Google Cloud Next&lt;/a&gt; (August 29-31): Ben Gamble and Ian Massingham, among a much larger Aiven delegation, will attend Google Cloud Next, which we joined as a Velocity sponsor.&lt;/li&gt;
&lt;li&gt;🇺🇸 &lt;a href="https://www.confluent.io/events/current/"&gt;Current 2023&lt;/a&gt; (September 26-27, San Jose, California): Chris Egerton, Greg Harris and &lt;a class="mentioned-user" href="https://dev.to/ftisiot"&gt;@ftisiot&lt;/a&gt; will all be speaking at &lt;em&gt;the&lt;/em&gt; Apache Kafka® and real-time streaming event of the year: Current.&lt;/li&gt;
&lt;li&gt;🇺🇸 &lt;a href="https://opensearchcon2023.splashthat.com/"&gt;OpenSearchCon&lt;/a&gt; (September 27-29, Seattle): Join Jonah Kowall at OpenSearchCon for his session “18 months of operating OpenSearch at scale”.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  🌏 APAC
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;🇸🇬 &lt;a href="https://www.meetup.com/singapore-open-source-data-infrastructure-meetup/events/294569944"&gt;OSDI Singapore&lt;/a&gt; (August 2): Join the Open Source Data Infrastructure meetup at Google Asia Pacific for talks by Bhanu Jamwal (PingCAP) and Kaijun Xu (Google Cloud).&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>opensource</category>
      <category>database</category>
      <category>mysql</category>
      <category>postgres</category>
    </item>
  </channel>
</rss>
