Published on

Data Engineering Zoomcamp, Week 3: BigQuery, Data Warehouses, and Machine Learning

Authors

In the ever-evolving world of data management, terms like Data Warehouse, Data Lake, Data Mart, and Data Lakehouse are frequently mentioned. If you're anything like me, you might find yourself pondering the subtle differences and distinctions between these architectures. As I delve deeper into the realm of Data Engineering, I've realized that these definitions often lack consensus, varying slightly from one context to another. Nevertheless, I'll do my best to describe them at a high level:

  • Data Warehouse: Picture a neatly organized library with well-cataloged books. Examples include Google's BigQuery, Amazon's Redshift, and Azure's SQL Data Warehouse.
  • Data Lake: Imagine a vast storage room with books in boxes, requiring some digging to find what you need, akin to object storage. Example services include Google's Cloud Storage, Amazon's S3 (Simple Storage Service), and Azure Data Lake Storage.
  • Data Mart: Think of it as a specialized section of a library dedicated to a specific subject.
  • Data Lakehouse: A hybrid that combines the structured organization of a Data Warehouse with the expansive storage of a Data Lake.

While these descriptions only scratch the surface, understanding them in simple terms is a great starting point for this week's module. In today's post, we'll dive into Week 3 of the Data Talks Club Data Engineering Zoomcamp, spotlighting Google's Data Warehouse solutiion, BigQuery. Our primary focus will be on query optimization in BigQuery, where we'll explore techniques like partitioning and clustering to enhance performance. Additionally, we'll discuss how to use Google's cloud-based Data Warehouse, BigQuery, for machine learning applications. Specifically, we'll delve into creating a model designed to predict tip amounts for taxi rides. This hands-on experience will help broaden our understanding of data engineering concepts but also demonstrates the practical applications of these technologies in real-world scenarios.

Exporting Taxi Data into Google Cloud Storage

In the previous Week 2 Module, we used Mage (a data orchestration framework) to extract, transform and load our data into PostgreSQL as well as Google BigQuery. This week, we will further capitalize on the knowledge we've built to export all of the necessary data for this module into a Google Cloud Bucket. This is a pivotal step for completing the module, as well as the homework and getting this correct will save you trouble with some of the issues I had with BigQuery SQL by doing this step incorrectly.

I'll provide two solutions, as I implemented a custom python script export the data, but I realize most people probably opted to use Mage.

The goal is to ingest Yellow Taxi Data for the years 2019 and 2020, which are used in the Week 3 Video Modules. Lastly, we will ingest the 2022 Green Taxi Data, which is necessary to complete the Week 3 Homework assignment.

Mage Pipeline

In the pipeline below, we utilize a handful of python libraries to load our data. These include pandas, pyarrow, requests, and the io library. We are using pyarrow.parquet to read the parquet files from the web source. The requests library allows us to fetch data from the actual url, while the io library allows us to create a file-like binary stream from the request content, essentially turning the raw bytes into an object similair to a file, which can be read or written to, without actually having to write the data to a disk.

The for loop essentially iterates through the URLs provided on the NY Taxi website, fetching each parquet file and merging them into one single pandas dataframe to be used in our Export block.

Important: to set the service (green or yellow) and the year (2019, 2020, 2021, etc.) of the data we are wanting to fetch, we will utilize global variables, which are passed into all blocks via keyword arguments (kwargs) and can accessed in a python block with the following line of code var = kwargs['name_of_variable']. This allows us to define the variables in a single place so we do not have to update their values in each block between pipeline runs.

You can set global variables in Mage by navigating to the variables pane while viewing your pipeline:

Mage Global Variables

For the sake of simplicity, I designed the pipeline to only load one service and one year at a time, so in order to get multiple years & services, you will need to run this pipeline multiple times. As I mentioned earlier, you will need to load the green taxi data for 2022 to complete the homework and the yellow taxi data for 2019 & 2020 if you would like to follow along with the module videos.

Data Loader

