morsapaes

Flink Stateful Functions: where to start?

Last week, the Apache Flink community released Stateful Functions 2.0: a new way of developing distributed event-driven applications with consistent state. This release added some heat to the stateful serverless movement (I know: "not another buzzword") and, as with any big release, there's always a lot to take in and resources scattered all over the place.

This article sums up the gist of Stateful Functions and includes a practical example that gently introduces you to the inner workings of the API.

The Gist of It

Stateful Functions allows you to combine a powerful approach to state and composition with the elasticity, rapid scaling and rolling upgrade capabilities of modern FaaS platforms like AWS Lambda and resource orchestration frameworks like Kubernetes. In particular, it overcomes two major limitations looming over many FaaS setups today: consistent state and efficient messaging between functions.

The building blocks of Stateful Functions are functions with persisted state that can interact dynamically with strong consistency guarantees (no need for a database!).

The runtime is built on Apache Flink with the following principles in mind:

Logical Compute/State Co-location

Messaging, state access/updates and function invocations are managed tightly together. This ensures a high level of consistency out of the box. In the background, Apache Flink takes care of all the heavy lifting involved in managing state, routing messages and scheduling invocations for you.

Physical Compute/State Separation

Functions can be executed remotely, with message and state access provided as part of the invocation request. This way, functions can be managed like stateless processes and support rapid scaling, rolling upgrades and other common operational patterns.
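To make the idea a bit more tangible, here is a minimal plain-Java sketch of what "state provided as part of the invocation request" could look like. It does not use the Stateful Functions SDK or its wire protocol: `InvocationRequest`, `InvocationResponse` and `RemoteCounterFunction` are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical request shape: the runtime ships the message together with the current state. */
final class InvocationRequest {
    final String message;
    final int state;

    InvocationRequest(String message, int state) {
        this.message = message;
        this.state = state;
    }
}

/** Hypothetical response shape: the updated state plus any messages to send out. */
final class InvocationResponse {
    final int newState;
    final List<String> outgoing;

    InvocationResponse(int newState, List<String> outgoing) {
        this.newState = newState;
        this.outgoing = outgoing;
    }
}

final class RemoteCounterFunction {
    /** A pure function of (message, state): the process keeps nothing between calls. */
    static InvocationResponse handle(InvocationRequest request) {
        List<String> outgoing = new ArrayList<>();
        int state = request.state;
        if ("increment".equals(request.message)) {
            state = state + 1;
        } else if ("query".equals(request.message)) {
            outgoing.add("count=" + state);
        }
        return new InvocationResponse(state, outgoing);
    }
}
```

Because the handler holds no state of its own, any replica can serve any request, which is what lets such a process be scaled or upgraded like a stateless service.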

Multi-language Support

Function invocations use a simple HTTP/gRPC-based protocol so that you can easily implement functions in multiple languages. For now, Stateful Functions ships with a slim Python SDK for remote execution, but support for other languages like Go, JavaScript or Rust is planned (a good excuse to get involved in the project!).

All these characteristics make it possible to execute functions on a Kubernetes deployment, a FaaS platform or behind a (micro)service, while providing consistent state and lightweight messaging between functions. That's some weight off your shoulders, eh?

For a deeper dive into the motivation behind Stateful Functions, have a look at the announcement blogpost by @StephanEwen. If you're just looking for a quick overview, watch this whiteboard session.

Yeah, but...

Didn't Flink already allow you to build event-driven applications? 😅

To some extent, yes. There's a fine line between stream processing and event-driven applications, and a great deal of overlap. Applications that fell into this overlap could already be built with Flink. What's remarkably different with Stateful Functions is that you're not constrained to a DAG (Directed Acyclic Graph) topology and its rigid communication pattern, as in classical stream processing with Flink. Some applications need a more flexible way to express the interaction between their operators, as well as to scale state (i.e. your application data) and user code independently: this is what Stateful Functions allows you to do, with the added benefit of tightly integrating messaging and state to give you the effect of exactly-once state updates and guaranteed reliable messaging. The operational characteristics provided by the physical compute/state separation are also often critical for general-purpose application scenarios, but less so for many stream processing use cases.
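As a rough illustration of a communication pattern that doesn't fit a DAG, the toy below lets two functions message each other back and forth through a shared queue. This is plain Java and not the Stateful Functions API: `Envelope`, `ToyMailbox` and the function names "manager" and "counter" are assumptions made up for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical message envelope: who sent it, who receives it, and what it carries. */
final class Envelope {
    final String from;
    final String to;
    final String payload;

    Envelope(String from, String to, String payload) {
        this.from = from;
        this.to = to;
        this.payload = payload;
    }
}

final class ToyMailbox {
    /** Runs one request/reply round trip and returns the delivery log. */
    static List<String> run() {
        Deque<Envelope> queue = new ArrayDeque<>();
        List<String> log = new ArrayList<>();
        queue.add(new Envelope("ingress", "manager", "transaction"));

        while (!queue.isEmpty()) {
            Envelope e = queue.poll();
            log.add(e.from + "->" + e.to + ":" + e.payload);

            if (e.to.equals("manager") && e.payload.equals("transaction")) {
                // The manager asks the counter for data...
                queue.add(new Envelope("manager", "counter", "query"));
            } else if (e.to.equals("counter") && e.payload.equals("query")) {
                // ...and the counter replies to whoever asked, closing the cycle.
                queue.add(new Envelope("counter", e.from, "reply"));
            }
        }
        return log;
    }
}
```

The manager-to-counter-to-manager loop is exactly the kind of edge a DAG topology has no place for, since data would have to flow "backwards" to an upstream operator.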

How much do I need to know about Flink to use this? 😶

Certain characteristics of Stateful Functions are only possible because of the way Flink was designed in the first place. And because it's a Flink API built in the Flink community, it'll be more natural for you to make certain associations if you know Flink. But this is not a prerequisite to get started and experiment with Stateful Functions: you won't need to write "Flink code" at any point and you can see Flink as "just the function interpreter".

An Example: Transaction Scoring for Fraud Detection

Imagine an application that receives financial information and emits an alert for every transaction whose fraud score exceeds a given threshold. To build this example with Stateful Functions, you can define four different functions, each tracking its own state:

  • FraudCount: tracks the total number of fraudulent transactions reported against an account over a rolling 30-day period.

  • MerchantScorer: returns a trustworthiness score for each merchant, relying on a third party service.

  • TransactionManager: enriches transaction records to create feature vectors for scoring and emits fraud alert events.

  • Model: scores transactions based on input feature vectors from the Transaction Manager.

The full code for this example (developed by "The Professor" @sjwiesman) is available here.

Keeping track of fraudulent reports

The entry points to the application are two ingresses (e.g. Kafka topics): FraudConfirmation and Transactions. As FraudConfirmation events flow in, the FraudCount function increments its internal counter and sets a 30-day expiration timer on this persisted state. After 30 days, the FraudCount function will receive a delayed message (from itself) and clear its state.

You can implement this behaviour by tailoring the function reaction to different types of messages:

public class FraudCount implements StatefulFunction {

    @Persisted
    private final PersistedValue<Integer> count = PersistedValue.of("count", Integer.class);

    /* Each invocation is provided context about the current function call,
      like the caller address and the function instance's own address. */
    @Override
    public void invoke(Context context, Object input) {
        if (input instanceof ConfirmFraud) {
            int current = count.getOrDefault(0);
            count.set(current + 1);

            /* Send a TTL message with a 30-day delay. This message is durable 
            and will not be lost in case of failure. */
            context.sendAfter(Duration.ofDays(30), context.self(), ExpireFraud.getDefaultInstance());
        }

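        if (input instanceof ExpireFraud) {
            int current = count.getOrDefault(0);
            if (current <= 1) {
                /* The last report in the window has expired: drop the state
                entirely instead of writing a zero or negative count back. */
                count.clear();
            } else {
                count.set(current - 1);
            }
        }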

        if (input instanceof QueryFraud) {
            int current = count.getOrDefault(0);
            ReportedFraud response = ReportedFraud.newBuilder().setCount(current).build();
            context.reply(response);
        }
    }
}

🍹 FraudCount.java

Here, multiple instances of FraudCount will exist — for example, one per customer account — and each function instance will be associated with a unique address (function type + ID) that other functions can use to message it (no need for service discovery!).
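If the addressing scheme feels abstract, this small plain-Java sketch may help. It only mimics the idea of "function type + ID" keys: `FnAddress` and `ToyRouter` are hypothetical stand-ins invented here, not classes from the Stateful Functions SDK.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a logical address: function type plus instance ID. */
final class FnAddress {
    final String type;
    final String id;

    FnAddress(String type, String id) {
        this.type = type;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FnAddress)) {
            return false;
        }
        FnAddress other = (FnAddress) o;
        return type.equals(other.type) && id.equals(other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, id);
    }
}

/** Toy router: state is looked up by address, so each account gets its own counter. */
final class ToyRouter {
    private final Map<FnAddress, Integer> counts = new HashMap<>();

    /** Delivers one "confirm fraud" message to the instance at the given address. */
    void confirmFraud(FnAddress target) {
        counts.merge(target, 1, Integer::sum);
    }

    /** Returns the counter of the addressed instance, or 0 if it has no state yet. */
    int countFor(FnAddress target) {
        return counts.getOrDefault(target, 0);
    }
}
```

Two senders that build the same (type, ID) pair reach the same instance without looking anything up first, which is the point of the "no service discovery" remark above.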

Enriching and scoring transactions

On receiving events from the Transactions ingress, the TransactionManager function messages FraudCount to get the current count of fraud cases reported for the customer account; it also messages the MerchantScorer for the trustworthiness score of the transaction merchant.

public class TransactionManager implements StatefulFunction {

    @Persisted
    private final PersistedValue<Transaction> transactionState = PersistedValue.of("transaction", Transaction.class);

    @Persisted
    private final PersistedValue<Integer> recentFraud = PersistedValue.of("recent-fraud", Integer.class);

    @Persisted
    private final PersistedValue<MerchantScore> merchantScore = PersistedValue.of("merchant-score", MerchantScore.class);

    @Override
    public void invoke(Context context, Object input) {
        if (input instanceof Transaction) {
            Transaction transaction = (Transaction) input;
            transactionState.set(transaction);

            String account = transaction.getAccount();

            /* To invoke a function, simply send it a message. */
            context.send(FRAUD_FN, account, QueryFraud.getDefaultInstance());

            String merchant = transaction.getMerchant();

            context.send(MERCHANT_FN, merchant, QueryMerchantScore.getDefaultInstance());
        }

    /* ... */

    }
}

🍹 TransactionManager.java

TransactionManager can then create a feature vector from the count of fraud cases reported for the customer account and the merchant's score, and send it to the Model function for scoring.
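The snippet above elides how the two replies are brought together, and this article doesn't show it. Purely as an illustration of the general "wait for both answers, then proceed" pattern, here is a plain-Java sketch; `PendingFeatures` and the two-element vector layout are assumptions for this example, not the code from the linked repository.

```java
/** Hypothetical per-transaction holder: both replies must arrive before scoring. */
final class PendingFeatures {
    private Integer fraudCount;    // null until the fraud count reply arrives
    private Integer merchantScore; // null until the merchant score reply arrives

    void onFraudCount(int count) {
        fraudCount = count;
    }

    void onMerchantScore(int score) {
        merchantScore = score;
    }

    /** True once both replies have been received, in whichever order. */
    boolean isComplete() {
        return fraudCount != null && merchantScore != null;
    }

    /** Builds the feature vector; the [fraudCount, merchantScore] layout is assumed. */
    int[] toFeatureVector() {
        if (!isComplete()) {
            throw new IllegalStateException("still waiting for a reply");
        }
        return new int[] {fraudCount, merchantScore};
    }
}
```

Since replies are just messages, they may arrive in any order, so the check has to run after each one rather than after a specific reply.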

Emitting alerts

If the score that Model sends back exceeds a given threshold, TransactionManager emits an alert event to the AlertUser egress.

    /* ... */

    private static final int THRESHOLD = 80;

    public void invoke(Context context, Object input) {

        /* ... */

        if (input instanceof Any) {
            final FraudScore fraudScore;
            try {
                fraudScore = ((Any) input).unpack(FraudScore.class);
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Unexpected type", e);
            }
            if (fraudScore.getScore() > THRESHOLD) {
                context.send(ALERT, transactionState.get());
            }

            /* All function invocations are scoped to a key (ID), and so
               persisted values (e.g. state) are also always scoped to that key. */
            transactionState.clear();
            recentFraud.clear();
            merchantScore.clear();
        }
    }
}

It's important to mention that, in this example, functions are not being remotely executed, but embedded in the Flink runtime. This is the most performant option for Stateful Functions deployments, though it comes at the cost of the operational benefits that physical compute/state separation brings, and it limits you to Java (or other JVM languages).

...and that's it (for now)!

I hope this article helped you understand the bigger picture of Stateful Functions and how to translate its functionality into a real-world example. To get a better hang of it, you can start exploring the code walkthroughs available in the documentation (in Java or Python) and check the Stateful Functions webpage. If you hit a snag, reach out to the Flink user mailing list or post your questions on Stack Overflow using the apache-flink/flink-statefun tags.

Stateful Functions is a work in progress that is picking up speed with a very small team behind it. If you're interested in contributing, you can learn more about ways to do it on the Apache Flink website.

Top comments (4)

Mazen Ezzeddine

So Stateful Functions is the actor model with a few nice add-ons from stream processing like state partitioning, exactly once, fault tolerance...

morsapaes

To some extent, yeah. That comparison comes up quite often and (virtual) actors were definitely an inspiration: twitter.com/StephanEwen/status/125...

schahaha

Thanks, but how do I deploy the application, and how do I use the stateful functions through the API?

morsapaes

You can deploy StateFun applications standalone or as Flink jobs that can be submitted to a cluster. This (more recent) blogpost describes how to deploy a StateFun application on AWS, for example.

You can check the documentation and this walkthrough for more details.