We are back and ready to expand upon our Data Factory ELT journey. Previously we looked at using a Control Table and Watermark Columns. In this post we will use a Dataflow to combine our Raw Deltas with an existing staging table and then review the file in Power BI Desktop. The genesis for this idea was shared with me by Karl Hacke who designed the pattern and provided a lot of guidance around this body of work. I appreciate his support and mentorship throughout this learning process.
The pathway we are exploring today is ideal for pipelines that run on LARGE tables. Since pipeline costs accrue during execution, the longer a run takes, the more expensive it becomes, so there is a real benefit to keeping runs short.
The alternative pathway (called Trunc and Load) means we delete the old data and grab a fresh copy of the entire database table every time the pipeline runs. While there are fewer moving parts in this method, there is a natural inflection point where tables become too large for this to be practical. In this post we will be continuing with the top pathway (Watermark Deltas).
Our pipeline has not changed much from the previous two posts… Here are the first few steps:
One minor change above is that we’ve moved our Start time variable outside of the For Each loop. This ensures that the time written to our control table at the end is consistent across all related tables.
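To see why the placement of the variable matters, here is a rough local sketch of the pattern (the table names and control-table shape are invented for illustration): one timestamp is captured before the loop, and every table's control-table row is stamped with that same value.

```python
from datetime import datetime, timezone

# Capture ONE start time before the loop begins.
run_start = datetime.now(timezone.utc).isoformat()

# Hypothetical control table with one row per source table.
control_table = [
    {"table_name": "Product", "last_run": None},
    {"table_name": "Customer", "last_run": None},
]

for row in control_table:
    # ... copy deltas for row["table_name"] here ...
    row["last_run"] = run_start  # same timestamp for every table

# Every table now shares a single, consistent watermark.
stamps = {row["last_run"] for row in control_table}
```

Had the timestamp been set inside the loop, each table would record a slightly different time, and rows modified between iterations could be missed or duplicated on the next run.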
Let’s look inside the For Each Loop:
In our previous post – we walked through the details of moving data that has been changed since our last pipeline execution into a RAW data lake folder. Let’s step back and ask a high level question.
What happens the FIRST time a new table enters this Pipeline?
This is managed by our Control Table – the pipeline uses the “Last_run” column to determine which rows to copy into the Raw ADLS folder.
If this were a new table, it should be assigned an OLD watermark value – which ensures that ALL ROWS are captured in the first run.
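As a quick illustration of that seeding trick (the row shape and dates here are invented, not from the actual control table), an artificially old watermark makes the delta filter return every row on the first run:

```python
from datetime import datetime

# Hypothetical source rows with a last-modified column.
rows = [
    {"id": 1, "modified": datetime(2021, 5, 1)},
    {"id": 2, "modified": datetime(2022, 2, 1)},
]

def delta_rows(rows, last_run):
    """Return only rows changed since the last pipeline run."""
    return [r for r in rows if r["modified"] > last_run]

# New table: seed Last_run far in the past -> full initial load.
first_load = delta_rows(rows, datetime(1900, 1, 1))

# Subsequent run: only rows modified since the previous execution.
incremental = delta_rows(rows, datetime(2021, 12, 31))
```

`first_load` contains both rows, while `incremental` contains only the row modified after the previous run.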
So now we have a copy of our NEW table in the RAW folder.
How do we check if a table already Exists in the Staging Folder?
Our activity today is based on using the Staging folder of our Data Lake as the data source for Power BI reporting. Each folder contains a parquet file (or files) representing the most recent ‘version’ from our last pipeline run (presumably after some simple transformation work has been done).
If we were doing a Trunc and Load – we would just overwrite the staging file and move it along, but in our case, we want to use this previous Staging file and add our raw deltas to it.
So we first have to see if the file is present in the Staging directory.
To do this we will use the Get Metadata Activity to return specified metadata for our Staging environment. Here’s what that step looks like in our pipeline.
Notice that in this step we needed to use a new dataset for this metadata activity. Here’s why.
In future steps – we are going to use a Dataflow to ‘create’ the new Staging file. By design – dataflows will not allow the use of a dataset that specifies the filename. Instead these names are assigned automatically.
We aren’t going to be able to ‘predict’ what this name will be – so we can’t specify a filename in our dataset. The Get Metadata Activity also doesn’t allow for us to use wildcards (*) in our existing parameterized dataset.
To get around this we need to create a new dataset that simply removes the file name parameter.
Our metadata activity will use this dataset to point to the folder where our Staging files will be placed.
If this were the first time this table were run (or if we wanted to reload the entire table) – the folder specified in the dataset would not exist. This is because the folder is created during the Pipeline run.
Let’s delete the Product folder and try again:
Split the Pipeline between a TRUE or FALSE pathway
For this next step we are using the IF Condition Activity
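As a rough local analogy (the folder path and the return strings here are invented for illustration), the Get Metadata exists check feeding the If Condition behaves something like this:

```python
import os

def staging_branch(staging_folder: str) -> str:
    """Mimics the exists check driving the If Condition:
    TRUE  -> merge raw deltas into the existing staging file,
    FALSE -> first-time full copy into staging."""
    if os.path.isdir(staging_folder):
        return "merge deltas with existing staging file"   # TRUE path
    return "copy ENTIRE table to create staging file"      # FALSE path

# A folder that does not exist yet takes the FALSE path.
result = staging_branch("/definitely/not/here/Product")
```

The key point is that the absence of the folder is not an error; it is the signal that routes the pipeline down the first-load path.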
If No Staging Folder exists (FALSE)
This means we must create the staging file for the first time. Similar to what we’ve done a few times already – we will use a COPY activity and query to copy the ENTIRE table to a STAGING folder.
It’s worth pointing out that at this point the “For Each iterator” vanishes from the Dynamic Content menu. However, we can still use the formula by typing the values in directly.
If a Staging Folder exists: (TRUE)
When the staging folder (and file) do exist – we want to run our update cadence. Here are the 4 steps included:
We first need to know what rows exist in the source system. We will need this in the dataflow – in order to remove any rows in our staging table that may no longer exist in our Source table.
For simplicity we can ‘reuse’ the same copy activity from our FALSE branch with a few quick modifications to the SQL query so that it selects ONLY the key column.
On the sink side – we need to update the location to be a subfolder in RAW called “Keys”.
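The query modification itself is small. Here is a sketch of the idea (the table and column names are examples, and in the real pipeline these values would come from the control table via dynamic content):

```python
def full_copy_query(table: str) -> str:
    """Query used by the FALSE branch: copy the entire table."""
    return f"SELECT * FROM {table}"

def keys_only_query(table: str, key_column: str) -> str:
    """Same shape, but selecting ONLY the key column, so the
    dataflow can later detect rows deleted from the source."""
    return f"SELECT {key_column} FROM {table}"

query = keys_only_query("SalesLT.Product", "ProductID")
# query == "SELECT ProductID FROM SalesLT.Product"
```

Keeping the two queries structurally identical is what makes the copy activity so easy to reuse between branches.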
Let’s build a DataFlow
DataFlows can be a little intimidating (especially if we want to maintain our parameterized pipeline), so let’s pick this one apart slowly.
Let’s start with the end in mind:
Remember that our 3 source files in the dataflow are built using the same logic and parameterized dataset. Let’s look at the existing Staging file:
We don’t have to do much except select the dataset here. Most of the logic happens based on the parameters we feed to this. In fact – for EVERY source or sink in this Dataflow – we will have the opportunity to add the appropriate parameters to the dataset.
The names of our dataflow source and sink items will appear as parameters in the dataflow settings.
With all our sources identified, the next step is to add a HASH for our Source Keys. Since each table has a different key_column ID, we need to set up a way to pass this column name into the Dataflow.
Create a Dataflow Parameter
Now outside the dataflow – we can access this parameter and add information from our control table which identifies the key column in each table.
Let’s create a Hash Key
Regardless of whether we actually need to do this, we are going to add an MD5 hash column to our SourceKeys called HashKeysfromSource. We will do this to our SourceKeys table and again to our Staging+RawDelta table. We will utilize our dataflow parameter in this step, which identifies the correct key column for each table.
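Conceptually, the derived column does something like the following sketch (the row shape is invented; in the dataflow this is an md5 expression over the parameterized key column rather than Python):

```python
import hashlib

def add_hash_key(rows, key_column):
    """Hash the value of the parameterized key column for each row."""
    for r in rows:
        r["HashKeysfromSource"] = hashlib.md5(
            str(r[key_column]).encode("utf-8")
        ).hexdigest()
    return rows

rows = add_hash_key([{"ProductID": 1}], key_column="ProductID")
# rows[0]["HashKeysfromSource"] == "c4ca4238a0b923820dcc509a6f75849b"
```

Because both tables hash the same key column the same way, the resulting values can be compared directly in the join step that follows.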
Bring together Existing Staging and Raw Deltas to build a New Staging File
This step is pretty mundane. We are going to append (or union) our two files.
This builds our NEW Staging table!
After this step we will add a Hash Key column to this table – exactly how we did previously.
With a hash key on both tables – we can join against this column and keep only the rows that exist in both.
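In spirit, that join is a filter: keep only the staging rows whose hash key still exists among the source keys, which quietly drops rows deleted at the source. A minimal sketch (hash values and row contents invented for illustration):

```python
# Hash keys currently present in the source system.
source_hashes = {"aaa", "bbb"}

# The appended Staging + RawDelta table, already hashed.
staging = [
    {"HashKey": "aaa", "name": "kept"},
    {"HashKey": "bbb", "name": "also kept"},
    {"HashKey": "ccc", "name": "deleted at source"},
]

# Inner join on the hash key: only rows present in BOTH survive.
new_staging = [r for r in staging if r["HashKey"] in source_hashes]
```

The row whose key no longer exists in the source falls out of `new_staging`, which is exactly how source-side deletes propagate into the staging file.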
Send new staging file to ADLS
Remove the old staging and keys file
Now we need to do a little clean up of our raw and staging folders to remove files we no longer need. Since the staging file will be assigned a random name by the dataflow, we will identify the older file (to be removed) using the modified date.
The keys file is easier to identify since we were able to assign a proper name when we created it.
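The cleanup logic for the staging folder can be sketched like this (file names and timestamps are made up; the real pipeline would use Get Metadata child items plus a Delete activity):

```python
# Two parquet files in the staging folder: the dataflow just wrote the
# new one under a random name, so the OLD file has the earlier timestamp.
files = [
    {"name": "part-0ab3.parquet", "modified": "2022-01-10T08:00:00"},
    {"name": "part-9f2c.parquet", "modified": "2022-03-01T08:00:00"},
]

# Keep the most recently modified file; everything older is deleted.
newest = max(files, key=lambda f: f["modified"])
to_delete = [f["name"] for f in files if f is not newest]
```

Here `to_delete` contains only the older file, leaving the freshly written staging parquet in place.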
Testing with Power BI
Since parquet files cannot be natively opened with Storage Explorer, we will point Power BI at the Staging folder in order to ‘see’ if our solution is working as we expect.
We import our parquet files using the Azure Data Lake Storage Gen2 connector.
We can filter down our result set to just the folder we are interested in – and expand the ‘content’ menu to combine multiple parquet files (if partitioned) together.
The end result should be a visible table of our staging data.
Since we are working with static datasets, we can test this arrangement by adding a new row to one of our target tables to ‘see’ if that new row appears in our updated staging file. Likewise if we delete that same row from the dataset, we would expect it to also vanish from our new staging file (as a result of our key deletes step).
While there is certainly more debugging that could occur around the possibility of duplicate rows appearing, this setup is working as expected.
There it is! There are certainly a lot of moving parts to this setup – but this pattern will come in very handy when we encounter large database tables.