DEV Community

Cover image for Feature Engineering Has a Language Problem
Tun
Tun

Posted on • Edited on • Originally published at quix.io

Feature Engineering Has a Language Problem

Feature engineering is a crucial part of any machine learning (ML) workflow because it enables more complex models to be created than with raw data alone, but it's also one of the most difficult to manage. It's afflicted by a language barrier—a difference in the languages used to encode processing logic. To put it simply, data scientists define their feature computations in one language (e.g. Python or SQL) and data engineers often need to rewrite this logic in another language (e.g. Scala or Java).

My colleague Mike touched on the reasons for this in a previous article "Bridging the gap between data scientists and engineers in machine learning workflows", but I want to zoom in on what exactly this process entails and explore some ideas on how to remove some of the friction.

When do teams encounter language friction?

This problem starts to crop up as companies mature in their level of data sophistication. In-house ML isn't even worth considering until a company has a reliable data pipeline in place to supply models with training data.

However, as data availability and data quality gradually improves, data teams start to create more sophisticated batch processing pipelines that incorporate machine learning. Machine learning models are trained offline and the outputs can begin as artifacts such as CSV files that are assessed by humans before progressing to other types such as class labels in the case of classification models.

Feature transformations as well as training and inference pipelines written by data scientists usually aren't optimised for speed, so ML engineers often rewrite them to run faster. Rewriting the logic for feature engineering is the first place to look for performance gains.

Once an offline ML pipeline has reached a stable state, many companies will look to leverage that data to enhance their product more directly. This often leads to ML models being integrated into application architectures so that, for example, web applications can adapt to customer requirements in real time.

Thus, machine learning as a discipline needs to morph from being an experimental, sporadic, offline process into a repeatable software delivery process. Model files need to be deployed online and produce results in a timely manner. Likewise, the feature computation code from data scientists needs to be adapted for a production environment so the computations can run online. This enables the models to make predictions based on fresh features.

It's at this stage when the impact of language friction starts to become a wider problem:

Image description

Rather than explain the theory and best practices behind feature engineering, I'd like to illustrate the language barrier with an example scenario.

An example scenario: feature engineering for AI-powered market predictions

One of the most studied yet mysterious applications of machine learning is using it to predict the movement of certain financial markets. Since the predictions can influence the movement of the price, most organisations keep their prediction models under wraps. However, some trading apps are experimenting with some form of AI-powered prediction. This is especially prevalent in cryptocurrency trading where all trading data is publicly visible on the blockchain.

For example, the SwissBorg trading app features an ML-powered "CyBorg Predictor" that forecasts price movements for certain assets.

Image description
Source: swissborg.com

This is a canonical example where real-time ML predictions can bring tangible business value (assuming the predictions are accurate!) so it lends itself nicely to an analysis of online feature engineering.

So, let's say that you work for an up-and-coming crypto trading app that wants to introduce similar functionality.

The key features that you need to train a machine learning model are the OHLC data points: the open, high, low and closing prices for a given time window. This data is typically visualised in the form of a candlestick chart which traders use for technical analysis.

Image description
Source: Sabrina Jiang © Investopedia 2020.

There are obviously services that provide precomputed OHLC data, but for argument's sake let's say you want to train a model on features that you've computed yourself. I want to walk through the process of taking this feature from an offline exploratory scenario to a real-time production scenario.

