DEV Community

Timothy Spann.   🇺🇦
Timothy Spann. 🇺🇦

Posted on • Originally published at datainmotion.dev on

Consuming Streaming Stocks Data with Python, Websockets and Pulsar

https://medium.com/@tspann/lets-check-our-stocks-from-finnhub-and-do-some-real-time-analytics-1b7963008e19

Let’s Check Our Stocks From FinnHub and Do Some Real-Time Analytics

Code: https://github.com/tspannhw/FLiPN-Py-Stocks

The easiest application to build is a simple Python application since finnhub includes the basics in their documentation. We are going to use their free WEBSOCKET interface to Trades so we can get real-time events as they happen. We will get JSON data for each trade triggered.

Finnhub - Free realtime APIs for stock, forex and cryptocurrency.

Finnhub - Free APIs for realtime stock, forex, and cryptocurrency. Company fundamentals, Economic data, and Alternative…

finnhub.io

Python App

Python application receives websocket stream of JSON arrays and sends individual JSON messages with a JSON schema.

architecture

Raw Data

{"data":[{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887195,"v":1},{"c":["1","8","24","12"],"p":122.09,"s":"TSLA","t":1672348887196,"v":4},{"c":["1","8","24","12"],"p":122.09,"s":"TSLA","t":1672348887196,"v":10},{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887196,"v":1},{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887196,"v":2},{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887196,"v":10},{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887198,"v":79},{"c":["1","24","12"],"p":129.58,"s":"AAPL","t":1672348887666,"v":1},{"c":["1","24","12"],"p":129.575,"s":"AAPL","t":1672348887785,"v":1}],"type":"trade"}
{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887195,"v":1}

Data Description

data
List of trades or price updates.
s
Symbol.
p
Last price.
t
UNIX milliseconds timestamp.
v
Volume.
c
List of trade conditions. A comprehensive list of trade conditions code can be found here

Let’s Build a Schema for our JSON data. Once we have a class definied for it in Python, we can send that to an Apache Pulsar cluster and it will generate the first version of our schema for us. When we have a schema it lets us treat that data as a table in Trino, Spark SQL and Flink SQL. So this is awesome.

By defining our data and making it fully structured with a schema even though it is still semi-structured JSON, it makes it very easy to work with. We know what we are getting. This will make it easier to stream into Apache Pinot, Apache Iceberg, Delta Lake or another analytics system.

class Stock (Record):
symbol = String()
ts = Float()
currentts = Float()
volume = Float()
price = Float()
tradeconditions = String()
uuid = String()

We then connect to our Pulsar cluster, very easy in Python.

client = pulsar.Client(‘pulsar://localhost:6650’)
producer = client.create_producer(topic=’persistent://public/default/stocks’ ,schema=JsonSchema(Stock),properties={“producer-name”: “py-stocks”,”producer-id”: “pystocks1” })

If we have never used this topic before, Pulsar will create it for you. For best practices, build your tenant, namespace and topic before your application while you are defining schemas and data contracts.

For more information on the Python interface for Pulsar, check out this link.

NEWBIE HINT:

For a free cluster and training, check out this training academy.

Example Python Projects

GitHub - tspannhw/FLiP-Py-Pi-EnviroPlus: FLiP-Py-Pi-EnviroPlus. Apache Flink, Apache Pulsar, Apache…

FLiP-Py-Pi-EnviroPlus. Apache Flink, Apache Pulsar, Apache Spark, Python, Raspberry Pi, Enviro+ sensors. Tim…

github.com


streamnative-academy/iot-examples/python3 at master · streamnative/streamnative-academy

You can't perform that action at this time. You signed in with another tab or window. You signed out in another tab or…

github.com


GitHub - tspannhw/FLiP-Py-Jetson-Sensors: Waveshare Environmental Sensor Board on Jetson Nano 2GB …

Waveshare Environmental Sensor Board on Jetson Nano 2GB - Apache Pulsar - Python 3.10 - Ubuntu 18.04 - GitHub …

github.com


GitHub - tspannhw/FLiP-Py-Pi-Nano1: FLiP-Py-Pi-Nano2: Apache Pulsar, Python 3.9, Raspberry Pi 4 …

FLiP-Py-Pi-Nano2: Apache Pulsar, Python 3.9, Raspberry Pi 4 - 2GB, Nano1 - BME688, MICS6814, SGP30, Breakout Garden …

github.com


GitHub - tspannhw/FLiP-Py-BreakoutGarden: Pimoroni Breakout Garden with Raspberry Pi 4 - 2GB RAM …

Pimoroni Breakout Garden with Raspberry Pi 4 - 2GB RAM - Apache Pulsar - Python 3.10 - GitHub …

github.com


GitHub - tspannhw/FLiP-PulsarDevPython101: Apache Pulsar Development 101 with Python

Apache Pulsar Development 101 with Python. Contribute to tspannhw/FLiP-PulsarDevPython101 development by creating an…

github.com


GitHub - tspannhw/awesome-apachepulsar-python: All the resources for using Python with Apache…

All the resources for using Python with Apache Pulsar Apache Pulsar Development 101 with Python In this session I will…

github.com

For all the real newbies, here is the real getting started.

GitHub - tspannhw/Meetup-YourFirstEventDrivenApp: NJ Meetup - Build an event-driven architecture…

NJ Meetup - Build an event-driven architecture with Apache Pulsar - GitHub - tspannhw/Meetup-YourFirstEventDrivenApp…

github.com

Consume Pulsar Data

bin/pulsar-client consume "persistent://public/default/stocks" -s stocks-reader -n 0
----- got message -----
key:[20221230191756_42a4752d-5f66-4245-8153-a5ec8478f738], properties:[], content:{
"symbol": "AAPL",
"ts": 1672427874976.0,
"currentts": 20221230191756.0,
"volume": 10.0,
"price": 128.055,
"tradeconditions": "1 12",
"uuid": "20221230191756_42a4752d-5f66-4245-8153-a5ec8478f738"
}
----- got message -----
key:[20221230191756_a560a594-7c12-42e7-a76d-6650a48533e0], properties:[], content:{
"symbol": "TSLA",
"ts": 1672427874974.0,
"currentts": 20221230191756.0,
"volume": 100.0,
"price": 120.94,
"tradeconditions": "",
"uuid": "20221230191756_a560a594-7c12-42e7-a76d-6650a48533e0"
}

References


Apache Pulsar


Python


Python Programming

Top comments (2)

Collapse
 
karthikeyan_ profile image
karthikeyan

Very happy the see this🎉, but already many people wrote this what is special in this The challenge😵‍💫🤕 is writing in limited library, space, code and time. In: micropython.

Iam ready to help you and I need help😅.

I gathered some information for you,

  • Micropython library: Documentation (latest)
  • Want to run in: Single core processor.
  • Connection: client and server are connected with WiFi.
  • Focus on memory in: ESP8266, ESP32
ESP8266 ESP32
RAM 320 KB 4 MB
ROM 4 MB 4 MB
Core Single Dual

If you are interested to work on this DM me Dev profile

Collapse
 
devopsking profile image
UWABOR KING COLLINS

Awesome read