With release v4.0.1, a new fluent API was introduced to ABRiS to reduce configuration errors and provide more type safety. While this change is a huge improvement going forward, it causes a breaking change for users migrating from version 3. This article walks you through an upgrade of some common use-cases of ABRiS.
More information can be found on the GitHub page, and more usage examples on the documentation pages. Documentation for version 3 is available on branch 3.2.
Reading records
A common use-case is to read data from a topic with both key and value schema. In ABRiS 3, this could be done like this:
val keyConfig = Map(
SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "example_topic",
SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> "topic.name",
SchemaManager.PARAM_KEY_SCHEMA_ID -> "latest"
)
val valueConfig = Map(
SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "example_topic",
SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.record.name",
SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest",
SchemaManager.PARAM_VALUE_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "record.name",
SchemaManager.PARAM_VALUE_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY -> "record.namespace"
)
import za.co.absa.abris.avro.functions.from_confluent_avro
val result: DataFrame = dataFrame.select(
from_confluent_avro(col("key"), keyConfig) as 'key,
from_confluent_avro(col("value"), valueConfig) as 'value)
With ABRiS 4, it looks like this:
val keyConfig: FromAvroConfig = AbrisConfig
.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andTopicNameStrategy("topicName", isKey=true)
.usingSchemaRegistry("http://localhost:8081")
val valueConfig: FromAvroConfig = AbrisConfig
.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andTopicRecordNameStrategy("topicName", "record.name", "record.namespace")
.usingSchemaRegistry("http://localhost:8081")
import za.co.absa.abris.avro.functions.from_avro
val result: DataFrame = dataFrame.select(
from_avro(col("key"), keyConfig) as 'key,
from_avro(col("value"), valueConfig) as 'value)
First and foremost, a new object was introduced, AbrisConfig. This is the entry point for the new fluent API.
Second, the method from_confluent_avro was removed and should be replaced with from_avro. To use the Confluent format, specify .fromConfluentAvro on AbrisConfig. If you've been using plain vanilla Avro, choose .fromSimpleAvro instead.
Third, notice the second parameter of .andTopicNameStrategy. The default value of isKey is false, which is fine for value schemas. For key schemas, however, isKey must be set to true.
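To see why isKey matters, recall that under the Confluent topic name strategy the subject is the topic name plus a -key or -value suffix. The sketch below only mimics that naming convention in plain Scala. SubjectNames and topicNameSubject are hypothetical names used for illustration and are not part of ABRiS.

```scala
// Illustrative only: mimics the subject naming convention of the
// Confluent topic name strategy. Not part of the ABRiS API.
object SubjectNames {
  def topicNameSubject(topic: String, isKey: Boolean = false): String =
    if (isKey) s"$topic-key" else s"$topic-value"
}

object SubjectNamesDemo extends App {
  // With the default isKey = false, the value subject is addressed
  println(SubjectNames.topicNameSubject("example_topic"))               // example_topic-value
  // Key schemas live under a different subject, hence isKey = true
  println(SubjectNames.topicNameSubject("example_topic", isKey = true)) // example_topic-key
}
```

Leaving out isKey=true for the key column would therefore most likely resolve the value subject instead of the key subject, so the key would be read with the wrong schema.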
Writing records
Using an existing schema
In ABRiS 3, writing records with an existing schema id could be done like this:
def writeAvro(dataFrame: DataFrame): DataFrame = {
val config = Map(
SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "example_topic",
SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.record.name",
SchemaManager.PARAM_VALUE_SCHEMA_ID -> "42"
)
import za.co.absa.abris.avro.functions.to_confluent_avro
val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _*)
dataFrame.select(to_confluent_avro(allColumns, config) as 'value)
}
In ABRiS 4, it's like this:
def writeAvro(dataFrame: DataFrame): DataFrame = {
val config: ToAvroConfig = AbrisConfig
.toConfluentAvro
.downloadSchemaById(42)
.usingSchemaRegistry("http://localhost:8081")
import za.co.absa.abris.avro.functions.to_avro
val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _*)
dataFrame.select(to_avro(allColumns, config) as 'value)
}
Here again, the method to_confluent_avro was removed and should be replaced with to_avro. To write the Confluent format, specify .toConfluentAvro on AbrisConfig.
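The same pattern can be applied to both the key and the value column, mirroring the reading example. The following is a minimal sketch that uses only the calls shown above. It assumes the DataFrame has struct columns named key and value, and that schema ids 41 and 42 (placeholders) already exist in the registry. The package za.co.absa.abris.config for AbrisConfig and ToAvroConfig is taken from the ABRiS 4 layout and should be checked against your version.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}

// Hypothetical helper: serializes the "key" and "value" columns separately
def writeKeyAndValue(dataFrame: DataFrame): DataFrame = {
  val registryUrl = "http://localhost:8081"

  // 41 and 42 are placeholder ids of schemas already registered
  val keyConfig: ToAvroConfig = AbrisConfig
    .toConfluentAvro
    .downloadSchemaById(41)
    .usingSchemaRegistry(registryUrl)

  val valueConfig: ToAvroConfig = AbrisConfig
    .toConfluentAvro
    .downloadSchemaById(42)
    .usingSchemaRegistry(registryUrl)

  dataFrame.select(
    to_avro(col("key"), keyConfig) as "key",
    to_avro(col("value"), valueConfig) as "value")
}
```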
Generating the schema from the records
In ABRiS 3, it was incredibly easy (too easy!) to have ABRiS generate the schema from the records: you simply didn't provide a schema id, like this:
def writeAvro(dataFrame: DataFrame): DataFrame = {
val config = Map(
SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "example_topic",
SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.record.name"
)
val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _*)
dataFrame.select(to_confluent_avro(allColumns, config) as 'value)
}
In v3, the schema had to be generated and registered during the evaluation of the Spark expression, which was inefficient. This functionality was therefore removed in v4: the schema now has to be registered before the evaluation phase, and its id passed to the ABRiS config.
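import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.SchemaConverters.toAvroType
import org.apache.spark.sql.functions.struct
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
import za.co.absa.abris.avro.registry.SchemaSubject
import za.co.absa.abris.config.AbrisConfig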
def writeAvro(dataFrame: DataFrame): DataFrame = {
// generate schema
val allColumns = struct(dataFrame.columns.map(c => dataFrame(c)): _*)
val expression = allColumns.expr
val schema = toAvroType(expression.dataType, expression.nullable)
// register schema
val schemaRegistryClientConfig =
Map(AbrisConfig.SCHEMA_REGISTRY_URL -> "http://localhost:8081")
val schemaManager = SchemaManagerFactory.create(schemaRegistryClientConfig)
val subject = SchemaSubject.usingTopicNameStrategy("topic", isKey=false)
val schemaId = schemaManager.register(subject, schema)
// create config
val config = AbrisConfig
.toConfluentAvro
.downloadSchemaById(schemaId)
.usingSchemaRegistry("http://localhost:8081")
// serialize all columns, reusing allColumns defined above
dataFrame.select(to_avro(allColumns, config) as 'value)
}
Notice that we used the topic name strategy in this example. SchemaSubject also offers methods for the record name strategy (.usingRecordNameStrategy) and the topic record name strategy (.usingTopicRecordNameStrategy).
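For comparison, the three subject variants might be created as sketched below. The argument order for the record-based strategies (record name before namespace) is an assumption based on the .andTopicRecordNameStrategy call shown in the reading example. The topic, record name and namespace values are placeholders. Check the ABRiS documentation for the exact signatures.

```scala
import za.co.absa.abris.avro.registry.SchemaSubject

// Topic name strategy, as used in the example above
val byTopic = SchemaSubject.usingTopicNameStrategy("example_topic", isKey = false)

// Record name strategy (assumed order: record name, record namespace)
val byRecord = SchemaSubject.usingRecordNameStrategy("ExampleRecord", "com.example")

// Topic record name strategy (assumed order: topic, record name, record namespace)
val byTopicRecord =
  SchemaSubject.usingTopicRecordNameStrategy("example_topic", "ExampleRecord", "com.example")
```

With the record-based strategies, the record name and namespace of the generated schema presumably need to match the ones used for the subject. Spark's toAvroType accepts optional recordName and nameSpace arguments that can be used for this.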