Consequently, this scenario has two sections: prototype and production. Note that this is an oversimplification: in reality, there are more phases involved here (I highly recommend Chip Huyen's piece Real-time machine learning: challenges and solutions for more details). However, for the purposes of explaining the "language barrier", I want to keep things simple.

Prototyping Offline with Python

In the first iteration of your ML model, you might focus on one or two currencies such as ETH or Bitcoin. When prototyping the model, you might train the model offline on historical trading data and backtest it for prediction accuracy.

Let's say your data scientist has a JSON file with some sample historical ticker data (it is ideally in the same JSON structure as data that will come from the live trading feed).

Assume they're using Python for prototyping, they might first calculate ETH's 1-hour OHLC data like this:

import pandas as pd
import json

# Load raw ticker data from the JSON file
with open('ticker_data.json', 'r') as file:
    ticker_data = json.load(file)

# Convert ticker data to a pandas DataFrame
ticker_df = pd.DataFrame(ticker_data)

# Only keep rows with "product\_id" equals "ETH-USD"
eth_usd_ticker_df = ticker_df[ticker_df["product_id"] == "ETH-USD"]

# Convert the time column to pandas datetime
eth_usd_ticker_df['time'] = pd.to_datetime(eth_usd_ticker_df['time'])

# Set the time column as the DataFrame index
eth_usd_ticker_df = eth_usd_ticker_df.set_index('time')

# Calculate the OHLC data based on a 1-minute interval
ohlc_df = eth_usd_ticker_df['price'].astype(float).resample('1H', origin='start').agg(
    {
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
    }
)

# Calculate the volume data based on a 1-minute interval
volume_df = eth_usd_ticker_df['last_size'].astype(float).resample('1H', origin='start').sum()

# Combine OHLC and volume data
ohlc_volume_df = pd.concat([ohlc_df, volume_df], axis=1)

print(ohlc_volume_df)
Enter fullscreen mode Exit fullscreen mode

This script will partition the trading data into fixed 1-hour intervals resembling the following result.

Date time open high low close last_size
09/06/2023 12:26:51.360251 1846.55 1846.56 1846.01 1846.55 13.27384
09/06/2023 13:26:51.360251 1846.53 1846.53 1846.22 1846.22 2.141272
09/06/2023 14:26:51.360251 1864.99 1864.99 1864.68 1864.68 2.16268

This data is OK for prototyping the ML model or providing batched long-term predictions, but not great for fine-grained real-time predictions. Prices can fluctuate wildly even within a 1-hour period so you'll want to catch those as they happen. This means putting the ML model online and combining the historical data with a stream of real-time trading data.

Calculating Features Online with Java

Now suppose that you have adapted the model to use features that are a combination of:

  • 1-hour intervals for the last 30 days.
  • 1-minute intervals for the current day.
  • A sliding window of the last 60 seconds updating every second.

You want to put this model online so that it provides a stream of predictions that update in real time. These predictions might be used to populate a real-time dashboard or power automated trading bots.

This requires the OHLC calculations to be refactored considerably. This refactoring is influenced by a number of factors that contribute to the so-called language barrier that slows down ML workflows.

These factors are as follows:

Latency and throughput

The query now needs to run on a continuous unbounded stream of data rather than a table. It also needs to maintain a specific rate of throughput to stop the predictions from getting stale. This requires a purpose-built stream-processing engine that can maintain throughput on high volumes of trading data.

Apache Flink is one of the most popular choices for such use cases and although it supports SQL, many developers choose to write processing logic using Flink's lower-level APIs. Calculations run faster when accessing these APIs directly (rather than using an abstraction layer such as PyFlink or SQL).

@Override
public Tuple5<Double, Double, Double, Double, Integer> merge(Tuple5<Double, Double, Double, Double, Integer> a, Tuple5<Double, Double, Double, Double, Integer> b) {
    return new Tuple5<>(
        a.f0,                                   // Open (min)
        Math.max(a.f1, b.f1),                   // High
        Math.min(a.f2, b.f2),                   // Low
        b.f3,                                   // Close (latest value)
        a.f4 + b.f4                             // Volume
    );
}
Enter fullscreen mode Exit fullscreen mode

An excerpt of the math operations after refactoring in Flink.

Different dependencies

If you're going to translate from SQL or Python into Java for Flink, then you'll also need to import different dependencies which need to be accessible in the execution environment. If you've created a custom function in the form of a UDF, you need to ensure that it is also packaged with the job and deployed to the Flink cluster.

import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
Enter fullscreen mode Exit fullscreen mode

An excerpt of all the extra dependencies required after refactoring code into Java.

Real-time data sources and sinks

To calculate OHLC data on a sliding window, the query now needs to use a different data source. Instead of connecting to a database and querying a table, the process needs to operate on some kind of message queue, which is typically a Kafka topic.

Thus a lot of "connector code" needs to be added so that the process:

  • Connects to a Kafka message broker.
  • Reads raw data from one topic and writes results to a second topic.
  • Efficiently serialises and deserialises the data.

There is also more connector code required to write the feature values themselves to an online feature store such as Redis.

// Create Kafka consumer properties
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "myserver:9092");
consumerProps.setProperty("group.id", "flink-ohlc-group");

// Create Kafka producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "myserver:9092");
Enter fullscreen mode Exit fullscreen mode

A small excerpt of the extensive Kafka configuration required for Flink.

Windowed aggregations and state management

In the prototyping phase, you might already start testing sliding window calculations, but you'd probably use an in-memory dictionary to store the state. This works fine on one computer. But moving to production, however, you would need to use a processing engine that maintains a shared state in a fault-tolerant manner. This is again why many companies choose Apache Flink which is famous for its reliable stateful processing in a distributed computing environment.

If a replica of a process somehow terminates when it's in the middle of calculating OHLC data for a sliding window, another replica can come and pick up where the previous process left off because the calculation steps are continuously written to a shared storage location.

