DEV Community

Alec Dutcher


DP-203 Study Guide - Develop a stream processing solution

Study guide

Identify Azure services for stream processing

  • What is streaming data?
    • Unbounded data (at no point do we have the whole dataset)
    • Records can be added at any time
    • Queries often run over a subset of records called a window
    • Used when real-time results are required
  • Common use cases
    • Processing IoT data
    • Fraud detection
    • Monitoring social media sentiment
  • Comparing streaming and traditional databases
    • In traditional queries, the user submits the query to the database engine which runs against the entire dataset and returns a result
      • Data is stored, query is not
    • In a streaming query, the user submits the query to the streaming engine which applies the query logic to every data point in the stream after that moment and updates the intermediate result
      • Query is stored, data is not
  • Azure Event Hub
    • Stream ingestion service
    • Stores and buffers data (producers and consumers can operate at their own speeds)
    • Data in storage is persistent and partitioned
    • Allows one or more other services to read from the data stream
    • Competing consumers (duplicate instances of an application) can access and share the data
  • Azure Stream Analytics
    • Stream processing service that moves and transforms data between different data inputs and outputs
    • Uses a SQL-like language for querying (SELECT ... INTO output FROM input)
  • Databricks
    • Query data streams using Spark Structured Streaming
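
The difference between "data is stored, query is not" and "query is stored, data is not" can be sketched in plain Python. This is only an illustration of the idea, not how any Azure service works internally; the `StreamingAverage` class, `batch_average` function, and sample readings are hypothetical.

```python
class StreamingAverage:
    """A stored 'query' that keeps only an intermediate result, never the data."""

    def __init__(self):
        self.count = 0
        self.total = 0.0

    def update(self, value):
        # Apply the query logic to one new data point and return the updated result.
        self.count += 1
        self.total += value
        return self.total / self.count


def batch_average(dataset):
    # Traditional query: the whole dataset is stored and scanned on each request.
    return sum(dataset) / len(dataset)


readings = [10, 20, 60]
query = StreamingAverage()
running = [query.update(r) for r in readings]
```

After the last reading, the streaming result matches what the batch query returns over the full list, but the streaming version never had to hold the list itself.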

Create a stream processing solution by using Stream Analytics and Azure Event Hubs

  • Create Azure Event Hub
    • In resource group, create new Event Hub resource
      • Choose namespace name (globally unique)
      • Choose pricing tier
      • Choose partitions (must choose upfront)
      • Create and deploy
    • Go to Event Hubs Namespace and create Event Hub
      • Entities --> Event Hubs --> +
      • Choose name and partition count
      • Review + create
    • Navigate to Event Hub and add Shared Access Policy
      • Settings --> Shared Access Policies --> +
      • Choose policy name and permission (manage, send, listen)
      • Open policy and copy Primary Key
      • Provide key to data source to allow it to send data to the Event Hub
  • Create Stream Analytics Job
    • Create a new Stream Analytics resource
      • Choose name, region, hosting environment, etc
    • Open resource and add input
      • Job topology --> Inputs --> +
      • Choose Event Hub and provide alias
      • Choose Event Hub connection details (can use "Select Event Hub from your subscriptions" to autofill)
    • Choose output(s)
      • Job topology --> Outputs --> +
      • Choose output service and alias
      • Choose output connection details
  • Define query
    • Job topology --> Query
    • Write query with SELECT fields INTO output_alias FROM input_alias
  • Once Stream Analytics job is running, check output to confirm data

Process data by using Spark structured streaming

  • Structured Streaming is a stream processing engine built on top of Apache Spark
  • Using Structured Streaming in Databricks
    • Install Azure Event Hub library
      • Cluster info --> Libraries --> Install new --> Specify name and version
    • Create/import a notebook
    • Read data stream from event hub
      • Create connectionString using Primary Key from Event Hub and EntityPath for the dataset
      • Create a JSON object to store startingEventPosition
      • Create a JSON object to store eventHubsConf (includes connectionString (be sure to encrypt), startingPosition, and setMaxEventsPerTrigger)
      • Configure Spark parallelism
      • Connect to event stream using spark.readStream.format("eventhubs").options(**eventHubsConf).load()
    • Parse and view the data stream
      • eventStreamDF.printSchema() shows properties for Event Hub entries
      • Body property contains the data
        • bodyDF = eventStreamDF.select(col("body").cast("STRING"))
      • Use pyspark.sql.types to define the schema with StructType
        • StructField("field_name", StringType(), False)
      • Parse the body based on the schema
        • parsedDF = bodyDF.select(from_json(col("body"), schema).alias("json"))
      • Flatten the parsed json
        • flatDF = parsedDF.select(col("json.field_name").alias("field_alias"))
    • Write data stream to a delta table
      • Write the stream
        • DF.writeStream.format("delta").option("checkpointLocation", "delta-checkpoints/location_name").start("/delta-tables/location_name")
      • Create table with %sql
        • CREATE TABLE table USING DELTA LOCATION '/delta-tables/table'
    • Query the delta table using SQL
      • SELECT * FROM table
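
The connection setup from the steps above can be sketched without a cluster. This is a minimal sketch, assuming the option keys used by the azure-event-hubs-spark connector as I understand them (`eventhubs.connectionString`, `eventhubs.startingPosition`, `maxEventsPerTrigger`); check them against the library version you install. The helper names and the sample namespace, policy, and hub names are hypothetical.

```python
import json


def build_connection_string(namespace, policy_name, primary_key, event_hub):
    # Event Hubs connection string layout; EntityPath names the event hub.
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={policy_name};"
        f"SharedAccessKey={primary_key};"
        f"EntityPath={event_hub}"
    )


def build_event_hubs_conf(connection_string, max_events_per_trigger=1000):
    # Offset "-1" asks for the start of the stream (assumed connector convention).
    starting_event_position = {
        "offset": "-1",
        "seqNo": -1,
        "enqueuedTime": None,
        "isInclusive": True,
    }
    return {
        # On a real cluster, encrypt the connection string before storing it here.
        "eventhubs.connectionString": connection_string,
        "eventhubs.startingPosition": json.dumps(starting_event_position),
        "maxEventsPerTrigger": max_events_per_trigger,
    }


conn = build_connection_string("my-namespace", "send-listen", "<primary-key>", "telemetry")
eventHubsConf = build_event_hubs_conf(conn)
```

In a Databricks notebook, a dictionary like this would then be passed to spark.readStream.format("eventhubs").options(**eventHubsConf).load().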

Create windowed aggregates

  • Types of window
    • Tumbling
      • Fixed window duration
      • Contiguous windows
      • Events belong to exactly one window
      • Created with GROUP BY name, TumblingWindow(second, 10)
    • Hopping
      • Fixed window duration
      • New window starts at a set interval (e.g. a 10s window every 5s)
      • Windows can overlap
      • Events can belong to multiple windows
      • Created with GROUP BY HoppingWindow(second, 10, 5)
    • Sliding
      • Fixed window duration
      • Windows are created when events enter or leave the window
      • Windows can overlap and do not have a fixed schedule
      • Events can belong to multiple windows
      • Created with GROUP BY SlidingWindow(second, 10)
    • Session
      • Window starts when a new event arrives
      • Window extends to include new events until a specified amount of time passes with no new events
      • Window duration can vary
      • Windows do not overlap and do not have a fixed schedule
      • Events belong to exactly one window
      • Created with GROUP BY SessionWindow(second, 5, 20)
    • Snapshot
      • Events that arrive at precisely the same time are windowed together
      • Windows have no duration
      • Windows do not overlap and do not repeat on a fixed schedule
      • Events belong to exactly one window
      • Created with GROUP BY System.Timestamp()
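
To make the window definitions concrete, here is a rough Python sketch of how an event timestamp (in seconds) maps to tumbling, hopping, and session windows. It is a simplification under stated assumptions: windows are treated as half-open [start, end) intervals, which may not match Stream Analytics boundary handling exactly, and the session sketch ignores the maximum duration argument. All function names are hypothetical.

```python
def tumbling_window(ts, size):
    # Each timestamp falls into exactly one fixed-size window.
    start = (ts // size) * size
    return (start, start + size)


def hopping_windows(ts, size, hop):
    # A new window starts every `hop` seconds, so one event can land in several.
    windows = []
    start = (ts // hop) * hop
    while start > ts - size:
        windows.append((start, start + size))
        start -= hop
    return sorted(windows)


def session_windows(timestamps, timeout):
    # A session keeps growing until `timeout` seconds pass with no new event.
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= timeout:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


tumbling = tumbling_window(12, 10)
hopping = hopping_windows(12, 10, 5)
sessions = session_windows([1, 2, 4, 12, 13, 30], 5)
```

An event at second 12 belongs to one 10s tumbling window but to two 10s hopping windows that start every 5s, which is the overlap described above.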

Handle schema drift

  • Schema drift happens when the schema of incoming data changes over time (adding or removing columns, changing data types, etc.)
  • Breaking vs non-breaking changes
    • Breaking
      • Removing non-optional field
      • Renaming a field
      • Changing a field type to be more restrictive (float to int)
    • Non-breaking
      • Removing an optional field
      • Adding a field
      • Changing a field type to be less restrictive (int to float)
  • Limiting impact
    • Select only necessary fields as early in a query as possible
    • Input validation
      • Performed in the stream consumer
      • One query selects and casts required fields and determines if record is valid, sending results to intermediate stream
      • Second query processes valid records from the intermediate stream (main query)
      • Third query processes invalid records from the intermediate stream
  • Azure Event Hub Schema Registry
    • Store AVRO or JSON schema definitions in the Event Hub
    • When event sender uses this schema, all events are validated against it
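
The input validation pattern above (select and cast required fields, then route valid and invalid records separately) could look roughly like this in plain Python. It is a sketch of the idea rather than Stream Analytics query syntax; `REQUIRED_FIELDS`, `validate`, `route`, and the sample records are hypothetical.

```python
# Required fields and the type each one is cast to (hypothetical schema).
REQUIRED_FIELDS = {"device_id": str, "temperature": float}


def validate(record):
    # Select only the required fields and cast them; return None if invalid.
    cleaned = {}
    for name, caster in REQUIRED_FIELDS.items():
        if name not in record:
            return None
        try:
            cleaned[name] = caster(record[name])
        except (TypeError, ValueError):
            return None
    return cleaned


def route(records):
    # First step: split the input into valid (cleaned) and invalid (raw) records.
    valid, invalid = [], []
    for record in records:
        cleaned = validate(record)
        if cleaned is None:
            invalid.append(record)
        else:
            valid.append(cleaned)
    return valid, invalid


records = [
    {"device_id": "a", "temperature": "21.5", "firmware": "2.0"},  # extra field
    {"device_id": "b"},                                            # missing field
    {"device_id": "c", "temperature": "hot"},                      # wrong type
]
valid, invalid = route(records)
```

Because only the required fields are selected, the added `firmware` field is a non-breaking change here, while the missing and mistyped values end up in the invalid list for separate handling.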

Process time series data

  • What is time series data?
    • Sequence of data points ordered by time of occurrence
    • Repeated measurements of the same source
      • At a fixed interval (constant load)
      • When the value changes (varying load)
    • Queried over subsets of data defined by start and end time (window)
  • Defining time
    • Event time = time measurement is taken
    • Processing time = time measurement is received by processing solution
    • Difference can be up to minutes depending on latency
  • When processing in Stream Analytics
    • Default is arrival time (for Event Hub input, the time the event was enqueued), which approximates processing time
    • Can override with TIMESTAMP BY to use event time if available
  • Temporal query windows
    • Timestamp is evaluated against start and end time of windows to determine which window it belongs to
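
A small sketch shows why the choice between event time and processing time matters for windowing. It assumes one-minute windows aligned to midnight; the `window_start` helper and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta


def window_start(timestamp, size):
    # Floor the timestamp to the start of its fixed-size window (aligned to midnight).
    midnight = timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + ((timestamp - midnight) // size) * size


# A measurement taken at 12:00:58 that reaches the processor 5 seconds later.
event_time = datetime(2024, 1, 1, 12, 0, 58)
processing_time = datetime(2024, 1, 1, 12, 1, 3)
one_minute = timedelta(minutes=1)

by_event_time = window_start(event_time, one_minute)
by_processing_time = window_start(processing_time, one_minute)
```

With only five seconds of latency, the same measurement is counted in the 12:00 window by event time but in the 12:01 window by processing time.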

Process within one partition

  • A partitioned database or stream:
    • Is a single logical database/stream
    • Has multiple underlying storage/processing units
    • Has virtually limitless scaling
  • Event Hub and Stream Analytics can both be partitioned
    • Event Hubs are partitioned at creation
    • Stream Analytics is partitioned in the query
  • Partitions and computing nodes
    • A single node can process many partitions
    • A single partition can NOT be split over multiple nodes
    • When an Event Hub or Stream Analytics Job scales, partitions are redistributed over nodes
  • Unpartitioned queries
    • Cannot calculate results using data from only one partition
    • Cannot leverage scale-out architecture
    • Utilize at most 1 SU V2 (6 SU V1)
    • Query must be partitioned before processing to increase performance
    • Partitioning a query can be done by grouping stream data by the desired partition key in a preceding query

Process data across partitions

  • Specifying the partition key
    • Event Hubs are always partitioned
      • Round robin by default
      • Partition key is called PartitionId
      • Custom property can be specified to calculate a PartitionId
    • Stream Analytics can be partitioned
      • Results in parallelizable queries
      • Enables scale-out
  • Compatibility level
    • Property of a Stream Analytics Job
    • PARTITION BY is used for level 1.1 or lower
    • For level 1.2, specify the partition key on the input
  • Partition key is specified differently for each type of input/output
    • For blob storage, the partition key is a part of the path
  • Embarrassingly parallel
    • Query can be processed completely in parallel
    • All inputs, outputs, and queries are partitioned on the same key
    • In the Stream Analytics portal, Job topology --> Query --> Job simulation (preview) can show whether a query is parallel and how it can be partitioned
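
The idea behind an embarrassingly parallel query can be sketched in Python: if events are partitioned on the same key the query groups by, each partition can be aggregated independently and the results simply merged. The hash function below is an assumption chosen for a stable demo and is not the one Event Hubs uses; all names are hypothetical.

```python
import hashlib
from collections import Counter, defaultdict


def partition_for(key, partition_count):
    # Stable hash so the same key always maps to the same partition.
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count


def partition_events(events, partition_count):
    partitions = defaultdict(list)
    for event in events:
        partitions[partition_for(event["device_id"], partition_count)].append(event)
    return partitions


def count_per_device(partition):
    # Runs on one partition only; needs no data from any other partition.
    return Counter(event["device_id"] for event in partition)


events = [{"device_id": d} for d in ["a", "b", "a", "c", "b", "a"]]
partitions = partition_events(events, partition_count=4)
per_partition = [count_per_device(p) for p in partitions.values()]
merged = sum(per_partition, Counter())
```

Each device shows up in exactly one partition's result, so no cross-partition step is needed beyond concatenating the outputs.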

Configure checkpoints and watermarking during processing

  • Checkpoint
    • Event Hub and ingestion services don't track which records have been consumed, they just apply sequence numbers to the records
    • When processing a data stream, processors take note of where they are in the stream (checkpoint) so they can resume in case of interruption
    • Stream stores sequence numbers per partition
    • Checkpoint used to resume a stream is called an offset
    • Stream Analytics backs up the internal state regularly
      • Intermediate results are saved
      • Checkpoint is saved
    • Catching up from a restore can take some time
  • Watermark
    • Internal marker indicating up to what point in time events are assumed to have been processed
    • Updated when a new event comes in, or advanced as time progresses in the real world
    • Used to identify late events
    • Used to detect opening and closing of a query window
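
Checkpointing can be illustrated with a toy consumer that records the last sequence number it handled per partition and resumes from there after an interruption. This is a sketch only: list indexes stand in for sequence numbers, a dict stands in for the checkpoint store, and every name is hypothetical.

```python
def process_partition(partition_id, events, checkpoints, handler):
    # Resume right after the last sequence number recorded for this partition.
    next_seq = checkpoints.get(partition_id, -1) + 1
    for seq in range(next_seq, len(events)):
        handler(events[seq])
        # Record progress only after the event was handled successfully.
        checkpoints[partition_id] = seq


processed = []
fail_once = {"armed": True}


def handler(event):
    # Simulate a single interruption while handling the third event.
    if event == "e2" and fail_once["armed"]:
        fail_once["armed"] = False
        raise RuntimeError("simulated interruption")
    processed.append(event)


events = ["e0", "e1", "e2", "e3"]
checkpoints = {}
try:
    process_partition("0", events, checkpoints, handler)
except RuntimeError:
    pass
checkpoint_after_failure = dict(checkpoints)

# Restart: processing picks up at the event after the saved checkpoint.
process_partition("0", events, checkpoints, handler)
```

Saving the checkpoint after handling means an event may be handled twice if the crash lands between the two steps, which is the usual trade-off for not losing events.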

Scale resources

  • Event Hub and Stream Analytics pricing is based on resources provisioned, not necessarily used
  • Provision as few resources as the workload needs to save cost
  • Scaling Azure Event Hub
    • Measured in Throughput Units (TU)
    • 1 TU provides
      • Ingress up to 1 MB per second or 1000 events per second
      • Egress up to 2 MB per second or 4096 events per second
    • Enable auto-inflate to start with fewer TUs and scale up automatically as traffic grows (it scales up only, not down), which avoids over-provisioning upfront
  • Scaling Stream Analytics
    • Measured in Streaming Units (SU)
    • There are two versions, V1 and V2
      • 1 V1 SU = ~1 MB/s
      • 6 V1 SU = ~1 V2 SU
    • SA jobs
      • Use fine-grained deployment units
      • Run on shared hardware
      • Limit scalability to minimum of 1/3 V2 SUs and max of 66 V2 SUs
      • Support virtual network integration
    • SA Clusters
      • Scale further and provide more isolation
      • Fully isolated deployment
      • Scalability has minimum of 12 V2 SUs
      • Supports virtual network integration
      • Jobs can be moved in and out of a cluster
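
The TU limits above translate into simple sizing arithmetic. The sketch below estimates how many Throughput Units a workload needs using only the per-TU figures listed here, plus the approximate V1 to V2 SU ratio; the function names are hypothetical and real sizing should leave headroom for bursts.

```python
import math


def required_throughput_units(ingress_mb_per_s, ingress_events_per_s,
                              egress_mb_per_s=0.0, egress_events_per_s=0.0):
    # 1 TU: ingress 1 MB/s or 1000 events/s, egress 2 MB/s or 4096 events/s.
    # Whichever limit is hit first decides the number of TUs.
    demand = max(
        ingress_mb_per_s / 1.0,
        ingress_events_per_s / 1000,
        egress_mb_per_s / 2.0,
        egress_events_per_s / 4096,
    )
    return max(1, math.ceil(demand))


def v1_to_v2_streaming_units(v1_su):
    # Approximate conversion: 6 V1 SUs correspond to about 1 V2 SU.
    return v1_su / 6
```

For example, 2.5 MB/s of ingress at 1500 events/s is bound by the size limit, while 0.5 MB/s at 3500 events/s is bound by the event-count limit.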

Create tests for data pipelines

  • Stream Analytics allows sampling data from an input by downloading a file
  • It also samples data from an Event Hub automatically when editing a query
  • The Query section has a Test results option as well
  • The Query editor also has an option for uploading sample input data to test changes in the results

Optimize pipelines for analytical or transactional purposes

  • Streams can be joined where chosen properties match and the timestamps fall within a bounded interval, e.g. DATEDIFF(second, S1, S2) BETWEEN 0 AND 30
  • This works best when the streams have the same partition key and partition counts
    • Repartition so that they are partitioned the same way using a preceding query
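
The time-bounded join condition can be mimicked in Python to see which pairs it keeps. This is a naive nested-loop sketch for illustration, assuming timestamps in seconds and a shared `key` property; the function and sample events are hypothetical.

```python
def temporal_join(left, right, max_seconds=30):
    # Mirrors: keys match AND DATEDIFF(second, left, right) BETWEEN 0 AND max_seconds
    matches = []
    for l in left:
        for r in right:
            if l["key"] == r["key"] and 0 <= r["ts"] - l["ts"] <= max_seconds:
                matches.append((l, r))
    return matches


left = [{"key": "a", "ts": 0}, {"key": "b", "ts": 10}]
right = [
    {"key": "a", "ts": 20},  # 20s after the left "a" event: matches
    {"key": "a", "ts": 45},  # 45s after: outside the 30s bound
    {"key": "b", "ts": 5},   # 5s before the left "b" event: negative difference
]
matches = temporal_join(left, right)
```

The time bound is what keeps the join state finite on an unbounded stream: events older than the bound can never match and can be discarded.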

Handle late-arriving data

  • Why is data late?
    • Network delays, especially with IoT
    • Pipeline congestion - ingestion load is higher than possible throughput
    • Outages in gateway devices
    • Producers that have specific windows of time for output
  • Late data tolerance
    • Configured per Stream Analytics job
    • Consequences
      • Late data is included in results
      • Window results are delayed as the job has to wait for late data
  • Still late data policy
    • Drop to just ignore the record
    • Adjust to update the record timestamp (can introduce time skews)
  • In the portal (Stream Analytics)
    • Settings --> Event ordering
    • Can only be done when the job is not running
    • Choose late arriving window, out of order settings, and whether to drop or adjust
    • Restart job
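
The tolerance and the drop/adjust policy can be modeled in a few lines. This is a simplified model, assuming timestamps in seconds and that "adjust" moves the timestamp to the arrival time minus the tolerance; the function name and values are hypothetical.

```python
def apply_late_arrival_policy(event_time, arrival_time, tolerance, policy):
    """Return the timestamp to use for the event, or None if it is dropped."""
    lateness = arrival_time - event_time
    if lateness <= tolerance:
        # Within tolerance: keep the original event time.
        return event_time
    if policy == "drop":
        return None
    if policy == "adjust":
        # Pull the timestamp forward to the oldest time still accepted.
        return arrival_time - tolerance
    raise ValueError(f"unknown policy: {policy}")


on_time = apply_late_arrival_policy(100, 103, tolerance=5, policy="drop")
dropped = apply_late_arrival_policy(100, 120, tolerance=5, policy="drop")
adjusted = apply_late_arrival_policy(100, 120, tolerance=5, policy="adjust")
```

The adjusted event keeps its payload but is counted in a later window than the one it was measured in, which is the time skew mentioned above.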

Handle interruptions

  • SLAs
    • Microsoft SLA is 99.9% or higher based on service tier
    • SLA for Stream Analytics is that the job is running 99.9% of the time
    • Catch-up time is the delay that follows a service interruption
  • Event replication pattern
    • If the SLAs aren't high enough, can increase reliability with ERP
    • Duplicate Event Hub and all downstream infrastructure, processing all events in parallel in multiple regions
    • Only works if
      • Pipelines have independent failure conditions
      • End application can correctly choose data source
      • Event generator is not the bottleneck

Configure exception handling

  • Output data error handling policy
    • Defines how Stream Analytics should proceed in the case it fails to write to an output
    • Allows for two values
      • Drop - record will be ignored and never written to output (better for speed)
      • Retry - keep attempting to write until success or another error (better for correctness)
  • Configured in the portal in the Settings --> Error policy section
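
The trade-off between the two policies can be sketched with a toy output that fails a couple of times before succeeding. The `max_attempts` cap is an assumption added to keep the sketch finite, and `write_with_policy` and `FlakyOutput` are hypothetical names.

```python
def write_with_policy(record, write, policy, max_attempts=5):
    if policy not in ("drop", "retry"):
        raise ValueError(f"unknown policy: {policy}")
    for _ in range(max_attempts):
        try:
            write(record)
            return True
        except OSError:
            if policy == "drop":
                # Drop: give up on this record so the stream keeps moving.
                return False
            # Retry: loop around and try the same record again.
    raise RuntimeError("output still failing after max_attempts")


class FlakyOutput:
    """Test double that fails the first `failures` write calls."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0
        self.written = []

    def write(self, record):
        self.calls += 1
        if self.calls <= self.failures:
            raise OSError("simulated output failure")
        self.written.append(record)


retry_output = FlakyOutput(failures=2)
retry_result = write_with_policy("r1", retry_output.write, "retry")

drop_output = FlakyOutput(failures=2)
drop_result = write_with_policy("r1", drop_output.write, "drop")
```

Retry eventually writes the record at the cost of blocking on it, while drop moves on immediately and loses the record.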

Upsert data

Replay archived stream data
