KoP is short for Kafka on Pulsar. As the name implies, it lets clients read and write data on Pulsar using the Kafka protocol. KoP brings a Kafka protocol handler plugin to the Pulsar broker, which makes Apache Pulsar compatible with the Apache Kafka protocol. By adding the KoP protocol handler to an existing Pulsar cluster, users can migrate existing Kafka applications and services to Pulsar without modifying their code.
The key features of Apache Pulsar are as follows:
Streamline operations with enterprise-class multi-tenancy features.
Avoid data relocation and simplify operations.
Persistently retain event streams with Apache BookKeeper and tiered storage.
Leverage Pulsar Functions for serverless event processing.
The KoP architecture diagram shows that KoP introduces a new protocol handler plugin that reuses existing Pulsar components (such as topic discovery, the ManagedLedger distributed log store, and cursors) to implement the Kafka wire protocol.
Routine Load Subscribing to Pulsar Data
Apache Doris Routine Load supports loading Kafka data into Apache Doris and guarantees transactional semantics during the load. Apache Pulsar is positioned as a cloud-native enterprise publish-subscribe messaging system and is already used by many online services. So how do Apache Pulsar users load data into Apache Doris? The answer is KoP.
Because KoP provides Kafka compatibility directly in Pulsar, Apache Doris can use Pulsar just as it uses Kafka. Pulsar data can be connected to Apache Doris without any changes to the Routine Load task, and the load keeps Routine Load's transactional guarantees.
Practical operation
Pulsar installation environment preparation:
- Download the Pulsar binary package and unzip:
#Download
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#Unzip and enter the installation directory
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0
KoP Compilation and Installation:
- Download KoP Source Code
git clone https://github.com/streamnative/kop.git
cd kop
- Compiling KoP:
mvn clean install -DskipTests
- protocols configuration: Create the protocols folder in the unpacked apache-pulsar directory and copy the compiled nar package to the protocols folder.
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols
- View the results after adding:
[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar
Add KoP configuration:
- Add the following configuration to standalone.conf or broker.conf
#Protocols to which KoP is adapted
messagingProtocols=kafka
#KoP's NAR file path
protocolHandlerDirectory=./protocols
#Whether to allow automatic topic creation
allowAutoTopicCreationType=partitioned
- Add the following service listener configuration
# Use `kafkaListeners` here for KoP 2.8.0 and later, because `listeners` is deprecated since KoP 2.8.0
kafkaListeners=PLAINTEXT://127.0.0.1:9092
# This config is not required unless you want to expose another address to the Kafka client.
# If it is not configured, it defaults to the value of `kafkaListeners`
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
If the following error occurs:
java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.
add the following configuration to enable the transaction coordinator:
kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true
This error must be fixed. Otherwise, producing and consuming data on Pulsar with the tools that ship with Kafka (bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh) works fine, but the data cannot be synchronized to Apache Doris.
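Since a missing setting only shows up later as a failed sync, it can help to check the configuration file before starting the broker. The following is a minimal sketch, assuming the file uses plain key=value properties syntax; the class name KopConfigCheck and the method missingKeys are hypothetical, and the key list is simply the set of keys named in the steps above, not an authoritative list of what KoP requires.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reports which of the keys from this article are absent or empty.
final class KopConfigCheck {
    // Keys taken from the configuration steps in this article.
    static final String[] EXPECTED_KEYS = {
        "messagingProtocols",
        "protocolHandlerDirectory",
        "allowAutoTopicCreationType",
        "kafkaListeners",
        "brokerEntryMetadataInterceptors",
        "brokerDeleteInactiveTopicsEnabled",
        "kafkaTransactionCoordinatorEnabled",
        "transactionCoordinatorEnabled"
    };

    // Parses the text as Java properties and returns the expected keys that have no value.
    static List<String> missingKeys(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Sample text that omits the two transaction coordinator settings.
        String conf = String.join("\n",
            "messagingProtocols=kafka",
            "protocolHandlerDirectory=./protocols",
            "allowAutoTopicCreationType=partitioned",
            "kafkaListeners=PLAINTEXT://127.0.0.1:9092",
            "brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor",
            "brokerDeleteInactiveTopicsEnabled=false");
        System.out.println("Missing keys: " + missingKeys(conf));
    }
}
```

In practice the text would come from standalone.conf or broker.conf rather than an inline string.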
Launch Pulsar
#bin/pulsar standalone
bin/pulsar-daemon start standalone
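Before creating the Routine Load task, it is worth confirming that the Kafka listener configured above is accepting connections. The sketch below only checks that a TCP connection to 127.0.0.1:9092 succeeds; it does not speak the Kafka protocol, so a positive result shows that something is listening, not that KoP is healthy. The class name ListenerProbe and the method isAccepting are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
final class ListenerProbe {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isAccepting(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unreachable host, and timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // Address and port taken from the kafkaListeners setting in this article.
        boolean up = isAccepting("127.0.0.1", 9092, 2000);
        System.out.println("127.0.0.1:9092 accepting connections: " + up);
    }
}
```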
Create Doris database and build tables
mysql -u root -h 127.0.0.1 -P 9030
create database pulsar_doris;
#Switching databases
use pulsar_doris;
#Create clicklog table
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
`clickTime` DATETIME NOT NULL COMMENT "clickTime",
`type` VARCHAR(50) NOT NULL COMMENT "clickType",
`id` VARCHAR(100) COMMENT "id",
`user` VARCHAR(100) COMMENT "user",
`city` VARCHAR(50) COMMENT "city"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
Creating Routine Load Tasks
CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "127.0.0.1:9092",
"kafka_topic" = "test",
"property.group.id" = "doris"
);
The parameters in the above command are explained as follows:
pulsar_doris: The database where the Routine Load task is located
load_from_pulsar_test: Routine Load task name
clicklog: The target table for the Routine Load task
strict_mode: Whether the import is in strict mode, set to false here
format: The type of data to import, here configured as json
kafka_broker_list: Address of the kafka broker service
kafka_topic: Kafka topic name, i.e. which topic to sync data from
property.group.id: Consumer group id
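To make the format and COLUMNS settings concrete, here is a sketch of one message as the task could receive it. It assumes that, with "format" = "json" and no jsonpaths configured, the JSON keys are matched against the column names listed in COLUMNS (clickTime, id, type, user). The class ClickLogJson, its methods, and the sample values are hypothetical and only illustrate the message shape.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds a flat JSON object whose keys match the COLUMNS list.
final class ClickLogJson {
    // Wraps a string in double quotes and escapes characters that JSON requires escaping.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Serializes the map in insertion order, skipping entries with a null value.
    static String toJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> entry : fields.entrySet()) {
            if (entry.getValue() == null) {
                continue;
            }
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(entry.getKey())).append(':').append(quote(entry.getValue()));
            first = false;
        }
        return sb.append('}').toString();
    }

    // One illustrative message; the values are made up for this example.
    static String sampleMessage() {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("clickTime", "2022-05-31 12:03:34");
        fields.put("id", "hypothetical-id-1");
        fields.put("type", "webset");
        fields.put("user", "user42");
        return toJson(fields);
    }

    public static void main(String[] args) {
        System.out.println(sampleMessage());
    }
}
```

The city column is not listed in COLUMNS, so the sample message leaves it out.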
Data import and testing
- Data Import
Construct a ClickLog data structure and use a Kafka producer to send 50 million records to Pulsar.
The ClickLog data structure is as follows:
public class ClickLog {
    private String id;
    private String user;
    private String city;
    private String clickTime;
    private String type;
    ... //Omit getter and setter
}
The core code logic for message construction and delivery is as follows.
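// Fragment only: Producer and Constant are application classes that are not shown here.
String strDateFormat = "yyyy-MM-dd HH:mm:ss";
@Autowired
private Producer producer;
try {
    // Create the formatter and the random generator once instead of once per message
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
    Random random = new Random();
    int batchSize = 1000;
    // 50,000 batches x 1,000 messages = 50 million messages
    for (int j = 0; j < 50000; j++) {
        for (int i = 0; i < batchSize; i++) {
            ClickLog clickLog = new ClickLog();
            clickLog.setId(UUID.randomUUID().toString());
            clickLog.setClickTime(simpleDateFormat.format(new Date()));
            clickLog.setType("webset");
            clickLog.setUser("user" + random.nextInt(1000) + i);
            // Serialize the record to JSON and send it to the topic
            producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}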
- ROUTINE LOAD Task View
Execute the SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G; command to view the status of the import task.
mysql> SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;
*************************** 1. row ***************************
Id: 87873
Name: load_from_pulsar_test
CreateTime: 2022-05-31 12:03:34
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:pulsar_doris
TableName: clicklog1
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
Progress: {"0":"51139566"}
Lag: {"0":0}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)
ERROR:
No query specified
From the above results, we can see that totalRows is 50000000 and errorRows is 0. This means the data was imported into Apache Doris without loss or duplication.
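The numbers in the Statistic field can be cross-checked with a little arithmetic. The sketch below recomputes the row and byte rates from loadedRows, receivedBytes, and taskExecuteTimeMs. The reported loadRowsRate and receivedBytesRate happen to match an integer division by the elapsed milliseconds followed by a multiplication by 1000; that is an observation about these particular numbers, not a documented formula. The class name RoutineLoadStats and its methods are hypothetical.

```java
// Hypothetical helper: recomputes throughput figures from the Statistic values shown above.
final class RoutineLoadStats {
    // Integer division per millisecond, scaled to seconds (truncates the per-ms rate).
    static long truncatedRatePerSecond(long total, long elapsedMs) {
        return total / elapsedMs * 1000;
    }

    // Floating-point rate per second without truncation.
    static double exactRatePerSecond(long total, long elapsedMs) {
        return total * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        long loadedRows = 50_000_000L;        // "loadedRows"
        long receivedBytes = 5_739_001_913L;  // "receivedBytes"
        long elapsedMs = 2_144_799L;          // "taskExecuteTimeMs"

        // 50,000 batches of 1,000 messages, as in the producer loop
        System.out.println("rows sent: " + 50_000L * 1_000L);

        // 23000, the same value as the reported loadRowsRate
        System.out.println("rows/s (truncated): " + truncatedRatePerSecond(loadedRows, elapsedMs));
        // 2675000, the same value as the reported receivedBytesRate
        System.out.println("bytes/s (truncated): " + truncatedRatePerSecond(receivedBytes, elapsedMs));

        System.out.printf("rows/s (exact): %.1f%n", exactRatePerSecond(loadedRows, elapsedMs));
        System.out.printf("average bytes per row: %.2f%n", (double) receivedBytes / loadedRows);
    }
}
```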
- Data Validation
Execute the following command to count the rows in the table. The result is also 50000000, as expected.
mysql> select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql>
Conclusion
With KoP, we were able to integrate Apache Pulsar data into Apache Doris seamlessly, without any modifications to the Routine Load task, while keeping the transactional guarantees of the import process. Meanwhile, the Apache Doris community has started designing native import support for Apache Pulsar, and it is expected that users will soon be able to subscribe directly to message data in Pulsar with Exactly-Once semantics guaranteed during the import.
Links
Apache Doris website: http://doris.apache.org
Apache Doris GitHub: https://github.com/apache/doris
Please contact us via: dev@doris.apache.org