Oleh Sekirkin

Business Professional & Data Enthusiast

Philadelphia, Pennsylvania, United States

ETL Pipeline: front & backend

SQL · Python · Google BigQuery · Apache Airflow

    1. Extract, Transform, Load
    2. Aggregation and Analysis
    3. Automation with Apache Airflow
    4. Connect
    5. Automate

1. Extract, Transform, Load

In today’s data-driven world, the ability to extract, transform, and load (ETL) data efficiently is paramount for any organization looking to gain valuable insights. This article outlines the process I followed to build a robust ETL pipeline using the Alpha Vantage API for stock market data analysis, specifically focusing on the tickers $ASTS, $RKLB, and $NVDA (hopefully they go up).

At its core, an ETL pipeline is a systematic approach to handling data, allowing for the automation of data workflows. The process can be broken down into three primary stages:

1) Extract: The first step involves accessing external data sources, which, in this case, is the Alpha Vantage API. This API provides a wealth of financial data, including time series for various stock tickers. Using a Python script, I can easily make requests to the API and retrieve the necessary data in JSON format. The code sketch after this list illustrates how I initiate the API call and extract hourly data for a specific ticker (the script also saves the resulting .csv files to Drive).

2) Transform: Once the data is extracted, it needs to be cleaned and structured for analysis. This involves converting the JSON response into a pandas DataFrame, renaming columns for clarity, and ensuring that the data types are appropriate for further analysis. The transformation step is crucial, as it sets the stage for meaningful insights to be drawn from the data.

3) Load: The final step involves loading the transformed data into a data warehouse (there are many to choose from, such as Amazon Redshift, Google BigQuery, Microsoft Azure, and Snowflake; which one you use is up to you). This allows for efficient querying and analysis using SQL. Once loaded, I can perform aggregations and analyses on the data to uncover trends and patterns that inform investment decisions. What I did here was connect BigQuery to my personal Drive, so it reads the .csv files from there and loads them into the dataset I created earlier.
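
To make the extract and transform steps concrete, here is a minimal Python sketch of how such a script can look. The API key, output folder, and function names are placeholders rather than my exact code, and the Drive side is handled simply by writing the .csv files into a folder that syncs to Google Drive.

import requests
import pandas as pd

API_KEY = "YOUR_ALPHA_VANTAGE_KEY"   # placeholder; use your own key
OUTPUT_DIR = "."                     # placeholder; point this at a Drive-synced folder


def extract_hourly(ticker: str) -> dict:
    """Extract: request hourly (60min) intraday data for one ticker from Alpha Vantage."""
    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "TIME_SERIES_INTRADAY",
            "symbol": ticker,
            "interval": "60min",
            "outputsize": "full",
            "apikey": API_KEY,
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


def transform(payload: dict) -> pd.DataFrame:
    """Transform: turn the JSON response into a typed DataFrame with clean column names."""
    frame = pd.DataFrame.from_dict(payload["Time Series (60min)"], orient="index")
    frame.columns = ["open", "high", "low", "close", "volume"]   # strip the "1. open" style prefixes
    frame = frame.astype(float)
    frame.index = pd.to_datetime(frame.index)
    frame.index.name = "timestamp"
    return frame.sort_index()


if __name__ == "__main__":
    for ticker in ["ASTS", "RKLB", "NVDA"]:
        data = transform(extract_hourly(ticker))
        data.to_csv(f"{OUTPUT_DIR}/stock_{ticker}.csv")   # the .csv lands in the Drive-synced folder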

2. Aggregation and Analysis

After successfully loading the extracted and transformed data into BigQuery, the next step involves performing various aggregations and transformations to derive meaningful insights. This section outlines several SQL queries that I applied to the stock market data, each designed to compute key metrics for analysis and decision-making.

Daily OHLC (Open, High, Low, Close)
Provides a comprehensive view of a stock's daily performance by summarizing the opening, high, low, and closing prices.
WITH daily_data AS (
    SELECT
        DATE(TIMESTAMP_FIELD_0) AS date,
        open,
        high,
        low,
        close,
        volume,
        FIRST_VALUE(open) OVER (PARTITION BY DATE(TIMESTAMP_FIELD_0) ORDER BY TIMESTAMP_FIELD_0) AS daily_open,
        LAST_VALUE(close) OVER (PARTITION BY DATE(TIMESTAMP_FIELD_0) ORDER BY TIMESTAMP_FIELD_0 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS daily_close
    FROM
        `portfolio-etl-pipeline.stocks_data.stock_NVDA`
)
SELECT
    date,
    MAX(daily_open) AS daily_open,
    MAX(high) AS daily_high,
    MIN(low) AS daily_low,
    MAX(daily_close) AS daily_close,
    SUM(volume) AS daily_volume
FROM
    daily_data
GROUP BY
    date
ORDER BY
    date;
Intraday Range
Measures the difference between the highest and lowest prices within each trading day, providing insights into daily volatility.
SELECT
    DATE(timestamp_field_0) AS date,
    MAX(high) - MIN(low) AS intraday_range
FROM
    `your_project.your_dataset.stock_data`
GROUP BY
    date
ORDER BY
    date ASC;
Close-to-Close Change
Calculates the daily change in closing prices to identify trends and momentum in stock performance.
WITH daily_close AS (
    SELECT
        DATE(timestamp_field_0) AS date,
        LAST_VALUE(close) OVER(PARTITION BY DATE(timestamp_field_0) ORDER BY timestamp_field_0 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS close
    FROM
        `your_project.your_dataset.stock_data`
)
SELECT
    date,
    close - LAG(close) OVER(ORDER BY date) AS close_to_close_change
FROM
    daily_close
ORDER BY
    date ASC;
OHLC Average
Calculates the average of open, high, low, and close prices for a smoothed view of the stock's performance.
SELECT
    DATE(timestamp_field_0) AS date,
    AVG((open + high + low + close) / 4) AS ohlc_average
FROM
    `your_project.your_dataset.stock_data`
GROUP BY
    date
ORDER BY
    date ASC;
Simple Moving Average (SMA)
Calculates a 5-hour moving average of closing prices to smooth out short-term price fluctuations.
SELECT
    timestamp_field_0,
    close,
    AVG(close) OVER (
        ORDER BY timestamp_field_0
        ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
    ) AS sma_5_hour
FROM
    `your_project.your_dataset.stock_data`
ORDER BY
    timestamp_field_0;
Volume-Weighted Average Price (VWAP)
Calculates the running average price weighted by trading volume over the series, giving a truer picture of the average price paid than an unweighted mean.
SELECT
    timestamp_field_0,
    SUM(close * volume) OVER(ORDER BY timestamp_field_0 ASC) / SUM(volume) OVER(ORDER BY timestamp_field_0 ASC) AS vwap
FROM
    `your_project.your_dataset.stock_data`
ORDER BY
    timestamp_field_0 ASC;
Intraday Volatility
Calculates the standard deviation of closing prices within each trading day.
WITH intraday_volatility AS (
    SELECT
        DATE(timestamp_field_0) AS date,
        STDDEV(close) AS intraday_volatility
    FROM
        `your_project.your_dataset.stock_data`
    GROUP BY
        date
)
SELECT
    date,
    intraday_volatility
FROM
    intraday_volatility
ORDER BY
    date ASC;
Price Volatility (Close-to-Close)
Measures day-to-day price volatility using close price standard deviation.
WITH daily_close AS (
    SELECT
        DATE(timestamp_field_0) AS date,
        LAST_VALUE(close) OVER(PARTITION BY DATE(timestamp_field_0) ORDER BY timestamp_field_0 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS close
    FROM
        `your_project.your_dataset.stock_data`
)
SELECT
    date,
    STDDEV(close) OVER(ORDER BY date ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS price_volatility
FROM
    daily_close
ORDER BY
    date ASC;
Cumulative Volume
Tracks the running total of trading volume over time.
SELECT
    timestamp_field_0,
    SUM(volume) OVER(ORDER BY timestamp_field_0 ASC) AS cumulative_volume
FROM
    `your_project.your_dataset.stock_data`
ORDER BY
    timestamp_field_0 ASC;
Cumulative Price Change
Captures the total change in price over time to track performance trajectory.
SELECT
    timestamp_field_0,
    SUM(close - LAG(close) OVER(ORDER BY timestamp_field_0 ASC)) OVER(ORDER BY timestamp_field_0 ASC) AS cumulative_price_change
FROM
    `your_project.your_dataset.stock_data`
ORDER BY
    timestamp_field_0 ASC;
Relative Strength Index (RSI)
Momentum oscillator measuring price change velocity. Uses 14-period lookback to identify overbought/oversold conditions.
WITH price_changes AS (
    SELECT
        timestamp_field_0,
        close - LAG(close) OVER(ORDER BY timestamp_field_0 ASC) AS price_change
    FROM
        `your_project.your_dataset.stock_data`
),
gains_and_losses AS (
    SELECT
        timestamp_field_0,
        IF(price_change > 0, price_change, 0) AS gain,
        IF(price_change < 0, ABS(price_change), 0) AS loss
    FROM
        price_changes
)
SELECT
    timestamp_field_0,
    100 - (100 / (1 + (AVG(gain) OVER(ORDER BY timestamp_field_0 ASC ROWS BETWEEN 13 PRECEDING AND CURRENT ROW) /
    NULLIF(AVG(loss) OVER(ORDER BY timestamp_field_0 ASC ROWS BETWEEN 13 PRECEDING AND CURRENT ROW), 0)))) AS rsi_14_period
FROM
    gains_and_losses
ORDER BY
    timestamp_field_0 ASC;
Exponential Moving Average (EMA)
A 5-hour smoothed average of closing prices used to spot short-term trends. BigQuery has no built-in EMA function, so this query approximates the smoothing with a rolling geometric mean of the last five hourly closes.
SELECT
    timestamp_field_0,
    EXP(AVG(LN(close)) OVER(ORDER BY timestamp_field_0 ASC ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)) AS ema_5_hour
FROM
    `your_project.your_dataset.stock_data`
ORDER BY
    timestamp_field_0 ASC;

These SQL queries represent the core transformations and analyses that power the ETL pipeline, enabling me to draw actionable insights from raw stock market data. By carefully selecting and applying these transformations, I’ve managed to create a streamlined process for ongoing analyses and decision-making, ultimately informing my investment strategies.

3. Automation with Apache Airflow

With the foundational ETL pipeline established, the next logical step is to automate the entire process to ensure continuous and timely data ingestion. Apache Airflow, a powerful open-source workflow management tool, is ideal for orchestrating such pipelines. In this section, I outline the process of integrating Airflow to fully automate the extraction, transformation, and loading (ETL) of stock market data.

To begin, it’s essential to set up the Airflow environment. Apache Airflow operates as a series of interconnected tasks within a Directed Acyclic Graph (DAG), allowing for sophisticated control over data workflows. The setup process involves a few crucial steps, starting with installation and initialization: install Airflow and initialize the metadata database, which sets up the tables and configuration that Airflow uses to manage DAGs, tasks, and users.
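
Assuming a standard pip-based environment, the install and initialization commands look like this (the Airflow docs recommend pinning to a constraints file, which is omitted here for brevity):

pip install apache-airflow
airflow db init        # creates the metadata database tables and a default configuration under AIRFLOW_HOME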

Once the database is initialized, start the Airflow web server. This web interface is pivotal for monitoring and managing your data pipelines. The scheduler is the backbone of Airflow, responsible for executing tasks based on the defined schedule in your DAG.
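
Both processes can be started from the command line (the web UI defaults to port 8080):

airflow webserver --port 8080    # Airflow UI at http://localhost:8080
airflow scheduler                # triggers task runs according to each DAG's schedule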

These steps ensure that your Airflow environment is up and running, providing the framework necessary to automate your ETL pipeline.

The heart of Airflow’s power lies in its DAGs, which define the sequence and dependencies of tasks within your workflow. Below is the DAG that orchestrates the ETL process for ingesting stock data from the Alpha Vantage API, transforming it using Python, and loading it into Google BigQuery.
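
What follows is a condensed sketch of that DAG rather than the exact production file: the project ID, landing directory, and API key are placeholders, and in practice the key is better stored in an Airflow Variable or Connection.

from datetime import datetime, timedelta
import json

import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from google.cloud import bigquery

API_KEY = "YOUR_ALPHA_VANTAGE_KEY"        # placeholder; better kept in an Airflow Variable or Connection
TICKERS = ["ASTS", "RKLB", "NVDA"]
LANDING_DIR = "/tmp"                      # placeholder landing path for raw and cleaned files
BQ_TABLE = "portfolio-etl-pipeline.stocks_data.stock_{ticker}"


def extract_data():
    """Extract: pull hourly intraday data for each ticker and park the raw JSON on disk."""
    for ticker in TICKERS:
        resp = requests.get(
            "https://www.alphavantage.co/query",
            params={"function": "TIME_SERIES_INTRADAY", "symbol": ticker,
                    "interval": "60min", "outputsize": "full", "apikey": API_KEY},
            timeout=30,
        )
        resp.raise_for_status()
        with open(f"{LANDING_DIR}/raw_{ticker}.json", "w") as fh:
            json.dump(resp.json(), fh)


def transform_data():
    """Transform: reshape the raw JSON into typed, tidy CSV files."""
    for ticker in TICKERS:
        with open(f"{LANDING_DIR}/raw_{ticker}.json") as fh:
            payload = json.load(fh)
        frame = pd.DataFrame.from_dict(payload["Time Series (60min)"], orient="index")
        frame.columns = ["open", "high", "low", "close", "volume"]
        frame = frame.astype(float)
        frame.index = pd.to_datetime(frame.index)
        frame.index.name = "timestamp"
        frame.sort_index().to_csv(f"{LANDING_DIR}/stock_{ticker}.csv")


def load_data():
    """Load: push each cleaned CSV into its BigQuery table."""
    client = bigquery.Client()  # assumes application-default credentials are configured
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    for ticker in TICKERS:
        with open(f"{LANDING_DIR}/stock_{ticker}.csv", "rb") as fh:
            client.load_table_from_file(fh, BQ_TABLE.format(ticker=ticker), job_config=job_config).result()


with DAG(
    dag_id="stock_etl_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
) as dag:
    extract = PythonOperator(task_id="extract_data", python_callable=extract_data)
    transform = PythonOperator(task_id="transform_data", python_callable=transform_data)
    load = PythonOperator(task_id="load_data", python_callable=load_data)

    extract >> transform >> load

Writing the intermediate files to disk between tasks keeps the payloads out of XCom and mirrors the CSV hand-off used in the manual version of the pipeline.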

In this DAG, three core tasks are defined:

    - extract_data: This task handles data extraction from the Alpha Vantage API. It utilizes Python to make API requests and retrieve the stock data in JSON format.
    - transform_data: After extraction, the data is transformed into a pandas DataFrame, where necessary cleaning and formatting are performed. This step ensures that the data is in a usable state for analysis.
    - load_data: Finally, the cleaned data is uploaded to Google BigQuery, where it can be queried and analyzed. This task completes the ETL process, making the data available for immediate use in dashboards and further analytics.

The DAG is configured to run daily, ensuring that the stock data is consistently updated with the latest information. The Airflow UI allows for easy monitoring and troubleshooting of the pipeline, providing detailed logs and status updates for each task. This automation not only saves time but also ensures data accuracy and consistency, crucial for reliable stock market analysis.

4. Connect

With the backend of our ETL pipeline firmly established, the next step involves connecting the data stored in BigQuery to Power BI. This integration will allow us to visualize and analyze the data efficiently, leveraging Power BI’s powerful dashboarding capabilities.

Connecting BigQuery to Power BI is a straightforward process, yet crucial for ensuring that our data remains accessible and up-to-date. Here’s how I did it:

    1. Data source (BigQuery): stock market data stored in BigQuery tables.
    2. Connect (OAuth): a secure OAuth 2.0 connection to Google Cloud.
    3. Power BI Desktop: the data is ready for visualization and analysis.

5. Automate

With the connection between BigQuery and Power BI established, the final step is to automate the data ingestion process, ensuring our dashboards are always up-to-date.

    1) Set Up the Refresh Schedule: Begin by publishing your Power BI report to the Power BI service. Navigate to the dataset, select “Scheduled Refresh,” and choose a refresh frequency. I opted for a daily refresh to keep the data current every 24 hours.

    2) Configure Credentials: Ensure that your Google account credentials linked to BigQuery are correctly configured in the “Data Source Credentials” section. This allows Power BI to access the data without interruption.

    3) Monitor and Optimize: Power BI will now automatically refresh the data based on your schedule. Monitor the refresh logs for any issues and consider using incremental refresh to optimize performance as your data grows.

By automating the data ingestion, the pipeline ensures that the insights in Power BI are always based on the latest data, with minimal manual intervention required.