ETL Pipeline: Frontend & Backend
1. Extract, Transform, Load
2. Aggregation and Analysis
3. Automation with Apache Airflow
4. Connecting BigQuery to Power BI
5. Automating the Data Refresh
In today’s data-driven world, the ability to extract, transform, and load (ETL) data efficiently is paramount for any organization looking to gain valuable insights. This article outlines the process I followed to build a robust ETL pipeline using the Alpha Vantage API for stock market data analysis, specifically focusing on the tickers $ASTS, $RKLB, and $NVDA (hopefully they go up).
At its core, an ETL pipeline is a systematic approach to handling data, allowing for the automation of data workflows. The process can be broken down into three primary stages:
1) Extract: The first step involves accessing external data sources, in this case the Alpha Vantage API. This API provides a wealth of financial data, including time series for various stock tickers. Using a Python script, I can easily make requests to the API and retrieve the necessary data in JSON format. The code sketch after this list illustrates how I initiate the API call and extract hourly data for a specific ticker (it also saves the .csv files to Drive).
2) Transform: Once the data is extracted, it needs to be cleaned and structured for analysis. This involves converting the JSON response into a pandas DataFrame, renaming columns for clarity, and ensuring that the data types are appropriate for further analysis. The transformation step is crucial, as it sets the stage for meaningful insights to be drawn from the data.
3) Load: The final step involves loading the transformed data into a data warehouse (there are many options, and the choice is yours: Amazon Redshift, Google BigQuery, Microsoft Azure Synapse, Snowflake). This allows for efficient querying and analysis using SQL. Once loaded, I can perform aggregations and analyses on the data to uncover trends and patterns that inform investment decisions. In my case, I connected BigQuery to my personal Drive, so it reads the CSV files from there and populates the dataset I created earlier.
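To make these three stages concrete, here is a minimal Python sketch of the flow. The API key, project, dataset, and file paths are placeholders, and the load step uses the BigQuery Python client as one possible approach; my own setup instead lets BigQuery read the CSV files directly from Drive.

# Minimal sketch of the extract/transform/load flow; placeholder credentials and names.
import requests
import pandas as pd
from google.cloud import bigquery

API_KEY = "YOUR_ALPHA_VANTAGE_KEY"   # placeholder
TICKER = "NVDA"

# Extract: hourly intraday prices from the Alpha Vantage API, returned as JSON.
url = (
    "https://www.alphavantage.co/query"
    f"?function=TIME_SERIES_INTRADAY&symbol={TICKER}&interval=60min"
    f"&outputsize=full&apikey={API_KEY}"
)
raw = requests.get(url, timeout=30).json()["Time Series (60min)"]

# Transform: JSON -> pandas DataFrame with clear column names and numeric dtypes.
df = (
    pd.DataFrame.from_dict(raw, orient="index")
    .rename(columns={
        "1. open": "open", "2. high": "high", "3. low": "low",
        "4. close": "close", "5. volume": "volume",
    })
    .astype(float)
)
df.index = pd.to_datetime(df.index)
df = df.sort_index().reset_index().rename(columns={"index": "timestamp"})

# Save a CSV copy (in my setup this path points at a mounted Google Drive folder).
df.to_csv(f"stock_{TICKER}.csv", index=False)

# Load: push the cleaned DataFrame into a BigQuery table (placeholder table id).
client = bigquery.Client(project="your_project")
client.load_table_from_dataframe(df, f"your_project.your_dataset.stock_{TICKER}").result()

Whichever load route you choose, the key point is that BigQuery receives a clean, typed table rather than raw JSON.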
After successfully loading the extracted and transformed data into BigQuery, the next step involves performing various aggregations and transformations to derive meaningful insights. This section outlines several SQL queries that I applied to the stock market data, each designed to compute key metrics for analysis and decision-making.
-- Daily OHLCV bars for NVDA: first open, intraday high/low, last close, and total volume per day.
WITH daily_data AS (
  SELECT
    DATE(TIMESTAMP_FIELD_0) AS date,
    open,
    high,
    low,
    close,
    volume,
    FIRST_VALUE(open) OVER (
      PARTITION BY DATE(TIMESTAMP_FIELD_0)
      ORDER BY TIMESTAMP_FIELD_0
    ) AS daily_open,
    LAST_VALUE(close) OVER (
      PARTITION BY DATE(TIMESTAMP_FIELD_0)
      ORDER BY TIMESTAMP_FIELD_0
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS daily_close
  FROM `portfolio-etl-pipeline.stocks_data.stock_NVDA`
)
SELECT
  date,
  MAX(daily_open) AS daily_open,
  MAX(high) AS daily_high,
  MIN(low) AS daily_low,
  MAX(daily_close) AS daily_close,
  SUM(volume) AS daily_volume
FROM daily_data
GROUP BY date
ORDER BY date;
-- Intraday range: difference between each day's high and low.
SELECT
  DATE(timestamp_field_0) AS date,
  MAX(high) - MIN(low) AS intraday_range
FROM `your_project.your_dataset.stock_data`
GROUP BY date
ORDER BY date ASC;
-- Close-to-close change: each day's closing price minus the previous day's.
WITH daily_close AS (
  SELECT DISTINCT
    DATE(timestamp_field_0) AS date,
    LAST_VALUE(close) OVER (
      PARTITION BY DATE(timestamp_field_0)
      ORDER BY timestamp_field_0 ASC
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS close
  FROM `your_project.your_dataset.stock_data`
)
SELECT
  date,
  close - LAG(close) OVER (ORDER BY date) AS close_to_close_change
FROM daily_close
ORDER BY date ASC;
-- Daily OHLC average: mean of (open + high + low + close) / 4 across each day's hourly bars.
SELECT
  DATE(timestamp_field_0) AS date,
  AVG((open + high + low + close) / 4) AS ohlc_average
FROM `your_project.your_dataset.stock_data`
GROUP BY date
ORDER BY date ASC;
-- 5-hour simple moving average of the close.
SELECT
  timestamp_field_0,
  close,
  AVG(close) OVER (
    ORDER BY timestamp_field_0
    ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
  ) AS sma_5_hour
FROM `your_project.your_dataset.stock_data`
ORDER BY timestamp_field_0;
-- Cumulative volume-weighted average price (VWAP) across the series.
SELECT
  timestamp_field_0,
  SUM(close * volume) OVER (ORDER BY timestamp_field_0 ASC)
    / SUM(volume) OVER (ORDER BY timestamp_field_0 ASC) AS vwap
FROM `your_project.your_dataset.stock_data`
ORDER BY timestamp_field_0 ASC;
-- Intraday volatility: standard deviation of the hourly close within each day.
WITH intraday_volatility AS (
  SELECT
    DATE(timestamp_field_0) AS date,
    STDDEV(close) AS intraday_volatility
  FROM `your_project.your_dataset.stock_data`
  GROUP BY date
)
SELECT
  date,
  intraday_volatility
FROM intraday_volatility
ORDER BY date ASC;
-- Price volatility: rolling 2-day standard deviation of the daily closing price.
WITH daily_close AS (
  SELECT DISTINCT
    DATE(timestamp_field_0) AS date,
    LAST_VALUE(close) OVER (
      PARTITION BY DATE(timestamp_field_0)
      ORDER BY timestamp_field_0 ASC
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS close
  FROM `your_project.your_dataset.stock_data`
)
SELECT
  date,
  STDDEV(close) OVER (
    ORDER BY date ASC
    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
  ) AS price_volatility
FROM daily_close
ORDER BY date ASC;
-- Cumulative traded volume over the full series.
SELECT
  timestamp_field_0,
  SUM(volume) OVER (ORDER BY timestamp_field_0 ASC) AS cumulative_volume
FROM `your_project.your_dataset.stock_data`
ORDER BY timestamp_field_0 ASC;
-- Cumulative price change: running sum of hour-over-hour close differences
-- (the LAG is computed in a CTE, since analytic functions cannot be nested directly).
WITH hourly_changes AS (
  SELECT
    timestamp_field_0,
    close - LAG(close) OVER (ORDER BY timestamp_field_0 ASC) AS price_change
  FROM `your_project.your_dataset.stock_data`
)
SELECT
  timestamp_field_0,
  SUM(price_change) OVER (ORDER BY timestamp_field_0 ASC) AS cumulative_price_change
FROM hourly_changes
ORDER BY timestamp_field_0 ASC;
-- 14-period RSI, approximated with simple moving averages of gains and losses.
WITH price_changes AS (
  SELECT
    timestamp_field_0,
    close - LAG(close) OVER (ORDER BY timestamp_field_0 ASC) AS price_change
  FROM `your_project.your_dataset.stock_data`
),
gains_and_losses AS (
  SELECT
    timestamp_field_0,
    IF(price_change > 0, price_change, 0) AS gain,
    IF(price_change < 0, ABS(price_change), 0) AS loss
  FROM price_changes
)
SELECT
  timestamp_field_0,
  100 - (100 / (1 + (
    AVG(gain) OVER (ORDER BY timestamp_field_0 ASC ROWS BETWEEN 13 PRECEDING AND CURRENT ROW)
    / NULLIF(AVG(loss) OVER (ORDER BY timestamp_field_0 ASC ROWS BETWEEN 13 PRECEDING AND CURRENT ROW), 0)
  ))) AS rsi_14_period
FROM gains_and_losses
ORDER BY timestamp_field_0 ASC;
-- 5-hour geometric moving average of the close (exponential of the rolling mean log price),
-- used here as an EMA-style smoother.
SELECT
  timestamp_field_0,
  EXP(AVG(LN(close)) OVER (
    ORDER BY timestamp_field_0 ASC
    ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
  )) AS ema_5_hour
FROM `your_project.your_dataset.stock_data`
ORDER BY timestamp_field_0 ASC;
These SQL queries represent the core transformations and analyses that power the ETL pipeline, enabling me to draw actionable insights from raw stock market data. By carefully selecting and applying these transformations, I’ve managed to create a streamlined process for ongoing analyses and decision-making, ultimately informing my investment strategies.
With the foundational ETL pipeline established, the next logical step is to automate the entire process to ensure continuous and timely data ingestion. Apache Airflow, a powerful open-source workflow management tool, is ideal for orchestrating such pipelines. In this section, I outline the process of integrating Airflow to fully automate the extraction, transformation, and loading (ETL) of stock market data.
To begin, it’s essential to set up the Airflow environment. Apache Airflow operates as a series of interconnected tasks within a Directed Acyclic Graph (DAG), allowing for sophisticated control over data workflows. The setup process involves a few crucial steps.
Installation and initialization: Start by installing Airflow and initializing the Airflow metadata database. This sets up the tables and configuration that Airflow uses to manage DAGs, tasks, and users.
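On a standard pip-based install, that typically boils down to something like the following (your Airflow version and constraints file may differ):

pip install apache-airflow
airflow db init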
Once the database is initialized, start the Airflow web server and the scheduler. The web interface is pivotal for monitoring and managing your data pipelines, while the scheduler is the backbone of Airflow, responsible for executing tasks on the schedule defined in your DAG.
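In a default local setup these are two commands, run in separate terminals (the port shown is just the common default):

airflow webserver --port 8080
airflow scheduler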
These steps ensure that your Airflow environment is up and running, providing the framework necessary to automate your ETL pipeline.
The heart of Airflow’s power lies in its DAGs, which define the sequence and dependencies of tasks within your workflow. Below is the DAG that orchestrates the ETL process for ingesting stock data from the Alpha Vantage API, transforming it using Python, and loading it into Google BigQuery.
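What follows is a condensed, illustrative sketch of that DAG rather than the exact file from my repository: the task bodies are stubs standing in for the extract, transform, and load logic sketched earlier, and details such as the dag_id, start date, and retry settings are assumptions.

# Illustrative Airflow DAG: three PythonOperator tasks chained extract -> transform -> load.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract_data():
    # Pull hourly stock data from the Alpha Vantage API (same logic as the earlier script).
    ...


def transform_data():
    # Clean the JSON response into a typed pandas DataFrame.
    ...


def load_data():
    # Upload the cleaned data to Google BigQuery.
    ...


default_args = {
    "owner": "airflow",              # assumed defaults
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="stock_etl_pipeline",     # hypothetical name
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",      # the pipeline runs once a day
    catchup=False,
) as dag:
    extract = PythonOperator(task_id="extract_data", python_callable=extract_data)
    transform = PythonOperator(task_id="transform_data", python_callable=transform_data)
    load = PythonOperator(task_id="load_data", python_callable=load_data)

    extract >> transform >> load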
In this DAG, three core tasks are defined:
- extract_data: This task handles data extraction from the Alpha Vantage API. It utilizes Python to make API requests and retrieve the stock data in JSON format.
- transform_data: After extraction, the data is transformed into a pandas DataFrame, where necessary cleaning and formatting are performed. This step ensures that the data is in a usable state for analysis.
- load_data: Finally, the cleaned data is uploaded to Google BigQuery, where it can be queried and analyzed. This task completes the ETL process, making the data available for immediate use in dashboards and further analytics.
The DAG is configured to run daily, ensuring that the stock data is consistently updated with the latest information. The Airflow UI allows for easy monitoring and troubleshooting of the pipeline, providing detailed logs and status updates for each task. This automation not only saves time but also ensures data accuracy and consistency, crucial for reliable stock market analysis.
With the backend of our ETL pipeline firmly established, the next step involves connecting the data stored in BigQuery to Power BI. This integration will allow us to visualize and analyze the data efficiently, leveraging Power BI’s powerful dashboarding capabilities.
Connecting BigQuery to Power BI is a straightforward process, yet crucial for ensuring that our data remains accessible and up-to-date. Here’s how I did it:
1. Data Source: stock market data stored in BigQuery tables.
2. OAuth: a secure OAuth 2.0 connection to Google Cloud.
3. Power BI Desktop: ready for visualization and analysis.
With the connection between BigQuery and Power BI established, the final step is to automate the data ingestion process, ensuring our dashboards are always up-to-date.
1) Set Up the Refresh Schedule: Begin by publishing your Power BI report to the Power BI service. Navigate to the dataset, select “Scheduled Refresh,” and choose a refresh frequency. I opted for a daily refresh so the data is updated every 24 hours.
2) Configure Credentials: Ensure that your Google account credentials linked to BigQuery are correctly configured in the “Data Source Credentials” section. This allows Power BI to access the data without interruption.
3) Monitor and Optimize: Power BI will now automatically refresh the data based on your schedule. Monitor the refresh logs for any issues and consider using incremental refresh to optimize performance as your data grows.
By automating the data ingestion, the pipeline ensures that the insights in Power BI are always based on the latest data, with minimal manual intervention required.