Visual Task Orchestration with Drag & Drop: Scaleph Data Integration Practice Based on SeaTunnel

At the Apache SeaTunnel meetup in June, we shared a talk on Scaleph data integration practice based on Apache SeaTunnel (Incubating). We hope you will get a lot out of it.

Summary of the talk:

● About Scaleph

● Introduction to Scaleph architecture and features

● SeaTunnel Community Contribution

● Demonstration of the system

● Follow-up Development Plan

Qi Wang, an Apache SeaTunnel Contributor, Search and Recommendation Engineer, and Big Data Java Developer

01 The Origin of Scaleph

During my early work in search and recommendation, I was in charge of maintaining the Dump system, which is mainly responsible for indexing data into the search engine.

We faced five troublesome problems in maintaining this system.

Timeliness and stability

Search and recommendation is the core online system of an e-commerce platform, which places especially strict requirements on data timeliness and stability. Since the search and recommendation system bears most of the consumer traffic of the entire e-commerce platform, any service fluctuation may damage the service and degrade the consumer experience.

Business complexity/large wide table design

The Dump system performs a series of pre-processing steps on the real-time/offline data and model data of products, categories, brands, stores, product tags, and the data warehouse of the e-commerce platform, and finally outputs a large wide table. During this process, the complexity and variability of the upstream business invade the Dump system and pose significant technical challenges.

Full + real-time indexing

The full index is run once every day, mainly to update data at T+1 frequency. After the full index finishes, we use the real-time index to refresh the data that needs to be updated in real time, e.g. product prices, inventory changes, etc.

Data linkage update

We have many sources of upstream data, including message queues, databases, big-data storage, and Dubbo interfaces. Because of the large wide table design (for example, a commodity index is commodity-based, and a store index is store-based), and because upstream data changes do not always arrive in the commodity or store dimension, a change in one dimension will also trigger linked updates in the others.

The mission to protect the data

The search and recommendation service bears most of the consumer traffic of the entire e-commerce platform. When the systems of other teams in the company could not keep up with this traffic, they would push their data into the search engine through the Dump system, and our team would return it to the web page on their behalf, so that no second request to their systems was needed.

At the same time, if another team's business system generates dirty data, the Dump system must guard against it and prevent the dirty data from leaking to users and causing harm, which makes the system very difficult to develop and maintain.

02 Why introduce Flink?

As an early adopter of Flink in China, Alibaba has a long history of successful experience in the field of search and recommendation. My professional experience developing and maintaining the Dump system in a search and recommendation team prompted me to look at using Flink for work beyond A/B experiment reports and real-time data flow, such as implementing the Dump system with Flink to provide data indexing for search engines.

There are 5 advantages of using Flink for data integration:

  1. Native distributed support: Flink supports multiple deployment and operation modes, including standalone, YARN, and Kubernetes;

  2. Low latency and massive throughput: Flink is widely used in many large internet enterprises, and these capabilities have been proven in production environments;

  3. Ecosystem support: Flink provides many out-of-the-box connectors, supports data formats such as CSV and Avro, messaging systems such as Kafka and Pulsar, and many storage systems, and is closely integrated with the big data ecosystem;

  4. Exactly-once semantics: based on a distributed, lightweight, asynchronous snapshot mechanism, Flink provides data consistency guarantees for task failure, restart, migration, upgrade, etc.;

  5. Rich metrics: in addition to the metrics provided by Flink itself, the metrics framework allows users to develop custom metrics for their tasks to enrich monitoring (a small sketch follows this list).
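
As a small illustration of the last point, here is a minimal sketch (not Scaleph code) of a user-defined counter registered through Flink's metrics framework inside a RichMapFunction:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// A custom counter that shows up alongside Flink's built-in metrics
// in whatever metric reporters the cluster has configured.
public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        processed = getRuntimeContext().getMetricGroup().counter("recordsProcessed");
    }

    @Override
    public String map(String value) {
        processed.inc(); // one increment per record flowing through this operator
        return value;
    }
}
```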

03 Why SeaTunnel?

I loved the design concept of SeaTunnel when I first encountered it! SeaTunnel is a next-generation high-performance, distributed, massive data integration framework running on Flink and Spark.

Importantly, SeaTunnel works out of the box and integrates seamlessly with the existing ecosystem: because it runs on Flink and Spark, it can easily reuse a company's existing Flink and Spark infrastructure. SeaTunnel also has many successful production use cases. Moreover, since entering incubation at the Apache Software Foundation, the community has become more active and the future is promising.

04 About Scaleph

The initial purpose of the project

We initially aimed to provide a Web UI for SeaTunnel and grow into an open-source data integration system. At present, our main goal is to build an open-source visual data development and management system for SeaTunnel. We expect Scaleph to minimize the development threshold of real-time and offline data tasks and to provide a one-stop data development platform for developers.

Project Features

In real production applications, visual task orchestration or SQL development is the main form of data integration. We believe that drag-and-drop visual task orchestration can minimize the burden of data integration for users.

Another aim is to achieve multi-version management of jobs and data source support.

  • Flink cluster support for multi-version/multi-deployment environments.
  • Support for real-time/periodic tasks.

Core Architecture

Above is the architecture of our system. Through the job management function of the Web UI, users drag and drop SeaTunnel operators and configure them on the page; the system automatically generates the SeaTunnel configuration file and finally submits it to the Flink cluster through the Flinkful library, together with the resource jar packages uploaded by users in resource management. The mission of resource management is to let users upload their own jar packages to fill SeaTunnel-related gaps or extend SeaTunnel and Flink functionality.
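
As a rough sketch of what the backend does with the drag-and-drop result, assume a simple JDBC-to-Console job: the operators and their parameters are rendered into a SeaTunnel (Flink engine) configuration file, which is then handed to the submission step described above. Section and parameter names follow the SeaTunnel 2.1.x style but should be treated as illustrative:

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class SeaTunnelConfigRenderer {

    // Render a single JDBC-source -> Console-sink job into the env/source/transform/sink
    // layout used by SeaTunnel 2.1.x on the Flink engine (parameter names are illustrative).
    public static Path render(String jdbcUrl, String user, String password, String query) throws Exception {
        String config = """
                env {
                  execution.parallelism = 1
                }
                source {
                  JdbcSource {
                    driver = "com.mysql.cj.jdbc.Driver"
                    url = "%s"
                    username = "%s"
                    password = "%s"
                    query = "%s"
                  }
                }
                transform {}
                sink {
                  ConsoleSink {}
                }
                """.formatted(jdbcUrl, user, password, query);

        // The generated file is what gets submitted to the Flink cluster (via Flinkful),
        // together with any user-uploaded resource jars such as the MySQL JDBC driver.
        Path file = Files.createTempFile("seatunnel-job-", ".conf");
        return Files.writeString(file, config);
    }
}
```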

We have developed a scheduled task using Quartz. After a job is submitted to Flink, this task regularly pulls the job information from the Flink cluster and stores it in MySQL, so the end user can see the job's runtime information on the Web UI page.
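
A minimal sketch of this tracking loop, assuming the standard Flink REST endpoint `GET /jobs/<jobId>` and the Quartz API; the polling interval, endpoint address, and the MySQL persistence step are placeholders:

```java
import org.quartz.*;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// A Quartz job that polls the Flink REST API for the status of one submitted job,
// so the Web UI can display up-to-date runtime information.
public class FlinkJobStatusJob implements Job {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        String jobId = context.getMergedJobDataMap().getString("flinkJobId");
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://flink-jobmanager:8081/jobs/" + jobId))
                    .GET()
                    .build();
            HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
            // Parse the JSON body (state, start-time, ...) and update the MySQL job table here.
            System.out.println("Flink job " + jobId + " -> " + response.body());
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }

    public static void schedule(Scheduler scheduler, String jobId) throws SchedulerException {
        JobDetail detail = JobBuilder.newJob(FlinkJobStatusJob.class)
                .usingJobData("flinkJobId", jobId)
                .withIdentity("status-" + jobId)
                .build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(30)) // poll every 30s
                .build();
        scheduler.scheduleJob(detail, trigger);
    }
}
```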

Scaleph Features Brief (Data Development)

Project Management

When creating data synchronization tasks, users can manage them according to different business dimensions.

Job Management

SeaTunnel synchronization tasks can be created by drag-and-drop operation, and then be submitted and run.

Resource Management

SeaTunnel is open source under the Apache 2.0 license, which is not compatible with the license of MySQL's JDBC driver, so SeaTunnel's jdbc connector does not ship the JDBC driver dependency. When users use the jdbc connector, they need to provide their own JDBC driver package. The resource management function lets users upload the driver package themselves, and the SeaTunnel task is then submitted to the cluster together with the MySQL driver to ensure it runs properly.

Cluster Management

Supports configuring Flink cluster information. At present, Standalone Session mode is supported. After users enter the cluster information, they can select the corresponding cluster when submitting SeaTunnel jobs, and the tasks will run on that cluster.

Data source management

Users can enter data source information in advance so they don't have to re-enter it for every task. Data source sharing and permission restrictions are also supported to prevent data source leakage.

Scaleph Features Brief (operation and maintenance center)

The operation and maintenance center shows the running records of real-time and periodic tasks, allowing users to see task-related information once they have submitted a task. We also provide a link that jumps to Flink's Web UI, where users can view the detailed execution information of a task.

Scaleph Features Brief (data standards)

Data Element

Data governance is a big field. Although people usually pay more attention to metadata, data lineage, or data assets, data standards are also an important part of data governance, so we open-sourced our internally used standards system to share knowledge about data standards.

Because data warehouse development is usually collaborative, fields with the same business meaning may be defined differently in different model tables by different developers. The data standard is expected to unify the model field definitions of data warehouse developers through data elements.

Reference Data

The data in the data warehouse is pulled from business systems through data integration tools, and fields with the same meaning inevitably have different definitions in different business systems. These differing definitions have to be maintained by the data warehouse staff, generally in offline documents, which can easily become out of date.

At the same time, business knowledge may not map directly to the data warehouse model information, and the data standard allows users to maintain this business knowledge on the Web page.

Consider a case with two business systems, A and B: they use different gender enumeration values, and the enumeration descriptions of the two systems also differ. How should we handle this?

In this case, the data warehouse developers can define a unified standard, for example unifying the codes as 0, 1, 2 with corresponding descriptions, so that users can easily look up the information through a reference data mapping.
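
A hypothetical illustration of such a reference data mapping (the concrete values are only an example):

| Source system | Source value | Source description | Standard code | Standard description |
| --- | --- | --- | --- | --- |
| System A | M | male | 1 | male |
| System A | F | female | 2 | female |
| System B | 1 | man | 1 | male |
| System B | 2 | woman | 2 | female |
| Either | (missing) | unknown | 0 | unknown |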

Follow-up ideas

During data integration, transformation operations could be performed automatically based on the data standard, achieving automatic maintenance and mapping between business knowledge and models.

Scaleph features highlights

Visual data development. We believe that in the field of data synchronization, users can quickly create data integration tasks simply by dragging and dropping two operators and filling in the corresponding parameters.

Flinkful is a Java client we developed for Flink.

As a popular computing engine, Flink provides many ways for users to interact with it, such as the command-line interface and the HTTP (REST) interface. Through the command-line interface, users can submit, create, and cancel tasks; the HTTP interface mainly backs the Web UI.

During the Flink integration we found that although Flink and Scaleph are both JVM applications, the integration between the two had to be done through shell scripts, which is very unreasonable. So we developed Flinkful to expose Flink's capabilities to the Java ecosystem and allow users to manage Flink clusters and tasks directly through Flinkful.

We think Flinkful is more meaningful for Flink infrastructure maintainers, so it was stripped out from the Scaleph repository and open-sourced separately.

As for the plug-in system, we hope that plug-ins provide system extension interfaces through which users and Scaleph developers can quickly enhance the functionality and features of Scaleph. Currently, we have defined two plug-ins, namely the data source plug-in and the SeaTunnel plug-in. Through the data source plug-in you can quickly add data sources such as JDBC, ES, Kafka, ClickHouse, etc., and centralize them in Scaleph for unified configuration and use.

Currently, SeaTunnel provides so many connector and transform plug-ins that developing UI support for them one by one would be rather time-consuming, so we are trying to figure out a simple, declarative way to define the SeaTunnel-related parameters, through which SeaTunnel's capabilities can quickly be brought into the Scaleph project.
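
To make the idea concrete, here is a hypothetical sketch (not the actual Scaleph code) of describing a connector's parameters as data, so that both the Web UI form and the generated configuration file could be derived from a single definition; property names are illustrative:

```java
// Hypothetical declarative description of a JDBC source plug-in's parameters.
public enum JdbcSourceProperty {
    DRIVER("driver", true),
    URL("url", true),
    USERNAME("username", true),
    PASSWORD("password", true),
    QUERY("query", true),
    PARTITION_COLUMN("partition_column", false);

    private final String key;
    private final boolean required;

    JdbcSourceProperty(String key, boolean required) {
        this.key = key;
        this.required = required;
    }

    public String key() { return key; }
    public boolean required() { return required; }
}
```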

Problem analysis

Flink-jdbc-connector feature enhancement

Many of the cases in the official SeaTunnel documentation are implemented with FakeSource and ConsoleSink, while we are developing with the jdbc-connector as the main component. In the integration process, we found that the JdbcSink of the flink-jdbc-connector plug-in only supports Stream mode, so we extended it to support Batch mode as well.

JdbcSource requires the user to provide SQL, and the program internally extracts the column and table information from that SQL with regular expressions to generate JdbcSource's RowTypeInfo. But complex SQL contains aliases and sub-queries, which regular expressions cannot cover in all scenarios. We instead use a JDBC Connection to obtain the ResultSet of the SQL and read the column information directly from the ResultSet to generate JdbcSource's RowTypeInfo.
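
A simplified sketch of this approach, assuming a plain JDBC connection; the mapping from JDBC column types to Flink TypeInformation is left out:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

public class SqlColumnIntrospection {

    // Ask the database itself which columns the query produces, so aliases and
    // sub-queries are handled correctly without any regular expressions.
    public static void printColumns(String url, String user, String password, String sql) throws Exception {
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement stmt = conn.prepareStatement(sql);
             ResultSet rs = stmt.executeQuery()) {
            ResultSetMetaData meta = rs.getMetaData();
            for (int i = 1; i <= meta.getColumnCount(); i++) {
                // getColumnLabel returns the alias if the SQL defines one.
                System.out.println(meta.getColumnLabel(i) + " : " + meta.getColumnTypeName(i));
                // In the connector, each (name, type) pair is converted to the matching
                // Flink TypeInformation to build JdbcSource's RowTypeInfo.
            }
        }
    }
}
```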

Slimming the seatunnel-core-flink.jar

SeaTunnel runs on top of Flink and Spark, and a separate jar package is built for each of them; seatunnel-core-flink.jar is the Flink implementation. In version 2.1.1, SeaTunnel puts all Flink-based connectors into this fat jar package.

In actual use, a data synchronization task may only need one or two of the connectors, so submitting the SeaTunnel task carries a certain amount of unnecessary network overhead.

What we want is a relatively thin core jar package plus the related connector jar packages: on submission, the core jar accounts for the bulk, and only the connectors actually used are attached. The resource jar upload mechanism introduced earlier, used for example for the JDBC driver missing from SeaTunnel's jdbc-connector, lets resource jars and connector jars be submitted in the same way.

We also actively shared our experience under the relevant issue on connector splitting, and when SeaTunnel 2.1.2 was released, our system adapted easily to the separate release of seatunnel-core-flink.jar and the connector jars. At the same time, if users have not prepared the JDBC driver in the Flink cluster in advance, they can upload the driver package through resource management and submit the SeaTunnel job together with it.

Flink jobId acquisition problem

The primary way of submitting Flink tasks is the command-line interface, so users need to submit Flink tasks through shell scripts. After the task is submitted, the command-line client prints the corresponding job id to the console log, and users have to capture that console output and extract the job id from it.

Because all interactions between our project and Flink go through the Flinkful library, the jobId comes back directly as the return value of a method call, so our implementation is more elegant than capturing the console log to extract the jobId.
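
For comparison, the fragile console-log approach looks roughly like this; the log line format matches what recent Flink CLI versions print, but treat it as illustrative. With Flinkful, none of this parsing is needed because the jobId is simply the method's return value:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Run `flink run ...` through a shell, capture the console output,
// and fish the job id out of it with a regular expression.
public class JobIdFromConsoleLog {

    private static final Pattern JOB_ID =
            Pattern.compile("Job has been submitted with JobID ([0-9a-f]{32})");

    public static String extract(String cliOutput) {
        Matcher m = JOB_ID.matcher(cliOutput);
        if (!m.find()) {
            throw new IllegalStateException("could not find a JobID in the CLI output");
        }
        return m.group(1);
    }
}
```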

SeaTunnel calls System.exit() issue

Before executing, a SeaTunnel task first checks the user-written configuration file, and if the check fails, it directly calls System.exit() and the JVM exits. When SeaTunnel is submitted through a shell script, a JVM exit is not a problem.

But when we integrate SeaTunnel into our application, such a call would stop Scaleph directly and make our service unavailable. Therefore, we added a security restriction to the task submission code through a SecurityManager, which forbids calling System.exit() during the SeaTunnel-related task submission process.
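
A minimal sketch of such a guard (the actual Scaleph code may differ); note that SecurityManager is deprecated in recent JDKs, but it is the mechanism available to a Java 8/11 application:

```java
import java.security.Permission;

public class NoExitGuard {

    public static void install() {
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkExit(int status) {
                // Turn System.exit() into an exception the caller can handle,
                // so a failed config check cannot bring down the whole application.
                throw new SecurityException("System.exit(" + status + ") is not allowed here");
            }

            @Override
            public void checkPermission(Permission perm) {
                // Allow everything else.
            }
        });
    }
}
```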

05 Contribution to the SeaTunnel Community

I submitted some PRs together with a friend of mine, who is also one of the Scaleph developers, such as the jdbc-connector enhancements mentioned above, as well as the implementation of an upsert function for the jdbc-connector. flink-jdbc-connector's JdbcSink has an obvious flaw: it only supports insert, not update, which limits the connector's functionality quite a bit. So we developed support for upsert semantics to allow repeated synchronization of the same data.
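
To illustrate what upsert semantics mean for a JDBC sink, here is a small sketch that builds a MySQL-style upsert statement: the same row can be written repeatedly, and existing rows are updated instead of causing duplicate-key errors. The real connector generates dialect-specific SQL inside JdbcSink, so the names here are illustrative:

```java
import java.util.List;
import java.util.stream.Collectors;

public class UpsertSqlBuilder {

    public static String mysqlUpsert(String table, List<String> columns) {
        String columnList = String.join(", ", columns);
        String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        String updates = columns.stream()
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        // -> INSERT INTO orders (id, price) VALUES (?, ?)
        //    ON DUPLICATE KEY UPDATE id = VALUES(id), price = VALUES(price)
        System.out.println(mysqlUpsert("orders", List.of("id", "price")));
    }
}
```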

Follow-up development plan

We will bring all SeaTunnel connectors and transform plug-ins to our visual drag-and-drop page as soon as possible so that users can experience the full power of SeaTunnel. Another direction is to enrich the supported data source types as the SeaTunnel connector plug-ins themselves grow.

We also hope to add DAG-style orchestration capabilities for data development and data integration, and to support SQL task development.

About SeaTunnel

SeaTunnel (formerly Waterdrop) is an easy-to-use, ultra-high-performance distributed data integration platform that supports real-time synchronization of massive amounts of data and can synchronize hundreds of billions of data per day in a stable and efficient manner.

Why do we need SeaTunnel?

SeaTunnel does everything it can to solve the problems you may encounter in synchronizing massive amounts of data.

  • Data loss and duplication
  • Task buildup and latency
  • Low throughput
  • Long application-to-production cycle time
  • Lack of application status monitoring

SeaTunnel Usage Scenarios

  • Massive data synchronization
  • Massive data integration
  • ETL of large volumes of data
  • Massive data aggregation
  • Multi-source data processing

Features of SeaTunnel

  • Rich components
  • High scalability
  • Easy to use
  • Mature and stable

How to get started with SeaTunnel quickly?

Want to experience SeaTunnel quickly? SeaTunnel 2.1.0 takes 10 seconds to get you up and running.

https://seatunnel.apache.org/docs/2.1.0/developement/setup

How can I contribute?

We invite all partners who are interested in making local open-source global to join the SeaTunnel contributors family and foster open-source together!

Submit an issue:

https://github.com/apache/incubator-seatunnel/issues

Contribute code to:

https://github.com/apache/incubator-seatunnel/pulls

Subscribe to the community development mailing list:

dev-subscribe@seatunnel.apache.org

Development Mailing List:

dev@seatunnel.apache.org

Join Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-10u1eujlc-g4E~ppbinD0oKpGeoo_dAw

Follow Twitter:

https://twitter.com/ASFSeaTunnel

Come and join us!
