Azure Data Factory (ADF) Pipeline Runs - Query By Factory & Avoid Concurrent Pipeline Executions (blog post 1 of 2)

8/13/2019

I had a little bit of fun in ADFv2 today and wanted to share my solution for making sure that orchestrator (master) pipelines do not execute concurrently.  Concurrent executions can happen for several reasons, but as of August 2019, there is a bug in ADF that causes a single trigger to start a pipeline twice.  If you have data-driven incremental loads, this is grossly inconvenient as the metadata tables become completely discombobulated (my technical word of the day).  This solution is built on ADF's ability to query pipeline run metadata from the ADF log through a web activity, explained here.  Come walk with me and let's step through this together.

Quick Start Guide
If you are familiar with ADF and just want the ten cent tour,  follow these steps.  Detailed screen prints for each of these steps have been published in blog post 2 of 2 for visual reference.

1.  Create a service principal and record the client ID, client secret, and tenant ID for future use.

2. Create a file or SQL Server table to hold your environment properties.  Add all three IDs to your environment object (a sample is sketched below).  Use Azure Key Vault for your client secret if it is available to you.  Assumption: your environment object already contains SubscriptionName, SubscriptionID, ResourceGroupName, and ADFName information.
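If you go the file route, the environment object might look something like this (the property names match the expressions used later in this post; the GUIDs are placeholders, and the client secret really belongs in Key Vault):

{
  "SubscriptionName": "My-Subscription",
  "SubscriptionID": "00000000-0000-0000-0000-000000000000",
  "ResourceGroupName": "my-resource-group",
  "ADFName": "my-data-factory",
  "ADFTenantID": "00000000-0000-0000-0000-000000000000",
  "ADFClientID": "00000000-0000-0000-0000-000000000000",
  "ADFClientSecret": "<retrieved from Azure Key Vault>"
}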

3.  Create a new pipeline with parameters PipelineName as string and MetadataDB as string.  Add four variables to the same pipeline: CurrentDateTime as string, PriorDateTime as string, AccessToken as string, and FailThePipeline as boolean.
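In the pipeline JSON, those parameters and variables are declared roughly like this (the pipeline name is whatever you choose; I am calling it "ADF Get Metadata" here to match step 11):

{
  "name": "ADF Get Metadata",
  "properties": {
    "parameters": {
      "PipelineName": { "type": "string" },
      "MetadataDB": { "type": "string" }
    },
    "variables": {
      "CurrentDateTime": { "type": "String" },
      "PriorDateTime": { "type": "String" },
      "AccessToken": { "type": "String" },
      "FailThePipeline": { "type": "Boolean" }
    },
    "activities": []
  }
}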

4.  Add a Lookup activity to the pipeline named "Get Environment Properties" and do just that.
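Assuming the environment properties live in a SQL Server table exposed through a dataset (both the query and dataset name below are illustrative), the Lookup activity JSON is a sketch along these lines:

{
  "name": "Get Environment Properties",
  "type": "Lookup",
  "typeProperties": {
    "source": {
      "type": "AzureSqlSource",
      "sqlReaderQuery": "SELECT TOP 1 * FROM dbo.EnvironmentProperties"
    },
    "dataset": { "referenceName": "ds_EnvironmentProperties", "type": "DatasetReference" },
    "firstRowOnly": true
  }
}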

5.  Add a Web activity to the pipeline named "Get Token" and enter these property values:
      URL = @concat('https://login.microsoftonline.com/',activity('Get Environment Properties').output.firstRow.ADFTenantID,'/oauth2/token')
      Method = POST
      Headers = Content-Type with value = application/x-www-form-urlencoded
      Body =
@concat('grant_type=client_credentials&resource=https%3A%2F%2Fmanagement.azure.com%2F&client_id=',activity('Get Environment Properties').output.firstRow.ADFClientID,'&client_secret=',activity('Get Environment Properties').output.firstRow.ADFClientSecret,'&scope=openid')
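Pieced together in the pipeline JSON, the "Get Token" Web activity looks roughly like this (the dependsOn entry reflects the success arrow described in step 10):

{
  "name": "Get Token",
  "type": "WebActivity",
  "dependsOn": [
    { "activity": "Get Environment Properties", "dependencyConditions": [ "Succeeded" ] }
  ],
  "typeProperties": {
    "url": {
      "value": "@concat('https://login.microsoftonline.com/',activity('Get Environment Properties').output.firstRow.ADFTenantID,'/oauth2/token')",
      "type": "Expression"
    },
    "method": "POST",
    "headers": { "Content-Type": "application/x-www-form-urlencoded" },
    "body": {
      "value": "@concat('grant_type=client_credentials&resource=https%3A%2F%2Fmanagement.azure.com%2F&client_id=',activity('Get Environment Properties').output.firstRow.ADFClientID,'&client_secret=',activity('Get Environment Properties').output.firstRow.ADFClientSecret,'&scope=openid')",
      "type": "Expression"
    }
  }
}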

6. Add three Set Variable activities to the pipeline and do just that:
     CurrentDateTime variable value = @adddays(utcnow(), +1)
     PriorDateTime variable value = @adddays(utcnow(), -1)
     AccessToken variable value = @activity('Get Token').output.access_token
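As a JSON sketch, the AccessToken assignment looks like this; CurrentDateTime and PriorDateTime follow the same pattern with their own expressions (the activity name here is illustrative):

{
  "name": "Set AccessToken",
  "type": "SetVariable",
  "dependsOn": [
    { "activity": "Get Token", "dependencyConditions": [ "Succeeded" ] }
  ],
  "typeProperties": {
    "variableName": "AccessToken",
    "value": {
      "value": "@activity('Get Token').output.access_token",
      "type": "Expression"
    }
  }
}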

7. Add a second Web activity to the pipeline named "Get ADF Execution Metadata" and enter these property values:
     URL = @concat('https://management.azure.com/subscriptions/', activity('Get Environment Properties').output.firstRow.SubscriptionID, '/resourceGroups/', activity('Get Environment Properties').output.firstRow.ResourceGroupName, '/providers/Microsoft.DataFactory/factories/', activity('Get Environment Properties').output.firstRow.ADFName, '/queryPipelineRuns?api-version=2018-06-01')
     Method = POST
     Headers = Content-Type with value = application/json
                             Authorization with dynamic content value of @concat('Bearer ',variables('AccessToken'))
     Body = dynamic content value =
{
  "lastUpdatedAfter": "@{variables('PriorDateTime')}",
  "lastUpdatedBefore": "@{variables('CurrentDateTime')}",
  "filters": [
    {
      "operand": "PipelineName",
      "operator": "Equals",
      "values": [ "@{pipeline().parameters.PipelineName}" ]
    },
    {
      "operand": "Status",
      "operator": "Equals",
      "values": [ "InProgress" ]
    }
  ]
}
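Pulled together, the "Get ADF Execution Metadata" Web activity is roughly this in JSON (the body placeholder stands in for the request body shown above, entered as dynamic content):

{
  "name": "Get ADF Execution Metadata",
  "type": "WebActivity",
  "typeProperties": {
    "url": {
      "value": "@concat('https://management.azure.com/subscriptions/', activity('Get Environment Properties').output.firstRow.SubscriptionID, '/resourceGroups/', activity('Get Environment Properties').output.firstRow.ResourceGroupName, '/providers/Microsoft.DataFactory/factories/', activity('Get Environment Properties').output.firstRow.ADFName, '/queryPipelineRuns?api-version=2018-06-01')",
      "type": "Expression"
    },
    "method": "POST",
    "headers": {
      "Content-Type": "application/json",
      "Authorization": {
        "value": "@concat('Bearer ',variables('AccessToken'))",
        "type": "Expression"
      }
    },
    "body": {
      "value": "<the request body shown above>",
      "type": "Expression"
    }
  }
}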


8.  Add an If Condition activity to the pipeline named "If Pipeline is Already Running" with the dynamic content expression @greater(length(activity('Get ADF Execution Metadata').output.value), 1)
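Because the orchestrator run that kicked off this check is itself InProgress, the query always returns at least one row; more than one means a second instance is running.  As a JSON sketch (the ifTrueActivities array will hold the "Fail the Pipeline" activity from step 9):

{
  "name": "If Pipeline is Already Running",
  "type": "IfCondition",
  "typeProperties": {
    "expression": {
      "value": "@greater(length(activity('Get ADF Execution Metadata').output.value), 1)",
      "type": "Expression"
    },
    "ifTrueActivities": []
  }
}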

9.  Add a Set Variable activity to the "If Pipeline is Already Running / If True Activities" branch and name it "Fail the Pipeline".  Give it a hard-coded value, passing a string instead of a boolean value in order to fail the pipeline.  (ADF really needs an "Exit Pipeline" activity reporting success or failure, but let's leave that for another discussion.)
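A sketch of that Set Variable activity as it sits inside the If True branch (the string value shown is just an illustrative message; any string assigned to the boolean FailThePipeline variable will throw the type error that fails the pipeline):

{
  "name": "Fail the Pipeline",
  "type": "SetVariable",
  "typeProperties": {
    "variableName": "FailThePipeline",
    "value": "Another instance of this pipeline is already running."
  }
}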

10.  Connect all of the pipeline activities with green Success arrows.  Your pipeline should look like the screen print below.
[Screenshot: the completed pipeline]
11. The final step is to add an Execute Pipeline activity to the very front / start of each orchestrator pipeline, passing the orchestrator pipeline name to the ADF Get Metadata pipeline (a sketch follows below).  The orchestrator can only continue if the ADF Get Metadata pipeline -- or whatever you called it -- completes successfully.
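In each orchestrator, that first activity could be sketched like this (the activity name and MetadataDB value are illustrative; @pipeline().Pipeline resolves to the name of the currently running pipeline):

{
  "name": "Check for Concurrent Execution",
  "type": "ExecutePipeline",
  "typeProperties": {
    "pipeline": { "referenceName": "ADF Get Metadata", "type": "PipelineReference" },
    "parameters": {
      "PipelineName": {
        "value": "@pipeline().Pipeline",
        "type": "Expression"
      },
      "MetadataDB": "MyMetadataDB"
    },
    "waitOnCompletion": true
  }
}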
Conclusion:
I have also published blog post 2 of 2, which contains more detailed screen prints, but because I have a strong desire to help make you successful (yes, even though we have not met), I have uploaded the JSON code for this pipeline below.  Download it and make it your own.
Download: adf_pipeline_metadata.json (11 KB)

1 Comment
Wayne Hing
6/23/2020 05:29:26 pm

Hi Delora,

We have hourly triggers; however, sometimes they fail.  We introduced 2 x retries for each trigger, but now the retries can overlap with the next hourly run.  The "Exit Pipeline" is what is needed, I think.


We want to only run if not already running so we don't have the clash.

Your solution would work, but.....we have our ADF set up with Log Analytics searching for "failed" and sending alerts to the team for failed pipelines, but now we would get alerts because of the intentionally failed pipeline.

