<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: amaendeepm</title>
    <description>The latest articles on DEV Community by amaendeepm (@amaendeepm).</description>
    <link>https://dev.to/amaendeepm</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F904778%2F512cd676-602d-4f5a-b4c0-2dad09f9cebd.jpeg</url>
      <title>DEV Community: amaendeepm</title>
      <link>https://dev.to/amaendeepm</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/amaendeepm"/>
    <language>en</language>
    <item>
      <title>I Fell for `?`. Then I Couldn't Debug Anything.</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Fri, 13 Mar 2026 03:48:37 +0000</pubDate>
      <link>https://dev.to/amaendeepm/i-fell-for-then-i-couldnt-debug-anything-35jl</link>
      <guid>https://dev.to/amaendeepm/i-fell-for-then-i-couldnt-debug-anything-35jl</guid>
      <description>&lt;p&gt;The &lt;code&gt;?&lt;/code&gt; operator in Rust is genuinely beautiful. You go from a mess of &lt;code&gt;match&lt;/code&gt; arms to something that reads like a sentence:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;process_report&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;u8&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;AppError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;envelope&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;parse_envelope&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;member&lt;/span&gt;   &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;resolve_member&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;envelope&lt;/span&gt;&lt;span class="py"&gt;.member_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;positions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;extract_positions&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;envelope&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="nf"&gt;persist_positions&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;member&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;positions&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(())&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Clean. Compositional. I used it everywhere.&lt;/p&gt;

&lt;p&gt;Then one day, records stopped being written to the database. No panic. No error log. No trace of anything going wrong.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The problem was obvious in hindsight.&lt;/strong&gt; Any one of those four &lt;code&gt;?&lt;/code&gt;s could have triggered an early return. The error was propagating up to a caller that wasn't logging it properly. I had no idea which step was the culprit without manually re-running the pipeline with test fixtures.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;?&lt;/code&gt; operator had silently swallowed my debugging surface.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The fix that worked:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;envelope&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;parse_envelope&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;.map_err&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nn"&gt;tracing&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;error!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;step&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"parse_envelope"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;error&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"failed"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;member&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;resolve_member&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;envelope&lt;/span&gt;&lt;span class="py"&gt;.member_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;.map_err&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nn"&gt;tracing&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;error!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;step&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"resolve_member"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;error&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"failed"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Every potential exit point now announces itself. The error still propagates, the caller still gets it, but the &lt;em&gt;point of propagation&lt;/em&gt; is now observable.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The lesson:&lt;/strong&gt; &lt;code&gt;?&lt;/code&gt; is ergonomics for writing code. Logs are ergonomics for operating it. In any pipeline where a silent early return would cause missing data or unnoticed failures, wrap every &lt;code&gt;?&lt;/code&gt; with a log. The verbosity is worth it.&lt;/p&gt;

</description>
      <category>rust</category>
      <category>debugging</category>
      <category>backend</category>
      <category>errors</category>
    </item>
    <item>
      <title>My Quest for Speed: How a Clickhouse Type Improvement Led Me Down a Caching Rabbit Hole in Rust</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Fri, 07 Nov 2025 01:44:35 +0000</pubDate>
      <link>https://dev.to/amaendeepm/my-quest-for-speed-how-a-clickhouse-type-improvement-led-me-down-a-caching-rabbit-hole-in-rust-4e1c</link>
      <guid>https://dev.to/amaendeepm/my-quest-for-speed-how-a-clickhouse-type-improvement-led-me-down-a-caching-rabbit-hole-in-rust-4e1c</guid>
      <description>&lt;p&gt;It all started with a wonderful quality-of-life improvement in the clickhouse crate. The recent 0.14.0 release introduced support for the &lt;strong&gt;RowBinaryWithNamesAndTypes&lt;/strong&gt; format. This was a game-changer!&lt;/p&gt;

&lt;p&gt;Gone were the days of fetching data as untyped strings and manually converting them into Rust types like Decimal. Now, I could get strongly-typed rows directly, making my code safer, cleaner, and more pleasant to write.&lt;/p&gt;

&lt;p&gt;But with great power comes great...greed.&lt;/p&gt;

&lt;p&gt;Seeing my data pipeline become so elegant lit a fire in me. My Axum web application was now efficiently talking to ClickHouse, but I thought, "What if we could be even faster? Why hit the clickhouse database for every single request for spot prices?" The answer seemed obvious: cache it.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Naive Beginning: Infinite Growth&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;My first instinct was to reach for the trusty &lt;em&gt;std::collections::HashMap&lt;/em&gt;. &lt;br&gt;
The plan was simple: store the fetched ClickHouse data in an in-memory cache.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;use std::collections::HashMap;
use std::sync::RwLock;
use indexmap::IndexMap;

