The ETL (Extract, Transform, Load) process is fundamental for managing data efficiently, especially in applications that require quick decision-making based on real-time data. In this article, we will explore the ETL process using a practical example involving real-time cryptocurrency trades from the Binance API. The Python code provided illustrates how to extract trade data, transform it into a usable format, load it into an SQLite database, and visualize the data with real-time plotting.
Sample ETL Project : https://github.com/vcse59/FeatureEngineering/tree/main/Real-Time-CryptoCurrency-Price-Tracker
1. Extract
The first step of the ETL process is Extraction, which involves gathering data from various sources. In this case, data is extracted through a WebSocket connection to the Binance Testnet API. This connection allows for real-time streaming of BTC/USDT trades.
Here's how the extraction is implemented in the code:
async with websockets.connect(url) as ws:
response = await ws.recv()
trade_data = json.loads(response)
Each message received contains essential trade data, including the price, quantity, and timestamp, which is formatted as JSON.
2. Transform
Once the data is extracted, it undergoes the Transformation process. This step cleans and structures the data to make it more useful. In our example, the transformation includes converting the timestamp from milliseconds to a readable format and organizing the data into appropriate types for further processing.
price = float(trade_data['p'])
quantity = float(trade_data['q'])
timestamp = int(trade_data['T'])
trade_time = datetime.fromtimestamp(timestamp / 1000.0)
This ensures that the price and quantity are stored as floats, and the timestamp is converted to a datetime object for easier manipulation and analysis.
3. Load
The final step is Loading, where the transformed data is stored in a target database. In our code, the SQLite database serves as the storage medium for the trade data.
The loading process is managed by the following function:
def save_trade_to_db(price, quantity, timestamp):
conn = sqlite3.connect('trades.db')
cursor = conn.cursor()
# Create a table if it doesn't exist
cursor.execute('''
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
price REAL,
quantity REAL,
timestamp TEXT
)
''')
# Insert the trade data
cursor.execute('''
INSERT INTO trades (price, quantity, timestamp)
VALUES (?, ?, ?)
''', (price, quantity, trade_time))
conn.commit()
conn.close()
This function connects to the SQLite database, creates a table if it doesn't exist, and inserts the trade data.
4. Visualize
In addition to storing data, it is essential to visualize it for better understanding and decision-making. The provided code includes a function to plot the trades in real-time:
def plot_trades():
if len(trades) > 0:
timestamps, prices, quantities = zip(*trades)
plt.subplot(2, 1, 1)
plt.cla() # Clear the previous plot for real-time updates
plt.plot(timestamps, prices, label='Price', color='blue')
plt.ylabel('Price (USDT)')
plt.legend()
plt.title('Real-Time BTC/USDT Prices')
plt.xticks(rotation=45)
plt.subplot(2, 1, 2)
plt.cla() # Clear the previous plot for real-time updates
plt.plot(timestamps, quantities, label='Quantity', color='orange')
plt.ylabel('Quantity')
plt.xlabel('Time')
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout() # Adjust layout for better spacing
plt.pause(0.1) # Pause to update the plot
This function generates two subplots: one for price and another for quantity. It uses the matplotlib library to visualize the data dynamically, allowing users to observe market trends in real-time.
Conclusion
This example highlights the ETL process, demonstrating how data can be extracted from a WebSocket API, transformed for analysis, loaded into a database, and visualized for immediate feedback. This framework is crucial for building applications that need to make informed decisions based on real-time data, such as trading platforms and market analysis tools.
Top comments (0)