Introduction
Recently, I shared a custom function (FROM_JSON) for Apache Flink to ease the handling of complex JSON structures in situations where a deeper integration into Flink's type system is not only beneficial but required. While it's a helpful addition for data engineers, who are primarily building streaming workloads directly with Flink SQL or on top of Flink's Table API, it didn't take long before I received a few questions around the applicability of this very UDF as part of end-to-end data flows that are built with Flink CDC pipeline connectors.
In this article, I share selected learnings and condensed findings I discovered while exploring the most relevant parts of Flink CDC’s codebase to figure out what it takes to implement a FROM_JSON UDF equivalent for Flink CDC pipelines.
My journey involved navigating some interesting aspects and challenges related to differences in the involved type systems, the lack of specific capabilities for user-defined functions in Flink CDC, and some subtleties during the handling of complex, nested data structures, as well as specific binary runtime representations for objects of certain data types.
Motivation
The primary reason why I decided to spend some non-negligible amount of time with Flink CDC lately was triggered by the fact that JSON data is currently not treated as a first-class citizen in Flink CDC pipelines. The following concrete example illustrates what I mean by that. Let's assume we have a Postgres table that contains a few columns, three of which store JSON/JSONB:
postgres=# \d+ public.gov_fake_citizens
Table "public.gov_fake_citizens"
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
-----------------+---------+-----------+----------+---------+----------+-------------+--------------+-------------
id | uuid | | not null | | plain | | |
personal | json | | not null | | extended | | |
isactive | boolean | | not null | false | plain | | |
registered | text | | not null | | extended | | |
contact | json | | not null | | extended | | |
knownresidences | json | | not null | | extended | | |
Indexes:
"gov_fake_citizens_pkey" PRIMARY KEY, btree (id)
Access method: heap
Here is how one row in this table might look:
id | personal | isactive | registered | contact | knownresidences
--------------------------------------+------------------------------------------------------------------------------------------------------------------+----------+-------------------------------+-----------------------------------------------------------------+------------------------------------------------------------------------------------------------
11e66db7-6834-1efe-498c-b7c9d661fb5b | {"firstname":"Emile","lastname":"Sanford","age":28,"eyecolor":"gray","gender":"female","height":163,"weight":51} | f | 2025-10-17T07:30:49.067+00:00 | {"email":"morgan.shanahan@gmail.com","phone":"+1 305-699-4846"} | ["4361 Prohaska Common, Norikofort, NH 56814","7616 Krajcik Crest, Port Emilioside, ID 36016"]
(1 row)
To run a Flink CDC pipeline that ships data from Postgres to Elasticsearch, the configuration is straightforward. The results, however, are most likely not what the typical friendly data engineer from next door would expect, let alone like to achieve. If we query the Elasticsearch index, the Postgres table row shown above comes back in this way:
{
"id": "11e66db7-6834-1efe-498c-b7c9d661fb5b",
"personal": "{\"firstname\":\"Emile\",\"lastname\":\"Sanford\",\"age\":28,\"eyecolor\":\"gray\",\"gender\":\"female\",\"height\":163,\"weight\":51}",
"contact": "{\"email\":\"morgan.shanahan@gmail.com\",\"phone\":\"+1 305-699-4846\"}",
"knownresidences": "[\"4361 Prohaska Common, Norikofort, NH 56814\",\"7616 Krajcik Crest, Port Emilioside, ID 36016\"]",
"isactive": false,
"registered": "2025-10-17T07:30:49.067+00:00"
}
The reason behind this is that any form of JSON data gets stringified by pipeline source connectors like the Postgres or MySQL ones. From the moment this happens, pipelines just propagate flat strings all the way to the sink. If this is the only result you'd ever need from your Flink CDC pipelines, consider yourself lucky, stop reading, and keep leading an undistracted and happy life.
Chances are, you'd want this to look different and if so, read on to learn more about what needs to happen to end up with documents in your search index that look more like this:
{
"id": "11e66db7-6834-1efe-498c-b7c9d661fb5b",
"personal": {
"firstname": "Emile",
"gender": "female",
"weight": 51,
"eyecolor": "gray",
"age": 28,
"lastname": "Sanford",
"height": 163
},
"contact": {
"phone": "+1 305-699-4846",
"email": "morgan.shanahan@gmail.com"
},
"knownresidences": [
"4361 Prohaska Common, Norikofort, NH 56814",
"7616 Krajcik Crest, Port Emilioside, ID 36016"
],
"isactive": false,
"registered": "2025-10-17T07:30:49.067+00:00"
}
Same, Same But Different
A fundamental aspect to understand when moving from Flink SQL/Table API to Flink CDC context is that while their underlying type systems appear similar on the surface, they are still different from one another. As a consequence, several nuances start to matter, especially when dealing with complex structures and nested data types at runtime, but more on that later.
Schema Definitions
The good news first. Pure schema definitions expressed as string literals are basically interchangeable because both type systems heavily draw from SQL types. We can express the usual primitive suspects and complex types similarly. For instance, the following type definitions are valid in both type systems:
id INT
price DECIMAL(10,2)
hobbies ARRAY<VARCHAR(64)>
scores MAP<STRING,INT>
user ROW<name STRING, age INT, is_active BOOLEAN>
In other words, the high similarity between the two type systems is primarily conceptual and structural. On an implementation level, however, both have their separate Java type equivalents.
Flink Table API defines its types based on classes from:
org.apache.flink.table.types.*org.apache.flink.table.data.*
Flink CDC defines its types based on classes from:
org.apache.flink.cdc.common.types.*org.apache.flink.cdc.common.data.*
Schema Parsing
If we were to parse any such schema string literals shown in the above examples into proper data types, the parsing logic would be largely the same. Yet the different Java type systems behind both worlds suggest separate implementations to either express an org.apache.flink.table.types.DataType (Flink Table API) or an org.apache.flink.cdc.common.types.DataType (Flink CDC). Of course, another option would be to parse schema definitions into one of the two type systems only, and complement this with a uni- or bidirectional type converter depending on our needs.
Task 1
Working towards my Flink CDC variant of the FROM_JSON UDF, the initial task was to parse the schema string literal, this time, however, into a DataType for Flink CDC rather than the Flink Table API. By nature, the result is almost identical to the minimum viable schema parser I already had, except it had to respect the different Java types backing Flink CDC’s type system. The first line in the following test method, taken from one of the unit tests, hints at how this piece of code is going to be used:
@Test
@DisplayName("Parse ROW schema with nested ARRAY")
void testRowTypeSchemaWithNestedArray() {
DataType dataType = SchemaParserCdc.parseType("ROW<id INT, name STRING, hobbies ARRAY<STRING>>");
// Verify it's a RowType
assertTrue(dataType instanceof RowType);
RowType rowType = (RowType) dataType;
// Verify field count
assertEquals(3, rowType.getFieldCount());
// Verify field names
assertEquals("id", rowType.getFieldNames().get(0));
assertEquals("name", rowType.getFieldNames().get(1));
assertEquals("hobbies", rowType.getFieldNames().get(2));
// Verify field types
assertTrue(rowType.getFields().get(0).getType() instanceof IntType);
assertTrue(rowType.getFields().get(1).getType() instanceof VarCharType);
assertTrue(rowType.getFields().get(2).getType() instanceof ArrayType);
// Verify array element type
ArrayType hobbiesArray = (ArrayType) rowType.getFields().get(2).getType();
assertTrue(hobbiesArray.getElementType() instanceof VarCharType);
}
With the schema parsing out of the way, it’s time to shift our focus to the implementation of the user-defined function itself, which will make use of this schema parser to infer the Flink CDC data type according to any custom target schema.
UDFs that aren’t the UDFs You Know
Having written several basic and advanced UDFs for Flink Table API/SQL as well as other data processing systems such as ksqlDB—yeah I know—and Apache Spark in the past, I initially expected Flink CDC UDFs to offer similar capabilities and the same flexibility. Turns out I was in for a little surprise.
Functions in Flink Table API/SQL
- extensive library of built-in system functions
- user-defined functions across multiple categories (scalar, table, aggregate, table aggregate, and process table function)
- async processing for scalar and table functions
- context access in the
open(...)lifecycle method - automatic, annotation-based, and custom programmatic type inference, including call context accessibility
- comprehensive docs
In short, very mature, powerful, and feature-rich, allowing for various capabilities ranging from the most basic to very advanced ones.
Functions in Flink CDC
- relatively small library of built-in system functions
- focused on a single function category, namely scalar functions
context access in the
open(...)lifecycle method but with caveats—more on that follows :) - interoperability with Table API/SQL scalar functions, with notable limitations such as no lifecycle methods or no type hint/type inference compatibility
- automatic or programmatic type inference, however, without call context accessibility or other similar advanced aspects
- full UDF docs essentially fit on a single screen, which, by the way, is a great opportunity to contribute to this rather promising project!
The bottom line here is that, as it stands today, the UDF support in Flink CDC is, well, lean and seems to primarily focus on simpler use cases than what I had in mind here with this FROM_JSON UDF. Any custom scalar functions we’d write could be either applied in projection and/or filter definitions as part of the pipeline’s respective transform block:
transform:
- source-table: mydb.mytable
projection: "*, MY_UDF_A(some_col_1)"
filter: MY_UDF_B(some_col_2) = 123
pipeline:
user-defined-function:
- name: MY_UDF_A
classpath: com.github.hpgrahsl.flink.cdc.udfs.MyDemoUdfA
- name: MY_UDF_B
classpath: com.github.hpgrahsl.flink.cdc.udfs.MyDemoUdfB
The limitations mentioned above create an interesting challenge, though. How or where would we specify the schema definition for a FROM_JSON function call, so that the Flink CDC runtime can properly infer the actual return type when applying this function?
For Flink Table API/SQL functions, this is straightforward, as we can state the schema string literal in a second call parameter and derive the necessary type information directly from that.
SELECT FROM_JSON(my_json_col, 'ARRAY<STRING>') AS parsed_json
Unfortunately, it seems there is no such equivalent that would allow type inference based on the UDFs call context in Flink CDC land, or at least I’m not aware of it. My next idea was to specify the required target schema using some function configuration parameters, which can be accessed in the UDF’s open(...) lifecycle method—far too easy, right? Trying to do so, I figured out that while some classes related to Flink CDC UDFs are prepared for configuration parameters, others are not. Therefore, I had to dig deeper to identify the gaps across about a handful of classes in the current implementation. As a result, I opened FLINK-38792 based on these findings.
Task 2
Since I needed to make progress, I decided to move forward by patching my own fork of Flink CDC 3.6-SNAPSHOT to get proper UDF configuration parameter support. With this in place, I can now do the following:
pipeline:
user-defined-function:
- name: MY_DEMO_UDF
classpath: com.github.hpgrahsl.flink.cdc.udfs.MyDemoUdf
some.config.param: foo
another.setting: 123
and can define an arbitrary configuration for my custom functions as part of the pipeline definition. Call me biased, but I definitely feel this is an improvement and probably also the originally targeted behaviour. Thus, I hope that the upstream project will eventually decide to pick up a PR I’m planning to contribute.
Coming back to our original context, I can define the FROM_JSON UDF like so:
pipeline:
user-defined-function:
- name: FROM_JSON
classpath: com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf
from.json.schema: ARRAY<STRING>
In this particular example configuration, the UDF is instructed to parse JSON arrays containing string elements, which themselves originate from a database column captured by a Flink CDC pipeline connector, for instance, the one for Postgres. The configuration parameter from.json.schema is read in the function’s open(...) lifecycle method and stored in an attribute.
@Override
public void open(UserDefinedFunctionContext context) throws Exception {
if (context != null && context.configuration() != null) {
this.configuredSchema = context.configuration().get(SCHEMA_CONFIG);
}
if (configuredSchema == null || configuredSchema.trim().isEmpty()) {
throw new IllegalArgumentException(
"Schema configuration is required. Set '"
+ SCHEMA_CONFIG.key()
+ "' in the UDF configuration of your pipeline YAML.");
}
//...
}
This way, the schema information becomes accessible in the getReturnType() method, which uses the schema parser I previously wrote to create the corresponding DataType. The Flink CDC runtime can thus infer the proper DataType to expect whenever it needs to evaluate the UDF in question.
@Override
public DataType getReturnType() {
if (configuredSchema != null && !configuredSchema.trim().isEmpty()) {
return SchemaParserCdc.parseType(configuredSchema);
}
throw new IllegalArgumentException(
"Schema configuration is required. Set '"
+ SCHEMA_CONFIG.key()
+ "' in the UDF configuration (e.g., in pipeline YAML under 'config').");
}
Everything up to this point can be considered as foundational preparations before we can eventually come to the meat of it— converting the actual JSON data.
From JSON to Typed Values for Flink CDC
Walking the JSON structure and converting the input data into properly typed values matching the Flink CDC type system along the way doesn’t sound too complicated, or does it? Well, based on my initial experiments, it turned out not to be a walk in the park and here is why.
RecordData
RecordData is a vital interface as it describes how table rows in Flink CDC are represented by a complex type, the RowType. It also defines how each SQL data type is meant to be mapped to internal data structures. These internal data structures are themselves based on either standard Java types (for primitives) or Flink CDC-specific Java types defined by additional custom interfaces or classes such as StringData, DecimalData and others. The table below is taken directly from the Java docs and contains the full picture:
* +--------------------------------+-----------------------------------------+
* | SQL Data Types | Internal Data Structures |
* +--------------------------------+-----------------------------------------+
* | BOOLEAN | boolean |
* +--------------------------------+-----------------------------------------+
* | CHAR / VARCHAR / STRING | {@link StringData} |
* +--------------------------------+-----------------------------------------+
* | BINARY / VARBINARY / BYTES | byte[] |
* +--------------------------------+-----------------------------------------+
* | DECIMAL | {@link DecimalData} |
* +--------------------------------+-----------------------------------------+
* | TINYINT | byte |
* +--------------------------------+-----------------------------------------+
* | SMALLINT | short |
* +--------------------------------+-----------------------------------------+
* | INT | int |
* +--------------------------------+-----------------------------------------+
* | BIGINT | long |
* +--------------------------------+-----------------------------------------+
* | FLOAT | float |
* +--------------------------------+-----------------------------------------+
* | DOUBLE | double |
* +--------------------------------+-----------------------------------------+
* | DATE | int (number of days since epoch) |
* +--------------------------------+-----------------------------------------+
* | TIME | int (number of milliseconds of the day) |
* +--------------------------------+-----------------------------------------+
* | TIMESTAMP | {@link TimestampData} |
* +--------------------------------+-----------------------------------------+
* | TIMESTAMP WITH LOCAL TIME ZONE | {@link LocalZonedTimestampData} |
* +--------------------------------+-----------------------------------------+
* | TIMESTAMP WITH TIME ZONE | {@link ZonedTimestampData} |
* +--------------------------------+-----------------------------------------+
* | ROW | {@link RecordData} |
* +--------------------------------+-----------------------------------------+
* | ARRAY | {@link ArrayData} |
* +--------------------------------+-----------------------------------------+
* | MAP | {@link MapData} |
* +--------------------------------+-----------------------------------------+
The main takeaway for successfully converting JSON with a UDF is as follows: for all data types that directly map to a standard Java type, we only need to read the JSON and can extract the corresponding JsonNode values as is without any additional considerations or explicit conversions on top.
Constructed Data Type Caveats
Based on my early-stage experiments while implementing the FROM_JSON UDF, it’s helpful to take a closer look at the three constructed data types.
ArrayData
The ArrayData type is relevant whenever we hit any JSON array node, the corresponding schema literal for which is ARRAY<element_type>. GenericArrayData is an implementation that allows for the convenient construction of objects based on regular Java arrays.
MapData
Any JSON object for which all field values are of the same type can be converted to the MapData type. The schema literal to express this is MAP<key_type,value_type>. GenericMapData implements this interface and can be used for object construction based on regular Java maps. This implementation internally uses two GenericArrayData objects to represent the map’s keys and values.
For both ArrayData and MapData, it’s important to know that all array elements or map entries must themselves be valid internal data structures. What this means is that any array element or map entry that must be represented by means of a special Java type (see table above) must be explicitly converted before constructing GenericArrayData or GenericMapData objects. This is because their constructors use the data as is without performing type checks let alone do (deep) auto-conversions. Hence, it’s our own responsibility to ensure the constructor only ever receives objects of internal data structure types and potentially convert them upfront.
Task 3
Consequently, the FROM_JSON UDF must respect this at all times, as otherwise, we’d face runtime errors and Flink CDC will almost certainly complain with a ClassCastException whenever this constraint gets violated. To give one concrete example here, if we were to mistakenly build a GenericArrayData object based on a regular Java String[] array rather than a StringData[] array, the following would happen when the Flink CDC runtime tries to access and process its elements:
…
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.flink.cdc.common.data.StringData (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.flink.cdc.common.data.StringData is in unnamed module of loader 'app')
at org.apache.flink.cdc.common.data.GenericArrayData.getString(GenericArrayData.java:221)
at org.apache.flink.cdc.common.data.ArrayData.lambda$createElementGetter$3e58363e$1(ArrayData.java:228)
…
That said, the code for parsing a JSON array node and creating the GenericArrayData object should look something more like this:
private ArrayData parseArray(JsonNode jsonArray, String schemaString) {
DataType arrayDataType = getCachedSchema(schemaString);
if (!(arrayDataType instanceof ArrayType)) {
throw new IllegalArgumentException(
"specified schemaString must be of type ARRAY - got: " + schemaString);
}
ArrayType arrayType = (ArrayType) arrayDataType;
DataType elementType = arrayType.getElementType();
if (jsonArray.size() == 0) {
return new GenericArrayData(new Object[0]);
}
Object[] elements = new Object[jsonArray.size()];
for (int i = 0; i < jsonArray.size(); i++) {
elements[i] = convertJsonNodeToInternalType(jsonArray.get(i), elementType);
}
return new GenericArrayData(elements);
}
Notice the method call to convertJsonNodeToInternalType(...) inside the loop, which guarantees each JSON array element is converted into the proper Flink CDC internal data structure before the resulting GenericArrayData is created and returned. The conversion of map entries to GenericMapData is done in a similar fashion. Besides, this approach ensures that nested arrays (e.g. ARRAY<ARRAY<...>>), nested maps (e.g. MAP<STRING,MAP<STRING,INT>>), or a mix thereof can be implicitly handled as well by means of recursion.
With map and array support in place, let’s focus on the next aspect, namely, how to handle arbitrary JSON objects. Out of habit, I wrongly assumed there must be a similar type duo in place - something like RowData and GenericRowData - however, I was obviously mixing things up with Flink’s Table API, again. Over here in Flink CDC land, we are instead revisiting RecordData.
RecordData
Whenever the FROM_JSON UDF is about to parse arbitrary JSON objects, be it on top-level or somewhere nested, it’s supposed to build a RecordData object matching the specified target schema that is defined with a corresponding ROW type, for instance, ROW<id INT, name STRING, age INT, is_active BOOLEAN>. The relevant implementation of the RecordData interface is BinaryRecordData, which, for reasons of efficiency and performance, is backed by MemorySegment under the hood.
Task 4
Luckily, and in order for us not having to touch any of the lower-level building blocks in that regard, we can rely on the BinaryRecordDataGenerator, a convenience class in Flink CDC’s runtime type utils package. Similar to ArrayData and MapData, it’s important not to forget that each field in a RecordData object must be an internal data structure. With this in mind, the implementation to generate any such BinaryRecordData objects might look as follows:
private RecordData parseObject(JsonNode jsonObject, String schemaString) {
DataType rowDataType = getCachedSchema(schemaString);
RowType rowType = (RowType) rowDataType;
List<String> nestedFieldNames = rowType.getFieldNames();
int arity = nestedFieldNames.size();
Object[] nestedFields = new Object[arity];
for (int i = 0; i < arity; i++) {
String fieldName = nestedFieldNames.get(i);
JsonNode fieldValue = jsonObject.get(fieldName);
DataType fieldType = rowType.getFields().get(i).getType();
nestedFields[i] = convertJsonNodeToInternalType(fieldValue, fieldType);
}
return new BinaryRecordDataGenerator(rowType).generate(nestedFields);
}
This essentially concludes the most relevant implementation aspect of an equivalent FROM_JSON UDF that can work directly within Flink CDC. What’s left are a few more minor tweaks.
Flink CDC’s built-in DataTypeConverter
A critically important class that Flink CDC ships to perform type conversions internally is the DataTypeConverter, which you can find the source code for right here.
Across several methods, we can find multiple larger switch blocks to handle the different data types accordingly. In a few of these places it becomes apparent when reading the code that constructed types, especially the ROW type, aren’t receiving the necessary attention they’d need to enable use cases like the FROM_JSON UDF. Here are two of the more pressing ones:
-
ROWisn’t supported here:
public static DataType convertCalciteRelDataTypeToDataType(RelDataType relDataType) {
switch (relDataType.getSqlTypeName()) {
//…
case ROW:
default:
throw new UnsupportedOperationException(
"Unsupported type: " + relDataType.getSqlTypeName());
}
}
-
ROWtype is passed through as is here, which may or may not work depending on how the user code created the Object value in the first place:
public static Object convert(Object value, DataType dataType) {
if (value == null) {
return null;
}
switch (dataType.getTypeRoot()) {
// …
case ROW:
return value;
// …
}
}
Besides, it would be beneficial if the methods convertToArray(...) and convertToMap(...) detected any non-internal data structures used for array elements or map entries and automatically converted them whenever reasonably possible. This could ultimately free user code from explicitly fulfilling any such framework internal type constraints.
Task 5
Since I already patched my Flink CDC fork during the work on Task 2 (see further above), I did the same here and fixed these little limitations in the DataTypeConverter. And with that, I reached a point where FROM_JSON could be integration tested for the first time as part of a projection specification in the Flink CDC pipeline definition while processing actual JSON data in-flight.
Applying FROM_JSON in Flink CDC Pipelines
We’ve come a long way, so let’s see an exemplary Flink CDC pipeline definition where we are now able to successfully apply FROM_JSON in a projection as part of the transform block. Here, the knownresidences field refers to a JSON column in the Postgres source database table, which contains JSON arrays with string elements.
source:
# ...
transform:
- source-table: public.fake_citizens
projection: id,registered,isactive,FROM_JSON(knownresidences) as knownresidences
sink:
# ...
pipeline:
name: my-pipeline
parallelism: 1
user-defined-function:
- name: FROM_JSON
classpath: com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf
from.json.schema: ARRAY<STRING>
Bonus Surprise
The curious and informed reader will almost certainly wonder how to apply this function multiple times within the same pipeline. And this is a great question indeed :)
Unfortunately, we must assume every use of FROM_JSON will most likely require its own schema string literal defined in the function configuration. For obvious reasons, this inherently rules out direct re-use of the same function definition. So, how about we introduce multiple "uniquely named references" to FROM_JSON such that each function definition gets its individual schema string literal configured? The example below tries to do that with two named references:
pipeline:
name: my-pipeline
parallelism: 1
user-defined-function:
- name: FROM_JSON_1
classpath: com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf
from.json.schema: ARRAY<STRING>
- name: FROM_JSON_2
classpath: com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf
from.json.schema: MAP<VARCHAR,INT>
Well, it turns out the wish was father to the thought as the Flink CDC runtime currently stumbles over this. More specifically, this clashes with the way the Janino compiler tries to process it under the hood and causes the following exception:
// …
Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 76: Redefinition of parameter "__instanceOfFromJsonCdcUdf"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13080)
at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3791)
// …
at org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:76)
... 37 more
Pragmatic Workaround
Rather than setting out to directly address the root cause, I decided for a pragmatic, yet more than good enough workaround for my needs here. By pre-defining a certain number of static inner classes (e.g. N=16) inside the original UDF class, the function can be applied several (up to N) times in the same Flink CDC pipeline context, thereby elegantly circumventing the redefinition problem we faced above. Finally, a valid pipeline definition when doing so could look like this:
source:
# ...
transform:
- source-table: public.fake_citizens
projection: id,registered,isactive,FROM_JSON_1(knownresidences) as knownresidences,FROM_JSON_2(scores) as scores
sink:
# ...
pipeline:
name: my-pipeline
parallelism: 1
user-defined-function:
- name: FROM_JSON_1
classpath: com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf$_C1
from.json.schema: ARRAY<STRING>
- name: FROM_JSON_2
classpath: com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf$_C2
from.json.schema: MAP<VARCHAR,INT>
Summary
Building an equivalent FROM_JSON UDF for Flink CDC pipelines revealed certain implementation challenges stemming from differences between Flink CDC and Flink Table API ecosystems. While both systems use similar SQL-based schema definitions, they rely on entirely separate Java type hierarchies, creating subtle but critical runtime considerations. Another aspect worth highlighting is that Flink CDC's UDF capabilities are currently less mature than the ones found in Flink's Table API/SQL, lacking features like call-context-based type inference. This necessitated an alternate approach using configuration parameters to specify target schemas—functionality that required patching Flink CDC itself (documented in FLINK-38792). Probably the most critical implementation detail involves respecting Flink CDC's internal data structure requirements: constructed types like GenericArrayData and GenericMapData don't perform automatic type conversions, so developers must explicitly convert elements to internal types (StringData, DecimalData, etc.) before target object construction. Failing to do so triggers ClassCastExceptions at runtime. Additional challenges included gaps in the DataTypeConverter class for ROW type handling and issues when trying to reuse UDF definitions in Flink CDC's pipeline YAML. The solution for the latter employs static inner classes to enable multiple FROM_JSON invocations with different schemas in a single pipeline, providing a practical path forward despite current framework limitations.
Takeaways
- Type system divergence: Flink Table API and Flink CDC have conceptually similar but implementation-wise distinct type systems with separate Java class hierarchies
- UDF limitations: Flink CDC's UDF support is lean compared to Table API, lacking advanced features like call context accessibility for type inference
-
Configuration gap: UDF configuration parameters aren't fully supported in Flink CDC
3.6-SNAPSHOT, requiring custom patches -
Internal data structures matter:
ArrayData,MapData, andRecordDataall require elements to already be converted to internal types (StringData,DecimalData, etc.)—no automatic conversion occurs leading to aClassCastExceptionpitfall as using standard Java types causes runtime failures when Flink CDC processes the data -
ROW type handling:
DataTypeConverterhas gaps inROWtype support that need addressing to get complex nested structures working properly - Multiple UDF instances of the same type: Static inner classes seem to provide a pragmatic workaround for applying the same UDF multiple times with different configurations in one pipeline
I learned many things while reading through Flink CDC's codebase, implementing this custom UDF, debugging existing issues, fixing my own bugs, adding unit/integration tests, and writing this article. Hope it's useful for others out there who are trying to extend the processing capabilities of Flink CDC by means of UDFs. At the very least, reading this article should help you save a few precious hours in case you'd hit similar issues.
Top comments (0)