struct ApplicationContext {
    // Cache mapping symbols to their historical spot prices
    spot_price_cache: RwLock&amp;lt;HashMap&amp;lt;String, IndexMap&amp;lt;String, Decimal&amp;gt;&amp;gt;&amp;gt;,
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I used IndexMap specifically to preserve the timestamp-order of price data, which was crucial for time-series analysis. I wrapped it in an RwLock for thread-safety, and... immediately saw a problem. This cache had no memory. It would grow indefinitely, consuming more and more RAM as we tracked more symbols and time periods. A cache with no eviction policy is just a memory leak with extra steps.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Crossroads: lru vs. moka&lt;/strong&gt;&lt;br&gt;
It was time for a proper caching solution. A quick search led me to two popular contenders in the Rust ecosystem:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;lru&lt;/strong&gt;: A classic, bare-bones LRU (Least Recently Used) cache implementation.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;moka&lt;/strong&gt;: A feature-rich caching library inspired by Caffeine from the Java world.&lt;/p&gt;

&lt;p&gt;I was confused, but after some evaluation, my argument for moka was compelling:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Simplicity&lt;/strong&gt;: moka's constructor-based configuration felt intuitive. I could set up a cache with a maximum capacity in one line.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Thread-Safety&lt;/strong&gt;: moka handles concurrency internally. No need to mess with Arc&amp;gt; semantics myself. This is a huge win for reducing boilerplate and potential deadlocks.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Observability&lt;/strong&gt;: This was the clincher. moka provides built-in metrics like entry_count, hit_count, and miss_count. Being able to see how my cache was performing was invaluable for debugging and tuning.&lt;/p&gt;

&lt;p&gt;The choice was clear. I was going with moka.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Implementation: A Seemingly Robust Solution&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;I refactored my cache to use a moka::sync::Cache.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
use moka::sync::Cache;
use indexmap::IndexMap;

#[derive(Clone)]
pub struct ApplicationContext {
    pub clickhouse_client: clickhouse::Client,
    pub spot_price_cache: Cache&amp;lt;String, IndexMap&amp;lt;String, Decimal&amp;gt;&amp;gt;,
}

impl ApplicationContext {
    pub fn new() -&amp;gt; Self {
        Self {
            clickhouse_client: create_clickhouse_client(),
            spot_price_cache: Cache::builder()
                .max_capacity(1000) // Hold up to 1000 symbol histories
                .build(),
        }
    }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The integration was smooth. I would check spot_price_cache for a symbol; on a miss, I'd query ClickHouse (using the lovely new typed interface), populate the cache with the IndexMap of timestamp-price pairs, and return the data. The cache metrics showed a beautiful story: a high hit rate after the warm-up period. The cache was working! The data provider was robust, and the logic was sound.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Cold, Hard Truth: The Performance Letdown&lt;/strong&gt;&lt;br&gt;
But something was wrong. The overall response times for my spot price API endpoints hadn't improved. In fact, they had slowed down slightly.&lt;/p&gt;

&lt;p&gt;Confused, I broke out the profiler and did rust flamegraph. The culprit wasn't the cache logic itself, nor the ClickHouse queries. It was the cloning.&lt;/p&gt;

&lt;p&gt;Each time I served spot price data of a day from the cache, I was returning a clone of the entire IndexMap of intervals of that day. While IndexMap gives us ordering guarantees crucial for time-series data, it's generally less performant for cloning than a regular HashMap due to its additional internal structures for maintaining order. When you're dealing with hundreds of price points per day (96 entries in each IndexMap for 15-min intervals of the day), the allocation and data copying for every single request was creating significant overhead.&lt;/p&gt;

&lt;p&gt;The cost of cloning the IndexMap was rivaling—and in some cases exceeding—the cost of a quick network round-trip to ClickHouse. The cache was "winning" logically with its high hit count, but losing the performance battle.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Pragmatic Retreat: Feature-Flagging a Strategic Retreat&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;I did optimize the cache first by using Arc&amp;gt; to avoid the deep clone, but no IndexMaps are built differently and communicated across processes&lt;/p&gt;

&lt;p&gt;Faced with this reality, I had to make a call. I could:&lt;/p&gt;

&lt;p&gt;Consider smaller granularity: Cache individual price points instead of entire histories&lt;/p&gt;

&lt;p&gt;Abandon the cache: Admit that for this specific use case, it wasn't the right tool&lt;/p&gt;

&lt;p&gt;For now, I chose option two. The simplicity of the direct ClickHouse call was hard to beat. To keep my new, clean code but disable the caching overhead, I did what any pragmatic rust engineer would do: I feature-flagged it.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Lessons Learned&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Strong Typing is a Pure Win&lt;/strong&gt;: The clickhouse crate's new type support is fantastic and didn't cause my problem. It improved my code quality.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Caching is a Trade-off&lt;/strong&gt;: It's not free. The overhead of storing and retrieving data from the cache must be significantly less than the cost of fetching the data fresh.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Data Structure Choice Matters&lt;/strong&gt;: Using IndexMap over HashMap has real performance implications, especially when cloning entire structures. Preserving order has its cost!&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Measure, Don't Assume&lt;/strong&gt;: The cache metrics told me one story (high hits = good), but the overall application profiling told the real story (cloning overhead = bad). Always profile with real-world data sizes.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;moka is Excellent&lt;/strong&gt;: My initial assessment stands. For my next caching need where the value type is cheap to clone or can be wrapped in Arc, moka will be my first choice. Its API and observability features are top-notch.&lt;/p&gt;

&lt;p&gt;My quest for ultimate speed hit a snag, but it was a valuable learning experience. The journey to optimize is often iterative, and sometimes the bravest move is to know when to step back and try a different path.&lt;/p&gt;

</description>
      <category>database</category>
      <category>performance</category>
      <category>rust</category>
      <category>architecture</category>
    </item>
    <item>
      <title>Why Composability (and Event Streaming Ingestion) Is the Backbone of Financial Reporting Systems for Metered Energy Data</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Wed, 18 Jun 2025 19:55:16 +0000</pubDate>
      <link>https://dev.to/amaendeepm/why-composability-and-event-streaming-ingestion-is-the-backbone-of-financial-reporting-systems-1aoc</link>
      <guid>https://dev.to/amaendeepm/why-composability-and-event-streaming-ingestion-is-the-backbone-of-financial-reporting-systems-1aoc</guid>
      <description>&lt;p&gt;In the evolving landscape of energy markets, the reporting and financial settlement systems for metered data are expected to be agile, auditable, and automation-friendly. Yet too often, they’re rigid — siloed batch publishings of meter readings and there on stitched together with brittle ETL data pipelines&lt;/p&gt;

&lt;p&gt;While at first glance it might look that we need a better  or another analytics dashboard, what we actually need is a composable architecture, one that can ingest metered data as streams, apply domain-driven transformations, and support replayable, traceable pipelines that keep up with real-world complexity — including different resolution time intervals time series data, cancellations updates, other market updates, and shifting regulations plus time &amp;amp; volume dependent tariffs applications.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Streaming-First Mindset for Metered Data&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Financial settlement in energy isn't just about summing numbers. It’s about events — energy usage events, billing revisions, market conditions, regulatory pricing shifts of tariffs. A streaming-first architecture offers:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Low-latency ingestion of time-series meter data and market signals&lt;/li&gt;
&lt;li&gt;Decoupled processing of diverse domains like credit notes, asset lifecycles, tradebooks, and exceptions&lt;/li&gt;
&lt;li&gt;Replayable and audit-compliant pipelines, essential for dispute resolution and regulation&lt;/li&gt;
&lt;li&gt;Composable microservices that act on streams, not snapshots&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Traditional batch ETLs often fail in this landscape due to poor observability, latency, and lack of idempotency. With Apache Kafka and similar platforms, we can treat each meter read, each trade, and each regulatory trigger as a first-class event.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Composability Is Not a Buzzword&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In my experience designing financial settlement systems, composability enables both performance and explainability!&lt;/p&gt;

&lt;p&gt;Imagine a flow like this:&lt;/p&gt;

&lt;p&gt;[Meter Reading Stream] → [Meter Mapper] → [Area Pricing Engine] → [Tradebook Join] → [Market Tariffs]→ [Invoice Generator]&lt;/p&gt;

&lt;p&gt;Each stage is an independent component consuming and emitting events, capable of being redeployed, updated, or audited in isolation.&lt;/p&gt;

&lt;p&gt;Add to this:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Kafka topics as &lt;strong&gt;source-of-truth logs&lt;/strong&gt; instead of intermediate tables&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Event-driven triggers&lt;/strong&gt; to handle late data, cancellations, or re-rating without overhauling pipelines&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Stateful stream processors&lt;/strong&gt; (like Kafka Streams or Flink) that can persist intermediate state while remaining replayable&lt;/li&gt;
&lt;li&gt;The result is a system that’s &lt;strong&gt;not just resilient but living and evolvable&lt;/strong&gt; — a necessity in the post-renewables energy market where contracts, baselines, and obligations can all shift monthly.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Beyond ETL: Streaming as System Integration&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Many teams treat Kafka like a glorified message bus. But in composable financial reporting systems, &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;It is the spine of integration. &lt;/li&gt;
&lt;li&gt;Every topic is a contract, while schema evolution tells a story. &lt;/li&gt;
&lt;li&gt;Every consumer lag is a performance metric.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In such systems:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Trade amendments can trigger partial invoice recalculations automatically.&lt;/li&gt;
&lt;li&gt;Historical reruns (e.g. regulation backdates) are handled by replaying topics into stateful consumers.&lt;/li&gt;
&lt;li&gt;Cross-domain data correlation happens in motion, not by nightly joins.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Why This Matters at Scale&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;When you’re dealing with:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Thousands of smart meters&lt;/li&gt;
&lt;li&gt;Dozens of energy trading partners&lt;/li&gt;
&lt;li&gt;Frequent retrospective adjustments, and&lt;/li&gt;
&lt;li&gt;Strict auditing and compliance requirements;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;composability powered by event streaming is not optional. It’s the only scalable way to &lt;strong&gt;keep reporting explainable, up-to-date, and testable.&lt;/strong&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Size Does Matter: Fixing Kafka BrokerTransportFailure in Redpanda</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Mon, 24 Mar 2025 15:29:58 +0000</pubDate>
      <link>https://dev.to/amaendeepm/size-does-matter-fixing-kafka-brokertransportfailure-in-redpanda-449b</link>
      <guid>https://dev.to/amaendeepm/size-does-matter-fixing-kafka-brokertransportfailure-in-redpanda-449b</guid>
      <description>&lt;p&gt;If you're using Redpanda (or Kafka) and hitting mysterious consumer errors like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Message consumption error: BrokerTransportFailure
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You might be dealing with a message size limit issue! I ran into this recently, and increasing the &lt;strong&gt;fetch.message.max.bytes&lt;/strong&gt; limit solved it instantly.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Problem&lt;/strong&gt;&lt;br&gt;
My Rust consumer, built with rdkafka, was randomly failing with BrokerTransportFailure. Debugging didn’t reveal much rather going back n forth as everything otherwise with broker connection, topic, network reachability was all good; but after experimenting with different configurations, I realized the issue was message size constraints as this particular topic hosted quite large size of messages being received&lt;/p&gt;

&lt;p&gt;Kafka/Redpanda has default limits on how much data can be fetched per request. If a message is too big, the broker refuses to send it, causing consumer failures.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Fix&lt;/strong&gt;&lt;br&gt;
To resolve this, I had to increase the message size limits on both the the consumer. And that does not require any change in cluster or broker - but only in client code&lt;/p&gt;

&lt;p&gt;The key setting here is &lt;strong&gt;fetch.message.max.bytes&lt;/strong&gt;, which allows the consumer to fetch larger messages.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
pub struct RPMessageProcessor {
    consumer: StreamConsumer,
}

impl RPMessageProcessor {
    pub fn new(group_id: &amp;amp;str, topic_name: &amp;amp;str) -&amp;gt; Self {
        let broker = env::var("REDPANDA_BROKERS").unwrap_or_else(|_| "localhost:9092".to_string());

        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", broker)
            .set("group.id", group_id)
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .set("fetch.message.max.bytes", "262144000") // Increased to 250 MB
            .create()
            .expect("Consumer creation failed");

        tracing::debug!("Streaming Consumer Created");

        consumer
            .subscribe(&amp;amp;[topic_name])
            .expect("Subscription failed");

        tracing::debug!("Subscribed to {}", topic_name);

        Self { consumer }
    }

