Unveiling the Principles of Kafka Migration Tool MirrorMaker2

MirrorMaker2 (referred to as MM2 hereafter) was introduced in December 2019 alongside Kafka 2.4.0. As the name suggests, it is an official Kafka tool designed to address data replication and synchronization issues between Kafka clusters. In practice, it is commonly used for Kafka data backup, reassignment, and disaster recovery purposes.

We are also excited to announce that AutoMQ's productization of MM2-based reassignment features will soon be available. This will facilitate a smoother and faster transition from self-managed Kafka environments to AutoMQ. We invite you to utilize this service upon its release.

Installation and Deployment

MM2 offers three deployment modes: Dedicated mode, Standalone mode, and Kafka Connect mode.
Deployment Modes
Dedicated mode
To deploy MM2 in Dedicated mode, use the following start command:

./bin/connect-mirror-maker.sh connect-mirror-maker.properties


In this mode, MM2 is still built on Kafka Connect, but it hides Kafka Connect's complexity from the user while still supporting distributed deployment. A single command launches both MM2 and the underlying Kafka Connect, although this also sacrifices some of Kafka Connect's flexibility by removing the external RESTful API.
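
The connect-mirror-maker.properties file referenced above defines the clusters and the replication flows between them. The following is a minimal sketch of what it might contain; the cluster aliases A and B, the bootstrap addresses, and the topic pattern are placeholders to adapt to your environment.

# Cluster aliases and their bootstrap servers (placeholder addresses)
clusters = A, B
A.bootstrap.servers = source-cluster:9092
B.bootstrap.servers = target-cluster:9092

# Enable replication from A to B and choose the topics to mirror
A->B.enabled = true
A->B.topics = .*

# Parallelism and replication factor suitable for a small test setup
tasks.max = 4
replication.factor = 1
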
Standalone mode
The standalone mode is better suited to testing environments and does not support distributed deployment, as also noted in KIP-382[1]. Since it is not production-ready, it is not covered further here.
Kafka Connect mode
In this mode, deploying MM2 requires an existing Kafka Connect cluster; MM2 deploys its own Connectors on that cluster to carry out the entire reassignment process. Kafka Connect mode is the most complex of the three deployment modes, and since Dedicated mode is essentially a packaged version of it with the same underlying principles, understanding how MM2 works on Kafka Connect gives a comprehensive picture of MM2.

Kafka Connect was introduced in Kafka version 0.9.0 with the aim to simplify the construction of data integration and data flow pipelines, offering a scalable and reliable method to connect Kafka with external systems. Based on this design, it is quite natural for MM2 to be implemented on Kafka Connect.

In Kafka Connect mode, the scheduling resources within MM2 can be categorized as follows:

  • Worker: An MM2 or Kafka Connect process, which is the basic unit for distributed deployment.
  • Connector: A connector that performs the reassignment work; multiple Connectors can run within a Worker, each responsible for a relatively independent function.
  • Task: A Connector splits the reassignment tasks into Tasks, which are the smallest units of concurrent execution.

Kafka Connect cluster
In Kafka Connect Mode, a Kafka Connect cluster needs to be prepared first. The following commands can be executed on each node to start the Kafka Connect cluster.

./bin/connect-distributed.sh config/connect-distributed.properties


Once the Kafka Connect cluster is deployed, we can use the RESTful API provided by Kafka Connect to start all the Connectors required by MM2. By default, Kafka Connect listens on port 8083, and even if there are multiple nodes in the Kafka Connect cluster, the following commands can be issued to any node in the cluster.

Connector
Assuming the node address is localhost, the commands to start the three Connectors are as follows (in practice, the requests can be sent to any node in the current Kafka Connect cluster):

# MirrorSourceConnector
curl -X POST -H "Content-Type: application/json" --data @mirror-source-connector.properties http://127.0.0.1:8083/connectors
# MirrorCheckpointConnector
curl -X POST -H "Content-Type: application/json" --data @mirror-checkpoint-connector.properties http://127.0.0.1:8083/connectors
# MirrorHeartbeatConnector
curl -X POST -H "Content-Type: application/json" --data @mirror-heartbeat-connector.properties http://127.0.0.1:8083/connectors

The configuration files for these Connectors are mirror-source-connector.properties, mirror-checkpoint-connector.properties, and mirror-heartbeat-connector.properties.
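
Although these files use the .properties extension, the payload posted to the REST API is JSON. A minimal sketch of what mirror-source-connector.properties might contain is shown below; the connector name, cluster aliases, bootstrap addresses, and topic pattern are placeholders. The checkpoint and heartbeat files follow the same pattern, with org.apache.kafka.connect.mirror.MirrorCheckpointConnector and org.apache.kafka.connect.mirror.MirrorHeartbeatConnector as the connector.class respectively.

{
  "name": "mm2-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "A",
    "target.cluster.alias": "B",
    "source.cluster.bootstrap.servers": "source-cluster:9092",
    "target.cluster.bootstrap.servers": "target-cluster:9092",
    "topics": ".*",
    "tasks.max": "4"
  }
}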

After launching the Connectors, we can also use the following command to view the Connectors that are currently available in the Kafka Connect cluster.

$ curl http://127.0.0.1:8083/connectors
["mm2-heartbeat-connector","mm2-source-connector","mm2-checkpoint-connector"]%

For more details on the Kafka Connect RESTful API, refer to Kafka Connect 101: Kafka Connect's REST API[2].
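
The same RESTful API can also be used to check whether each Connector and its Tasks are actually running, and to restart them when needed. For example (the connector name below assumes the naming used in the listing above):

# Check the state of a connector and its tasks
curl http://127.0.0.1:8083/connectors/mm2-source-connector/status
# Restart the connector if it has failed
curl -X POST http://127.0.0.1:8083/connectors/mm2-source-connector/restart
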

Workflow

As discussed above, MM2 relies on three Connectors to complete the entire replication process:

  • MirrorSourceConnector: Synchronizes message data from topics in the source cluster to the target cluster.
  • MirrorCheckpointConnector: Translates and synchronizes the consumer offsets from the source cluster to the target cluster.
  • MirrorHeartbeatConnector: Sends heartbeats periodically to the source cluster to verify and monitor the connection and the progress of reassignment tasks between the two clusters.

JMX monitoring information is available for both MirrorSourceConnector and MirrorCheckpointConnector, providing a comprehensive view of the reassignment progress and health.

MM2 creates the following types of Topics (all Topics, except for heartbeats, are created in the target cluster):

  • connect-configs: Stores the configuration information of connectors in MM2.
  • connect-offsets: Stores the reading progress (source offsets) of the MirrorSourceConnector and MirrorCheckpointConnector tasks in MM2.
  • connect-status: Stores the status information of connectors in MM2.
  • mm2-offset-syncs.A.internal: Stores the offset mapping information for message synchronization between the source and target clusters (i.e., OffsetSync messages) used for translating consumer offsets. Messages in this Topic are emitted by MirrorSourceConnector (the 'A' in the Topic name represents the alias of the source cluster).
  • A.checkpoints.internal: Stores the consumption progress synchronized with the GroupId. The stored information includes GroupId, Partition, and the consumption points in both the source and target clusters. This information is emitted by the MirrorCheckpointConnector (the 'A' in the Topic name represents the alias of the source cluster).
  • heartbeats: Receives the heartbeat messages that are periodically sent to the source cluster and then synchronized to the target cluster. The message bodies in this Topic mainly store simple timestamp information and are emitted by the MirrorHeartbeatConnector.

Understanding the specific MM2 workflow is crucial, especially grasping the roles of the mm2-offset-syncs.A.internal and A.checkpoints.internal Topics.

Message Synchronization and Offset Mapping
The MirrorSourceConnector starts synchronizing messages from the earliest offset. During message synchronization, it generates OffsetSync messages. These OffsetSync messages record the partition information of the synchronized messages and the offset mappings in both the source and target clusters.

Recording this offset mapping in the OffsetSync messages is essential: a message is likely to have a different offset after it is copied to the target cluster, and there may also be duplicated messages or topics from multiple source clusters being synchronized into a single target topic. The offset mapping makes it possible to align messages in the source cluster with their counterparts in the target cluster.

These OffsetSync messages are stored in mm2-offset-syncs.A.internal. However, an OffsetSync message is not generated for every synchronized message: by default one is generated every 100 messages, a threshold that can be adjusted with the offset.lag.max parameter. For the exact conditions under which an OffsetSync is emitted, refer to the implementation in org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState#update.
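
As a rough illustration of that logic, the following Java sketch paraphrases the decision described above rather than transcribing the real PartitionState implementation: an OffsetSync is only queued once the upstream offset has advanced by at least offset.lag.max since the last emitted sync.

// Simplified paraphrase of the idea behind MirrorSourceTask.PartitionState#update
class PartitionState {
    private final long maxOffsetLag;            // corresponds to offset.lag.max (default 100)
    private long lastSyncUpstreamOffset = -1L;  // upstream offset of the last emitted OffsetSync
    boolean shouldSyncOffsets = false;          // whether an OffsetSync should be emitted now

    PartitionState(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Called for every record that has been copied to the target cluster
    void update(long upstreamOffset, long downstreamOffset) {
        if (lastSyncUpstreamOffset < 0
                || upstreamOffset - lastSyncUpstreamOffset >= maxOffsetLag) {
            // Far enough from the last sync point:
            // emit OffsetSync{upstreamOffset, downstreamOffset}
            shouldSyncOffsets = true;
            lastSyncUpstreamOffset = upstreamOffset;
        } else {
            // Still within the lag window: skip the sync for this record
            shouldSyncOffsets = false;
        }
    }
}
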

Offset Translation
The MirrorCheckpointConnector performs the actual offset translation. It consumes OffsetSync messages from mm2-offset-syncs.A.internal, translates the consumer offsets of the source cluster into their equivalents in the target cluster, and then calls the alterConsumerGroupOffsets method to reset the consumer offsets in the target cluster.
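
Kafka also ships a small client-side utility, RemoteClusterUtils (in the connect-mirror-client module), that reads the checkpoint Topic and performs the same translation on demand, which can be handy during a manual failover. The sketch below shows the idea; the bootstrap address, the source cluster alias A, and the group name are placeholders.

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslateOffsetsExample {
    public static void main(String[] args) throws Exception {
        // Client properties pointing at the *target* cluster (placeholder address)
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "target-cluster:9092");

        // Read A.checkpoints.internal on the target cluster and translate the
        // consumer group's source-cluster offsets into target-cluster offsets
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(props, "A", "my-group", Duration.ofSeconds(30));

        // Apply the translated offsets on the target cluster, roughly what
        // MirrorCheckpointConnector does when group offset syncing is enabled
        try (Admin admin = Admin.create(props)) {
            admin.alterConsumerGroupOffsets("my-group", translated).all().get();
        }
    }
}
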

Because OffsetSync messages are not emitted on a time basis, no new OffsetSync is generated if the latest offset in a partition is within 100 of the last synchronized offset. Since MirrorCheckpointConnector relies on the offsets recorded in OffsetSync messages to synchronize consumer progress, the consumer offsets in the target cluster can rarely be fully caught up and may lag behind the source cluster by up to 100 offsets. In version 3.7.0 and later, however, a time-based fallback has been added to OffsetSync generation, which resolves this issue [3].

Specifically, if the current message is within 100 offsets of the message recorded in the previous OffsetSync, but a significant amount of time has passed since the last OffsetSync was emitted, an OffsetSync will be forced out anyway (controlled by the offset.flush.interval.ms parameter, defaulting to 10s).

The content of the OffsetSync messages can be conveniently viewed using the following command.

$ ./bin/kafka-console-consumer.sh --formatter "org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter" --bootstrap-server 127.0.0.1:9592 --from-beginning --topic mm2-offset-syncs.A.internal
OffsetSync{topicPartition=heartbeats-0, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartition=test-0-0, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartition=test-0-0, upstreamOffset=101, downstreamOffset=101}
OffsetSync{topicPartition=heartbeats-0, upstreamOffset=2, downstreamOffset=2}

As for the MirrorHeartbeatConnector in MM2, it primarily serves to monitor the current synchronization status of the MM2 cluster. The content of the heartbeats Topic can be viewed using the following command.

$ ./bin/kafka-console-consumer.sh --formatter "org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter"  --bootstrap-server 127.0.0.1:9092 --from-beginning --topic heartbeats --property print.key=true
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564822022}
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564842185}
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564862192}
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564882197}
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564902202}

Here, a heartbeat message is generated every 20 seconds, each carrying the timestamp at which it was produced. By inspecting the replicated heartbeat messages in the target cluster, one can therefore monitor the current message synchronization status.
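
Building on this, a small consumer against the target cluster can turn the heartbeat Topic into a rough end-to-end lag monitor, since the record timestamps are carried over from the source cluster. The sketch below makes a few assumptions: the bootstrap address and group id are placeholders, and with the default replication policy the replicated Topic on the target cluster is named A.heartbeats.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class HeartbeatLagCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "target-cluster:9092"); // placeholder address
        props.put("group.id", "mm2-heartbeat-lag-check");      // placeholder group id
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("auto.offset.reset", "latest");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // With the default replication policy, heartbeats from cluster A
            // appear on the target cluster under the Topic "A.heartbeats"
            consumer.subscribe(List.of("A.heartbeats"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // The record timestamp was set when the heartbeat was produced on the
                    // source cluster, so "now - timestamp" approximates the end-to-end lag
                    long lagMs = System.currentTimeMillis() - record.timestamp();
                    System.out.printf("heartbeat replicated with roughly %d ms lag%n", lagMs);
                }
            }
        }
    }
}
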

Load Balancing

In Kafka Connect, an individual Kafka Connect process is referred to as a worker. In a distributed environment, a group of workers with the same group.id forms a Kafka Connect cluster.

Although both Connectors and Tasks participate in the load balancing process, they are not peers: Tasks are subordinate to Connectors. A Connector's participation in load balancing simply determines which Worker will execute the logic specific to that Connector class. The implementation can be found in EagerAssigner#performTaskAssignment:

private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset,
                                                      Map<String, ExtendedWorkerState> memberConfigs,
                                                      WorkerCoordinator coordinator) {
    // Records the Connector assignment results
    Map<String /* member */, Collection<String /* connector */>> connectorAssignments = new HashMap<>();
    // Records the Task assignment results
    Map<String /* member */, Collection<ConnectorTaskId>> taskAssignments = new HashMap<>();

    List<String> connectorsSorted = sorted(coordinator.configSnapshot().connectors());
    // Use a circular iterator to assign connectors and tasks to different workers
    CircularIterator<String> memberIt = new CircularIterator<>(sorted(memberConfigs.keySet()));
    // Assign the Connectors first
    for (String connectorId : connectorsSorted) {
        String connectorAssignedTo = memberIt.next();
        log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);
        Collection<String> memberConnectors = connectorAssignments.computeIfAbsent(connectorAssignedTo, k -> new ArrayList<>());
        memberConnectors.add(connectorId);
    }
    // Then assign the individual Tasks, continuing from the member iterator's position
    for (String connectorId : connectorsSorted) {
        for (ConnectorTaskId taskId : sorted(coordinator.configSnapshot().tasks(connectorId))) {
            String taskAssignedTo = memberIt.next();
            log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
            Collection<ConnectorTaskId> memberTasks = taskAssignments.computeIfAbsent(taskAssignedTo, k -> new ArrayList<>());
            memberTasks.add(taskId);
        }
    }
    // Serialize the assignment results and return
    ......
}

The figure below illustrates the load balancing situation with 3 Workers, 1 Connector, and 5 Tasks, as well as the scenario before and after Worker2 crashes.

However, this method of load balancing can cause a significant thundering herd effect, such as during the scaling of a Kafka Connect cluster. Nodes not involved in the scaling may experience lengthy stop-the-world issues. Similar issues may arise during rolling upgrades in a Kubernetes environment. This type of load balancing in Kafka is referred to as Eager Rebalance.

Later, Kafka introduced Incremental Cooperative Rebalance[4], which adds a delay before a rebalance is carried out. With this improvement, load balancing does not happen immediately when a node goes through a rolling upgrade, since the upgraded node is likely to return quickly; the previous assignment can be preserved as much as possible, minimizing the impact on the overall message synchronization process. Compared to Eager Rebalance, which reaches a final balanced state quickly, Incremental Cooperative Rebalance significantly reduces the global impact of scenarios like rolling upgrades on load balancing.
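
Which protocol is used and how long the rebalance is deferred are governed by the Kafka Connect worker configuration. A small sketch of the relevant settings (the values below are illustrative; check the defaults of the Kafka version in use):

# Rebalance protocol: "eager" keeps the old stop-the-world behaviour,
# while "compatible" / "sessioned" enable Incremental Cooperative Rebalance
connect.protocol = sessioned
# How long the cluster waits for a departed worker to return
# before its connectors and tasks are reassigned
scheduled.rebalance.max.delay.ms = 300000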
