<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: AutoMQ</title>
    <description>The latest articles on DEV Community by AutoMQ (@automq).</description>
    <link>https://dev.to/automq</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1413364%2F9c562138-e04b-4455-8dc3-09a925aff88f.jpg</url>
      <title>DEV Community: AutoMQ</title>
      <link>https://dev.to/automq</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/automq"/>
    <language>en</language>
    <item>
      <title>Introducing AutoMQ: a cloud-native replacement of Apache Kafka</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Mon, 05 Aug 2024 07:40:14 +0000</pubDate>
      <link>https://dev.to/automq/introducing-automq-a-cloud-native-replacement-of-apache-kafka-kkc</link>
      <guid>https://dev.to/automq/introducing-automq-a-cloud-native-replacement-of-apache-kafka-kkc</guid>
      <description>&lt;blockquote&gt;
&lt;p&gt;Author: Xinyu Zhou, AutoMQ CTO&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;&lt;strong&gt;AutoMQ&lt;/strong&gt; is a Kafka alternative designed with a cloud-first philosophy. AutoMQ redesigns the storage layer of Apache Kafka for the cloud: by separating persistence to EBS and S3, it brings a 10x cost reduction and a 100x increase in elasticity while remaining 100% compatible with Kafka. It also performs better than Apache Kafka, combining low latency, high throughput, low cost, and ease of use. The community edition of AutoMQ is &lt;a href="https://github.com/AutoMQ/automq" rel="noopener noreferrer"&gt;source-available on GitHub&lt;/a&gt;, and you can deploy and test AutoMQ for free now.&lt;/p&gt;

&lt;h1&gt;
  
  
  The Growing &lt;strong&gt;&lt;a href="https://github.com/AutoMQ/automq" rel="noopener noreferrer"&gt;AutoMQ Community&lt;/a&gt;&lt;/strong&gt;
&lt;/h1&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2F4n1vap.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2F4n1vap.png" alt="AutoMQ Github Community"&gt;&lt;/a&gt;&lt;br&gt;
The &lt;strong&gt;AutoMQ&lt;/strong&gt; community is a vibrant and diverse group of individuals and organizations committed to the growth and development of AutoMQ. As source-available software on GitHub, AutoMQ has attracted a strong following. With &lt;strong&gt;2900+ stargazers&lt;/strong&gt; and counting, the community's enthusiasm for the project is clear.&lt;/p&gt;

&lt;p&gt;Our community's diversity and engagement are testaments to the broad appeal and applicability of AutoMQ. We're excited to continue fostering this dynamic community, driving innovation, and shaping the future of "Data in Motion" together.&lt;/p&gt;

&lt;h1&gt;
  
  
  The Evolution of the Streaming World
&lt;/h1&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2F9kcafz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2F9kcafz.png" alt="Streaming World Evolution"&gt;&lt;/a&gt;&lt;br&gt;
The stream storage industry has undergone a significant transformation over the past decade, marked by technical evolution and the emergence of innovative solutions.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Kafka is the Beginning&lt;/strong&gt;: Apache Kafka, created over a decade ago, marked the beginning of a new era in stream storage. Kafka integrated advanced technologies of its era, such as the append-only log and the zero-copy technique, which dramatically improved write efficiency and throughput.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Commercial Leads Innovation&lt;/strong&gt;: As the industry matured, commercial opportunities began to surface. Companies like Confluent and Redpanda emerged, driving technical innovation in the Kafka ecosystem. Confluent introduced significant architectural innovations, namely KRaft and Tiered Storage, which streamlined the architecture and substantially reduced storage costs. Redpanda rewrote Kafka in C++ and replaced ISR with the Raft replication protocol to achieve lower tail latency. Both are based on a Shared-Nothing replication architecture and have adopted tiered storage as an optimization.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Cloud Reshapes Architecture&lt;/strong&gt;: The advent of cloud-native technologies has further reshaped the stream storage industry. WarpStream has rewritten Kafka in Go, with a storage layer built entirely on S3. It achieves a cloud-native elastic architecture by sacrificing latency and is compatible at the Kafka API protocol level. &lt;strong&gt;AutoMQ&lt;/strong&gt; redesigns and reimplements the storage layer of Apache Kafka for the cloud. While staying 100% compatible with Kafka, it achieves a 10x cost reduction and a 100x elasticity improvement by separating persistence to EBS and S3, without sacrificing latency or throughput.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h1&gt;
  
  
  Truly Cloud-Native Architecture of AutoMQ
&lt;/h1&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2Fkrundq.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2Fkrundq.png" alt="AutoMQ's Cloud-Native Architecture"&gt;&lt;/a&gt;&lt;br&gt;
The cloud-native architecture of AutoMQ is a result of careful design decisions, innovative approaches, and the strategic use of cloud storage technologies. We aimed to create a system that could leverage the benefits of the cloud while overcoming the limitations of traditional stream storage solutions.&lt;/p&gt;

&lt;h3&gt;
  
  
  Decoupling Durability to Cloud Storage
&lt;/h3&gt;

&lt;p&gt;The first step in realizing the cloud-native architecture of AutoMQ was to decouple durability to cloud storage. The typical decoupling of storage means moving storage into separate distributed, replicated storage software; decoupling durability takes it a step further. In the former case, we are left with two types of clusters to manage, as in Apache Pulsar, where you operate both the broker cluster and the BookKeeper cluster.&lt;/p&gt;

&lt;p&gt;AutoMQ has taken a different route, opting to decouple durability to cloud storage, with S3 as the prime example. S3 is already designed for 99.999999999% durability, making it a reliable choice for this purpose. In the realm of cloud computing, merely decoupling storage is insufficient; we must also decouple durability to cloud storage.&lt;/p&gt;

&lt;p&gt;The essence of the Decoupling Durability architecture lies in its reliance on cloud storage for durability, eliminating the need for replication protocols such as Raft. This approach is gaining traction over the traditional Decoupling Storage architecture. Guided by this philosophy, we developed S3Stream, a stream storage library that combines the advantages of EBS and S3.&lt;/p&gt;

&lt;h3&gt;
  
  
  Stateless Broker with S3Stream
&lt;/h3&gt;

&lt;p&gt;With S3Stream in place, we replaced the storage layer of the Apache Kafka broker, transforming it from a Shared-Nothing architecture to a Shared-Storage architecture, and in the process, making the Broker stateless. This is a significant shift, as it reduces the complexity of managing the system. In the AutoMQ architecture, the Broker is the only component. Once it becomes stateless, we can even deploy it using cost-effective Spot instances, further enhancing the cost-efficiency of the system.&lt;/p&gt;

&lt;h3&gt;
  
  
  Automate Everything for Elasticity
&lt;/h3&gt;

&lt;p&gt;The final step in realizing the cloud-native architecture of AutoMQ was to automate everything to achieve an elastic architecture. Once AutoMQ became stateless, it was straightforward to automate various aspects, such as auto-scaling and auto-balancing of traffic.&lt;/p&gt;

&lt;p&gt;We have two automated controllers that collect key metrics from the cluster. The auto-scaling controller monitors the load of the cluster and decides whether to scale in or scale out the cluster. The auto-balancing controller minimizes hot-spotting by dynamically reassigning partitions across the entire cluster. This level of automation is integral to the flexibility and scalability of AutoMQ, and it is also the inspiration behind its name.&lt;/p&gt;

&lt;h1&gt;
  
  
  Moving Toward Multi-Cloud Native Architecture
&lt;/h1&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2Fucdddj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2Fucdddj.png" alt="Multi-Cloud Native Architecutre"&gt;&lt;/a&gt;&lt;br&gt;
As we move toward a multi-cloud native architecture, the need for a flexible and adaptable storage solution becomes critical. AutoMQ's shared storage design is an embodiment of this flexibility, designed to integrate seamlessly with a variety of cloud providers.&lt;/p&gt;

&lt;h3&gt;
  
  
  Shared Storage: WAL Meets Object Storage
&lt;/h3&gt;

&lt;p&gt;At the heart of this design lies the concept of S3Stream, a shared stream storage repository. It is essentially composed of a shared Write-Ahead Log (WAL) and shared object storage.&lt;/p&gt;

&lt;p&gt;Data is first persistently written to the WAL and then uploaded to object storage in near real-time. The WAL does not provide data reading capabilities. Instead, it serves as a recovery mechanism in the event of a failure. Consumers read data directly from S3. To enhance performance, a memory cache is implemented for acceleration, which means that tailing-read consumers do not need to access object storage directly.&lt;/p&gt;
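
&lt;p&gt;The write and read paths described above can be sketched in a few lines of Python. This is an illustrative in-memory model, not S3Stream's actual code: the class &lt;code&gt;StreamStore&lt;/code&gt;, its method names, and the batch and cache sizes are all assumptions made for the example.&lt;/p&gt;

```python
from collections import OrderedDict


class StreamStore:
    """Toy in-memory model of the WAL + object storage write path (hypothetical)."""

    def __init__(self, upload_batch=2, cache_size=4):
        # cache_size must be at least upload_batch so that records still
        # waiting in the WAL can always be served from the cache.
        self.wal = []                # (offset, record) pairs not yet uploaded
        self.object_storage = {}     # offset to record; stands in for S3
        self.cache = OrderedDict()   # most recent records, for tailing reads
        self.upload_batch = upload_batch
        self.cache_size = cache_size
        self.next_offset = 0

    def append(self, record):
        offset = self.next_offset
        self.next_offset += 1
        self.wal.append((offset, record))   # persist to the WAL first
        self.cache[offset] = record
        if len(self.cache) > self.cache_size:
            self.cache.popitem(last=False)  # evict the oldest cached record
        if len(self.wal) >= self.upload_batch:
            self.upload()
        return offset

    def upload(self):
        # Move everything in the WAL to object storage, then trim the WAL.
        self.object_storage.update(self.wal)
        self.wal.clear()

    def read(self, offset):
        # The WAL is never read here; it only matters in recover().
        if offset in self.cache:
            return "cache", self.cache[offset]
        return "object storage", self.object_storage[offset]

    def recover(self):
        # After a crash the memory cache is gone; replay the WAL into object storage.
        self.cache.clear()
        self.upload()


store = StreamStore()
for i in range(5):
    store.append(f"record-{i}")
print(store.read(4))  # newest record, served from the cache
print(store.read(0))  # old record, already evicted, read from object storage
store.recover()
print(store.read(4))  # after recovery the WAL contents are in object storage
```

&lt;p&gt;The point of the sketch is the division of labor: the WAL only has to hold the short tail of data that has not yet reached object storage, which is why it can be small and why the medium behind it can vary.&lt;/p&gt;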

&lt;p&gt;This architecture of S3Stream is highly flexible due to the variety of mediums that can be used for the WAL. For instance, EBS, Regional EBS, S3, or even a combination of these can be used to form a Replication WAL. This flexibility is primarily due to the varying capabilities of cloud storage offered by different cloud providers. The aim is to pursue an architecture that is optimal across multiple cloud providers.&lt;/p&gt;

&lt;h3&gt;
  
  
  Adapting Architecture to Different Cloud Providers
&lt;/h3&gt;

&lt;p&gt;The architecture of AutoMQ's shared storage model is designed to be adaptable to the specific capabilities of different cloud providers. The choice of architecture depends primarily on the specific features and services offered by each cloud provider.&lt;/p&gt;

&lt;p&gt;For instance, Azure, Google Cloud, and Alibaba Cloud all provide regional EBS. Given this feature, the best practice for these cloud providers is to use regional EBS as the WAL. This allows the system to tolerate zone failures, ensuring reliable and consistent performance.&lt;/p&gt;

&lt;p&gt;In contrast, AWS does not offer regional EBS. However, AWS does provide S3 Express One Zone, which offers single-digit millisecond latency. Although this service is limited to a single availability zone, AutoMQ can still tolerate zone failures by using a replication WAL. In this setup, data is written both to the S3 Express One Zone bucket and to an EBS volume.&lt;/p&gt;

&lt;p&gt;If you have access to a low-latency alternative to S3, or your business can tolerate hundreds of milliseconds of latency, it is possible to use S3 as the WAL. The entire architecture then relies solely on S3 for both the WAL and data storage. In other words, AutoMQ can also be deployed in a WarpStream-like architecture.&lt;/p&gt;
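
&lt;p&gt;The recommendations above boil down to a small decision table. The Python sketch below restates them; the function name &lt;code&gt;choose_wal&lt;/code&gt;, its parameter, and the string labels are hypothetical and are not AutoMQ configuration values.&lt;/p&gt;

```python
# Providers the article lists as offering regional EBS.
REGIONAL_EBS_PROVIDERS = {"azure", "google cloud", "alibaba cloud"}


def choose_wal(provider, s3_latency_acceptable=False):
    """Return the WAL media suggested in the text for a cloud provider."""
    provider = provider.lower()
    if s3_latency_acceptable:
        # Hundreds of milliseconds are fine (or S3 is fast enough): S3 is the WAL.
        return ["s3"]
    if provider in REGIONAL_EBS_PROVIDERS:
        # Regional EBS tolerates a zone failure on its own.
        return ["regional-ebs"]
    if provider == "aws":
        # Replication WAL: write to both S3 Express One Zone and an EBS volume.
        return ["s3-express-one-zone", "ebs"]
    raise ValueError(f"the article gives no recommendation for {provider!r}")


print(choose_wal("Azure"))
print(choose_wal("AWS"))
print(choose_wal("AWS", s3_latency_acceptable=True))
```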

&lt;p&gt;By understanding and leveraging the unique features of each cloud provider, AutoMQ ensures optimal performance and reliability across a variety of cloud environments. This flexibility and adaptability are key to the success of a multi-cloud native architecture.&lt;/p&gt;

&lt;h1&gt;
  
  
  Performance Data and Benefits of AutoMQ
&lt;/h1&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2Fb93u98.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2Fb93u98.png" alt="AutoMQ's performance"&gt;&lt;/a&gt;&lt;br&gt;
To fully appreciate the capabilities and advantages of AutoMQ, let's take a look at some key benchmark data and performance metrics.&lt;/p&gt;

&lt;p&gt;The advantages of AutoMQ compared to Apache Kafka can be summarized as follows:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;⚡ &lt;strong&gt;10x more cost-effective than Apache Kafka&lt;/strong&gt;: Auto-scaling, Spot instance support, and storage separated to S3 together make AutoMQ 10x more cost-effective than Apache Kafka.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;👍 &lt;strong&gt;Easy to operate&lt;/strong&gt;: No need to manage the cluster's capacity yourself. Stateless Brokers autoscale in seconds. Forget data skew and contention between hot and cold data: self-balancing fixes them automatically.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;🚀 &lt;strong&gt;High performance&lt;/strong&gt;: Single-digit millisecond latency and throughput on par with Apache Kafka, with much better catch-up read performance.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;😄 &lt;strong&gt;Easy to migrate&lt;/strong&gt;: 100% compatible with Apache Kafka, so you don't need to change anything you already have. Point your clients at the new bootstrap server endpoint and you are done.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  10x Cost Effective
&lt;/h3&gt;

&lt;p&gt;AutoMQ's innovative architecture brings unprecedented cost savings in the realm of data-intensive software. Its design focuses on optimizing both computational and storage resources, resulting in a cost advantage that's nearly tenfold compared to traditional solutions.&lt;/p&gt;

&lt;p&gt;The first major advantage comes from the optimization of EC2 resources. By eliminating data replication, AutoMQ removes the need for extra resources to handle replication traffic. Coupled with the platform's elasticity, which dynamically adjusts the cluster size in response to workload, this reduces EC2 resource usage by up to 90%.&lt;/p&gt;

&lt;p&gt;Furthermore, AutoMQ's stateless architecture allows the use of Spot instances. This strategy leads to a significant cost reduction, further enhancing computational resource savings.&lt;/p&gt;

&lt;p&gt;On the storage front, AutoMQ also shines. Instead of the traditional approach of keeping three replicas on EBS, it stores a single copy in object storage. This reduces storage costs by as much as 90%.&lt;/p&gt;

&lt;p&gt;Our detailed cost comparison chart, based on real bill comparisons from stress testing on AWS, illustrates these savings. For more in-depth information, we invite you to access the complete report from our website.&lt;/p&gt;

&lt;h3&gt;
  
  
  Instant Elastic Efficiency
&lt;/h3&gt;

&lt;p&gt;AutoMQ's shared storage architecture greatly enhances operational efficiency. For example, reassigning partitions in AutoMQ no longer involves data replication and can be completed within seconds, unlike in Kafka where it could take up to several hours. Additionally, when it comes to cluster scaling, AutoMQ can balance the traffic of new nodes with the cluster in just about one minute by reassigning partitions in batches. In contrast, this process could take days with Kafka.&lt;/p&gt;

&lt;h3&gt;
  
  
  100% Compatibility
&lt;/h3&gt;

&lt;p&gt;Perhaps one of the most important aspects of AutoMQ is its compatibility. We've replaced Kafka's storage layer with S3Stream while keeping all the code from the computation layer. This ensures that AutoMQ is fully compatible with Kafka's protocols and features. For instance, features from newer versions of Apache Kafka, such as compacted topics, the idempotent producer, and transactional messages, are fully supported by AutoMQ.&lt;/p&gt;

&lt;p&gt;Furthermore, we replace Kafka's storage layer at a very narrow cut point, the LogSegment. This makes it easy to synchronize code from upstream Kafka, so we can readily merge new Apache Kafka features in the future. This is a significant advantage over solutions like WarpStream, where such compatibility and future-proofing can be a challenge.&lt;/p&gt;

&lt;p&gt;In summary, AutoMQ's flexible architecture, cost savings, operational efficiency, and compatibility make it a powerful solution for stream storage in the cloud.&lt;/p&gt;

&lt;h1&gt;
  
  
  Roadmap: Streaming Data to the Data Lake
&lt;/h1&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2Fftfe6k.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fimage.automq.com%2F20240805bot%2Fftfe6k.png" alt="AutoMQ's roadmap"&gt;&lt;/a&gt;&lt;br&gt;
In this final section, we outline our vision for the future of streaming data into data lakes, a critical aspect of our roadmap.&lt;/p&gt;

&lt;h3&gt;
  
  
  The Shift Toward Shared Data
&lt;/h3&gt;

&lt;p&gt;We're witnessing a trend where all data-intensive software eventually stores data on object storage to leverage the benefits of shared storage. However, even with all data stored on object storage, there isn't a straightforward way to share data between different systems. This process typically requires Extract, Transform, Load (ETL) operations and data format conversions.&lt;/p&gt;

&lt;p&gt;We believe the transition from shared storage to shared data will be the next critical evolution in modern data technology. Table storage solutions like Delta Lake and Iceberg have unified the data format in the data lake, making this transition feasible.&lt;/p&gt;

&lt;h3&gt;
  
  
  From Stream to Lake: A Data Journey
&lt;/h3&gt;

&lt;p&gt;In the future, we envision data usage to be a seamless, interconnected process that maximizes data utility and operational efficiency.&lt;/p&gt;

&lt;p&gt;The journey begins with data generation. As data is produced in a streaming manner, it is immediately stored in stream storage. This continuous flow of information forms the foundation of our data landscape.&lt;/p&gt;

&lt;p&gt;Next, we unlock the real-time value of this data. Tools like Flink Jobs, Spark Jobs, or Kafka consumers dive into the data stream, extracting valuable insights on the fly through the Stream API. This step is crucial in keeping pace with the dynamic nature of the data.&lt;/p&gt;

&lt;p&gt;As the data matures and loses its freshness, the built-in Compactor in AutoMQ steps in. Quietly and transparently, it transforms the data into the Iceberg table format. This conversion process ensures the data remains accessible and usable even after it has passed its real-time relevance.&lt;/p&gt;

&lt;p&gt;Finally, we arrive at the stage of large-scale analysis. The entire big data technology stack can now access the converted data, using a zero ETL approach. This approach eliminates the need for additional data processing, allowing for direct, efficient analysis.&lt;/p&gt;

&lt;p&gt;In conclusion, as we continue to innovate and evolve, our goal remains the same: to provide a powerful, efficient, and cost-effective solution for stream storage in the cloud. By streamlining the process of streaming data to data lakes, we aim to further enhance the value and utility of big data for businesses.&lt;/p&gt;

&lt;h1&gt;
  
  
  Embracing the Future with AutoMQ
&lt;/h1&gt;

&lt;p&gt;AutoMQ, our cloud-native solution, is more than an alternative to existing technologies—it's a leap forward in the realm of data-intensive software. It promises cost savings, operational efficiency, and seamless compatibility.&lt;/p&gt;

&lt;p&gt;We envision a future where data effortlessly streams into data lakes, unlocking the potential of real-time generative AI. This approach will enhance the utility of big data, leading to more comprehensive analyses and insights.&lt;/p&gt;

&lt;p&gt;Finally, we invite you to join us on this journey and contribute to the evolution of AutoMQ. Visit our website to access the GitHub repository and join our Slack group for communication: &lt;a href="https://www.automq.com/" rel="noopener noreferrer"&gt;https://www.automq.com/&lt;/a&gt;. Let's shape the future of data together with AutoMQ.&lt;/p&gt;

&lt;h1&gt;
  
  
  References
&lt;/h1&gt;

&lt;p&gt;Here are some useful links to deepen your understanding of AutoMQ. Feel free to reach out if you have any queries.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;AutoMQ Website: &lt;a href="https://www.automq.com/" rel="noopener noreferrer"&gt;https://www.automq.com/&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;AutoMQ Repository: &lt;a href="https://github.com/AutoMQ/automq" rel="noopener noreferrer"&gt;https://github.com/AutoMQ/automq&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;AutoMQ Architecture Overview: &lt;a href="https://docs.automq.com/automq/architecture/overview" rel="noopener noreferrer"&gt;https://docs.automq.com/automq/architecture/overview&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;AutoMQ S3Stream Overview: &lt;a href="https://docs.automq.com/automq/architecture/s3stream-shared-streaming-storage/overview" rel="noopener noreferrer"&gt;https://docs.automq.com/automq/architecture/s3stream-shared-streaming-storage/overview&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;AutoMQ Technical Advantages: &lt;a href="https://docs.automq.com/automq/architecture/technical-advantage/overview" rel="noopener noreferrer"&gt;https://docs.automq.com/automq/architecture/technical-advantage/overview&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;The Difference between AutoMQ and Kafka: &lt;a href="https://docs.automq.com/automq/what-is-automq/difference-with-apache-kafka" rel="noopener noreferrer"&gt;https://docs.automq.com/automq/what-is-automq/difference-with-apache-kafka&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;The Difference between AutoMQ and WarpStream: &lt;a href="https://docs.automq.com/automq/what-is-automq/difference-with-warpstream" rel="noopener noreferrer"&gt;https://docs.automq.com/automq/what-is-automq/difference-with-warpstream&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;The Difference between AutoMQ and Tiered Storage: &lt;a href="https://docs.automq.com/automq/what-is-automq/difference-with-tiered-storage" rel="noopener noreferrer"&gt;https://docs.automq.com/automq/what-is-automq/difference-with-tiered-storage&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;AutoMQ Customers: &lt;a href="https://www.automq.com/customer" rel="noopener noreferrer"&gt;https://www.automq.com/customer&lt;/a&gt;
&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>opensource</category>
      <category>productivity</category>
      <category>cloud</category>
      <category>tooling</category>
    </item>
    <item>
      <title>How to Monitor AutoMQ Cluster using Guance Cloud</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Mon, 29 Jul 2024 09:07:18 +0000</pubDate>
      <link>https://dev.to/automq/how-to-monitor-automq-cluster-using-guance-cloud-26l9</link>
      <guid>https://dev.to/automq/how-to-monitor-automq-cluster-using-guance-cloud-26l9</guid>
      <description>&lt;p&gt;&lt;strong&gt;Preface&lt;/strong&gt;&lt;br&gt;
&lt;strong&gt;Guance Cloud&lt;/strong&gt;&lt;br&gt;
Guance Cloud [1] is a unified real-time monitoring application designed for cloud platforms, cloud-native environments, applications, and business-related needs. It integrates three main signals: metrics, logs, and traces, covering testing, prerelease, and production environments to achieve observability across the entire software development lifecycle. Through Guance Cloud, enterprises can build comprehensive application full-link observability, enhancing the transparency and controllability of the overall IT architecture.&lt;br&gt;
As a powerful data analysis platform, Guance Cloud includes several core modules such as the DataKit [2] unified data collector and the DataFlux Func data processing development platform.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvanuowwtlmwp73ykhvun.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvanuowwtlmwp73ykhvun.png" alt="Image description" width="800" height="341"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;AutoMQ&lt;br&gt;
AutoMQ [3] is a next-generation Apache Kafka distribution redesigned based on cloud-native concepts. It provides up to 10 times the cost and elasticity advantages while maintaining 100% compatibility with the Apache Kafka protocol. Moreover, AutoMQ stores data entirely on S3, allowing it to quickly handle sudden traffic spikes during cluster expansion without the need for data replication. In contrast, Apache Kafka requires substantial bandwidth for partition data replication after scaling, making it difficult to manage sudden traffic surges. With features like automatic scaling, self-balancing, and automatic fault recovery, AutoMQ achieves a high degree of system autonomy, offering higher levels of availability without the need for manual intervention. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Observability Interface of AutoMQ&lt;/strong&gt;&lt;br&gt;
Due to AutoMQ's full compatibility with Kafka and support for open Prometheus-based metrics collection ports, it can be integrated with Guance Cloud's data collection tool, DataKit. This enables users to monitor and manage the status of AutoMQ clusters conveniently. The Guance Cloud platform also supports user-defined aggregation and querying of metrics data. By utilizing the provided dashboard templates or custom dashboards, we can effectively compile various information about the AutoMQ cluster, such as common Topics, Brokers, Partitions, and Group statistics.&lt;br&gt;
Based on observable data from Metrics, we can also query the errors encountered during the operation of the AutoMQ cluster and various current system utilization metrics, such as JVM CPU usage, JVM heap usage, and cache size. These metrics help us quickly identify and resolve issues when the cluster behaves abnormally, which benefits high availability and fast recovery. Next, I will show how to monitor the AutoMQ cluster status using the Guance Cloud platform.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Steps to Integrate with Guance Cloud&lt;/strong&gt;&lt;br&gt;
&lt;strong&gt;Enable Metric Fetch Interface in AutoMQ&lt;/strong&gt;&lt;br&gt;
Refer to the AutoMQ documentation: Cluster Deployment | AutoMQ [4]. Before deployment and startup, add the following configuration parameters to enable the Prometheus fetch interface. Once the AutoMQ cluster is started with these parameters, each node additionally opens an HTTP interface that serves AutoMQ monitoring metrics in the Prometheus text format.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-server-start.sh ...\
--override  s3.telemetry.metrics.exporter.type=prometheus \
--override  s3.metrics.exporter.prom.host=0.0.0.0 \
--override  s3.metrics.exporter.prom.port=8890 \
....
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once the AutoMQ monitoring metrics are enabled, you can fetch Prometheus format monitoring metrics from any node via HTTP protocol at the address: http://{node_ip}:8890. A sample response is as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;....
kafka_request_time_mean_milliseconds{otel_scope_name="io.opentelemetry.jmx",type="DescribeDelegationToken"} 0.0 1720520709290
kafka_request_time_mean_milliseconds{otel_scope_name="io.opentelemetry.jmx",type="CreatePartitions"} 0.0 1720520709290
...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
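
&lt;p&gt;To make the response format concrete, here is a minimal Python sketch that splits one of the sample lines above into metric name, labels, value, and timestamp. The helper name &lt;code&gt;parse_sample&lt;/code&gt; is hypothetical, and the parser deliberately handles only simple lines like the ones shown (no escaped quotes, commas, or spaces inside label values); a real collector such as DataKit does this for you.&lt;/p&gt;

```python
def parse_sample(line):
    """Split one Prometheus text-format sample into its parts.

    Handles only the simple shape shown above:
    name{label="value",...} value timestamp
    """
    name_and_labels, value, timestamp = line.rsplit(" ", 2)
    name, _, label_text = name_and_labels.partition("{")
    labels = {}
    for pair in label_text.rstrip("}").split(","):
        if not pair:
            continue  # metric without labels
        key, _, raw = pair.partition("=")
        labels[key] = raw.strip('"')
    return name, labels, float(value), int(timestamp)


sample = (
    'kafka_request_time_mean_milliseconds'
    '{otel_scope_name="io.opentelemetry.jmx",type="CreatePartitions"}'
    ' 0.0 1720520709290'
)
name, labels, value, timestamp_ms = parse_sample(sample)
print(name, labels["type"], value, timestamp_ms)
```

&lt;p&gt;The trailing number on each line is a timestamp in milliseconds since the Unix epoch, and the labels (here &lt;code&gt;type&lt;/code&gt;) are what the dashboards later use to group and filter series.&lt;/p&gt;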



&lt;p&gt;For more information on metrics, refer to the official AutoMQ documentation: Metrics | AutoMQ [5].&lt;br&gt;
&lt;strong&gt;Install and Configure the DataKit Collection Tool&lt;/strong&gt;&lt;br&gt;
DataKit is an open-source monitoring collection tool provided by Guance Cloud that supports fetching Prometheus metrics. We can use DataKit to fetch monitoring data from AutoMQ and aggregate it in the Guance Cloud platform.&lt;br&gt;
&lt;strong&gt;Installation of DataKit Tool&lt;/strong&gt;&lt;br&gt;
For more details on installing DataKit, refer to the documentation: Host Installation - Guance Cloud Documentation [6].&lt;br&gt;
First, register for a Guance Cloud account and log in. Then, from the main interface, click "Integration" on the left side and select "DataKit" at the top. You will see the DataKit installation command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;DK_DATAWAY="https://openway.guance.com?token=&amp;lt;TOKEN&amp;gt;" bash -c "$(curl -L https://static.guance.com/datakit/install.sh)" 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Copy the command above and run it on every node in the cluster: DataKit must be installed on each Broker that you want to monitor.&lt;/p&gt;

&lt;p&gt;After successfully executing the installation command, use the command &lt;code&gt;datakit monitor&lt;/code&gt; to verify whether DataKit was installed successfully.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7zb5e5ayvm01wvainkxd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7zb5e5ayvm01wvainkxd.png" alt="Image description" width="800" height="487"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;AutoMQ Collector Configuration and Activation&lt;/strong&gt;&lt;br&gt;
In this section, we will configure the AutoMQ collector for DataKit on the server where each data collection node resides. Navigate to the directory &lt;code&gt;/usr/local/datakit/conf.d/prom&lt;/code&gt; and create a collector configuration file named &lt;code&gt;prom.conf&lt;/code&gt;. The configuration specifies the exposed metrics endpoint, the collector name, the prom instance name, and, importantly, the collection interval. Adjust it on each server as needed:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  [[inputs.prom]]

  urls = ["http://clientIP:8890/metrics"]   # clientIP 为你自己的服务器地址
  source = "AutoMQ"

  ## Keep Exist Metric Name
  ## If the keep_exist_metric_name is true, keep the raw value for field names.
  keep_exist_metric_name = true

  [inputs.prom.tags_rename]
    overwrite_exist_tags = true

  [inputs.prom.tags_rename.mapping]
    service_name = "job"
    service_instance_id = "instance"

  [inputs.prom.tags]
    component="AutoMQ"
  interval = "10s"

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Monitor the AutoMQ cluster through the Cloud Visualization Management&lt;/strong&gt;&lt;br&gt;
The Guance Cloud platform has integrated AutoMQ and offers multiple default dashboards. You can view them at Dashboard Example [7]. Below are some commonly used templates, with a brief introduction to each:&lt;br&gt;
&lt;strong&gt;Cluster Monitoring&lt;/strong&gt;&lt;br&gt;
This dashboard primarily displays the number of active Brokers, the total number of Topics, the number of Partitions, and so on. You can also specify which node to query by selecting it in the &lt;code&gt;Cluster_id&lt;/code&gt; filter.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0mh06wznkpr1mh2qeaw9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0mh06wznkpr1mh2qeaw9.png" alt="Image description" width="800" height="361"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;By monitoring the state of the Kafka cluster, we can promptly detect and resolve potential issues, such as node failures, insufficient disk space, and network latency, to ensure the system remains controllable and stable.&lt;br&gt;
&lt;strong&gt;Broker Monitoring&lt;/strong&gt;&lt;br&gt;
The AutoMQ Broker dashboard on Guance Cloud describes various metrics for all Brokers, such as the number of connections, the number of partitions, the number of messages received per second (ops), and the input/output data volume per second, measured in bytes.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fba7niyfuob0bjtnyhwwm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fba7niyfuob0bjtnyhwwm.png" alt="Image description" width="800" height="361"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Topic Monitoring&lt;/strong&gt;&lt;br&gt;
This section provides an overview of information for all Topics contained within all nodes. As mentioned above, you can specify and query Topic information under a specific node. These metrics mainly include the space occupied by each Topic, the number of messages received, and the Request Throughput, which indicates the ability to process requests per unit time.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqm0szsm8gp9i97jbd9oz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqm0szsm8gp9i97jbd9oz.png" alt="Image description" width="800" height="361"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;At this point, we have successfully monitored the status of the AutoMQ cluster using Guance Cloud. The data on the dashboards is obtained by aggregating or querying the collected metrics.&lt;br&gt;
&lt;strong&gt;Conclusion&lt;/strong&gt;&lt;br&gt;
In this article, we showed how to integrate the Guance Cloud platform with AutoMQ to monitor the status of an AutoMQ cluster. More advanced operations, such as custom alerts and custom data queries, can be configured according to the rules in the official documentation. You can experiment with them to find the ones that suit your needs. We hope this article helps you when integrating the Guance Cloud platform with AutoMQ!&lt;br&gt;
&lt;strong&gt;References&lt;/strong&gt;&lt;br&gt;
[1] Guance Cloud: &lt;a href="https://docs.guance.com/getting-started/product-introduction/" rel="noopener noreferrer"&gt;https://docs.guance.com/getting-started/product-introduction/&lt;/a&gt;&lt;br&gt;
[2] DataKit: &lt;a href="https://docs.guance.com/datakit/" rel="noopener noreferrer"&gt;https://docs.guance.com/datakit/&lt;/a&gt;&lt;br&gt;
[3] AutoMQ: &lt;a href="https://www.automq.com" rel="noopener noreferrer"&gt;https://www.automq.com&lt;/a&gt;&lt;br&gt;
[4] Cluster Deployment of AutoMQ: &lt;a href="https://docs.automq.com/en/docs/automq-opensource/IyXrw3lHriVPdQkQLDvcPGQdnNh" rel="noopener noreferrer"&gt;https://docs.automq.com/en/docs/automq-opensource/IyXrw3lHriVPdQkQLDvcPGQdnNh&lt;/a&gt;&lt;br&gt;
[5] Host Installation - Guance Cloud Documentation: &lt;a href="https://docs.guance.com/datakit/datakit-install/" rel="noopener noreferrer"&gt;https://docs.guance.com/datakit/datakit-install/&lt;/a&gt;&lt;br&gt;
[6] Metrics | AutoMQ: &lt;a href="https://docs.automq.com/zh/docs/automq-opensource/ArHpwR9zsiLbqwkecNzcqOzXn4b" rel="noopener noreferrer"&gt;https://docs.automq.com/zh/docs/automq-opensource/ArHpwR9zsiLbqwkecNzcqOzXn4b&lt;/a&gt;&lt;br&gt;
[7] Dashboard Example: &lt;a href="https://console.guance.com/scene/dashboard/createDashboard?w=wksp_63b96920660e4962a07429b65ef163e7&amp;amp;lak=Scene" rel="noopener noreferrer"&gt;https://console.guance.com/scene/dashboard/createDashboard?w=wksp_63b96920660e4962a07429b65ef163e7&amp;amp;lak=Scene&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Challenges of Custom Cache Implementation in Netty-Based Streaming Systems: Memory Fragmentation and OOM Issues</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Mon, 29 Jul 2024 08:59:00 +0000</pubDate>
      <link>https://dev.to/automq/challenges-of-custom-cache-implementation-in-netty-based-streaming-systems-memory-fragmentation-and-oom-issues-520c</link>
      <guid>https://dev.to/automq/challenges-of-custom-cache-implementation-in-netty-based-streaming-systems-memory-fragmentation-and-oom-issues-520c</guid>
      <description>&lt;p&gt;&lt;strong&gt;Preface&lt;/strong&gt;&lt;br&gt;
Kafka, as a stream processing platform, aims for end-to-end low latency in real-time stream computation and online business scenarios. In offline batch processing and peak shaving scenarios, it seeks high throughput for cold reads. Both scenarios require a well-designed data caching mechanism to support them. Apache Kafka stores data in local files and accesses them by mapping files into memory using mmap, naturally leveraging the operating system for file buffering, cache loading, and cache eviction.&lt;br&gt;
AutoMQ adopts a separation of storage and computation architecture, where storage is offloaded to object storage. With no local data files, it cannot directly use mmap for data caching like Apache Kafka. At this point, there are usually two approaches to cache data from object storage:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The first approach is to download object storage files to local files and then read the local files using mmap. This approach is relatively simple to implement but requires additional disk space to cache data. Depending on the size and rate of the cache required, it also necessitates purchasing disk space and IOPS, making it economically inefficient.&lt;/li&gt;
&lt;li&gt;The second approach is to directly use memory for data caching based on the data consumption characteristics of stream processing. This method is more complex to implement, essentially requiring a memory management system similar to an operating system's. The trade-off is that managing the memory cache ourselves makes it possible to achieve the best caching efficiency and cost-effectiveness for the business scenario.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To reduce operational complexity and holding costs, and to improve cache efficiency, AutoMQ ultimately chose the second approach: "directly using memory for data caching."&lt;br&gt;
&lt;strong&gt;AutoMQ Cache Design&lt;/strong&gt;&lt;br&gt;
Directly leveraging memory for data caching, AutoMQ has designed two caching mechanisms for tail read and cold read scenarios based on their data access characteristics: LogCache and BlockCache.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgmv4md3667zjllk7gpl9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgmv4md3667zjllk7gpl9.png" alt="Image description" width="800" height="460"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;LogCache is designed for the tail read scenario. When data is uploaded to object storage, it is simultaneously cached in LogCache as a single RecordBatch. This allows hot data to be accessed directly from the cache, providing extremely low end-to-end latency. Compared to general-purpose OS cache designs, LogCache has the following two features:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;FIFO: Given the characteristic of continuous access to new data in tail read scenarios, LogCache uses a First In, First Out eviction policy to ensure the availability of the cache for new data.&lt;/li&gt;
&lt;li&gt;Low Latency: LogCache has a dedicated cache space solely responsible for caching hot data, avoiding the problem of cold data reads affecting hot data consumption.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;BlockCache is designed for cold read scenarios. When the required data cannot be accessed in LogCache, it is read from BlockCache. Compared to LogCache, BlockCache has the following two distinctions:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;LRU: BlockCache uses the Least Recently Used eviction strategy, which offers better cache utilization in high fan-out cold read scenarios.&lt;/li&gt;
&lt;li&gt;High Throughput: Cold read scenarios focus on throughput; therefore, BlockCache reads and caches data in large chunks (~4MB) from object storage and uses a prefetching strategy to load data that is likely to be read next.&lt;/li&gt;
&lt;/ul&gt;
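
&lt;p&gt;The two eviction policies can be contrasted with a minimal Python sketch. It is not AutoMQ's implementation: the class names are hypothetical, and capacity is counted in entries rather than bytes to keep the example short.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from collections import OrderedDict


class FifoCache:
    """Evicts the oldest inserted entry; reads never change the order."""

    def __init__(self, capacity):
        self.capacity = capacity  # assumed to be at least 1
        self.entries = OrderedDict()

    def put(self, key, value):
        if key not in self.entries and len(self.entries) == self.capacity:
            self.entries.popitem(last=False)  # drop the oldest entry
        self.entries[key] = value

    def get(self, key):
        return self.entries.get(key)


class LruCache(FifoCache):
    """Evicts the entry that has gone longest without a read or write."""

    def put(self, key, value):
        super().put(key, value)
        self.entries.move_to_end(key)

    def get(self, key):
        if key in self.entries:
            self.entries.move_to_end(key)  # a read refreshes the entry
        return self.entries.get(key)


fifo, lru = FifoCache(2), LruCache(2)
for cache in (fifo, lru):
    cache.put("a", 1)
    cache.put("b", 2)
    cache.get("a")     # touch "a" before the cache overflows
    cache.put("c", 3)  # forces one eviction

print(list(fifo.entries))  # FIFO dropped "a", the oldest insertion
print(list(lru.entries))   # LRU dropped "b", the least recently used
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;With FIFO, a re-read of old data never delays its eviction, which suits tail reads where consumers keep moving to newer data. With LRU, blocks that several consumers are still reading stay resident, which suits high fan-out cold reads.&lt;/p&gt;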

&lt;p&gt;In Java programs, data can be cached in memory using either on-heap or off-heap memory. To alleviate the burden on JVM GC, AutoMQ uses off-heap Direct Memory for caching data. To improve the efficiency of Direct Memory allocation, it employs the industry-standard Netty PooledByteBufAllocator for memory allocation and release from a pooled memory.&lt;br&gt;
&lt;strong&gt;"The Incident" occurred.&lt;/strong&gt;&lt;br&gt;
The expectation was that by using Netty's PooledByteBufAllocator, AutoMQ could achieve efficient memory allocation speed through pooling, along with a well-honed memory allocation strategy to minimize overhead, providing peace of mind. However, during the performance testing of AutoMQ 1.0.0 RC, reality hit hard.&lt;br&gt;
AutoMQ was deployed on a 2C16G production model, with an off-heap memory limit set to 6GiB using -XX:MaxDirectMemorySize=6G. Memory allocation was set as 2GiB for LogCache + 1GiB for BlockCache + 1GiB for other small items, totaling ~4GiB, which is less than 6GiB. In theory, there was ample off-heap memory available. However, in practice, after running AutoMQ 1.0.0 RC for an extended period under various loads, an OutOfMemoryError (OOM) was encountered.&lt;/p&gt;

&lt;p&gt;Following the principle of suspecting our own code before suspecting mature libraries and operating systems, the first suspicion was a missed ByteBuf#release call somewhere in the code. Hence, the Netty leak detection level was set to PARANOID with -Dio.netty.leakDetection.level=PARANOID to check whether any ByteBuf instances were being garbage collected without being released. After running for a while, no leak logs were found, ruling out the possibility of missed releases.&lt;/p&gt;

&lt;p&gt;Next, the suspicion shifted to whether any part of the code was allocating more memory than expected. Netty's ByteBufAllocatorMetric only provides global memory usage statistics, and traditional memory allocation flame graphs only offer memory request amounts at specific times. What we needed was the memory usage of various types at a given moment. Therefore, AutoMQ consolidated ByteBuf allocation into a custom ByteBufAlloc factory class, using WrappedByteBuf to track memory requests and releases of various types. This allowed us to record the memory usage of different types at any given moment and also record Netty's actual memory usage, thereby providing insight into AutoMQ's overall and categorized memory usage.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Buffer usage: 
ByteBufAllocMetric{allocatorMetric=PooledByteBufAllocatorMetric(usedDirectMemory: 2294284288; ...), // Physical Memory Size Allocated by Netty
allocatedMemory=1870424720, // Total Memory Size Requested By AutoMQ
1/write_record=1841299456, 11/block_cache=0, ..., // Detail Memory Size Requested By AutoMQ
pooled=true, direct=true} (com.automq.stream.s3.ByteBufAlloc)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After adding categorized memory statistics, it was found that the memory usage of various types was within the expected range. However, it was observed that there was a significant discrepancy between the memory requested by AutoMQ and the actual memory allocated by Netty. This discrepancy grew over time, sometimes even resulting in Netty's actual memory usage being twice that of AutoMQ's requested memory. This discrepancy was identified as memory fragmentation in memory allocation.&lt;/p&gt;
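
&lt;p&gt;As a rough illustration, the gap can be quantified from the two totals in the log line above. The sketch below assumes the fragmentation rate is defined as 1 - requested / allocated, which is the definition used for the worked examples later in this article; the function name is hypothetical.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;def fragmentation_rate(requested_bytes, allocated_bytes):
    """Share of allocator-held memory that backs no requested buffer."""
    return 1 - requested_bytes / allocated_bytes


# Values copied from the "Buffer usage" log line above.
netty_used_direct_memory = 2294284288
automq_requested_memory = 1870424720

rate = fragmentation_rate(automq_requested_memory, netty_used_direct_memory)
amplification = netty_used_direct_memory / automq_requested_memory
print(f"fragmentation: {rate:.1%}, amplification: {amplification:.2f}x")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Tracking this ratio over time, rather than only the absolute totals, makes a slowly growing fragmentation problem visible well before the direct memory limit is reached.&lt;/p&gt;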

&lt;p&gt;Ultimately, the cause of the OOM was identified as memory fragmentation in Netty's PooledByteBufAllocator. Having initially identified the problem, the next step was to understand why Netty had memory fragmentation and how AutoMQ could mitigate this issue.&lt;br&gt;
&lt;strong&gt;Netty Memory Fragmentation&lt;/strong&gt;&lt;br&gt;
First, let's explore the causes of Netty's memory fragmentation. Netty's memory fragmentation can be divided into internal fragmentation and external fragmentation:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Internal Fragmentation: This type of fragmentation is caused by rounding requests up to standardized sizes. For example, if you ask for 1 byte but the allocator actually reserves 16 bytes, 15 bytes are wasted as internal fragmentation.&lt;/li&gt;
&lt;li&gt;External Fragmentation: Simply put, any fragmentation caused by factors other than internal fragmentation is considered external fragmentation. This usually results from memory layout fragmentation caused by allocation algorithms.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Internal and external fragmentation exhibit different behaviors in different versions of Netty. Below, we will briefly introduce the working mechanisms and causes of memory fragmentation for the Buddy Allocation Algorithm and the PageRun/PoolSubPage Allocation Algorithm, using Netty version 4.1.52 as a dividing line.&lt;br&gt;
Buddy Allocation Algorithm in Netty &amp;lt; 4.1.52&lt;br&gt;
Netty versions prior to 4.1.52 use the Buddy Allocation Algorithm, which originates from jemalloc3. To improve memory allocation efficiency, Netty requests a contiguous chunk of memory (PoolChunk) from the operating system at once. When a ByteBuf is requested from the upper layer, this chunk of memory is logically divided and returned as needed. The default size of a PoolChunk is 16MB, which is logically divided into 2048 pages, each 8KB in size. The memory usage is represented by a complete binary tree.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwm5ynyyl2suk7e9s00k3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwm5ynyyl2suk7e9s00k3.png" alt="Image description" width="800" height="287"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Each node in the complete binary tree uses one byte to represent the node's state (memoryMap):&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The initial value represents the number of layers, with the status value == number of layers indicating that the node is completely idle.&lt;/li&gt;
&lt;li&gt;When the number of layers &amp;lt; status value &amp;lt; 12, it means that the node is partially used but still has remaining space.&lt;/li&gt;
&lt;li&gt;When the status value == 12, it means that the node has been fully allocated.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Memory allocation is divided into four types: Tiny [0, 512 bytes], Small (512 bytes, 8KB), Normal [8KB, 16MB], and Huge (16MB, Max). Tiny and Small are managed by PoolSubpage, Normal is managed by PoolChunk, and Huge is allocated directly.&lt;/p&gt;

&lt;p&gt;First, let's look at the allocation efficiency of small memory blocks. Tiny [0, 512 bytes] and Small (512 bytes, 8KB) divide a Page into equally sized logical blocks through PoolSubpage, with a bitmap marking the usage of these blocks:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The basic unit of Tiny memory allocation is 16 bytes, meaning if the requested size is 50 bytes, 64 bytes are actually allocated, resulting in an internal fragmentation rate of 1 - 50 / 64 = 21.9%.&lt;/li&gt;
&lt;li&gt;Small memory allocations are rounded up to the next power of two (512 bytes, 1KB, 2KB, 4KB), meaning if the requested size is 1.5KB, 2KB are actually allocated, resulting in an internal fragmentation rate of 25%.&lt;/li&gt;
&lt;/ul&gt;
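
&lt;p&gt;The rounding described above can be reproduced with a few lines of Python. This is a sketch under stated assumptions, not Netty's code: requests under 512 bytes are rounded up to a multiple of 16, larger sub-page requests are rounded up to the next power of two, and the fragmentation rate is taken as 1 - requested / allocated. The function names are hypothetical.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;def normalize_subpage_request(size):
    """Rounds a sub-page request (1 byte up to 8 KB) to its allocated size."""
    if size in range(1, 512):
        return -(-size // 16) * 16       # Tiny: ceiling to a multiple of 16
    return 2 ** (size - 1).bit_length()  # Small: next power of two


def internal_fragmentation(requested, allocated):
    """Fraction of the allocated block that the request does not use."""
    return 1 - requested / allocated


for requested in (50, 1536):
    allocated = normalize_subpage_request(requested)
    rate = internal_fragmentation(requested, allocated)
    print(requested, allocated, f"{rate:.1%}")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;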

&lt;p&gt;Next, let's examine the allocation of medium-sized memory blocks, Normal [8KB, 16MB]. Suppose we request 2MB + 1KB = 2049KB from a completely idle PoolChunk:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;2049KB normalized upwards to 4MB using base 2, thus targeting a Depth-3 free node.&lt;/li&gt;
&lt;li&gt;Check node at index=1, find it free, then check the left subtree.&lt;/li&gt;
&lt;li&gt;Check node at index=2, find it free, then continue checking the left subtree.&lt;/li&gt;
&lt;li&gt;Check node at index=4, find it unallocated, mark the state of index=4 as 12, and update the parent node's state to the smallest of its children, thus changing the state of index=2 to 3, similarly updating parent nodes' states in succession.&lt;/li&gt;
&lt;li&gt;Allocation completed.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;From the allocation result, we can see that requesting 2049KB of memory actually marks 4MB as occupied, an internal fragmentation rate of 1 - 2049KB / 4096KB, or roughly 50%.&lt;/p&gt;
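
&lt;p&gt;The normalization in step 1 can be sketched in Python as follows. The helper names are hypothetical, and counting the root as layer 1 is an assumption chosen so that the 4MB level lines up with the Depth-3 target and the index=1, index=2, index=4 path in the steps above.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;PAGE_SIZE = 8 * 1024
CHUNK_SIZE = 16 * 1024 * 1024


def buddy_block_size(requested):
    """Rounds a request up to the next power of two, and to at least one page."""
    return max(PAGE_SIZE, 2 ** (requested - 1).bit_length())


def tree_layer(block_size):
    """Layer whose nodes have this size, counting the 16 MB root as layer 1."""
    return (CHUNK_SIZE // block_size).bit_length()


requested = 2049 * 1024
block = buddy_block_size(requested)
wasted_share = 1 - requested / block
print(block // 1024, tree_layer(block), f"{wasted_share:.2%}")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Because every size between 2MB + 1 byte and 4MB lands on the same 4MB node, the waste approaches half of the block whenever a request is just above a power of two.&lt;/p&gt;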

&lt;p&gt;Suppose another 9MB of memory is requested. Although the previous PoolChunk still has 12MB of free space, the Buddy algorithm rounds 9MB up to 16MB, which needs a completely free root node. Because index=1 is already partially occupied, a new PoolChunk must be opened to serve the request. The resulting external fragmentation rate is 1 - (4MB + 9MB) / 32MB = 59.4%. The effective memory utilization rate, which is the requested memory / actual underlying occupied memory, is only (2049KB + 9MB) / 32MB = 34.4%.&lt;/p&gt;

&lt;p&gt;Furthermore, in scenarios of continuous allocation and release of variously sized memory blocks, even if the PoolChunk doesn't allocate a large space, it might be logically fragmented by scattered memory blocks, leading to increased external memory fragmentation. As shown in the figure below, although the upper-layer application ultimately retains only 4 * 8KB, it is no longer possible to request 4MB of memory from this PoolChunk.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb87eael7ldmwl5uxzifd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb87eael7ldmwl5uxzifd.png" alt="Image description" width="800" height="287"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;PageRun/PoolSubpage Allocation Algorithm in Netty &amp;gt;= 4.1.52&lt;br&gt;
Netty &amp;gt;= 4.1.52 adopts jemalloc4 to enhance memory allocation through the PageRun/PoolSubpage allocation strategy. Compared to the original Buddy allocation algorithm, it offers lower internal and external memory fragmentation rates for both small and large memory allocations.&lt;br&gt;
The PageRun/PoolSubpage allocation algorithm differs from the original Buddy allocation algorithm in the following ways:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The default size of a Chunk has been reduced from 16MB to 4MB.&lt;/li&gt;
&lt;li&gt;The Chunk and Page concepts are retained, with the addition of the Run concept. A Run is a series of contiguous Pages used to allocate Normal (28KB to 4MB) medium-sized memory.&lt;/li&gt;
&lt;li&gt;Tiny and Small memory blocks are replaced with PoolSubpages, which can span multiple Pages, ranging from 16 bytes to 28KB, with a total of 38 basic allocation sizes.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb0ylmfinlkaosn9dvze5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb0ylmfinlkaosn9dvze5.png" alt="Image description" width="800" height="157"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Let's first examine the efficiency of small memory block allocation with an example of requesting 1025 bytes:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;First, 1025 bytes will be rounded to the nearest PoolSubpage allocation size, which is 1280 bytes.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sizeIdx2sizeTab=[16, 32, 48, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384, 448, 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2048, 2560, 3072, 3584, 4096, 5120, 6144, 7168, 8192, 10240, 12288, 14336, 16384, 20480, 24576, 28672, ...]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;Then, PoolChunk will determine that the PoolSubPage should contain 5 pages by finding the least common multiple of 1280 bytes and the page size of 8KB, which is 40KB.&lt;/li&gt;
&lt;li&gt;It allocates 5 contiguous pages from PoolChunk and tracks the allocated elements via bitmapIdx.&lt;/li&gt;
&lt;li&gt;At this point, the allocation is complete, resulting in an internal fragmentation rate of 1 - 1025 / 1280 = 19.9%.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Thanks to the finer granularity of PoolSubPage, which has been refined from 2 levels to 38 levels, the allocation efficiency of small memory blocks has been significantly improved.&lt;/p&gt;
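
&lt;p&gt;The size-class lookup and the least-common-multiple step can be checked with a short Python sketch. The size table is copied from the sizeIdx2sizeTab excerpt above (up to 28KB); the function name is hypothetical, and the code only mirrors the arithmetic described in the steps, not Netty's data structures.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import bisect
import math

PAGE_SIZE = 8192
SIZE_CLASSES = [
    16, 32, 48, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384, 448, 512,
    640, 768, 896, 1024, 1280, 1536, 1792, 2048, 2560, 3072, 3584, 4096,
    5120, 6144, 7168, 8192, 10240, 12288, 14336, 16384, 20480, 24576, 28672,
]


def subpage_layout(requested):
    """Returns (element size, pages in the subpage, elements in the subpage).

    Only valid for requests up to 28672 bytes; larger ones are Normal sized.
    """
    # First size class that is not smaller than the request.
    element_size = SIZE_CLASSES[bisect.bisect_left(SIZE_CLASSES, requested)]
    # Least common multiple of the element size and the page size.
    run_size = element_size * PAGE_SIZE // math.gcd(element_size, PAGE_SIZE)
    return element_size, run_size // PAGE_SIZE, run_size // element_size


element_size, pages, elements = subpage_layout(1025)
print(element_size, pages, elements, f"{1 - 1025 / element_size:.1%}")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Using the least common multiple means the subpage is filled by a whole number of elements, so no space is lost at the end of the last page.&lt;/p&gt;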

&lt;p&gt;Next, let's examine the allocation efficiency of medium-sized memory blocks, Normal (28KB, 4MB]. Suppose a request is made to allocate 2MB + 1KB = 2049KB of memory from a completely idle PoolChunk:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;After rounding up 2049KB to the nearest multiple of 8KB, it is determined that 257 pages are needed.&lt;/li&gt;
&lt;li&gt;PoolChunk finds a run that satisfies the size requirement: Run{offset=0, size=512}.&lt;/li&gt;
&lt;li&gt;PoolChunk splits the run into Run{offset=0, size=257} and Run{offset=257, size=255}. The first run is returned to the requester, while the second run is added to the free run list (runsAvail).&lt;/li&gt;
&lt;li&gt;At this point, the allocation is complete, and the internal fragmentation rate is 1 - 2049KB / (257 * 8KB) = 0.3%.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Through the PageRun mechanism, Netty keeps the memory wasted by any allocation larger than 28KB under 8KB, with an internal fragmentation rate of less than 22.2%.&lt;/p&gt;

&lt;p&gt;Suppose an additional 1MB of memory is then requested. The PoolChunk runs the same logic, splitting Run{offset=257, size=255} into Run{offset=257, size=128} and Run{offset=385, size=127}. The former is returned to the upper layer, while the latter is added to the list of free Runs. At this point, the external fragmentation rate is 25%. Under the old Buddy algorithm with a 4MB PoolChunk, a new PoolChunk would have to be opened, resulting in an external fragmentation rate of 62.5%.&lt;/p&gt;
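
&lt;p&gt;The two run splits described above can be replayed with a small Python sketch. The function name and the list of (offset, pages) tuples are hypothetical, and taking the first run that fits is an assumption of this sketch, not necessarily the order in which Netty searches runsAvail.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;PAGE_SIZE_KB = 8


def allocate_run(free_runs, requested_kb):
    """Takes the first free run that fits and keeps the unused tail free."""
    pages = -(-requested_kb // PAGE_SIZE_KB)  # ceiling division
    for i, (offset, size) in enumerate(free_runs):
        if min(pages, size) == pages:  # the run has enough pages
            free_runs.pop(i)
            if size != pages:
                free_runs.insert(i, (offset + pages, size - pages))
            return (offset, pages)
    return None  # no run is large enough; a new chunk would be needed


free_runs = [(0, 512)]                  # an idle 4 MB chunk: 512 pages of 8 KB
first = allocate_run(free_runs, 2049)   # the 2 MB + 1 KB request
second = allocate_run(free_runs, 1024)  # the additional 1 MB request
print(first, second, free_runs)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;After both requests a single free run of 127 pages remains in the same chunk, which is where the roughly 25% figure comes from.&lt;/p&gt;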

&lt;p&gt;Although the PageRun/PoolSubpage allocation algorithm has lower internal and external fragmentation rates than the original Buddy allocation algorithm, it does not compact fragmented memory the way the JVM's garbage collector does. When memory blocks of various sizes are continuously allocated and released, the available runs within a PoolChunk become fragmented. Over time, the memory fragmentation rate gradually increases and can eventually cause an Out Of Memory (OOM) error.&lt;br&gt;
&lt;strong&gt;AutoMQ's Response&lt;/strong&gt;&lt;br&gt;
After introducing the Netty memory allocation mechanism and scenarios where memory fragmentation occurs, how does AutoMQ solve the memory fragmentation issue?&lt;/p&gt;

&lt;p&gt;LogCache adopts a first-in, first-out eviction policy to cater to the characteristics of tailing read for continuous access to new data. This means memory allocated at adjacent times will be freed at adjacent times. AutoMQ employs a strategy called ByteBufSeqAlloc:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;ByteBufSeqAlloc always requests a ByteBuf of ChunkSize from Netty, so Netty only hands out whole chunks, achieving zero external memory fragmentation.&lt;/li&gt;
&lt;li&gt;ByteBufSeqAlloc serves each allocation through the underlying ByteBuf#retainedSlice, which slices small memory segments sequentially from the large contiguous block. This avoids the rounding caused by size normalization, achieving zero internal memory fragmentation.&lt;/li&gt;
&lt;li&gt;When releasing, adjacent blocks are released together. It's possible that most of a block is released while a small portion is still in use, preventing the entire large block from being released. However, this waste occurs only once and will only waste the size of one ChunkSize.&lt;/li&gt;
&lt;/ul&gt;
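
&lt;p&gt;The idea behind sequential allocation can be sketched in Python. This is not the real ByteBufSeqAlloc: the class names are hypothetical, a bytearray stands in for the ChunkSize ByteBuf obtained from Netty, a memoryview stands in for a retained slice, and a plain counter stands in for reference counting.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;class SeqChunk:
    """One large block that is sliced front to back."""

    def __init__(self, size):
        self.buffer = bytearray(size)
        self.next_offset = 0
        self.live_slices = 0  # stands in for the reference count

    def remaining(self):
        return len(self.buffer) - self.next_offset


class SeqAlloc:
    """Hypothetical sequential allocator over fixed-size chunks."""

    def __init__(self, chunk_size):
        self.chunk_size = chunk_size
        self.current = SeqChunk(chunk_size)
        self.retired = []  # full chunks that still have live slices

    def alloc(self, size):
        if min(size, self.chunk_size) != size:
            raise ValueError("request is larger than one chunk")
        if min(size, self.current.remaining()) != size:
            # Not enough room left: retire this chunk and start a new one.
            if self.current.live_slices != 0:
                self.retired.append(self.current)
            self.current = SeqChunk(self.chunk_size)
        chunk = self.current
        start = chunk.next_offset
        chunk.next_offset += size  # exact size, no rounding to a size class
        chunk.live_slices += 1
        return chunk, memoryview(chunk.buffer)[start:start + size]

    def release(self, chunk):
        chunk.live_slices -= 1
        if chunk.live_slices == 0 and chunk in self.retired:
            self.retired.remove(chunk)  # the whole chunk is freed at once


alloc = SeqAlloc(chunk_size=1024)
a_chunk, a = alloc.alloc(600)
b_chunk, b = alloc.alloc(300)
c_chunk, c = alloc.alloc(200)  # does not fit in the space left, new chunk
alloc.release(a_chunk)
alloc.release(b_chunk)         # the first chunk has no live slices left
print(len(a), len(b), len(c), len(alloc.retired))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Because a FIFO cache releases slices in roughly the order they were handed out, a retired chunk becomes completely free soon after its first slice is released, and only the unused tail of each chunk is wasted in the meantime.&lt;/p&gt;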

&lt;p&gt;BlockCache pursues high throughput for cold reads by reading large segments of data from object storage. AutoMQ's strategy is to cache these large chunks of raw data as they come from object storage:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;On-demand decoding: Data is decoded into specific RecordBatch only when queried, reducing the number of resident memory blocks and hence minimizing memory fragmentation.&lt;/li&gt;
&lt;li&gt;Structured splitting: In the future, large cache blocks can be split into structured 1MB memory blocks to avoid increasing memory fragmentation rates caused by continuous allocation and release of various sized memory blocks.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flgtjtlind6exf8t7xout.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flgtjtlind6exf8t7xout.png" alt="Image description" width="800" height="445"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The essence of both optimizations is to use large, structured memory allocations that match each cache's access pattern, sidestepping the fragmentation introduced by Netty's memory allocation strategy. With this method, AutoMQ maintains an off-heap memory fragmentation rate below 35% in various long-running scenarios, such as tail reads, cold reads, and mixed message sizes, without encountering off-heap memory OOM issues.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fapqoazo8lqceqvamecx1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fapqoazo8lqceqvamecx1.png" alt="Image description" width="800" height="518"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Summary&lt;/strong&gt;&lt;br&gt;
Netty's PooledByteBufAllocator is not a silver bullet. When using it, account for the memory amplification caused by fragmentation and reserve a reasonable amount of JVM memory accordingly. If Netty is used only as a network layer framework, the buffers allocated by PooledByteBufAllocator are relatively short-lived, so the amplification caused by memory fragmentation will not be significant. However, it is still recommended to upgrade Netty to 4.1.52 or above for better memory allocation efficiency. If PooledByteBufAllocator is used for caching, it is recommended to allocate large blocks of memory and then slice them sequentially to avoid Netty's memory fragmentation.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Reference Document:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://netty.io/wiki/reference-counted-objects.html" rel="noopener noreferrer"&gt;https://netty.io/wiki/reference-counted-objects.html&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://netty.io/news/2020/09/08/4-1-52-Final.html" rel="noopener noreferrer"&gt;https://netty.io/news/2020/09/08/4-1-52-Final.html&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
    </item>
    <item>
      <title>AutoMQ vs Kafka: An Independent In-Depth Evaluation and Comparison by Little Red Book</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Mon, 29 Jul 2024 08:55:21 +0000</pubDate>
      <link>https://dev.to/automq/automq-vs-kafka-an-independent-in-depth-evaluation-and-comparison-by-little-red-book-kl0</link>
      <guid>https://dev.to/automq/automq-vs-kafka-an-independent-in-depth-evaluation-and-comparison-by-little-red-book-kl0</guid>
      <description>&lt;p&gt;Test Background: The Xiaohongshu message engine team is collaborating closely with the AutoMQ team to promote community building and explore cutting-edge cloud-native messaging engine technologies. This article provides a comprehensive evaluation of AutoMQ based on the OpenMessaging framework. We welcome everyone to join the community and share their evaluation experiences.&lt;br&gt;
&lt;strong&gt;1. Testing Conclusion&lt;/strong&gt;&lt;br&gt;
This article primarily evaluates the performance comparison between the cloud-native messaging engine AutoMQ and Apache Kafka® (version 3.4).&lt;br&gt;
Testing Conclusion:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Real-time Read/Write: With the same cluster size, AutoMQ's maximum read/write throughput is three times that of Apache Kafka, and the E2E latency is 1/13 of Apache Kafka.&lt;/li&gt;
&lt;li&gt;Catch-up Read: With the same cluster size, AutoMQ's peak catch-up read is twice that of Apache Kafka, and during the catch-up read, AutoMQ's write throughput and latency remain unaffected.&lt;/li&gt;
&lt;li&gt;Partition Reassignment: AutoMQ's partition reassignment takes seconds on average, whereas Apache Kafka's partition reassignment takes minutes to hours on average.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;2. Testing Configuration&lt;/strong&gt;&lt;br&gt;
The benchmark is an enhanced version of the Linux Foundation's OpenMessaging Benchmark that simulates real user scenarios with dynamic workloads.&lt;br&gt;
2.1 Configuration Parameters&lt;br&gt;
By default, AutoMQ forces data to be flushed to disk before responding, using the following configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
acks=all
flush.messages=1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;AutoMQ ensures high data durability through the multi-replica mechanism built into EBS, so multiple replicas are unnecessary at the Kafka layer.&lt;br&gt;
For Apache Kafka, we use version 3.6.0 and, following Confluent's recommendation, do not set &lt;code&gt;flush.messages=1&lt;/code&gt;. Instead, data reliability relies on three replicas with asynchronous flushing from memory (a power outage in the data center may therefore cause data loss), configured as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
acks=all
replicationFactor=3
min.insync.replicas=2
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
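&lt;p&gt;To make the Apache Kafka settings above more concrete, here is a minimal Python sketch of what they imply. It relies on standard Kafka behavior: with acks=all, a write is rejected when fewer than min.insync.replicas replicas are in sync. The function names are hypothetical and exist only for this illustration.&lt;/p&gt;

```python
def tolerated_broker_failures(replication_factor, min_insync_replicas):
    """Replicas that can be offline while acks=all producers can still write."""
    if min_insync_replicas not in range(1, replication_factor + 1):
        raise ValueError("min.insync.replicas must be in 1..replication factor")
    return replication_factor - min_insync_replicas


def minimum_copies_per_ack(min_insync_replicas):
    """Lower bound on replicas holding a record when acks=all returns."""
    return min_insync_replicas


# Settings from the Apache Kafka configuration used in this test.
print(tolerated_broker_failures(3, 2))
print(minimum_copies_per_ack(2))
```

&lt;p&gt;Note that these copies may still sit in memory because flushing is asynchronous, which is why a data-center power outage can lose data in this setup.&lt;/p&gt;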



&lt;p&gt;2.2 Machine Specifications&lt;br&gt;
16 cores, maximum network bandwidth of 800MB/s, configured with a cloud disk of 150MB/s bandwidth&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;3. Detailed Comparison&lt;/strong&gt;&lt;br&gt;
3.1 Real-time Read and Write Performance Comparison&lt;br&gt;
This test measures the performance and throughput limits of AutoMQ and Apache Kafka® under the same cluster size and different traffic scales. The test scenarios are as follows:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Deploy 6 data nodes for each system and create a Topic with 100 partitions.&lt;/li&gt;
&lt;li&gt;Run 1:1 read/write traffic at 100 MiB/s and then at 200 MiB/s (message size = 4 KB, batch size = 200 KB); in addition, test the maximum throughput of both systems.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Load files: [tail-read-100mb.yaml], [tail-read-200mb.yaml], [tail-read-900mb.yaml]&lt;/p&gt;

&lt;p&gt;Extreme Throughput Send Latency:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhl4o9buqgzz26j505eer.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhl4o9buqgzz26j505eer.png" alt="Image description" width="800" height="599"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Extreme Throughput:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8ymy9m91bvnljfzj5471.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8ymy9m91bvnljfzj5471.png" alt="Image description" width="800" height="573"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Detailed Data on Send Duration and E2E Duration:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7w1jz4ntvq75uo3e8yvu.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7w1jz4ntvq75uo3e8yvu.jpeg" alt="Image description" width="800" height="391"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8flf3j89rk0dpjzy7qlb.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8flf3j89rk0dpjzy7qlb.jpeg" alt="Image description" width="800" height="398"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Analysis:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;In a cluster of the same scale, AutoMQ's maximum throughput (870MB/s) is three times that of Apache Kafka (280MB/s).&lt;/li&gt;
&lt;li&gt;Under the same cluster scale and traffic (200 MiB/s), AutoMQ's P999 latency is 1/50th that of Apache Kafka, and the E2E latency is 1/13th that of Apache Kafka.&lt;/li&gt;
&lt;li&gt;Under the same cluster scale and traffic (200 MiB/s), AutoMQ's bandwidth usage is 1/3rd that of Apache Kafka.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;3.2 Comparison of Catch-up Read Performance&lt;br&gt;
Catch-up reads are a common scenario in messaging and streaming systems:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Messaging: message queues are typically used to decouple business processes and smooth out traffic peaks. Peak smoothing requires the queue to hold upstream data so that downstream consumers can process it gradually. In this case, the downstream is catching up on cold data that is no longer in memory.&lt;/li&gt;
&lt;li&gt;Streaming: periodic batch processing tasks need to scan and compute data from several hours or even a day ago.&lt;/li&gt;
&lt;li&gt;Failure scenarios: consumers may go down for several hours and then come back online, or a bug in consumer logic may be fixed, after which historical data must be consumed again.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Catch-up reads raise two main concerns:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Catch-up read speed: the faster the catch-up read, the quicker consumers recover from failures and the sooner batch processing tasks produce analytical results.&lt;/li&gt;
&lt;li&gt;Read/write isolation: catch-up reads should have minimal impact on the production rate and latency.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Testing&lt;br&gt;
This test measures the catch-up read performance of AutoMQ and Apache Kafka® under the same cluster scale. The test scenario is as follows:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Deploy 6 data nodes for each system and create a Topic with 100 partitions.&lt;/li&gt;
&lt;li&gt;Continuously send data at a throughput of 300 MiB/s.&lt;/li&gt;
&lt;li&gt;After sending 1 TiB of data, start the consumer to consume from the earliest offset.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Load file: [catch-up-read.yaml]&lt;br&gt;
Test Results:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffvq9h04z89ud0usa0i4x.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffvq9h04z89ud0usa0i4x.png" alt="Image description" width="800" height="535"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1zyxp954prwanlysnl9i.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1zyxp954prwanlysnl9i.jpeg" alt="Image description" width="800" height="312"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Analysis&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Under the same cluster size, AutoMQ's catch-up read peak is twice that of Apache Kafka.&lt;/li&gt;
&lt;li&gt;During the catch-up read, AutoMQ's sending throughput was unaffected, with an average send latency increase of approximately 0.4 ms. In contrast, Apache Kafka's sending throughput decreased by 10%, and the average send latency surged to 900 ms. This is because Apache Kafka reads from the disk during catch-up reads and does not perform IO isolation, occupying the cloud disk's read-write bandwidth. This reduces the write bandwidth, leading to a drop in sending throughput. Moreover, reading cold data from the disk contaminates the page cache, further increasing write latency. In comparison, AutoMQ separates reads and writes, utilizing object storage for reads during catch-up, which does not consume disk read-write bandwidth and hence does not affect sending throughput and latency.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;3.3 Partition Reassignment Capability Comparison&lt;/strong&gt;&lt;br&gt;
This test measures the time and impact of reassigning a partition with 30 GiB of data to a node that does not currently have a replica of the partition, under a scenario with regular send and consume traffic. The specific test scenario is as follows:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;2 brokers, with the following setup:

&lt;ul&gt;
&lt;li&gt;1 single-partition single-replica Topic A, continuously reading and writing at a throughput of 40 MiB/s.&lt;/li&gt;
&lt;li&gt;1 four-partition single-replica Topic B, continuously reading and writing at a throughput of 10 MiB/s as background traffic.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;After 10 minutes, migrate the only partition of Topic A to another node with a migration throughput limit of 100 MiB/s.
Load file: [partition-reassign.yaml]&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkypbflz6kdv5iz2lbauu.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkypbflz6kdv5iz2lbauu.jpeg" alt="Image description" width="742" height="406"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Analysis&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;AutoMQ partition migration only requires uploading the buffered data from EBS to S3 to safely open it on the new node. Typically, 500 MiB of data can be uploaded within 2 to 5 seconds. The time taken for AutoMQ partition migration is not dependent on the data volume of the partition. The average migration time is around 2 seconds. During the migration process, AutoMQ returns the NOT_LEADER_OR_FOLLOWER error code to clients. After the migration is complete, the client updates to the new Topic routing table and internally retries sending to the new node. As a result, the send latency for that partition will increase temporarily and will return to normal levels after the migration is complete.&lt;/li&gt;
&lt;li&gt;Apache Kafka® partition reassignment requires copying the partition's replicas to new nodes. While copying historical data, it must also keep up with newly written data. The reassignment duration is calculated as partition data size / (reassignment throughput limit - partition write throughput). In actual production environments, partition reassignment typically takes hours. In this test, reassigning a 30 GiB partition took 15 minutes. Besides the long reassignment duration, Apache Kafka® reassignment necessitates reading cold data from the disk. Even with throttle settings, it can still cause page cache contention, leading to latency spikes and affecting service quality.&lt;/li&gt;
&lt;/ul&gt;
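&lt;p&gt;The duration formula quoted above for Apache Kafka can be worked through for this test's parameters. The sketch below is only an idealized estimate, and the function name is hypothetical. For a 30 GiB partition, a 100 MiB/s limit, and 40 MiB/s of incoming writes it yields 512 seconds (about 8.5 minutes). The 15 minutes measured in this test is longer, so the formula is best read as a lower bound.&lt;/p&gt;

```python
MIB_PER_GIB = 1024


def reassignment_seconds(partition_gib, limit_mib_s, write_mib_s):
    """Estimate: partition data size / (reassignment limit - write throughput)."""
    headroom = limit_mib_s - write_mib_s
    if not headroom > 0:
        # The copy can never catch up if writes arrive as fast as the limit allows.
        raise ValueError("reassignment limit must exceed the write throughput")
    return partition_gib * MIB_PER_GIB / headroom


# Parameters taken from the test scenario in this section.
seconds = reassignment_seconds(30, 100, 40)
print(seconds, "seconds, or about", round(seconds / 60, 1), "minutes")
```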

</description>
    </item>
    <item>
      <title>Use Kafdrop to Manage AutoMQ</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Mon, 29 Jul 2024 08:46:09 +0000</pubDate>
      <link>https://dev.to/automq/use-kafdrop-to-manage-automq-4ln1</link>
      <guid>https://dev.to/automq/use-kafdrop-to-manage-automq-4ln1</guid>
      <description>&lt;p&gt;&lt;strong&gt;Preface&lt;/strong&gt;&lt;br&gt;
Kafdrop [1] is a simple, intuitive, and powerful web UI tool designed for Kafka. It allows developers and administrators to easily view and manage key metadata of Kafka clusters, including Topics, partitions, Consumer Groups, and their offsets. By providing a user-friendly interface, Kafdrop greatly simplifies the monitoring and management of Kafka clusters, enabling users to quickly obtain cluster status information without relying on complex command-line tools.&lt;/p&gt;

&lt;p&gt;Thanks to AutoMQ's full compatibility with Kafka, it can seamlessly integrate with Kafdrop. By utilizing Kafdrop, AutoMQ users can also benefit from an intuitive user interface for real-time monitoring of Kafka cluster status, including Topics, partitions, Consumer Groups, and their offsets. This monitoring capability not only enhances problem diagnosis efficiency but also helps optimize cluster performance and resource utilization.&lt;br&gt;
This tutorial will teach you how to start the Kafdrop service and integrate it with an AutoMQ cluster to monitor and manage the cluster state.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftxbtmjgmdddfqpjnnxi0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftxbtmjgmdddfqpjnnxi0.png" alt="Image description" width="800" height="522"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Prerequisites&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Kafdrop Environment: AutoMQ cluster, JDK17, and Maven 3.6.3 or above.&lt;/li&gt;
&lt;li&gt;Kafdrop can be run from a JAR file, with Docker, or in Kubernetes (using a Helm chart). Refer to the official documentation [3].&lt;/li&gt;
&lt;li&gt;Prepare 5 hosts to deploy the AutoMQ cluster. It is recommended to choose Linux amd64 hosts with 2 CPUs and 16GB of RAM and prepare two virtual storage volumes. &lt;/li&gt;
&lt;li&gt;Download the latest official binary installation package from AutoMQ Github Releases to install AutoMQ.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Below, I will first set up the AutoMQ cluster and then start Kafdrop.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Install and start the AutoMQ cluster&lt;/strong&gt;&lt;br&gt;
&lt;strong&gt;Configure the S3 URL&lt;/strong&gt;&lt;br&gt;
&lt;strong&gt;Step 1: Generate the S3 URL&lt;/strong&gt;&lt;br&gt;
AutoMQ provides the automq-kafka-admin.sh tool to quickly start AutoMQ. Simply provide the S3 URL containing the required S3 endpoint and authentication information to start AutoMQ with one click, without manually generating a cluster ID or formatting storage.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;### Command-line usage example
bin/automq-kafka-admin.sh generate-s3-url \
--s3-access-key=xxx \
--s3-secret-key=yyy \
--s3-region=cn-northwest-1 \
--s3-endpoint=s3.cn-northwest-1.amazonaws.com.cn \
--s3-data-bucket=automq-data \
--s3-ops-bucket=automq-ops
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Note: You need to create the AWS S3 buckets in advance. If you encounter errors, check that the parameters and their format are correct.&lt;br&gt;
&lt;strong&gt;Output Result&lt;/strong&gt;&lt;br&gt;
After executing this command, the process will automatically proceed through the following stages:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Based on the provided access key and secret key, test the core features of S3 to verify compatibility between AutoMQ and S3.&lt;/li&gt;
&lt;li&gt;Generate the s3url from the identity and endpoint information.&lt;/li&gt;
&lt;li&gt;Print the command for generating AutoMQ's startup commands from the s3url. In that command, replace --controller-list and --broker-list with the actual CONTROLLER and BROKER hosts to be deployed.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Example output:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;############  Ping s3 ########################

[ OK ] Write s3 object
[ OK ] Read s3 object
[ OK ] Delete s3 object
[ OK ] Write s3 object
[ OK ] Upload s3 multipart object
[ OK ] Read s3 multipart object
[ OK ] Delete s3 object
############  String of s3url ################

Your s3url is:

s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xxx&amp;amp;s3-secret-key=yyy&amp;amp;s3-region=cn-northwest-1&amp;amp;s3-endpoint-protocol=https&amp;amp;s3-data-bucket=automq-data&amp;amp;s3-path-style=false&amp;amp;s3-ops-bucket=automq-ops&amp;amp;cluster-id=40ErA_nGQ_qNPDz0uodTEA


############  Usage of s3url  ################
To start AutoMQ, generate the start commandline using s3url.
bin/automq-kafka-admin.sh generate-start-command \
--s3-url="s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=XXX&amp;amp;s3-secret-key=YYY&amp;amp;s3-region=cn-northwest-1&amp;amp;s3-endpoint-protocol=https&amp;amp;s3-data-bucket=automq-data&amp;amp;s3-path-style=false&amp;amp;s3-ops-bucket=automq-ops&amp;amp;cluster-id=40ErA_nGQ_qNPDz0uodTEA" \
--controller-list="192.168.0.1:9093;192.168.0.2:9093;192.168.0.3:9093"  \
--broker-list="192.168.0.4:9092;192.168.0.5:9092"

TIPS: Please replace the controller-list and broker-list with your actual IP addresses.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
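&lt;p&gt;To make the structure of the s3url in the output above easier to see, the following Python sketch assembles a string of the same shape from its parts. It is an illustration only: the function name is hypothetical, and in practice the tool itself validates S3 access and generates the cluster ID, so the value used here is simply copied from the sample output.&lt;/p&gt;

```python
from urllib.parse import urlencode


def build_s3_url(endpoint, access_key, secret_key, region,
                 data_bucket, ops_bucket, cluster_id):
    """Assemble an s3url in the shape printed by generate-s3-url (illustrative)."""
    params = {
        "s3-access-key": access_key,
        "s3-secret-key": secret_key,
        "s3-region": region,
        "s3-endpoint-protocol": "https",
        "s3-data-bucket": data_bucket,
        "s3-path-style": "false",
        "s3-ops-bucket": ops_bucket,
        "cluster-id": cluster_id,
    }
    # urlencode joins the key=value pairs into a query string in dict order.
    return "s3://" + endpoint + "?" + urlencode(params)


url = build_s3_url(
    "s3.cn-northwest-1.amazonaws.com.cn", "xxx", "yyy", "cn-northwest-1",
    "automq-data", "automq-ops", "40ErA_nGQ_qNPDz0uodTEA",
)
print(url)
```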


&lt;p&gt;&lt;strong&gt;Step 2: Generate a list of startup commands&lt;/strong&gt;&lt;br&gt;
Replace the --controller-list and --broker-list values in the command generated in the previous step with your host information: the IP addresses of the 3 CONTROLLERs and 2 BROKERs mentioned in the environment setup, using the default ports 9093 (CONTROLLER) and 9092 (BROKER).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/automq-kafka-admin.sh generate-start-command \
--s3-url="s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=XXX&amp;amp;s3-secret-key=YYY&amp;amp;s3-region=cn-northwest-1&amp;amp;s3-endpoint-protocol=https&amp;amp;s3-data-bucket=automq-data&amp;amp;s3-path-style=false&amp;amp;s3-ops-bucket=automq-ops&amp;amp;cluster-id=40ErA_nGQ_qNPDz0uodTEA" \
--controller-list="192.168.0.1:9093;192.168.0.2:9093;192.168.0.3:9093"  \
--broker-list="192.168.0.4:9092;192.168.0.5:9092"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Output Result&lt;/strong&gt;&lt;br&gt;
Upon executing the command, a startup command for AutoMQ will be generated.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;############  Start Commandline ##############
To start an AutoMQ Kafka server, please navigate to the directory where your AutoMQ tgz file is located and run the following command.

Before running the command, make sure that Java 17 is installed on your host. You can verify the Java version by executing 'java -version'.

bin/kafka-server-start.sh --s3-url="s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=XXX&amp;amp;s3-secret-key=YYY&amp;amp;s3-region=cn-northwest-1&amp;amp;s3-endpoint-protocol=https&amp;amp;s3-data-bucket=automq-data&amp;amp;s3-path-style=false&amp;amp;s3-ops-bucket=automq-ops&amp;amp;cluster-id=40ErA_nGQ_qNPDz0uodTEA" --override process.roles=broker,controller --override node.id=0 --override controller.quorum.voters=0@192.168.0.1:9093,1@192.168.0.2:9093,2@192.168.0.3:9093 --override listeners=PLAINTEXT://192.168.0.1:9092,CONTROLLER://192.168.0.1:9093 --override advertised.listeners=PLAINTEXT://192.168.0.1:9092

bin/kafka-server-start.sh --s3-url="s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=XXX&amp;amp;s3-secret-key=YYY&amp;amp;s3-region=cn-northwest-1&amp;amp;s3-endpoint-protocol=https&amp;amp;s3-data-bucket=automq-data&amp;amp;s3-path-style=false&amp;amp;s3-ops-bucket=automq-ops&amp;amp;cluster-id=40ErA_nGQ_qNPDz0uodTEA" --override process.roles=broker,controller --override node.id=1 --override controller.quorum.voters=0@192.168.0.1:9093,1@192.168.0.2:9093,2@192.168.0.3:9093 --override listeners=PLAINTEXT://192.168.0.2:9092,CONTROLLER://192.168.0.2:9093 --override advertised.listeners=PLAINTEXT://192.168.0.2:9092

bin/kafka-server-start.sh --s3-url="s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=XXX&amp;amp;s3-secret-key=YYY&amp;amp;s3-region=cn-northwest-1&amp;amp;s3-endpoint-protocol=https&amp;amp;s3-data-bucket=automq-data&amp;amp;s3-path-style=false&amp;amp;s3-ops-bucket=automq-ops&amp;amp;cluster-id=40ErA_nGQ_qNPDz0uodTEA" --override process.roles=broker,controller --override node.id=2 --override controller.quorum.voters=0@192.168.0.1:9093,1@192.168.0.2:9093,2@192.168.0.3:9093 --override listeners=PLAINTEXT://192.168.0.3:9092,CONTROLLER://192.168.0.3:9093 --override advertised.listeners=PLAINTEXT://192.168.0.3:9092

bin/kafka-server-start.sh --s3-url="s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=XXX&amp;amp;s3-secret-key=YYY&amp;amp;s3-region=cn-northwest-1&amp;amp;s3-endpoint-protocol=https&amp;amp;s3-data-bucket=automq-data&amp;amp;s3-path-style=false&amp;amp;s3-ops-bucket=automq-ops&amp;amp;cluster-id=40ErA_nGQ_qNPDz0uodTEA" --override process.roles=broker --override node.id=3 --override controller.quorum.voters=0@192.168.0.1:9093,1@192.168.0.2:9093,2@192.168.0.3:9093 --override listeners=PLAINTEXT://192.168.0.4:9092 --override advertised.listeners=PLAINTEXT://192.168.0.4:9092

bin/kafka-server-start.sh --s3-url="s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=XXX&amp;amp;s3-secret-key=YYY&amp;amp;s3-region=cn-northwest-1&amp;amp;s3-endpoint-protocol=https&amp;amp;s3-data-bucket=automq-data&amp;amp;s3-path-style=false&amp;amp;s3-ops-bucket=automq-ops&amp;amp;cluster-id=40ErA_nGQ_qNPDz0uodTEA" --override process.roles=broker --override node.id=4 --override controller.quorum.voters=0@192.168.0.1:9093,1@192.168.0.2:9093,2@192.168.0.3:9093 --override listeners=PLAINTEXT://192.168.0.5:9092 --override advertised.listeners=PLAINTEXT://192.168.0.5:9092


TIPS: Start controllers first and then the brokers.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
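&lt;p&gt;The pattern in the generated commands above can be reproduced with a short Python sketch. This is not the tool's actual implementation; it only mirrors what the sample output shows. The function name is hypothetical, and the PLAINTEXT port 9092 on controller hosts is an assumption read off that output.&lt;/p&gt;

```python
def plan_nodes(controller_list, broker_list):
    """Derive per-node override values from ';'-separated host:port lists."""
    controllers = [c.split(":") for c in controller_list.split(";")]
    brokers = [b.split(":") for b in broker_list.split(";")]
    # Every node shares the same voter list: id@host:port for each controller.
    voters = ",".join(
        "{}@{}:{}".format(i, host, port)
        for i, (host, port) in enumerate(controllers)
    )
    nodes = []
    for node_id, (host, port) in enumerate(controllers):
        nodes.append({
            "node.id": node_id,
            "process.roles": "broker,controller",
            "controller.quorum.voters": voters,
            # Assumed: controller hosts also serve clients on port 9092.
            "listeners": "PLAINTEXT://{}:9092,CONTROLLER://{}:{}".format(host, host, port),
            "advertised.listeners": "PLAINTEXT://{}:9092".format(host),
        })
    for offset, (host, port) in enumerate(brokers):
        nodes.append({
            # Broker ids continue after the controller ids.
            "node.id": len(controllers) + offset,
            "process.roles": "broker",
            "controller.quorum.voters": voters,
            "listeners": "PLAINTEXT://{}:{}".format(host, port),
            "advertised.listeners": "PLAINTEXT://{}:{}".format(host, port),
        })
    return nodes


nodes = plan_nodes(
    "192.168.0.1:9093;192.168.0.2:9093;192.168.0.3:9093",
    "192.168.0.4:9092;192.168.0.5:9092",
)
for node in nodes:
    print(node["node.id"], node["process.roles"], node["listeners"])
```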



&lt;p&gt;Note: The node.id is automatically generated starting from 0 by default.&lt;br&gt;
&lt;strong&gt;Step 3: Start AutoMQ&lt;/strong&gt;&lt;br&gt;
To start the cluster, sequentially execute the list of commands generated in the previous step on the pre-specified CONTROLLER or BROKER hosts. For instance, to start the first CONTROLLER process on 192.168.0.1, execute the first command template from the generated startup command list.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-server-start.sh --s3-url="s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=XXX&amp;amp;s3-secret-key=YYY&amp;amp;s3-region=cn-northwest-1&amp;amp;s3-endpoint-protocol=https&amp;amp;s3-data-bucket=automq-data&amp;amp;s3-path-style=false&amp;amp;s3-ops-bucket=automq-ops&amp;amp;cluster-id=40ErA_nGQ_qNPDz0uodTEA" --override process.roles=broker,controller --override node.id=0 --override controller.quorum.voters=0@192.168.0.1:9093,1@192.168.0.2:9093,2@192.168.0.3:9093 --override listeners=PLAINTEXT://192.168.0.1:9092,CONTROLLER://192.168.0.1:9093 --override advertised.listeners=PLAINTEXT://192.168.0.1:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Run in the background&lt;/strong&gt;&lt;br&gt;
If you need to run in the background, append the output redirection and background operator shown below, where &lt;code&gt;command&lt;/code&gt; stands for the startup command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;command &amp;gt; /dev/null 2&amp;gt;&amp;amp;1 &amp;amp;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Start the Kafdrop service&lt;/strong&gt;&lt;br&gt;
In the previous process, we set up the AutoMQ cluster and obtained the addresses and ports of all broker nodes. Next, we will start the Kafdrop service.&lt;br&gt;
Note: Ensure that the host running the Kafdrop service can reach the AutoMQ cluster; otherwise, connections will time out.&lt;br&gt;
In this example, I use the JAR package method to start the Kafdrop service. The steps are as follows:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Clone the Kafdrop source repository [4]:
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;git clone https://github.com/obsidiandynamics/kafdrop.git
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;ul&gt;
&lt;li&gt;Use Maven to locally compile and package Kafdrop to generate the JAR file. Execute the following in the root directory:
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;mvn clean compile package
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;ul&gt;
&lt;li&gt;Start the service, specifying the addresses and ports of the AutoMQ cluster brokers:
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
    -jar target/kafdrop-&amp;lt;version&amp;gt;.jar \
    --kafka.brokerConnect=&amp;lt;host:port,host:port&amp;gt;,...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;ol&gt;
&lt;li&gt;Replace &lt;code&gt;kafdrop-&amp;lt;version&amp;gt;.jar&lt;/code&gt; with the specific version, such as &lt;code&gt;kafdrop-4.0.2-SNAPSHOT.jar&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;&lt;code&gt;--kafka.brokerConnect=&amp;lt;host:port,host:port&amp;gt;&lt;/code&gt; specifies the host and port of each broker node in the cluster.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The console startup output is as follows:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fq722th9kr1j2rx0udgm9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fq722th9kr1j2rx0udgm9.png" alt="Image description" width="800" height="279"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If not specified, kafka.brokerConnect defaults to localhost:9092.&lt;/p&gt;
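&lt;p&gt;Because Kafdrop must be able to reach every broker listed in kafka.brokerConnect, a quick TCP check before starting it can help rule out network problems. The Python sketch below is one possible way to do this; the function name is hypothetical. A successful TCP connection only shows that the port is open, not that the Kafka protocol is working.&lt;/p&gt;

```python
import socket


def unreachable_brokers(broker_connect, timeout_seconds=3.0):
    """Return the host:port entries that do not accept a TCP connection."""
    failed = []
    for entry in broker_connect.split(","):
        entry = entry.strip()
        # Split on the last ':' so the host part is kept intact.
        host, _, port = entry.rpartition(":")
        try:
            with socket.create_connection((host, int(port)), timeout=timeout_seconds):
                pass
        except OSError:
            # Covers refused connections, timeouts, and name resolution errors.
            failed.append(entry)
    return failed


if __name__ == "__main__":
    # Example broker list from this tutorial; replace with your own addresses.
    print(unreachable_brokers("192.168.0.4:9092,192.168.0.5:9092"))
```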

&lt;p&gt;Note: Starting from Kafdrop 3.10.0, a ZooKeeper connection is no longer required. All necessary cluster information is retrieved through the Kafka admin API.&lt;br&gt;
Open your browser and navigate to &lt;a href="http://localhost:9000" rel="noopener noreferrer"&gt;http://localhost:9000&lt;/a&gt;. You can override the port by adding the following configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;--server.port=&amp;lt;port&amp;gt; --management.server.port=&amp;lt;port&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Final effect&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Complete interface&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1uwmf71tcw4qahslouqa.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1uwmf71tcw4qahslouqa.png" alt="Image description" width="800" height="522"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Displays the number of partitions, number of topics, and other cluster state information.&lt;/p&gt;

&lt;ol start="2"&gt;
&lt;li&gt;Creating a Topic&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkewe0q7kf9jmqedmh3yt.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkewe0q7kf9jmqedmh3yt.png" alt="Image description" width="800" height="519"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="3"&gt;
&lt;li&gt;Detailed Broker Node Information&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzpogg5irxajdqdw1kg0c.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzpogg5irxajdqdw1kg0c.png" alt="Image description" width="800" height="462"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="4"&gt;
&lt;li&gt;Detailed Topic Information&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fa5ktvsoz19o6zg5a2c25.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fa5ktvsoz19o6zg5a2c25.png" alt="Image description" width="800" height="459"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="5"&gt;
&lt;li&gt;Message Information Under the Topic&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fi71f3qpydg4jns0q8e5r.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fi71f3qpydg4jns0q8e5r.png" alt="Image description" width="800" height="434"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Summary&lt;/strong&gt;&lt;br&gt;
Through this tutorial, we explored the key features and functionalities of Kafdrop, as well as the methods to integrate it with AutoMQ clusters. We demonstrated how to easily monitor and manage AutoMQ clusters. The use of Kafdrop not only helps teams better understand and control their data flow but also enhances development and operational efficiency, ensuring a highly efficient and stable data processing workflow. We hope this tutorial provides you with valuable insights and assistance when using Kafdrop with AutoMQ clusters.&lt;br&gt;
&lt;strong&gt;References&lt;/strong&gt;&lt;br&gt;
[1] Kafdrop: &lt;a href="https://github.com/obsidiandynamics/kafdrop" rel="noopener noreferrer"&gt;https://github.com/obsidiandynamics/kafdrop&lt;/a&gt;&lt;br&gt;
[2] AutoMQ: &lt;a href="https://www.automq.com/zh" rel="noopener noreferrer"&gt;https://www.automq.com/zh&lt;/a&gt;&lt;br&gt;
[3] Kafdrop Deployment: &lt;a href="https://github.com/obsidiandynamics/kafdrop/blob/master/README.md#getting-started" rel="noopener noreferrer"&gt;https://github.com/obsidiandynamics/kafdrop/blob/master/README.md#getting-started&lt;/a&gt;&lt;br&gt;
[4] Kafdrop project repository: &lt;a href="https://github.com/obsidiandynamics/kafdrop" rel="noopener noreferrer"&gt;https://github.com/obsidiandynamics/kafdrop&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Delving deep into the adoption of Alibaba Cloud's cloud-native technologies in AutoMQ</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Mon, 29 Jul 2024 08:38:50 +0000</pubDate>
      <link>https://dev.to/automq/delving-deep-into-the-adoption-of-alibaba-clouds-cloud-native-technologies-in-automq-2j17</link>
      <guid>https://dev.to/automq/delving-deep-into-the-adoption-of-alibaba-clouds-cloud-native-technologies-in-automq-2j17</guid>
      <description>&lt;p&gt;Author information: Zhou Xinyu, Co-founder &amp;amp; CTO of AutoMQ&lt;/p&gt;

&lt;p&gt;Introduction: AutoMQ[1] is a groundbreaking cloud-native Kafka built on a shared storage architecture. By utilizing its compute-storage separation and integrating deeply with Alibaba Cloud's robust and advanced services such as Object Storage OSS, Block Storage ESSD, Elastic Scaling ESS, and Spot Instances, AutoMQ provides a cost advantage ten times greater than Apache Kafka while offering automatic scalability.&lt;/p&gt;

&lt;p&gt;Leading the charge toward the cloud-native era, our mission at Alibaba Cloud and AutoMQ is to enhance our customers' capabilities in the cloud-based business landscape. As the industry evolves, we've observed that many products hastily claim to be cloud-native without fundamentally embracing cloud computing capabilities. Merely supporting deployment on Kubernetes does not suffice. True cloud-native products must exploit the full potential, elasticity, and scalability of cloud computing, thereby achieving significant cost and efficiency benefits.&lt;br&gt;
Today, we delve into how AutoMQ leverages Alibaba Cloud's cloud-native technologies to address practical challenges effectively.&lt;br&gt;
&lt;strong&gt;Object Storage OSS&lt;/strong&gt;&lt;br&gt;
With data increasingly migrating to the cloud, object storage has emerged as the primary storage solution for big data and data lake ecosystems. The shift from file APIs to object APIs is becoming prevalent, especially as stream data, often handled by Kafka, increasingly flows into these data lakes.&lt;br&gt;
AutoMQ has developed the S3Stream[1] stream storage library on top of object storage, which enables efficient reading and ingestion of stream data via the Object API. By adopting a storage-compute separation architecture, it integrates Apache Kafka's storage layer with object storage, fully capitalizing on the technical and cost advantages of shared storage:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Zone-redundant OSS Standard storage costs 0.12 yuan/GiB/month, more than eight times cheaper than ESSD PL1 at 1 yuan/GiB/month. Moreover, OSS inherently provides multi-zone availability and durability, so no additional data replication is needed. Compared with the conventional three-replica architecture on cloud disks, which pays for every GiB three times, storage costs drop by a factor of 25.&lt;/li&gt;
&lt;li&gt;In contrast to a Shared-Nothing architecture, the shared storage model achieves a true separation of storage and compute, decoupling data from compute nodes. Consequently, when AutoMQ undertakes partition reassignment, it avoids data replication, facilitating true second-level lossless partition reassignments. This feature supports AutoMQ's capability for real-time self-balancing and rapid horizontal scaling of nodes.&lt;/li&gt;
&lt;/ul&gt;
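
&lt;p&gt;The 25x figure in the first point follows directly from the quoted list prices. The arithmetic can be sketched as below; the class and method names are made up for illustration, the 10 TiB data size is an arbitrary example, and only the storage price is modeled (request fees and traffic are ignored).&lt;/p&gt;

```java
final class StorageCostSketch {
    // Prices quoted in the text, in yuan per GiB per month.
    static final double OSS_STANDARD_ZONE_REDUNDANT = 0.12;
    static final double ESSD_PL1 = 1.0;
    // Replica count of a conventional Kafka deployment on cloud disks (from the text).
    static final int KAFKA_REPLICAS = 3;

    // Monthly cost of storing `gib` GiB of data once on OSS.
    static double ossMonthlyCost(double gib) {
        return gib * OSS_STANDARD_ZONE_REDUNDANT;
    }

    // Monthly cost of storing `gib` GiB on ESSD PL1 with application-level replication.
    static double replicatedEssdMonthlyCost(double gib) {
        return gib * ESSD_PL1 * KAFKA_REPLICAS;
    }

    public static void main(String[] args) {
        double gib = 10_240; // 10 TiB of retained data, an arbitrary example size
        System.out.printf("OSS: %.2f yuan/month%n", ossMonthlyCost(gib));
        System.out.printf("3x ESSD PL1: %.2f yuan/month%n", replicatedEssdMonthlyCost(gib));
        System.out.printf("ratio: %.1fx%n", replicatedEssdMonthlyCost(gib) / ossMonthlyCost(gib));
    }
}
```

&lt;p&gt;The ratio does not depend on the amount of data: it is simply (3 x 1) / 0.12.&lt;/p&gt;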

&lt;p&gt;While AutoMQ has effectively harnessed OSS for cost and architectural benefits, this represents merely the beginning. The adoption of shared storage is set to spur a wave of technical and product innovations at AutoMQ.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Disaster Recovery: As a fundamental aspect of software infrastructure, the greatest concern is the failure of a cluster to continue delivering services or the inability to restore data following a cluster failure. Potential issues include software bugs and catastrophic data center-level disasters. Thanks to shared storage and a straightforward metadata snapshot system, it is feasible to shut down a compromised cluster and restart it as a new cluster using the data stored on OSS to resume operations.&lt;/li&gt;
&lt;li&gt;Cross-Region Disaster Recovery: OSS provides near real-time replication across different regions. Companies don't have to establish their own cross-regional networks or set up costly data connectivity clusters. Paired with the previously mentioned disaster recovery technology, this enables straightforward, code-free implementation of cross-region disaster recovery strategies.&lt;/li&gt;
&lt;li&gt;Shared Read-Only Copies: High fan-out is a critical business use case for consuming streaming data. In a data-driven company, a single data item might be accessed by dozens of subscribers, a load the original cluster alone may be unable to handle. With OSS, read-only copies can be created directly from OSS without data duplication, offering scalable high fan-out capabilities.&lt;/li&gt;
&lt;li&gt;Zero ETL: Modern data technology frameworks rely on object storage. When data resides in a common storage pool and possesses a level of self-description, data silos can be dismantled at a minimal cost without the necessity to construct ETL pipelines. Various analytical tools or computing engines can access shared data from multiple sources.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;On the other hand, incorporating stream data into lakes completes the modern data stack, laying the groundwork for the Stream-Lake architecture. This is the source of the vast creative potential behind Confluent's TableFlow[2]. Data is produced and stored in stream formats, which align with the nature of continuously generated and evolving information in the real world. Real-time data must be in stream form, enabling stream computing frameworks to extract more immediate value. Eventually, as data ages, it transitions to table formats like Iceberg[3] for broader scale data analysis. From a lifecycle perspective, the move from streams to tables naturally matches the data's progression from high frequency to low frequency, from hot to cold, and constructing a stream-table integrated data technology stack on object storage represents a forward-looking trend.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Block Storage ESSD&lt;/strong&gt;&lt;br&gt;
If ECS is still regarded as a physical server, cloud disk ESSD faces a similar predicament. Users generally harbor two misconceptions about ESSD:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Comparing ESSD to local disks, they are concerned about data durability, apprehensive that problems typical of physical disks such as faulty disks or bad sectors might persist.&lt;/li&gt;
&lt;li&gt;It's commonly believed that ESSD is a cloud disk, which leads to assumptions of poor remote write performance, uncontrollable latency, and jitter.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;However, ESSD is supported by a robust distributed file system, utilizing triple-replica technology that ensures nine nines of data durability. Users are insulated from errors in physical storage media, as the system automatically detects and corrects faults across millions of physical disks.&lt;br&gt;
Moreover, ESSD functions as shared storage. If an ECS instance fails, its ESSD volumes can be mounted on other nodes to continue providing read and write services. In this respect ESSD, like OSS, is shared storage rather than a stateful local disk, which is a key reason why AutoMQ can be described as stateless data software.&lt;br&gt;
From a performance standpoint, ESSD benefits from combined software and hardware enhancements, including offloading the ESSD client to the Shenlong MOC[5] for hardware acceleration. It employs a high-performance proprietary network protocol and a congestion control algorithm based on RDMA technology, bypassing the traditional TCP stack to meet the low-latency and low packet loss requirements of data centers. These improvements ensure stable IOPS and throughput performance, as well as highly scalable storage capacity.&lt;br&gt;
AutoMQ employs ESSD innovatively in three ways:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;First, reliability separation, by fully utilizing the multi-replica technology of ESSD to circumvent the need for replication mechanisms such as Raft or ISR at the application layer, significantly reducing storage costs and network replication bandwidth.&lt;/li&gt;
&lt;li&gt;Second, using ESSD as a WAL: data is written cyclically to the ESSD as a raw device using Direct I/O, and is read back only for recovery in fault scenarios. The shared nature of ESSD allows AutoMQ's WAL to be a remote, shareable WAL that can be taken over and recovered by any node in the cluster.&lt;/li&gt;
&lt;li&gt;Finally, a design that fits cloud service billing: ESSD provides a baseline of roughly 100 MiB/s throughput and about 1800 IOPS for any volume size. AutoMQ therefore needs only a minimal ESSD volume as the WAL disk, such as a 2 GiB ESSD PL0 volume, which costs just 1 yuan per month and delivers the aforementioned performance. For higher storage performance on a single machine, simply combine multiple small WAL disks for linear expansion.&lt;/li&gt;
&lt;/ul&gt;
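
&lt;p&gt;As a rough illustration of the billing point, the number of small WAL disks needed for a target write throughput can be estimated as follows. This is a back-of-the-envelope sketch, not AutoMQ's actual provisioning logic: it assumes throughput scales linearly with the number of disks, uses the approximate per-disk figures quoted above, and all names are hypothetical.&lt;/p&gt;

```java
final class WalSizingSketch {
    // Approximate baseline figures quoted in the text for a small ESSD PL0 volume.
    static final int THROUGHPUT_PER_DISK_MIBPS = 100;
    static final int IOPS_PER_DISK = 1800;
    static final int MONTHLY_COST_PER_DISK_YUAN = 1;

    // Number of WAL disks needed for a target write throughput, assuming linear scaling.
    static int disksFor(int targetMiBps) {
        // Integer ceiling division.
        return (targetMiBps + THROUGHPUT_PER_DISK_MIBPS - 1) / THROUGHPUT_PER_DISK_MIBPS;
    }

    public static void main(String[] args) {
        int target = 350; // MiB/s, an arbitrary example
        int disks = disksFor(target);
        System.out.println(disks + " WAL disks, about "
                + disks * MONTHLY_COST_PER_DISK_YUAN + " yuan/month, about "
                + disks * IOPS_PER_DISK + " IOPS in total");
    }
}
```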

&lt;p&gt;ESSD and OSS offer distinctly different storage characteristics. ESSD provides high performance, low latency, and high IOPS, albeit at a higher cost. AutoMQ, however, has developed a cost-effective approach to utilizing ESSD. OSS is not ideal for environments requiring high IOPS as it charges per IO operation, yet it offers economical storage with virtually unlimited scalability in both throughput and capacity. As primary storage, OSS delivers high throughput, low cost, high availability, and limitless scalability; ESSD provides durable, highly available, low-latency storage ideal for storing WAL, and its virtualized nature allows for requesting very small storage capacities. AutoMQ's proprietary streaming library, S3Stream[1], cleverly merges the benefits of both ESSD and OSS shared storage, achieving low-latency, high-throughput, low-cost, and unlimited capacity streaming storage.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F006ipmw5fps9cpoxan9z.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F006ipmw5fps9cpoxan9z.png" alt="Image description" width="800" height="515"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Multiple mounting and NVMe protocol&lt;/strong&gt;&lt;br&gt;
Although ESSD is shared storage, it is exposed as a block device. Sharing it efficiently requires two additional storage technologies: multiple mounting and the NVMe PR protocol.&lt;br&gt;
Cloud disks natively support being detached and then remounted on another node for recovery. However, when the original node is in trouble, for example when the ECS instance hangs, the time needed to detach the cloud disk becomes unpredictable. With ESSD's multiple mounting capability, the disk can instead be mounted directly on another ECS node without being detached first.&lt;br&gt;
Taking the AutoMQ failover process as an example, when a Broker node is identified as a Failed Broker, its cloud disk is additionally mounted on a healthy Broker for data recovery. Before the actual recovery begins, it is crucial to ensure that the original node has stopped writing. AutoMQ uses the NVMe protocol's PR lock to perform IO fencing against the original node.&lt;br&gt;
Both operations complete within milliseconds, effectively turning ESSD into shared storage within the AutoMQ framework.&lt;br&gt;
&lt;strong&gt;Regional ESSD&lt;/strong&gt;&lt;br&gt;
While ESSD uses a multi-replica architecture, the replicas are typically confined to a single AZ, so a standard ESSD cannot survive AZ-level failures. Regional ESSD[6] is designed to solve this problem: by spreading the underlying replicas across multiple AZs with strongly consistent read-write technology, it can withstand the failure of a single AZ.&lt;br&gt;
For shared mounting, it supports cross-AZ mounting within a region as well as multi-AZ shared mounting, and offers two forms of IO fencing: preemptive IO fencing and NVMe PR locks. Comparable products are already offered by major international cloud providers, and Regional ESSD will soon launch on Alibaba Cloud. It enables AutoMQ to tolerate single-AZ failures at very low cost, meeting the needs of scenarios that demand higher availability.&lt;/p&gt;
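
&lt;p&gt;The failover sequence described under multiple mounting (mount the disk on a healthy Broker, fence the failed one, then recover) can be sketched as follows. The SharedVolume interface and its method names are hypothetical, chosen only for illustration; they are not real Alibaba Cloud or AutoMQ APIs. The sketch records the order of the steps rather than performing any I/O.&lt;/p&gt;

```java
final class FailoverSketch {
    // Hypothetical abstraction over a shared cloud disk; NOT a real
    // Alibaba Cloud or AutoMQ API, only names chosen for illustration.
    interface SharedVolume {
        void attachTo(String nodeId);    // multiple mounting: no detach from the failed node
        void fenceWriter(String nodeId); // IO fencing, e.g. via an NVMe PR lock
        void recoverWal(String nodeId);  // replay the WAL on the node taking over
    }

    // Records the order of operations so the sequence can be inspected.
    static final class RecordingVolume implements SharedVolume {
        final StringBuilder steps = new StringBuilder();
        public void attachTo(String nodeId) { steps.append("attach:").append(nodeId).append(' '); }
        public void fenceWriter(String nodeId) { steps.append("fence:").append(nodeId).append(' '); }
        public void recoverWal(String nodeId) { steps.append("recover:").append(nodeId).append(' '); }
    }

    // Order matters: recovery must not start until the failed broker can no longer write.
    static void failover(SharedVolume volume, String failedBroker, String healthyBroker) {
        volume.attachTo(healthyBroker);
        volume.fenceWriter(failedBroker);
        volume.recoverWal(healthyBroker);
    }

    public static void main(String[] args) {
        RecordingVolume volume = new RecordingVolume();
        failover(volume, "broker-1", "broker-2");
        System.out.println(volume.steps.toString().trim());
    }
}
```

&lt;p&gt;Fencing before recovery is the essential design choice: without it, a Broker that is merely hung could resume writing to the WAL while another node is replaying it.&lt;/p&gt;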

&lt;p&gt;&lt;strong&gt;Elastic Cloud Server (ECS)&lt;/strong&gt;&lt;br&gt;
Over the past decade, businesses have increasingly adopted a Rehost strategy to transition to the cloud, primarily replacing traditional on-premise physical servers with cloud-based servers like ECS. A key distinction between ECS and on-premise servers is the SLA services that ECS offers. Leveraging virtualization technologies, ECS addresses many hardware and software failures typical of physical servers. For those failures that cannot be avoided, cloud servers can quickly recover on a new physical server, substantially reducing downtime and limiting disruption to business operations.&lt;/p&gt;

&lt;p&gt;Alibaba Cloud offers a 99.975% SLA for individual ECS instances. Operating a service on a single ECS node can ensure an availability of 99.9%, making it well-suited for production environments and fulfilling the availability demands of numerous services. For example, running a single-node AutoMQ cluster on an ECS setup with 2 CPUs and 16GB of RAM can achieve this level of availability and provide a write capacity of 80MiB/s, all while keeping costs low.&lt;br&gt;
Since its development, AutoMQ has been designed to operate on ECS as a cloud service rather than as a physical server. Should an ECS failure occur, the system depends on the quick recovery features of the ECS node, such as automatic reassignment and restart. AutoMQ initiates proactive failover only after detecting several missed heartbeats from a node, considering two primary factors:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;In the event of physical hardware or kernel failures, ECS can recover within seconds. Therefore, AutoMQ relies on the swift recovery capabilities of ECS to manage such issues, while avoiding overly sensitive failover mechanisms that might prompt unnecessary disaster recovery efforts.&lt;/li&gt;
&lt;li&gt;AutoMQ's failover mechanisms are activated only in the event of ECS failures, network partitions, or even failures at the availability zone level, taking advantage of the features provided by ESSD and OSS for proactive disaster recovery.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Elastic Scaling Service (ESS)&lt;/strong&gt;&lt;br&gt;
In March 2024, AutoMQ was officially launched on the Alibaba Cloud marketplace through a collaborative release with Alibaba Cloud. From the general availability of AutoMQ's core features to its swift listing on the Alibaba Cloud marketplace, two key products played a pivotal role: Alibaba Cloud Compute Nest, which ensures standardized delivery processes for service providers, and Elastic Scaling Service (ESS). Although AutoMQ's architecture naturally supports elastic scaling, delivering these capabilities seamlessly is challenging[4]. AutoMQ leverages ESS to simplify the final delivery steps.&lt;br&gt;
AutoMQ chose ESS over Kubernetes for its public cloud deployment for several reasons:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;AutoMQ's initial deployment model is BYOC, which simplifies dependencies and eliminates the need for each user to set up a Kubernetes cluster when installing AutoMQ.&lt;/li&gt;
&lt;li&gt;Elastic Scaling Service (ESS) offers configuration management, automatic scaling, scheduled scaling, instance management, multi-AZ deployment, and health checks, all akin to the core deployment features of Kubernetes. We view ESS as a streamlined version of Kubernetes at the IaaS layer.&lt;/li&gt;
&lt;li&gt;AutoMQ relies on multiple mounting, Regional ESSD, and other advanced features provided by cloud vendors (described above), which Kubernetes may not support right away. Using IaaS-layer APIs rather than Kubernetes APIs is like choosing C++ over Java: native capabilities are available directly, whereas with Kubernetes they can be used only once they have been exposed at the Kubernetes level.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Certainly, Kubernetes is an exceptional platform, and we plan to support deployments on Kubernetes in the future, particularly in private cloud scenarios, to abstract many of the differences at the IaaS layer.&lt;br&gt;
&lt;strong&gt;Spot Instances&lt;/strong&gt;&lt;br&gt;
Elasticity does not come for free to cloud providers: they must bear considerable holding costs to offer adequate elasticity, which often leaves an excess of unused computing resources. These resources are made available as spot instances, which function just like regular ECS instances but can offer savings of up to 90% compared to standard pay-as-you-go rates.&lt;br&gt;
Unlike regular pay-as-you-go instances, the pricing of spot instances varies with market supply and demand. For example, if demand for computing power decreases overnight, prices typically drop, adding a temporal dimension to spot instance pricing. If all users adopt spot instances, prices will adjust accordingly, steering different workloads toward their optimal usage times. For instance, AutoMQ runs extensive tests overnight on spot instances, drastically cutting testing costs.&lt;br&gt;
Another characteristic of spot instances is that they can be interrupted and reclaimed at any time, which poses a high barrier to their adoption. However, the compute-storage separation architecture employed by AutoMQ ensures that Broker nodes hold no local state, enabling them to handle the reclamation of spot instances gracefully. The diagram below illustrates how AutoMQ recovers the WAL via the ESSD API when a spot instance is reclaimed. Through this approach, AutoMQ achieves a tenfold reduction in costs, with spot instances playing a significant role in lowering compute expenses.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkdf3313p5pazgr6hrsfd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkdf3313p5pazgr6hrsfd.png" alt="Image description" width="800" height="694"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Closing Remarks&lt;/strong&gt;&lt;br&gt;
Today, much of the foundational software that supports the rapid growth of big data and the internet was developed a decade ago. Yet software designed for IDC environments is neither efficient nor cheap to run in today's mature cloud computing landscape. Thus, there is a significant push to redesign foundational software for the cloud, including observability storage, TP and AP databases, and data lake software. As essential stream storage software within the big data ecosystem, Kafka occupies a crucial position, representing 10% to 20% of IT spending in data-centric enterprises. Redesigning Kafka with cloud-native features is vital in today's cost-conscious environment. AutoMQ uses deep cloud integration and cloud-native capabilities to reengineer Apache Kafka®, achieving a tenfold cost advantage. Compared with Kafka, AutoMQ's shared storage architecture drastically improves operational capabilities such as partition reassignment, dynamic node scaling, and traffic self-balancing.&lt;br&gt;
Cloud computing has heralded a new era, and embracing a cloud-native approach ensures no regrets in transitioning to the cloud. We are convinced that all foundational software should be reengineered based on cloud architectures to fully capitalize on its benefits.&lt;br&gt;
&lt;strong&gt;References&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Open-source cloud-native version of Kafka — AutoMQ: &lt;a href="https://github.com/AutoMQ/automq" rel="noopener noreferrer"&gt;https://github.com/AutoMQ/automq&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Confluent recently introduced Tableflow, merging streaming and analytical computing: &lt;a href="https://www.confluent.io/blog/introducing-tableflow/" rel="noopener noreferrer"&gt;https://www.confluent.io/blog/introducing-tableflow/&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Official site of the open table format Iceberg: &lt;a href="https://iceberg.apache.org/" rel="noopener noreferrer"&gt;https://iceberg.apache.org/&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Why is it difficult to fully utilize the elasticity of public clouds? &lt;a href="https://www.infoq.cn/article/tugbtfhemdiqlxm1x63y" rel="noopener noreferrer"&gt;https://www.infoq.cn/article/tugbtfhemdiqlxm1x63y&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Alibaba Cloud's in-house developed "Shenlong Architecture": &lt;a href="https://developer.aliyun.com/article/743920" rel="noopener noreferrer"&gt;https://developer.aliyun.com/article/743920&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Announced at the 2023 Yunqi Conference, the Regional ESSD: &lt;a href="https://developer.aliyun.com/article/1390447" rel="noopener noreferrer"&gt;https://developer.aliyun.com/article/1390447&lt;/a&gt;
&lt;/li&gt;
&lt;/ol&gt;

</description>
    </item>
    <item>
      <title>Understanding Kafka Producer</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Mon, 29 Jul 2024 08:36:27 +0000</pubDate>
      <link>https://dev.to/automq/understanding-kafka-producer-5b45</link>
      <guid>https://dev.to/automq/understanding-kafka-producer-5b45</guid>
      <description>&lt;p&gt;&lt;strong&gt;Introduction&lt;/strong&gt;&lt;br&gt;
Today, we present an in-depth analysis of the Kafka Producer (based on Apache Kafka 3.7[2]). Given the extensive nature of the topic, the article is divided into two parts: the first explains the usage and principles of the Kafka Producer, while the second will discuss its implementation details and common issues.&lt;br&gt;
&lt;strong&gt;Usage&lt;/strong&gt;&lt;br&gt;
Before we dive into the specifics of the Kafka Producer implementation, let's first understand how to utilize it. Here's the example code for sending a message to a specified topic using Kafka Producer:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Configure and create a Producer
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer&amp;lt;String, String&amp;gt; producer = new KafkaProducer&amp;lt;&amp;gt;(kafkaProps);

// Send a message to the specified topic
ProducerRecord&amp;lt;String, String&amp;gt; record = new ProducerRecord&amp;lt;&amp;gt;("my-topic", "my-key", "my-value");
producer.send(record, (metadata, exception) -&amp;gt; {
    if (exception != null) {
        // Send failed
        exception.printStackTrace();
    } else {
        // Send succeeded
        System.out.println("Record sent to partition " + metadata.partition() + " with offset " + metadata.offset());
    }
});

// Close the Producer and release its resources
producer.close();


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The main interfaces of the Kafka Producer are outlined below.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;

public class ProducerRecord&amp;lt;K, V&amp;gt; {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}

public interface Callback {
    void onCompletion(RecordMetadata metadata, Exception exception);
}

public interface Producer&amp;lt;K, V&amp;gt; {
    // ...
    Future&amp;lt;RecordMetadata&amp;gt; send(ProducerRecord&amp;lt;K, V&amp;gt; record);
    Future&amp;lt;RecordMetadata&amp;gt; send(ProducerRecord&amp;lt;K, V&amp;gt; record, Callback callback);
    void flush();
    void close();
    // ...
}


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Note: The Producer interface also includes several transaction-related interfaces, such as beginTransaction, commitTransaction, etc., which have been discussed in another article and will not be addressed here.&lt;br&gt;
&lt;strong&gt;ProducerRecord&lt;/strong&gt;&lt;br&gt;
A message sent by the Producer has the following properties:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;topic: Required. Specifies the topic to which the record is sent&lt;/li&gt;
&lt;li&gt;partition: Optional. The index of the partition to which the record is sent (zero-based). When unspecified, a partition is chosen using either the configured Partitioner or the default BuiltInPartitioner (details provided below)&lt;/li&gt;
&lt;li&gt;headers: Optional. User-defined additional key-value pair information&lt;/li&gt;
&lt;li&gt;key: Optional. The key value of the message&lt;/li&gt;
&lt;li&gt;value: Optional. The content of the message&lt;/li&gt;
&lt;li&gt;timestamp: Optional. The timestamp of the message, determined by the following logic:

&lt;ul&gt;
&lt;li&gt;If the topic's &lt;code&gt;message.timestamp.type&lt;/code&gt; configuration is "CreateTime":
&lt;ul&gt;
&lt;li&gt;If the user provides a timestamp, that value is used&lt;/li&gt;
&lt;li&gt;Otherwise, the timestamp defaults to the message's creation time, roughly when the send method is invoked&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;If the topic's &lt;code&gt;message.timestamp.type&lt;/code&gt; is set to "LogAppendTime", the broker's write time is used for the message, irrespective of any user-specified timestamp&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Callback&lt;/strong&gt;&lt;br&gt;
Invoked once a message has been acknowledged or has failed. Potential exceptions include:&lt;/p&gt;

&lt;ul&gt;

&lt;li&gt;Non-retriable

&lt;ul&gt;
&lt;li&gt;InvalidTopicException: Topic name is invalid, e.g., too long, empty, or includes prohibited characters.&lt;/li&gt;
&lt;li&gt;OffsetMetadataTooLarge: Metadata string used in Producer#sendOffsetsToTransaction is excessively long (controlled by offset.metadata.max.bytes, default 4 KiB).&lt;/li&gt;
&lt;li&gt;RecordBatchTooLargeException: The size of the sent batch exceeds one of the following limits:
&lt;ul&gt;
&lt;li&gt;The maximum size allowed (broker configuration message.max.bytes or topic configuration max.message.bytes, default 1 MiB + 12 B)&lt;/li&gt;
&lt;li&gt;The segment size (broker configuration &lt;code&gt;log.segment.bytes&lt;/code&gt; or topic configuration &lt;code&gt;segment.bytes&lt;/code&gt;, default 1 GiB)&lt;br&gt;
Note: This error may only occur in older versions of the client&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;RecordTooLargeException: The size of a single message exceeds one of the following limits:
&lt;ul&gt;
&lt;li&gt;The maximum size of a single producer request (producer configuration max.request.size, default 1 MiB)&lt;/li&gt;
&lt;li&gt;The size of the producer buffer (producer configuration buffer.memory, default 32 MiB)&lt;/li&gt;
&lt;li&gt;The maximum size allowed (broker configuration message.max.bytes or topic configuration max.message.bytes, default 1 MiB + 12 B)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;TopicAuthorizationException, ClusterAuthorizationException: Authorization failed&lt;/li&gt;
&lt;li&gt;UnknownProducerIdException: In transaction requests, the PID has expired or the records associated with the PID have expired&lt;/li&gt;
&lt;li&gt;InvalidProducerEpochException: In transaction requests, the epoch is illegal&lt;/li&gt;
&lt;li&gt;UnknownServerException: An unspecified error occurred.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;Retriable

&lt;ul&gt;
&lt;li&gt;CorruptRecordException: CRC check failed, typically because of a network error.&lt;/li&gt;
&lt;li&gt;InvalidMetadataException: The client-side metadata is outdated.&lt;/li&gt;
&lt;li&gt;UnknownTopicOrPartitionException: The topic or partition does not exist, potentially due to expired metadata&lt;/li&gt;
&lt;li&gt;NotLeaderOrFollowerException: The requested broker is not the leader, possibly due to ongoing leader election&lt;/li&gt;
&lt;li&gt;FencedLeaderEpochException: The leader epoch in the request is outdated, potentially caused by slow metadata refresh&lt;/li&gt;
&lt;li&gt;NotEnoughReplicasException, NotEnoughReplicasAfterAppendException: The number of in-sync replicas is insufficient (broker configuration min.insync.replicas or the topic configuration of the same name, default 1). Note that NotEnoughReplicasAfterAppendException occurs after a record has been written, so retries by the producer may lead to duplicate data&lt;/li&gt;
&lt;li&gt;TimeoutException: Processing timed out, which has two possible causes:
&lt;ul&gt;
&lt;li&gt;The synchronous phase of a call took too long, typically because the producer buffer was full or fetching metadata timed out&lt;/li&gt;
&lt;li&gt;The asynchronous phase timed out, for example because throttling kept the producer from sending messages, or because a broker failed to respond in time&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Producer#send&lt;/strong&gt;&lt;br&gt;
Sends a message asynchronously and, if a Callback is supplied, invokes it once the message has been acknowledged.&lt;br&gt;
Callbacks for records sent to the same partition are guaranteed to execute in the order in which the records were sent.&lt;br&gt;
&lt;strong&gt;Producer#flush&lt;/strong&gt;&lt;br&gt;
Marks all messages in the producer's buffer as ready to send immediately, and blocks the current thread until all previously sent messages have been acknowledged.&lt;br&gt;
Note: Only the current thread is blocked; other threads may continue sending messages, although there is no guarantee about when messages sent after the flush call will complete.&lt;br&gt;
&lt;strong&gt;Producer#close&lt;/strong&gt;&lt;br&gt;
Shuts down the producer and blocks until all messages are sent.&lt;br&gt;
Note:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Invoking close in the Callback will immediately shut down the producer&lt;/li&gt;
&lt;li&gt;Any send call still in its synchronous phase (fetching metadata, waiting for memory allocation) is terminated immediately and throws a KafkaException&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Core Components&lt;/strong&gt;&lt;br&gt;
Next, we will discuss the specific implementation of the Kafka Producer, which consists of the following core components:&lt;/p&gt;

&lt;ul&gt;

&lt;li&gt;ProducerMetadata &amp;amp; Metadata
Responsible for caching and refreshing the metadata needed on the Producer side, including all metadata of the Kafka Cluster such as broker addresses, the distribution status of partitions in topics, and information about leaders and followers.&lt;/li&gt;

&lt;li&gt;RecordAccumulator
Responsible for managing the Producer's buffer, it groups messages for transmission based on partition, time (linger.ms), and size (batch.size) into RecordBatch and holds them for dispatch.&lt;/li&gt;

&lt;li&gt;Sender
Manages a daemon thread named "kafka-producer-network-thread | {client.id}" which facilitates the sending of Produce requests and processes Produce responses, as well as managing timeouts, error handling, and retries.&lt;/li&gt;

&lt;li&gt;TransactionManager
Responsible for implementing idempotence and transactions: assigning sequence numbers, handling message loss and reordering, and managing transaction states.&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Sending Process&lt;/strong&gt;&lt;br&gt;
The message sending process is depicted in the diagram below:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsufu5fzelu4zygx7lsbo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsufu5fzelu4zygx7lsbo.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The process is divided into the following steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Refresh the metadata&lt;/li&gt;
&lt;li&gt;Serialize the message using the specified Serializer&lt;/li&gt;
&lt;li&gt;Select the target partition for sending the message using either a user-specified Partitioner or the BuiltInPartitioner&lt;/li&gt;
&lt;li&gt;Insert the message into the RecordAccumulator for batching&lt;/li&gt;
&lt;li&gt;Sender asynchronously retrieves the sendable batch from the RecordAccumulator (grouped by node), registers a callback, and sends&lt;/li&gt;
&lt;li&gt;Sender handles the response and, depending on the outcome, returns the result, raises an exception, or retries&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The following sections detail each of these steps.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Refreshing Metadata&lt;/strong&gt;&lt;br&gt;
ProducerMetadata caches and updates the metadata needed on the producer side and maintains a complete view of all the topics the producer requires. It will:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Add topics in the following scenarios

&lt;ul&gt;
&lt;li&gt;When sending a message, if the specified topic is not present in the cached metadata&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Remove topics in the following scenarios

&lt;ul&gt;
&lt;li&gt;When it is determined that the metadata for a topic has been inactive for a continuous period defined by metadata.max.idle.ms&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Refresh metadata in the following scenarios:

&lt;ul&gt;
&lt;li&gt;When sending a message, the specified partition is not in the cached metadata (this occurs when the number of partitions in a topic increases)&lt;/li&gt;
&lt;li&gt;When sending a message, the leader of the specified partition is unknown&lt;/li&gt;
&lt;li&gt;After sending a message, a response carrying an InvalidMetadataException is received&lt;/li&gt;
&lt;li&gt;When metadata has not been refreshed for a continuous period of &lt;code&gt;metadata.max.age.ms&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Associated configurations include:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;metadata.max.idle.ms
The cache timeout for topic metadata: if no messages are sent to a topic within this duration, the metadata for that topic expires. The default is 5 minutes.&lt;/li&gt;
&lt;li&gt;metadata.max.age.ms
The forced metadata refresh interval: if metadata has not been refreshed within this duration, an update is triggered. The default is 5 minutes.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Partition selection&lt;/strong&gt;&lt;br&gt;
KIP-794[3] addressed a problem with the Sticky Partitioner in earlier versions, which distributed messages unevenly across brokers, by introducing a new Uniform Sticky Partitioner (now the default built-in Partitioner). For records without a key, this partitioner sends more messages to faster brokers.&lt;br&gt;
When selecting partitions, there are two scenarios:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;If a user specifies a Partitioner, then that Partitioner is used to select the partition&lt;/li&gt;
&lt;li&gt;If not, the default BuiltInPartitioner is used

&lt;ul&gt;
&lt;li&gt;If a record key is set, a unique partition is determined based on the hash value of the key&lt;/li&gt;
&lt;li&gt;Records with the same key are consistently assigned to the same partition.&lt;/li&gt;
&lt;li&gt;However, this consistency is not maintained when the number of partitions within a topic is altered, as the same key may not be assigned to the original partition.&lt;/li&gt;
&lt;li&gt;If no key is specified, or if &lt;code&gt;partitioner.ignore.keys&lt;/code&gt; is set to "true", the partitioner sends more messages to faster brokers.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Associated configurations include:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;partitioner.class
The class name for the partition selector can be customized by users to meet specific requirements.

&lt;ul&gt;
&lt;li&gt;DefaultPartitioner and UniformStickyPartitioner: These "sticky" partitioners assign messages to one partition until its batch is filled, then move on to the next partition. Their implementation tends to overload slower brokers, which is why both have been deprecated.&lt;/li&gt;
&lt;li&gt;RoundRobinPartitioner: This partitioner disregards the record key and distributes messages across all partitions in a cyclic manner. Note that it can produce an uneven distribution when new batches are created.
It is advisable to either use the built-in partitioner or develop a custom one to suit your needs.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;partitioner.adaptive.partitioning.enable
Controls whether the number of messages sent is adjusted according to broker speed; if disabled, partitions are chosen at random. It applies only when partitioner.class is not set. The default is "true".&lt;/li&gt;
&lt;li&gt;partitioner.availability.timeout.ms
Effective only when partitioner.adaptive.partitioning.enable is "true". If the time between accumulating a batch of messages for a broker and sending it exceeds this value, the partitioner stops assigning messages to that broker. A value of 0 disables this feature. It applies only when partitioner.class is not set. The default is 0.&lt;/li&gt;
&lt;li&gt;partitioner.ignore.keys
If "false", the partition is determined by the hash value of the key; if "true", the key is ignored. It applies only when partitioner.class is not set. The default is "false".&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Message Batching&lt;/strong&gt;&lt;br&gt;
In the RecordAccumulator, batches to be sent are organized by partition. Key methods include:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;

public RecordAppendResult append(String topic,
                                 int partition,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 AppendCallbacks callbacks,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs,
                                 Cluster cluster) throws InterruptedException;

public ReadyCheckResult ready(Metadata metadata, long nowMs);

public Map&amp;lt;Integer, List&amp;lt;ProducerBatch&amp;gt;&amp;gt; drain(Metadata metadata, Set&amp;lt;Node&amp;gt; nodes, int maxSize, long now);


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;ul&gt;
&lt;li&gt;append: Adds a message to the buffer, registers a future, and returns it. This future completes either when the message is successfully sent or if it fails.&lt;/li&gt;
&lt;li&gt;ready: Identifies nodes that have messages prepared for dispatch. Scenarios include:

&lt;ul&gt;
&lt;li&gt;A batch of messages has reached the batch.size&lt;/li&gt;
&lt;li&gt;Messages have been batched continuously for longer than linger.ms&lt;/li&gt;
&lt;li&gt;The memory allocated to the producer has been exhausted; specifically, the total size of the messages in the buffer has exceeded buffer.memory&lt;/li&gt;
&lt;li&gt;The batch requiring a retry has already been delayed for at least retry.backoff.ms&lt;/li&gt;
&lt;li&gt;The user called &lt;code&gt;Producer#flush&lt;/code&gt; to ensure message delivery.&lt;/li&gt;
&lt;li&gt;The producer is in the process of shutting down.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;drain: For each node, iterates over its partitions and takes the earliest batch from each (if present) until the accumulated size reaches &lt;code&gt;max.request.size&lt;/code&gt; or all partitions have been checked.&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;Associated configurations include:&lt;/p&gt;

&lt;ul&gt;

&lt;li&gt;linger.ms
The maximum time each batch waits to accumulate before it is sent; the default is 0.
Note that setting it to 0 does not disable batching; it only means there is no deliberate delay before sending. To disable batching entirely, set &lt;code&gt;batch.size&lt;/code&gt; to 0 or 1.
Increasing this value will

&lt;ul&gt;
&lt;li&gt;Increase throughput (the per-message sending overhead is reduced and compression becomes more effective)&lt;/li&gt;
&lt;li&gt;Slightly increase latency&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;batch.size
The maximum size of each batch; the default is 16 KiB.
Setting this value to 0 (effectively the same as setting it to 1) disables batching, so each batch contains only one message.
A single message larger than batch.size is sent as a standalone batch.
Increasing this value will

&lt;ul&gt;
&lt;li&gt;Boost throughput&lt;/li&gt;
&lt;li&gt;Use more memory (each new batch allocates a memory block of batch.size)&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;max.in.flight.requests.per.connection
The maximum number of requests a producer may send to a single broker without waiting for a response; the default is 5.&lt;/li&gt;

&lt;li&gt;max.request.size
The maximum size of a single request, which is also the upper limit for an individual message; the default is 1 MiB.
Note that the broker configuration message.max.bytes and the topic configuration max.message.bytes also limit the maximum message size.&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Timeout Handling&lt;/strong&gt;&lt;br&gt;
The Kafka Producer offers several timeout-related settings that bound the duration of each phase of the message delivery process, as shown below:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxoyox5y1vgtof3nvg4wl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxoyox5y1vgtof3nvg4wl.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;These settings include&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;buffer.memory
The maximum size of the producer buffer; the default is 32 MiB.
When the buffer is full, the send call blocks for up to max.block.ms and then throws an exception.&lt;/li&gt;
&lt;li&gt;max.block.ms
The maximum time the calling thread may be blocked when the send method is invoked; the default is 60 seconds.
It includes

&lt;ul&gt;
&lt;li&gt;Time required to retrieve metadata&lt;/li&gt;
&lt;li&gt;Time spent waiting when the producer buffer is full&lt;/li&gt;
&lt;/ul&gt;

It excludes

&lt;ul&gt;
&lt;li&gt;Time required to serialize the message&lt;/li&gt;
&lt;li&gt;Time spent by the Partitioner to select a partition&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;request.timeout.ms
The maximum duration from sending a request to receiving its response; the default is 30s.&lt;/li&gt;

&lt;li&gt;delivery.timeout.ms
The upper bound on the whole asynchronous delivery, from the moment the send method returns until the Callback is invoked. The default is 120s.
It includes

&lt;ul&gt;
&lt;li&gt;The time the producer spends batching&lt;/li&gt;
&lt;li&gt;The time spent sending a request to the broker and waiting for a response&lt;/li&gt;
&lt;li&gt;The time spent on each retry&lt;/li&gt;
&lt;/ul&gt;

Its value should be no less than linger.ms + request.timeout.ms.
&lt;/li&gt;

&lt;li&gt;retries
The maximum number of retries; the default is Integer.MAX_VALUE.&lt;/li&gt;

&lt;li&gt;retry.backoff.ms and retry.backoff.max.ms
These settings govern the exponential backoff applied to retries after a send failure: the wait starts at retry.backoff.ms and doubles with each retry, with 20% jitter added, up to a cap of retry.backoff.max.ms. The defaults are 100ms and 1000ms, respectively.&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Summary&lt;/strong&gt;&lt;br&gt;
Our project, AutoMQ[1], is committed to developing the next-generation, cloud-native Apache Kafka® system, designed to tackle the cost and elasticity challenges of traditional Kafka. As devoted supporters of and active contributors to the Kafka ecosystem, we continue to deliver high-quality Kafka technical content to enthusiasts. In this article, we covered the functionality of Kafka Producers and the fundamental principles behind their implementation; our upcoming article will delve into further implementation details and address typical challenges associated with Kafka Producers. Keep an eye out for more updates.&lt;/p&gt;
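&lt;p&gt;As a footnote to the retry.backoff.ms and retry.backoff.max.ms settings described under Timeout Handling, the backoff arithmetic can be sketched in a few lines. This is a minimal illustration rather than Kafka's actual implementation: the class and method names are hypothetical, the constants simply mirror the defaults quoted in this article, and applying the cap after the jitter is an assumption of this sketch.&lt;/p&gt;

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch of exponential retry backoff with jitter; hypothetical names, not Kafka's own code. */
final class RetryBackoffSketch {
    // Assumed values mirroring the defaults quoted in the article.
    static final long RETRY_BACKOFF_MS = 100L;      // retry.backoff.ms
    static final long RETRY_BACKOFF_MAX_MS = 1000L; // retry.backoff.max.ms
    static final double JITTER = 0.2;               // 20% jitter

    /** Backoff without jitter: baseMs * 2^attempt (attempt is zero-based), capped at maxMs. */
    static long baseBackoffMs(int attempt, long baseMs, long maxMs) {
        double uncapped = baseMs * Math.pow(2.0, attempt);
        return (long) Math.min(uncapped, (double) maxMs);
    }

    /** Scales the uncapped value by a random factor in [1 - jitter, 1 + jitter), then applies the cap. */
    static long jitteredBackoffMs(int attempt, long baseMs, long maxMs, double jitter) {
        double uncapped = baseMs * Math.pow(2.0, attempt);
        double factor = ThreadLocalRandom.current().nextDouble(1.0 - jitter, 1.0 + jitter);
        return (long) Math.min(uncapped * factor, (double) maxMs);
    }

    public static void main(String[] args) {
        // Print the wait before each of the first six retries.
        for (int attempt = 0; attempt != 6; attempt++) {
            long base = baseBackoffMs(attempt, RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS);
            long jittered = jitteredBackoffMs(attempt, RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, JITTER);
            System.out.println("retry " + attempt + ": base=" + base + " ms, with jitter=" + jittered + " ms");
        }
    }
}
```

&lt;p&gt;With these values the un-jittered wait grows as 100, 200, 400, 800 ms and then stays at the 1000 ms cap, which is why raising retry.backoff.max.ms only matters from the fifth retry onward.&lt;/p&gt;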

&lt;p&gt;&lt;strong&gt;References&lt;/strong&gt;&lt;br&gt;
[1] AutoMQ: &lt;a href="https://github.com/AutoMQ/automq" rel="noopener noreferrer"&gt;https://github.com/AutoMQ/automq&lt;/a&gt;&lt;br&gt;
[2] Kafka 3.7: &lt;a href="https://github.com/apache/kafka/releases/tag/3.7.0" rel="noopener noreferrer"&gt;https://github.com/apache/kafka/releases/tag/3.7.0&lt;/a&gt;&lt;br&gt;
[3] KIP-794: &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner" rel="noopener noreferrer"&gt;https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>AutoMQ Integration with Redpanda Console</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Mon, 29 Jul 2024 08:26:04 +0000</pubDate>
      <link>https://dev.to/automq/automq-integration-with-redpanda-console-419p</link>
      <guid>https://dev.to/automq/automq-integration-with-redpanda-console-419p</guid>
      <description>&lt;p&gt;&lt;strong&gt;Managing Kafka/AutoMQ clusters more conveniently with Kafka Web UI&lt;/strong&gt;&lt;br&gt;
With the rapid development of big data technology, Kafka, as a high-throughput, low-latency distributed messaging system, has become a core component of real-time data processing in enterprises. However, managing and monitoring Kafka clusters is not an easy task. Traditional command-line tools and scripts, although powerful, are complex and not intuitive for developers and operations personnel. To address these challenges, Kafka Web UI emerged, providing users with a more convenient and efficient way to manage Kafka clusters.&lt;/p&gt;

&lt;p&gt;Over more than a decade of development, Apache Kafka® has accumulated a very rich ecosystem. As the successor to Apache Kafka®, AutoMQ can fully leverage the products in this ecosystem thanks to its complete compatibility with Kafka. AutoMQ Business Edition offers a very powerful control plane. If you are using AutoMQ, you can also manage AutoMQ clusters with products like Kafdrop [7] and Redpanda Console [1].&lt;/p&gt;

&lt;p&gt;Today, we will share how to monitor the state of AutoMQ clusters using Redpanda Console [1] to enhance system maintainability and stability.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Integrating AutoMQ with Redpanda Console&lt;/strong&gt;&lt;br&gt;
Redpanda Console is a Kafka Web UI interface provided by Redpanda [2] for monitoring and managing Redpanda or Kafka clusters. It offers an intuitive user interface where users can easily view cluster states, monitor performance metrics, and manage topics and partitions. This console is designed to simplify the daily operations of data flow systems, enabling users to more effectively maintain and monitor their clusters.&lt;br&gt;
Thanks to AutoMQ's complete compatibility with Kafka, it can be seamlessly integrated with Redpanda Console. By utilizing Redpanda Console, AutoMQ users can also benefit from an intuitive user interface to monitor the real-time status of AutoMQ clusters, including topics, partitions, consumer groups, and their offsets. This monitoring capability not only enhances the efficiency of issue diagnosis but also helps optimize cluster performance and resource utilization.&lt;br&gt;
This tutorial will teach you how to start the Redpanda Console service and use it in conjunction with AutoMQ clusters to achieve cluster state monitoring and management.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fio6n48nwljigpoobtkor.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fio6n48nwljigpoobtkor.png" alt="Image description" width="800" height="455"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Prerequisites&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Deploy AutoMQ Cluster&lt;/li&gt;
&lt;li&gt;Prepare Redpanda Console Environment&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Deploy AutoMQ Cluster&lt;/strong&gt;&lt;br&gt;
Please refer to the official AutoMQ documentation: Cluster Deployment | AutoMQ [3].&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Deploy Redpanda Console&lt;/strong&gt;&lt;br&gt;
There are two ways to deploy Redpanda Console: Docker deployment and release version deployment. Docker deployment is simpler and is recommended if you want to try the integration of AutoMQ and Redpanda Console quickly. If you have special requirements, such as login authentication, SASL authentication, TLS configuration, or log level settings, you can opt for the release version deployment. Both methods are described below.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Docker Deployment&lt;/strong&gt;&lt;br&gt;
Redpanda Console can be deployed via Docker, as described in Quick Start [4]. After setting up the AutoMQ cluster, you know the addresses and ports on which the Broker nodes are listening, so you can connect Redpanda Console to the AutoMQ cluster by specifying the KAFKA_BROKERS parameter in the Docker startup command. The Docker container startup command is as follows:&lt;br&gt;
&lt;code&gt;docker run -p 8080:8080 -e KAFKA_BROKERS=192.168.0.4:9092,192.168.0.5:9092,192.168.0.6:9092 docker.redpanda.com/redpandadata/console:latest&lt;/code&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;-p 8080:8080: Specify the port mapping for accessing the Redpanda Console service.&lt;/li&gt;
&lt;li&gt;KAFKA_BROKERS: This should be specified as the Broker addresses of your AutoMQ cluster.&lt;/li&gt;
&lt;/ul&gt;
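&lt;p&gt;Before starting the console, it can help to confirm that every address passed in KAFKA_BROKERS is reachable from the machine that will run Redpanda Console. The following is a minimal sketch using only the Java standard library; the class and method names are hypothetical, the addresses are the example values from the command above, and a successful TCP connection only shows that the port is open, not that the Kafka protocol handshake will succeed.&lt;/p&gt;

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper that checks TCP reachability of a KAFKA_BROKERS-style address list. */
final class BrokerReachabilitySketch {

    /** Splits "host1:port1,host2:port2" into individual "host:port" entries. */
    static String[] splitBrokers(String brokers) {
        return brokers.trim().split("\\s*,\\s*");
    }

    /** Returns the host part of a "host:port" entry. */
    static String hostOf(String hostPort) {
        return hostPort.substring(0, hostPort.lastIndexOf(':'));
    }

    /** Returns the port part of a "host:port" entry. */
    static int portOf(String hostPort) {
        return Integer.parseInt(hostPort.substring(hostPort.lastIndexOf(':') + 1));
    }

    /** Tries to open a TCP connection within timeoutMs; any I/O failure counts as unreachable. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Example addresses from the article; pass your own list as the first argument.
        String brokers = args.length == 0
                ? "192.168.0.4:9092,192.168.0.5:9092,192.168.0.6:9092"
                : args[0];
        for (String entry : splitBrokers(brokers)) {
            boolean ok = canConnect(hostOf(entry), portOf(entry), 1000);
            System.out.println(entry + (ok ? " reachable" : " NOT reachable"));
        }
    }
}
```

&lt;p&gt;If any broker is reported as not reachable, check firewall rules and the listener addresses of the AutoMQ cluster before investigating the console itself.&lt;/p&gt;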

&lt;p&gt;&lt;strong&gt;Release Deployment&lt;/strong&gt;&lt;br&gt;
You need to download and extract the suitable version from the GitHub Releases page of Redpanda Console: Release Redpanda Console [5], into a specified folder, such as /opt. The command is as follows:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
# Ubuntu Linux
cd /opt
sudo curl -L -o redpanda_console.tar.gz https://github.com/redpanda-data/console/releases/download/v2.6.0/redpanda_console_2.6.0_linux_amd64.tar.gz
# extract the archive to get the redpanda-console binary
sudo tar -xzf redpanda_console.tar.gz
# create the configuration directory
sudo mkdir -p /etc/redpanda
# write the configuration file
sudo vim /etc/redpanda/redpanda-console-config.yaml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;An example of the content of the redpanda-console-config.yaml configuration file is as follows:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
kafka:
  # Brokers is a list of bootstrap servers with
  # port (for example "localhost:9092").
  brokers:
    - broker-0.mycompany.com:19092
    - broker-1.mycompany.com:19092
    - broker-2.mycompany.com:19092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Note: Please ensure that the server on which you are installing Redpanda Console can reach the servers hosting the Broker nodes specified in the configuration file.&lt;br&gt;
For more detailed settings, refer to the official documentation: Redpanda Console Configuration [6]. After completing the configuration, set an environment variable so that the Redpanda Console executable can locate the configuration file, and then start Redpanda Console:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
# set env
export CONFIG_FILEPATH="/etc/redpanda/redpanda-console-config.yaml"
# run the console from /opt
./redpanda-console
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;You will get the following results:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
./redpanda-console
{"level":"info","ts":"2024-07-10T09:52:52.958+0800","msg":"started Redpanda Console","version":"2.6.0","built_at":"1717083695"}
{"level":"info","ts":"2024-07-10T09:52:52.963+0800","msg":"connecting to Kafka seed brokers, trying to fetch cluster metadata"}
{"level":"info","ts":"2024-07-10T09:52:54.780+0800","msg":"successfully connected to kafka cluster","advertised_broker_count":1,"topic_count":2,"controller_id":0,"kafka_version":"at least v3.6"}
{"level":"info","ts":"2024-07-10T09:53:05.620+0800","msg":"Server listening on address","address":"[::]:8080","port":8080}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;Access the console page&lt;/strong&gt;&lt;br&gt;
After completing the above deployment, you can access the console service by entering its address (e.g., http://{console_ip}:8080) in a browser. The pages are shown below.&lt;br&gt;
&lt;strong&gt;Cluster Overview&lt;/strong&gt;&lt;br&gt;
The Cluster Overview page gives users a high-level view of the AutoMQ cluster and displays its core information. This includes but is not limited to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Cluster Health Status: Shows the current health status of the cluster, aiding users in quickly identifying issues.&lt;/li&gt;
&lt;li&gt;Storage Usage: Displays the data storage usage within the cluster, making it easier for users to manage and plan their storage.&lt;/li&gt;
&lt;li&gt;Version Information: Shows the current version of the AutoMQ cluster, facilitating tracking and upgrades.&lt;/li&gt;
&lt;li&gt;Number of Online Brokers: Displays the real-time number of online Brokers, which is a critical metric.&lt;/li&gt;
&lt;li&gt;Number of Topics and Replicas: Provides information on the number of Topics and Replicas, helping users understand the scale of the cluster and data replication status.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0shhh4y044xzeghfx6tl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0shhh4y044xzeghfx6tl.png" alt="Image description" width="800" height="356"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Monitoring the cluster status is crucial for ensuring the stability and performance of the message queue system. By monitoring the real-time status of the cluster, storage usage, version information, the number of online Brokers, and the number of Topics and Replicas, operations personnel can quickly identify and resolve potential issues, preventing system failures from impacting business operations. These metrics also aid in capacity planning and resource management, ensuring the system can handle future data growth. Moreover, knowing the cluster's version information helps users perform timely software upgrades and benefit from the latest features and security fixes, thereby enhancing overall system reliability and efficiency.&lt;br&gt;
&lt;strong&gt;Topic Overview&lt;/strong&gt;&lt;br&gt;
On the Topic list page, users can see a list of all Topics in the current AutoMQ cluster, including key information for each Topic, such as the number of partitions and replication strategy. Users can quickly browse and manage Topics through this page.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuq21plfys3gte4tvil3d.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuq21plfys3gte4tvil3d.png" alt="Image description" width="800" height="356"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Topic Details&lt;/strong&gt;&lt;br&gt;
After clicking on a specific Topic, users will be taken to the detailed page of that Topic, where they can explore and manage various aspects:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Message List: Browse and search messages within the Topic, which is very useful for message tracking and debugging.&lt;/li&gt;
&lt;li&gt;Consumer Information: Displays information about the consumers and Consumer Groups currently subscribed to the Topic, facilitating monitoring of consumption status.&lt;/li&gt;
&lt;li&gt;Partition Status: Shows detailed information for each partition, including key metrics such as Leader and ISR.&lt;/li&gt;
&lt;li&gt;Configuration Information: Lists the configuration details of the Topic and supports modifying them to optimize performance or behavior.&lt;/li&gt;
&lt;li&gt;ACL (Access Control List): Manages access permissions for the Topic to ensure data security.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Additionally, Redpanda Console lets users manually create and publish messages, which is highly valuable for testing or for injecting messages in specific scenarios.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwe8tztm7ul1zve4p2zoo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwe8tztm7ul1zve4p2zoo.png" alt="Image description" width="800" height="356"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Monitoring Topic details allows us to gain deep insights into the operation of the message queue. By browsing the message list, we can track and debug messages, monitor consumer information to assess consumption status, understand partition states to ensure data distribution and high availability, manage configuration information to optimize performance, and set access controls to ensure data security. These features help in timely identification and resolution of issues, thereby improving the overall efficiency and reliability of the system.&lt;br&gt;
&lt;strong&gt;Summary&lt;/strong&gt;&lt;br&gt;
This article introduces the integration process of Redpanda Console with AutoMQ, demonstrating how this powerful tool simplifies and enhances the management of AutoMQ clusters. It is hoped that this article provides practical references for users aiming to improve the efficiency and functionality of message queue management.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;References&lt;/strong&gt;&lt;br&gt;
[1] Redpanda Console: &lt;a href="https://redpanda.com/redpanda-console-kafka-ui" rel="noopener noreferrer"&gt;https://redpanda.com/redpanda-console-kafka-ui&lt;/a&gt;&lt;br&gt;
[2] Redpanda: &lt;a href="https://redpanda.com/" rel="noopener noreferrer"&gt;https://redpanda.com/&lt;/a&gt;&lt;br&gt;
[3] Cluster Deployment of AutoMQ: &lt;a href="https://docs.automq.com/zh/docs/automq-opensource/IyXrw3lHriVPdQkQLDvcPGQdnNh" rel="noopener noreferrer"&gt;https://docs.automq.com/zh/docs/automq-opensource/IyXrw3lHriVPdQkQLDvcPGQdnNh&lt;/a&gt;&lt;br&gt;
[4] Quick Start: &lt;a href="https://github.com/redpanda-data/console?tab=readme-ov-file#quick-start" rel="noopener noreferrer"&gt;https://github.com/redpanda-data/console?tab=readme-ov-file#quick-start&lt;/a&gt;&lt;br&gt;
[5] Release Redpanda Console: &lt;a href="https://github.com/redpanda-data/console/releases/tag/v2.6.0" rel="noopener noreferrer"&gt;https://github.com/redpanda-data/console/releases/tag/v2.6.0&lt;/a&gt;&lt;br&gt;
[6] Redpanda Console Configuration: &lt;a href="https://docs.redpanda.com/current/reference/console/config/#example-redpanda-console-configuration-file" rel="noopener noreferrer"&gt;https://docs.redpanda.com/current/reference/console/config/#example-redpanda-console-configuration-file&lt;/a&gt;&lt;br&gt;
[7] Kafdrop GitHub: &lt;a href="https://github.com/obsidiandynamics/kafdrop" rel="noopener noreferrer"&gt;https://github.com/obsidiandynamics/kafdrop&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Implementing Kafka to Run on S3 with a Hundred Lines of Code</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Mon, 29 Jul 2024 08:21:38 +0000</pubDate>
      <link>https://dev.to/automq/implementing-kafka-to-run-on-s3-with-a-hundred-lines-of-code-14bi</link>
      <guid>https://dev.to/automq/implementing-kafka-to-run-on-s3-with-a-hundred-lines-of-code-14bi</guid>
      <description>&lt;p&gt;&lt;strong&gt;TL;DR&lt;/strong&gt;&lt;br&gt;
Yes, you read that correctly. AutoMQ[1] can now be built entirely on object storage such as S3. You can refer to the quick start guide[3] to get started immediately. Because its existing stream storage engine is organized around a top-level WAL abstraction, AutoMQ needed only a small amount of code to extend that abstraction and deliver a capability that competitors pride themselves on: building the entire stream system on object storage like S3. Notably, we have made this part of the source code fully open, so developers can use the S3Stream[2] stream storage engine to deploy a Kafka service entirely on object storage in their own environment, with extremely low storage costs and operational complexity.&lt;/p&gt;

&lt;p&gt;The core stream storage engine of AutoMQ can achieve this so easily because of its top-level abstraction around the WAL and its shared storage architecture. It is on this abstraction that we built the S3Stream[2] stream storage engine. In this article, we share the design details of AutoMQ's shared stream storage engine, the underlying considerations, and how it evolved. After reading it, you will understand why we say that only a hundred lines of code are needed to run Kafka on S3.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Embarking from the Shared Storage Architecture&lt;/strong&gt;&lt;br&gt;
Over a decade ago, Kafka emerged in an era when Internet Data Centers (IDC) were the primary deployment environment. At that time, compute and storage resources were typically tightly coupled, forming an integrated Share-Nothing architecture. This architecture was highly effective in the physical data center environments of that period. However, as public cloud technology matured, its limitations became apparent. Because compute and storage are tightly coupled in the Share-Nothing architecture, the storage layer cannot be fully decoupled, and capabilities such as durability and high availability cannot be offloaded to cloud storage services. As a result, the Share-Nothing architecture cannot benefit from the technical and cost advantages of scalable cloud storage services. The integrated compute-storage design also makes Kafka inelastic and difficult to scale: adjusting the capacity of a Kafka cluster involves substantial data replication, which slows down the adjustment and affects normal read and write requests while it is in progress.&lt;/p&gt;

&lt;p&gt;AutoMQ is committed to fully leveraging the advantages of the cloud, adhering to a Cloud-First philosophy. Through a shared storage architecture, AutoMQ decouples data durability and offloads it to mature cloud storage services like S3 and EBS, thereby fully exploiting the potential of these cloud storage services. Problems such as lack of elasticity, high costs, and complex operations associated with Kafka due to the Share-Nothing architecture are resolved under AutoMQ's new shared storage architecture.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fo6jp8kxzagy4oeyjdr7s.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fo6jp8kxzagy4oeyjdr7s.png" alt="Image description" width="800" height="285"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Stream Storage Top-Level Abstraction: Shared WAL + Shared Object&lt;/strong&gt;&lt;br&gt;
AutoMQ's shared storage architecture consists of two parts: Shared WAL and Shared Object. This abstraction admits various implementations. The Shared WAL abstraction allows us to move the WAL implementation to any shared storage medium and enjoy the advantages of that medium. Readers familiar with software engineering know that every design involves trade-offs, and the benefits and drawbacks of each shared storage medium shift as those trade-offs change. AutoMQ's top-level Shared WAL abstraction lets it adapt to these changes: the Shared WAL implementation can be moved freely to any shared storage service, and several services can even be combined. Shared Object is primarily built on mature cloud object storage services, benefiting from their extremely low storage cost and their scalability. As the S3 API has become the de facto standard for object storage protocols, AutoMQ can also use Shared Object to adapt to various object storage services, offering users multi-cloud storage solutions. Shared WAL can be adapted to low-latency storage media such as EBS and S3 Express One Zone (S3E1Z), providing users with low-latency stream services.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fchvu95fvi01mu3irhql8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fchvu95fvi01mu3irhql8.png" alt="Image description" width="800" height="1229"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnai211ne64dnynx945co.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnai211ne64dnynx945co.png" alt="Image description" width="800" height="380"&gt;&lt;/a&gt;&lt;/p&gt;


&lt;p&gt;&lt;strong&gt;Best Shared WAL Implementation in the Cloud: EBS WAL&lt;/strong&gt;&lt;br&gt;
WAL was initially used in relational databases to achieve atomicity and durability. With the maturity of cloud storage services like S3 and EBS, combining a WAL on low-latency storage with asynchronous writes to low-cost storage like S3 balances latency and cost. AutoMQ is the first in the stream domain to use a WAL based on a shared storage architecture, fully harnessing the advantages of different cloud storage services. We believe that the EBS WAL implementation is the best for cloud stream storage engines because it combines the low latency and high durability of EBS with the low cost of object storage. Through careful design, it also mitigates the high cost of EBS.&lt;/p&gt;

&lt;p&gt;The following diagram illustrates the core implementation process of EBS WAL:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The Producer writes data to EBS WAL through the S3Stream stream storage engine. Once the data is successfully persisted to disk, a success response is immediately returned to the client, fully leveraging the low-latency and high-durability characteristics of EBS.&lt;/li&gt;
&lt;li&gt;Consumers can read newly written data directly from the cache.&lt;/li&gt;
&lt;li&gt;Cached data is asynchronously written to S3 in parallel batches. Once the upload succeeds, the cached copy is invalidated.&lt;/li&gt;
&lt;li&gt;If consumers need to read historical data, they read it directly from object storage.&lt;/li&gt;
&lt;/ol&gt;
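&lt;p&gt;The four steps above can be sketched in a few lines of Python. This is a simplified illustration, not S3Stream's actual code: the class &lt;code&gt;WalWritePath&lt;/code&gt; and its methods are hypothetical names, in-memory lists and dictionaries stand in for EBS and S3, and the upload runs synchronously here although it is asynchronous in the real engine.&lt;/p&gt;

```python
# Illustrative sketch only: every name here is hypothetical, not S3Stream's API.
# In-memory structures stand in for the EBS WAL, the cache, and S3.

class WalWritePath:
    def __init__(self):
        self.wal = []            # stands in for the EBS WAL (durable, low latency)
        self.cache = {}          # offset to record, serves reads of fresh data
        self.object_store = {}   # offset to record, stands in for S3
        self.next_offset = 0

    def append(self, record):
        """Step 1: persist to the WAL, then acknowledge the producer."""
        offset = self.next_offset
        self.wal.append((offset, record))
        self.cache[offset] = record
        self.next_offset += 1
        return offset            # the acknowledgement carries the offset

    def upload_batch(self):
        """Step 3: flush WAL records to object storage, then drop local copies."""
        for offset, record in self.wal:
            self.object_store[offset] = record
            self.cache.pop(offset, None)
        self.wal.clear()

    def read(self, offset):
        """Steps 2 and 4: fresh data from the cache, history from object storage."""
        if offset in self.cache:
            return ("cache", self.cache[offset])
        return ("object-storage", self.object_store[offset])


path = WalWritePath()
first = path.append(b"order-1")
hot = path.read(first)      # served from the cache
path.upload_batch()
cold = path.read(first)     # served from object storage after the upload
```

&lt;p&gt;Because the WAL only holds data that has not yet reached object storage, it can stay small, which is what keeps the EBS cost low in this design.&lt;/p&gt;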

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fz4n88xjwka9juxh2zgio.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fz4n88xjwka9juxh2zgio.png" alt="Image description" width="800" height="500"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;A common misconception is confusing the Shared WAL built on EBS with Kafka’s tiered storage. The primary way to distinguish between them is to check whether the compute node broker is entirely stateless. For tiered storage implementations by Confluent and Aiven, their brokers are still stateful. Kafka's tiered storage requires the last log segment of its partition to be on the local disk, hence their local storage data is tightly coupled with the compute layer brokers. However, AutoMQ’s EBS WAL implementation does not have this limitation. When a broker node crashes, other healthy broker nodes can take over the EBS volume within milliseconds via Multi Attach, write the small fixed-size WAL data (usually 500MB) to S3, and then delete the volume.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fdenjh0lenb08525iyx0g.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fdenjh0lenb08525iyx0g.png" alt="Image description" width="800" height="557"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Natural Evolution of Shared WAL: S3 WAL&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;S3 WAL is the natural evolution of the Shared WAL storage architecture. AutoMQ currently supports building the entire storage layer on S3, which is a specific implementation of Shared WAL. This WAL implementation built directly on S3 is what we refer to as S3 WAL. Thanks to the top-level abstraction of Shared WAL and the foundational implementation of EBS WAL, the core processes of S3 WAL are identical to those of EBS WAL. Therefore, the AutoMQ Team was able to support the implementation of S3 WAL within just a few weeks.&lt;/p&gt;

&lt;p&gt;Implementing S3 WAL is a natural evolution of the AutoMQ Shared WAL architecture and extends what AutoMQ can do. When using S3 WAL, all user data is written to object storage, which increases latency somewhat compared to EBS WAL. In exchange, the architecture becomes simpler because it depends on fewer services. On cloud providers such as AWS, where EBS volumes are confined to a single AZ, and in private IDC deployments that use self-hosted object storage such as MinIO, the S3 WAL architecture provides stronger cross-AZ availability guarantees and more flexibility.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;S3 WAL Benchmark&lt;/strong&gt;&lt;br&gt;
AutoMQ has optimized the performance of S3 WAL significantly, especially its latency. In our test scenarios, the average latency for S3 WAL Append is 168ms, with P99 at 296ms.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fu93r0ij36ikkmd6hvf2r.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fu93r0ij36ikkmd6hvf2r.png" alt="Image description" width="800" height="237"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Kafka Produce request processing latency averages 170ms, with P99 at 346ms.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fg0w7twjmq2y37w9klbnk.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fg0w7twjmq2y37w9klbnk.png" alt="Image description" width="800" height="133"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Average send latency is 230ms, with P99 at 489ms.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3y4nfso9v4lwjoyddp9l.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3y4nfso9v4lwjoyddp9l.png" alt="Image description" width="800" height="261"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;How AutoMQ Achieves S3 WAL with Hundreds of Lines of Code&lt;/strong&gt;&lt;br&gt;
In AutoMQ's GitHub repository, you can find the core stream storage repository, S3Stream [2]. The class &lt;code&gt;com.automq.stream.s3.wal.WriteAheadLog&lt;/code&gt; contains the top-level abstraction for WAL, while the implementation class &lt;code&gt;ObjectWALService&lt;/code&gt; includes more than 100 lines of implementation code for S3 WAL. In this sense, we leveraged over 100 lines of implementation code in conjunction with the existing EBS WAL infrastructure to fully build AutoMQ on S3.&lt;/p&gt;

&lt;p&gt;Of course, the fact that the S3 WAL implementation is only about a hundred lines long does not mean that a hundred lines of code are all it takes to run Kafka on S3. The line count is only the visible surface. The key lies in thoroughly understanding AutoMQ's WAL-based shared storage architecture. Within this framework, the approach is the same whether you are building fully S3-based shared storage or targeting another shared storage medium in the future. Shared WAL is one of the core components of AutoMQ's architecture. Because the code is organized around the top-level Shared WAL abstraction, the Shared WAL implementation can be moved to any other shared storage medium. When you implement a shared storage WAL on AutoMQ, most of the work and complexity has already been absorbed by the underlying architecture, so you only need to focus on writing the WAL to, and reading it from, the target storage medium efficiently. Once you fully understand the concept of Shared WAL and the S3Stream stream storage engine, implementing a fully S3-based S3 WAL is as simple as writing about 100 lines of code.&lt;/p&gt;
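&lt;p&gt;To make the idea concrete, here is a minimal sketch of what "one abstraction, several storage media" can look like. It is written in Python for brevity and is not the real Java interface: &lt;code&gt;SharedWal&lt;/code&gt;, &lt;code&gt;InMemoryBlockWal&lt;/code&gt;, &lt;code&gt;InMemoryObjectWal&lt;/code&gt;, and &lt;code&gt;replay_after_failover&lt;/code&gt; are hypothetical names that do not mirror &lt;code&gt;com.automq.stream.s3.wal.WriteAheadLog&lt;/code&gt;, and both backends are in-memory stand-ins.&lt;/p&gt;

```python
# Hypothetical interface and backends, for illustration only.
from abc import ABC, abstractmethod


class SharedWal(ABC):
    @abstractmethod
    def append(self, record):
        """Persist one record and return its offset."""

    @abstractmethod
    def recover(self):
        """Return all records that have not been trimmed, in order."""

    @abstractmethod
    def trim(self, offset):
        """Discard every record up to and including the given offset."""


class InMemoryBlockWal(SharedWal):
    """Stands in for a block-device (EBS-like) WAL."""

    def __init__(self):
        self.records = []
        self.base = 0          # offset of the first retained record

    def append(self, record):
        self.records.append(record)
        return self.base + len(self.records) - 1

    def recover(self):
        return list(self.records)

    def trim(self, offset):
        keep_from = offset + 1 - self.base
        self.records = self.records[keep_from:]
        self.base = offset + 1


class InMemoryObjectWal(SharedWal):
    """Stands in for an object-storage WAL: one object per appended record."""

    def __init__(self):
        self.objects = {}
        self.next_offset = 0

    def append(self, record):
        offset = self.next_offset
        self.objects["wal/%020d" % offset] = record
        self.next_offset += 1
        return offset

    def recover(self):
        return [self.objects[key] for key in sorted(self.objects)]

    def trim(self, offset):
        for old in range(offset + 1):
            self.objects.pop("wal/%020d" % old, None)


def replay_after_failover(wal):
    """Engine-side logic depends only on the interface, not on the medium."""
    return b"".join(wal.recover())


results = []
for wal in (InMemoryBlockWal(), InMemoryObjectWal()):
    for record in (b"a", b"b", b"c"):
        wal.append(record)
    wal.trim(0)                               # record 0 is already in object storage
    results.append(replay_after_failover(wal))
```

&lt;p&gt;The engine-side function is identical for both backends, which is the point of the abstraction: a new medium only has to supply its own append, recover, and trim.&lt;/p&gt;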

&lt;p&gt;&lt;strong&gt;Summary&lt;/strong&gt;&lt;br&gt;
This article has introduced the thinking behind AutoMQ's storage architecture and how it evolved, and has explained the core concept underneath it: a shared storage architecture built on Shared WAL. AutoMQ will continue to optimize the stream storage engine on top of this abstraction to build a more powerful Kafka stream service for everyone. S3E1Z WAL will also be officially released in the near future, so please stay tuned.&lt;/p&gt;

&lt;p&gt;References&lt;br&gt;
[1] AutoMQ: &lt;a href="https://github.com/AutoMQ/automq" rel="noopener noreferrer"&gt;https://github.com/AutoMQ/automq&lt;/a&gt;&lt;br&gt;
[2] S3Stream: &lt;a href="https://github.com/AutoMQ/automq/tree/main/s3stream" rel="noopener noreferrer"&gt;https://github.com/AutoMQ/automq/tree/main/s3stream&lt;/a&gt;&lt;br&gt;
[3] Direct S3 Cluster Deployment: &lt;a href="https://docs.automq.com/automq/getting-started/deploy-direct-s3-cluster" rel="noopener noreferrer"&gt;https://docs.automq.com/automq/getting-started/deploy-direct-s3-cluster&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Industry Standard for Cloud Instance Initialization: Cloud-Init</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Fri, 07 Jun 2024 06:11:27 +0000</pubDate>
      <link>https://dev.to/automq/industry-standard-for-cloud-instance-initialization-cloud-init-5b52</link>
      <guid>https://dev.to/automq/industry-standard-for-cloud-instance-initialization-cloud-init-5b52</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Cloud-Init[1] is the industry-standard tool for initializing cloud instances across multiple platforms. It is endorsed by all leading public cloud providers and is ideal for configuring private cloud infrastructures and bare-metal environments. At boot-up, Cloud-Init detects its cloud environment, accesses any provided metadata, and initializes the system. This process may include setting up network and storage configurations, establishing SSH access keys, among other system settings. Following this, Cloud-Init processes any additional user or vendor data supplied to the instance. Whether you're creating custom Linux deployment images or launching new Linux servers, Cloud-Init is pivotal for automating and streamlining these processes.&lt;/p&gt;

&lt;h2&gt;
  
  
  Current Context: Cloud-Init's Ubiquity Across Cloud Platforms
&lt;/h2&gt;

&lt;p&gt;Cloud-Init has become the industry standard for initializing virtual machines in the cloud computing sector, with widespread use across all major cloud platforms. An examination of the data sources that Cloud-Init supports shows its extensive compatibility, catering to numerous cloud service providers like AWS (Amazon Web Services), Azure (Microsoft Cloud), and Alibaba Cloud, as well as various private cloud and container virtualization solutions including CloudStack, OpenNebula, OpenStack, and LXD. This broad adoption highlights Cloud-Init's essential role in automating cloud infrastructure deployments across an array of platforms and services.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Amazon EC2&lt;/li&gt;
&lt;li&gt;Alibaba Cloud (AliYun)&lt;/li&gt;
&lt;li&gt;Azure&lt;/li&gt;
&lt;li&gt;Google Compute Engine&lt;/li&gt;
&lt;li&gt;LXD&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Objective: What Issues Does Cloud-Init Address?
&lt;/h2&gt;

&lt;p&gt;Cloud-Init primarily addresses the need for rapid and automated configuration and startup of cloud instances, to efficiently adapt to the dynamic demands of the cloud computing environment. This tool was initially designed to simplify the initialization process of cloud instances. Since its inception as an open-source project, Cloud-Init has quickly gained widespread recognition and has become a standard feature supported by nearly all major cloud service providers, including Amazon Web Services, Google Cloud Platform, and Microsoft Azure.&lt;br&gt;
&lt;strong&gt;Challenges in Cloud Computing Deployment&lt;/strong&gt;&lt;br&gt;
In the early stages of cloud computing, setting up and configuring virtual machines was a time-consuming and complex process, especially when dealing with large-scale configurations and dependent software installations. Although pre-configured system images could achieve rapid deployment, as computing needs diversified and architectures became more complex, this approach gradually appeared less flexible and efficient. Operations staff had to manually configure each instance, such as setting up networks, storage, SSH keys, software packages, and various other system aspects, which not only increased the workload but also heightened the possibility of errors.&lt;br&gt;
&lt;strong&gt;Cloud-Init's Solution&lt;/strong&gt;&lt;br&gt;
Cloud-Init emerged to address this pain point. It allows users to automatically execute a series of customized configuration tasks at the first startup of a cloud instance, such as setting hostnames, network configurations, user management, and software package installations, significantly simplifying the deployment and management of cloud instances. By using Cloud-Init, users can customize startup scripts and configuration files for cloud instances, achieving a truly "configure once, run anywhere" capability, which greatly enhances the deployment efficiency and flexibility of cloud resources.&lt;br&gt;
During the startup process of cloud instances, Cloud-Init is responsible for identifying the cloud environment in which it operates and accordingly initializing the system. This means that at first startup, the cloud instance is automatically configured with network settings, storage, SSH keys, software packages, and other various system settings, without the need for additional manual intervention.&lt;br&gt;
The core value of Cloud-Init lies in providing a seamless bridge for the startup and connection of cloud instances, ensuring that the instances function as expected. For users of cloud services, Cloud-Init offers a first-time startup configuration management solution that does not require installation. For cloud providers, it offers instance settings that can be integrated with their cloud services.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxh3j0tjr65zjz8goegck.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxh3j0tjr65zjz8goegck.png" alt="Image description" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Features and Use Cases of Cloud-Init
&lt;/h2&gt;

&lt;p&gt;Cloud-Init provides a suite of capabilities designed for automated configuration and management across diverse cloud computing platforms. These features enable robust support for automated deployments and management in cloud settings, greatly improving the flexibility and efficiency of configuring cloud resources.&lt;br&gt;
&lt;strong&gt;Common use cases for Cloud-Init&lt;/strong&gt;&lt;br&gt;
Cloud-Init is routinely employed to carry out custom initialization tasks prior to the actual startup of application processes. Typical initialization tasks include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Setting up the hostname&lt;/li&gt;
&lt;li&gt;Adding SSH keys&lt;/li&gt;
&lt;li&gt;Executing a script on the first boot&lt;/li&gt;
&lt;li&gt;Formatting and mounting a data disk&lt;/li&gt;
&lt;li&gt;Launching an Ansible playbook&lt;/li&gt;
&lt;li&gt;Installing a DEB/RPM package&lt;/li&gt;
&lt;/ul&gt;
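&lt;p&gt;As a rough illustration of how several of these tasks are expressed, the following Python sketch generates a small cloud-config document. The keys &lt;code&gt;hostname&lt;/code&gt;, &lt;code&gt;ssh_authorized_keys&lt;/code&gt;, &lt;code&gt;packages&lt;/code&gt;, and &lt;code&gt;runcmd&lt;/code&gt; are standard cloud-config keys; the function &lt;code&gt;build_user_data&lt;/code&gt; and all values passed to it are made up for this example. It emits JSON after the &lt;code&gt;#cloud-config&lt;/code&gt; header on the assumption that the YAML parser accepts JSON-style documents, which avoids a third-party YAML dependency.&lt;/p&gt;

```python
import json


def build_user_data(hostname, ssh_keys, packages, first_boot_commands):
    """Render user data for Cloud-Init; all inputs are example values."""
    config = {
        "hostname": hostname,                    # set the hostname
        "ssh_authorized_keys": list(ssh_keys),   # add SSH keys
        "packages": list(packages),              # install DEB/RPM packages
        "runcmd": list(first_boot_commands),     # run commands on first boot
    }
    # The first line must be the cloud-config marker.
    return "#cloud-config\n" + json.dumps(config, indent=2) + "\n"


user_data = build_user_data(
    hostname="broker-0",
    ssh_keys=["ssh-ed25519 AAAA...example ops@example"],
    packages=["chrony"],
    first_boot_commands=[["mkdir", "-p", "/data"]],
)
```

&lt;p&gt;The resulting string would be passed as the instance's user data when the VM is launched.&lt;/p&gt;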

&lt;p&gt;Our project, AutoMQ[2], is a cloud-native Kafka implementation that leverages cloud infrastructure. On platforms like AWS, AutoMQ utilizes ASG and EC2 for operations when not deploying via Kubernetes. Before initiating AutoMQ, several preparatory steps and configurations are required. Here is the Cloud-Init script content from the Enterprise Edition of AutoMQ, detailing the key initialization steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Initialize the systemd service files.&lt;/li&gt;
&lt;li&gt;Use the AWS CLI to configure credentials from the EC2 instance profile (IAM role), ensuring proper access to additional cloud services.&lt;/li&gt;
&lt;li&gt;Set up the necessary environment variables for AutoMQ.&lt;/li&gt;
&lt;li&gt;Launch the AutoMQ systemd service using a script.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
#cloud-config

write_files:
  - path: /etc/systemd/system/kafka.service
    permissions: '0644'
    owner: root:root
    content: |
      # ignore some code...


  - path: /opt/automq/scripts/run.info
    permissions: '0644'
    owner: root:root
    content: |
      role=
      wal.path=
      init.finish=

runcmd:
  - |
    # ignore some code....

    echo "Start getting the meta and wal volume ids" &amp;gt; ${AUTOMQ_HOME}/scripts/automq-server.log
    region_id=$(curl -s http://169.254.169.254/latest/meta-data/placement/region)

    aws configure set default.region ${region_id} --profile ec2RamRoleProfile
    aws configure set credential_source Ec2InstanceMetadata --profile ec2RamRoleProfile
    aws configure set role_arn #{AUTOMQ_INSTANCE_PROFILE} --profile ec2RamRoleProfile

    instance_id=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)


  - |
    echo "AUTOMQ_ENABLE_LOCAL_CONFIG=#{AUTOMQ_ENABLE_LOCAL_CONFIG}" &amp;gt;&amp;gt; ${AUTOMQ_HOME}/scripts/env.info
    # ignore some code....


  - |
    echo "export AUTOMQ_NODE_ROLE='#{AUTOMQ_NODE_ROLE}'" &amp;gt;&amp;gt; /etc/bashrc
    # ignore some code....

    source /etc/bashrc

  - sh ${AUTOMQ_HOME}/scripts/automq-server.sh up --s3url="#{AUTOMQ_S3URL}" &amp;gt;&amp;gt; ${AUTOMQ_HOME}/scripts/automq-server.log 2&amp;gt;&amp;amp;1 &amp;amp;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note: This userdata content is incomplete and is for illustrative purposes only; it requires integration with other AutoMQ scripts and Enterprise Edition code to be fully operational.&lt;br&gt;
&lt;strong&gt;Why choose Cloud-Init when I have Docker or Kubernetes?&lt;/strong&gt;&lt;br&gt;
When you think about setting up your environment, Docker and Kubernetes likely come to mind. The good news is that you do not have to choose. Even if you opt for Docker or Kubernetes, you still need to install and configure their components on your machines, which is precisely where Cloud-Init comes into play. These tools offer different levels of abstraction for the runtime environment and are not mutually exclusive. Think of Cloud-Init as roughly the Dockerfile of the VM world.&lt;/p&gt;

&lt;h2&gt;
  
  
  How does Cloud-Init work?
&lt;/h2&gt;

&lt;p&gt;Cloud-Init works in two primary phases: an early phase at the start of boot (the local boot stage), and a later phase once the network is up.&lt;br&gt;
&lt;strong&gt;Early Boot Stage&lt;/strong&gt;&lt;br&gt;
In the local boot stage, before the network configuration kicks in, Cloud-Init primarily carries out the following tasks:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Identify data sources: It determines the data source of the running instance by examining built-in hardware values. Data sources are the wellsprings of all configuration data.&lt;/li&gt;
&lt;li&gt;Fetch configuration data: After pinpointing the data source, Cloud-Init pulls configuration data from it. This data tells Cloud-Init which actions to take and may include instance metadata (such as machine ID, hostname, and network settings), vendor data, and user data (userdata). Vendor data comes from the cloud provider, while user data is usually applied after the network has been configured.&lt;/li&gt;
&lt;li&gt;Write network configuration: Cloud-Init writes the network configuration and sets up DNS so that network services can be brought up during boot.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Late Startup Phase&lt;/strong&gt;&lt;br&gt;
Once the network is configured, Cloud-Init uses vendor data and user data (userdata) to run the remaining, non-critical configuration tasks that tailor the running instance. Specific tasks include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Configuration Management: Cloud-Init interfaces with management tools such as Puppet, Ansible, or Chef to apply intricate configurations and ensure the system remains current.&lt;/li&gt;
&lt;li&gt;Software Installation: At this juncture, Cloud-Init installs necessary software and performs updates to guarantee that the system is fully operational and up-to-date.&lt;/li&gt;
&lt;li&gt;User Accounts: Cloud-Init manages the creation and modification of user accounts, sets default passwords, and configures permissions accordingly.&lt;/li&gt;
&lt;li&gt;Execute User Scripts: Cloud-Init executes custom scripts included in the user data, facilitating the installation of additional software, the application of security measures, and more. It also injects SSH keys into the instance's authorized_keys file to enable secure remote access.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Subdivision of the Startup Phase&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Detect: Use the platform identification tool ds-identify to ascertain the platform on which the instance operates.&lt;/li&gt;
&lt;li&gt;Local: Runs under cloud-init-local.service and is chiefly responsible for detecting "local" data sources and setting up network configuration.&lt;/li&gt;
&lt;li&gt;Network: Runs under cloud-init.service, which requires all configured networks to be active, and processes user data.&lt;/li&gt;
&lt;li&gt;Config: Runs under cloud-config.service, executing configuration-only modules, such as runcmd.&lt;/li&gt;
&lt;li&gt;Final: Performs under cloud-final.service, marking the conclusion of the boot sequence, where user-defined scripts are executed.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Differences and workflows between Cloud-Init and other tools
&lt;/h2&gt;

&lt;p&gt;While Cloud-Init, Packer, and Ansible are all automation tools used in deployment and configuration, they vary in their functionality, positioning, and workflows.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Cloud-Init is primarily designed for the initial boot and configuration stages of cloud instances.&lt;/li&gt;
&lt;li&gt;Packer specializes in creating immutable machine images that can be reused across various platforms.&lt;/li&gt;
&lt;li&gt;Ansible serves as a more comprehensive tool for configuration management and application deployment, ideal for automating system setups and deploying applications.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;While there is some functional overlap, using these tools in tandem can enhance and streamline automation during different phases of deployment and management.&lt;/p&gt;

&lt;h2&gt;
  
  
  Summary
&lt;/h2&gt;

&lt;p&gt;This article offers an in-depth look at the functionalities and use cases of Cloud-Init, highlighting its differences from other deployment automation tools. We hope you find this information useful.&lt;/p&gt;

&lt;p&gt;AutoMQ[2] is committed to advancing messaging and streaming systems into the cloud-native era. Our goal is to fully utilize mature, scalable cloud services to unlock the full potential of the cloud. Understanding the features, pricing, and principles of various cloud services thoroughly is essential. Moving forward, we will continue to share insights on cloud technology, striving to be your go-to cloud expert and helping everyone maximize the benefits of cloud services.&lt;/p&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;p&gt;[1] Cloud-Init:  &lt;a href="https://github.com/canonical/Cloud-Init" rel="noopener noreferrer"&gt;https://github.com/canonical/Cloud-Init&lt;/a&gt;&lt;br&gt;
[2] AutoMQ: &lt;a href="https://github.com/AutoMQ/automq" rel="noopener noreferrer"&gt;https://github.com/AutoMQ/automq&lt;/a&gt;&lt;br&gt;
[3] Introduction to Cloud-Init: &lt;a href="https://cloudinit.readthedocs.io/en/latest/explanation/introduction.html#how-does-Cloud-Init-work" rel="noopener noreferrer"&gt;https://cloudinit.readthedocs.io/en/latest/explanation/introduction.html#how-does-Cloud-Init-work&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>AutoMQ Automated Streaming System Continuous Testing Platform Technical Insider</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Fri, 07 Jun 2024 03:57:34 +0000</pubDate>
      <link>https://dev.to/automq/automq-automated-streaming-system-continuous-testing-platform-technical-insider-2c0f</link>
      <guid>https://dev.to/automq/automq-automated-streaming-system-continuous-testing-platform-technical-insider-2c0f</guid>
      <description>&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;AutoMQ[1], as a streaming system, is widely used in critical customer operations that demand high reliability. Consequently, a simulated, long-term testing environment that replicates real-world production scenarios is essential to ensure the viability of SLAs. This level of assurance is critical for the confidence in releasing new versions and for client adoption. With this objective, we created an automated, continuous testing platform for streaming systems, named Marathon. Before rolling out the Marathon framework, we established three key design principles:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Scalable: The platform must accommodate the growth of test cases and deployment modes as the system under test evolves&lt;/li&gt;
&lt;li&gt;Observable: Being a testing platform, encountering bugs is expected. Thus, robust debugging tools are essential for pinpointing and resolving root causes&lt;/li&gt;
&lt;li&gt;Cost-effective: Given the fluctuating traffic patterns in test scenarios, resource consumption should dynamically adjust according to traffic changes&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;These three principles guided subsequent technology choices and architectural decisions.&lt;/p&gt;

&lt;h2&gt;
  
  
  Architectural Overview
&lt;/h2&gt;

&lt;p&gt;Let’s begin with the architecture diagram:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5zxep1ehubllc98kjnn4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5zxep1ehubllc98kjnn4.png" alt="Image description" width="800" height="410"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Marathon project's Controller, Worker, and the AutoMQ Enterprise Edition control plane are all integrated within Kubernetes (K8S):&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Controller: Interacts with the AutoMQ Enterprise Edition control plane within the same VPC to create, modify, and delete Kafka clusters, while also coordinating test tasks and managing the number and configuration of Workers&lt;/li&gt;
&lt;li&gt;Worker: Runs Kafka clients to generate the workload required by tasks, reports observability data, and performs client-side SLA assessments&lt;/li&gt;
&lt;li&gt;AutoMQ Enterprise Edition control plane: Delivers a comprehensive set of productized features for the data plane, including cluster lifecycle management, observability, security auditing, and cluster reassignment. Marathon mainly uses its OpenAPI for cluster lifecycle management to create, modify, and destroy clusters throughout the testing process&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The Controller and Workers are designed as a distributed system. The Controller functions much like a K8S Operator, adjusting the number and configuration of Workers in a tuning loop so that they match task demands. Workers are fully stateless and report events to the Controller, which acts on them. This gives the architecture considerable flexibility and lets it scale with the tasks. Moreover, because Workers are lightweight, they can be scaled dynamically and can even run on Spot instances[2], which considerably lowers operating costs and makes ultra-large-scale elastic tasks feasible.&lt;/p&gt;
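&lt;p&gt;The tuning loop described above follows the familiar pattern of comparing a desired state with the actual state and acting on the difference. The sketch below shows that pattern in Python. It is a hypothetical illustration, not Marathon's code: the &lt;code&gt;Controller&lt;/code&gt; class, its &lt;code&gt;reconcile&lt;/code&gt; method, and the worker naming scheme are all assumptions, and Workers are represented by names in a set rather than real pods.&lt;/p&gt;

```python
# Hypothetical sketch of a reconcile-style tuning loop; not Marathon's code.

class Controller:
    def __init__(self):
        self.workers = set()   # names of Workers currently running
        self.next_id = 0       # used to give each new Worker a unique name

    def reconcile(self, desired):
        """One pass of the loop: compare desired and actual, act on the gap."""
        to_start = max(desired - len(self.workers), 0)
        to_stop = max(len(self.workers) - desired, 0)
        for _ in range(to_start):
            self.workers.add("worker-%d" % self.next_id)
            self.next_id += 1
        # Workers are stateless, so any of them can be stopped.
        for name in sorted(self.workers)[:to_stop]:
            self.workers.discard(name)
        return len(self.workers)


controller = Controller()
# The desired Worker count changes as the task's traffic model changes.
sizes = [controller.reconcile(desired) for desired in (3, 3, 1, 4)]

controller.workers.pop()              # simulate a Spot instance being reclaimed
recovered = controller.reconcile(4)   # the next pass replaces the lost Worker
```

&lt;p&gt;Because the loop only looks at the current gap, losing a Worker to a Spot interruption needs no special handling: the next pass simply starts a replacement.&lt;/p&gt;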

&lt;h2&gt;
  
  
  Technical Details
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Running the Controller&lt;br&gt;
Startup process&lt;/strong&gt;&lt;br&gt;
The Controller is designed for resource management and task orchestration, initiating several resource managers at the outset:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Service Discovery: Monitors the operational status of Workers&lt;/li&gt;
&lt;li&gt;Event Bus: Acts as the communication conduit with Workers&lt;/li&gt;
&lt;li&gt;Alert Service: Alerts administrators to events requiring immediate attention&lt;/li&gt;
&lt;li&gt;Kafka Cluster Manager: Oversees the status of Kafka clusters; tracks Kafka release updates and manages upgrades&lt;/li&gt;
&lt;li&gt;Signal Processor: Detects SIGTERM to begin the termination process, reclaiming any resources created&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The Controller accommodates various types of Kafka clusters:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Existing Kafka clusters: Rapidly confirms the functionality of designated clusters&lt;/li&gt;
&lt;li&gt;Managed Kafka clusters: The Controller oversees the entire lifecycle of these clusters, using the control plane capabilities of AutoMQ to create and destroy them&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Task cycles&lt;/strong&gt;&lt;br&gt;
The Controller uses a mechanism akin to a K8S Operator, dynamically adjusting the number and configuration of Workers in a reconciliation loop based on task requirements. Each task corresponds to a test scenario and is programmed to send and receive messages from Kafka, constructing various traffic models for black-box testing.&lt;br&gt;
Each task is divided into four stages, executed sequentially within the same thread:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Resource creation&lt;/li&gt;
&lt;li&gt;Warm-up&lt;/li&gt;
&lt;li&gt;Running task load&lt;/li&gt;
&lt;li&gt;Resource recovery&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The Marathon framework provides a comprehensive set of utility classes that streamline task creation. These include functionality for creating Kafka topics, managing consumer backlogs, adjusting Worker traffic, monitoring specific events, and injecting faults into Kafka clusters. Paired with Workers, these tools make it possible to simulate traffic at any scale and to test unusual scenarios, such as large-scale cold reads or the deliberate shutdown of a Kafka node to assess data integrity.&lt;br&gt;
Because tasks are written as code, they can express arbitrary scenarios; the only restriction is that they must avoid non-interruptible blocking operations. If a Worker's Spot instance is reclaimed, the Controller interrupts the task thread, reclaims resources, and retries the task as needed.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Managing Workers&lt;br&gt;
Creation and service discovery of Workers&lt;/strong&gt;&lt;br&gt;
Stress-testing a Kafka cluster can demand bandwidth of tens of GB/s, far beyond the capacity of a single machine, so a distributed design is required. The first step is deciding how to locate newly created Workers and communicate with them. Since we chose to manage the system with Kubernetes (K8s), we naturally use K8s mechanisms for service discovery.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgb6mvc12ssi4x05q1qmg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgb6mvc12ssi4x05q1qmg.png" alt="Image description" width="800" height="505"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We model a collection of identically configured Workers as a Worker Deployment, mirroring the Deployment model in K8s. Each Worker runs as a Pod within this Deployment. Creating Workers through the Controller amounts to submitting a Deployment to the API Server and waiting for all Pods to become ready, as illustrated in Steps 1 and 2. K8s nodes scale accordingly, provisioning the necessary Spot instance virtual machines.&lt;br&gt;
Upon initialization, each Worker creates a ConfigMap that lists the events it is interested in, initially only initialization events (Step 3). The Controller watches for newly created ConfigMaps using the K8s Watch API (Step 4) and then dispatches initialization events containing configurations to these Workers (Step 5).&lt;br&gt;
This completes the service discovery and initialization process for Workers. Workers then update their ConfigMaps to subscribe to additional events of interest. This service discovery mechanism lets the Controller create Workers dynamically and lays the groundwork for the event bus described next.&lt;br&gt;
&lt;strong&gt;Event Bus&lt;/strong&gt;&lt;br&gt;
Through the service discovery mechanism described above, the Controller knows the service address of each Worker (Pod IP plus port) and the events each Worker is interested in (learned by watching ConfigMap changes), allowing it to push events directly to specific Workers.&lt;/p&gt;
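&lt;p&gt;As a rough illustration of the routing idea, the sketch below selects the Workers that should receive an event based on their registered interests. It is a minimal in-memory sketch under stated assumptions, not Marathon's implementation: the names &lt;code&gt;EventBusSketch&lt;/code&gt;, &lt;code&gt;Registration&lt;/code&gt;, and &lt;code&gt;targetsFor&lt;/code&gt;, the event names, and the addresses are all hypothetical, and the actual ConfigMap watching and network delivery are left out.&lt;/p&gt;

```java
import java.util.Arrays;

/** Hypothetical sketch: picks the Workers that subscribed to a given event. */
final class EventBusSketch {

    /** A Worker as the Controller might see it: service address plus subscribed event names. */
    record Registration(String address, String[] events) {
        boolean subscribesTo(String event) {
            return Arrays.asList(events).contains(event);
        }
    }

    /** Returns the addresses of all Workers whose registration lists the given event. */
    static String[] targetsFor(String event, Registration[] registrations) {
        return Arrays.stream(registrations)
                .filter(r -> r.subscribesTo(event))
                .map(Registration::address)
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // Illustrative registry; in the article this information comes from ConfigMaps.
        Registration[] registry = {
            new Registration("10.0.0.1:8080", new String[] {"INIT"}),
            new Registration("10.0.0.2:8080", new String[] {"INIT", "ADJUST_THROUGHPUT"}),
        };
        System.out.println(Arrays.toString(targetsFor("ADJUST_THROUGHPUT", registry)));
    }
}
```

&lt;p&gt;Keeping the subscription data separate from the delivery mechanism is what would allow the same lookup to serve both request-reply and publish-subscribe delivery.&lt;/p&gt;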

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb3pigh1nb529rqqh9bn1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb3pigh1nb529rqqh9bn1.png" alt="Image description" width="800" height="411"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Numerous RPC frameworks are available, and Marathon has opted for Vert.x. It supports the traditional request-reply communication model as well as the multi-receiver publish-subscribe model, which proves invaluable in scenarios where multiple nodes must acknowledge an event (illustrated in the figure for the Adjust throughput command).&lt;br&gt;
&lt;strong&gt;Spot Instance Application&lt;/strong&gt;&lt;br&gt;
As deduced from the preceding sections, Workers can be dynamically generated as needed by tasks, and commands to execute tasks on Workers can also be dispatched through the event bus (as illustrated in the figure for the Initialize new worker command). Essentially, Workers are stateless and can be rapidly created or destroyed, making the utilization of Spot Instances viable (the Controller, utilizing minimal resources, can operate on a smaller-scale Reserved Instance).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhqe42hgzgg0b23cihmvz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhqe42hgzgg0b23cihmvz.png" alt="Image description" width="702" height="690"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Controller employs Kubernetes' Watch API to monitor the status of Pods, pausing and restarting the current task upon detecting an unexpected termination of a Pod. This enables prompt detection and mitigation of task impacts during the reclamation of Spot Instances. Spot Instances, derived from the excess capacity of cloud providers, offer significant cost savings compared to Reserved Instances. By leveraging Spot Instances, Marathon can drastically cut the costs of executing tasks with lower stability demands over prolonged periods.&lt;br&gt;
&lt;strong&gt;Test Scenarios&lt;/strong&gt;&lt;br&gt;
&lt;strong&gt;Scenario Description and Resource Management&lt;/strong&gt;&lt;br&gt;
Marathon test scenarios are defined in code by extending an abstract class, defining the test case configuration, and implementing its lifecycle methods. Here are some of the existing test scenarios:&lt;br&gt;
&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4ky1ma2wznwggma0gjys.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4ky1ma2wznwggma0gjys.png" alt="Image description" width="800" height="341"&gt;&lt;/a&gt;&lt;br&gt;
Test case configurations are supplied through generics. Taking CatchUpReadTask as an example, the class is declared as:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class CatchUpReadTask extends AbstractTask&amp;lt;CatchUpReadTaskConfig&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The related configuration class, CatchUpReadTaskConfig, defines the parameters required to run this task, which users can set dynamically.&lt;/p&gt;

&lt;p&gt;Each task scenario is characterized through the implementation of the following lifecycle methods to simulate a specific traffic pattern:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7fnutxfqswe1ju31s8bu.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7fnutxfqswe1ju31s8bu.png" alt="Image description" width="712" height="612"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;prepare: Establish the necessary resources for the task&lt;/li&gt;
&lt;li&gt;warmup: Ready the Worker and the cluster for testing&lt;/li&gt;
&lt;li&gt;workload: Generate the task workload&lt;/li&gt;
&lt;li&gt;cleanup: Remove the resources established for the task&lt;/li&gt;
&lt;/ul&gt;
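&lt;p&gt;The four lifecycle methods above can be sketched as an abstract class whose &lt;code&gt;run&lt;/code&gt; method calls them in order. This is only a minimal sketch under stated assumptions: &lt;code&gt;LifecycleTaskSketch&lt;/code&gt; and &lt;code&gt;LifecycleDemo&lt;/code&gt; are hypothetical names, the real AbstractTask signatures are not shown in this article, and running &lt;code&gt;cleanup&lt;/code&gt; in a &lt;code&gt;finally&lt;/code&gt; block is one possible way to make sure resources are reclaimed after an interruption.&lt;/p&gt;

```java
/** Hypothetical sketch of a task whose run() walks through the four lifecycle stages. */
abstract class LifecycleTaskSketch implements Runnable {
    protected abstract void prepare() throws InterruptedException;
    protected abstract void warmup() throws InterruptedException;
    protected abstract void workload() throws InterruptedException;
    protected abstract void cleanup();

    @Override
    public final void run() {
        try {
            prepare();
            warmup();
            workload();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of the thread can observe it.
            Thread.currentThread().interrupt();
        } finally {
            // Resources are reclaimed whether the workload finished, failed, or was interrupted.
            cleanup();
        }
    }
}

final class LifecycleDemo {
    /** Runs a trivial task on its own thread and returns the order in which the stages ran. */
    static String runOnce() {
        StringBuilder trace = new StringBuilder();
        LifecycleTaskSketch task = new LifecycleTaskSketch() {
            @Override protected void prepare() { trace.append("prepare "); }
            @Override protected void warmup() { trace.append("warmup "); }
            @Override protected void workload() { trace.append("workload "); }
            @Override protected void cleanup() { trace.append("cleanup"); }
        };
        Thread thread = new Thread(task, "task-thread");
        thread.start();
        try {
            thread.join(); // wait for the whole lifecycle before reading the trace
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the task", e);
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

&lt;p&gt;Declaring &lt;code&gt;InterruptedException&lt;/code&gt; on the stage methods keeps blocking calls interruptible, which matches the restriction that tasks must avoid non-interruptible blocking operations.&lt;/p&gt;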

&lt;p&gt;Taking CatchUpReadTask as an example:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcfzr48mxgel4u1azqoaz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcfzr48mxgel4u1azqoaz.png" alt="Image description" width="776" height="1280"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Workload stage is the key differentiator among various task scenarios, where the CatchUpReadTask needs to build an appropriate backlog volume and then ensure it can be consumed within 5 minutes. For ChaosTask, the approach shifts to terminating a node and verifying that its partitions can be reassigned to other nodes within 1 minute. To cater to the diverse requirements of these tasks, the Marathon framework offers a toolkit for crafting test scenarios, as illustrated in the figure above:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;KafkaUtils: Create/Delete Topic (a resource type within Kafka clusters)&lt;/li&gt;
&lt;li&gt;WorkerDeployment: Create Worker&lt;/li&gt;
&lt;li&gt;ThroughputChecker: Continuously monitor whether the throughput meets the expected standards&lt;/li&gt;
&lt;li&gt;AwaitUtils: Confirm that the piled-up messages can be consumed within five minutes&lt;/li&gt;
&lt;/ul&gt;
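&lt;p&gt;To make the idea of a time-bounded check concrete, here is a minimal polling helper in the spirit of the AwaitUtils tool listed above. It is a sketch under stated assumptions rather than Marathon's code: &lt;code&gt;AwaitSketch&lt;/code&gt; and &lt;code&gt;awaitUntil&lt;/code&gt; are hypothetical names, and the backlog counter in &lt;code&gt;main&lt;/code&gt; is a stand-in for a real consumer lag measurement.&lt;/p&gt;

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper: waits until a condition holds or a timeout elapses. */
final class AwaitSketch {

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Thread.sleep is interruptible, so a waiting task thread can still be stopped.
     */
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before the condition became true
            }
            Thread.sleep(pollInterval.toMillis());
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        long[] backlog = {3}; // stand-in for a consumer backlog that drains over time
        boolean drained = awaitUntil(
                () -> { backlog[0]--; return backlog[0] == 0; },
                Duration.ofSeconds(5),
                Duration.ofMillis(10));
        System.out.println("backlog drained in time: " + drained);
    }
}
```

&lt;p&gt;A catch-up read scenario could call such a helper with a five-minute timeout and a condition that checks whether the accumulated messages have been consumed.&lt;/p&gt;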

&lt;p&gt;&lt;strong&gt;Task Orchestration&lt;/strong&gt;&lt;br&gt;
With a variety of implementations of AbstractTask, a wide range of testing scenarios is possible. Orchestrating different task stages and even distinct tasks is essential for the Controller to execute the aforementioned scenarios.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8e7brn4tphbk3q90ynhl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8e7brn4tphbk3q90ynhl.png" alt="Image description" width="764" height="1138"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Looking at the other methods of AbstractTask shows that it implements the Runnable interface. Its run method executes the lifecycle stages in order (prepare, warmup, workload, and cleanup), so a Task can be handed to a thread for execution.&lt;br&gt;
Upon initialization, the Controller sets up a task loop, constructs the required Task objects based on user specifications, and activates them by invoking the start method to launch a new thread for each task. The Controller then employs the join method to await the completion of each Task's lifecycle before moving on to the next one. This cycle is repeated to maintain the stability of the system under test.&lt;br&gt;
In the event of unrecoverable errors (such as Spot instances being reclaimed) or when operational commands are manually executed to interrupt the task, the Controller calls the interrupt method on the current Task to halt the thread and stop the task. The task loop then handles resource recovery, proceeds with the next task, or pauses, awaiting further instructions based on the situation.&lt;br&gt;
&lt;strong&gt;Assertions, Observability, and Alerts&lt;br&gt;
Assertions&lt;/strong&gt;&lt;br&gt;
The framework categorizes assertions based on the type of metrics detected into the following groups:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Client-side assertions: message continuity assertions and transaction isolation level assertions.&lt;/li&gt;
&lt;li&gt;Server-side state assertions: traffic threshold assertions and load balancing assertions.&lt;/li&gt;
&lt;li&gt;Time-based assertions: message backlog duration assertions, task timeout verifications, and more.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If the standard assertion rules are insufficient, the Checker interface can be implemented to define custom assertions as needed.&lt;br&gt;
&lt;strong&gt;Observability&lt;/strong&gt;&lt;br&gt;
Building a robust system requires observability tooling; without it, monitoring is reduced to passively watching alerts. The Marathon framework collects runtime data from Controllers and Workers, and it non-intrusively captures observability data from the systems under test. With Grafana's visualization tools, one can easily examine metrics, logs, profiling, and other observability data.&lt;br&gt;
&lt;strong&gt;Metrics&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpt1icgqpsqfubnesncea.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpt1icgqpsqfubnesncea.png" alt="Image description" width="800" height="438"&gt;&lt;/a&gt;&lt;br&gt;
&lt;strong&gt;Log&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fljiz238w76fg38y0ba3d.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fljiz238w76fg38y0ba3d.png" alt="Image description" width="800" height="686"&gt;&lt;/a&gt;&lt;strong&gt;Profiling&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9ytb164pesdlu8hjgqcp.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9ytb164pesdlu8hjgqcp.png" alt="Image description" width="800" height="417"&gt;&lt;/a&gt;&lt;strong&gt;Alerts&lt;/strong&gt;&lt;br&gt;
In an event-driven architecture, failed assertions trigger events with varying severity levels. Events that require immediate attention from operations staff raise alerts, which are sent to the OnCall group for assessment. Combined with observability data, this approach enables quick and accurate issue identification, lets potential risks be addressed and mitigated before customers are affected, and supports ongoing performance optimization.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftqa5yng9w1n22z5q25jq.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftqa5yng9w1n22z5q25jq.png" alt="Image description" width="800" height="454"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion and Future Outlook
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Focus on Spot instances, Kubernetes, and stateless applications&lt;/strong&gt;&lt;br&gt;
Reflecting on our three design principles (scalability, observability, and cost-efficiency), it is critical that the Marathon framework considers the following questions right from the start:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;How can we build resilient loads for various task scenarios?&lt;/li&gt;
&lt;li&gt;Considering the different resource demands of these loads, is it possible for the underlying machine resources to dynamically scale accordingly?&lt;/li&gt;
&lt;li&gt;Costs are categorized into usage costs and operational costs.

&lt;ul&gt;
&lt;li&gt;In terms of usage costs, how can we quickly create and dismantle resources to reduce barriers for users?&lt;/li&gt;
&lt;li&gt;As for operational costs, how can we efficiently construct the required loads using the fewest resources possible?&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;Marathon leverages Spot instances, K8s, and stateless Workers to address these questions; they correspond to the infrastructure layer, the operational management layer, and the application layer, respectively.&lt;br&gt;
Given the demand for both flexibility and cost-efficiency, Spot instances in the cloud are the obvious choice, priced at just 10% of what comparable Reserved instances cost. However, Spot instances introduce challenges, particularly the unpredictability of instance termination, which presents a significant architectural hurdle for applications. For Marathon, however, this is less of a concern as tasks can be rerun as needed.&lt;br&gt;
The most straightforward design strategy is essentially no design: Marathon focuses on scenario description and task orchestration, leaving the scheduling responsibilities to K8s. Marathon concentrates on determining the necessary workload size and the required number of cores per workload unit; the elasticity of the underlying resources is managed by K8s, starting with an initial application for a Spot instance node group and then focusing on the logic of the testing scenario.&lt;br&gt;
Nonetheless, the capability to utilize the benefits of Spot instances and K8s hinges on the application being stateless; otherwise, managing state persistence and reassignment becomes essential. This consideration is crucial in the design of the Worker module.&lt;br&gt;
&lt;strong&gt;Generalization of testing scenarios&lt;/strong&gt;&lt;br&gt;
Marathon exhibits excellent abstraction in many of its modules, including service discovery, task scheduling, and load generation, all of which are readily adaptable to other contexts:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Service discovery: Currently based on APIs provided by the K8s API server, with the data structures abstracted into Node and Registration. Node represents the address and port of a Worker node, while Registration corresponds to the events of interest to each Worker. Thus, any shared storage capable of supporting these two data structures, such as MySQL or Redis, can serve as the service discovery component.&lt;/li&gt;
&lt;li&gt;Task scheduling: Workers are currently packaged as Docker images and deployed via K8s Deployment. Alternatively, they could be packaged as AMIs for direct launch on EC2 via cloud interfaces, or deployed using tools such as Vagrant and Ansible.&lt;/li&gt;
&lt;li&gt;Load generation: Currently, each Worker runs a Kafka workload, which primarily involves running a specified number of Kafka clients to send and receive messages according to the Controller's settings. Replacing Kafka clients with RocketMQ clients or HTTP clients can be accomplished with minimal effort.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Thanks to these abstractions, Marathon's dependencies on external systems are modular and pluggable. Consequently, it serves not only as a continuous reliability testing platform for Kafka but can also be adapted to test other distributed systems, whether they run in the cloud or on premises.&lt;/p&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;p&gt;[1] AutoMQ: &lt;a href="https://github.com/AutoMQ/automq" rel="noopener noreferrer"&gt;https://github.com/AutoMQ/automq&lt;/a&gt;&lt;br&gt;
[2] Spot Instance: &lt;a href="https://docs.aws.amazon.com/zh_cn/AWSEC2/latest/UserGuide/using-spot-instances.html" rel="noopener noreferrer"&gt;https://docs.aws.amazon.com/zh_cn/AWSEC2/latest/UserGuide/using-spot-instances.html&lt;/a&gt;&lt;/p&gt;

</description>
      <category>javascript</category>
      <category>kafka</category>
    </item>
    <item>
      <title>ZhongAn Insurance's Wang Kai Analyzes Kafka Network Communication</title>
      <dc:creator>AutoMQ</dc:creator>
      <pubDate>Fri, 07 Jun 2024 03:22:11 +0000</pubDate>
      <link>https://dev.to/automq/zhongan-insurances-wang-kai-analyzes-kafka-network-communication-1p6i</link>
      <guid>https://dev.to/automq/zhongan-insurances-wang-kai-analyzes-kafka-network-communication-1p6i</guid>
      <description>&lt;p&gt;Author: Kai Wang, Java Development Expert at ZhongAn Online Insurance Basic Platform&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Today, we explore the core workflow of network communication in Kafka, specifically focusing on Apache Kafka 3.7[2]. This discussion also includes insights into the increasingly popular AutoMQ, highlighting its network communication optimizations and enhancements derived from Kafka.&lt;/p&gt;

&lt;h2&gt;
  
  
  I. How to Construct a Basic Request and Handle Responses
&lt;/h2&gt;

&lt;p&gt;For a message queue, network communication essentially involves two key aspects:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Communication between message producers and the message queue server (in Kafka, this involves producers "pushing" messages to the queue)&lt;/li&gt;
&lt;li&gt;Communication between message consumers and the message queue server (in Kafka, this involves consumers "pulling" messages from the queue)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fy3zy26u15w7nxqsm9ikc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fy3zy26u15w7nxqsm9ikc.png" alt="Image description" width="800" height="396"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This diagram primarily illustrates the process from message dispatch to response reception.&lt;br&gt;
Client:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;KafkaProducer initializes the Sender thread&lt;/li&gt;
&lt;li&gt;The Sender thread retrieves batched data from the RecordAccumulator (for detailed client-side sending, see &lt;a href="https://mp.weixin.qq.com/s/J2_O1l81duknfdFvHuBWxw" rel="noopener noreferrer"&gt;https://mp.weixin.qq.com/s/J2_O1l81duknfdFvHuBWxw&lt;/a&gt;)&lt;/li&gt;
&lt;li&gt;The Sender thread uses the NetworkClient to check the connection status and initiates a connection if necessary&lt;/li&gt;
&lt;li&gt;The Sender thread invokes the NetworkClient's doSend method to hand the data to the KafkaChannel&lt;/li&gt;
&lt;li&gt;The Sender thread calls the NetworkClient's poll method to perform the actual data transmission&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Server:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;KafkaServer initializes SocketServer, dataPlaneRequestProcessor (KafkaApis), and dataPlaneRequestHandlerPool&lt;/li&gt;
&lt;li&gt;SocketServer sets up the RequestChannel and dataPlaneAcceptor&lt;/li&gt;
&lt;li&gt;The dataPlaneAcceptor accepts connections and hands them to the appropriate Processor&lt;/li&gt;
&lt;li&gt;The Processor thread pulls tasks from the newConnections queue for processing. Processor threads handle ready I/O events:

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;configureNewConnections()&lt;/code&gt;: Set up new connections&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;processNewResponses()&lt;/code&gt;: Dispatch the Response and enqueue it in the inflightResponses temporary queue&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;poll()&lt;/code&gt;: Execute NIO polling to retrieve ready I/O operations on the respective SocketChannel&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;processCompletedReceives()&lt;/code&gt;: Enqueue received Requests in the RequestChannel queue&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;processCompletedSends()&lt;/code&gt;: Execute the callback logic for Responses in the temporary Response queue&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;processDisconnected()&lt;/code&gt;: Handle connections that have been disconnected due to send failures&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;closeExcessConnections()&lt;/code&gt;: Terminate connections that surpass quota limits&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;The KafkaRequestHandler retrieves the ready events from the RequestChannel and passes them to the appropriate KafkaApis handler for processing.&lt;/li&gt;
&lt;li&gt;After processing by KafkaApis, the response is returned to the RequestChannel.&lt;/li&gt;
&lt;li&gt;The Processor thread then delivers the response to the client.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;This completes a full cycle of message transmission in Kafka, encompassing both client and server processing steps.&lt;/p&gt;

&lt;h2&gt;
  
  
  II. Kafka Network Communication
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;1. Server-side Communication Thread Model&lt;/strong&gt;&lt;br&gt;
Unlike RocketMQ, which relies on Netty for efficient network communication, Kafka uses Java NIO to implement a main-sub Reactor pattern for network communication (for further information, see &lt;a href="https://jenkov.com/tutorials/java-nio/overview.html" rel="noopener noreferrer"&gt;https://jenkov.com/tutorials/java-nio/overview.html&lt;/a&gt;).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm5h28w945lgjyb9s0awt.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm5h28w945lgjyb9s0awt.png" alt="Image description" width="800" height="660"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Both DataPlaneAcceptor and ControlPlaneAcceptor are subclasses of Acceptor, a thread class that implements the Runnable interface. The primary function of an Acceptor is to listen for and accept connection requests between Clients and Brokers and to set up the transmission channels (SocketChannel). It hands these channels to Processors in round-robin fashion. A RequestChannel (backed by an ArrayBlockingQueue) connects Processors and Handlers. The MainReactor (Acceptor) handles only the OP_ACCEPT event; once one is detected, it forwards the SocketChannel to a SubReactor (Processor). Each Processor has its own Selector; the SubReactor listens for and processes the other events, ultimately passing the actual requests to the KafkaRequestHandlerPool.&lt;/p&gt;
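&lt;p&gt;The hand-off from the Acceptor to its Processors can be illustrated with a deliberately simplified sketch. It models only the idea of giving each accepted connection to the next Processor in turn; &lt;code&gt;RoundRobinAcceptorSketch&lt;/code&gt; and &lt;code&gt;assignNext&lt;/code&gt; are hypothetical names, and concerns that Kafka's real Acceptor deals with, such as NIO selectors and connection quotas, are not modeled here.&lt;/p&gt;

```java
/** Hypothetical sketch: an acceptor that hands each accepted connection to the next processor in turn. */
final class RoundRobinAcceptorSketch {
    private final int processorCount;
    private int nextIndex = 0;

    RoundRobinAcceptorSketch(int processorCount) {
        if (1 > processorCount) {
            throw new IllegalArgumentException("processorCount must be positive");
        }
        this.processorCount = processorCount;
    }

    /** Returns the index of the processor that takes the next connection, then advances the cursor. */
    int assignNext() {
        int chosen = nextIndex;
        nextIndex = (nextIndex + 1) % processorCount;
        return chosen;
    }

    public static void main(String[] args) {
        RoundRobinAcceptorSketch acceptor = new RoundRobinAcceptorSketch(3);
        for (String connection : new String[] {"conn-1", "conn-2", "conn-3", "conn-4"}) {
            System.out.println(connection + " goes to processor " + acceptor.assignNext());
        }
    }
}
```

&lt;p&gt;Spreading connections this way keeps the accepting thread cheap, since all subsequent read and write events for a connection are handled by the Processor that received it.&lt;/p&gt;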

&lt;p&gt;&lt;strong&gt;2. Initialization of the main components in the thread model&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkswzz8j0r8z82kxmirvx.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkswzz8j0r8z82kxmirvx.png" alt="Image description" width="800" height="545"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The diagram illustrates that during broker startup, the KafkaServer's startup method is invoked (assuming it operates in ZooKeeper mode).&lt;br&gt;
The startup method primarily establishes:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;KafkaApis handlers: creating dataPlaneRequestProcessor and controlPlaneRequestProcessor&lt;/li&gt;
&lt;li&gt;KafkaRequestHandlerPool: creating dataPlaneRequestHandlerPool and controlPlaneRequestHandlerPool&lt;/li&gt;
&lt;li&gt;Initialization of socketServer&lt;/li&gt;
&lt;li&gt;Establishment of controlPlaneAcceptorAndProcessor and dataPlaneAcceptorAndProcessor&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Additionally, an important step not depicted in the diagram but included in the startup method is the thread startup: enableRequestProcessing is invoked on the initialized socketServer.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;3. Addition and Removal of Processors&lt;/strong&gt;&lt;br&gt;
1. Addition&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Processors are added when the broker starts&lt;/li&gt;
&lt;li&gt;Processors are added when the number of processing threads, num.network.threads, is actively increased&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;2. Startup&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Processors start when the broker starts the acceptor&lt;/li&gt;
&lt;li&gt;New processing threads that were added during an adjustment and not yet started are started explicitly&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;3. Removal from the queue and destruction&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;When the broker shuts down&lt;/li&gt;
&lt;li&gt;When num.network.threads is actively decreased, excess threads are removed and closed&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5h1j66ysquy2d69onwod.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5h1j66ysquy2d69onwod.png" alt="Image description" width="800" height="632"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;4. KafkaRequestHandlerPool and KafkaRequestHandler&lt;/strong&gt;&lt;br&gt;
  &lt;strong&gt;1. KafkaRequestHandlerPool&lt;/strong&gt;&lt;br&gt;
The primary location for processing Kafka requests: a request handling thread pool responsible for creating, maintaining, managing, and destroying its request handling threads.&lt;br&gt;
  &lt;strong&gt;2. KafkaRequestHandler&lt;/strong&gt;&lt;br&gt;
The actual request handling thread class. Each request handling thread instance retrieves request objects from the SocketServer's RequestChannel queue and processes them.&lt;br&gt;
Below is the run method of KafkaRequestHandler:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def run(): Unit = {
  threadRequestChannel.set(requestChannel)
  while (!stopped) {
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    val startSelectTime = time.nanoseconds
    // Fetch the next pending request from the request queue
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

    req match {
      case RequestChannel.ShutdownRequest =&amp;gt;
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        completeShutdown()
        return

      case callback: RequestChannel.CallbackRequest =&amp;gt;
        val originalRequest = callback.originalRequest
        try {

          // If we've already executed a callback for this request, reset the times and subtract the callback time from the 
          // new dequeue time. This will allow calculation of multiple callback times.
          // Otherwise, set dequeue time to now.
          if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
            val prevCallbacksTimeNanos = originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
            originalRequest.callbackRequestCompleteTimeNanos = None
            originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds() - prevCallbacksTimeNanos)
          } else {
            originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds())
          }

          threadCurrentRequest.set(originalRequest)
          callback.fun(requestLocal)
        } catch {
          case e: FatalExitError =&amp;gt;
            completeShutdown()
            Exit.exit(e.statusCode)
          case e: Throwable =&amp;gt; error("Exception when handling request", e)
        } finally {
          // When handling requests, we try to complete actions after, so we should try to do so here as well.
          apis.tryCompleteActions()
          if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)
            originalRequest.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())
          threadCurrentRequest.remove()
        }
      // In the normal case, KafkaApis.handle runs the corresponding processing logic
      case request: RequestChannel.Request =&amp;gt;
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          threadCurrentRequest.set(request)
          apis.handle(request, requestLocal)
        } catch {
          case e: FatalExitError =&amp;gt;
            completeShutdown()
            Exit.exit(e.statusCode)
          case e: Throwable =&amp;gt; error("Exception when handling request", e)
        } finally {
          threadCurrentRequest.remove()
          request.releaseBuffer()
        }

      case RequestChannel.WakeupRequest =&amp;gt; 
        // We should handle this in receiveRequest by polling callbackQueue.
        warn("Received a wakeup request outside of typical usage.")

      case null =&amp;gt; // continue
    }
  }
  completeShutdown()
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here, line 56 of the listing, apis.handle(request, requestLocal), hands the request over to the handle method of KafkaApis for processing.&lt;/p&gt;

&lt;h2&gt;
  
  
  III. Unified Request Handling and Dispatch
&lt;/h2&gt;

&lt;p&gt;KafkaApis is Kafka's primary business-logic class. Every request that a handler thread takes off the RequestChannel ends up in its handle method, which dispatches on the request's API key:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  def handleError(e: Throwable): Unit = {
    error(s"Unexpected error handling request ${request.requestDesc(true)} " +
      s"with context ${request.context}", e)
    requestHelper.handleError(request, e)
  }

  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")

    if (!apiVersionManager.isApiEnabled(request.header.apiKey, request.header.apiVersion)) {
      // The socket server will reject APIs which are not exposed in this scope and close the connection
      // before handing them to the request handler, so this path should not be exercised in practice
      throw new IllegalStateException(s"API ${request.header.apiKey} with version ${request.header.apiVersion} is not enabled")
    }

    request.header.apiKey match {
      case ApiKeys.PRODUCE =&amp;gt; handleProduceRequest(request, requestLocal)
      case ApiKeys.FETCH =&amp;gt; handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS =&amp;gt; handleListOffsetRequest(request)
      case ApiKeys.METADATA =&amp;gt; handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR =&amp;gt; handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA =&amp;gt; handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA =&amp;gt; handleUpdateMetadataRequest(request, requestLocal)
      case ApiKeys.CONTROLLED_SHUTDOWN =&amp;gt; handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT =&amp;gt; handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.OFFSET_FETCH =&amp;gt; handleOffsetFetchRequest(request).exceptionally(handleError)
      case ApiKeys.FIND_COORDINATOR =&amp;gt; handleFindCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP =&amp;gt; handleJoinGroupRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.HEARTBEAT =&amp;gt; handleHeartbeatRequest(request).exceptionally(handleError)
      case ApiKeys.LEAVE_GROUP =&amp;gt; handleLeaveGroupRequest(request).exceptionally(handleError)
      case ApiKeys.SYNC_GROUP =&amp;gt; handleSyncGroupRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.DESCRIBE_GROUPS =&amp;gt; handleDescribeGroupsRequest(request).exceptionally(handleError)
      case ApiKeys.LIST_GROUPS =&amp;gt; handleListGroupsRequest(request).exceptionally(handleError)
      case ApiKeys.SASL_HANDSHAKE =&amp;gt; handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS =&amp;gt; handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS =&amp;gt; maybeForwardToController(request, handleCreateTopicsRequest)
      case ApiKeys.DELETE_TOPICS =&amp;gt; maybeForwardToController(request, handleDeleteTopicsRequest)
      case ApiKeys.DELETE_RECORDS =&amp;gt; handleDeleteRecordsRequest(request)
      case ApiKeys.INIT_PRODUCER_ID =&amp;gt; handleInitProducerIdRequest(request, requestLocal)
      case ApiKeys.OFFSET_FOR_LEADER_EPOCH =&amp;gt; handleOffsetForLeaderEpochRequest(request)
      case ApiKeys.ADD_PARTITIONS_TO_TXN =&amp;gt; handleAddPartitionsToTxnRequest(request, requestLocal)
      case ApiKeys.ADD_OFFSETS_TO_TXN =&amp;gt; handleAddOffsetsToTxnRequest(request, requestLocal)
      case ApiKeys.END_TXN =&amp;gt; handleEndTxnRequest(request, requestLocal)
      case ApiKeys.WRITE_TXN_MARKERS =&amp;gt; handleWriteTxnMarkersRequest(request, requestLocal)
      case ApiKeys.TXN_OFFSET_COMMIT =&amp;gt; handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.DESCRIBE_ACLS =&amp;gt; handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS =&amp;gt; maybeForwardToController(request, handleCreateAcls)
      case ApiKeys.DELETE_ACLS =&amp;gt; maybeForwardToController(request, handleDeleteAcls)
      case ApiKeys.ALTER_CONFIGS =&amp;gt; handleAlterConfigsRequest(request)
      case ApiKeys.DESCRIBE_CONFIGS =&amp;gt; handleDescribeConfigsRequest(request)
      case ApiKeys.ALTER_REPLICA_LOG_DIRS =&amp;gt; handleAlterReplicaLogDirsRequest(request)
      case ApiKeys.DESCRIBE_LOG_DIRS =&amp;gt; handleDescribeLogDirsRequest(request)
      case ApiKeys.SASL_AUTHENTICATE =&amp;gt; handleSaslAuthenticateRequest(request)
      case ApiKeys.CREATE_PARTITIONS =&amp;gt; maybeForwardToController(request, handleCreatePartitionsRequest)
      // Create, renew and expire DelegationTokens must first validate that the connection
      // itself is not authenticated with a delegation token before maybeForwardToController.
      case ApiKeys.CREATE_DELEGATION_TOKEN =&amp;gt; handleCreateTokenRequest(request)
      case ApiKeys.RENEW_DELEGATION_TOKEN =&amp;gt; handleRenewTokenRequest(request)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN =&amp;gt; handleExpireTokenRequest(request)
      case ApiKeys.DESCRIBE_DELEGATION_TOKEN =&amp;gt; handleDescribeTokensRequest(request)
      case ApiKeys.DELETE_GROUPS =&amp;gt; handleDeleteGroupsRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.ELECT_LEADERS =&amp;gt; maybeForwardToController(request, handleElectLeaders)
      case ApiKeys.INCREMENTAL_ALTER_CONFIGS =&amp;gt; handleIncrementalAlterConfigsRequest(request)
      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS =&amp;gt; maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
      case ApiKeys.LIST_PARTITION_REASSIGNMENTS =&amp;gt; maybeForwardToController(request, handleListPartitionReassignmentsRequest)
      case ApiKeys.OFFSET_DELETE =&amp;gt; handleOffsetDeleteRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.DESCRIBE_CLIENT_QUOTAS =&amp;gt; handleDescribeClientQuotasRequest(request)
      case ApiKeys.ALTER_CLIENT_QUOTAS =&amp;gt; maybeForwardToController(request, handleAlterClientQuotasRequest)
      case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS =&amp;gt; handleDescribeUserScramCredentialsRequest(request)
      case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS =&amp;gt; maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
      case ApiKeys.ALTER_PARTITION =&amp;gt; handleAlterPartitionRequest(request)
      case ApiKeys.UPDATE_FEATURES =&amp;gt; maybeForwardToController(request, handleUpdateFeatures)
      case ApiKeys.ENVELOPE =&amp;gt; handleEnvelope(request, requestLocal)
      case ApiKeys.DESCRIBE_CLUSTER =&amp;gt; handleDescribeCluster(request)
      case ApiKeys.DESCRIBE_PRODUCERS =&amp;gt; handleDescribeProducersRequest(request)
      case ApiKeys.UNREGISTER_BROKER =&amp;gt; forwardToControllerOrFail(request)
      case ApiKeys.DESCRIBE_TRANSACTIONS =&amp;gt; handleDescribeTransactionsRequest(request)
      case ApiKeys.LIST_TRANSACTIONS =&amp;gt; handleListTransactionsRequest(request)
      case ApiKeys.ALLOCATE_PRODUCER_IDS =&amp;gt; handleAllocateProducerIdsRequest(request)
      case ApiKeys.DESCRIBE_QUORUM =&amp;gt; forwardToControllerOrFail(request)
      case ApiKeys.CONSUMER_GROUP_HEARTBEAT =&amp;gt; handleConsumerGroupHeartbeat(request).exceptionally(handleError)
      case ApiKeys.CONSUMER_GROUP_DESCRIBE =&amp;gt; handleConsumerGroupDescribe(request).exceptionally(handleError)
      case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS =&amp;gt; handleGetTelemetrySubscriptionsRequest(request)
      case ApiKeys.PUSH_TELEMETRY =&amp;gt; handlePushTelemetryRequest(request)
      case ApiKeys.LIST_CLIENT_METRICS_RESOURCES =&amp;gt; handleListClientMetricsResources(request)
      case _ =&amp;gt; throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
    }
  } catch {
    case e: FatalExitError =&amp;gt; throw e
    case e: Throwable =&amp;gt; handleError(e)
  } finally {
    // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
    // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
    // expiration thread for certain delayed operations (e.g. DelayedJoin)
    // Delayed fetches are also completed by ReplicaFetcherThread.
    replicaManager.tryCompleteActions()
    // The local completion time may be set while processing the request. Only record it if it's unset.
    if (request.apiLocalCompleteTimeNanos &amp;lt; 0)
      request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The handlers dispatched above rely on a few key components: the ReplicaManager, which manages replicas; the GroupCoordinator, which oversees consumer groups; and the KafkaController, which runs the controller logic. The two most frequently used paths are PRODUCE requests (issued by KafkaProducer.send to send messages) and FETCH requests (issued by KafkaConsumer.poll to consume messages).&lt;/p&gt;

&lt;h2&gt;
  
  
  IV. AutoMQ Thread Model
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;1. Optimization of Processing Threads&lt;/strong&gt;&lt;br&gt;
AutoMQ, drawing inspiration from the CPU pipeline, refines Kafka's processing model into a pipeline mode, striking a balance between sequentiality and efficiency.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Sequentiality: Each TCP connection is bound to exactly one network thread, which parses its requests, and one RequestHandler thread, which runs the business logic, so requests on the same connection are handled in order;&lt;/li&gt;
&lt;li&gt;Efficiency: The stages are pipelined, allowing a network thread to parse MSG2 immediately after finishing MSG1, without waiting for MSG1’s persistence. Similarly, once the RequestHandler completes verification and sequencing of MSG1, it can start processing MSG2 right away. To further improve persistence efficiency, AutoMQ groups data into batches for disk storage.&lt;/li&gt;
&lt;/ul&gt;
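
&lt;p&gt;The pipelining idea can be sketched with three single-threaded stages that hand work to each other. This is only an illustration under stated assumptions, not AutoMQ's code: the stage names (network, handler, storage) and PipelineSketch are invented for the example, batching of writes is left out, and scala.jdk.CollectionConverters assumes Scala 2.13 or later.&lt;/p&gt;

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.jdk.CollectionConverters._

object PipelineSketch {
  // Sends every message through parse, sequence and persist,
  // then returns the order in which messages were persisted.
  def run(messages: Seq[String]): List[String] = {
    // One thread per stage: a stage never waits for the stages after it.
    val network = Executors.newSingleThreadExecutor()
    val handler = Executors.newSingleThreadExecutor()
    val storage = Executors.newSingleThreadExecutor()
    val persisted = new ConcurrentLinkedQueue[String]()

    // Stage 3: record the message as persisted.
    def persist(msg: String): Unit =
      storage.execute(() => { persisted.add(msg); () })
    // Stage 2: verification and sequencing would happen here, then hand off.
    def sequence(msg: String): Unit =
      handler.execute(() => persist(msg))
    // Stage 1: parsing would happen here, then hand off.
    def parse(msg: String): Unit =
      network.execute(() => sequence(msg))

    messages.foreach(parse)

    // Drain the stages front to back so no task is submitted after shutdown.
    Seq(network, handler, storage).foreach { stage =>
      stage.shutdown()
      stage.awaitTermination(5, TimeUnit.SECONDS)
    }
    persisted.asScala.toList
  }

  def main(args: Array[String]): Unit =
    println(run(Seq("MSG1", "MSG2", "MSG3"))) // prints List(MSG1, MSG2, MSG3)
}
```

&lt;p&gt;Because each stage is a single thread with a FIFO task queue, the network stage can move on to MSG2 while MSG1 is still being sequenced or persisted, and the messages still leave the last stage in arrival order.&lt;/p&gt;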

&lt;p&gt;&lt;strong&gt;2. Optimization of the RequestChannel&lt;/strong&gt;&lt;br&gt;
AutoMQ has redesigned the RequestChannel into a multi-queue architecture, allowing requests from the same connection to be consistently directed to the same queue and handled by a specific KafkaRequestHandler, thus ensuring orderly processing during the verification and sequencing stages.&lt;br&gt;
Each queue is directly linked to a particular KafkaRequestHandler, maintaining a one-to-one relationship.&lt;br&gt;
After the Processor decodes a request, it assigns the request to the queue with index hash(channelId) % N, where N is the number of queues.&lt;/p&gt;
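
&lt;p&gt;The routing rule can be sketched as follows. This is a minimal sketch rather than AutoMQ's implementation: Req and MultiQueueChannel are hypothetical names, the queue type is an assumption, and Math.floorMod is used so that a negative hash code still yields a valid queue index.&lt;/p&gt;

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for a decoded request; not a Kafka class.
final case class Req(channelId: String, payload: String)

// Sketch of a multi-queue channel: one queue per handler thread.
final class MultiQueueChannel(numQueues: Int) {
  require(numQueues > 0, "numQueues must be positive")

  private val queues = Vector.fill(numQueues)(new LinkedBlockingQueue[Req]())

  // hash(channelId) % N, kept non-negative by floorMod.
  def queueIndex(channelId: String): Int =
    Math.floorMod(channelId.hashCode, numQueues)

  // Called on the network side once a request has been decoded.
  def send(req: Req): Unit =
    queues(queueIndex(req.channelId)).put(req)

  // Called by handler `i`, which only ever reads its own queue.
  def poll(i: Int): Option[Req] = Option(queues(i).poll())
}

object MultiQueueDemo {
  def main(args: Array[String]): Unit = {
    val channel = new MultiQueueChannel(4)
    Seq(Req("conn-a", "MSG1"), Req("conn-b", "MSG1"), Req("conn-a", "MSG2"))
      .foreach(channel.send)
    // Both conn-a requests sit in the same queue, in arrival order.
    val idx = channel.queueIndex("conn-a")
    println(channel.poll(idx).map(_.payload))
  }
}
```

&lt;p&gt;Since the queue index depends only on the channel id, all requests from one connection reach the same handler, which is what preserves per-connection ordering.&lt;/p&gt;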

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;p&gt;[1] AutoMQ: &lt;a href="https://github.com/AutoMQ/automq" rel="noopener noreferrer"&gt;https://github.com/AutoMQ/automq&lt;/a&gt;&lt;br&gt;
[2] Kafka 3.7: &lt;a href="https://github.com/apache/kafka/releases/tag/3.7.0" rel="noopener noreferrer"&gt;https://github.com/apache/kafka/releases/tag/3.7.0&lt;/a&gt;&lt;br&gt;
[3] Java NIO: &lt;a href="https://jenkov.com/tutorials/java-nio/overview.html" rel="noopener noreferrer"&gt;https://jenkov.com/tutorials/java-nio/overview.html&lt;/a&gt;&lt;br&gt;
[4] AutoMQ Thread Optimization: &lt;a href="https://mp.weixin.qq.com/s/kDZJgUnMoc5K8jTuV08OJw" rel="noopener noreferrer"&gt;https://mp.weixin.qq.com/s/kDZJgUnMoc5K8jTuV08OJw&lt;/a&gt;&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>javascript</category>
    </item>
  </channel>
</rss>