// Calculate the OHLC data for each ticker over a 30-second sliding window
DataStream<Tuple5<String, Double, Double, Double, Double>> ohlcStream = tickStream
    .keyBy(tick -> tick.ticker)  // Group by ticker
    .timeWindow(Time.seconds(30), Time.seconds(1))  // Sliding window of 30 seconds with 1 second slide
    .aggregate(new OhlcAggregator());
Enter fullscreen mode Exit fullscreen mode

An excerpt of a sliding window calculation using Flink's DataStream API in Java.

As you can see, that's a lot of refactoring. And I haven't even touched on other process changes such as adding the feature to a feature catalog, interacting with an online feature store, testing, deploying and monitoring the online feature calculation.

But rewriting the code from top to bottom alone can slow down a feature's journey from prototype to production.

Solutions to the language barrier

If this problem is so ubiquitous, how do the big players solve it? It turns out that Netflix, Uber, DoorDash have all built their own sophisticated feature platforms that handle feature management as well as stream and batch processing. They still have the feature translation issue, but they're able to automate the translation process for common calculations.

Unified Feature Platforms

The following table comes from another of Chip Huyen's brilliant pieces, this time "Self-serve feature platforms: architectures and APIs". It illustrates just how many proprietary custom-built feature platform features are out there in the wild already. Note that features are typically still defined in multiple languages.

Comparison of feature platforms

Feature store Feature API (transformation logic > feature logic) Stream compute engine
LinkedIn Venice, Fedex Python > Python Samza, Flink
Airbnb HBase-based Python > Python Spark Streaming
Instacart Scylla, Redis ? > YAML Flink
DoorDash Redis, CockroachDB SQL > YAML Flink
Snap KeyDB (multithreaded fork of Redis) SQL > YAML Spark Streaming
Stripe In-house, Redis Scala > ? Spark Streaming
Meta (FB) Scala-like > ? XStream, Velox
Spotify Bigtable Flink SQL > ? Flink
Uber Cassandra, DynamoDB DSL > ? Flink
Lyft Redis, DynamoDB SQL > YAML Flink
Pinterest In-house, memcached R Flink
Criteo Couchbase SQL > JSON Flink
Binance Flink SQL > Python Flink
Twitter Manhattan, CockroachDB Scala Heron
Gojek DynamoDB SQL > JSON Flink
Etsy Bigtable Scala > ? Dataflow

Source: “Self-serve feature platforms: architectures and APIs" by Chip Huyen.

Yet not every company has the time or resources to build their own in-house feature platform. Now that more companies are moving into the later stages of the ML maturity model, there is increasing demand for simpler end-to-end solutions that help ease the language barrier while eliminating infrastructural complexity.

There are now general feature platforms such as Tecton (proprietary) and Feathr (open source) which aim to keep the batch and streaming code tightly synchronised while handling the actual processing itself. This in itself is enough to cut down the time to production. When LinkedIn announced that they were open sourcing Feathr in April 2022, they revealed that it had "reduced engineering time required for adding and experimenting with new features from weeks to days".

Tecton goes further and removes the headache of having to provision extra infrastructure (assuming that you have Databricks, Amazon EMR, or Snowflake set up as an offline feature store). They provide an end-to-end platform for managing, storing and computing online and offline features.

The following screenshot from Tecton should give you a rough idea of how these feature platforms work.

Image description
Source: tecton.ai

You essentially store variants of the same feature transformation in one "entry" along with some configuration variables that affect the score of the transformation. Connections to external sources such as Kafka are defined elsewhere in Tecton's configuration, so there is a clean separation of concerns between the transformation code and the streaming transport code.

Caveats

Such systems are still intended for companies who are fairly advanced in their ML maturity. They're in some ways, designed to prevent large enterprises from repeatedly building their own custom feature platforms (although many still do). For this reason, these platforms are still fairly complex, probably because they need to address the highly specific requirements of many enterprises with mature MLOps teams. If you're starting off with a limited feature set, there is a risk that the additional complexity could offset the time-savings that you gain by having a more structured feature management pipeline.

The other issue is that they still use Spark or Flink under the hood to do stream processing, which means that code is still being translated or 'transpiled' at some level. Tecton, for example, uses Spark Structured Streaming for stream processing. Spark's native API is written in Scala, so as with Flink, the Python API is just a wrapper around the native API so using it can introduce extra latency. Additionally, Spark Structured Streaming uses a micro-batch processing model, which generally has higher latency compared to event-driven streaming systems like Apache Flink or Kafka Streams. It also lacks built-in complex event processing (CEP) features that other frameworks like Apache Flink offer.

However, not every application requires CEP or very low-latency processing (sub-second or milliseconds), so in most cases the stream processors built into these feature platforms will do the job.