    pub async fn run(&amp;amp;self, message_type: &amp;amp;str) {
        loop {
            match self.consumer.recv().await {
                Ok(msg) =&amp;gt; {
                    let should_process = msg.headers().and_then(|headers| {
                        headers.iter().find(|header| {
                            header.key == "message-type"
                                &amp;amp;&amp;amp; header
                                    .value
                                    .map(|v| v == message_type.as_bytes())
                                    .unwrap_or(false)
                        })
                    }).is_some();

                    if should_process {
                        if let Err(err) = Self::process_message(&amp;amp;msg).await {
                            tracing::error!("Error processing message: {}", err);
                        }
                    }

                    // Commit offset
                    if let Err(e) = self.consumer.commit_message(&amp;amp;msg, CommitMode::Sync) {
                        tracing::error!("Failed to commit offset: {}", e);
                    }
                }
                Err(err) =&amp;gt; tracing::error!("Kafka error: {}", err),
            }

            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    async fn process_message(msg: &amp;amp;BorrowedMessage&amp;lt;'_&amp;gt;) -&amp;gt; Result&amp;lt;()&amp;gt; {
        if let Some(payload) = msg.payload_view::&amp;lt;str&amp;gt;().transpose()? {
            tracing::info!("Processing message: {}", payload);
        } else {
            tracing::error!("Received empty or invalid message");
        }
        Ok(())
    }
}


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;How it is invoked?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;RPMessageProcessor::new("CRE-RP-CG", &amp;amp;CONFIG.edx.input_topic)&lt;br&gt;
    .run("CRE_HEADER_VALUE")&lt;br&gt;
    .await;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Lesson Learned: Size DOES Matter!&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you’re running into Kafka or Redpanda consumer errors, check your message size limits! The broker, producer, and consumer need to be in sync to avoid failures&lt;/p&gt;

</description>
    </item>
    <item>
      <title>How Flamegraph Helped Me Optimize a Rust Application for Intensive Data Transformation and Migration</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Fri, 31 Jan 2025 14:41:29 +0000</pubDate>
      <link>https://dev.to/amaendeepm/how-flamegraph-helped-me-optimize-a-rust-application-for-intensive-data-transformation-and-migration-3p0e</link>
      <guid>https://dev.to/amaendeepm/how-flamegraph-helped-me-optimize-a-rust-application-for-intensive-data-transformation-and-migration-3p0e</guid>
      <description>&lt;p&gt;Rust is having its own reputation for performance and speed. Therefore when working on performance-critical Rust applications, especially those dealing with intensive data transformation and migration, pushing for faster performance by identifying bottlenecks can be challenging. I faced this exact challenge with a Rust application responsible for processing and migrating large datasets. The initial implementation was slow, and I needed a way to pinpoint the performance issues. Flamegraph—a visualization tool that helped me optimize my code effectively though. And here's how I used Flamegraph to transform my application's performance&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Problem: Slow Data Migration&lt;/strong&gt;&lt;br&gt;
My Rust application was designed to migrate data from one system to another, involving complex transformations and large datasets. Despite Rust's reputation for performance, the process was taking hours to complete. I suspected inefficiencies in the code, but without concrete data, optimizing it felt like guesswork.&lt;/p&gt;

&lt;p&gt;That's when I decided to use Flamegraph to visualize where the CPU cycles were being spent. Flamegraph provided a clear, hierarchical view of my application's execution, making it easy to identify hotspots.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;What is Flamegraph?&lt;/strong&gt;&lt;br&gt;
Flamegraph is a profiling tool that generates an interactive SVG graph representing your application's call stack. The width of each stack frame corresponds to the amount of time spent in that function, allowing you to quickly identify performance bottlenecks.&lt;/p&gt;

&lt;p&gt;For Rust, tools like cargo-flamegraph make it easy to generate flamegraphs. It integrates seamlessly with Cargo and provides a straightforward way to profile your application.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Setting Up Flamegraph for Rust&lt;/strong&gt;&lt;br&gt;
Here’s how I set up Flamegraph for my Rust application:&lt;/p&gt;

&lt;p&gt;Install cargo-flamegraph:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cargo install flamegraph
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Profile Your Application:&lt;br&gt;
Run your application with cargo flamegraph. This generates a flamegraph SVG file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cargo flamegraph --bin my_app
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Analyze the Flamegraph:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Open the generated SVG file in your browser. The flamegraph shows a hierarchical view of your program's execution, with the most time-consuming functions at the top.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;What the Flamegraph Revealed&lt;/strong&gt;&lt;br&gt;
When I first generated the flamegraph for my application, the results were dazzling to the eye, and my underlying curosity as well. Here's what I discovered:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Inefficient Database Queries:&lt;/strong&gt;&lt;br&gt;
The flamegraph showed that a significant amount of time was spent executing database queries. This was due to fetching large datasets in a single query without proper batching.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Expensive Data Transformation:&lt;/strong&gt;&lt;br&gt;
A hotspot was identified in the transmute_response function, which was responsible for transforming data. The function was processing data sequentially, leading to unnecessary delays.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Unnecessary Cloning:&lt;/strong&gt;&lt;br&gt;
The flamegraph highlighted that a lot of time was spent cloning data structures, particularly in the persist_identifiable_message function. This was due to passing owned data instead of references.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Optimizing the Code&lt;/strong&gt;&lt;br&gt;
With this information, I made the following changes to my application:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Batched Database Queries:&lt;/strong&gt;&lt;br&gt;
I refactored the database queries to fetch data in smaller batches. This reduced memory usage and improved query performance.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Parallel Data Transformation:&lt;/strong&gt;&lt;br&gt;
I parallelized the transmute_response function using futures::future::join_all to process multiple records concurrently. This significantly reduced the time spent on data transformation.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Reduced Cloning:&lt;/strong&gt;&lt;br&gt;
I refactored the code to avoid unnecessary cloning by using references wherever possible. This reduced memory allocations and improved overall performance.&lt;/p&gt;

&lt;p&gt;Here’s a snippet of the optimized code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;let batch_size = 5;
for chunk in records.chunks(batch_size) {
    let futures = chunk.iter().map(|rsm34| {
        let transmuted_message = transmute_response(rsm34.complete_payload.clone().into()).unwrap();
        persist_identifiable_message(
            db_conn_pool.clone(),
            rsm34.peek_trace_id.clone().into(),
            transmuted_message.clone(),
        )
    });

    let results = join_all(futures).await;
    for result in results {
        if result.is_err() {
            panic!("Error {:?}", result.err());
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;The Results&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;After applying these optimizations, I regenerated the flamegraph to verify the improvements. The new flamegraph showed a much more balanced distribution of CPU time, with the previously identified hotspots no longer dominating the execution.&lt;/p&gt;

&lt;p&gt;The most satisfying part? The data migration process, which previously took days, now seems to get complete much sooner. The application is not only faster but also more efficient in terms of memory usage.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Lessons Learned&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Profiling is Essential:&lt;/strong&gt;&lt;br&gt;
Without profiling tools like Flamegraph, optimizing performance is a guessing game. Profiling gives you concrete data to work with.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Rust's Performance is Only as Good as Your Code:&lt;/strong&gt;&lt;br&gt;
While Rust is a high-performance language, it's still possible to write inefficient code. Tools like Flamegraph help you stay on track.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Optimization is Iterative:&lt;/strong&gt;&lt;br&gt;
Profiling and optimization are iterative processes. Each round of profiling can reveal new bottlenecks to address.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Give Flamegraph a Try!&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you're working on performance-critical Rust applications, I highly recommend giving Flamegraph a try. It's easy to set up, and the insights it provides are invaluable. Whether you're dealing with data transformation, migration, or any other CPU-intensive task, Flamegraph can help you identify and eliminate bottlenecks.&lt;/p&gt;

</description>
      <category>rust</category>
      <category>flamegraph</category>
      <category>collaborativeleadership</category>
      <category>datastreaming</category>
    </item>
    <item>
      <title>Axum in Rust: Flexibility, CORS Control, and Tower Power</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Mon, 02 Dec 2024 15:48:16 +0000</pubDate>
      <link>https://dev.to/amaendeepm/axum-in-rus-flexibility-cors-control-and-tower-power-4ich</link>
      <guid>https://dev.to/amaendeepm/axum-in-rus-flexibility-cors-control-and-tower-power-4ich</guid>
      <description>&lt;p&gt;I started my Rust journey in August 2022, and it’s been an eye-opening experience. Rust is a language designed for performance and safety, but its web development ecosystem was still growing when I first encountered it. After experimenting with different frameworks and approaches, &lt;strong&gt;Axum&lt;/strong&gt; stood out to me as a modern, intuitive solution. &lt;/p&gt;

&lt;p&gt;With Axum, I’ve found building APIs not only productive but also enjoyable. Whether it’s implementing selective CORS controls, tracing requests with ease, or setting up API rate limits, the combination of Axum and &lt;strong&gt;Tower&lt;/strong&gt; middleware just clicks.&lt;/p&gt;

&lt;h2&gt;
  
  
  CORS Control: Fine-Tuning Access
&lt;/h2&gt;

&lt;p&gt;One of the first issues I had to tackle was implementing flexible CORS policies. I had an API with some public endpoints but also internal routes that needed restricted access. In many frameworks, managing this kind of fine-grained control can get messy. But Axum made this simple.&lt;/p&gt;

&lt;p&gt;Using &lt;strong&gt;tower_http::cors&lt;/strong&gt;, I could apply a global CORS policy while exempting specific routes, such as the &lt;code&gt;/public&lt;/code&gt; route being open to all and &lt;code&gt;/internal&lt;/code&gt; requiring strict access. Here’s a quick snippet of how I handled it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;use axum::{Router, routing::get};
use tower_http::cors::{CorsLayer, Any};
use tower::ServiceBuilder;

let cors_layer = CorsLayer::new()
    .allow_origin(Any)  // Open access to selected route
    .allow_methods(vec!["GET", "POST"]);

let app = Router::new()
    .route("/public", get(public_handler))
    .route("/internal", get(internal_handler))
    .layer(ServiceBuilder::new().layer(cors_layer));

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Axum allowed me to keep my security intact without complicating the setup, which I hadn’t found in other frameworks with this level of ease.&lt;/p&gt;

&lt;h2&gt;
  
  
  Tower Middleware: Tracing and Logging Done Right
&lt;/h2&gt;

&lt;p&gt;I quickly became a fan of &lt;strong&gt;Tower&lt;/strong&gt;, which integrates effortlessly with Axum. Setting up logging, request tracing, and metrics collection is as simple as adding layers to the router.&lt;/p&gt;

&lt;p&gt;With &lt;strong&gt;tower_http::trace&lt;/strong&gt;, I could capture detailed logs of requests and responses, all in a non-intrusive way:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;use tower_http::trace::TraceLayer;
use tracing::Level;

let app = Router::new()
    .route("/api", get(api_handler))
    .layer(TraceLayer::new_for_http().with_level(Level::INFO));  
// Enable logging
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In a few lines of code, I had a fully instrumented API that logged every interaction. It made debugging and performance optimization much easier, and it felt like Axum was purpose-built for this kind of composability.&lt;/p&gt;

&lt;h3&gt;
  
  
  Rate Limiting and Protecting APIs
&lt;/h3&gt;

&lt;p&gt;Rate limiting is another essential feature for any API-driven project. With &lt;strong&gt;tower&lt;/strong&gt;’s middleware, I could easily cap the number of concurrent requests using the &lt;code&gt;ConcurrencyLimitLayer&lt;/code&gt;. Again, the integration with Axum was smooth:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;use tower::limit::ConcurrencyLimitLayer;

let app = Router::new()
    .route("/api/limited", get(limited_handler))
    .layer(ConcurrencyLimitLayer::new(100));  // Limit to 100

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This setup ensures that my server remains responsive under load, which is crucial when handling high-traffic APIs.&lt;/p&gt;

&lt;h3&gt;
  
  
  Why Axum Outshines Raw Tokio and Actix
&lt;/h3&gt;

&lt;p&gt;During my journey, I spent some time exploring both &lt;strong&gt;raw Tokio&lt;/strong&gt; and &lt;strong&gt;Actix&lt;/strong&gt; as alternatives. While both are powerful in their own right, I found Axum to be a step ahead in terms of &lt;strong&gt;developer experience&lt;/strong&gt; and &lt;strong&gt;intuitiveness&lt;/strong&gt;.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Axum vs Raw Tokio&lt;/strong&gt;: Raw &lt;strong&gt;Tokio&lt;/strong&gt; is fantastic when you need granular control over asynchronous operations, and its event-driven model is second to none. But when it comes to building web APIs, managing everything at the Tokio level can quickly become complex and verbose. Axum, built on top of Tokio, abstracts much of the boilerplate while maintaining full async capabilities. It’s like having the power of Tokio without the steep learning curve. You can focus on building your application rather than handling the low-level details. With Axum, you don’t need to manage HTTP requests manually or write tons of code to set up routing, middleware, or async logic. Axum does all of this in a much cleaner, modular way, making it the next-generation framework over raw Tokio for web applications.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Axum vs Actix&lt;/strong&gt;: &lt;strong&gt;Actix Web&lt;/strong&gt; was one of the first Rust web frameworks to gain serious traction, and it’s known for its high performance. However, I found Actix’s actor-based model to be more complex than necessary for most web use cases. While Actix shines in certain scenarios, the actor pattern can be an overkill for basic API services. Axum, in contrast, uses a &lt;strong&gt;more familiar and intuitive routing model&lt;/strong&gt;. Its design follows the simplicity of frameworks like Express in JavaScript or Flask in Python, but with all the benefits of Rust’s type safety and performance. And yet, it doesn’t compromise on advanced features like middleware stacking, error handling, or async support. Plus, Axum’s reliance on Tower middleware gives you access to a wide range of ready-made tools (like CORS handling, rate limiting, and tracing) that are easy to plug into your API.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Tokio and Tower at its Core&lt;/strong&gt;: Axum is also deeply integrated with &lt;strong&gt;Tokio&lt;/strong&gt; and &lt;strong&gt;Tower&lt;/strong&gt;, which are two of the most battle-tested and high-performance libraries in the Rust ecosystem. This means that Axum is not only fast but also scalable and production-ready. Whether it’s handling high concurrency with Tokio’s async runtime or layering on functionality with Tower middleware, Axum is designed to scale without getting in your way.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Why I’m Loving Axum
&lt;/h3&gt;

&lt;p&gt;Axum has made my Rust web development journey far smoother than I anticipated. The fact that it allows me to easily control CORS per route, integrate logging and tracing, and enforce API rate limits—without over-complicating things—proves it’s a framework designed with developers in mind.&lt;/p&gt;

&lt;p&gt;Here’s what I love most:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Elegant design&lt;/strong&gt;: Routing, middleware, and error handling all feel natural. You don’t need to fight the framework to build complex APIs.&lt;br&gt;
&lt;strong&gt;Composable and modular&lt;/strong&gt;: Tower middleware makes it easy to layer in functionality like logging, rate limiting, and CORS control without cluttering your application logic.&lt;br&gt;
&lt;strong&gt;Power of async&lt;/strong&gt;: Axum’s seamless integration with Tokio gives me full control over asynchronous operations without adding unnecessary complexity.&lt;/p&gt;

&lt;p&gt;Anyone looking to build web APIs in Rust, Axum can be easily your first choice. It offers the right balance of simplicity, power, and flexibility, making it a worthy successor to both raw Tokio setups and older frameworks like Actix. Whether building a small personal project or scaling up an enterprise service, Axum has the tools and developer experience worth exploring.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Financial Calculations in Rust: Building a Reference Portfolio App from Firm Trades to Customer Earnings</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Wed, 06 Nov 2024 09:40:19 +0000</pubDate>
      <link>https://dev.to/amaendeepm/financial-calculations-in-rust-building-a-reference-portfolio-app-from-firm-trades-to-customer-earnings-5gl2</link>
      <guid>https://dev.to/amaendeepm/financial-calculations-in-rust-building-a-reference-portfolio-app-from-firm-trades-to-customer-earnings-5gl2</guid>
      <description>&lt;p&gt;When precision matters, especially in finance, the smallest decimal places can have a big impact on the bottom line. In this post, I will show a portfolio app in Rust that starts with reconciling the firm’s income from multiple sources, then allocates earnings to customers based on their portfolios, and finally calculates payouts or reinvestments based on their preferences. All while ensuring that our sums add up, from the firm level down to each individual customer.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Use Case&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Our app models a portfolio that deals with income from multiple sources. The steps are:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Reconcile the firm’s income from various sources&lt;/strong&gt; (trade returns, dividends, etc.) and validate that the firm has received the expected amounts.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Calculate the firm’s overall earnings&lt;/strong&gt; after reconciliation.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Allocate earnings to each customer&lt;/strong&gt; based on their share of the firm’s portfolio.**&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Handle payouts or reinvestments&lt;/strong&gt; for each customer, applying specific rounding rules.**&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Validate that the sum of all customer payouts matches the reconciled firm earnings&lt;/strong&gt;, ensuring accuracy across our calculation chain.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;By using Rust’s &lt;code&gt;Decimal&lt;/code&gt; type and PostgreSQL’s &lt;code&gt;NUMERIC&lt;/code&gt;, it can maintain high precision and consistency for our data.&lt;/p&gt;




&lt;h2&gt;
  
  
  Step 1: Reconciling the Firm's Income from Multiple Sources
&lt;/h2&gt;

&lt;p&gt;Before calculating the firm’s total earnings, first we need to reconcile income from different sources. Let’s assume the firm has several streams of income, such as:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Market Trades&lt;/strong&gt;&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Dividends&lt;/strong&gt;&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Bond Interest Payments&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Each income source is expected to contribute a certain amount, and we want to validate that we’ve received what was anticipated.&lt;/p&gt;

&lt;h3&gt;
  
  
  Firm Income Struct and Reconciliation
&lt;/h3&gt;

&lt;p&gt;To model this, let’s define a &lt;code&gt;FirmIncome&lt;/code&gt; struct that records both expected and received income for each source. Then, we’ll sum these up and check if they match.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;rust_decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;std&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;collections&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;HashMap&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;serde&lt;/span&gt;&lt;span class="p"&gt;::{&lt;/span&gt;&lt;span class="n"&gt;Serialize&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Deserialize&lt;/span&gt;&lt;span class="p"&gt;};&lt;/span&gt;

&lt;span class="nd"&gt;#[derive(Serialize,&lt;/span&gt; &lt;span class="nd"&gt;Deserialize,&lt;/span&gt; &lt;span class="nd"&gt;Clone,&lt;/span&gt; &lt;span class="nd"&gt;Debug)]&lt;/span&gt;
&lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;FirmIncome&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;expected&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;received&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;reconcile_income&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;incomes&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;FirmIncome&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;total_expected&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;incomes&lt;/span&gt;&lt;span class="nf"&gt;.iter&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="py"&gt;.expected&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.sum&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;total_received&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;incomes&lt;/span&gt;&lt;span class="nf"&gt;.iter&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="py"&gt;.received&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.sum&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;total_expected&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;total_received&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;total_received&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;format!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Income mismatch! Expected: {}, Received: {}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;total_expected&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;total_received&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Example Reconciliation
&lt;/h3&gt;

&lt;p&gt;Here’s how we’d create and reconcile incomes from various sources:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;incomes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nd"&gt;vec!&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
    &lt;span class="n"&gt;FirmIncome&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"Market Trades"&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;expected&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;200_000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;received&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;200_000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;FirmIncome&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"Dividends"&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;expected&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50_000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;received&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50_000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;FirmIncome&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"Bond Interest"&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;expected&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20_000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;received&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20_000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="p"&gt;];&lt;/span&gt;

&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;reconciled_income&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;reconcile_income&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;incomes&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.expect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Income reconciliation failed"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="nd"&gt;println!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Reconciled Income: {}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;reconciled_income&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="c1"&gt;// Reconciled Income: 270,000.00&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now we know the total amount received (&lt;code&gt;270,000.00&lt;/code&gt;) and can proceed with further calculations, confident that our firm’s income is accurately accounted for.&lt;/p&gt;




&lt;h2&gt;
  
  
  Step 2: Calculating the Firm’s Total Earnings Post-Reconciliation
&lt;/h2&gt;

&lt;p&gt;With the reconciled income, we can calculate the firm’s total earnings using a market rate (or another performance metric). The firm’s earnings are calculated based on its market performance multiplied by the total assets.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;
&lt;span class="nd"&gt;#[derive(Serialize,&lt;/span&gt; &lt;span class="nd"&gt;Deserialize,&lt;/span&gt; &lt;span class="nd"&gt;Clone,&lt;/span&gt; &lt;span class="nd"&gt;Debug)]&lt;/span&gt;
&lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;Firm&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;total_assets&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;market_rate&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;reconciled_income&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;calculate_firm_earnings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;firm&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;Firm&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;firm&lt;/span&gt;&lt;span class="py"&gt;.reconciled_income&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;firm&lt;/span&gt;&lt;span class="py"&gt;.total_assets&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;firm&lt;/span&gt;&lt;span class="py"&gt;.market_rate&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Example Usage
&lt;/h3&gt;

&lt;p&gt;Assume the firm has &lt;code&gt;10,000,000.00&lt;/code&gt; in total assets with a market rate of &lt;code&gt;2.5%&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;firm&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Firm&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;total_assets&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10_000_000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;market_rate&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;reconciled_income&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;

&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;firm_earnings&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;calculate_firm_earnings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;firm&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="nd"&gt;println!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Firm Earnings after reconciliation: {}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;firm_earnings&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="c1"&gt;// Firm Earnings after reconciliation: 520,000.00&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This calculation gives us the final amount for distribution across the customer portfolios.&lt;/p&gt;




&lt;h2&gt;
  
  
  Step 3: Allocating Earnings to Customer Portfolios
&lt;/h2&gt;

&lt;p&gt;With the firm’s earnings calculated, let’s allocate these earnings to each customer based on their portfolio balance.&lt;/p&gt;

&lt;h3&gt;
  
  
  Customer Struct and Allocation Logic
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="nd"&gt;#[derive(Serialize,&lt;/span&gt; &lt;span class="nd"&gt;Deserialize,&lt;/span&gt; &lt;span class="nd"&gt;Clone,&lt;/span&gt; &lt;span class="nd"&gt;Debug)]&lt;/span&gt;
&lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;Customer&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;i32&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;balance&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;earnings&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;reinvest&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;bool&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;allocate_customer_earnings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;firm_earnings&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;customer_balance&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;total_assets&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;firm_earnings&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;customer_balance&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;total_assets&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Example Allocation
&lt;/h3&gt;

&lt;p&gt;If a customer has &lt;code&gt;100,000.00&lt;/code&gt; in their balance, the allocated earnings are calculated as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;customer_balance&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100_000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;total_assets&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;firm&lt;/span&gt;&lt;span class="py"&gt;.total_assets&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;customer_earnings&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;allocate_customer_earnings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;firm_earnings&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;customer_balance&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;total_assets&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="nd"&gt;println!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Customer Earnings: {}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;customer_earnings&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="c1"&gt;// Customer Earnings: 5,200.00&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Step 4: Processing Payouts and Reinvestments
&lt;/h2&gt;

&lt;p&gt;Depending on each customer’s preference, we either round the earnings to two decimal places for a payout or add them to their balance for reinvestment.&lt;/p&gt;

&lt;h3&gt;
  
  
  Handling Payouts
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;payout&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;Customer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;payout_amount&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="py"&gt;.earnings&lt;/span&gt;&lt;span class="nf"&gt;.round_dp&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="py"&gt;.balance&lt;/span&gt; &lt;span class="o"&gt;-=&lt;/span&gt; &lt;span class="n"&gt;payout_amount&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="py"&gt;.earnings&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;ZERO&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="n"&gt;payout_amount&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Handling Reinvestment
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;reinvest_earnings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;Customer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="py"&gt;.balance&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="py"&gt;.earnings&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="py"&gt;.earnings&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;ZERO&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Example Usage
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Customer&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"Alice"&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
    &lt;span class="n"&gt;balance&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100_000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;earnings&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;customer_earnings&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;reinvest&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="py"&gt;.reinvest&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nf"&gt;reinvest_earnings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;payout_amount&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;payout&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="nd"&gt;println!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Payout Amount: {}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payout_amount&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="c1"&gt;// Payout Amount: 5,200.00&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nd"&gt;println!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Updated Balance: {}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="py"&gt;.balance&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Step 5: Ensuring Bottom-Up Validation of Total Payouts
&lt;/h2&gt;

&lt;p&gt;For consistency, we need to check that the total payouts match the firm’s reconciled earnings.&lt;/p&gt;

&lt;h3&gt;
  
  
  Validation Function
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;validate_total_payouts&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;customers&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Customer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;firm_earnings&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;bool&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;total_payouts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;customers&lt;/span&gt;&lt;span class="nf"&gt;.iter&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="py"&gt;.earnings&lt;/span&gt;&lt;span class="nf"&gt;.round_dp&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;&lt;span class="nf"&gt;.sum&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="n"&gt;total_payouts&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;firm_earnings&lt;/span&gt;&lt;span class="nf"&gt;.round_dp&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This validation adds an integrity check, ensuring that the sum of individual payouts aligns with the firm’s overall earnings, preventing discrepancies.&lt;/p&gt;




&lt;h2&gt;
  
  
  Precision Preservation with PostgreSQL’s NUMERIC
&lt;/h2&gt;

&lt;p&gt;Using &lt;code&gt;Decimal&lt;/code&gt; alongside PostgreSQL’s &lt;code&gt;NUMERIC&lt;/code&gt; type keeps our data precise, ensuring our calculations maintain integrity across application and database layers.&lt;/p&gt;

&lt;h3&gt;
  
  
  PostgreSQL Schema Example
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;firm&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;SERIAL&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;total_assets&lt;/span&gt; &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;market_rate&lt;/span&gt; &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;reconciled_income&lt;/span&gt; &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;customers&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;SERIAL&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;balance&lt;/span&gt; &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;earnings&lt;/span&gt; &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;reinvest&lt;/span&gt; &lt;span class="nb"&gt;BOOLEAN&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Final Thoughts
&lt;/h2&gt;

&lt;p&gt;This Rust reference application demonstrates a robust, precision-oriented approach to managing financial calculations. By starting with income reconciliation and layering on accurate payout allocation and validation, we are required to maintain precise accounting of every decimal, aligning the payouts with the firm’s total earnings. &lt;/p&gt;

&lt;p&gt;Rust’s &lt;code&gt;Decimal&lt;/code&gt; and PostgreSQL’s &lt;code&gt;NUMERIC&lt;/code&gt; together make a winning combination, ideal for any business application where precision is not just preferred but mandatory.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Kafka with Rust using rdkafka</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Mon, 21 Oct 2024 14:23:54 +0000</pubDate>
      <link>https://dev.to/amaendeepm/kafka-with-rust-using-rdkafka-m0h</link>
      <guid>https://dev.to/amaendeepm/kafka-with-rust-using-rdkafka-m0h</guid>
      <description>&lt;p&gt;This Rust binding for the librdkafka C library allows you to harness Kafka's powerful messaging capabilities while benefiting from Rust's safety and performance.&lt;/p&gt;

&lt;p&gt;First, you'll want to add the rdkafka dependency to your Cargo.toml file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build", "naive-runtime", "tracing", "tokio","zstd"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You’ll also need to have librdkafka installed on your system. If you're on macOS, you can easily install it using Homebrew:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;brew install librdkafka
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For Ubuntu or other Debian-based systems, use:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo apt-get install librdkafka-dev
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;*&lt;em&gt;Setting Up the Kafka Client&lt;br&gt;
*&lt;/em&gt;&lt;br&gt;
Here’s a sample implementation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;use rdkafka::{
    message::OwnedHeaders,
    producer::{BaseRecord, Producer, ProducerContext, ThreadedProducer},
    ClientConfig, ClientContext, Message,
};
use std::{env, time::Duration};

/// Kafka Client
pub struct Client {}

impl Client {
    /// Publish on the Kafka client
    pub async fn publish(
        headers: OwnedHeaders,
        message: &amp;amp;str,
        key: &amp;amp;str,
        topic: &amp;amp;str,
    ) -&amp;gt; anyhow::Result&amp;lt;()&amp;gt; {
        tracing::debug!(event="publish-start", key=?key, topic=?topic);

        let brokers = env::var("KAFKA_BROKERS").unwrap_or_else(|_| "localhost:9092".into());

        let producer: ThreadedProducer&amp;lt;ProducerCallback&amp;gt; = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("acks", "all")
            .set("enable.idempotence", "true")
            .set("compression.type", "zstd")  // Enable zstd compression
            .set("queue.buffering.max.ms", "0")
            .set("retries", "10")
            .set("request.timeout.ms", "10000")
            .create_with_context(ProducerCallback {})
            .expect("unable to create kafka producer");

        // Send the message
        let _ = producer.send(
            BaseRecord::to(topic)
                .payload(message)
                .key(key)
                .headers(headers),
        );

        // Flush the producer to ensure the message is sent
        let _ = producer.flush(Duration::from_secs(5));

        Ok(())
    }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Implementing Retry Logic&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Network issues or broker unavailability can cause message publishing to fail. To handle this, we can add a function for publishing messages with an exponential backoff strategy:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/// Publish on Kafka with an exponential backoff
pub(crate) async fn publish_with_retry(
    headers: OwnedHeaders,
    message: &amp;amp;str,
    key: &amp;amp;str,
    topic: &amp;amp;str,
    interval: u64,
) -&amp;gt; anyhow::Result&amp;lt;()&amp;gt; {
    const MAX_ATTEMPTS: u64 = 10;
    let mut publish_attempts = 1;

    while let Err(err) = Client::publish(headers.clone(), message, key, topic).await {
        tracing::error!(event="publish", msg="failed", err=?err);

        if publish_attempts &amp;gt; MAX_ATTEMPTS {
            tracing::error!(event="publish", msg="exceeded retries", err=?err);
            return Err(KafkaError::RetryExceeded.into());
        } else {
            publish_attempts += 1;
            tracing::error!(event="publish", msg="retrying", err=?err, retry_count=publish_attempts);

            // Note: exponential backoff happens here
            tokio::time::sleep(Duration::from_secs(interval + (2 * publish_attempts))).await;
        }
    }

    Ok(())
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This function retries sending the message, waiting longer between attempts, and gives up after a set number of retries.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Error Handling&lt;/strong&gt;&lt;br&gt;
For better error management during publishing, defining a custom error type can be useful:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/// Kafka Error
#[derive(Debug, thiserror::Error)]
pub enum KafkaError {
    #[error("Retry exceeded on trying to publish message")]
    RetryExceeded,
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Producer Callback for Delivery Reports&lt;/strong&gt;&lt;br&gt;
To handle delivery reports for messages sent, you can implement a ProducerCallback struct. This allows you to manage responses to successful deliveries and failures:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/// Producer Callback for Kafka
struct ProducerCallback {}

impl ClientContext for ProducerCallback {}

impl ProducerContext for ProducerCallback {
    type DeliveryOpaque = ();

    /// Handle delivery report from the producer
    fn delivery(
        &amp;amp;self,
        delivery_result: &amp;amp;rdkafka::producer::DeliveryResult&amp;lt;'_&amp;gt;,
        _delivery_opaque: Self::DeliveryOpaque,
    ) {
        let dr = delivery_result.as_ref();

        match dr {
            Ok(msg) =&amp;gt; {
                let key: &amp;amp;str = msg.key_view().expect("unable to get key view").unwrap();

                tracing::debug!(
                    event = "publish",
                    key = key,
                    offset = msg.offset(),
                    partition = msg.partition()
                );
            }
            Err(err) =&amp;gt; {
                tracing::warn!(event="publish", state="fail",
                    content=?err);
            }
        };
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This callback provides insights into the delivery status of your messages, allowing you to react appropriately to successes and failures.&lt;/p&gt;

&lt;p&gt;With this setup, you can effectively publish messages to Kafka using Rust’s rdkafka library while utilizing zstd compression.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>API Development in Rust: CORS, Tower Middleware, and the Power of Axum</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Mon, 21 Oct 2024 13:40:54 +0000</pubDate>
      <link>https://dev.to/amaendeepm/api-development-in-rust-cors-tower-middleware-and-the-power-of-axum-397k</link>
      <guid>https://dev.to/amaendeepm/api-development-in-rust-cors-tower-middleware-and-the-power-of-axum-397k</guid>
      <description>&lt;p&gt;I started my Rust journey in August 2022, and it’s been an eye-opening experience. Rust is a language designed for performance and safety, but its web development ecosystem was still growing when I first encountered it. After experimenting with different frameworks and approaches, &lt;strong&gt;Axum&lt;/strong&gt; stood out to me as a modern, intuitive solution. &lt;/p&gt;

&lt;p&gt;With Axum, I’ve found building APIs not only productive but also enjoyable. Whether it’s implementing selective CORS controls, tracing requests with ease, or setting up API rate limits, the combination of Axum and &lt;strong&gt;Tower&lt;/strong&gt; middleware just clicks.&lt;/p&gt;

&lt;h2&gt;
  
  
  CORS Control: Fine-Tuning Access
&lt;/h2&gt;

&lt;p&gt;One of the first issues I had to tackle was implementing flexible CORS policies. I had an API with some public endpoints but also internal routes that needed restricted access. In many frameworks, managing this kind of fine-grained control can get messy. But Axum made this simple.&lt;/p&gt;

&lt;p&gt;Using &lt;strong&gt;tower_http::cors&lt;/strong&gt;, I could apply a global CORS policy while exempting specific routes, such as the &lt;code&gt;/public&lt;/code&gt; route being open to all and &lt;code&gt;/internal&lt;/code&gt; requiring strict access. Here’s a quick snippet of how I handled it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;use axum::{Router, routing::get};
use tower_http::cors::{CorsLayer, Any};
use tower::ServiceBuilder;

let cors_layer = CorsLayer::new()
    .allow_origin(Any)  // Open access to selected route
    .allow_methods(vec!["GET", "POST"]);

let app = Router::new()
    .route("/public", get(public_handler))
    .route("/internal", get(internal_handler))
    .layer(ServiceBuilder::new().layer(cors_layer));

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Axum allowed me to keep my security intact without complicating the setup, which I hadn’t found in other frameworks with this level of ease.&lt;/p&gt;

&lt;h2&gt;
  
  
  Tower Middleware: Tracing and Logging Done Right
&lt;/h2&gt;

&lt;p&gt;I quickly became a fan of &lt;strong&gt;Tower&lt;/strong&gt;, which integrates effortlessly with Axum. Setting up logging, request tracing, and metrics collection is as simple as adding layers to the router.&lt;/p&gt;

&lt;p&gt;With &lt;strong&gt;tower_http::trace&lt;/strong&gt;, I could capture detailed logs of requests and responses, all in a non-intrusive way:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;use tower_http::trace::TraceLayer;
use tracing::Level;

let app = Router::new()
    .route("/api", get(api_handler))
    .layer(TraceLayer::new_for_http().with_level(Level::INFO));  
// Enable logging
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In a few lines of code, I had a fully instrumented API that logged every interaction. It made debugging and performance optimization much easier, and it felt like Axum was purpose-built for this kind of composability.&lt;/p&gt;

&lt;h3&gt;
  
  
  Rate Limiting and Protecting APIs
&lt;/h3&gt;

&lt;p&gt;Rate limiting is another essential feature for any API-driven project. With &lt;strong&gt;tower&lt;/strong&gt;’s middleware, I could easily cap the number of concurrent requests using the &lt;code&gt;ConcurrencyLimitLayer&lt;/code&gt;. Again, the integration with Axum was smooth:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;use tower::limit::ConcurrencyLimitLayer;

let app = Router::new()
    .route("/api/limited", get(limited_handler))
    .layer(ConcurrencyLimitLayer::new(100));  // Limit to 100

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This setup ensures that my server remains responsive under load, which is crucial when handling high-traffic APIs.&lt;/p&gt;

&lt;h3&gt;
  
  
  Why Axum Outshines Raw Tokio and Actix
&lt;/h3&gt;

&lt;p&gt;During my journey, I spent some time exploring both &lt;strong&gt;raw Tokio&lt;/strong&gt; and &lt;strong&gt;Actix&lt;/strong&gt; as alternatives. While both are powerful in their own right, I found Axum to be a step ahead in terms of &lt;strong&gt;developer experience&lt;/strong&gt; and &lt;strong&gt;intuitiveness&lt;/strong&gt;.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Axum vs Raw Tokio&lt;/strong&gt;: Raw &lt;strong&gt;Tokio&lt;/strong&gt; is fantastic when you need granular control over asynchronous operations, and its event-driven model is second to none. But when it comes to building web APIs, managing everything at the Tokio level can quickly become complex and verbose. Axum, built on top of Tokio, abstracts much of the boilerplate while maintaining full async capabilities. It’s like having the power of Tokio without the steep learning curve. You can focus on building your application rather than handling the low-level details. With Axum, you don’t need to manage HTTP requests manually or write tons of code to set up routing, middleware, or async logic. Axum does all of this in a much cleaner, modular way, making it the next-generation framework over raw Tokio for web applications.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Axum vs Actix&lt;/strong&gt;: &lt;strong&gt;Actix Web&lt;/strong&gt; was one of the first Rust web frameworks to gain serious traction, and it’s known for its high performance. However, I found Actix’s actor-based model to be more complex than necessary for most web use cases. While Actix shines in certain scenarios, the actor pattern can be an overkill for basic API services. Axum, in contrast, uses a &lt;strong&gt;more familiar and intuitive routing model&lt;/strong&gt;. Its design follows the simplicity of frameworks like Express in JavaScript or Flask in Python, but with all the benefits of Rust’s type safety and performance. And yet, it doesn’t compromise on advanced features like middleware stacking, error handling, or async support. Plus, Axum’s reliance on Tower middleware gives you access to a wide range of ready-made tools (like CORS handling, rate limiting, and tracing) that are easy to plug into your API.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Tokio and Tower at its Core&lt;/strong&gt;: Axum is also deeply integrated with &lt;strong&gt;Tokio&lt;/strong&gt; and &lt;strong&gt;Tower&lt;/strong&gt;, which are two of the most battle-tested and high-performance libraries in the Rust ecosystem. This means that Axum is not only fast but also scalable and production-ready. Whether it’s handling high concurrency with Tokio’s async runtime or layering on functionality with Tower middleware, Axum is designed to scale without getting in your way.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Why I’m Loving Axum
&lt;/h3&gt;

&lt;p&gt;Axum has made my Rust web development journey far smoother than I anticipated. The fact that it allows me to easily control CORS per route, integrate logging and tracing, and enforce API rate limits—without over-complicating things—proves it’s a framework designed with developers in mind.&lt;/p&gt;

&lt;p&gt;Here’s what I love most:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Elegant design&lt;/strong&gt;: Routing, middleware, and error handling all feel natural. You don’t need to fight the framework to build complex APIs.&lt;br&gt;
&lt;strong&gt;Composable and modular&lt;/strong&gt;: Tower middleware makes it easy to layer in functionality like logging, rate limiting, and CORS control without cluttering your application logic.&lt;br&gt;
&lt;strong&gt;Power of async&lt;/strong&gt;: Axum’s seamless integration with Tokio gives me full control over asynchronous operations without adding unnecessary complexity.&lt;/p&gt;

&lt;p&gt;Anyone looking to build web APIs in Rust, Axum can be easily your first choice. It offers the right balance of simplicity, power, and flexibility, making it a worthy successor to both raw Tokio setups and older frameworks like Actix. Whether building a small personal project or scaling up an enterprise service, Axum has the tools and developer experience worth exploring.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Building a Robust Peek-Dequeue Engine in Rust for a General Message Portal</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Mon, 14 Oct 2024 13:44:57 +0000</pubDate>
      <link>https://dev.to/amaendeepm/building-a-robust-peek-dequeue-engine-in-rust-for-a-general-message-portal-3k2o</link>
      <guid>https://dev.to/amaendeepm/building-a-robust-peek-dequeue-engine-in-rust-for-a-general-message-portal-3k2o</guid>
      <description>&lt;p&gt;In modern distributed systems, reliable message handling is crucial. Rust, with its emphasis on safety and concurrency, is an excellent choice for building robust systems. Here is an example of Peek-Dequeue engine implemented in Rust that interacts with a general message portal. Further there is explaining each component to help understand.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Introduction&lt;/strong&gt;&lt;br&gt;
Message portals are essential components in distributed architectures, acting as intermediaries for message exchange between different services. Building a reliable engine to interact with such portals requires careful handling of network communication, error management, and asynchronous operations.&lt;/p&gt;

&lt;p&gt;This articulation of code below presents a Rust implementation of a Peek-Dequeue engine that utilizes some of best possible practices in asynchronous programming, error handling, and most importantly system resilience.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Overview of the Peek-Dequeue Engine&lt;/strong&gt;&lt;br&gt;
The engine's primary responsibilities are:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Peeking Messages: Regularly checking the message portal for new messages.&lt;/li&gt;
&lt;li&gt;Handling Responses: Parsing and validating the responses, including error handling.&lt;/li&gt;
&lt;li&gt;Processing Messages: Transmuting and persisting valid messages.&lt;/li&gt;
&lt;li&gt;Dequeueing Messages: Removing processed messages from the portal.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Here's the high-level structure of the MessagePortalConnection:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pub struct MessagePortalConnection {
    interval: u64,
}

impl Default for MessagePortalConnection {
    fn default() -&amp;gt; Self {
        Self { interval: 1 }
    }
}

impl MessagePortalConnection {
    pub async fn run(&amp;amp;self) {
        // Implementation details...
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Setting Up the Connection&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The MessagePortalConnection struct holds the configuration for the connection, such as the peeking interval.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pub struct MessagePortalConnection {
    interval: u64,
}

impl Default for MessagePortalConnection {
    fn default() -&amp;gt; Self {
        Self { interval: 60 } // Default peeking every minute
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Using the Default trait allows for easy instantiation with default settings while providing flexibility for customization.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Main Run Loop&lt;/strong&gt;&lt;br&gt;
The run method contains the main loop that continuously peeks for new messages.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;impl MessagePortalConnection {
    pub async fn run(&amp;amp;self) {
        let portal_endpoint: Arc&amp;lt;str&amp;gt; = env::var("PORTAL_URL")
            .unwrap_or_else(|_| "https://portal.example.com/".to_owned())
            .into();

        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(self.interval));

        loop {
            tokio::select! {
                _ = interval.tick() =&amp;gt; {
                    // Peeking logic...
                }
            }
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Interval Setup&lt;/strong&gt;: Uses tokio::time::interval for periodic execution without blocking the runtime.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Asynchronous Loop&lt;/strong&gt;: The tokio::select! macro allows the loop to wait on multiple asynchronous operations.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Peeking Messages&lt;/strong&gt;:&lt;br&gt;
Within the loop, the engine attempts to peek messages from the portal.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;let peek_trace_id = Ulid::from_datetime(SystemTime::now()).to_string(); // Unique ID for tracing

tracing::debug!(event="peek-portal", trace=peek_trace_id);

let peek_result = retry_with_backoff(
    || async {
        peek_from_portal(portal_endpoint.clone().into()).await.map(|arc_str| arc_str.to_string())
    },
    5,  // Max retries
    2,  // Initial delay (seconds)
    1.5 // Backoff factor
).await;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Unique Trace ID&lt;/strong&gt;: Generates a ULID based on the current time for traceability.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Retry Logic&lt;/strong&gt;: Utilizes a custom retry_with_backoff function to handle transient failures.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Handling Responses&lt;/strong&gt;&lt;br&gt;
After attempting to peek, the engine processes the response.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;if peek_result.is_err() {
    tracing::error!(event="portal_peek_result_error", trace=peek_trace_id, err=?peek_result.err());
    continue;
}


let peek_response = peek_result.unwrap();
let doc_result = Document::parse(&amp;amp;peek_response);

if doc_result.is_err() {
    tracing::error!(event="peeked-invalid-response", trace=peek_trace_id, err=?doc_result.err(), msg="peek-response-error");
    continue;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Error Handling:&lt;/strong&gt; Checks for errors at each step and logs them appropriately.&lt;br&gt;
&lt;strong&gt;XML Parsing&lt;/strong&gt;: Uses roxmltree::Document::parse to parse the XML response.&lt;br&gt;
Processing Valid Messages&lt;br&gt;
Once a valid XML response is confirmed, the engine processes the message.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;let root_node = doc_result.unwrap();
let fault_string = root_node
    .descendants()
    .find(|node| node.has_tag_name("faultstring"))
    .map(|node| node.text())
    .unwrap_or_default();

if fault_string.is_some() {
    tracing::error!(event="peeked-fault-string", msg="peek-SOAP-fault", trace=peek_trace_id, response=?peek_response);
    continue;
}

// Determine the document type
let doc_type_opt = find_document_type_from_peek_response((&amp;amp;peek_response.clone()).to_string());

if doc_type_opt.is_none() {
    tracing::warn!(event="peek", msg="nothing-to-peek", trace=peek_trace_id);
    continue;
}

let doc_type = doc_type_opt.unwrap();
tracing::info!(event="peek", msg="returned document", doc_type=?doc_type, trace=peek_trace_id);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;SOAP Fault Handling&lt;/strong&gt;: Checks for fault messages and handles them gracefully.&lt;br&gt;
&lt;strong&gt;Document Type Identification&lt;/strong&gt;: Determines the type of the received document for appropriate processing.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Dequeueing Messages&lt;/strong&gt;&lt;br&gt;
After processing, the engine attempts to dequeue the message from the portal.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;let msg_ref = extract_message_reference(peek_response.clone().into());

if msg_ref.is_none() {
    tracing::error!(event="peek", msg="xml has no message reference included", trace=peek_trace_id);
    continue;
}

let msg_ref_id = msg_ref.unwrap();
tracing::info!(event="peek", msg="extracted message reference", reference_id=?msg_ref_id, trace=peek_trace_id);

let incoming_raw_xml = peek_response.to_string().clone();

let incoming_payload = extract_payload_json(incoming_raw_xml.clone().into());

let handling_status = handle_incoming(
    peek_trace_id.clone().into(),
    incoming_payload.clone().unwrap(),
    incoming_raw_xml.clone().into(),
).await;

if !handling_status {
    tracing::error!(event="peek", msg="response not handled internally; not dequeue or proceed", trace=peek_trace_id);
    continue;
}

if incoming_payload.is_some() {
    let payload = incoming_payload.unwrap();
    let transmuted_message = transmute_response(incoming_raw_xml.clone().into()).unwrap();
    let persist_status = persist_identifiable_message(peek_trace_id.clone().into(), payload, transmuted_message.clone()).await;
    if persist_status.is_err() {
        tracing::error!(event="Persist_Transmuted_Message", msg="Could not persist Transmuted JSON Message", error=?persist_status.err(), trace=peek_trace_id);
    } else {
        tracing::debug!(event="Persist_Transmuted_Message", msg="Successfully Persisted Transmuted JSON Message", trace=peek_trace_id);
    }
}

let msg_ref_str = msg_ref_id.to_string();
tracing::info!(event="dequeue", msg="trying", msg_id=msg_ref_str, trace=peek_trace_id);

loop {
    let dequeue_result = retry_with_backoff(
        || async {
            request_message_dequeue_portal(msg_ref_str.clone().into())
                .await
                .map(|arc_str| arc_str.to_string())
        },
        10, // Max retries
        2,  // Initial delay (seconds)
        1.5 // Backoff factor
    ).await;

    if dequeue_result.is_err() {
        tracing::error!(event="portal_dequeue", msg="failed; retrying", trace=peek_trace_id);
        notify_error(format!("Dequeue Attempts exceeded retries for Message {msg_ref_id} :: Peek is Stuck, PeekTraceID {peek_trace_id}").into());
        break;
    } else {
        tracing::info!(event="dequeue", msg="ok", msg_id=msg_ref_str, trace=peek_trace_id);
        break;
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Message Reference Extraction&lt;/strong&gt;: &lt;br&gt;
Retrieves a unique identifier for the message.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Handling Status Check&lt;/strong&gt;: Ensures the message was handled correctly before proceeding.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Persistence&lt;/strong&gt;: Attempts to persist the processed message, logging any errors.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Dequeue Loop&lt;/strong&gt;: Uses a loop with retry logic to ensure the message is dequeued, preventing duplicate processing.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Retry Logic with Exponential Backoff&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The retry_with_backoff function implements retry logic with exponential backoff, which is crucial for handling transient errors without overwhelming the system or the portal.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;async fn retry_with_backoff&amp;lt;F, Fut, T, E&amp;gt;(operation: F, max_retries: u32, delay_secs: u64, backoff_factor: f64) -&amp;gt; Result&amp;lt;T, E&amp;gt;
where
    F: Fn() -&amp;gt; Fut,
    Fut: std::future::Future&amp;lt;Output = Result&amp;lt;T, E&amp;gt;&amp;gt;,
{
    let mut retries = 0;
    let mut delay = Duration::from_secs(delay_secs);

    loop {
        let result = operation().await;
        if result.is_ok() || retries &amp;gt;= max_retries {
            return result;
        }
        tokio::time::sleep(delay).await;
        retries += 1;
        delay = Duration::from_secs_f64(delay.as_secs_f64() * backoff_factor);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Generic Function:&lt;/strong&gt; Works with any asynchronous operation that returns a Result.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Exponential Backoff:&lt;/strong&gt; Increases the delay between retries exponentially, reducing load during failures.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Summarizing&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This Peek-Dequeue engine uses Rust's capabilities in building reliable, high-performance system. &lt;br&gt;
The code follows basically:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Asynchronous Efficiency&lt;/strong&gt;: Leveraging Tokio for non-blocking operations.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Robust Error Handling&lt;/strong&gt;: Comprehensive checks and logging at each stage.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Resilience&lt;/strong&gt;: Retry mechanisms with exponential backoff to handle transient failures.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Maintainability&lt;/strong&gt;: Modular design with clear separation of concerns.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Rust Errors: anyhow Revelation</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Mon, 07 Oct 2024 09:16:30 +0000</pubDate>
      <link>https://dev.to/amaendeepm/rust-errors-anyhow-revelation-2a33</link>
      <guid>https://dev.to/amaendeepm/rust-errors-anyhow-revelation-2a33</guid>
      <description>&lt;p&gt;How I stopped writing Rust like it was Golang and finally embraced &lt;code&gt;anyhow::Error&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;When I first started with Rust, my error handling looked something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;do_stuff&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;step1&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;step1&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;step1&lt;/span&gt;&lt;span class="nf"&gt;.is_error&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Step 1 failed"&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;());&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;step2&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;step2&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;step1&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;step2&lt;/span&gt;&lt;span class="nf"&gt;.is_error&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Step 2 failed"&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;());&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"All good!"&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Or maybe when I learned Rust further my return type was still &lt;code&gt;Result&amp;lt;Arc&amp;lt;str&amp;gt;, Arc&amp;lt;str&amp;gt;&amp;gt;&lt;/code&gt; , but still following same style of error checking and handling!&lt;/p&gt;

&lt;p&gt;Yikes, right? It looked like I was writing Go, but with extra steps. Having an error as a type provided thru &lt;code&gt;anyhow::Error&lt;/code&gt;, and code conditions and sizing came way down smaller and so much more readable:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;anyhow&lt;/span&gt;&lt;span class="p"&gt;::{&lt;/span&gt;&lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;};&lt;/span&gt;

&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;do_stuff&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;step1&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;step1&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Step 1 failed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;step2&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;step2&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;step1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Step 2 failed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"All good!"&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;So much cleaner! No more explicit checks, and I can add context where it matters. The best part? I can still use &lt;code&gt;Result&amp;lt;String, String&amp;gt;&lt;/code&gt; for those rare cases where I need super explicit errors.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;critical_stuff&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// Detailed custom errors here&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, let's talk pros and cons. On the professional side, anyhow has some significant benefits:&lt;/p&gt;

&lt;p&gt;It reduces boilerplate, making code more readable and maintainable.&lt;br&gt;
The Context trait allows for rich error messages without custom error types.&lt;br&gt;
It's great for rapid prototyping and applications where detailed error handling isn't critical.&lt;/p&gt;

&lt;p&gt;However, it's not all roses. There are some potential drawbacks:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Loss of static type checking for errors. You can't exhaustively match on error types, which can be a problem in libraries or safety-critical code.&lt;/li&gt;
&lt;li&gt;Performance overhead. While minimal, anyhow::Error does introduce some runtime cost compared to statically typed errors.&lt;/li&gt;
&lt;li&gt;It can make it easier to be less thoughtful about error design. In complex systems, well-designed error types can be invaluable.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Overall, in my experience, the benefits outweigh the drawbacks, especially in application code. For libraries or performance-critical systems, you might want to stick with custom error types.&lt;/p&gt;

&lt;p&gt;If you're still writing Rust errors like it's Go, give anyhow::Error a shot&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Kafka Data Retention: Symphony of retention.ms &amp; segment.ms</title>
      <dc:creator>amaendeepm</dc:creator>
      <pubDate>Tue, 26 Dec 2023 22:10:09 +0000</pubDate>
      <link>https://dev.to/amaendeepm/kafka-data-retention-symphony-of-retentionms-segmentms-5d42</link>
      <guid>https://dev.to/amaendeepm/kafka-data-retention-symphony-of-retentionms-segmentms-5d42</guid>
      <description>&lt;p&gt;While grasping to see very old data still on a Kafka topic recently led me to a profound understanding of the often underestimated collaboration between &lt;strong&gt;retention.ms&lt;/strong&gt; and &lt;strong&gt;segment.ms&lt;/strong&gt;. Far from a conventional time-to-live concept, these two configurations work in tandem, significantly impacting Kafka's data retention behaviour.&lt;/p&gt;

&lt;p&gt;Going by the documentation around Kafka data orchestration, retention.ms serves as "the" configuration, meticulously shaping data retention. Yet, the true revelation lies in the intricate dance between retention.ms and its counterpart, segment.ms.&lt;/p&gt;

&lt;p&gt;Working in tandem, these configurations play a vital role in shaping Kafka's data retention with precision. Basically, each topic partition divides its log into smaller, manageable chunks known as segments. The size of these segments, determining when the log rolls, is controlled by &lt;strong&gt;segment.bytes&lt;/strong&gt; (sized based) and &lt;strong&gt;segment.ms&lt;/strong&gt; (time-based). This dance becomes especially critical in scenarios where slow-paced topics might lead to extended waits for segment.bytes to accumulate sufficient data.&lt;/p&gt;

&lt;p&gt;The narrative extends beyond a single Kafka topic for those venturing into tiered storage. The orchestrated dance expands into the cloud, where the influence of &lt;strong&gt;retention.bytes&lt;/strong&gt; and &lt;strong&gt;retention.ms&lt;/strong&gt; orchestrates the symphony of data management. Simultaneously, their local counterpart configrations viz. &lt;strong&gt;retention.local.target.bytes&lt;/strong&gt; and &lt;strong&gt;retention.local.target.ms&lt;/strong&gt; thus take center stage, managing the tempo on the local storage platform.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Example:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Lets maybe take a festive example featuring the topic "ChristmasGreetings." This topic is a lively hub of holiday messages from various channels, partitioned for efficiency, and set to unfold the duality of retention.ms and segment.ms:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;em&gt;Number of Partitions &amp;amp; Segment Rollover Config:&lt;/em&gt;&lt;/strong&gt;&lt;br&gt;
The "ChristmasGreetings" topic boasts 4 partitions, each diligently managing its own log segments.&lt;/p&gt;

&lt;p&gt;The size of these segments, governed by segment.bytes, is set to 1 MB, ensuring manageable chunks of holiday cheer.&lt;/p&gt;

&lt;p&gt;segment.ms is choreographed to 15 minutes, allowing segments to gracefully age and rollover.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;em&gt;Entering retention.ms - Longer segment.ms than retention.ms:&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In this scenario, let's consider a longer segment.ms of 30 minutes, while retention.ms remains at 12 hours.&lt;/p&gt;

&lt;p&gt;Log segments gracefully rollover every 30 minutes due to segment.ms, but messages are eligible for deletion only after 12 hours, as dictated by retention.ms.&lt;/p&gt;

&lt;p&gt;This can lead to some segments persisting longer than needed, potentially accumulating more holiday messages than necessary. The temporal control enforced by segment.ms ensures a graceful rollover but doesn't necessarily align with the overall retention policy.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;&lt;strong&gt;Shorter segment.ms than retention.ms:&lt;/strong&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Now, let's flip the scenario, setting segment.ms to 5 minutes and retaining messages for 12 hours.&lt;/p&gt;

&lt;p&gt;Log segments rollover every 5 minutes due to segment.ms, but messages are eligible for deletion only after 12 hours according to retention.ms.&lt;/p&gt;

&lt;p&gt;This configuration ensures more frequent rollovers, potentially creating a higher number of smaller log segments. While it aligns with the retention policy, it might introduce additional overhead in terms of segment management.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;em&gt;Balancing Act: Equal segment.ms and retention.ms:&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;For an optimal configuration, segment.ms is set at 15 minutes, closely aligned with retention.ms of 12 hours.&lt;/p&gt;

&lt;p&gt;Log segments gracefully rollover in sync with the overall retention policy, ensuring efficient data management without unnecessary accumulation or overhead.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Conclusion:&lt;/strong&gt;&lt;br&gt;
In this holiday message capturing performance, segment.ms and retention.ms dance in different tempos, showcasing the nuances of Kafka's orchestration within the festive realm of "ChristmasGreetings." For anyone aiming to optimize and fortify their data streaming platforms, understanding the dynamic interplay between retention.ms and segment.ms becomes an essential therefore. Something to keep in mind always!&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
