
Azure Data Factory (ADFv1) Techniques for Starting a Pipeline On Demand During Development Phase; Pipeline Schedules

4/25/2017


 
One of the challenges of ADF is getting a pipeline to run on demand.  I am told by Microsoft that the next version of ADF, coming fall 2017, will include this functionality, but right now there is no <Run Now!> button to be found.  Granted, you will find a rerun button in the ADF Monitor console (orange box below) ...
[Screenshot: the rerun button in the ADF Monitor console]
... but this button is only enabled for failed pipelines.  What about during development?  Those of us coming from SSIS expect <Execute Now> functionality, but with ADF there is no such thing, because the pipeline is tied to the job schedule.  This is when I remind myself that just because something is different doesn't make it wrong.  Okay, so let's work with what we have.

First, an assumption: we do not want to have to use a custom .NET activity, a PowerShell command, or an Azure job scheduler.  The scenario is development mode, and all we want to do is test whether our new datasets and pipeline produce the desired result.  Second, recall that in order for an activity to run on demand, ADF compares the "start" property of the pipeline to the last run DateTime of each dataset, and there is no way to remove dataset execution history.

Consequently, there are two ways to start a non-failed pipeline on demand:

1.  Clone, delete and redeploy the datasets used in the pipeline. 
2.  Change the "start" property of the pipeline to an earlier point in time.

Option #1: Clone, delete and redeploy the datasets
The first thing you will run into is dependencies.  This isn't a big deal if your pipeline has only one set of input/output datasets.  However, when you are working with a pipeline containing a plethora of tables or files, this becomes a time-consuming and outright ridiculous option.
1.  Navigate to the Azure Data Factory Author and Deploy action
2.  Select each input and output dataset used in the pipeline
3.  Click <Clone>
4.  Right-click each dataset you just cloned and click <Delete>
5.  From the set of Drafts created in step #3, click <Deploy>

[Screenshot: cloned dataset drafts in Author and Deploy]
Be aware of the "start", "end" and "pipelineMode" pipeline properties before you redeploy.  The pipeline "start" is UTC time and must be the current UTC DateTime or a DateTime in the past, or the pipeline won't start.  If you have cloned a paused pipeline, you will also need to change the "pipelineMode" property to "Scheduled".
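For reference, these properties sit near the bottom of the pipeline JSON (you can see them in the full ADFv1 example later on this page); the values below are only illustrative:

    "start": "2017-04-25T16:00:00Z",
    "end": "2099-12-31T05:00:00Z",
    "isPaused": false,
    "pipelineMode": "Scheduled"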

As a matter of habit, I clone, delete and redeploy both my datasets and my pipeline to remove all internal run history.  You could change the activity properties of a pipeline and then just clone, delete and redeploy your input datasets, but that seems more complicated to me.  For pipelines that have a limited number of activities, the clone, delete and redeploy goes pretty quickly and produces the desired result: a running pipeline!

Disclaimer: This is a DEV solution.  If you are in PROD, you will overwrite existing files and folders!


Option #2: Change the "start" property of the pipeline

If all your datasets are in step with each other (all have the same execution history), you can also get the activities of a pipeline to run on demand by updating the "start" property of the pipeline to a prior point in time.  This will cause ADF to "backfill" the destination, creating new destination files or folders for that earlier point in time.  On a full data load (a pipeline that does not use a WHERE LastModifiedDate > [x] filter), the resulting destination files will be a duplicate of a later point in time, but at the moment we are in DEV and are just trying to test the input, the output and the resulting destination.  If you are in PROD, be very careful.  Because you have not deleted anything, existing files, folders or tables will not get overwritten, but you may have just caused issues for an ETL process.

Disclaimer: This option has worked for me except when my pipeline has activities referencing datasets with different execution histories.  I have looked at my new (earlier) destination location and found missing files, because some datasets had execution histories going back to the dawn of time.


Azure Data Factory (ADFv2) Parameter Passing: Date Filtering (blog post 1 of 3)

4/25/2017


 
I just had the pleasure of deleting, from my blog backlog, three ADFv1 blog posts that were in the making.  The arrival of Azure Data Factory v2 (ADFv2) makes me want to stand up and sing Handel's Hallelujah Chorus.  Yes, my fine friend, ADFv2 is a real game changer now.  Let us begin!

Assumptions:
  1. You have an ADFv2 environment in which to work.  These examples will not work in ADFv1.
  2. You are using VSTS GIT for source code control.  You will also need VSTS GIT for easy copying and editing.
  3. You are familiar with creating ADF linked services aka data connections.

Summary: This blog post will give you examples of the following ADFv2 functionality:
  1. Calling a SQL Server stored procedure via ADFv2's new Lookup capability, and retaining the output dataset in memory
  2. Passing dynamic date filters to the Salesforce SELECT statement.  (I use Salesforce because most people can figure out everything else, but SOQL is not very intuitive.)
  3. Creating date-specific destination folders in Azure Blob Storage (ABS).
  4. Concatenating string values
  5. Correctly referencing pipeline parameters and activities in ADFv2 dynamic content
This blog post is one of three in a series.  I will update this post with a link when they become available.
  • Azure Data Factory (ADF) v2 Parameter Passing: Table Names (2 of 3): I do not like hard-coding table names.  This blog post will parameterize the table name, remove all hard-coded column definitions, and use a SELECT *.
  • Azure Data Factory (ADF) v2 Parameter Passing: Putting it All Together (3 of 3): When you combine a Salesforce filter with a parameterized table name, the SELECT * no longer works.  This blog post will show you how to parameterize a list of columns and put together both date filtering and a fully parameterized pipeline.

Setup
To get "this extract start time" we obviously have to incorporate a metadata layer in our solution.  ADFv2 can now Lookup to Cosmos DB as well as several other data sources listed here, but I'm an old fashioned SQL Server lady and I prefer to use an Azure Database for things like this.  It doesn't hurt to keep tables and procs, like what is included in the sample t-sql file at the end of this post, in an Azure Data Warehouse, but honestly, replicated 60 times?  If you can afford it, stand up a simple Azure SQL Database for metadata collection.

Step #1: Create your ADFv2 control objects in your metadata SQL database
  1. etl.ADFControl table
  2. etl.vwADFControl view
  3. etl.usp_ADF_Get_ADFControl_NextExtract stored procedure
  4. etl.usp_ADF_Set_ADFControl stored procedure
The CREATE T-SQL scripts for these objects can be downloaded below.

Step #2: Create your three ADFv2 linked services (data connections)
  1. SQL Server metadata source
  2. Salesforce, or some other data source
  3. Azure Blob Storage, or some other data destination
Tip: If you have never created a linked service in ADF before, use the wizard and do not try to copy someone else's JSON.  Only a limited amount of data connection information is (understandably) stored in VSTS GIT, so you will need to manually update your data connections in each environment (DEV, STG, UAT, PRD, etc.) anyway.

Step #3: Create your ADFv2 datasets
I am going to show you screen prints here, but the JSON for these datasets is provided at the end of this blog post.  This blog post uses a hard-coded table name.  In blog post 2 of 3, I'll show you how to use a single input and output dataset to extract all tables from a single source.
metadata DB used for all objects
Your Lookup and Stored Procedure activities will use this same dataset.
[Screenshot: metadata dataset definition]
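The full JSON for these datasets is in the download at the end of this post, but as a rough sketch (the dataset and linked service names below are placeholders I made up), the metadata dataset is just an Azure SQL table dataset pointing at the control table:

{
  "name": "MetadataControlDataset",
  "properties": {
    "linkedServiceName": {
      "referenceName": "YourMetadataSqlDbLinkedService",
      "type": "LinkedServiceReference"
    },
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": "etl.ADFControl"
    }
  }
}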
input dataset for your source
This is your source dataset.  You actually do not need to define every column, but we'll tackle that later.
[Screenshots: Salesforce source dataset definition]
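Likewise, a minimal sketch of the Salesforce source dataset might look something like this (the linked service name is a placeholder, and exact property names can vary by connector version):

{
  "name": "SalesforceAccountPartnerDataset",
  "properties": {
    "linkedServiceName": {
      "referenceName": "YourSalesforceLinkedService",
      "type": "LinkedServiceReference"
    },
    "type": "SalesforceObject",
    "typeProperties": {
      "objectApiName": "AccountPartner"
    }
  }
}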
output dataset for your destination
This is your destination dataset definition.  You actually do not need to define every column here either, but more on that later.
[Screenshot: blob output dataset definition]
File Path Part 1
@concat(pipeline().parameters.blobContainerName, '/', formatDateTime(pipeline().parameters.actualRunTime, 'yyyy'), '/', formatDateTime(pipeline().parameters.actualRunTime, 'MM'), '/')

File Path Part 2
@concat('AccountPartner-', formatDateTime(pipeline().parameters.actualRunTime, 'yyyy'), formatDateTime(pipeline().parameters.actualRunTime, 'MM'), formatDateTime(pipeline().parameters.actualRunTime, 'dd'), ' ', formatDateTime(pipeline().parameters.actualRunTime, 'hh'), formatDateTime(pipeline().parameters.actualRunTime, 'mm'), '.txt')
[Screenshot: output dataset file path settings]
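Putting the two file path expressions together, a rough sketch of the output dataset JSON might look like the following (the dataset and linked service names, and the text format settings, are assumptions; the real JSON is in the download at the end of this post):

{
  "name": "BlobAccountPartnerDataset",
  "properties": {
    "linkedServiceName": {
      "referenceName": "YourBlobStorageLinkedService",
      "type": "LinkedServiceReference"
    },
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@concat(pipeline().parameters.blobContainerName, '/', formatDateTime(pipeline().parameters.actualRunTime, 'yyyy'), '/', formatDateTime(pipeline().parameters.actualRunTime, 'MM'), '/')",
        "type": "Expression"
      },
      "fileName": {
        "value": "@concat('AccountPartner-', formatDateTime(pipeline().parameters.actualRunTime, 'yyyy'), formatDateTime(pipeline().parameters.actualRunTime, 'MM'), formatDateTime(pipeline().parameters.actualRunTime, 'dd'), ' ', formatDateTime(pipeline().parameters.actualRunTime, 'hh'), formatDateTime(pipeline().parameters.actualRunTime, 'mm'), '.txt')",
        "type": "Expression"
      },
      "format": {
        "type": "TextFormat",
        "columnDelimiter": ","
      }
    }
  }
}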
You may be thinking, "This is a lot of stuff just to import one table!" but stay tuned.  I promise you, you will have less development time, lower cost of ownership, and less downtime moving data with ADFv2 than you would with SSIS packages, especially if you have > 2 GB of data in a source object.  Using parameters is the key to success.

Step #4: Create your ADFv2 pipeline
[Screenshot: the pipeline in the ADFv2 authoring UI]
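The expressions in this post reference two pipeline parameters, blobContainerName and actualRunTime.  As a sketch (the default values below are purely illustrative), the parameters section of the pipeline JSON looks something like this:

    "parameters": {
      "blobContainerName": {
        "type": "String",
        "defaultValue": "salesforce"
      },
      "actualRunTime": {
        "type": "String",
        "defaultValue": "2017-04-25T00:00:00Z"
      }
    }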
Lookup Activity: GetLastExtractStart
[Screenshot: GetLastExtractStart Lookup activity settings]
Click <Preview data> to see the result set from the stored procedure.  This value can be referenced with the following syntax:
activity('GetLastExtractStart').output.firstRow.NextExtractStartDateTime
[Screenshot: Lookup activity preview output]
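If you prefer JSON to screen prints, a minimal sketch of the Lookup activity might look like this (the dataset name is a placeholder, the source type may be SqlSource or AzureSqlSource depending on your connector version, and the real stored procedure may take parameters; see the downloadable scripts):

{
  "name": "GetLastExtractStart",
  "type": "Lookup",
  "typeProperties": {
    "source": {
      "type": "SqlSource",
      "sqlReaderStoredProcedureName": "etl.usp_ADF_Get_ADFControl_NextExtract"
    },
    "dataset": {
      "referenceName": "MetadataControlDataset",
      "type": "DatasetReference"
    },
    "firstRowOnly": true
  }
}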

​Stored Procedure Activities: Set Start and End Dates
SetLastExtractStart activity
[Screenshot: SetLastExtractStart activity settings]
SetLastExtractEnd activity
[Screenshot: SetLastExtractEnd activity settings]
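A rough sketch of one of these Stored Procedure activities is below; the stored procedure parameter name and value are only illustrative, and the real signature of etl.usp_ADF_Set_ADFControl is in the downloadable scripts at the end of this post:

{
  "name": "SetLastExtractStart",
  "type": "SqlServerStoredProcedure",
  "dependsOn": [
    {
      "activity": "GetLastExtractStart",
      "dependencyConditions": [ "Succeeded" ]
    }
  ],
  "linkedServiceName": {
    "referenceName": "YourMetadataSqlDbLinkedService",
    "type": "LinkedServiceReference"
  },
  "typeProperties": {
    "storedProcedureName": "etl.usp_ADF_Set_ADFControl",
    "storedProcedureParameters": {
      "ExtractStartDateTime": {
        "value": "@pipeline().parameters.actualRunTime",
        "type": "DateTime"
      }
    }
  }
}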

Copy Activity: the actual copy of data from source to destination, filtered by YourSource.LastModifiedDate
[Screenshots: Copy activity source settings]
Source Query dynamic syntax:
SELECT Id, SystemModstamp FROM AccountPartner WHERE SystemModstamp >= @{formatDateTime(activity('GetLastExtractStart').output.firstRow.NextExtractStartDateTime,'yyyy-MM-ddTHH:mm:ssZ')}
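In the raw pipeline JSON, that query lands inside the Copy activity's Salesforce source.  A sketch, assuming the placeholder dataset names used earlier in this post:

{
  "name": "CopyAccountPartnerToBlob",
  "type": "Copy",
  "dependsOn": [
    {
      "activity": "SetLastExtractStart",
      "dependencyConditions": [ "Succeeded" ]
    }
  ],
  "inputs": [
    {
      "referenceName": "SalesforceAccountPartnerDataset",
      "type": "DatasetReference"
    }
  ],
  "outputs": [
    {
      "referenceName": "BlobAccountPartnerDataset",
      "type": "DatasetReference"
    }
  ],
  "typeProperties": {
    "source": {
      "type": "SalesforceSource",
      "query": {
        "value": "SELECT Id, SystemModstamp FROM AccountPartner WHERE SystemModstamp >= @{formatDateTime(activity('GetLastExtractStart').output.firstRow.NextExtractStartDateTime,'yyyy-MM-ddTHH:mm:ssZ')}",
        "type": "Expression"
      }
    },
    "sink": {
      "type": "BlobSink"
    }
  }
}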
​
If you are new to ADFv2, here are a couple of screen prints on how to validate, save, publish and run your pipeline on demand.
[Screenshots: validating, publishing and running the pipeline on demand in the ADFv2 authoring UI]
Conclusion of the Matter
This blog post is intended for developers just starting out in ADFv2, and especially for ADFv2 developers pulling Salesforce data.  I do not expect you to need all of the information provided, but I am hoping you'll find a tip or trick that will save you time.  Referencing activities and pipeline parameters is easy to do, for example, but when I first worked on this, I found very few examples.  Finding the right syntax for a Salesforce date filter took me about a week.

You may be frustrated transitioning from SSIS to ADFv2, but don't give up.  Remember that ADFv2 is not a data transformation tool -- it is a data copy tool.  SSIS <> ADFv2, and in a world where we are moving to MPP architectures and ELT via stored procedures, ADFv2 as an "orchestrator" is worth considering.


Supporting Downloadable Files
json_for_adfv2_parameter_passing_1.zip (zip, 2 KB)

adfcontrol_sql_server_metadata_objects.zip (zip, 1 KB)


Azure Data Factory (ADFv1) JSON Example for Salesforce Incremental Load of RelationalSource Pipeline

4/25/2017


 
What I am sharing in this blog post isn't rocket science, but when I first began working with ADF, I would have had genuine feelings of gratitude if I could have found a JSON script example for incremental loads.  Consequently, this blog post is here in hopes that it finds someone who just needs a quick pick-me-up with ADF.

You will want to first reference my blog post on initial data loads with ADF, because I build here on the same resources and linked services.  What is different is the pipeline JSON script.  Follow the instructions for the initial data load for everything else.

Using the Account table from Salesforce as an example, here is what I call an incremental load pipeline:

{
  "name": "InboundPipeline-SalesforceIncrementalLoad",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "RelationalSource",
            "query": "$$Text.Format('select * from Account where LastModifiedDate >= {{ts\\'{0:yyyy-MM-dd HH:mm:ss}\\'}} AND LastModifiedDate < {{ts\\'{1:yyyy-MM-dd HH:mm:ss}\\'}}', WindowStart, WindowEnd)"
          },
          "sink": {
            "type": "AzureDataLakeStoreSink",
            "writeBatchSize": 0,
            "writeBatchTimeout": "00:00:00"
          }
        },
        "inputs": [
          {
            "name": "InputDataset-Account"
          }
        ],
        "outputs": [
          {
            "name": "OutputDataset-Account"
          }
        ],
        "policy": {
          "timeout": "1.00:00:00",
          "concurrency": 1,
          "executionPriorityOrder": "NewestFirst",
          "style": "StartOfInterval",
          "retry": 3,
          "longRetry": 0,
          "longRetryInterval": "00:00:00"
        },
        "scheduler": {
          "frequency": "Day",
          "interval": 1
        },
        "name": "Activity-Account"
      }
    ],
    "start": "2017-04-17T19:48:55.667Z",
    "end": "2099-12-31T05:00:00Z",
    "isPaused": false,
    "pipelineMode": "Scheduled"
  }
}


Unfortunately, at this time the "query" property isn't that flexible.  I have been told by Microsoft that the next version of ADF, scheduled for release fall 2017, will include input parameters.  I have tested and was able to use a REPLACE() function in the above SELECT, but that is because, under the covers, Microsoft is using the Simba ODBC driver for its Salesforce connection.  Each data source will have different capabilities within its "SELECT ...".  Having identified several challenges with the current version of ADF, I was just happy to be able to use a WHERE clause.



