Using PySpark to stream data from the CoinGecko API and visualise it using Dash

Spark Streaming

Spark Streaming is a fantastic tool that allows you to process and analyze continuous data streams in real-time. It's built on top of Apache Spark, a popular distributed computing framework. With Spark Streaming, you can handle data as it arrives, enabling near-instantaneous data processing.

Imagine you have a river of data flowing continuously, like tweets from Twitter or sensor readings from IoT devices. Spark Streaming lets you break this data into small, manageable chunks called micro-batches. Each micro-batch represents a short period of time, such as 1 second or 1 minute. These micro-batches are then processed using the powerful Spark engine, enabling you to perform calculations, extract insights, and make informed decisions on the fly.
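
To make the micro-batch idea concrete, here is a minimal word-count sketch using the DStream API. It is only an illustration: the socket source on localhost:9999 is a stand-in for any real stream, and the app name is made up.

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder.appName("MicroBatchExample").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 5)  # 5-second micro-batches

# Each micro-batch contains whatever text arrived on the socket in the last 5 seconds
lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()  # print each micro-batch's word counts to the console

ssc.start()
ssc.awaitTermination()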

Here are some key features of Spark Streaming that make it so valuable:

  • High Throughput: Spark Streaming can handle a massive amount of data coming in at high speeds. It achieves this by leveraging the parallel processing capabilities of Spark, allowing you to process data in parallel across multiple machines.
  • Fault Tolerance: Spark Streaming is designed to be resilient to failures. If a node in the cluster crashes or a network issue occurs, Spark Streaming automatically handles it by redistributing the work to other available nodes. This ensures that your data processing pipeline remains reliable and uninterrupted.
  • Scalability: As your data volume grows, Spark Streaming can easily scale to handle the increased load. You can add more machines to your Spark cluster, enabling you to process larger data streams without sacrificing performance.
  • Windowed Operations: Spark Streaming allows you to perform computations over sliding windows of data. This means you can analyze data over a specific time frame, such as the last 5 seconds or the last 1 hour. It's useful for tasks like calculating moving averages or identifying trends in real-time data (a short sketch follows this list).
  • Integration with Spark Ecosystem: Spark Streaming seamlessly integrates with other components of the Spark ecosystem. This means you can combine streaming data processing with Spark SQL for structured data queries, MLlib for machine learning, and GraphX for graph analytics. The integration opens up endless possibilities for real-time analytics.
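
As an example of windowed operations, the word-count sketch above could be windowed like this, counting words over the last 30 seconds and refreshing the result every 10 seconds. Like any other DStream transformation, this would be declared before ssc.start().

windowed_counts = (lines.flatMap(lambda line: line.split())
                        .map(lambda word: (word, 1))
                        .reduceByKeyAndWindow(lambda a, b: a + b,  # combine counts within the window
                                              None,                # no inverse function, so no checkpoint needed
                                              30,                  # window length in seconds
                                              10))                 # slide interval in seconds
windowed_counts.pprint()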

PySpark is the Python API for Apache Spark, a framework for processing large-scale data in a distributed, parallel manner. PySpark Streaming is its streaming extension: the Python face of Spark Streaming. It offers the same programming model, scalability, and fault tolerance, while letting you build real-time data processing applications with the convenience of Python and its rich ecosystem of libraries and tools.

Fetching data from the CoinGecko API and persisting it in a PostgreSQL database

The following is a streaming application that periodically fetches data from the CoinGecko API and writes it to a PostgreSQL database. It uses PySpark for data processing, requests for the API calls, and psycopg2 for database interaction.

First, it creates a Spark session and defines the schema for the streaming data. The CoinGecko API endpoint is set as api_url.

import requests
import psycopg2
import pandas as pd
import time
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, unix_timestamp, col
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, LongType

spark = SparkSession.builder.appName("CoinGeckoStreamingApp").getOrCreate()
# Create a StreamingContext with a batch interval of 60 seconds
ssc = StreamingContext(spark.sparkContext, 60)

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("current_price", StringType(), True),
    StructField("last_updated", StringType(), True)
])

api_url = "https://api.coingecko.com/api/v3/coins/markets"

Next, there are two important functions defined. fetch_coingecko_data() fetches data from the CoinGecko API using the requests library and returns the JSON response.

def fetch_coingecko_data():
    response = requests.get(api_url, params={"vs_currency": "usd"})
    if response.status_code == 200:
        return response.json()
    else:
        return []

The write_to_postgres(df) function converts the Spark DataFrame to a Pandas DataFrame, establishes a connection to the PostgreSQL database, and writes the data to the "coingecko_data" table.

def write_to_postgres(df):
    pandas_df = df.toPandas()
    # Connection and cursor setup
    # ...
    # Table existence check and creation
    # ...
    # DataFrame column type conversion
    # ...
    # Writing data to PostgreSQL table
    # ...
    # Closing cursor and connection
    # ...
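
Filled in, the elided steps might look roughly like this. This is a sketch only: the table layout and column types are assumptions, and the connection parameters mirror the ones used by the Dash app later in this post.

def write_to_postgres(df):
    pandas_df = df.toPandas()

    # Connection parameters match those used by the Dash app below
    conn = psycopg2.connect(
        host="localhost",
        port="5432",
        database="coin",
        user="data_eng",
        password="data_eng"
    )
    cur = conn.cursor()

    # Create the target table on first run (column types are an assumption)
    cur.execute("""
        CREATE TABLE IF NOT EXISTS coingecko_data (
            id TEXT,
            name TEXT,
            symbol TEXT,
            current_price NUMERIC,
            last_updated TIMESTAMPTZ
        )
    """)

    # Append the current batch row by row
    rows = list(pandas_df[["id", "name", "symbol", "current_price", "last_updated"]]
                .itertuples(index=False, name=None))
    cur.executemany(
        "INSERT INTO coingecko_data (id, name, symbol, current_price, last_updated) "
        "VALUES (%s, %s, %s, %s, %s)",
        rows
    )

    conn.commit()
    cur.close()
    conn.close()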

The code then fetches data from the CoinGecko API, creates a Spark DataFrame from the fetched data, performs any required transformations or computations, and writes the DataFrame to PostgreSQL using the write_to_postgres(df) function. It then sleeps for 60 seconds before repeating the process.

while True:
    data = fetch_coingecko_data()
    df = spark.createDataFrame(data, schema)
    # Perform transformations/computations on the DataFrame
    # ...
    write_to_postgres(df)
    time.sleep(60)  # Fetch data every 60 seconds

# Start the streaming context.
# Note: because the loop above never exits, these lines are not reached;
# the polling loop itself drives the pipeline.
ssc.start()
ssc.awaitTermination()

Visualizing the data using Dash

Below is a Python script that utilizes the Dash framework to create a dashboard for visualizing the cryptocurrency data. Let's go through some of the important snippets:

app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

This initializes the Dash application, passing __name__ as its name and applying a Bootstrap theme to the dashboard.

app.layout = html.Div([
    dbc.Container([
        dbc.Row([
            dbc.Col(
                dcc.Graph(id='price-chart')
            )
        ]),
        # ... More graph layouts ...
        dbc.Row([
            dbc.Col(
                html.Div(id='last-fetched')
            )
        ]),
        dcc.Interval(
            id='interval-component',
            interval=60000,
            n_intervals=0
        )
    ])
])

This defines the layout of the dashboard using html.Div, dbc.Container, dbc.Row, and dbc.Col components. Each graph is placed within a dcc.Graph component, and other elements, such as the last-fetched text and the interval component, are added to the layout.
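
The callbacks below also read from a currency-filter dropdown that isn't shown in the snippet above. A minimal version of such a component could be added to the container like this (the coin IDs and default selection here are only examples):

dbc.Row([
    dbc.Col(
        dcc.Dropdown(
            id='currency-filter',
            options=[
                {'label': 'Bitcoin', 'value': 'bitcoin'},
                {'label': 'Ethereum', 'value': 'ethereum'},
                {'label': 'Litecoin', 'value': 'litecoin'}
            ],
            value=['bitcoin', 'ethereum'],  # default selection
            multi=True                      # allow comparing several coins
        )
    )
])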

@app.callback(Output('price-chart', 'figure'), Output('last-fetched', 'children'),
              Input('currency-filter', 'value'), Input('interval-component', 'n_intervals'))
def update_price_chart(currencies, n):
    conn = psycopg2.connect(
        host="localhost",
        port="5432",
        database="coin",
        user="data_eng",
        password="data_eng"
    )
    # ... Fetching and processing data ...
    fig = go.Figure()
    for i, df in enumerate(dfs):
        fig.add_trace(go.Scatter(x=df['last_updated'], y=df['price_change'], mode='lines',
                                 name=f'Price Change - {currencies[i]}'))
    fig.update_layout(title='Price Change Over Time', xaxis_title='Date', yaxis_title='Price Change')
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    last_fetched_text = f"Last fetched: {current_time}"
    return fig, last_fetched_text

This defines a callback function, update_price_chart, which is triggered when the value of the currency-filter dropdown or the interval-component changes. Inside the function, a connection is established with a PostgreSQL database using psycopg2. Data is fetched for the selected currencies, processed, and stored in DataFrames, which are then used to build a line plot with plotly.graph_objects. The function also records the current time and returns the figure together with the last-fetched text.
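
The elided fetching-and-processing step could, for instance, build one DataFrame per selected coin and derive a simple price_change column. This is a sketch under assumptions about the stored table (and assumes pandas is imported as pd); the original app's queries may differ.

dfs = []
for currency in currencies:
    df = pd.read_sql(
        "SELECT current_price, last_updated FROM coingecko_data "
        "WHERE id = %s ORDER BY last_updated",
        conn, params=(currency,))
    # Change between consecutive price observations
    df['price_change'] = df['current_price'].astype(float).diff().fillna(0)
    dfs.append(df)
conn.close()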

Similarly, there are other callback functions defined (update_volume_chart, update_scatter_plot, update_bar_chart, update_pie_chart) that update the respective graphs based on the selected currencies and interval.
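
As an illustration of that pattern, a bar-chart callback might plot the most recently stored price for each selected coin. This is a sketch only: the 'bar-chart' output ID, the query, and the figure details are assumptions rather than the post's actual code, and it reuses the same imports as the callback above.

@app.callback(Output('bar-chart', 'figure'),
              Input('currency-filter', 'value'), Input('interval-component', 'n_intervals'))
def update_bar_chart(currencies, n):
    conn = psycopg2.connect(host="localhost", port="5432", database="coin",
                            user="data_eng", password="data_eng")
    latest_prices = []
    for currency in currencies:
        df = pd.read_sql(
            "SELECT current_price FROM coingecko_data "
            "WHERE id = %s ORDER BY last_updated DESC LIMIT 1",
            conn, params=(currency,))
        latest_prices.append(float(df['current_price'].iloc[0]) if not df.empty else 0.0)
    conn.close()

    fig = go.Figure(go.Bar(x=currencies, y=latest_prices))
    fig.update_layout(title='Latest Price by Coin', xaxis_title='Coin', yaxis_title='Price (USD)')
    return fig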

(Screenshot: the price-change chart rendered by the dashboard.)

Next steps: deploying the streaming app

When it comes to deploying a PySpark streaming application, there are several options available:

  1. Apache Spark Standalone Cluster: You can deploy your PySpark streaming app on a standalone cluster using Spark's built-in cluster manager. It allows you to submit your application and distribute the workload across multiple machines (see the spark-submit sketch after this list).

  2. Apache Hadoop YARN: If you have a Hadoop cluster with YARN, you can leverage it to deploy your PySpark streaming app. YARN manages cluster resources, and you can submit your application to YARN for resource allocation and scheduling.

  3. Apache Mesos: Mesos is a cluster manager that simplifies the deployment of distributed applications, including PySpark streaming apps. It handles resource management and scheduling across a cluster of machines.

  4. Cloud Platforms: Cloud platforms such as AWS, Google Cloud, and Azure offer managed Spark services that make it easy to deploy PySpark streaming applications. These services abstract away infrastructure management and provide scalable Spark clusters.

  5. Containerization: Docker and containerization technologies allow you to package your PySpark streaming app with its dependencies into a container. Containers can be deployed on various platforms, providing consistency and portability across different environments.
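
For the standalone option, submitting the polling script to a cluster could look like the command below. The master URL and file name are placeholders for your own setup.

spark-submit \
  --master spark://spark-master:7077 \
  --deploy-mode client \
  coingecko_streaming_app.py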

Consider factors like ease of setup, scalability, and how much infrastructure you want to manage when choosing a deployment option. If you are just getting started, begin with a local or standalone cluster deployment, then move on to cloud platforms or containerization as you gain experience.
