ChunTing Wu

Playing PyFlink in a Nutshell

Last week, we introduced how to build a PyFlink experiment environment, and today we will use that environment to explore the possibilities of PyFlink.

PyFlink is a general-purpose streaming framework that abstracts stream processing into four levels.

  1. SQL
  2. Table API
  3. DataStream
  4. Stateful Stream Processing

The closer to the bottom, the more flexibility is available, but the more code has to be written as well. I would like to be able to do almost everything with PyFlink, so let's get started with the basic concepts of PyFlink development from the DataStream perspective.

This article will introduce a few key points of PyFlink development with simple descriptions and examples, but will not mention the implementation details of Flink.

DataStream Concept

DataStream development follows the process below.

(Figure: the DataStream pipeline, source → operations → sink)

Basically, we get streaming data from a source, process it, and output it somewhere.

This is expressed in PyFlink as follows.



# read from a source, transform, then write to a sink
ds = env.add_source(kafka_consumer)
ds = ds.map(transform, output_type=output_type_info)
ds.add_sink(kafka_producer)


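For context, here is a minimal sketch of how the kafka_consumer and kafka_producer above could be constructed. The topic names, broker address, and row schemas are assumptions for illustration only.



from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

input_type_info = Types.ROW([Types.LONG(), Types.STRING()])
output_type_info = Types.ROW([Types.LONG(), Types.STRING()])

deserialization_schema = JsonRowDeserializationSchema.Builder() \
    .type_info(input_type_info) \
    .build()

# consume JSON rows from a hypothetical input topic
kafka_consumer = FlinkKafkaConsumer(
    topics='InputTopic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'kafka:9092', 'group.id': 'test_group'}
)

serialization_schema = JsonRowSerializationSchema.Builder() \
    .with_type_info(output_type_info) \
    .build()

# produce JSON rows to a hypothetical output topic
kafka_producer = FlinkKafkaProducer(
    topic='OutputTopic',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'kafka:9092'}
)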

Source and sink are easy to understand; the key question is, what processing can be applied in between?

The official documentation lists all the available operations.
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/overview/

For the above example, we used map.

There is also an example of these operations in the Flink distribution; the code is located at ./examples/python/datastream/basic_operations.py.

What is the difference between map and flat_map?

There are two similar operations in the operation list, map and flat_map. What is the difference between them?

The difference is in the number of generated outputs.

In the case of map, an input event generates one and only one output event; on the other hand, flat_map can generate zero to many output events.

Let's use the actual code as an example.



def map_transform(i: int):
    # one input -> exactly one output
    return i * i

def flat_map_transform(i: int):
    # one input -> zero or more outputs
    for idx in range(i):
        yield idx

ds.map(map_transform, output_type=Types.INT())
ds.flat_map(flat_map_transform, output_type=Types.INT())



In this example, map squares every input integer and passes it on: one input corresponds to one output. However, flat_map outputs a series of events, and the number of output events is determined by the input value.

If the input is 0, the yield in flat_map is never triggered and nothing is generated.
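To see this quickly, we can run the flat_map transform on a small collection. This is a minimal runnable sketch; execute_and_collect runs the job and streams the results back to the client.



from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

def flat_map_transform(i: int):
    for idx in range(i):
        yield idx

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection([0, 1, 2, 3], type_info=Types.INT())

# the input 0 yields nothing, while 3 yields three events
expanded = ds.flat_map(flat_map_transform, output_type=Types.INT())

with expanded.execute_and_collect() as results:
    print(list(results))  # prints [0, 0, 1, 0, 1, 2]

# map would instead produce exactly one event per input: [0, 1, 4, 9]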

State

State is the best feature of Flink.

Although we have various operations available, many of them actually produce results based on previous events. How do we keep the previous events? This is where State comes in.

State can be considered internal storage used to persist data; by default it lives in memory, so the total size of State is bounded by the sum of every node's memory.

Nevertheless, State can also be persisted to durable storage such as RocksDB to gain more scalability.



from pyflink.datastream import StreamExecutionEnvironment, EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
env.set_state_backend(EmbeddedRocksDBStateBackend())


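One thing worth adding: the RocksDB backend keeps state on local disk, so to survive job failures you would normally also enable checkpointing. Here is a minimal sketch; the interval and checkpoint directory are assumptions.



from pyflink.datastream import StreamExecutionEnvironment, EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
env.set_state_backend(EmbeddedRocksDBStateBackend())

# snapshot the state every 10 seconds into a local directory
env.enable_checkpointing(10 * 1000)
env.get_checkpoint_config().set_checkpoint_storage_dir('file:///tmp/checkpoints')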

To use State in the Flink framework, there are two key points worth noting.

  1. State can only be used on a "Keyed Data Stream".
  2. State is scoped to a single operator and cannot be shared with other operators.

All the available state types are listed in the reference below.
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/state/

In fact, the example at ./examples/python/datastream/state_access.py also provides a good demonstration.
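To illustrate point 1 above, here is a minimal sketch of a running sum kept in ValueState, which only works after key_by. The class name and sample data are made up for illustration.



from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import FlatMapFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class RunningSum(FlatMapFunction):

    def open(self, runtime_context: RuntimeContext):
        # one state instance is maintained per key of the keyed stream
        self.sum_state = runtime_context.get_state(
            ValueStateDescriptor('sum', Types.LONG()))

    def flat_map(self, value):
        total = (self.sum_state.value() or 0) + value[1]
        self.sum_state.update(total)
        yield value[0], total

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    [('a', 1), ('a', 2), ('b', 10)],
    type_info=Types.TUPLE([Types.STRING(), Types.LONG()]))

# get_state is only usable after key_by, i.e. on a keyed data stream
ds.key_by(lambda v: v[0]) \
    .flat_map(RunningSum(), output_type=Types.TUPLE([Types.STRING(), Types.LONG()])) \
    .print()

env.execute()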

Connect (Shared State)

As mentioned in the previous section, state is operator-scoped and cannot be shared, but sometimes we do need to combine the states of two different streams, so what should we do?

Fortunately, Flink provides connect to let us share state between different streams within the same job.

By using connect, we can combine different streams into the same operation, and thereby share that operation's state.

To be more concrete, let me provide a practical example. There are two streams.

  • Stream 1 provides a mapping between the item identifiers and the item names. When the item name changes, an event (item_id, item_name) is sent into the stream, so we just need to save the latest status.
  • Stream 2 is the transaction history, including which item was sold and the number of items ordered.

What we want to do is: whenever a purchase comes in, we sum up the accumulated quantity and attach the latest item name to the output.

This is the classic streaming enrichment pattern, which I explained in detail in my previous article.

Here is the full program example.



import logging, sys
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, CoFlatMapFunction
from pyflink.datastream.state import MapStateDescriptor, ValueStateDescriptor
from pyflink.common import JsonRowSerializationSchema
from pyflink.datastream.connectors import FlinkKafkaProducer

class SellTotal(CoFlatMapFunction):

    def open(self, runtime_context: RuntimeContext):
        # item_id -> item_name mapping, shared by both flat_maps
        state_desc = MapStateDescriptor('map', Types.LONG(), Types.STRING())
        self.state = runtime_context.get_map_state(state_desc)

        # accumulated quantity per item_id
        cnt_desc = ValueStateDescriptor('cnt', Types.LONG())
        self.cnt_state = runtime_context.get_state(cnt_desc)

    # stream 1: (id, name); keep the latest name and emit nothing
    def flat_map1(self, value):
        self.state.put(value[0], value[1])

    # stream 2: (id, cnt); accumulate the quantity and enrich with the name
    def flat_map2(self, value):
        cnt = self.cnt_state.value() or 0
        total = cnt + value[1]
        self.cnt_state.update(total)

        if not self.state.contains(value[0]):
            name = "NONAME"
        else:
            name = self.state.get(value[0])

        yield Row(value[0], name, total)


def sell_total_demo(env):
    type_info1 = Types.ROW([Types.LONG(), Types.STRING()])
    ds1 = env.from_collection(
        [(1, 'apple'), (2, 'banana'), (3, 'cherry'), (4, 'durian'), (6, 'fig'), (7, 'grape')],
        type_info=type_info1)

    type_info2 = Types.ROW([Types.LONG(), Types.LONG()])
    ds2 = env.from_collection(
        [(1, 1), (2, 3), (3, 5), (1, 5), (5, 100), (6, 66), (1, 10)],
        type_info=type_info2)

    output_type_info = Types.ROW([Types.LONG(), Types.STRING(), Types.LONG()])
    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(output_type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='TempResults',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'kafka:9092', 'group.id': 'test_group'}
    )

    # connect the two streams and key both by the item id
    connected_ds = ds1.connect(ds2)
    connected_ds.key_by(lambda a: a[0], lambda a: a[0]) \
        .flat_map(SellTotal(), output_type_info) \
        .add_sink(kafka_producer)

    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///home/ec2-user/flink-1.15.2/opt/flink-sql-connector-kafka-1.15.2.jar")

    print("connected demo gogo")
    sell_total_demo(env)



flat_map1 handles stream 1; that is, it maintains the mapping between item ids and item names, so this stream does not need to generate any output events.

The core of the whole application is in flat_map2. We take the accumulated quantity from self.cnt_state, add the new quantity, and update the result back into the state. Then we take the corresponding name from self.state and output the enriched event. For instance, the purchase (5, 100) produces Row(5, 'NONAME', 100), because id 5 never appears in stream 1.

Conclusion

In the last example, we demonstrated operations, state, and the merging of streams.

From this example, we can see that Flink can do almost anything we want, as long as we write the program correctly.

We will continue to experiment with stream processing and will publish further updates as they come.
