
Data Engineering Zoomcamp, Week 4: Analytics Engineering with dbt


As defined on their website, dbt™ is a SQL-first transformation workflow that lets teams quickly and collaboratively deploy analytics code following software engineering best practices like modularity, portability, CI/CD, and documentation.

In this week's post, I reflect on some of the initial challenges I faced while working through the Week 4 module, particularly regarding the datasets needed. I'll offer an interesting solution that I admittedly spent way too much time implementing, but enjoyed nonetheless.

We'll start by touching on the prerequisites required for the module, before diving into potential memory issues in Mage data pipelines and the timestamp precision complexities that arise when converting Parquet files to a pandas DataFrame. Lastly, I'll rehash the steps we performed in the module to create a dbt model we can use for further analysis in Google's BI solution, Looker Studio.

I spent a lot of time putting this post together and I hope you find it insightful. If you have any questions or would simply like to connect, feel free to add me on LinkedIn!

Lastly, the contents of this post are based on my learnings from Week 4 of the DTC Data Engineering Zoomcamp, which you can find at this GitHub repository.

Prerequisites

Before we can get into this module, there are a handful of datasets we need. If you haven't done this already, DTC provides a quick hack that should set you on the right path. Otherwise, below I go into some issues I had while ingesting the data with Mage and provide an updated solution that I have personally tested on each dataset. If you already have all of this set up, feel free to skip ahead.

  • 2019 and 2020 Yellow NY Taxi data
  • 2019 and 2020 Green NY Taxi data
  • 2019 FHV Taxi data (homework)

Potential Issues

Insufficient Memory

I'll start by saying I was wrong in my Week 3 post. More specifically, I designed a Mage pipeline to ingest all of the necessary taxi data for our module (green, yellow & fhv). When I initially loaded the data into GCS, I used this local Python script, and only then did I explore implementing something similar with Mage. The issue was that I only tested one taxi service (green) and one year (2019), which worked, but I found a plethora of issues when attempting to ingest the yellow taxi data and the rest with Mage after the fact. My pipeline was failing because I was loading more data into memory than my Docker container was configured to handle. The pipeline would fail, the container would freeze up due to memory constraints and have to be restarted, and the whole thing turned into one big headache.

This led me on a search for a solution to my memory problem, but this is not your grandpa's memory problem. The first thing I discovered was what Mage calls dynamic blocks. Dynamic blocks allow you to split up a block, such as a data loader, into multiple pieces and run them in parallel. For example, rather than concatenating all of our taxi data into one dataframe inside a single data loader, we can split the work into separate blocks that can each be loaded into memory in parallel, or even sequentially.

At first, I ran 4 data loader blocks in parallel, where each block represented a parquet file containing taxi data for a specific month (e.g. January, February, March). At this point I assumed I had cracked the code, but my pipeline was still failing due to insufficient memory, once again.

The solution I arrived at was to configure my data loader as a dynamic block and to then set the concurrency of those blocks to 1 in the metadata.yml file belonging to my pipeline. This enabled my pipeline to run each dynamic block sequentially (one by one) until every file was loaded, rather than appending all the files to a single dataframe.

My understanding is that only one dynamic child block will run at a time, which helps alleviate potential memory constraints but is effectively much slower. Alas, I am beholden to the specifications of my personal computer, on which I've allocated 8 GB of memory and 4 vCPUs to Docker. Were I to configure this Mage pipeline to run on a remote container provisioned with greater memory and compute, I could likely increase the concurrency figure as needed, effectively speeding up my pipeline runtime.

concurrency_config:
  block_run_limit: 1

At the end of this section, I've provided an example snapshot of what a Mage pipeline with dynamic blocks looks like during execution. For the sake of simplicity, I only loaded 3 months of taxi data. The pipeline works as follows: the fetch_taxi_url_metadata_csv block is configured as a "dynamic block". It iterates through a range of 12, representing the months in a year, and appends the URL details for each file to a list of dictionaries, which is referenced in the return statement as [download_info] and passed on to our child block fetch_taxi_url_data_csv. The output of this block looks like the following image:

Mage Dynamic Data Loader

Next we have a "dynamic child" block. This secondary data loader takes the URL from each item in our download_info list, retrieves the CSV data for each month (i.e. January, February and March), and returns each month as a separate dataframe labeled child_0, child_1 and child_2. Lastly, our data exporter exporter_taxi_data_gcs takes each child block's output and exports the dataframe to Google Cloud Storage.

Mage Dynamic Data Loader
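
To make this more concrete, here is a minimal sketch of what these two data loader blocks might look like, assuming Mage's standard decorator-based block API. The function names mirror the blocks described above, but the URL pattern and return details are placeholders; my actual implementation is in the pipeline linked later in this post.

# Sketch of the dynamic parent block (fetch_taxi_url_metadata_csv); in Mage this
# lives in its own data loader block that is marked as dynamic.
import pandas as pd

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def build_download_info(*args, **kwargs):
    # Placeholder URL pattern for the DTC-hosted CSV files; adjust per service/year.
    base_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green'
    download_info = [
        {
            'month': month,
            'url': f'{base_url}/green_tripdata_2019-{month:02d}.csv.gz',
        }
        for month in range(1, 13)  # range of 12: one entry per month
    ]
    # Returning a list of dictionaries lets Mage fan out one child run per entry.
    return [download_info]


# Sketch of the dynamic child block (fetch_taxi_url_data_csv), defined in a separate
# data loader block; each run receives a single entry from download_info.
@data_loader
def fetch_month(download_info: dict, *args, **kwargs) -> pd.DataFrame:
    return pd.read_csv(download_info['url'], compression='gzip')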

This parallelization gives us flexibility in a handful of ways. We can speed up our pipeline by increasing the number of concurrent blocks we want to run: if we set this figure to 3, all 3 of our data loaders will run in parallel instead of sequentially, and if we set the concurrency value to 1, only one child block will run at a time, effectively reducing the amount of data we load into memory at once. You can contrast this with some of the Mage pipelines we built in the earlier modules, where we loaded all of the CSV data into one single dataframe. That may work in a pinch, but it does not scale well as we load in more data. The yellow taxi data in particular, which contains the most records, was giving me this issue!

mage-dynamic-blocks-dag

Timestamp Precision

I want to first note that this issue can be avoided simply by using the CSV data in the DTC repo, but I was curious about the Parquet format and wanted to explore it further by going directly to the source. In hindsight, the following issue was avoidable, but I want to share my findings regardless, as similar issues are likely to appear if you work in data engineering.

In my work with the FHV taxi trip data, I encountered a notable issue while loading the data. My Mage pipeline was breaking due to the following error:

pyarrow.lib.ArrowInvalid: Casting from timestamp[us] to timestamp[ns] would result in out of bounds timestamp: 33106123800000000

This error arises from a common situation in data handling.

Our Parquet files store timestamps in microseconds. When converting these to nanoseconds, the standard precision in pandas, each value is multiplied by 1000 (since 1 microsecond = 1000 nanoseconds). However, this operation can push the value beyond the maximum limit of a 64-bit signed integer, which is what both pandas (datetime64[ns]) and Parquet use for timestamp storage. This limit is 2^63 - 1, or 9,223,372,036,854,775,807. Exceeding this threshold results in an 'out of bounds' error.

For instance, our original value of 33,106,123,800,000,000 (in microseconds) becomes 33,106,123,800,000,000,000 when converted to nanoseconds, a figure that can be written as 3.31061238e+19. This value significantly surpasses the 64-bit limit of 9,223,372,036,854,775,807. In practical terms, the latest datetime that can be represented in nanoseconds is approximately 2262-04-11 23:47:16. Our value, corresponding to February 3, 3019, at 17:30, clearly exceeds this boundary.
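
You can verify this arithmetic yourself in a Python shell with nothing but pandas:

import pandas as pd

# datetime64[ns] values are stored as 64-bit signed integers (nanoseconds since epoch),
# so the latest representable timestamp is:
print(pd.Timestamp.max)                 # 2262-04-11 23:47:16.854775807

microseconds = 33_106_123_800_000_000   # the offending value from the error above
nanoseconds = microseconds * 1000
print(nanoseconds > 2**63 - 1)          # True -> out of bounds for datetime64[ns]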

To illustrate these out-of-range values, refer to the following table. As a data engineer, encountering such erroneous timestamps necessitates a decision on how to handle them. In our bootcamp, I chose to convert out-of-bounds timestamps to pandas' NaT (not-a-time) type, essentially treating them as null values. In real-world scenarios, alternative approaches might be considered.

| Datetime | Microsecond Timestamp | Nanosecond Equivalent (Microsecond * 1000) | Within 64-bit Range? |
| --- | --- | --- | --- |
| 1970-01-01 00:00:01 | 1,000,000 | 1,000,000,000 (1e9) | Yes |
| 2262-04-11 23:47:16 | 9,223,372,036,854 | 9,223,372,036,854,000 (9.22e15) | Yes |
| 2262-04-11 23:47:16.854775 | 9,223,372,036,854,775 | 9,223,372,036,854,775,000 (9.22e18) | Yes |
| Out of Bounds | 9,223,372,036,854,776 | 9,223,372,036,854,776,000 (9.22e18) | No |
| Out of Bounds | 10,000,000,000,000,000 | 10,000,000,000,000,000,000 (1e19) | No |
| Out of Bounds | 33,106,123,800,000,000 | 33,106,123,800,000,000,000 (3.31e19) | No |
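
Below is a minimal sketch of that workaround, assuming the file is read with pyarrow; the file path and timestamp column names are placeholders you would swap for your own.

import pandas as pd
import pyarrow.parquet as pq

# Read the Parquet file, keeping timestamps as plain Python objects instead of
# letting pyarrow cast them to datetime64[ns] (the cast is what raises ArrowInvalid).
table = pq.read_table('fhv_tripdata_2019-01.parquet')    # placeholder path
df = table.to_pandas(timestamp_as_object=True)

# Coerce each timestamp column: values beyond the datetime64[ns] range (e.g. year 3019)
# become NaT, which we can treat as nulls downstream.
for col in ('pickup_datetime', 'dropOff_datetime'):      # assumed column names
    df[col] = pd.to_datetime(df[col], errors='coerce')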

Updated Mage Pipeline

If you're curious about my implementation in Mage, you can take a look at my repo, which contains the pipeline I outlined above, specifically named ingest_all_taxi_data_gcs_csv.

Creating and Deploying a dbt™ Model

dbt™ stands as a bridge between data engineering and software engineering, introducing practices from the latter to enhance the management and deployment of SQL code. Just as Python benefits from a wealth of libraries, dbt offers a suite of packages equipped with pre-written macros. These macros, akin to Python functions, simplify and streamline the creation and maintenance of our data models, making the overall process more efficient and manageable.

Setup

Setting up a dbt project involves several key steps that lay the foundation for efficient data modeling and transformation. Let's walk through the process of creating a new dbt project.

Create a project

If you haven't already, you can sign up for an account here.

Once signed in, you will need to create a dbt account, which may sound confusing considering we just set up an account, but think of it as a directory that holds all of your dbt projects. I named mine DTC DE Zoomcamp.

If you are having trouble, refer to the documentation provided by DTC in the Week 4 readme for setting up dbt Cloud.

Connect project to GitHub Repository

Before we create and deploy our dbt project, we will need to link dbt to our GitHub repository, more specifically a cloned repository containing the Week 4 "04-analytics-engineering" folder. After we link the repository, we just need to point dbt at the correct project subdirectory, which we'll do next.

Note: I recommend using the GitHub connector instead of the Git Clone option, as I later had issues with deploying a CI Job due to using the latter.


Next you will need to set the project subdirectory to the location of our dbt project, which is within the taxi_rides_ny folder.

dashboard

Connect project to Data Warehouse

Lastly, we need to link our project to our BigQuery data warehouse. This is pretty straightforward, as dbt allows us to import the service account key we generated in the previous module. At this point, you likely already have one downloaded, but see my previous post here if you need a refresher.

Define dbt Models

Staging Models

Now that our project has been configured with the necessary settings, it's time to open up the dbt IDE and get started. Once the IDE has loaded, one of the first things you may notice is a button prompting us to "Create Branch". It is best practice to create feature branches for any changes we want to make to our original code, prior to submitting what's known as a "pull request", which generally has to be reviewed by others to ensure there are no breaking changes. Once reviewed, the pull request can then be merged into our main branch. In this case, we own the repository, so we have the ability to create a feature branch, update our code, submit a pull request and approve it ourselves.

Under our dbt project taxi_rides_ny, there should be a models folder containing two subfolders: staging and core. The staging model will define the schema of our taxi data as well as some test suites. The core model will contain our fact table, which will ultimately join our green and yellow taxi data together as one table for further analysis.

Under staging, we define the schema of our tables in schema.yml. dbt then allows us to auto-generate the models for our staging tables, green_tripdata and yellow_tripdata, by clicking "Generate model".

sources:
  - name: staging
    database: dtc-de-zoomcamp-12345
    # For postgres:
    # database: production
    schema: ny_taxi

    tables:
      - name: external_green_tripdata
      - name: external_yellow_tripdata
      - name: external_fhv_tripdata
        # freshness:
        #   error_after: {count: 6, period: hour}

For the sake of this blog post, I will share the stg_external_fhv_tripdata.sql model, which I first auto-generated and then modified to be consistent with the green and yellow tripdata models.

{{ config(materialized="view") }}

with
    fhv_tripdata as (
        select *, row_number() over (partition by pulocationid, pickup_datetime) as rn
        from {{ source("staging", "external_fhv_tripdata") }}
    )

select
    {{ dbt_utils.generate_surrogate_key(["pulocationid", "pickup_datetime"]) }}
    as tripid,
    {{ dbt.safe_cast("dispatching_base_num", api.Column.translate_type("integer")) }}
    as dispatching_base_num,
    cast(pickup_datetime as timestamp) as pickup_datetime,
    cast(dropoff_datetime as timestamp) as dropoff_datetime,
    {{ dbt.safe_cast("pulocationid", api.Column.translate_type("integer")) }}
    as pickup_locationid,
    {{ dbt.safe_cast("dolocationid", api.Column.translate_type("integer")) }}
    as dropoff_locationid,
    sr_flag,
    affiliated_base_number

from fhv_tripdata
--where rn = 1

-- dbt build --select <model.sql> --vars '{'is_test_run': 'false'}'
{% if var("is_test_run", default=true) %} limit 100 {% endif %}

Compiling the model will allow us to see the output of the macros we are using.

with
    fhv_tripdata as (
        select *, row_number() over (partition by pulocationid, pickup_datetime) as rn
        from `dtc-de-zoomcamp-12345`.`ny_taxi`.`external_fhv_tripdata`
    )

select
    to_hex(md5(cast(coalesce(cast(pulocationid as string), '_dbt_utils_surrogate_key_null_') || '-' || coalesce(cast(pickup_datetime as string), '_dbt_utils_surrogate_key_null_') as string))) as tripid,
    safe_cast(dispatching_base_num as INT64) as dispatching_base_num,
    cast(pickup_datetime as timestamp) as pickup_datetime,
    cast(dropoff_datetime as timestamp) as dropoff_datetime,
    safe_cast(pulocationid as INT64) as pickup_locationid,
    safe_cast(dolocationid as INT64) as dropoff_locationid,
    sr_flag,
    affiliated_base_number
    
from fhv_tripdata
--where rn = 1

-- dbt build --select <model.sql> --vars '{'is_test_run': 'false'}'
 limit 100 

Running the following dbt command will build the staging model we specified in Google BigQuery (materialized as a view, per the config above); the + selectors on either side of the model name also include its upstream and downstream dependencies.

dbt build --select +stg_external_fhv_tripdata+ --vars '{'is_test_run': 'false'}'

Core Models

Within the models folder, we should have two subfolders, core and staging. With our staging model defined, we now need to create our finalized fact model. This is where we perform any necessary joins. We no longer need to worry about the data types in our schema, as we handled that in our staging model. In the case of our fact_fhv_trips.sql model, we select the columns we want in our finalized model and perform two inner joins to our dim_zones model, which has a where clause to filter out any zones with a borough of 'Unknown'. This was a requirement for one of the homework questions in the Week 4 module.

{{
    config(
        materialized='table'
    )
}}

with external_fhv_tripdata as (
    select *, 
    'FHV' as service_type 
    from {{ ref('stg_external_fhv_tripdata') }}
),
dim_zones as (
    select * from {{ ref('dim_zones') }}
    where borough != 'Unknown'
)

select
    external_fhv_tripdata.dispatching_base_num,
    external_fhv_tripdata.affiliated_base_number,
    external_fhv_tripdata.pickup_locationid,
    external_fhv_tripdata.dropoff_locationid,
    external_fhv_tripdata.pickup_datetime,
    external_fhv_tripdata.dropoff_datetime,
    external_fhv_tripdata.service_type
from external_fhv_tripdata
inner join dim_zones as pickup_zone
    on external_fhv_tripdata.pickup_locationid = pickup_zone.locationid
inner join dim_zones as dropoff_zone
    on external_fhv_tripdata.dropoff_locationid = dropoff_zone.locationid

Once finished, the lineage of our dbt model should look something like this:

dbt-dag

Model Schema

In our schema.yml file we can define the sources our staging models read from:

sources:
  - name: staging
    database: dtc-de-zoomcamp-12345
    schema: ny_taxi

    tables:
      - name: external_green_tripdata
      - name: external_yellow_tripdata
      - name: external_fhv_tripdata

and within a downstream model, we can use:

select * from {{ source('staging', 'external_fhv_tripdata') }}

Which will be compiled to:

select * from `dtc-de-zoomcamp-12345`.`ny_taxi`.`external_fhv_tripdata`

We can also define tests we would like to execute against our models at build time. For example, we can test our tripid column to ensure each value is unique and not null; in the instance below, a warning will be thrown if a record does not meet that criteria. A warning will not stop our model from deploying, but if we were to change the severity to error, it would stop our deployment. This can all be defined within our schema.yml file in the staging subdirectory we've created within models.

models:
  - name: stg_external_green_tripdata
    description: ""
    columns:
      - name: tripid
        data_type: string
        description: ""
        tests:
          - unique:
              severity: warn
          - not_null:
              severity: warn

Packages and Macros

dbt packages can be added in the packages.yml file, similar to how we added Python dependencies for our Docker container in the requirements.txt file in module 1. We utilized some of these macros while building out our staging models.

The dbt_utils package provides macros we can (re)use across dbt projects, such as the generate_surrogate_key macro we used in our staging model to create a unique primary key.

The dbt-labs/codegen package contains a handful of useful macros that make writing a model more efficient. For example, we can use the generate_model_yaml macro to generate the YAML for a list of models we can then paste into our schema.yml file. This saves us time when defining the schema of our tables.

{% set models_to_generate = codegen.get_models(directory='marts', prefix='fct_') %}
{{ codegen.generate_model_yaml(
    model_names = models_to_generate
) }}

Project Variables

Variables can be defined within dbt_project.yml to provide data to models at compile time. For example, the payment_type_values variable lists the accepted values for the payment_type field, which is present in both the green and yellow taxi data.

Defined as follows in dbt_project.yml:

vars:
  payment_type_values: [1, 2, 3, 4, 5]

and later accessed in the schema.yml for our project under both the green and yellow taxi models using the {{ var('...') }} function:

      - name: payment_type
        data_type: int64
        description: ""
        tests:
          - accepted_values:
              values: "{{ var('payment_type_values') }}"

Deployment Jobs (CI/CD)

dbt makes creating a CI job fairly simple. We will start by creating a new environment within dbt under Deploy -> Environments.

dbt Create Prod Environment

When configuring the deployment job, we'll enable generate docs on run and run source freshness.

A Continuous Integration job will ensure that when any new changes are successfully merged into main, a job is automatically run and the updates are applied to our production model.

dbt effectively describes how CI works in their documentation: "When you set up CI jobs, dbt Cloud listens for webhooks from your Git provider indicating that a new PR has been opened or updated with new commits. When dbt Cloud receives one of these webhooks, it enqueues a new run of the CI job.

dbt Cloud builds and tests the models affected by the code change in a temporary schema, unique to the PR. This process ensures that the code builds without error and that it matches the expectations as defined by the project's dbt tests...

When the CI run completes, you can view the run status directly from within the pull request. dbt Cloud updates the pull request in GitHub, GitLab, or Azure DevOps with a status message indicating the results of the run. The status message states whether the models and tests ran successfully or not.

dbt Cloud deletes the temporary schema from your data warehouse when you close or merge the pull request."

We can also set up a deploy job that simply runs our model at a scheduled cadence, ensuring any new data ingested into our data source is captured in our production models.

Data Visualization

Now that we have solidified our data model, we can use Google's Looker Studio to create a dashboard for analysis. Naturally, Google's BigQuery is already built in as a data connector, which makes accessing our data a straightforward process. I used our newly created fact_trips model to build a simple dashboard that aims to convey how taxi trips and revenue were affected by COVID from 2019 to 2020.

dashboard

link to dashboard

Conclusion

For more insights or to discuss these topics further, feel free to connect with me on LinkedIn. I'm really excited to dive into the remaining modules as the zoomcamp comes to a close in the coming months.