Mahmoud Zalt

Posted on • Originally published at zalt.me

Kafka’s Broker As A Traffic Cop

We’re examining how Apache Kafka’s broker manages every protocol request that hits it. Kafka is a distributed event streaming platform, and on each broker the core traffic cop is the KafkaApis class: more than 2,000 lines of Scala that decide how to handle every request. In practice, this file is Kafka’s front controller.

When this front controller is disciplined, a Kafka cluster feels predictable and debuggable. When it grows without structure, it turns into a god class that’s hard to change safely. I’m Mahmoud Zalt, an AI solutions architect, and we’ll use KafkaApis as a case study in how to design, grow, and eventually refactor a high‑throughput front controller.

The core lesson: you can manage a huge API surface by being relentlessly consistent about the request lifecycle (authorize → validate → delegate → respond) and by extracting feature‑specific logic once the controller starts to accumulate real domain behavior.

KafkaApis as the Broker’s Front Controller

KafkaApis sits on the hot path between the network threads and every major broker subsystem. Every request flows through it, gets inspected, and is routed or rejected.

Broker process
|
+-- Network threads
| |
| +-- RequestChannel.Request --> KafkaApis.handle()
| |
| +-- AuthHelper (ACL checks)
| +-- ApiVersionManager (version gating)
| +-- QuotaManagers (produce/fetch/leader/request)
| +-- MetadataCache (topics, brokers, features)
| +-- ForwardingManager (controller-forwarded APIs)
| +-- ReplicaManager (produce/fetch/deleteRecords/writeTxnMarkers)
| +-- GroupCoordinator (groups, offsets, consumer/streams/share group heartbeats)
| +-- TransactionCoordinator (transactions, producers)
| +-- SharePartitionManager (share fetch/ack sessions, share fetch IO)
| +-- ShareCoordinator (share group state APIs)
| +-- ClientMetricsManager (telemetry)
| +-- ConfigAdminManager/ConfigHelper (configs)
|
+-- Storage layer (logs, state stores) via ReplicaManager and coordinators

The broker’s request path: KafkaApis.handle sits between the network and all major subsystems.

The heart of this design is a single overridden method:

override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  def handleError(e: Throwable): Unit = {
    error(s"Unexpected error handling request ${request.requestDesc(true)} " +
      s"with context ${request.context}", e)
    requestHelper.handleError(request, e)
  }

  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")

    if (!apiVersionManager.isApiEnabled(request.header.apiKey, request.header.apiVersion)) {
      throw new IllegalStateException(s"API ${request.header.apiKey} with version ${request.header.apiVersion} is not enabled")
    }

    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request).exceptionally(handleError)
      // ... dozens more APIs elided ...
      case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request).exceptionally(handleError)
      case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request).exceptionally(handleError)
      case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => handleError(e)
  } finally {
    replicaManager.tryCompleteActions()
    if (request.apiLocalCompleteTimeNanos < 0)
      request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}

KafkaApis.handle: a classic front controller routing every Kafka protocol request.

Once you centralize all request handling, you get consistent behavior, observability, and one place to enforce global rules. The cost is the constant pressure toward a “god class” that’s hard to evolve. KafkaApis shows both sides of this trade‑off.

Rule of thumb: A front controller is powerful, but once it passes ~1,000 lines of complex logic, start extracting feature‑specific modules before it becomes unmanageable.

The “Auth → Validate → Delegate → Respond” Spine

Zoom into any major handler—produce, fetch, offsets, group management, transactions, share—and you see the same spine:

  1. Authorize the caller (ACL checks, possibly role‑dependent).
  2. Validate request fields and resource existence.
  3. Delegate to a subsystem (ReplicaManager, coordinators, share managers, controller).
  4. Respond with protocol‑specific data, including throttling and version‑aware error mapping.
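
Stripped of Kafka’s real types, that spine is small enough to sketch in a few lines. The names below (authorize, validate, delegate, HandlerSpine) are illustrative placeholders for this article, not Kafka APIs:

object HandlerSpine {
  final case class Request(principal: String, topic: String, payload: String)
  final case class Response(error: Option[String], body: String)

  // 1. Authorize: placeholder ACL check.
  private def authorize(req: Request): Boolean = req.principal.nonEmpty

  // 2. Validate: placeholder field/resource checks.
  private def validate(req: Request): Option[String] =
    if (req.topic.isEmpty) Some("UNKNOWN_TOPIC_OR_PARTITION") else None

  // 3. Delegate: stand-in for a ReplicaManager or coordinator call.
  private def delegate(req: Request): String = s"stored ${req.payload} in ${req.topic}"

  // 4. Respond: map the outcome back onto a protocol-level response.
  def handle(req: Request): Response =
    if (!authorize(req)) Response(Some("TOPIC_AUTHORIZATION_FAILED"), "")
    else validate(req) match {
      case Some(err) => Response(Some(err), "")
      case None      => Response(None, delegate(req))
    }

  def main(args: Array[String]): Unit =
    println(handle(Request("alice", "orders", "r1"))) // Response(None,stored r1 in orders)
}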

This consistent lifecycle is what keeps a 2,000‑line controller understandable. Let’s look at how it plays out in the two most important APIs.

Produce: Orchestrator, Not Storage Engine

handleProduceRequest is a textbook orchestrator: it owns protocol semantics, not disk IO.

def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  val produceRequest = request.body[ProduceRequest]

  // 1. Authorization: transactional and per-topic
  if (RequestUtils.hasTransactionalRecords(produceRequest)) {
    val ok = produceRequest.transactionalId != null &&
      authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId)
    if (!ok) {
      requestHelper.sendErrorResponseMaybeThrottle(request,
        Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
      return
    }
  }

  val unauthorized = mutable.Map[TopicIdPartition, PartitionResponse]()
  val unknown = mutable.Map[TopicIdPartition, PartitionResponse]()
  val invalid = mutable.Map[TopicIdPartition, PartitionResponse]()
  val authorized = mutable.Map[TopicIdPartition, MemoryRecords]()

  val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, ProduceRequestData.PartitionProduceData)]

  // 2. Resolve topic name/ID and classify
  produceRequest.data.topicData.forEach { topic =>
    topic.partitionData.forEach { partition =>
      val (topicName, topicId) =
        if (topic.topicId == Uuid.ZERO_UUID)
          (topic.name, metadataCache.getTopicId(topic.name))
        else
          (metadataCache.getTopicName(topic.topicId).orElse(topic.name), topic.topicId)

      val tp = new TopicPartition(topicName, partition.index)
      if (topicName.isEmpty && request.header.apiVersion > 12)
        unknown += new TopicIdPartition(topicId, tp) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID)
      else
        topicIdToPartitionData += new TopicIdPartition(topicId, tp) -> partition
    }
  }

  val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, topicIdToPartitionData)(_._1.topic)

  topicIdToPartitionData.foreach { case (tidp, p) =>
    val records = p.records.asInstanceOf[MemoryRecords]
    if (!authorizedTopics.contains(tidp.topic))
      unauthorized += tidp -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
    else if (!metadataCache.contains(tidp.topicPartition))
      unknown += tidp -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
    else
      try {
        ProduceRequest.validateRecords(request.header.apiVersion, records)
        authorized += tidp -> records
      } catch {
        case e: ApiException =>
          invalid += tidp -> new PartitionResponse(Errors.forException(e))
      }
  }

  // 3. Delegate to ReplicaManager
  def sendResponseCallback(status: Map[TopicIdPartition, PartitionResponse]): Unit = {
    val merged = status ++ unauthorized ++ unknown ++ invalid
    // 4. Apply quotas and build final response (acks==0 special case)
    // ...
  }

  if (authorized.isEmpty)
    sendResponseCallback(Map.empty)
  else
    replicaManager.handleProduceAppend(
      timeout = produceRequest.timeout,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = request.header.clientId == "__admin_client",
      transactionalId = produceRequest.transactionalId,
      entriesPerPartition = authorized,
      responseCallback = sendResponseCallback,
      recordValidationStatsCallback = processingStatsCallback,
      requestLocal = requestLocal,
      transactionSupportedOperation =
        AddPartitionsToTxnManager.produceRequestVersionToTransactionSupportedOperation(request.header.apiVersion())
    )
}

Produce handler: pure orchestration around a thin delegation to ReplicaManager.

  • Early exits avoid wasted work on unauthorized transactional producers.
  • Per‑partition maps (unauthorized, unknown, invalid, authorized) keep responsibilities clear and response assembly deterministic.
  • Delegation is thin: KafkaApis never writes to disk; that’s ReplicaManager’s job.

Design pattern: Each handler should be an orchestrator. It understands protocol and security, but delegates storage and business rules to subsystems. That separation is a big part of Kafka’s ability to add features without rewriting core IO paths.

Fetch: Same Spine, Role‑Dependent Rules

The Fetch API follows the same lifecycle but adds a twist: followers and consumers have different authorization models.

def handleFetchRequest(request: RequestChannel.Request): Unit = {
  val fetchRequest = request.body[FetchRequest]
  val topicNames = if (fetchRequest.version >= 13) metadataCache.topicIdsToNames() else Collections.emptyMap[Uuid, String]()

  val fetchData = fetchRequest.fetchData(topicNames)
  val forgotten = fetchRequest.forgottenTopics(topicNames)
  val fetchContext = fetchManager.newContext(
    fetchRequest.version,
    fetchRequest.metadata,
    fetchRequest.isFromFollower,
    fetchData,
    forgotten,
    topicNames
  )

  val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
  val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()

  if (fetchRequest.isFromFollower) {
    // Followers: need CLUSTER_ACTION
    if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
      fetchContext.foreachPartition { (tp, data) =>
        if (tp.topic == null)
          erroneous += tp -> FetchResponse.partitionResponse(tp, Errors.UNKNOWN_TOPIC_ID)
        else if (!metadataCache.contains(tp.topicPartition))
          erroneous += tp -> FetchResponse.partitionResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        else
          interesting += tp -> data
      }
    } else {
      fetchContext.foreachPartition { (tp, _) =>
        erroneous += tp -> FetchResponse.partitionResponse(tp, Errors.TOPIC_AUTHORIZATION_FAILED)
      }
    }
  } else {
    // Consumers: per-topic READ
    val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]
    fetchContext.foreachPartition { (tp, data) =>
      if (tp.topic == null)
        erroneous += tp -> FetchResponse.partitionResponse(tp, Errors.UNKNOWN_TOPIC_ID)
      else
        partitionDatas += tp -> data
    }

    val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)

    partitionDatas.foreach { case (tp, data) =>
      if (!authorizedTopics.contains(tp.topic))
        erroneous += tp -> FetchResponse.partitionResponse(tp, Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(tp.topicPartition))
        erroneous += tp -> FetchResponse.partitionResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        interesting += tp -> data
    }
  }

  // ... invoke replicaManager.fetchMessages and apply quotas ...
}

Fetch handler: same orchestrator pattern, with role‑dependent auth rules and session context.

The key is consistency: even when the rules differ by caller type, the flow—authorize, validate, delegate, respond—stays the same. That makes a large file feel like many repetitions of one idea instead of a bag of special cases.

Quotas and Throttling as First‑Class Concerns

Authorization and correctness aren’t enough for a high‑throughput system. Kafka also needs to prevent clients from overwhelming brokers. KafkaApis handles this by integrating quota logic directly into the response path.

Quota checks generally happen near response construction, once the handler can approximate response size. This keeps throttling cheap: the broker avoids doing work it will just have to drop.

Produce Quotas: One Throttle View, Multiple Budgets

For produce, Kafka enforces both bandwidth and request‑rate quotas, but exposes a single throttleTimeMs to the client:

val timeMs = time.milliseconds()
val reqSize = request.sizeInBytes

val bandwidthThrottleTimeMs = quotas.produce
  .maybeRecordAndGetThrottleTimeMs(request.session, request.header.clientId, reqSize, timeMs)

val requestThrottleTimeMs =
  if (produceRequest.acks == 0) 0
  else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)

val maxThrottle = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
if (maxThrottle > 0) {
  request.apiThrottleTimeMs = maxThrottle
  if (bandwidthThrottleTimeMs > requestThrottleTimeMs)
    requestHelper.throttle(quotas.produce, request, bandwidthThrottleTimeMs)
  else
    requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)
}

Produce throttling: two quotas (bandwidth and request rate), one coherent signal to the client.

Internally, the broker tracks distinct budgets; externally, the client just sees a unified delay. Keeping this logic centralized in KafkaApis guarantees consistent semantics across handlers.

Fetch & ShareFetch Quotas: Avoid Fetching What You’ll Drop

Fetch and ShareFetch go a step further by resizing work to fit quotas before doing IO. For normal consumers:

val maxQuotaWindowBytes =
  if (fetchRequest.isFromFollower) Int.MaxValue
  else quotas.fetch.maxValueInQuotaWindow(request.session, clientId).toInt

val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)

The fetch request is proactively resized to fit the client’s quota window.

The handler uses quota information to dial down maxBytes before calling ReplicaManager. This avoids reading data that will just be throttled away. ShareFetch uses a similar approach, wrapped in its own context and size calculations.

Design principle: Throttling is cheap if you can decide early that you’ll exceed quota and shrink or reject the work. Kafka achieves this by marrying protocol fields (like maxBytes) with quota knowledge inside the handler.

Share APIs: Where the God Class Emerges

The disciplined patterns above work well for classic APIs. Complexity spikes with Kafka’s newer share group features: ShareFetch, ShareAcknowledge, and their state/offset APIs.

These introduce:

  • Per‑group share sessions managed via ShareFetchContext.
  • Piggybacked acknowledgements on fetch requests.
  • A renew‑ack mode (KIP‑1222) that changes the meaning of size and wait fields.
  • Intricate rules for validating acknowledgement batches.

All of that currently lives inside KafkaApis. This is where the front controller starts to feel like a god class: it’s not just orchestrating share APIs; it’s implementing their core semantics.

Renew‑Ack: Cross‑Field Invariants in the Handler

When isRenewAck is true for ShareFetch, KIP‑1222 requires multiple other fields to be zero. KafkaApis enforces that directly:

// KIP-1222 enforces setting the maxBytes, minBytes, maxRecords, maxWaitMs
// values to 0, in case isRenewAck is true.
if (shareFetchRequest.version >= 2 && shareFetchRequest.data.isRenewAck) {
  val reqData = shareFetchRequest.data
  var errorMsg: String = ""
  if (reqData.maxBytes != 0) errorMsg += "maxBytes must be set to 0, "
  if (reqData.minBytes != 0) errorMsg += "minBytes must be set to 0, "
  if (reqData.maxRecords != 0) errorMsg += "maxRecords must be set to 0, "
  if (reqData.maxWaitMs != 0) errorMsg += "maxWaitMs must be set to 0, "

  if (errorMsg != "") {
    errorMsg += "if isRenewAck is true."
    error(errorMsg)
    requestHelper.sendMaybeThrottle(request,
      shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
        Errors.INVALID_REQUEST.exception(errorMsg)))
    return CompletableFuture.completedFuture[Unit](())
  }
}

KIP‑1222: cross‑field invariants enforced inline in the handler.

Individually, this is fine. But as more cross‑field rules accumulate, they bury the main “authorize → validate → delegate → respond” spine in validation branches.
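
One way to keep the spine visible is to hoist each cross‑field rule into a small pure function. A minimal sketch, assuming a simplified view of the four fields rather than Kafka’s generated ShareFetchRequestData:

object RenewAckValidation {
  // Simplified stand-in for the fields KIP-1222 requires to be zero when isRenewAck is set.
  final case class RenewAckFields(maxBytes: Int, minBytes: Int, maxRecords: Int, maxWaitMs: Int)

  // Returns an error message if any field violates the renew-ack invariant, else None.
  def renewAckViolation(f: RenewAckFields): Option[String] = {
    val offenders = Seq(
      "maxBytes"   -> f.maxBytes,
      "minBytes"   -> f.minBytes,
      "maxRecords" -> f.maxRecords,
      "maxWaitMs"  -> f.maxWaitMs
    ).collect { case (name, value) if value != 0 => name }

    if (offenders.isEmpty) None
    else Some(offenders.map(n => s"$n must be set to 0").mkString(", ") + ", if isRenewAck is true.")
  }
}

With a helper like this, the handler’s renew‑ack branch collapses to a single guard that maps a Some(message) result onto an INVALID_REQUEST response.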

Acknowledgement Batch Validation: One Heavy Method

The most cognitively dense piece is validateAcknowledgementBatches, which checks structure and semantics of acknowledgement batches per partition.

def validateAcknowledgementBatches(
  acknowledgementDataFromRequest: mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]],
  erroneous: mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
  supportsRenewAcknowledgements: Boolean,
  isRenewAck: Boolean
): mutable.Set[TopicIdPartition] = {
  val erroneousTopicIdPartitions = mutable.Set.empty[TopicIdPartition]

  acknowledgementDataFromRequest.foreach { case (tp, batches) =>
    var prevEndOffset = -1L
    var isErroneous = false
    val maxType = if (supportsRenewAcknowledgements) 4 else 3

    batches.forEach { batch =>
      if (!isErroneous) {
        if (batch.firstOffset > batch.lastOffset) {
          // invalid range
          erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
          erroneousTopicIdPartitions.add(tp); isErroneous = true
        } else if (batch.firstOffset < prevEndOffset) {
          // overlapping range
          erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
          erroneousTopicIdPartitions.add(tp); isErroneous = true
        } else if (batch.acknowledgeTypes == null || batch.acknowledgeTypes.isEmpty) {
          // missing types
          erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
          erroneousTopicIdPartitions.add(tp); isErroneous = true
        } else if (batch.acknowledgeTypes.size > 1 &&
                   batch.lastOffset - batch.firstOffset != batch.acknowledgeTypes.size - 1) {
          // type count vs range mismatch
          erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
          erroneousTopicIdPartitions.add(tp); isErroneous = true
        } else if (batch.acknowledgeTypes.stream.anyMatch(t => t < 0 || t > maxType)) {
          // invalid type value
          erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
          erroneousTopicIdPartitions.add(tp); isErroneous = true
        } else if (batch.acknowledgeTypes.stream.anyMatch(_ == 4) && !isRenewAck) {
          // renew type without renewAck mode
          erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
          erroneousTopicIdPartitions.add(tp); isErroneous = true
        } else {
          prevEndOffset = batch.lastOffset
        }
      }
    }
  }

  erroneousTopicIdPartitions
}

validateAcknowledgementBatches: several invariants combined in one branch‑heavy loop.

The logic is precise, but every new edge case has to be woven into this nested structure. Understanding failures means mentally simulating multiple branches and shared mutable state.

Refactor hint: When validation logic becomes a long method that both checks invariants and mutates shared error maps, extract pure predicate helpers (for example, isNonOverlappingRange, hasValidAckTypes) and compose them with early‑exit guard clauses. You keep behavior but reduce cognitive load.
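
A sketch of that shape, using a simplified batch type in place of ShareAcknowledgementBatch; the helper names mirror the hint above and are illustrative, not Kafka code (the renew‑type rule is omitted for brevity):

object AckBatchValidation {
  // Simplified stand-in for ShareAcknowledgementBatch.
  final case class AckBatch(firstOffset: Long, lastOffset: Long, acknowledgeTypes: List[Byte])

  // Pure predicates, each checking exactly one invariant from the original method.
  def isValidRange(b: AckBatch): Boolean = b.firstOffset <= b.lastOffset
  def isNonOverlapping(b: AckBatch, prevEndOffset: Long): Boolean = b.firstOffset >= prevEndOffset
  def hasAckTypes(b: AckBatch): Boolean = b.acknowledgeTypes.nonEmpty
  def typeCountMatchesRange(b: AckBatch): Boolean =
    b.acknowledgeTypes.size <= 1 || b.lastOffset - b.firstOffset == b.acknowledgeTypes.size - 1
  def typesInRange(b: AckBatch, maxType: Byte): Boolean =
    b.acknowledgeTypes.forall(t => t >= 0 && t <= maxType)

  // Guard-clause composition: report the first violated invariant, if any.
  @annotation.tailrec
  def firstViolation(batches: List[AckBatch], maxType: Byte, prevEnd: Long = -1L): Option[String] =
    batches match {
      case Nil => None
      case b :: rest =>
        if (!isValidRange(b)) Some("firstOffset is greater than lastOffset")
        else if (!isNonOverlapping(b, prevEnd)) Some("overlapping offset ranges")
        else if (!hasAckTypes(b)) Some("missing acknowledge types")
        else if (!typeCountMatchesRange(b)) Some("type count does not match offset range")
        else if (!typesInRange(b, maxType)) Some("acknowledge type out of allowed range")
        else firstViolation(rest, maxType, b.lastOffset)
    }
}

Each predicate is independently testable, and the caller only has to translate a returned message into an INVALID_REQUEST partition response.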

ShareFetch: Mixed Concerns and Nested Futures

handleShareFetchRequest has to:

  • Acquire or create a ShareFetchContext, possibly waiting for idle‑session cleanup and failing with SHARE_SESSION_LIMIT_REACHED.
  • Handle requests that include both fetch and acknowledge sections.
  • Respect isRenewAck semantics by skipping fetch work when appropriate.
  • Combine fetch and acknowledge results into a single response, including leader hints and lock durations.

All of this is wired inside KafkaApis, along with:

  • Authorization on topics and share groups.
  • Session lifecycle management.
  • Quota interactions similar to Fetch.
  • Async composition using CompletableFuture combinators.
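
Whatever combinators the real handler uses, the async half of that work boils down to joining a fetch leg and an acknowledge leg into one response. A minimal sketch with placeholder result types, not Kafka’s actual response classes:

import java.util.concurrent.CompletableFuture
import java.util.function.BiFunction

object ShareFetchComposition {
  final case class FetchResult(records: Int)
  final case class AckResult(acknowledged: Int)
  final case class CombinedResponse(records: Int, acknowledged: Int)

  def combine(fetchFuture: CompletableFuture[FetchResult],
              ackFuture: CompletableFuture[AckResult]): CompletableFuture[CombinedResponse] = {
    // Merge both legs once they complete, mirroring how ShareFetch piggybacks
    // acknowledgements on the fetch round trip.
    val merge: BiFunction[FetchResult, AckResult, CombinedResponse] =
      (f, a) => CombinedResponse(f.records, a.acknowledged)
    fetchFuture.thenCombine(ackFuture, merge)
  }

  def main(args: Array[String]): Unit = {
    val combined = combine(
      CompletableFuture.completedFuture(FetchResult(records = 10)),
      CompletableFuture.completedFuture(AckResult(acknowledged = 4))
    )
    println(combined.join()) // CombinedResponse(10,4)
  }
}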

The result is a handler that mixes orchestration with feature implementation. This is exactly where it makes sense to start extracting a dedicated abstraction.

Breaking Up the God Object Safely

By this point, the traffic cop analogy starts to blur: KafkaApis is not just directing traffic; it is also enforcing complex feature‑specific rules. That is the textbook definition of a god class: too many responsibilities in one file.

The remedy is not to dismantle the front controller, but to keep the central dispatcher and move domain‑specific logic behind focused façades.

Extracting ShareApis: A Focused Façade

A natural first step is to extract all share‑related behavior into a ShareApis class or trait. KafkaApis becomes a delegator for those APIs:

--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ class KafkaApis(...)
- def handleShareFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
- // full implementation
- }
-
- def handleShareAcknowledgeRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
- // full implementation
- }
-
- // plus related helpers: handleFetchFromShareFetchRequest, handleAcknowledgements,
- // getAcknowledgeBatchesFromShareAcknowledgeRequest, getAcknowledgeBatchesFromShareFetchRequest,
- // processShareAcknowledgeResponse, validateAcknowledgementBatches, processShareFetchResponse,
- // getResponsePartitionData, shareVersion, isShareGroupProtocolEnabled
+ // Delegation to dedicated ShareApis component
+ def handleShareFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] =
+ shareApis.handleShareFetchRequest(request)
+
+ def handleShareAcknowledgeRequest(request: RequestChannel.Request): CompletableFuture[Unit] =
+ shareApis.handleShareAcknowledgeRequest(request)
@@ class KafkaApis(...)
- val sharePartitionManager: SharePartitionManager,
+ val sharePartitionManager: SharePartitionManager,
   brokerTopicStats: BrokerTopicStats,
   val clusterId: String,
@@ class KafkaApis(...)
- val groupConfigManager: GroupConfigManager
-) extends ApiRequestHandler with Logging {
+ val groupConfigManager: GroupConfigManager
+) extends ApiRequestHandler with Logging {
+
+ private val shareApis = new ShareApis(
+ requestChannel,
+ sharePartitionManager,
+ metadataCache,
+ authHelper,
+ quotas,
+ brokerTopicStats,
+ config,
+ time,
+ groupConfigManager
+ )

Refactor direction: keep dispatch in KafkaApis, move share behavior to ShareApis.

This gives you:

  • Scoped complexity: share sessions, record locks, renew‑ack semantics live in a file with a clear domain boundary.
  • Better tests: unit tests for share behavior can hit ShareApis directly without pulling in the entire dispatcher.
  • Safer evolution: future KIPs around share groups mostly touch ShareApis, not the central controller.

Rule of thumb: When a front controller starts containing nontrivial feature implementation, that feature deserves its own façade. Keep the entry point; move domain rules and invariants behind dedicated modules.

De‑Duplicating Common Authorization and Validation

Another axis of refactoring is de‑duplicating patterns that show up across handlers. One example is “classify topic partitions by authorization and existence,” seen in offset commits, transactional offset commits, offset deletes, and share group offset APIs.

A helper like this aligns behavior and semantics across those handlers:

private case class TopicPartitionCheckResult[T](
  authorized: Seq[T],
  unauthorized: Map[T, Errors],
  unknown: Map[T, Errors]
)

private def classifyTopicPartitions[T](
    requestContext: RequestContext,
    resources: Iterable[T]
  )(nameOf: T => String,
    buildUnknown: T => Errors = _ => Errors.UNKNOWN_TOPIC_OR_PARTITION,
    operation: AclOperation = READ
  ): TopicPartitionCheckResult[T] = {

  val authorizedNames = authHelper.filterByAuthorized(requestContext, operation, TOPIC, resources)(nameOf)

  val authorized = mutable.ArrayBuffer[T]()
  val unauthorized = mutable.Map[T, Errors]()
  val unknown = mutable.Map[T, Errors]()

  resources.foreach { r =>
    val name = nameOf(r)
    if (!authorizedNames.contains(name))
      unauthorized += r -> Errors.TOPIC_AUTHORIZATION_FAILED
    else if (!metadataCache.contains(name))
      unknown += r -> buildUnknown(r)
    else
      authorized += r
  }

  TopicPartitionCheckResult(authorized.toSeq, unauthorized.toMap, unknown.toMap)
}

Centralizing topic/partition classification reduces subtle drift between handlers.

Refactors like this don’t just reduce lines of code; they reduce the number of slightly different implementations of the same rule. For a central controller, that alignment matters more than raw line count.

Practical Takeaways You Can Reuse

Kafka’s KafkaApis is a concrete, battle‑tested example of how to run a high‑throughput front controller without losing track of behavior. The primary lesson is to enforce a consistent handler lifecycle and push domain complexity into focused modules as the system grows.

  1. Standardize the handler lifecycle.

    Make authorize → validate → delegate → respond the default template for every handler. This keeps a large controller understandable and makes new APIs harder to implement “wrong.”

  2. Keep protocol knowledge in one place; spread behavior across subsystems.

    Let your front controller know how to parse requests, enforce ACLs, and assemble responses. Delegate actual work—storage, group membership, transactions, share sessions—to dedicated components.

  3. Extract domain façades when complexity clusters.

    When a feature family (like share groups) accumulates its own contexts, invariants, and async flows, give it a module such as ShareApis. The front controller should delegate to it instead of absorbing its rules.

  4. Centralize repeated authorization/validation patterns.

    If multiple handlers classify topics by auth and existence, or apply similar quotas, extract helpers like classifyTopicPartitions. Your goal is one definition of each policy, used everywhere.

  5. Treat quotas as part of protocol semantics.

    Integrate quota knowledge into handlers so you can shrink or reject work early, the way Kafka adjusts maxBytes for Fetch. Don’t bolt throttling on after the fact.

  6. Keep cross‑field invariants explicit and localized.

    For complex options (like renew‑ack), isolate validation in clear blocks or helpers. Avoid burying the main handler flow in long chains of conditionals.

Front controllers are unavoidable in serious systems: brokers, gateways, control planes all end up with a central entry point. KafkaApis shows how far you can take that pattern before you have to start carving out features into their own modules.

If you apply the same discipline—consistent request lifecycle, thin orchestration, and timely extraction of feature‑specific façades—you can keep your own traffic cop sharp even as the city of features around it grows.
