Using an ETL/ELT tool like Data Factory often involves a LOT of development, debugging, and head-scratching, before eventually arriving at a working and useful solution. If we only ever see the final solution – it can be quite challenging to digest every step and nuance about how it works.

Consider the ADF pattern below that orchestrates the movement of data from a source database to Azure Data Lake Storage using a control table and Data Flows. There are a lot details to consider about what may seem like a relatively simple pipeline run, so this post will focus focus on just a small piece of this larger solution.

Let’s start with a simple idea that the majority of data in our source database may not be changing very often. It’s likely that only a small portion of each table may have changed since our last pipeline run. We call these changed rows the “deltas”.

As the size of our data tables grow – it may not be feasible, timely, or cost effective to try to move all the data every time we run a pipeline. Instead, we want to focus on the smaller parts of the data in each table that have changed from the last time it was moved. But how do we determine which data has changed?

Cue the Watermark Column

By design, this is the column in each table that indicates when it was last created or modified. We can use this column in a simple SQL statement to return all of the rows with a ModifiedDate after a point in time.

Granted this is an old AdventureWorks table so the dates are a bit ‘historical’, but we will use this pattern in ADF to return only columns that have been changed since our ‘last run’.

This post assumes some understanding of setting up and using Control Tables (as I won’t go through those specifics). Check out this post if you would like a quick primer on that.

Here’s the (updated) Control Table for orchestrating this job. Let’s get our Dataset and Pipeline setup.

We can reuse the datasets previously created for our Source Azure SQL DB and Control Table. We need to make one quick edit to our dataset for ADLS to allow it to handle RAW or STAGING folders down the road.

*In a previous post we parameterized the filetype to “parquet.” Since the initial dataset was created with the parquet template (see the parquet logo in the top left) it is not possible to change the filetype in the control table to something like “csv” and expect the generated file to work correctly.

Ok – with that set – let’s setup a pipeline run that will identify and copy the deltas from our source database tables to our Raw ALDS folder.

Here’s what the entire run looks like – and then we will go through some of the pieces in more detail.

The ‘top’ line of the pipeline is a repeat of the same process we used in this previous post. Let’s focus on the 3 steps below.

Set a Variable based on Current Time

While it might seem intuitive to just drag the “Set variable” card onto the pipeline screen – variables must first be ‘created’ before they can be used.

To create a variable – we have to bring up the Pipeline Menu by clicking on the background of the pipeline area and navigating to Variables.

Once named – we can now ‘select’ this variable in the dropdown menu of the Set Variable card.

We are going to set some dynamic content for this variable that gets the current date and time and then switches it to Eastern Standard. We need to combine 3 functions which work together to build our return value. The returned value must be in a format accepted in our SQL where clause.

We can check to make sure this is returning the expected value using the debug feature and then selecting the output of this variable step.

This is returning the expected value!

If we wanted this value to instead return a date that was maybe 1 day ago – we could use the addDays function to the formula like so. This same pattern would work for addHours, addMinutes, addSeconds, etc)

addDays(utcnow(),-1),'UTC','Eastern Standard Time'),'yyyy-MM-dd HH:mm:ss.fff')

DateTime Versus DateTime2

Now is a great moment to point out a problem that arises when we have two different datetime formats (one in our control table and another in our source table.

The issue is largely due to the number of decimal places after the final second. DateTime2 has 7 (even if it’s not specified) while datetime will always have 3. This mismatch will cause problems when we try to use the values from the last_run column of our control table to filter the DateTime column of our Source Table.

We can solve this by either removing the milliseconds entirely or reducing the datetime2 value to just 3 places. Let’s do the latter by creating another variable that converts our DateTime2 value from the control table to DateTime format. First we create the variable name.

Then set the variable to convert the last_run column to 3 places after the decimal (or less).

Identify Deltas in Source Table and Copy to Raw Folder in ADLS

This middle step of our data movement is a Copy Activity. On the source side of the activity we are going to reference our parameterized control table columns and a bit of dynamic content magic to make a SQL select statement.

Let’s have a closer look at the ‘dynamic content’ SQL Statement for the Copy Activity:

Here’s the script:

SELECT * FROM @{concat(item().source_schema,'.',item().source_table)} WHERE @{item().source_watermark_column} >= '@{variables('lastrun')}'

There are a few considerations with this:

  • Content from parameters / variables must start be wrapped with @{ }
  • Intentional line breaks in the code will create artifacts in the SQL script that mess up the final result

When everything works as planned – the desired SQL script will appear correctly in the input part of the Copy Activity job (when debugging)

Let’s have a look at the sink side of the copy activity

Here we are using our dataset parameters to establish the storage convention and naming convention of our RAW delta file. This pattern creates a folder for each table in RAW.

Perhaps its a bit redundant to have the destination_table repeated both as a folder and as the filename. We could instead use a date for the filename (which would give us a living record of each delta in the same folder)

We can unlock a lot of different storage and naming patterns using our control table values and these parameters. For now, this date pattern is sufficient.

Update Control Table with Variable Timestamp

In this last step we want to update the last_run column in our control table with the variable timestamp we created at the start of the run. This value will then be used as the watermark value for the next run.

First we need to create the Stored Procedure. Here’s a simple one that accepts 3 parameters and updates the control table for the row that has a matching schema and table name.

Now let’s add a stored procedure activity to our pipeline and fill it out.

That’s it. Now let’s debug and test to make sure everything works as expected.

Perfect! Now publish and save the pipeline (hopefully you’ve been doing that throughout).

As our solution grows we will come back to this pattern and reuse some of these elements! Stay tuned for the next data factory installment!