<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Elvis David</title>
    <description>The latest articles on DEV Community by Elvis David (@davidelvis).</description>
    <link>https://dev.to/davidelvis</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1031819%2Fed1b880e-cf2d-47ec-8e96-b133b8c02674.jpeg</url>
      <title>DEV Community: Elvis David</title>
      <link>https://dev.to/davidelvis</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/davidelvis"/>
    <language>en</language>
    <item>
      <title>How to query Postgres from Cloudflare Workers with Neon serverless driver</title>
      <dc:creator>Elvis David</dc:creator>
      <pubDate>Thu, 15 Feb 2024 16:24:25 +0000</pubDate>
      <link>https://dev.to/hackmamba/how-to-query-postgres-from-cloudflare-workers-with-neon-serverless-driver-514a</link>
      <guid>https://dev.to/hackmamba/how-to-query-postgres-from-cloudflare-workers-with-neon-serverless-driver-514a</guid>
      <description>&lt;p&gt;Cloudflare Workers is a serverless platform that allows developers to build and deploy applications on the edge. By leveraging &lt;a href="https://developers.cloudflare.com/workers/?utm_source=hackmamba&amp;amp;utm_medium=blog&amp;amp;utm_id=HMBcommunity" rel="noopener noreferrer"&gt;Cloudflare Workers&lt;/a&gt;, you can execute your code in close proximity to your users, reducing latency and improving performance. However, one challenge developers face when building serverless applications is accessing databases. This article will explore how to query a PostgreSQL database from Cloudflare Workers using the Neon serverless driver.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://neon.tech/?ref=hm" rel="noopener noreferrer"&gt;The Neon serverless driver&lt;/a&gt; is a PostgreSQL client library specifically designed for serverless environments, such as Cloudflare Workers. It provides a lightweight and efficient way to connect to a PostgreSQL database within your Cloudflare Workers code. The driver is built on top of the popular &lt;a href="https://node-postgres.com/?utm_source=hackmamba&amp;amp;utm_medium=blog&amp;amp;utm_id=HMBcommunity" rel="noopener noreferrer"&gt;node-postgres&lt;/a&gt; library, but with some optimizations and modifications to make it work better in serverless environments.&lt;/p&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;p&gt;Before getting started, you’ll need to make sure that you have the following:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;A &lt;a href="https://dash.cloudflare.com/sign-up/workers-and-pages?utm_source=hackmamba&amp;amp;utm_medium=blog&amp;amp;utm_id=HMBcommunity" rel="noopener noreferrer"&gt;Cloudflare account&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;A &lt;a href="https://neon.tech/?ref=hm" rel="noopener noreferrer"&gt;Neon Cloud&lt;/a&gt; account&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://docs.npmjs.com/getting-started" rel="noopener noreferrer"&gt;NPM&lt;/a&gt; installed with it’s package runner NPX.&lt;/li&gt;
&lt;li&gt;A PostgreSQL database.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Here is the link to the &lt;a href="https://github.com/Davidelvis/Query-Postgres-from-Cloudflare-workers-with-Neon-serverless-driver?utm_source=hackmamba&amp;amp;utm_medium=blog&amp;amp;utm_id=HMBcommunity" rel="noopener noreferrer"&gt;GitHub project repository&lt;/a&gt; to make it easy to follow along.&lt;/p&gt;

&lt;h2&gt;
  
  
  Create a Neon project
&lt;/h2&gt;

&lt;p&gt;To create your first Neon project, follow these steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;a href="https://console.neon.tech/login" rel="noopener noreferrer"&gt;Sign in&lt;/a&gt; to your Neon account.&lt;/li&gt;
&lt;li&gt;Once signed in, you will be redirected to the Neon console.&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;On the console, you will see a project creation dialog. This dialog lets you specify the following details for your project:&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Project name&lt;/strong&gt;: Choose a descriptive name for your project.&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Postgres version&lt;/strong&gt;: Select the desired version of PostgreSQL.&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Database name&lt;/strong&gt;: Provide a name for your database.&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Region&lt;/strong&gt;: Choose the region where your project is to be hosted.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fpaper-attachments.dropboxusercontent.com%2Fs_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1702337729090_neon-tech.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fpaper-attachments.dropboxusercontent.com%2Fs_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1702337729090_neon-tech.png" alt="Neon project creation"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After you create a project, you are redirected to the Neon console, where a pop-up modal shows a &lt;code&gt;connection string&lt;/code&gt;. Copy the connection string; you’ll use it in the next step.&lt;/p&gt;

&lt;p&gt;The connection string is used to connect to your project's database, which is automatically created when you create a new project.&lt;/p&gt;
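
&lt;p&gt;As a rough illustration of what the connection string contains, the sketch below splits one into its parts using the standard &lt;code&gt;URL&lt;/code&gt; class. The helper name &lt;code&gt;describeConnectionString&lt;/code&gt;, the host, and the credentials are placeholders invented for this example; they are not part of Neon's tooling.&lt;/p&gt;

```typescript
// Hypothetical helper: splits a Postgres connection string into its parts.
// The URL class is a standard global in Node.js and in Cloudflare Workers.
interface ConnectionParts {
  user: string;
  host: string;
  port: string; // empty string when the connection string omits the port
  database: string;
  sslmode: string | null;
}

function describeConnectionString(connectionString: string): ConnectionParts {
  const url = new URL(connectionString);
  return {
    user: decodeURIComponent(url.username),
    host: url.hostname,
    port: url.port,
    // The pathname looks like "/dbname", so drop the leading slash.
    database: url.pathname.slice(1),
    sslmode: url.searchParams.get("sslmode"),
  };
}

// Placeholder values only (assumed for illustration); never log a real password.
const parts = describeConnectionString(
  "postgresql://alice:secret@ep-example-123456.us-east-2.aws.neon.tech/neondb?sslmode=require"
);
console.log(parts.host, parts.database, parts.sslmode);
```

&lt;p&gt;Only the host, database name, and &lt;code&gt;sslmode&lt;/code&gt; are printed here, so the password never reaches the logs.&lt;/p&gt;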

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fpaper-attachments.dropboxusercontent.com%2Fs_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1702337739034_neon-tech1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fpaper-attachments.dropboxusercontent.com%2Fs_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1702337739034_neon-tech1.png" alt="Connection string"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Loading demo data into the database
&lt;/h2&gt;

&lt;p&gt;For demonstration purposes, you’ll populate the database with sample product data for querying.&lt;br&gt;
To load the data into your database, you must connect to it using the connection string. Open a terminal window and run the following command:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

    psql &lt;span class="s1"&gt;'postgresql://xxx:xxxx@ep-xxx-xxx-29168360.us-east-2.aws.neon.tech/xxx?sslmode=require'&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This command connects to the database using the provided connection string.&lt;/p&gt;

&lt;p&gt;Once you're connected to the database, you can load the demo data by following the steps below:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Create a folder named &lt;code&gt;sql&lt;/code&gt; in your project directory.&lt;/li&gt;
&lt;li&gt;Download the &lt;code&gt;products.sql&lt;/code&gt; from here.&lt;/li&gt;
&lt;li&gt;Move the &lt;code&gt;products.sql&lt;/code&gt; file into the "sql" folder.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Once you have set up the &lt;code&gt;sql&lt;/code&gt; folder and added the &lt;code&gt;products.sql&lt;/code&gt; file, run the following command from inside the &lt;code&gt;sql&lt;/code&gt; folder to load the demo data into your database.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

    psql &lt;span class="s1"&gt;'postgresql://xxx:xxxx@ep-xxx-xxx-29168360.us-east-2.aws.neon.tech/xxx?sslmode=require'&lt;/span&gt; &amp;lt; products.sql


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This command loads the &lt;code&gt;products.sql&lt;/code&gt; file into the database. The &lt;code&gt;products.sql&lt;/code&gt; file contains SQL statements that create a table and insert demo data into the table. &lt;/p&gt;

&lt;p&gt;After running the command, you should see some output in your terminal, indicating that the demo data has been successfully loaded into the database. The output should look something like this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fpaper-attachments.dropboxusercontent.com%2Fs_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1701177894777_Screenshot%2Bfrom%2B2023-11-28%2B16-24-35.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fpaper-attachments.dropboxusercontent.com%2Fs_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1701177894777_Screenshot%2Bfrom%2B2023-11-28%2B16-24-35.png" alt="Data loaded response"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The output lines &lt;code&gt;CREATE SEQUENCE&lt;/code&gt;, &lt;code&gt;CREATE TABLE&lt;/code&gt;, and &lt;code&gt;INSERT 0 5&lt;/code&gt; indicate that the sequence was created, the table was created, and 5 rows were inserted into the table, respectively.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fpaper-attachments.dropboxusercontent.com%2Fs_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1701178104819_Screenshot%2Bfrom%2B2023-11-28%2B16-27-56.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fpaper-attachments.dropboxusercontent.com%2Fs_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1701178104819_Screenshot%2Bfrom%2B2023-11-28%2B16-27-56.png" alt="Data loaded in Neon database"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating a worker application
&lt;/h2&gt;

&lt;p&gt;First, use the &lt;code&gt;create-cloudflare&lt;/code&gt; CLI to create a new Worker application. To do this, open a terminal window and run the following command:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

    &lt;span class="nx"&gt;npx&lt;/span&gt; &lt;span class="nx"&gt;wrangler&lt;/span&gt; &lt;span class="nx"&gt;init&lt;/span&gt; &lt;span class="nx"&gt;neon&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="nx"&gt;query&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="nx"&gt;db&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This will prompt you to install the &lt;code&gt;create-cloudflare&lt;/code&gt; package and lead you through a setup wizard.&lt;/p&gt;

&lt;p&gt;To continue with this guide, follow these steps:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;When prompted, provide a name for your new Worker application.&lt;/li&gt;
&lt;li&gt;Select &lt;code&gt;"Hello World" Worker&lt;/code&gt; as the type of application.&lt;/li&gt;
&lt;li&gt;Choose &lt;code&gt;"Yes"&lt;/code&gt; to use TypeScript.&lt;/li&gt;
&lt;li&gt;Select &lt;code&gt;"No"&lt;/code&gt; when asked if you want to deploy your application.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If you choose to deploy your application, you may be asked to authenticate if you're not logged in already. Your project will then be deployed. Even if you deploy, you can modify your Worker code and redeploy it at the end of this tutorial.&lt;/p&gt;

&lt;p&gt;The above command creates a new directory named neon-query-db containing the scaffolding for a new Cloudflare Worker project. Navigate into this directory by running &lt;code&gt;cd neon-query-db&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;After running &lt;code&gt;npx wrangler init&lt;/code&gt; in your project directory, the following files have been generated:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;wrangler.toml&lt;/code&gt;: This file is your Wrangler configuration file.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;src/index.ts&lt;/code&gt;: It contains a minimal Hello World Worker written in TypeScript.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;package.json&lt;/code&gt;: This file is a minimal Node dependencies configuration file. It will be generated if you select "Yes" when prompted during the &lt;code&gt;wrangler init&lt;/code&gt; command.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;tsconfig.json&lt;/code&gt;: This file is the TypeScript configuration that includes Worker types. It is only generated if specified in the wrangler init command.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For this tutorial, you only need to focus on the &lt;code&gt;wrangler.toml&lt;/code&gt; and &lt;code&gt;src/index.ts&lt;/code&gt; files. You don't need to edit the other files; they should be left as they are.&lt;/p&gt;

&lt;h2&gt;
  
  
  Installing the Neon package
&lt;/h2&gt;

&lt;p&gt;To install the Neon package, run the following command in your terminal.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

    &lt;span class="nx"&gt;npm&lt;/span&gt; &lt;span class="nx"&gt;install&lt;/span&gt; &lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;neondatabase&lt;/span&gt;&lt;span class="sr"&gt;/serverles&lt;/span&gt;&lt;span class="err"&gt;s
&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  Configuring connection to Postgres database
&lt;/h2&gt;

&lt;p&gt;There are two methods to connect to your PostgreSQL database: using a connection string or setting explicit parameters.&lt;/p&gt;
&lt;h3&gt;
  
  
  Use a connection string
&lt;/h3&gt;

&lt;p&gt;Store your connection string as a secret named &lt;code&gt;DATABASE_URL&lt;/code&gt; by running the following command:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

    &lt;span class="nx"&gt;npx&lt;/span&gt; &lt;span class="nx"&gt;wrangler&lt;/span&gt; &lt;span class="nx"&gt;secret&lt;/span&gt; &lt;span class="nx"&gt;put&lt;/span&gt; &lt;span class="nx"&gt;DATABASE_URL&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Paste in your connection string when prompted (you’ll find this in your Neon dashboard).&lt;/p&gt;

&lt;h3&gt;
  
  
  Set explicit parameters
&lt;/h3&gt;

&lt;p&gt;Configure each database parameter as an environment variable via the Cloudflare dashboard or in your &lt;code&gt;wrangler.toml&lt;/code&gt; file. &lt;/p&gt;

&lt;p&gt;The example below shows how to configure the parameters in the &lt;code&gt;wrangler.toml&lt;/code&gt; file.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

    &lt;span class="c1"&gt;//wrangler.toml file&lt;/span&gt;

    &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;vars&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="nx"&gt;DB_USERNAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;postgres&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;
    &lt;span class="err"&gt;#&lt;/span&gt; &lt;span class="nb"&gt;Set&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;password&lt;/span&gt; &lt;span class="nx"&gt;by&lt;/span&gt; &lt;span class="nx"&gt;creating&lt;/span&gt; &lt;span class="nx"&gt;a&lt;/span&gt; &lt;span class="nx"&gt;secret&lt;/span&gt; &lt;span class="nx"&gt;so&lt;/span&gt; &lt;span class="nx"&gt;it&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="nx"&gt;not&lt;/span&gt; &lt;span class="nx"&gt;stored&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nx"&gt;plain&lt;/span&gt; &lt;span class="nx"&gt;text&lt;/span&gt;
    &lt;span class="nx"&gt;DB_HOST&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ep-aged-sound-175961.us-east-2.aws.neon.tech&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;
    &lt;span class="nx"&gt;DB_PORT&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;5432&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;
    &lt;span class="nx"&gt;DB_NAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;products&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;To set a secret for your Worker, use &lt;code&gt;npx wrangler secret put DB_PASSWORD&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

    &lt;span class="nx"&gt;npx&lt;/span&gt; &lt;span class="nx"&gt;wrangler&lt;/span&gt; &lt;span class="nx"&gt;secret&lt;/span&gt; &lt;span class="nx"&gt;put&lt;/span&gt; &lt;span class="nx"&gt;DB_PASSWORD&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;After you run the command, you’ll be prompted to enter the value of &lt;code&gt;DB_PASSWORD&lt;/code&gt; in the terminal. Wrangler then stores the database password as a secret, which your Worker reads in the same way as its other environment variables.&lt;/p&gt;

&lt;h2&gt;
  
  
  Connecting to the Postgres database in the worker
&lt;/h2&gt;

&lt;p&gt;Next, import the &lt;code&gt;Client&lt;/code&gt; class from the &lt;code&gt;@neondatabase/serverless&lt;/code&gt; package into your Worker's main file. Depending on your chosen connection method, use either the connection string or explicit parameters to establish a connection to the PostgreSQL database.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

    &lt;span class="c1"&gt;// src/index.ts file&lt;/span&gt;

    &lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;Client&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;@neondatabase/serverless&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;Env&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;DATABASE_URL&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;In the &lt;code&gt;fetch&lt;/code&gt; event handler, connect to the PostgreSQL database using your chosen method, either the connection string or the explicit parameters.&lt;/p&gt;

&lt;h3&gt;
  
  
  Using a connection string
&lt;/h3&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;DATABASE_URL&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h3&gt;
  
  
  Setting explicit parameters
&lt;/h3&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
      &lt;span class="na"&gt;user&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;DB_USERNAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="na"&gt;password&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;DB_PASSWORD&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="na"&gt;host&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;DB_HOST&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="na"&gt;port&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;DB_PORT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="na"&gt;database&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;DB_NAME&lt;/span&gt;
    &lt;span class="p"&gt;});&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
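
&lt;p&gt;One detail worth checking with explicit parameters: values written in quotes under &lt;code&gt;[vars]&lt;/code&gt; in &lt;code&gt;wrangler.toml&lt;/code&gt; reach the Worker as strings, while node-postgres style config types usually declare &lt;code&gt;port&lt;/code&gt; as a number. The sketch below shows one possible way to convert and validate the values before passing them to &lt;code&gt;Client&lt;/code&gt;. The &lt;code&gt;DbEnv&lt;/code&gt; and &lt;code&gt;DbConfig&lt;/code&gt; interfaces and the &lt;code&gt;toClientConfig&lt;/code&gt; helper are assumptions made for this example, not part of the Neon driver.&lt;/p&gt;

```typescript
// Hypothetical shape of the [vars] from wrangler.toml plus the DB_PASSWORD secret.
interface DbEnv {
  DB_USERNAME: string;
  DB_PASSWORD: string;
  DB_HOST: string;
  DB_PORT: string; // quoted TOML values arrive as strings
  DB_NAME: string;
}

// Hypothetical config shape mirroring the fields passed to Client above.
interface DbConfig {
  user: string;
  password: string;
  host: string;
  port: number;
  database: string;
}

// Converts the string-based environment values into a typed config object.
function toClientConfig(env: DbEnv): DbConfig {
  const port = Number.parseInt(env.DB_PORT, 10);
  if (Number.isNaN(port)) {
    throw new Error(`DB_PORT is not a number: ${env.DB_PORT}`);
  }
  return {
    user: env.DB_USERNAME,
    password: env.DB_PASSWORD,
    host: env.DB_HOST,
    port,
    database: env.DB_NAME,
  };
}

// Example values copied from the wrangler.toml snippet; the password is a placeholder.
const config = toClientConfig({
  DB_USERNAME: "postgres",
  DB_PASSWORD: "example-password",
  DB_HOST: "ep-aged-sound-175961.us-east-2.aws.neon.tech",
  DB_PORT: "5432",
  DB_NAME: "products",
});
console.log(config.port);
```

&lt;p&gt;Under these assumptions, the result of &lt;code&gt;toClientConfig(env)&lt;/code&gt; could be passed to &lt;code&gt;new Client(...)&lt;/code&gt; in place of the inline object, and the &lt;code&gt;Env&lt;/code&gt; interface would need the same five fields.&lt;/p&gt;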
&lt;h2&gt;
  
  
  Querying the database
&lt;/h2&gt;

&lt;p&gt;To demonstrate how to interact with the products database, let’s query the &lt;code&gt;products&lt;/code&gt; table whenever a request is received.&lt;/p&gt;

&lt;p&gt;To fetch data from the products table, update the fetch event handler in your &lt;code&gt;src/index.ts&lt;/code&gt; file so that it looks like the following:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

    &lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;fetch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="na"&gt;request&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;Request&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;env&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;Env&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;ExecutionContext&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;DATABASE_URL&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;rows&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`
        select * from products,
        );
        ctx.waitUntil(client.end());  // this doesn’t hold up the response
        return new Response(JSON.stringify(rows), {
          headers: { "content-type": "application/json"},
        });
      }
    }
```&lt;/span&gt;
&lt;span class="nx"&gt;This&lt;/span&gt; &lt;span class="nx"&gt;code&lt;/span&gt; &lt;span class="nx"&gt;snippet&lt;/span&gt; &lt;span class="nx"&gt;does&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="na"&gt;following&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;


&lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="nx"&gt;Checks&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;request&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="nx"&gt;a&lt;/span&gt; &lt;span class="nx"&gt;GET&lt;/span&gt; &lt;span class="nx"&gt;request&lt;/span&gt; &lt;span class="nx"&gt;and&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;URL&lt;/span&gt; &lt;span class="nx"&gt;path&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="s2"&gt;`/products`&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;
&lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="nx"&gt;Constructs&lt;/span&gt; &lt;span class="nx"&gt;a&lt;/span&gt; &lt;span class="nx"&gt;SELECT&lt;/span&gt; &lt;span class="nx"&gt;SQL&lt;/span&gt; &lt;span class="nx"&gt;query&lt;/span&gt; &lt;span class="nx"&gt;that&lt;/span&gt; &lt;span class="nx"&gt;fetches&lt;/span&gt; &lt;span class="nx"&gt;all&lt;/span&gt; &lt;span class="nx"&gt;rows&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;products&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt; &lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;
&lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="nx"&gt;Executes&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;query&lt;/span&gt; &lt;span class="nx"&gt;and&lt;/span&gt; &lt;span class="nx"&gt;fetches&lt;/span&gt; &lt;span class="nx"&gt;all&lt;/span&gt; &lt;span class="nx"&gt;rows&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;products&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt; &lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;
&lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="nx"&gt;Returns&lt;/span&gt; &lt;span class="nx"&gt;all&lt;/span&gt; &lt;span class="nx"&gt;rows&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nx"&gt;a&lt;/span&gt; &lt;span class="nx"&gt;JSON&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;

&lt;span class="nx"&gt;When&lt;/span&gt; &lt;span class="nx"&gt;you&lt;/span&gt; &lt;span class="nx"&gt;send&lt;/span&gt; &lt;span class="nx"&gt;a&lt;/span&gt; &lt;span class="nx"&gt;GET&lt;/span&gt; &lt;span class="nx"&gt;request&lt;/span&gt; &lt;span class="nx"&gt;to&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;Worker&lt;/span&gt;&lt;span class="err"&gt;’&lt;/span&gt;&lt;span class="nx"&gt;s&lt;/span&gt; &lt;span class="nx"&gt;URL&lt;/span&gt; &lt;span class="kd"&gt;with&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="nx"&gt;products&lt;/span&gt; &lt;span class="nx"&gt;path&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;Worker&lt;/span&gt; &lt;span class="nx"&gt;will&lt;/span&gt; &lt;span class="nx"&gt;fetch&lt;/span&gt; &lt;span class="nx"&gt;all&lt;/span&gt; &lt;span class="nx"&gt;rows&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;products&lt;/span&gt; &lt;span class="nx"&gt;table&lt;/span&gt; &lt;span class="nx"&gt;and&lt;/span&gt; &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;them&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nx"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;

&lt;span class="err"&gt;##&lt;/span&gt; &lt;span class="nx"&gt;Deploying&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;Worker&lt;/span&gt;

&lt;span class="nx"&gt;Run&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;following&lt;/span&gt; &lt;span class="nx"&gt;command&lt;/span&gt; &lt;span class="nx"&gt;to&lt;/span&gt; &lt;span class="nx"&gt;deploy&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="na"&gt;Worker&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

&lt;span class="s2"&gt;```typescript
    npx wrangler deploy
```&lt;/span&gt;

&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;Screenshot&lt;/span&gt;&lt;span class="p"&gt;](&lt;/span&gt;&lt;span class="na"&gt;https&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="c1"&gt;//paper-attachments.dropboxusercontent.com/s_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1702038762591_Screenshot+from+2023-12-08+15-31-41.png)&lt;/span&gt;


&lt;span class="nx"&gt;Your&lt;/span&gt; &lt;span class="nx"&gt;application&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="nx"&gt;now&lt;/span&gt; &lt;span class="nx"&gt;live&lt;/span&gt; &lt;span class="nx"&gt;and&lt;/span&gt; &lt;span class="nx"&gt;accessible&lt;/span&gt; &lt;span class="nx"&gt;at&lt;/span&gt; &lt;span class="s2"&gt;`&amp;lt;YOUR_WORKER&amp;gt;.&amp;lt;YOUR_SUBDOMAIN&amp;gt;.workers.dev`&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;

&lt;span class="nx"&gt;After&lt;/span&gt; &lt;span class="nx"&gt;deploying&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;you&lt;/span&gt; &lt;span class="nx"&gt;can&lt;/span&gt; &lt;span class="nx"&gt;interact&lt;/span&gt; &lt;span class="kd"&gt;with&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;PostgreSQL&lt;/span&gt; &lt;span class="nx"&gt;products&lt;/span&gt; &lt;span class="nx"&gt;database&lt;/span&gt; &lt;span class="nx"&gt;using&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;Cloudflare&lt;/span&gt; &lt;span class="nx"&gt;Worker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt; &lt;span class="nx"&gt;Whenever&lt;/span&gt; &lt;span class="nx"&gt;a&lt;/span&gt; &lt;span class="nx"&gt;request&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="nx"&gt;made&lt;/span&gt; &lt;span class="nx"&gt;to&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;Worker&lt;/span&gt;&lt;span class="err"&gt;’&lt;/span&gt;&lt;span class="nx"&gt;s&lt;/span&gt; &lt;span class="nx"&gt;URL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;it&lt;/span&gt; &lt;span class="nx"&gt;will&lt;/span&gt; &lt;span class="nx"&gt;fetch&lt;/span&gt; &lt;span class="nx"&gt;data&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="s2"&gt;`products`&lt;/span&gt; &lt;span class="nx"&gt;table&lt;/span&gt; &lt;span class="nx"&gt;and&lt;/span&gt; &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;it&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nx"&gt;a&lt;/span&gt; &lt;span class="nx"&gt;JSON&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt; &lt;span class="nx"&gt;You&lt;/span&gt; &lt;span class="nx"&gt;can&lt;/span&gt; &lt;span class="nx"&gt;modify&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span 
class="nx"&gt;query&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nx"&gt;needed&lt;/span&gt; &lt;span class="nx"&gt;to&lt;/span&gt; &lt;span class="nx"&gt;retrieve&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;desired&lt;/span&gt; &lt;span class="nx"&gt;data&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;product&lt;/span&gt; &lt;span class="nx"&gt;database&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;

&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;Screenshot&lt;/span&gt;&lt;span class="p"&gt;](&lt;/span&gt;&lt;span class="na"&gt;https&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="c1"&gt;//paper-attachments.dropboxusercontent.com/s_E5DFD9AAD2B93DCFCEDE3C785055C7D8015AC8E87BD28019E97637CA43906324_1702067189656_Screenshot+from+2023-12-08+23-26-05.png)&lt;/span&gt;


&lt;span class="nx"&gt;The&lt;/span&gt; &lt;span class="nx"&gt;product&lt;/span&gt; &lt;span class="nx"&gt;data&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="nx"&gt;successfully&lt;/span&gt; &lt;span class="nx"&gt;returned&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;database&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt; &lt;span class="nx"&gt;You&lt;/span&gt; &lt;span class="nx"&gt;can&lt;/span&gt; &lt;span class="nx"&gt;confirm&lt;/span&gt; &lt;span class="nx"&gt;that&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;Cloudflare&lt;/span&gt; &lt;span class="nx"&gt;Worker&lt;/span&gt; &lt;span class="nx"&gt;successfully&lt;/span&gt; &lt;span class="nx"&gt;connected&lt;/span&gt; &lt;span class="nx"&gt;to&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;PostgreSQL&lt;/span&gt; &lt;span class="nx"&gt;database&lt;/span&gt; &lt;span class="nx"&gt;using&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;Neon&lt;/span&gt; &lt;span class="nx"&gt;serverless&lt;/span&gt; &lt;span class="nx"&gt;driver&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;

&lt;span class="err"&gt;##&lt;/span&gt; &lt;span class="nx"&gt;Conclusion&lt;/span&gt;

&lt;span class="nx"&gt;In&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt; &lt;span class="nx"&gt;article&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;you&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;ve explored how to query a PostgreSQL database from Cloudflare Workers using the Neon serverless driver. By leveraging this driver, you can efficiently connect to your PostgreSQL databases and retrieve data from within your serverless applications. This enables you to build powerful and performant applications that utilize Cloudflare&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="nx"&gt;s&lt;/span&gt; &lt;span class="nx"&gt;edge&lt;/span&gt; &lt;span class="nx"&gt;network&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;


&lt;span class="err"&gt;##&lt;/span&gt; &lt;span class="nx"&gt;Resources&lt;/span&gt;
&lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;Build&lt;/span&gt; &lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;first&lt;/span&gt; &lt;span class="nx"&gt;Worker&lt;/span&gt;&lt;span class="p"&gt;](&lt;/span&gt;&lt;span class="na"&gt;https&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="c1"&gt;//developers.cloudflare.com/workers/get-started/guide/?utm_source=hackmamba&amp;amp;utm_medium=blog&amp;amp;utm_id=HMBcommunity)&lt;/span&gt;
&lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;Neon&lt;/span&gt; &lt;span class="nx"&gt;serverless&lt;/span&gt; &lt;span class="nx"&gt;driver&lt;/span&gt;&lt;span class="p"&gt;](&lt;/span&gt;&lt;span class="na"&gt;https&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="c1"&gt;//neon.tech/docs/serverless/serverless-driver?ref=hm)&lt;/span&gt;





&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

</description>
    </item>
    <item>
      <title>Scala for Data Engineering: Harnessing the Power of Functional Programming</title>
      <dc:creator>Elvis David</dc:creator>
      <pubDate>Thu, 27 Jul 2023 21:40:10 +0000</pubDate>
      <link>https://dev.to/davidelvis/scala-for-data-engineering-harnessing-the-power-of-functional-programming-4h24</link>
      <guid>https://dev.to/davidelvis/scala-for-data-engineering-harnessing-the-power-of-functional-programming-4h24</guid>
      <description>&lt;p&gt;You want to write a program that handles data. Which language should you choose?&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;In the dynamic world of data engineering, where processing and managing vast amounts of data have become paramount, programming languages that offer flexibility, efficiency, and scalability are highly sought after. Enter &lt;strong&gt;Scala&lt;/strong&gt; - a powerful and versatile language that has been gaining traction in the data science community for its exceptional capabilities.&lt;/p&gt;

&lt;p&gt;When you write a program that handles data, you have several languages to choose from. You might pick a dynamic language such as Python or R, or a more traditional object-oriented language such as Java.&lt;/p&gt;

&lt;p&gt;In this post, we will explore how Scala differs from these languages and when it might make sense to use it.&lt;/p&gt;
&lt;h2&gt;
  
  
  Why Scala?
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Scala&lt;/strong&gt;, a statically typed programming language, has been steadily gaining popularity in the data engineering community because its combination of features makes it well suited to data-intensive tasks.&lt;/p&gt;

&lt;p&gt;The next sections examine how Scala compares to other programming languages used in data science.&lt;/p&gt;
&lt;h3&gt;
  
  
  Static typing and type inference
&lt;/h3&gt;

&lt;p&gt;Scala's static type system is remarkably versatile: a significant amount of information about the program's behavior can be encoded in types and checked by the compiler. This guarantees a certain level of correctness, which is especially valuable for rarely used code paths. In contrast, a dynamic language only reveals a type error when the offending branch actually runs, so bugs in rarely executed paths can persist unnoticed.&lt;/p&gt;

&lt;p&gt;One common criticism of statically typed object-oriented languages, like Java, is their verbosity. For instance, when initializing an instance of the Example class in Java, the class name appears twice: once to declare the compile-time type of the variable and once to construct the instance.&lt;/p&gt;

&lt;p&gt;Scala leverages type inference, which lets the compiler deduce the type of a variable from the value assigned to it. As a result, Scala code is more concise and readable without compromising type safety. Once you specify the argument and return types of a function, the compiler can infer the types of the variables in the function's body. This approach streamlines development considerably, making Scala a compelling choice for data engineers and programmers.&lt;/p&gt;
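
&lt;p&gt;As a minimal sketch of type inference in practice, the snippet below declares values without type annotations. The body of the &lt;code&gt;Example&lt;/code&gt; class, the &lt;code&gt;TypeInferenceDemo&lt;/code&gt; object, and the &lt;code&gt;total&lt;/code&gt; function are hypothetical names chosen purely for illustration.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;object TypeInferenceDemo {
  // Hypothetical class, used only for illustration.
  class Example(val label: String)

  // Java equivalent: Example example = new Example("demo");
  val example = new Example("demo")       // inferred as Example
  val amounts = List(120.5, 80.0, 42.25)  // inferred as List[Double]

  // Only the parameter and return types are written out;
  // the type of `doubled` is inferred as List[Double].
  def total(values: List[Double]): Double = {
    val doubled = values.map(_ * 2)
    doubled.sum
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
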
&lt;h3&gt;
  
  
  Scala encourages immutability
&lt;/h3&gt;

&lt;p&gt;Scala promotes the adoption of immutable objects, making it effortless to define attributes as immutable. &lt;br&gt;
For instance:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;amountSpent&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;500&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Additionally, the default collections in Scala are immutable, as demonstrated with the List:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;clientIds&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"123"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"456"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;// List is immutable&lt;/span&gt;
&lt;span class="nf"&gt;clientIds&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"589"&lt;/span&gt; &lt;span class="c1"&gt;// Compile-time error&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Embracing immutability eradicates a common source of bugs. By ensuring that certain objects cannot be changed once created, the number of potential bug locations is reduced. Instead of considering the object's lifetime, the focus narrows down to the constructor, leading to more robust and predictable code.&lt;/p&gt;
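
&lt;p&gt;In practice, "changing" an immutable collection means deriving a new one. The sketch below uses the standard &lt;code&gt;updated&lt;/code&gt; and &lt;code&gt;::&lt;/code&gt; operations on &lt;code&gt;List&lt;/code&gt;; the &lt;code&gt;ImmutabilityDemo&lt;/code&gt; object and its value names are illustrative assumptions.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;object ImmutabilityDemo {
  val clientIds = List("123", "456")

  // `updated` returns a new list; `clientIds` itself is left untouched.
  val revisedIds = clientIds.updated(1, "589")

  // Prepending also builds a new list that reuses the old one as its tail.
  val extendedIds = "000" :: clientIds
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Because the original list never changes, any code that still holds a reference to &lt;code&gt;clientIds&lt;/code&gt; keeps seeing the same two elements.&lt;/p&gt;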

&lt;h3&gt;
  
  
  Scala and functional programs
&lt;/h3&gt;

&lt;p&gt;Scala strongly encourages functional programming, which involves using higher-order functions to transform collections. As a programmer, you don't have to worry about the intricate details of iterating over the collection. &lt;br&gt;
Let's take a look at an example of an "occurrencesOf" function in Scala:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;occurrencesOf&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="n"&gt;elem&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;List&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;List&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;Int&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;currentElem&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nv"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;zipWithIndex&lt;/span&gt;
    &lt;span class="nf"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;currentElem&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;elem&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this Scala code, "collection.zipWithIndex" produces a new list that pairs each element of the original collection with its index. The for-comprehension iterates over this list, binding "currentElem" to the current element and "index" to its position. The "if" guard acts as a filter, so only the indexes where "currentElem" is equal to "elem" are yielded.&lt;/p&gt;

&lt;p&gt;The equivalent Java code for the same functionality looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;T&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Integer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;occurrencesOf&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;T&lt;/span&gt; &lt;span class="n"&gt;elem&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;T&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Integer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;occurrences&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ArrayList&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
  &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;size&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;equals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;elem&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="n"&gt;occurrences&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;add&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;occurrences&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In Java, we start by defining a mutable list to store occurrences as we find them. We then iterate over the collection using a counter and check each element to see if it matches "elem." If it does, we add its index to the list of occurrences. This Java code requires managing more moving parts, and the logic of the function is somewhat obscured by the iteration mechanism.&lt;/p&gt;

&lt;p&gt;It's important to note that this comparison is not meant to criticize Java; in fact, Java 8 introduced functional constructs like lambda expressions and stream processing. However, it highlights the benefits of functional approaches in Scala, which minimize the potential for errors and improve code clarity, making it easier to work with collections and focus on the core logic of the function.&lt;/p&gt;
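
&lt;p&gt;The for-comprehension shown earlier can also be written directly with higher-order functions. The following sketch is one possible variant, not the only idiomatic form: it uses the standard &lt;code&gt;collect&lt;/code&gt; method with a guarded pattern, and the enclosing &lt;code&gt;OccurrencesDemo&lt;/code&gt; object is a name assumed for illustration.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;object OccurrencesDemo {
  // Pair every element with its index, then keep the index
  // of each pair whose element equals `elem`.
  def occurrencesOf[A](elem: A, collection: List[A]): List[Int] =
    collection.zipWithIndex.collect {
      case (currentElem, index) if currentElem == elem =&amp;gt; index
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;For example, &lt;code&gt;OccurrencesDemo.occurrencesOf("b", List("a", "b", "c", "b"))&lt;/code&gt; returns &lt;code&gt;List(1, 3)&lt;/code&gt;.&lt;/p&gt;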

&lt;h3&gt;
  
  
  Null pointer uncertainty
&lt;/h3&gt;

&lt;p&gt;In many scenarios, representing the possible absence of a value becomes necessary. For example, consider a case where we read a list of usernames from a CSV file, and some users have opted not to provide their email addresses. In Java, this absence of email information is often denoted by setting the reference to null, while in Python, None is used.&lt;/p&gt;

&lt;p&gt;However, this approach can be risky as it does not explicitly encode the possibility of a value's absence. Determining whether an instance attribute can be null becomes cumbersome in larger programs, leading to potential issues if not handled carefully. Scala, inspired by functional languages, addresses this concern by introducing the Option[T] type to represent attributes that might be absent.&lt;/p&gt;

&lt;p&gt;In Scala, we can achieve this by writing:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;User&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="o"&gt;...&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;email&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Option&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;Email&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt;
  &lt;span class="o"&gt;...&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;By utilizing Option[Email], we clearly convey to programmers using the User class that email information may be absent. The compiler also becomes aware of this possibility, prompting us to handle the situation explicitly rather than risking null pointer exceptions at runtime.&lt;/p&gt;
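
&lt;p&gt;A minimal sketch of what handling the absent case can look like is shown below. It assumes a simplified, hypothetical &lt;code&gt;User&lt;/code&gt; case class whose email is a plain &lt;code&gt;String&lt;/code&gt; rather than the &lt;code&gt;Email&lt;/code&gt; type above; the helper names &lt;code&gt;contactLine&lt;/code&gt; and &lt;code&gt;emailDomain&lt;/code&gt; are also assumptions.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;object OptionDemo {
  // Hypothetical simplified User; the email is modelled as a String here.
  final case class User(name: String, email: Option[String])

  // Pattern matching covers both cases explicitly; the compiler
  // warns if either Some or None is left out.
  def contactLine(user: User): String = user.email match {
    case Some(address) =&amp;gt; s"${user.name} ($address)"
    case None          =&amp;gt; s"${user.name} (no email on file)"
  }

  // map transforms the value only if it is present;
  // getOrElse supplies a fallback when it is absent.
  def emailDomain(user: User): String =
    user.email.map(_.split("@").last).getOrElse("unknown")
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
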

&lt;p&gt;By eliminating the use of null, we can achieve a higher level of provable correctness and mitigate null-related issues. In languages without Option[T], developers often resort to writing unit tests on the client code to ensure correct behavior when dealing with null attributes.&lt;/p&gt;

&lt;p&gt;It's worth noting that similar functionality can be achieved in Java using external libraries like Google's Guava library or the Optional class in Java 8. However, the convention of using null to indicate the absence of a value has long been ingrained in Java. In contrast, Scala embraces Option[T], offering a more natural and safer way to handle optional values.&lt;/p&gt;

&lt;h3&gt;
  
  
  Easier parallelism
&lt;/h3&gt;

&lt;p&gt;Developing programs that leverage parallel architectures presents significant challenges, but it is an essential aspect of tackling most data science problems. Parallel programming poses difficulties because our natural inclination as programmers is to think sequentially. Reasoning about the potential order of events in concurrent programs becomes complex.&lt;/p&gt;

&lt;p&gt;Scala addresses these challenges by providing several abstractions that facilitate the creation of parallel code. These abstractions impose constraints on the approach to parallelism. For example, parallel collections require computations to be expressed as a sequence of operations, such as map, reduce, and filter, on collections. Actor systems encourage thinking in terms of encapsulated actors that communicate through message passing.&lt;/p&gt;

&lt;p&gt;The restriction of the programmer's freedom to write parallel code in any way they desire may seem paradoxical. However, it actually simplifies understanding the program's behavior. For instance, if an actor misbehaves, the problem is either in the actor's code or one of the messages it receives.&lt;/p&gt;

&lt;p&gt;To illustrate the power of coherent, restrictive abstractions, let's solve a probability problem using parallel collections in Scala. We aim to calculate the probability of getting at least 60 heads out of 100 coin tosses using a Monte Carlo simulation. By running the simulation repeatedly and aggregating the results, we can achieve this estimation with parallel collections, parallelizing the computation across multiple CPUs effortlessly.&lt;/p&gt;
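
&lt;p&gt;The simulation described above could be sketched as follows. This is an illustrative version under stated assumptions rather than a definitive implementation: the object and method names are hypothetical, and on Scala 2.13 or later the &lt;code&gt;.par&lt;/code&gt; method requires the separate &lt;code&gt;scala-parallel-collections&lt;/code&gt; module plus the import shown (on Scala 2.12, &lt;code&gt;.par&lt;/code&gt; is built in and the import should be dropped).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;import java.util.concurrent.ThreadLocalRandom
// Assumption: Scala 2.13+ with the scala-parallel-collections module on the classpath.
import scala.collection.parallel.CollectionConverters._

object CoinTossMonteCarlo {
  // One trial: toss a fair coin `tosses` times and count the heads.
  // ThreadLocalRandom gives each worker thread its own generator.
  def headsIn(tosses: Int): Int = {
    val rng = ThreadLocalRandom.current()
    (1 to tosses).count(_ =&amp;gt; rng.nextBoolean())
  }

  // Run the trials on a parallel range and return the fraction
  // of trials that reached at least `threshold` heads.
  def estimate(trials: Int, tosses: Int = 100, threshold: Int = 60): Double = {
    val successes = (1 to trials).par.count(_ =&amp;gt; headsIn(tosses) &amp;gt;= threshold)
    successes.toDouble / trials
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Calling &lt;code&gt;CoinTossMonteCarlo.estimate(100000)&lt;/code&gt; should give an estimate close to the exact binomial probability of about 0.028, with small run-to-run variation. The only change needed to parallelize the computation is the call to &lt;code&gt;.par&lt;/code&gt;; the trials are independent, so no locking is required.&lt;/p&gt;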

&lt;p&gt;While not all problems are as straightforward to parallelize as the Monte Carlo example, Scala's rich set of intuitive abstractions makes writing parallel applications more manageable, providing an effective way to leverage parallel architectures and improve the performance of data science tasks.&lt;/p&gt;

&lt;h3&gt;
  
  
  Interoperability with Java
&lt;/h3&gt;

&lt;p&gt;Scala is built to run on the Java Virtual Machine (JVM), and its compiler translates Scala programs into Java bytecode. This compatibility allows Scala developers to seamlessly utilize Java libraries within their Scala code. Considering the vast number of Java applications, both open-source and in legacy systems, this interoperability between Scala and Java has significantly contributed to Scala's widespread adoption.&lt;/p&gt;
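
&lt;p&gt;As a small illustration of calling Java from Scala, the sketch below uses &lt;code&gt;java.time&lt;/code&gt; and &lt;code&gt;java.util.ArrayList&lt;/code&gt; from the JDK. The &lt;code&gt;JavaInteropDemo&lt;/code&gt; object and its method names are hypothetical, and the converter import assumes Scala 2.13 or later.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;import java.time.LocalDate
import java.time.temporal.ChronoUnit
import java.util.{ArrayList =&amp;gt; JArrayList}
// Scala 2.13+; on 2.12 use scala.collection.JavaConverters._ instead.
import scala.jdk.CollectionConverters._

object JavaInteropDemo {
  // Call the java.time API directly from Scala.
  def daysBetween(start: String, end: String): Long =
    ChronoUnit.DAYS.between(LocalDate.parse(start), LocalDate.parse(end))

  // Build a java.util.ArrayList, then convert it to an immutable Scala List.
  def topics(): List[String] = {
    val javaList = new JArrayList[String]()
    javaList.add("orders")
    javaList.add("clicks")
    javaList.asScala.toList
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
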

&lt;p&gt;Moreover, the interoperability between Scala and Java is not limited to one direction. Some Scala libraries, like the Play framework, have gained popularity among Java developers as well, indicating the bidirectional nature of this compatibility. This mutual interaction between the two languages fosters a thriving ecosystem and encourages developers from both communities to explore and leverage each other's tools and frameworks.&lt;/p&gt;

&lt;h3&gt;
  
  
  When not to use Scala
&lt;/h3&gt;

&lt;p&gt;When considering whether to use Scala for your next project, there are certain factors to take into account. While Scala's strong type system, preference for immutability, functional capabilities, and parallelism abstractions make it an excellent choice for writing reliable programs and minimizing unexpected behavior, there are some reasons why you might decide against it.&lt;/p&gt;

&lt;p&gt;One crucial consideration is familiarity. Scala introduces various concepts, such as implicits, type classes, and composition using traits, which may not be familiar to programmers coming from an object-oriented background. Mastering Scala's expressive type system and harnessing its full power may require time and adapting to a new programming paradigm. Additionally, dealing with immutable data structures can feel unfamiliar to those coming from languages like Java or Python.&lt;/p&gt;

&lt;p&gt;However, with time and effort, these drawbacks can be overcome. Nevertheless, Scala does fall short in terms of library availability compared to other data science languages. For data exploration, the IPython Notebook coupled with matplotlib remains unparalleled. Although there are ongoing efforts to provide similar functionality in Scala (like Spark Notebooks or Apache Zeppelin), these projects may not have reached the same level of maturity.&lt;/p&gt;

&lt;p&gt;Considering the above, in the author's biased opinion, Scala shines when used for more permanent programs. If you're writing a quick throwaway script or primarily focusing on data exploration, you might find Python better suited for the task. However, for projects that require reusability and a higher level of provable correctness, Scala proves to be an extremely powerful and beneficial choice. Ultimately, the decision on whether to use Scala will depend on your project's specific requirements and your team's familiarity with the language and its ecosystem.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In conclusion, Scala presents a compelling option for developers seeking a robust and functional language, offering the potential to build cutting-edge applications and tackle complex data science challenges. Whether you choose Scala for its expressiveness and parallel processing capabilities or opt for another language based on familiarity and immediate project needs, making an informed decision will ultimately lead to successful and impactful software development endeavors.&lt;/p&gt;

</description>
      <category>scala</category>
      <category>programming</category>
      <category>datascience</category>
      <category>functionalreactiveprogramming</category>
    </item>
    <item>
      <title>Real-time data ingestion to Databricks Delta Lake using Redpanda (Kafka Connect)</title>
      <dc:creator>Elvis David</dc:creator>
      <pubDate>Thu, 27 Jul 2023 14:37:18 +0000</pubDate>
      <link>https://dev.to/davidelvis/real-time-data-ingestion-to-databricks-delta-lake-using-redpanda-kafka-connect-l9o</link>
      <guid>https://dev.to/davidelvis/real-time-data-ingestion-to-databricks-delta-lake-using-redpanda-kafka-connect-l9o</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Real-time data ingestion has become a critical requirement for organizations that want to extract value from their data in real time or near real time, and it increasingly sets companies apart in competitive markets. Yet research from Databricks found that a staggering 73% of a company's data goes unused for analytics and decision-making when stored in a data lake. As a result, machine learning models trained on that data risk returning inaccurate results and performing poorly in real-world scenarios.&lt;/p&gt;

&lt;p&gt;Delta Lake is a game changer for big data. Developed as an open source storage layer, it provides an abstraction layer on top of existing data lakes and is optimized for Parquet-based storage. Databricks Delta Lake provides features such as:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Support for ACID transactions - ACID stands for Atomicity, Consistency, Isolation, and Durability. All transactions made on Delta Lake are ACID compliant.&lt;/li&gt;
&lt;li&gt;Schema enforcement - When writing data to storage, Delta Lake enforces the schema, which helps maintain column names and data types and keeps data reliable and high quality.&lt;/li&gt;
&lt;li&gt;Scalable metadata handling - Delta Lake scales out metadata processing using compute engines like Apache Spark and Apache Hive, allowing it to process the metadata for petabytes of data efficiently.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The Databricks Delta Lake integration can be used for a variety of use cases that involve storing, managing, and analyzing large volumes of incoming data. For example, in customer analytics, Delta Lake can store and process customer data for analysis. It can also process IoT data, thanks to its ability to handle large volumes of data and perform real-time processing.&lt;/p&gt;

&lt;p&gt;In this tutorial, you will learn how to do the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Create and configure Databricks Delta lakes&lt;/li&gt;
&lt;li&gt;Set up and run a Redpanda cluster and create topics for Kafka Connect usage&lt;/li&gt;
&lt;li&gt;Configure and run a Kafka Connect cluster for Redpanda and Databricks Delta lake integration&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Deep dive into Databricks deployment architecture
&lt;/h2&gt;

&lt;p&gt;A Databricks deployment is structured to provide a fully managed, scalable, reliable, and secure platform for data processing and analytics tasks. Its architecture is split into two main components, the &lt;strong&gt;control plane&lt;/strong&gt; and the &lt;strong&gt;data plane&lt;/strong&gt;, which enables secure cross-functional team collaboration while keeping a significant amount of backend services managed by Databricks.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The control plane - It includes the backend services that Databricks manages within the cloud environment, including access control, user authentication, and resource management.&lt;/li&gt;
&lt;li&gt;The data plane - It manages the storage and processing of data. It includes resources such as Databricks clusters, which are sets of virtual machines provisioned on demand to run notebooks and jobs, and Databricks Delta Lake, a storage layer that provides an abstraction layer on top of your data lake in the cloud.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The control plane and data plane work together to provide a smooth experience for the data teams.&lt;br&gt;
The separation of the control plane and data plane makes it possible for Databricks to scale each component independently which ensures the deployment architecture remains scalable, reliable and performant even as workloads grow in size and complexity.&lt;/p&gt;

&lt;p&gt;The overall Redpanda BYOC (bring your own cloud) deployment architecture mirrors the Databricks deployment architecture in the following ways:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Both Redpanda and Databricks architectures leverage the underlying cloud infrastructure to provide reliability, security and scalability for data processing and analytics tasks.&lt;/li&gt;
&lt;li&gt;Like Databricks, Redpanda BYOC has a control plane, which is responsible for managing Redpanda clusters (including provisioning and scaling), and a data plane, which is responsible for processing and storing data.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Redpanda BYOC and Databricks both provide an interactive and collaborative workspace for data teams to operate in, facilitating faster iterations and the delivery of value.&lt;/p&gt;
&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;p&gt;You'll need the following for this tutorial:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Docker installed on your machine, preferably Docker Desktop if you’re on Windows/Mac (this article uses Docker 20.10.21)&lt;/li&gt;
&lt;li&gt;A machine to install Redpanda and Kafka Connect (this tutorial uses Linux, but you can also run Redpanda on Docker, macOS, and Windows)&lt;/li&gt;
&lt;li&gt;Python 3.10 or higher&lt;/li&gt;
&lt;li&gt;Java 8 or later (this tutorial uses Java 11)&lt;/li&gt;
&lt;li&gt;Delta Lake 1.2.1&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;All of the code resources for this tutorial can be found in this &lt;a href="https://github.com/Davidelvis/Real-time-data-ingestion-to-Databricks-Delta-Lake-with-Redpanda-Kafka-Connect-" rel="noopener noreferrer"&gt;repository&lt;/a&gt;.&lt;/p&gt;
&lt;h2&gt;
  
  
  Use case: Implementing data ingestion to Databricks Delta lakes with Redpanda (Kafka Connect)
&lt;/h2&gt;

&lt;p&gt;This section uses a fictitious scenario, for demonstration purposes only, to show how you can ingest data into Databricks Delta Lake using Kafka Connect with Redpanda.&lt;/p&gt;

&lt;p&gt;Imagine that you work as a lead engineer at an e-commerce company called EasyShop, which sells products through its website. The company is growing rapidly and is expanding its operations into new markets.&lt;/p&gt;

&lt;p&gt;To better understand customer behavior and preferences, the company wants to create a unified view of its sales and customer data, which includes user clicks, page views, and orders.&lt;br&gt;
To achieve this, you decide to set up a data pipeline that ingests data in real time from the company website into Databricks Delta Lake.&lt;/p&gt;

&lt;p&gt;The following diagram explains the high-level architecture:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqigjep5gxuw6fgfsg4sf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqigjep5gxuw6fgfsg4sf.png" alt="Real time data streaming to Databricks Delta lake using Kafka connect"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Setting up Redpanda
&lt;/h2&gt;

&lt;p&gt;This article assumes that you already have Redpanda installed via Docker. If you have not installed it, please refer to this &lt;a href="https://docs.redpanda.com/docs/get-started/quick-start/" rel="noopener noreferrer"&gt;documentation&lt;/a&gt; on how to install Redpanda via Docker.&lt;/p&gt;

&lt;p&gt;To check whether Redpanda is up and running, execute the command &lt;code&gt;docker ps&lt;/code&gt;. You’ll see output like this if the service is running:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;CONTAINER ID   IMAGE                 COMMAND                  CREATED          STATUS          PORTS                                                                     NAMES
bb2305256285   vectorized/redpanda   &lt;span class="s2"&gt;"/entrypoint.sh redp…"&lt;/span&gt;   16 minutes ago   Up 16 minutes   8081/tcp, 9092/tcp, 9644/tcp, 0.0.0.0:8082-&amp;gt;8082/tcp, :::8082-&amp;gt;8082/tcp   redpanda-1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can also confirm that Redpanda is running by invoking the rpk CLI inside the container with &lt;code&gt;docker exec -it redpanda-1 rpk cluster info&lt;/code&gt;, where &lt;code&gt;redpanda-1&lt;/code&gt; is the container name.&lt;br&gt;
The output of the command should look similar to the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;CLUSTER
&lt;span class="o"&gt;=======&lt;/span&gt;
redpanda.36dda01b-4e8a-4949-bb38-04f83a13b009

BROKERS
&lt;span class="o"&gt;=======&lt;/span&gt;
ID    HOST     PORT
0&lt;span class="k"&gt;*&lt;/span&gt;    0.0.0.0  9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Creating a Redpanda Topic
&lt;/h2&gt;

&lt;p&gt;To create a topic in Redpanda, you can use rpk, a CLI tool for connecting to and interacting with Redpanda brokers. Open a terminal and connect to the Redpanda container session by executing the command &lt;code&gt;docker exec -it redpanda-1 bash&lt;/code&gt;. Your terminal session should now be connected to the &lt;code&gt;redpanda-1&lt;/code&gt; container:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;redpanda@bb2305256285:/&lt;span class="nv"&gt;$ &lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, execute the command &lt;code&gt;rpk topic create customer-data&lt;/code&gt; to create the &lt;code&gt;customer-data&lt;/code&gt; topic. You can confirm that the topic exists with the command &lt;code&gt;rpk topic list&lt;/code&gt;, which lists all topics in the cluster. You should see output like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;NAME           PARTITIONS  REPLICAS
customer-data  1           1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Alternatively, you can verify the created topics by getting the cluster information with the command &lt;code&gt;rpk cluster info&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;You will see the following output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;CLUSTER
&lt;span class="o"&gt;=======&lt;/span&gt;
redpanda.36dda01b-4e8a-4949-bb38-04f83a13b009

BROKERS
&lt;span class="o"&gt;=======&lt;/span&gt;
ID    HOST     PORT
0&lt;span class="k"&gt;*&lt;/span&gt;    0.0.0.0  9092

TOPICS
&lt;span class="o"&gt;======&lt;/span&gt;
NAME           PARTITIONS  REPLICAS
customer-data  1           1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
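&lt;p&gt;If you would rather check for the topic from a script than by eye, the table printed by &lt;code&gt;rpk topic list&lt;/code&gt; can be parsed with a few lines of Python. The following is a minimal sketch, not part of the demo repo: &lt;code&gt;parse_topic_table&lt;/code&gt; is a hypothetical helper, and it assumes the three-column layout shown in the example output above.&lt;/p&gt;

```python
def parse_topic_table(text):
    """Parse the NAME/PARTITIONS/REPLICAS table printed by `rpk topic list`."""
    # Split every non-empty line into whitespace-separated columns.
    lines = [line.split() for line in text.strip().splitlines() if line.strip()]
    header, rows = lines[0], lines[1:]
    keys = [name.lower() for name in header]
    topics = {}
    for row in rows:
        record = dict(zip(keys, row))
        topics[record["name"]] = {
            "partitions": int(record["partitions"]),
            "replicas": int(record["replicas"]),
        }
    return topics


# Output copied from the `rpk topic list` example above.
sample_output = """
NAME           PARTITIONS  REPLICAS
customer-data  1           1
"""

topics = parse_topic_table(sample_output)
print("customer-data" in topics)   # True
print(topics["customer-data"])     # {'partitions': 1, 'replicas': 1}
```

&lt;p&gt;In practice, the text could come from something like &lt;code&gt;subprocess.run(["docker", "exec", "redpanda-1", "rpk", "topic", "list"], capture_output=True, text=True).stdout&lt;/code&gt;, assuming the container is named &lt;code&gt;redpanda-1&lt;/code&gt; as in this article.&lt;/p&gt;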



&lt;h2&gt;
  
  
  Developing the producer code for a scenario
&lt;/h2&gt;

&lt;p&gt;Now that Redpanda is running and a topic is available in the Redpanda container to receive events, you can create an application that publishes messages to this topic.&lt;br&gt;
This is a two-step process:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Set up a virtual environment and install the necessary dependencies.&lt;/li&gt;
&lt;li&gt;Write producer code that generates sample customer data and publishes it to the &lt;code&gt;customer-data&lt;/code&gt; topic.&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Setting up virtual environment
&lt;/h2&gt;

&lt;p&gt;Before writing the producer and consumer code, you have to set up your Python virtual environment and install the dependencies.&lt;/p&gt;

&lt;p&gt;To begin, create a project directory, &lt;code&gt;Real-time data ingestion to Databricks Delta Lake with Redpanda (Kafka Connect)&lt;/code&gt;, on your machine.&lt;/p&gt;

&lt;p&gt;Create a subdirectory &lt;code&gt;customer-data&lt;/code&gt; in the project directory.&lt;br&gt;
The &lt;code&gt;customer-data&lt;/code&gt; directory will hold the Python application that will publish our data.&lt;/p&gt;

&lt;p&gt;While inside the &lt;code&gt;customer-data&lt;/code&gt; subdirectory, run &lt;code&gt;python3 -m venv venv&lt;/code&gt; to create a virtual environment for this demo project, then activate it with &lt;code&gt;source venv/bin/activate&lt;/code&gt; (on Linux or macOS).&lt;/p&gt;

&lt;p&gt;In the same subdirectory, create a &lt;code&gt;requirements.txt&lt;/code&gt; file and paste in the content from the &lt;a href="https://github.com/Davidelvis/Real-time-data-ingestion-to-Databricks-Delta-Lake-with-Redpanda-Kafka-Connect-" rel="noopener noreferrer"&gt;GitHub demo repo&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;To install the dependencies, run &lt;code&gt;pip install -r requirements.txt&lt;/code&gt;.&lt;/p&gt;
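&lt;p&gt;To confirm that the installation worked before moving on, you can check that the modules the producer script imports are visible in the active environment. This is a small sketch using only the standard library. The module names are taken from the import statements of the producer script in the next section, and &lt;code&gt;missing_modules&lt;/code&gt; is a hypothetical helper, not part of the demo repo.&lt;/p&gt;

```python
import importlib.util

# Top-level module names imported by the producer script in this article.
REQUIRED_MODULES = ["kafka", "faker", "faker_commerce", "dotenv"]


def missing_modules(names):
    """Return the module names that cannot be found in the active environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All producer dependencies are importable.")
```

&lt;p&gt;If any module is reported as missing, make sure the virtual environment is activated and run the &lt;code&gt;pip install&lt;/code&gt; command again.&lt;/p&gt;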
&lt;h2&gt;
  
  
  Producing data
&lt;/h2&gt;

&lt;p&gt;Create a file called &lt;code&gt;redpanda_producer.py&lt;/code&gt; containing producer code that generates 10,000 random JSON entries and publishes them to the &lt;code&gt;customer-data&lt;/code&gt; topic, one per second.&lt;/p&gt;

&lt;p&gt;Paste this piece of code into the file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;KafkaProducer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;KafkaAdminClient&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;kafka.admin&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;NewTopic&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;sleep&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faker&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Faker&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;faker_commerce&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;faker.providers&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;internet&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="n"&gt;topic_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;REDPANDA_TOPIC&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;broker_ip&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;REDPANDA_BROKER_IP&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;types_of_categories&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;clothing&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;electronics&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;home &amp;amp; kitchen&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;beauty &amp;amp; personal care&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;toys &amp;amp; games&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;fake&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Faker&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_provider&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;faker_commerce&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Provider&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_provider&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;internet&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Create Kafka topic
&lt;/span&gt;        &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;NewTopic&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;topic_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;num_partitions&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;replication_factor&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;admin&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaAdminClient&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bootstrap_servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;broker_ip&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;admin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;create_topics&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Topic &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;topic_name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; is already created&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bootstrap_servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;broker_ip&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
                             &lt;span class="n"&gt;value_serializer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;encode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;ascii&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;fake_date&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;date_this_month&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;pyint&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;categories&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;word&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ext_word_list&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;types_of_categories&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;product_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;product_name&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;name&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;email_addr&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;email&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;str_date&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fake_date&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;units_sold&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;pyint&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;unit_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;pyint&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;500&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;country&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fake&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;country&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;date&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;str_date&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;product_name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;product_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;category&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;categories&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                   &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;email&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;email_addr&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;units_sold&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;units_sold&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                   &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;unit_price&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;unit_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;country&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;country&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Inserted entry &lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt; to topic&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic_name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
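&lt;p&gt;The &lt;code&gt;value_serializer&lt;/code&gt; in the script turns each Python dictionary into JSON text and then into bytes before it is sent. The sketch below reproduces that step in isolation with the standard library so you can see what ends up on the topic. The record values are hand-written placeholders (the real script generates them with Faker), and &lt;code&gt;serialize&lt;/code&gt; and &lt;code&gt;deserialize&lt;/code&gt; are hypothetical helper names.&lt;/p&gt;

```python
import json


def serialize(record):
    """Mirror the producer's value_serializer: JSON text encoded as ASCII bytes."""
    return json.dumps(record).encode("ascii")


def deserialize(payload):
    """Inverse operation that a consumer could apply to the message value."""
    return json.loads(payload.decode("ascii"))


# Hand-written sample values; the real script fills these in with Faker.
record = {
    "date": "2023-01-15",
    "product_name": "Example Chair",
    "category": "electronics",
    "name": "José Example",
    "email": "jose@example.com",
    "units_sold": 3,
    "unit_price": 120,
    "country": "Kenya",
}

payload = serialize(record)
print(type(payload).__name__)          # bytes
print(deserialize(payload) == record)  # True
```

&lt;p&gt;Encoding as ASCII is safe here because &lt;code&gt;json.dumps&lt;/code&gt; escapes non-ASCII characters by default, so a name such as the one above is written as an escape sequence rather than raising an encoding error.&lt;/p&gt;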



&lt;p&gt;To run the script, execute the command &lt;code&gt;python redpanda_producer.py&lt;/code&gt;. If it runs successfully, a confirmation line is printed for each message published to the topic.&lt;/p&gt;

&lt;p&gt;You can also check the output using Redpanda’s CLI tool, &lt;code&gt;rpk&lt;/code&gt;, by running the following command:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;rpk topic consume customer-data --brokers &amp;lt;IP:PORT&amp;gt;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Each consumed record in the output includes the &lt;code&gt;topic&lt;/code&gt;, &lt;code&gt;value&lt;/code&gt;, &lt;code&gt;timestamp&lt;/code&gt;, &lt;code&gt;partition&lt;/code&gt;, and &lt;code&gt;offset&lt;/code&gt; fields.&lt;/p&gt;
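&lt;p&gt;To illustrate how those fields fit together, here is a rough sketch that decodes one consumed record in Python. The sample record is hand-written and its values are purely illustrative. It assumes that the record is a JSON object, that &lt;code&gt;value&lt;/code&gt; holds the producer's JSON payload as a string, and that &lt;code&gt;timestamp&lt;/code&gt; is numeric, so compare it against the output you actually get. &lt;code&gt;decode_record&lt;/code&gt; is a hypothetical helper.&lt;/p&gt;

```python
import json

# Hand-written record with the fields listed above; values are illustrative only.
sample_record = json.dumps({
    "topic": "customer-data",
    "value": json.dumps({"name": "Jane Example", "units_sold": 3, "unit_price": 120}),
    "timestamp": 1676478265000,
    "partition": 0,
    "offset": 0,
})


def decode_record(raw):
    """Parse one consumed record and decode the JSON payload stored in `value`."""
    record = json.loads(raw)
    record["value"] = json.loads(record["value"])
    return record


record = decode_record(sample_record)
print(record["offset"], record["value"]["units_sold"])  # 0 3
```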

&lt;h2&gt;
  
  
  Configuring Databricks Delta Lake
&lt;/h2&gt;

&lt;p&gt;In this section, you'll configure Delta Lake using a Spark session. For this, we will use the &lt;code&gt;configure_delta-lake.py&lt;/code&gt; script from our &lt;a href="https://github.com/Davidelvis/Real-time-data-ingestion-to-Databricks-Delta-Lake-with-Redpanda-Kafka-Connect-" rel="noopener noreferrer"&gt;GitHub demo repo&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Running the &lt;code&gt;configure_delta-lake&lt;/code&gt; script will accomplish the following:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Generate a Spark session&lt;/li&gt;
&lt;li&gt;Obtain the schema from the topic that receives the generated data&lt;/li&gt;
&lt;li&gt;Create the Delta table&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Let's look at each step in detail.&lt;/p&gt;

&lt;p&gt;Using PySpark, you'll create a new Spark session and add the Delta Lake dependencies it needs to interact with Delta Lake. You can do this using the following code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;delta&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pyspark&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_spark_session&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;broker_ip&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;REDPANDA_BROKER_IP&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;REDPANDA_TOPIC&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;builder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pyspark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;MyApp&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;spark.sql.extensions&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;io.delta.sql.DeltaSparkSessionExtension&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;spark.sql.catalog.spark_catalog&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;org.apache.spark.sql.delta.catalog.DeltaCatalog&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;configure_spark_with_delta_pip&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



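&lt;p&gt;The Spark packages are listed in the &lt;code&gt;PYSPARK_SUBMIT_ARGS&lt;/code&gt; variable of the &lt;code&gt;.env&lt;/code&gt; file, which also holds the broker address, the topic name, and the Delta Lake paths:&lt;br&gt;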
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;REDPANDA_BROKER_IP&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0.0.0:9092
&lt;span class="nv"&gt;REDPANDA_TOPIC&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;customer-data
&lt;span class="nv"&gt;PYSPARK_SUBMIT_ARGS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'--packages io.delta:delta-core_2.12:1.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'&lt;/span&gt;
&lt;span class="nv"&gt;DELTA_LAKE_TABLE_DIR&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'/tmp/delta/customer-table'&lt;/span&gt;
&lt;span class="nv"&gt;DELTA_LAKE_CHECKOUT_DIR&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'/tmp/checkpoint/'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
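&lt;p&gt;The scripts in this article read these values with &lt;code&gt;os.environ.get&lt;/code&gt;, which returns &lt;code&gt;None&lt;/code&gt; when a variable is not set. One possible way to catch a missing or misspelled entry early is a small validation step like the sketch below. &lt;code&gt;read_settings&lt;/code&gt; is a hypothetical helper that is not part of the demo repo, and the list of required keys is an assumption based on the &lt;code&gt;.env&lt;/code&gt; example above.&lt;/p&gt;

```python
import os

# Variable names taken from the .env example in this article.
REQUIRED_KEYS = [
    "REDPANDA_BROKER_IP",
    "REDPANDA_TOPIC",
    "DELTA_LAKE_TABLE_DIR",
    "DELTA_LAKE_CHECKOUT_DIR",
]


def read_settings(env=None):
    """Return the required settings, raising early if any are missing or empty."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError("Missing settings: " + ", ".join(missing))
    return {key: env[key] for key in REQUIRED_KEYS}


# Values copied from the .env example above.
settings = read_settings({
    "REDPANDA_BROKER_IP": "0.0.0.0:9092",
    "REDPANDA_TOPIC": "customer-data",
    "DELTA_LAKE_TABLE_DIR": "/tmp/delta/customer-table",
    "DELTA_LAKE_CHECKOUT_DIR": "/tmp/checkpoint/",
})
print(settings["REDPANDA_TOPIC"])  # customer-data
```

&lt;p&gt;In a real script, you would call &lt;code&gt;load_dotenv()&lt;/code&gt; first and then &lt;code&gt;read_settings()&lt;/code&gt; without arguments so that the values come from the process environment.&lt;/p&gt;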



&lt;p&gt;Once you've set up the Spark session, you'll use it to obtain the schema of the data in the &lt;code&gt;customer-data&lt;/code&gt; topic.&lt;br&gt;
You can find this logic in the &lt;code&gt;get_schema()&lt;/code&gt; function inside the &lt;code&gt;get_schema.py&lt;/code&gt; file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
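&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;expr&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_schema&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;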
    &lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;broker_ip&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;REDPANDA_BROKER_IP&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;REDPANDA_TOPIC&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_json&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;kafka&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;kafka.bootstrap.servers&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;broker_ip&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;subscribe&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;startingOffsets&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;earliest&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;endingOffsets&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;latest&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;failOnDataLoss&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;false&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
                &lt;span class="c1"&gt;# filter out empty values
&lt;/span&gt;                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;expr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;string(value)&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;isNotNull&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
                &lt;span class="c1"&gt;# get latest version of each record
&lt;/span&gt;                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;expr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;struct(offset, value) r&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;agg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;expr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;max(r) r&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; 
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;r.value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="c1"&gt;# decode the json values
&lt;/span&gt;    &lt;span class="n"&gt;df_read&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df_json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;rdd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;multiLine&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;df_read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the Spark session running and the schema read from the topic, you can now create the Delta table. The paths are configurable; edit the &lt;code&gt;.env&lt;/code&gt; file to change them.&lt;br&gt;
You can use the following code to create a Delta table:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create_delta_tables&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;table_df&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;table_path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;DELTA_LAKE_TABLE_DIR&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;table&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;createDataFrame&lt;/span&gt;&lt;span class="p"&gt;([],&lt;/span&gt; &lt;span class="n"&gt;table_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;mergeSchema&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;delta&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;mode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;save&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;table_path&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You've successfully configured Databricks Delta Lake and created a Delta table for the data to be ingested.&lt;/p&gt;

&lt;h2&gt;
  
  
  Setting up Kafka Connect
&lt;/h2&gt;

&lt;p&gt;Kafka Connect is an integration tool released with the Apache KafkaⓇ project. It’s scalable and flexible, and it provides reliable data streaming between Apache Kafka and external systems. You can use it to integrate with any system, including databases, search indexes, and cloud storage providers. Because Redpanda is fully compatible with the Kafka API, Kafka Connect can run against a Redpanda cluster as well.&lt;/p&gt;

&lt;p&gt;Kafka Connect uses source and sink connectors for integration. Source connectors stream data from an external system to Kafka, while sink connectors stream from Kafka to an external system.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F45bih7e9jwjxtxsho8xj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F45bih7e9jwjxtxsho8xj.png" alt="Kafka Connect Architecture"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To install Kafka Connect, download the Apache Kafka package. Navigate to the &lt;a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz" rel="noopener noreferrer"&gt;Apache downloads page&lt;/a&gt; for Kafka and click the suggested download link for the Kafka 3.1.0 binary package.&lt;/p&gt;

&lt;p&gt;From your home directory, run the following commands to create a folder called &lt;code&gt;pandabooks_integration&lt;/code&gt; and extract the Kafka archive into it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;mkdir &lt;/span&gt;pandabooks_integration &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nb"&gt;mv&lt;/span&gt; ~/Downloads/kafka_2.13-3.1.0.tgz pandabooks_integration &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nb"&gt;cd &lt;/span&gt;pandabooks_integration &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nb"&gt;tar &lt;/span&gt;xzvf kafka_2.13-3.1.0.tgz
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Configuring the connect cluster
&lt;/h2&gt;

&lt;p&gt;Before running a Kafka Connect cluster, you have to set up a configuration file in the properties format.&lt;br&gt;
Navigate to the &lt;code&gt;pandabooks_integration&lt;/code&gt; directory and create a folder called &lt;code&gt;configuration&lt;/code&gt;. Inside that folder, create a file named &lt;code&gt;connect.properties&lt;/code&gt; and paste the following contents:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;#Kafka broker addresses
&lt;/span&gt;&lt;span class="n"&gt;bootstrap&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;

&lt;span class="c1"&gt;#Cluster level converters
#These apply when the connectors don't define any converter
&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;apache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;JsonConverter&lt;/span&gt;
&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;apache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;JsonConverter&lt;/span&gt;

&lt;span class="c1"&gt;#JSON schemas enabled to false in cluster level
&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schemas&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;enable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;true&lt;/span&gt;
&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schemas&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;enable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;true&lt;/span&gt;

&lt;span class="c1"&gt;#Where to keep the Connect topic offset configurations
&lt;/span&gt;&lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;storage&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;filename&lt;/span&gt;&lt;span class="o"&gt;=/&lt;/span&gt;&lt;span class="n"&gt;tmp&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;offsets&lt;/span&gt;
&lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;flush&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ms&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;10000&lt;/span&gt;

&lt;span class="c1"&gt;#Plugin path to put the connector binaries
&lt;/span&gt;&lt;span class="n"&gt;plugin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Set the &lt;code&gt;bootstrap.servers&lt;/code&gt; value to &lt;code&gt;localhost:9092&lt;/code&gt; to configure the Connect cluster to use the Redpanda cluster.&lt;/p&gt;

&lt;p&gt;You also need to set &lt;code&gt;plugin.path&lt;/code&gt;, the directory where you will put the connector binaries.&lt;/p&gt;
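
&lt;p&gt;Before starting the worker, it can help to confirm that neither of these two keys was left empty. The snippet below is a minimal sketch, not part of the original project: &lt;code&gt;parse_properties&lt;/code&gt; and &lt;code&gt;unset_keys&lt;/code&gt; are hypothetical helpers, and the parser only handles plain &lt;code&gt;key=value&lt;/code&gt; lines and &lt;code&gt;#&lt;/code&gt; comments, not the full Java properties syntax.&lt;/p&gt;

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=" themselves
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def unset_keys(props, required):
    """Return the required keys that are missing or have an empty value."""
    return [key for key in required if not props.get(key)]


# Shortened copy of the template, with both values still empty
TEMPLATE = """\
#Kafka broker addresses
bootstrap.servers=

#Plugin path to put the connector binaries
plugin.path=
"""

REQUIRED = ["bootstrap.servers", "plugin.path"]

props = parse_properties(TEMPLATE)
print(unset_keys(props, REQUIRED))  # ['bootstrap.servers', 'plugin.path']

props["bootstrap.servers"] = "localhost:9092"
print(unset_keys(props, REQUIRED))  # ['plugin.path']
```

&lt;p&gt;To check the real file, read &lt;code&gt;connect.properties&lt;/code&gt; from disk and pass its text to &lt;code&gt;parse_properties&lt;/code&gt;.&lt;/p&gt;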

&lt;p&gt;Create a folder called &lt;code&gt;plugins&lt;/code&gt; in the &lt;code&gt;pandabooks_integration&lt;/code&gt; directory. Inside the &lt;code&gt;plugins&lt;/code&gt; folder, create another folder called &lt;code&gt;kafka-connect-delta-lake&lt;/code&gt;; this is where you will add the connector binaries.&lt;/p&gt;

&lt;p&gt;Navigate to this &lt;a href="https://docs.confluent.io/kafka-connectors/databricks-delta-lake-sink/current/overview.html#databricks-delta-lake-sink-connector-cp" rel="noopener noreferrer"&gt;web page&lt;/a&gt; and click &lt;strong&gt;Download&lt;/strong&gt; to download the archived binaries. Unzip the file and copy the files in the &lt;strong&gt;lib&lt;/strong&gt; folder into the &lt;code&gt;kafka-connect-delta-lake&lt;/code&gt; folder inside the &lt;code&gt;plugins&lt;/code&gt; directory.&lt;/p&gt;

&lt;p&gt;The final folder structure for &lt;code&gt;pandabooks_integration&lt;/code&gt; should look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pandabooks_integration
├── configuration
│   └── connect.properties
├── plugins
│   └── kafka-connect-delta-lake
└── kafka_2.13-3.1.0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
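
&lt;p&gt;If you prefer to script the folder setup, the layout can be sketched with &lt;code&gt;pathlib&lt;/code&gt;. This is an illustrative helper (&lt;code&gt;create_layout&lt;/code&gt; is a hypothetical name, not part of the tutorial's repository); it assumes the base folder is named &lt;code&gt;pandabooks_integration&lt;/code&gt;, as created earlier, and it does not create &lt;code&gt;kafka_2.13-3.1.0&lt;/code&gt;, which comes from extracting the Kafka archive.&lt;/p&gt;

```python
import tempfile
from pathlib import Path


def create_layout(base_dir):
    """Create the configuration and plugin folders under base_dir."""
    root = Path(base_dir) / "pandabooks_integration"
    config_dir = root / "configuration"
    plugin_dir = root / "plugins" / "kafka-connect-delta-lake"
    for directory in (config_dir, plugin_dir):
        # parents=True creates intermediate folders; exist_ok=True makes reruns safe
        directory.mkdir(parents=True, exist_ok=True)
    # Create an empty connect.properties only if it does not exist yet
    (config_dir / "connect.properties").touch(exist_ok=True)
    return root


# Demo in a throwaway directory; pass Path.home() to build the real layout
with tempfile.TemporaryDirectory() as tmp:
    root = create_layout(tmp)
    for path in sorted(root.rglob("*")):
        print(path.relative_to(root))
```

&lt;p&gt;Because existing folders and files are left untouched, running the helper again on an already populated directory is harmless.&lt;/p&gt;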



&lt;p&gt;You will need to change the &lt;code&gt;plugin.path&lt;/code&gt; value to &lt;code&gt;/home/_YOUR_USER_NAME_/pandabooks_integration/plugins&lt;/code&gt;. This tells the Connect cluster where to load the connector binaries from.&lt;/p&gt;

&lt;p&gt;Now, the final &lt;code&gt;connect.properties&lt;/code&gt; file should look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;#Kafka broker addresses
&lt;/span&gt;&lt;span class="n"&gt;bootstrap&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;localhost&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;9092&lt;/span&gt;

&lt;span class="c1"&gt;#Cluster level converters
#These apply when the connectors don't define any converter
&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;apache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;JsonConverter&lt;/span&gt;
&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;apache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;JsonConverter&lt;/span&gt;

&lt;span class="c1"&gt;#JSON schemas enabled to false in cluster level
&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schemas&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;enable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;true&lt;/span&gt;
&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schemas&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;enable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;true&lt;/span&gt;

&lt;span class="c1"&gt;#Where to keep the Connect topic offset configurations
&lt;/span&gt;&lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;storage&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;filename&lt;/span&gt;&lt;span class="o"&gt;=/&lt;/span&gt;&lt;span class="n"&gt;tmp&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;offsets&lt;/span&gt;
&lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;flush&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ms&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;10000&lt;/span&gt;

&lt;span class="c1"&gt;#Plugin path to put the connector binaries
&lt;/span&gt;&lt;span class="n"&gt;plugin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;_YOUR_HOME_DIRECTORY_&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;pandapost_integration&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;plugins&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
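
&lt;p&gt;Rather than typing the home directory by hand, you could compute the &lt;code&gt;plugin.path&lt;/code&gt; entry. The snippet below is a small sketch under the assumption that the folder is named &lt;code&gt;pandabooks_integration&lt;/code&gt;, as created earlier; &lt;code&gt;plugin_path_line&lt;/code&gt; is a hypothetical helper and &lt;code&gt;/home/alice&lt;/code&gt; is a made-up example path.&lt;/p&gt;

```python
from pathlib import Path


def plugin_path_line(home=None):
    """Build the plugin.path entry for the given (or current) home directory."""
    home_dir = Path(home) if home is not None else Path.home()
    plugins = home_dir / "pandabooks_integration" / "plugins"
    # as_posix() keeps forward slashes in the generated properties line
    return f"plugin.path={plugins.as_posix()}"


print(plugin_path_line("/home/alice"))  # hypothetical user, for illustration
print(plugin_path_line())               # uses the current user's home directory
```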



&lt;h2&gt;
  
  
  Configuring the Delta Lake Connector
&lt;/h2&gt;

&lt;p&gt;You have set up the connector plugin in the Kafka Connect cluster, but that alone is not enough. You also need to configure the sink connector itself: the Delta Lake connector plugin, which enables Kafka Connect to write data directly to Delta Lake.&lt;/p&gt;

&lt;p&gt;To do this, create a file named &lt;code&gt;delta-lake-sink-connector.properties&lt;/code&gt; in the &lt;code&gt;~/pandabooks_integration/configuration&lt;/code&gt; directory and paste the following contents:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;delta&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;lake&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;connector&lt;/span&gt;

&lt;span class="c1"&gt;# Connector class
&lt;/span&gt;&lt;span class="n"&gt;connector&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;class&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;io&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;delta&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DeltaLakeSinkConnector&lt;/span&gt;

&lt;span class="c1"&gt;# Format class
&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;class&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;io&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;delta&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;standalone&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DeltaInputFormat&lt;/span&gt;

&lt;span class="c1"&gt;# The key converter for this connector
&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;apache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;storage&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StringConverter&lt;/span&gt;

&lt;span class="c1"&gt;# The value converter for this connector
&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;apache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;JsonConverter&lt;/span&gt;

&lt;span class="c1"&gt;# Identify, if value contains a schema.
# Required value converter is `org.apache.kafka.connect.json.JsonConverter`.
&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;converter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schemas&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;enable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;false&lt;/span&gt;

&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;max&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;

&lt;span class="c1"&gt;# Topic name to get data from
&lt;/span&gt;&lt;span class="n"&gt;topics&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;

&lt;span class="c1"&gt;# Table to ingest data into
&lt;/span&gt;&lt;span class="n"&gt;tableName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;table&lt;/span&gt;

&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ignore&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;true&lt;/span&gt;

&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ignore&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;true&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Remember to change the values of the following keys in the &lt;code&gt;delta-lake-sink-connector.properties&lt;/code&gt; file:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;connector.class&lt;/li&gt;
&lt;li&gt;topics&lt;/li&gt;
&lt;/ol&gt;
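
&lt;p&gt;The connector file sets &lt;code&gt;value.converter.schemas.enable=false&lt;/code&gt;, overriding the cluster-level &lt;code&gt;true&lt;/code&gt; for message values. With &lt;code&gt;JsonConverter&lt;/code&gt;, this setting decides what a message value must look like: plain JSON when it is &lt;code&gt;false&lt;/code&gt;, or a &lt;code&gt;schema&lt;/code&gt;/&lt;code&gt;payload&lt;/code&gt; envelope when it is &lt;code&gt;true&lt;/code&gt;. The sketch below illustrates the two shapes; the record and its field names are hypothetical and are not taken from the &lt;code&gt;customer-data&lt;/code&gt; topic.&lt;/p&gt;

```python
import json

record = {"name": "Jane Doe", "city": "Nairobi"}  # hypothetical customer record

# schemas.enable=false: the message value is the bare JSON document
plain_value = json.dumps(record)

# schemas.enable=true: JsonConverter expects a schema/payload envelope
envelope_value = json.dumps({
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "name", "type": "string", "optional": False},
            {"field": "city", "type": "string", "optional": False},
        ],
    },
    "payload": record,
})

print(plain_value)
print(envelope_value)
```

&lt;p&gt;If the producer writes plain JSON, as in the first form, keeping &lt;code&gt;value.converter.schemas.enable=false&lt;/code&gt; on the connector is what lets the converter accept those messages.&lt;/p&gt;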

&lt;h2&gt;
  
  
  Running the Kafka Connect cluster
&lt;/h2&gt;

&lt;p&gt;Now run the Kafka Connect cluster with the configurations that you have made. Open a new terminal, navigate to the &lt;code&gt;_YOUR_HOME_DIRECTORY_/pandabooks_integration/configuration&lt;/code&gt; directory and run the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;../kafka_2.13-3.1.0/bin/connect-standalone.sh connect.properties delta-lake-sink-connector.properties
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
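
&lt;p&gt;The same command can be launched from Python, which makes it easy to fail early when a file is missing. This is a rough sketch, assuming the folder layout described above; &lt;code&gt;build_connect_command&lt;/code&gt; and &lt;code&gt;run_connect&lt;/code&gt; are hypothetical helper names.&lt;/p&gt;

```python
import subprocess
from pathlib import Path


def build_connect_command(base_dir):
    """Return the connect-standalone.sh command as an argument list."""
    base = Path(base_dir)
    config = base / "configuration"
    return [
        str(base / "kafka_2.13-3.1.0" / "bin" / "connect-standalone.sh"),
        str(config / "connect.properties"),
        str(config / "delta-lake-sink-connector.properties"),
    ]


def run_connect(base_dir):
    """Start Kafka Connect in standalone mode; blocks until the process exits."""
    command = build_connect_command(base_dir)
    # Check the script and both properties files before launching anything
    missing = [path for path in command if not Path(path).exists()]
    if missing:
        raise FileNotFoundError(f"missing files: {missing}")
    return subprocess.run(command, check=True)


# Only print the command here; call run_connect(...) to actually start the worker
print(build_connect_command(Path.home() / "pandabooks_integration"))
```

&lt;p&gt;Passing the arguments as a list avoids shell quoting issues if the home directory contains spaces.&lt;/p&gt;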



&lt;p&gt;If everything was done correctly, the output will look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;...output omitted...
 &lt;span class="nv"&gt;groupId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connect-delta-lake-sink-connector] Successfully joined group with generation Generation&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="nv"&gt;generationId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;25, &lt;span class="nv"&gt;memberId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'connector-consumer-delta-lake-sink-connector-0-eb21795e-f3b3-4312-8ce9-46164a2cdb27'&lt;/span&gt;, &lt;span class="nv"&gt;protocol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'range'&lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:595&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;[&lt;/span&gt;2022-04-17 03:37:06,872] INFO &lt;span class="o"&gt;[&lt;/span&gt;delta-lake-sink-connector|task-0] &lt;span class="o"&gt;[&lt;/span&gt;Consumer &lt;span class="nv"&gt;clientId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connector-consumer-delta-lake-sink-connector-0, &lt;span class="nv"&gt;groupId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connect-delta-lake-sink-connector] Finished assignment &lt;span class="k"&gt;for &lt;/span&gt;group at generation 25: &lt;span class="o"&gt;{&lt;/span&gt;connector-consumer-delta-lake-sink-connector-0-eb21795e-f3b3-4312-8ce9-46164a2cdb27&lt;span class="o"&gt;=&lt;/span&gt;Assignment&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;partitions&lt;/span&gt;&lt;span class="o"&gt;=[&lt;/span&gt;customer-data-0]&lt;span class="o"&gt;)}&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:652&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;[&lt;/span&gt;2022-04-17 03:37:06,891] INFO &lt;span class="o"&gt;[&lt;/span&gt;delta-lake-sink-connector|task-0] &lt;span class="o"&gt;[&lt;/span&gt;Consumer &lt;span class="nv"&gt;clientId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connector-consumer-delta-lake-sink-connector-0, &lt;span class="nv"&gt;groupId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connect-delta-lake-sink-connector] Successfully synced group &lt;span class="k"&gt;in &lt;/span&gt;generation Generation&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="nv"&gt;generationId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;25, &lt;span class="nv"&gt;memberId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'connector-consumer-delta-lake-sink-connector-0-eb21795e-f3b3-4312-8ce9-46164a2cdb27'&lt;/span&gt;, &lt;span class="nv"&gt;protocol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'range'&lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:761&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;[&lt;/span&gt;2022-04-17 03:37:06,891] INFO &lt;span class="o"&gt;[&lt;/span&gt;delta-lake-sink-connector|task-0] &lt;span class="o"&gt;[&lt;/span&gt;Consumer &lt;span class="nv"&gt;clientId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connector-consumer-delta-lake-sink-connector-0, &lt;span class="nv"&gt;groupId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connect-delta-lake-sink-connector] Notifying assignor about the new Assignment&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;partitions&lt;/span&gt;&lt;span class="o"&gt;=[&lt;/span&gt;customer-data-0]&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:279&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;[&lt;/span&gt;2022-04-17 03:37:06,893] INFO &lt;span class="o"&gt;[&lt;/span&gt;delta-lake-sink-connector|task-0] &lt;span class="o"&gt;[&lt;/span&gt;Consumer &lt;span class="nv"&gt;clientId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connector-consumer-delta-lake-sink-connector-0, &lt;span class="nv"&gt;groupId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connect-delta-lake-sink-connector] Adding newly assigned partitions: customer-data-0 &lt;span class="o"&gt;(&lt;/span&gt;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:291&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;[&lt;/span&gt;2022-04-17 03:37:06,903] INFO &lt;span class="o"&gt;[&lt;/span&gt;delta-lake-sink-connector|task-0] &lt;span class="o"&gt;[&lt;/span&gt;Consumer &lt;span class="nv"&gt;clientId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connector-consumer-delta-lake-sink-connector-0, &lt;span class="nv"&gt;groupId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connect-delta-lake-sink-connector] Setting offset &lt;span class="k"&gt;for &lt;/span&gt;partition customer-data-0 to the committed offset FetchPosition&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="nv"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3250, &lt;span class="nv"&gt;offsetEpoch&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;Optional.empty, &lt;span class="nv"&gt;currentLeader&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;LeaderAndEpoch&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="nv"&gt;leader&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;Optional[localhost:9092 &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;id&lt;/span&gt;: 0 rack: null&lt;span class="o"&gt;)]&lt;/span&gt;, &lt;span class="nv"&gt;epoch&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;absent&lt;span class="o"&gt;}}&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
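
&lt;p&gt;To confirm from a script that the sink task was assigned the expected partition, you can search the worker log for the "Adding newly assigned partitions" message shown above. The snippet below is a minimal sketch: &lt;code&gt;assigned_partitions&lt;/code&gt; is a hypothetical helper, and it assumes that multiple partitions, when present, are separated by commas.&lt;/p&gt;

```python
import re

# Capture everything between the message prefix and the trailing "(logger:line)"
ASSIGNED = re.compile(r"Adding newly assigned partitions: (.+?) \(")


def assigned_partitions(log_text):
    """Collect partition names from 'Adding newly assigned partitions' lines."""
    partitions = []
    for line in log_text.splitlines():
        match = ASSIGNED.search(line)
        if match:
            partitions.extend(name.strip() for name in match.group(1).split(","))
    return partitions


# Log line copied from the sample output above
sample = (
    "[2022-04-17 03:37:06,893] INFO [delta-lake-sink-connector|task-0] "
    "[Consumer clientId=connector-consumer-delta-lake-sink-connector-0, "
    "groupId=connect-delta-lake-sink-connector] "
    "Adding newly assigned partitions: customer-data-0 "
    "(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:291)"
)
print(assigned_partitions(sample))  # ['customer-data-0']
```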



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Real-time data ingestion to Databricks Delta Lake with Redpanda (Kafka Connect) is a powerful and flexible solution for processing and analyzing streaming data. In this article, we walked step by step through implementing data ingestion to Databricks Delta Lake with Redpanda (Kafka Connect).&lt;/p&gt;

&lt;p&gt;Remember, you can find the code resources for this tutorial in this &lt;a href="https://github.com/Davidelvis/Real-time-data-ingestion-to-Databricks-Delta-Lake-with-Redpanda-Kafka-Connect-" rel="noopener noreferrer"&gt;repository&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>tutorial</category>
      <category>devops</category>
      <category>kafka</category>
      <category>datascience</category>
    </item>
  </channel>
</rss>