But what if you want a simpler solution that gives you more direct control over the stream processing logic and while not requiring data scientists to grapple with Java or Scala? That's where the other type of solution comes into play—pure Python stream processing frameworks.

Pure Python stream processing frameworks

A pure Python stream processing framework can enable data scientists to prototype with streaming data very early on in the process. They do so by making it very easy to connect to Kafka and run the typical operations that you would perform on an unbounded stream (i.e. sliding window aggregations). A data scientist might still build their logic on a batch dataset first, but it becomes very simple to adapt that same logic for streaming data. This reduces the language barrier, because the same prototype code can be used in production with very minimal refactoring.

In an ideal scenario, the data scientists can also use Python to define the processing workflows. Many features need to be calculated in multiple steps, so it helps to give data scientists more autonomy in defining workflows as well as the transformation logic itself.

For example, Faust and Bytewax are both pure Python stream processing frameworks that can be used in complex processing pipelines.

Faust

Faust was open sourced by Robinhood in 2018 and has since been taken over by the open source community.

When it was first released, Faust looked very promising. For example, Robinhood's engineering team published a compelling blog post on how they used Faust in combination with Apache Airflow to build a better news system. They used Faust commands via Airflow to continuously pull data from various sources (such as RSS feeds and aggregators) while using Kafka to store the results of every processing step. Faust also supports scalable stateful processing with so-called "stateful tables" and can be configured for exactly once processing via the "processing_guarantee" setting.

However, it appears that Robinhood has abandoned Faust. It's not clear why exactly, but there was plenty of speculation on Reddit. There is now a fork of Robinhood's original Faust repo which is more actively maintained by the open source community. However, it still has a lot of open bugs which are show-stoppers for some teams (see this review of stream processing frameworks for more details on those bugs).

Bytewax

Bytewax is a lot newer, launched in early 2021 and open-sourced in February 2022, but is quickly gaining traction due to it being open source and very user-friendly for data scientists. Unlike Faust, Bytewax aims to be a complete stream processing platform and includes functionality to enable data scientists to build their own dataflows—in other words, processing pipelines that include multiple steps that can be represented as nodes in a graph.

In fact, the example OHLC scenario I provided earlier was inspired by a tutorial that uses a simple Bytewax dataflow to read data from a Coinbase WebSocket and write the OHLC feature values to a feature store (Hopsworks).

Image description
Source: "Real-World ML #019: Deploy a real-time feature pipeline to AWS" by Pau Labarta Bajo.

Caveats

Given that the official repo seems to be abandoned, the caveats with Faust should hopefully be clear. Although the Faust fork is more active, it's still uncertain when some of the more serious bugs are going to be fixed. It's worth noting that we also encountered these bugs when trying to do some benchmarking against Faust (for our own Python library).

Bytewax is still fairly new so it will take a while for more reports about how it fares in production to trickle through the ecosystem. When it comes to deploying it, however, you'll still have to deal with some infrastructural complexity—at least for now (they have a managed platform in the works). Looking at their deployment documentation, it's clear that they expect readers to have some knowledge of the infrastructure that will host the stream processing logic. You can choose to run dataflows in local Docker containers, in Kubernetes, AWS EC2 instances, or GCP VM instances. All of these require setup and configuration work that would probably be uninteresting to a data scientist and is probably better handled by a friendly (ML) engineer. Much of this complexity will hopefully go away once their platform becomes generally available.

Conclusion

By now it should be clear the data and ML industry is well aware of the language barrier affecting feature engineering in real-time ML workflows. It has always existed, but was historically solved with in-house solutions hidden from the public. Real-time inference on real-time features was practised by a chosen few with highly specific requirements—so it made sense for them to build their own solutions. Now, with all the increased attention on AI, we're seeing a democratisation of many aspects of MLOps workflows and there are now more standardised approaches to tackling the language barrier such as all-in-one feature platforms and pure Python stream processing frameworks.

Although I've focused on Faust and Bytewax, it would be remiss of me not to mention our own platform Quix which runs Quix Streams— our open source stream processing library. The processing model is not unlike that of Bytewax, but instead of defining data pipelines in Python, you use the Quix Portal UI to piece together your transformation steps (for a peek at how it works in production, see this telemetry case study). The Quix platform is also a fully hosted and managed solution that uses Kafka and Kubernetes under the hood—which makes it pretty much infinitely scalable. We aim to solve the language barrier in the same way as Faust and Bytewax but we want to remove the infrastructure headache too. However, infrastructure is a whole other subject which I hope to tackle in a follow-up post. For now, I hope that my simple example scenario has helped you understand the language barrier in more detail and inspired you to plan for it when you're ready to dive into real-time feature processing.

Top comments (0)