import io
import pandas as pd
import requests
import pyarrow as pa
import pyarrow.parquet as pq
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from API
    """
    service = kwargs['service']
    year = kwargs['year']
    print(f'loading {service} taxi data for the year {year}\n')

    data_frames = []

    for i in range(12):
        month = f"{i+1:02d}"
        file_name = f"{service}_tripdata_{year}-{month}.parquet"
        request_url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}'
        print(f'request url: {request_url}')
        try:
            response = requests.get(request_url)
            response.raise_for_status()  # Raises HTTPError for bad requests
            data = io.BytesIO(response.content)
            df = pq.read_table(data).to_pandas()
            data_frames.append(df)
            print(f"Parquet loaded: {file_name}")
        except requests.HTTPError as e:
            print(f"HTTP Error: {e}")
            # Optionally, handle the error (e.g., by breaking the loop or re-trying)

    # Concatenate all dataframes
    combined_df = pd.concat(data_frames, ignore_index=True)
    return combined_df


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

Exporter

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a Google Cloud Storage bucket.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    service = kwargs['service']
    year = kwargs['year']

    print(df.dtypes)

    bucket_name = 'mage-zoomcamp-bucket'
    object_key = f'{service}/{service}_tripdata_{year}.parquet'

    GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        bucket_name,
        object_key,
    )

Python Ingestion Script (Optional)

For the script below, I modified web_to_gcs.py, which you can find in the original DTC DE Zoomcamp repo under 03-data-warehouse\extras\web_to_gcs.py that I happened to stumble across, although I'm not sure how many people actually utilized this. I found it needed some tweaking in order to not cause issues with mismatching schema down the line in BQ.

There are a few steps involved we must complete prior to running our script:

  1. Install necessary dependencies
pip install pandas pyarrow google-cloud-storage
  1. Set GOOGLE_APPLICATION_CREDENTIALS to your project/service-account key
Set GOOGLE_APPLICATION_CREDENTIALS='C:\downloads\dtc-de-zoomcamp-12345-123a456b789c.json'
  1. Set GCP_GCS_BUCKET as your bucket or change default value of BUCKET
Set GCP_GCS_BUCKET='your-zoomcamp-bucket-name'

Since I'm using Git Bash, I alternatively created a secrets.sh file, which is similair to .env, in order to reference the environmental variables prior to executing our script.

export GOOGLE_APPLICATION_CREDENTIALS='C:\downloads\dtc-de-zoomcamp-12345-123a456b789c.json'
export GCP_GCS_BUCKET='your-zoomcamp-bucket-name'

After creating in saving the secrets.sh file defined above, run the command below in your terminal prior to running our script.

source secrets.sh
import io
import os
import requests
import pandas as pd
import pyarrow.parquet as pq
from google.cloud import storage

# switch out the bucketname
BUCKET = os.environ.get("GCP_GCS_BUCKET", "dtc-data-lake-bucketname")
print(BUCKET)

def upload_to_gcs(bucket, object_name, local_file):
    """
    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
    """
    # # WORKAROUND to prevent timeout for files > 6 MB on 800 kbps upload speed.
    # # (Ref: https://github.com/googleapis/python-storage/issues/74)
    storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MB
    storage.blob._DEFAULT_CHUNKSIZE = 5 * 1024 * 1024  # 5 MB

    client = storage.Client()
    bucket = client.bucket(bucket)
    blob = bucket.blob(object_name)
    blob.upload_from_filename(local_file)

def web_to_gcs(year, service):
    for i in range(12):
        # sets the month part of the file_name string
        month = '0'+str(i+1)
        month = month[-2:]

        # csv file_name
        file_name = f"{service}_tripdata_{year}-{month}.parquet"
        # request url for week 3 homework
        request_url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{service}_tripdata_{year}-{month}.parquet'
        print(request_url)
        #request_url = f"{init_url}{service}/{file_name}"
        r = requests.get(request_url)
        open(file_name, 'wb').write(r.content)
        print(f"Local: {file_name}")

        df = pq.read_table(file_name)
        #df.to_parquet(file_name, engine='pyarrow')
        print(f"Parquet: {file_name}")
        # upload it to gcs 
        upload_to_gcs(BUCKET, f"{service}/{file_name}", file_name)
        print(f"GCS: {service}/{file_name}")

# The following two datasets are used in the Week 3 Video Modules
web_to_gcs('2019', 'yellow')
web_to_gcs('2020', 'yellow')

# The following dataset is necessary to complete the Week 3 Homework Questions
web_to_gcs('2022', 'green')

Now that we've installed the necessary requirements for our script to run, set our environmental variables, and defined our ingestion script, we can now run our script in the terminal!

Make sure you are in the directory where you saved the script prior to running this command in your shell:

python web_to_gcs.py

Google Big Query

Big Query is a cloud-based Data Warehouse that Google describes as a fully managed, petabyte-scale, and cost-effective analytics data warehouse that lets you run analytics over vast amounts of data in near real time. With BigQuery, there's no infrastructure to set up or manage, letting you focus on finding meaningful insights using GoogleSQL and taking advantage of flexible pricing models across on-demand and flat-rate options.

BigQuery is often described as serverless, which really just means for us the end user, there are no servers in which we must manage.

In regards to pricing, BigQuery has two main components:

  • Compute pricing: cost to process queries, including SQL queries, ML queries, etc.
  • Storage pricing: cost of storing data in BigQuery

By default, queries are billed using an on-demand pricing model (per Terabyte), where the you only pay for the data scanned by your queries. The first TiB per month is free, while any additional TiB thereafter is priced at $6.25 as of this writing.

In the Week 3 module, we covered Partioning & Clustering which help us to optimize our SQL queries and ultimately, reduce our compute cost, which is something every data engineer should be concerned with when constructing our pipelines, data models, etc.

For more on pricing, refer to google's documentation here.

Creating an External Table

An External Table is a table that is created using a data source that is not stored in BigQuery storage. For our purposes, we will be creating an External Table using the Taxi data we ingested earlier into our Google Cloud Storage bucket. There are other external data sources you can also utilize, such such as a google cloud database or a different cloud product altogether.

Depending on whether you leveraged Mage to ingest the data or my custom python script will ultimately dictate which uri you set for our external table in BigQuery. This is a pivotal step in our module as essentially every query after we define our external table is dependent upon it's creation and a uniform table schema. You can fetch the uri, which in this case is a google storage specific url pointing us to our data. For example, the parquet file I ingested into GCS using Mage would be: gs://mage-zoomcamp-jonah-oliver/yellow/yellow_tripdata_2019.parquet. You can list multiple files using square brackets with a comma as a delimter, as shown below.

-- Creating external table referring to gcs path
CREATE OR REPLACE EXTERNAL TABLE `dtc-de-zoomcamp-12345.ny_taxi.external_yellow_tripdata`
OPTIONS (
  format = 'parquet',
  uris = ['gs://mage-zoomcamp-bucket/yellow/yellow_tripdata_2019.parquet',
  'gs://mage-zoomcamp-bucket/yellow/yellow_tripdata_2020.parquet']
);

Potential Issues

If any issues with the table schema exist, they will rear their head in the subsequent queries. I had spent a bit of time struggling with this as the DTC provided CSV files were producing mismatching schema, which caused any queries against the newly created external table to fail. What I mean by mismatching schema is that one table yellow_tripdata_2020-01 would have a VendorID datatype of INTEGER while another table yellow_tripdata_2020-02 would have a VendorID data type of FLOAT. An alternative solution would be to explicity define the data types in our data loader block, similair to what was done in the week 2 module covering workflow orchestration with Mage. I ultimately went straight to the source and found the parquet files there to have a uniform schema, so I'm sharing what worked for me.

Important: be sure to disable the query caching feature in BigQuery prior to running the SQL below. It will prevent us from highlighting the differences in query performance when using partitioning, clustering, etc. and is additionaly important for completing the Homework for this module. If query caching is enabled, this will produce results (execution details) that do not align with the homework answers, so be sure to disable this feature. Lastly, I recommend if you are still getting results that don't align with a particular homework solution to refresh your web page and try again. This should save you some trouble.

Partitioning and Clustering

In data engineering, optimizing query performance is crucial. Partitioning and clustering are two key strategies used in BigQuery to enhance the efficiency of data warehouse queries. Here is an example of a subset of our taxi data, prior to performing any partitioning or clustering. I will try and demonstrate the effects of each method with data in the sections below.

RowVendorIDPickup DatetimeTrip Distance
122020-01-14 12:57:37 UTC1.2
222020-01-14 17:30:02 UTC0.99
322020-01-14 22:33:45 UTC0.43
422020-01-14 07:57:20 UTC1.8
522020-01-14 13:26:44 UTC1.0
622020-01-14 22:46:25 UTC0.53
722020-01-14 07:35:59 UTC1.39

Partitioning

Partitioning involves dividing a table into segments based on specific column values, often dates. This approach is particularly useful for large datasets, as it allows queries to scan only relevant partitions, reducing the amount of data processed and thereby improving performance.

In our project, we created a partitioned table yellow_tripdata_partitoned, partitioning it by the tpep_pickup_datetime date. This significantly reduced the data scanned in our queries, as demonstrated by the comparison between partitioned and non-partitioned table queries.

Partition for 2020-01-14

RowVendorIDPickup DateTrip Distance
132020-01-141.2
222020-01-140.99
312020-01-140.43

Partition for 2020-01-15

RowVendorIDPickup DateTrip Distance
422020-01-151.8
522020-01-151.0

Partition for 2020-01-16

RowVendorIDPickup DateTrip Distance
622020-01-160.53
722020-01-161.39

Clustering

Clustering complements partitioning by organizing data within each partition. It sorts the data based on specified columns, which can further speed up query performance, especially for filter-heavy queries.

We enhanced our partitioned table by clustering it by VendorID. This resulted in even more efficient queries, scanning less data compared to the partitioned table without clustering.

Partitioning and clustering are two methods of improving performance of our data warehouse queries.

Partition for 2020-01-14 (Clustered by VendorID)

RowVendorIDPickup DateTrip Distance
312020-01-140.43
222020-01-140.99
132020-01-141.2

Practical Impact

To showcase the effectiveness of these methods, we performed queries on non-partitioned, partitioned, and partitioned-clustered tables. The results clearly showed a reduction in the amount of data scanned – moving from 1.6GB with the non-partitioned table to around 106 MB with the partitioned table, and even less with the partitioned-clustered table.

These optimizations are pivotal in data warehousing, particularly in a cloud environment like BigQuery, where performance improvements can also lead to cost savings. For more detailed information on partitioned tables, you can refer to Google's documentation.

For more on partitioned tables, refer to google's documentation here.

-- Check yellow trip data
SELECT * FROM `dtc-de-zoomcamp-410523.ny_taxi.external_yellow_tripdata`;

-- Create a non partitioned table from external table
CREATE OR REPLACE TABLE `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_non_partitoned` 
AS SELECT * FROM `dtc-de-zoomcamp-12345.ny_taxi.external_yellow_tripdata`;

-- Create a partitioned table from external table
CREATE OR REPLACE TABLE `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_partitoned` 
PARTITION BY
  DATE(tpep_pickup_datetime) AS
SELECT * FROM `dtc-de-zoomcamp-12345.ny_taxi.external_yellow_tripdata`;

-- Impact of partition
-- Scanning 1.6GB of data
SELECT DISTINCT(VendorID)
FROM `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_non_partitoned`
WHERE DATE(tpep_pickup_datetime) BETWEEN '2019-06-01' AND '2019-06-30';

-- Scanning ~106 MB of DATA
SELECT DISTINCT(VendorID)
FROM `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_partitoned` 
WHERE DATE(tpep_pickup_datetime) BETWEEN '2019-06-01' AND '2019-06-30';

-- Let's look into the partitons
SELECT table_name, partition_id, total_rows
FROM `ny_taxi.INFORMATION_SCHEMA.PARTITIONS`
WHERE table_name = 'yellow_tripdata_partitoned'
ORDER BY total_rows DESC;

-- Creating a partition and cluster table
CREATE OR REPLACE TABLE `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_partitoned_clustered`
PARTITION BY DATE(tpep_pickup_datetime)
CLUSTER BY VendorID AS
SELECT * FROM `dtc-de-zoomcamp-12345.ny_taxi.external_yellow_tripdata`;

-- Query scans 1.1 GB
SELECT count(*) as trips
FROM `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_partitoned` 
WHERE DATE(tpep_pickup_datetime) BETWEEN '2019-06-01' AND '2020-12-31'
  AND VendorID=1;

-- Query scans 864.5 MB
SELECT count(*) as trips
FROM `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_partitoned_clustered`
WHERE DATE(tpep_pickup_datetime) BETWEEN '2019-06-01' AND '2020-12-31'
  AND VendorID=1;

Machine Learning Model

Data science and machine learning are crucial areas where data engineers should possess foundational knowledge. As engineers, we often play a pivotal role in provisioning high-quality datasets that data scientists rely on for building and optimizing their models. Week 3 of our course served as a valuable reminder of the importance of these skills. This section highlighted the implementation of a linear regression model, using our NY Taxi data, all within BigQuery. To deepen my understanding, I plan to revisit Andrew Ng's course on Machine Learning available on Coursera following the conclusion of the Zoomcamp. This course covers essential topics like Linear and Logistic Regression, Gradient Descent, Overfitting, as well as Supervised and Unsupervised Learning, providing a solid foundation for anyone interested in the field.

-- SELECT THE COLUMNS OF INTEREST
SELECT passenger_count, trip_distance, PULocationID, DOLocationID, payment_type, fare_amount, tolls_amount, tip_amount
FROM `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_partitoned` WHERE fare_amount != 0;

-- CREATE A ML TABLE WITH APPROPRIATE TYPE
CREATE OR REPLACE TABLE `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_ml` (
`passenger_count` INT64,
`trip_distance` FLOAT64,
`PULocationID` STRING,
`DOLocationID` STRING,
`payment_type` STRING,
`fare_amount` FLOAT64,
`tolls_amount` FLOAT64,
`tip_amount` FLOAT64
) AS (
SELECT CAST(passenger_count AS INT64), trip_distance, cast(PULocationID AS STRING), CAST(DOLocationID AS STRING),
CAST(payment_type AS STRING), fare_amount, tolls_amount, tip_amount
FROM `dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_partitoned` WHERE fare_amount != 0
);

-- CREATE MODEL WITH DEFAULT SETTING
CREATE OR REPLACE MODEL `dtc-de-zoomcamp-12345.ny_taxi.tip_model`
OPTIONS
(model_type='linear_reg',
input_label_cols=['tip_amount'],
DATA_SPLIT_METHOD='AUTO_SPLIT') AS
SELECT
*
FROM
`dtc-de-zoomcamp-410523.ny_taxi.yellow_tripdata_ml`
WHERE
tip_amount IS NOT NULL;

-- CHECK FEATURES
SELECT * FROM ML.FEATURE_INFO(MODEL `dtc-de-zoomcamp-12345.ny_taxi.tip_model`);

-- EVALUATE THE MODEL
SELECT
*
FROM
ML.EVALUATE(MODEL `dtc-de-zoomcamp-12345.ny_taxi.tip_model`,
(
SELECT
*
FROM
`dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_ml`
WHERE
tip_amount IS NOT NULL
));

-- PREDICT THE MODEL
SELECT
*
FROM
ML.PREDICT(MODEL `dtc-de-zoomcamp-12345.ny_taxi.tip_model`,
(
SELECT
*
FROM
`dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_ml`
WHERE
tip_amount IS NOT NULL
));

-- PREDICT AND EXPLAIN
SELECT
*
FROM
ML.EXPLAIN_PREDICT(MODEL `dtc-de-zoomcamp-12345.ny_taxi.tip_model`,
(
SELECT
*
FROM
`dtc-de-zoomcamp-410523.ny_taxi.yellow_tripdata_ml`
WHERE
tip_amount IS NOT NULL
), STRUCT(3 as top_k_features));

-- HYPER PARAM TUNNING
CREATE OR REPLACE MODEL `dtc-de-zoomcamp-12345.ny_taxi.tip_hyperparam_model`
OPTIONS
(model_type='linear_reg',
input_label_cols=['tip_amount'],
DATA_SPLIT_METHOD='AUTO_SPLIT',
num_trials=5,
max_parallel_trials=2,
l1_reg=hparam_range(0, 20),
l2_reg=hparam_candidates([0, 0.1, 1, 10])) AS
SELECT
*
FROM
`dtc-de-zoomcamp-12345.ny_taxi.yellow_tripdata_ml`
WHERE
tip_amount IS NOT NULL;