DEV Community

ChunTing Wu
ChunTing Wu

Posted on

Generating Avro Schemas from Go types

Previously, I have introduced the evolutionary history of data infrastructure, and I have also introduced real-time data analysis in detail.

Both of these articles mention a key player, Debezium. In fact, Debezium has had a place in the modern infrastructure. Let's use a diagram to understand why.

Image description

This architecture diagram should be consistent with the data ingest path of most data infrastructures. In order to store and analyze data in a unified way, centralizing the data in a data warehouse is a general solution.

Debezium captures data changes from various source databases and then writes them to the data warehouse via Kafka and Kafka's consumers. In other words, Debezium is also a Kafka producer.

This architecture looks fine, once there are changes in the data will be captured by Debezium and eventually written to the data warehouse, but there is actually a difficult problem behind needs to be solved, that is, the database schema. In order to provide data analytics, the data warehouse is "schemaful", but how to make the schema of the data warehouse align with the schema of each source database?

The good news is Debezium supports schema registry, it can capture DDL changes from source databases and synchronize them to the schema registry so that Kafka consumers can get schema from the schema registry and synchronize the schema in the data warehouse.

Let's make the architecture mentioned above even more complete.

Image description

The most common format for describing schema in this scenario is Apache Avro.

Unfortunately, a schemaless database like MongoDB doesn't have DDL at all, so of course Debezium can't capture DDL, then how does Debezium upload the schema in this scenario? The answer is to guess, by getting the data to determine the data type, and write the type into the schema registry.

As you can imagine, this is very risky, and the data warehouse schema often faces compatibility challenges due to such guessing. Therefore, there needs to be a way to proactively upload the schema so that the Kafka consumer can correctly recognize the data type and correct the schema of the data warehouse.

Knowing it intellectually is one thing, but knowing it emotionally is quite another.

The story is this: our microservice is developed in Golang and uses mgm as the ORM for MongoDB.

In Golang, we use struct as a Data Transfer Object (DTO), and in the mgm scenario, these DTOs can be marshaled as JSON. So, what we need to do is to generate the corresponding Avro schema based on the definition of struct.

https://github.com/wirelessr/avroschema

The use of this go module is quite simple.

If we have a struct as follows.



type Entity struct {
    AStrField    string  `json:"a_str_field"`
    AIntField    int     `json:"a_int_field"`
    ABoolField   bool    `json:"a_bool_field"`
    AFloatField  float32 `json:"a_float_field"`
    ADoubleField float64 `json:"a_double_field"`
}


Enter fullscreen mode Exit fullscreen mode

Then just import the module to generate the corresponding Avro.



import "github.com/wirelessr/avroschema"

avroschema.Reflect(&Entity{})


Enter fullscreen mode Exit fullscreen mode

The result will be an Avro JSON.



{
    "name": "Entity",
    "type": "record",
    "fields": [
      {"name": "a_str_field", "type": "string"},
      {"name": "a_int_field", "type": "int"},
      {"name": "a_bool_field", "type": "boolean"},
      {"name": "a_float_field", "type": "float"},
      {"name": "a_double_field", "type": "double"}
    ]
}


Enter fullscreen mode Exit fullscreen mode

In addition, the module also supports various complex types and nested types as follows.

  • array and slice
  • map
  • struct in struct
  • time.Time
  • array of struct
  • etc.

Of course, the omitempty tag used by JSON in Golang is also supported.

Fields marked with omitempty become Avro's union type, which is an optional field. Let's take a more complicated example.



type Entity struct {
    UnionArrayField []int `json:"union_array_field,omitempty"`
}


Enter fullscreen mode Exit fullscreen mode

If it is an optional int array, then it will become a union type with null when converted to Avro.



{
    "name": "Entity",
    "type": "record",
    "fields": [
      {"name": "union_array_field", "type": ["null", {
        "type": "array", "items": "int"
      }]}
    ]
}


Enter fullscreen mode Exit fullscreen mode

More detailed tests can be found in the unit test file: reflect_test.go.

However, this is not enough for us, because we need to support various special types in mgm, such as primitive.DateTime or primitive.ObjectID, and most especially mgm.DefaultModel.

Therefore, it is necessary for this module to provide customization capabilities to extend support for more special types, and so a plugin is provided for this module to be used. To support mgm, you can use the already written submodule.

The following is an example of a struct that uses the mgm type.



type Book struct {
    mgm.DefaultModel `bson:",inline"`
    Name             string             `json:"name" bson:"name"`
    Pages            int                `json:"pages" bson:"pages"`
    ObjId            primitive.ObjectID `json:"obj_id" bson:"obj_id"`
    ArrivedAt        primitive.DateTime `json:"arrived_at" bson:"arrived_at"`
    RefData          bson.M             `json:"ref_data" bson:"ref_data"`
    Author           []string           `json:"author" bson:"author"`
}


Enter fullscreen mode Exit fullscreen mode

To support these special types, you can use the built-in custom MgmExtension. This works just like the original Reflect, except it requires the use of a customized instance.



import (
    "github.com/wirelessr/avroschema"
    "github.com/wirelessr/avroschema/mongo"
)

reflector := new(avroschema.Reflector)
reflector.Mapper = MgmExtension

reflector.Reflect(&Book{})


Enter fullscreen mode Exit fullscreen mode

There are a couple of important points to note here.

  1. DefaultModel is a type that directly generates three fields, _id, created_at and updated_at.
  2. If primitive.ObjectID then it is treated as a string, since the UUID mentioned in Avro specification is also treated as a string.
  3. If primitive.DateTime, then it is treated as time.Time.
  4. A special case is bson.M, which is treated as a JSON string because there is no way to be sure what is in it.

According to the above rules, the final Avro schema is as follows.



{
"name": "Book",
"type": "record",
"fields": [
{ "name": "_id", "type": "string" },
{ "name": "created_at", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "updated_at", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "name", "type": "string" },
{ "name": "pages", "type": "int" },
{ "name": "obj_id", "type": "string" },
{ "name": "arrived_at", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "ref_data", "type": "string" },
{ "name": "author", "type": "array", "items": "string" }
]
}
Enter fullscreen mode Exit fullscreen mode




Conclusion

When implementing the requirement to convert Golang struct to Avro schema I first tried to look for existing packages, but unfortunately I couldn't find any. However, there is a similar project that aims to convert Golang struct to JSON schema, so I referred to his practice and made a version of Avro schema.

In fact, there are still some specifications in the Avro schema that I don't support, such as the enum type, and there are also many optional parameters that I don't support because they don't correspond to struct, such as namespace, aliases, etc.

But for now, this implementation satisfies my need for data ingest, and allows Kafka consumers to have the correct schema to work with. Maybe I'll continue to improve this project sometime in the future so that it can really support all kinds of Avro specifications. But I have to say, in terms of functionality, it's pretty useful right now.

The project has full unit testing and linter, and I'm sure the code structure isn't too complicated, so feel free to contribute.

Top comments (0)