Azure Data Factory (ADF) Pipeline Runs - Query By Factory & Avoid Concurrent Pipeline Executions (blog post 2 of 2)

8/13/2019

4 Comments

 
Previously I published a Quick Start Guide for avoiding concurrent pipeline executions.  That was blog post 1 of 2, and I wrote it for developers familiar with Azure, especially Azure AD service principals and ADF web activities.  For everyone else, I would like to provide more tips and screen prints, because if your brain works anything like mine, you put A and B together much better with a picture.  When I first started with ADF and read the Microsoft Docs, I struggled to connect the JSON examples provided to the ADF activity UIs (user interfaces).  If you had been nearby, you might have heard me sputtering, "what? What?  WHAT??!"  Enough said.

This blog post will follow the same step numbers as blog post 1, so if you started with the Quick Start Guide, you can find detail for those same steps right here.

Step #1: Create a service principal and record the client Id, client secret, and tenant Id for future use.
When you query the ADF log, you have to impersonate someone.  In ADF you do this through an Azure AD (Active Directory) application, also called a service principal.  You then must give this service principal permissions on the Data Factory itself; as a commenter below confirmed, granting it the Reader role through Access control (IAM) on the Data Factory is enough to query the log.  If you have worked with SSIS, this is a similar concept: you run SSIS scheduled jobs as <some AD account>, and that account must have permissions to your sources and destinations.
To create a service principal...
1.  Navigate to the Azure portal
2.  In the left menu, locate Azure Active Directory and click on it to open
3.  In the Azure Active Directory blade, select App registrations (9th option under Overview)
4.  Select New registration from the menu.

For more information on creating service principals, please see Microsoft documentation here.

5. For this blog post, know that it is critical that the client ID, client secret, and tenant ID be retained in an appropriate place for later use.
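If you want to confirm the recorded values work before wiring them into ADF, here is a minimal Python sketch (not part of the pipeline) that uses the azure-identity package to request a management-plane token.  The angle-bracket values are placeholders, not real IDs.

Python Service Principal Sanity-Check Example:
# Optional sanity check: confirm the service principal can obtain a token
# for Azure Resource Manager.  Requires: pip install azure-identity
from azure.identity import ClientSecretCredential

credential = ClientSecretCredential(
    tenant_id="<your-tenant-id>",
    client_id="<your-client-id>",
    client_secret="<your-client-secret>",
)

# A successful call proves the client ID / client secret / tenant ID trio is valid.
token = credential.get_token("https://management.azure.com/.default")
print("Token acquired; expires at (epoch seconds):", token.expires_on)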

Step #2: Create a file or SQL Server table to hold your environment properties. 
This can be a text file, SQL Server table, Cosmos DB document -- any source that can be accessed by ADF in a Lookup activity will work.  
SQL Server Metadata Table Example:
CREATE TABLE [etl].[ADF_EnvironmentProperties](
     [EnvironmentLongDescription] [nvarchar](50) NOT NULL,
     [EnvironmentShortDescription] [char](3) NOT NULL,
     [SubscriptionName] [nvarchar](100) NOT NULL,
     [SubscriptionID] [nvarchar](100) NOT NULL,
     [ResourceGroupName] [nvarchar](100) NOT NULL,
     [ADFName] [nvarchar](30) NOT NULL,
     [ADFServicePrincipalName] [nvarchar](100) NULL,
     [ADFTenantID] [nvarchar](100) NULL,
     [ADFClientID] [nvarchar](100) NULL,
     [ADFClientSecret] [nvarchar](100) NULL
) ON [PRIMARY]
JSON Metadata Document Example:
{
    "id": "EnvironmentProperties",
    "Environment": "Development",
    "SubscriptionName": "Your Sub Name",
    "SubscriptionID": "Your Sub ID",
    "ResourceGroup": "Your RG",
    "ADFName": "Your ADF Name",
    "ADFServicePrincipalName": "Your SP Name",
    "ADFTenantID": "Your SP Tenant ID",
    "ADFClientID": "Your SP Client ID",
    "ADFClientSecret": "Your SP Client Secret"
}
Step #3: Create a new pipeline with parameters and variables. 
Parameters:
     PipelineName as string (the pipeline name that will be passed in by the orchestrator's Execute Pipeline activity)
     MetadataDB as string (the SQL Server DB name, text file location, or JSON file location where the environment properties are stored)
Variables:
     CurrentDateTime as string
     PriorDateTime as string
     AccessToken as string
     FailThePipeline as boolean
Step #4: Add a Lookup activity to the pipeline named "Get Environment Properties"
If you used my SQL Server metadata script above, your Lookup activity properties will look similar to the next screen print.  To read the data values stored in memory, the ADF syntax would be @activity('Get Environment Properties').output.firstRow.YourColumnName.
Step #5: Add a Web activity to the pipeline named "Get Token"
This activity can be a little more challenging by its very nature.  I struggle most with the Body syntax of the web activity, but for you, just copy, paste, and send me a wine and cheese basket for Christmas.  The Web activity settings are below (a Python sketch of the same token call follows them).
URL - @concat('https://login.microsoftonline.com/',activity('Get Environment Properties').output.firstRow.ADFTenantID,'/oauth2/token')
Method - POST
Header (Content-Type) - application/x-www-form-urlencoded
Body - @concat('grant_type=client_credentials&resource=https%3A%2F%2Fmanagement.azure.com%2F&client_id=',activity('Get Environment Properties').output.firstRow.ADFClientID,'&client_secret=',activity('Get Environment Properties').output.firstRow.ADFClientSecret,'&scope=openid')
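If it helps to see the same call outside of ADF, here is a minimal Python sketch of that token request using the requests library.  The angle-bracket values are placeholders standing in for the environment properties returned by the Lookup activity; this is an illustration, not part of the pipeline itself.

Python Token Request Example:
import requests

tenant_id = "<your-tenant-id>"
client_id = "<your-client-id>"
client_secret = "<your-client-secret>"

# Same endpoint the "Get Token" Web activity calls
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"

# requests form-encodes this dictionary, matching application/x-www-form-urlencoded
body = {
    "grant_type": "client_credentials",
    "resource": "https://management.azure.com/",
    "client_id": client_id,
    "client_secret": client_secret,
    "scope": "openid",
}

response = requests.post(token_url, data=body)
response.raise_for_status()

# The same value ADF exposes as activity('Get Token').output.access_token
access_token = response.json()["access_token"]
print("Access token length:", len(access_token))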
Step #6: Add three Set Variable activities to the pipeline ​
This activity is easy breezy (named after a hiking trail in Sedona, AZ).  I am not going to beat this to death here, but I will drop a couple of screen prints and provide some syntax that can be modified to work best for you.
     CurrentDateTime variable value = @adddays(utcnow(), +1)
     PriorDateTime variable value = @adddays(utcnow(), -1)
     AccessToken variable value = @activity('Get Token').output.access_token
Step #7: Add a second web activity to the pipeline named "Get ADF Execution Metadata"
Syntax for this activity is provided in the Microsoft Doc here, but because the documentation leaves so much to the imagination (isn't that a nice way of saying it?), follow the screen prints and copy the syntax provided below.  A Python sketch of the same call follows these settings.

URL -
@concat('https://management.azure.com/subscriptions/', activity('Get Environment Properties').output.firstRow.SubscriptionID, '/resourceGroups/', activity('Get Environment Properties').output.firstRow.ResourceGroupName, '/providers/Microsoft.DataFactory/factories/', activity('Get Environment Properties').output.firstRow.ADFName, '/queryPipelineRuns?api-version=2018-06-01')

Method - POST

Header (Content-Type) - application/json

Header (Authorization) - @concat('Bearer ',variables('AccessToken'))

Body -
{
    "lastUpdatedAfter": "@{variables('PriorDateTime')}",
    "lastUpdatedBefore": "@{variables('CurrentDateTime')}",
    "filters": [
        {
            "operand": "PipelineName",
            "operator": "Equals",
            "values": [ "@{pipeline().parameters.PipelineName}" ]
        },
        {
            "operand": "Status",
            "operator": "Equals",
            "values": [ "InProgress" ]
        }
    ]
}
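As with the token call, here is a minimal Python sketch of what this Web activity and the If Condition that follows it are doing, for readers who like to test the REST call outside of ADF first.  The subscription, resource group, factory, and pipeline names are placeholders, and access_token is the value from the Step #5 sketch.

Python Query Pipeline Runs Example:
from datetime import datetime, timedelta, timezone
import requests

subscription_id = "<your-subscription-id>"
resource_group = "<your-resource-group>"
factory_name = "<your-adf-name>"
pipeline_name = "<parent-pipeline-name>"
access_token = "<access-token-from-the-step-5-sketch>"

# Same REST endpoint the "Get ADF Execution Metadata" Web activity calls
url = (
    f"https://management.azure.com/subscriptions/{subscription_id}"
    f"/resourceGroups/{resource_group}"
    f"/providers/Microsoft.DataFactory/factories/{factory_name}"
    f"/queryPipelineRuns?api-version=2018-06-01"
)

# Same +/- one day window the Set Variable activities build in Step #6
now = datetime.now(timezone.utc)
payload = {
    "lastUpdatedAfter": (now - timedelta(days=1)).isoformat(),
    "lastUpdatedBefore": (now + timedelta(days=1)).isoformat(),
    "filters": [
        {"operand": "PipelineName", "operator": "Equals", "values": [pipeline_name]},
        {"operand": "Status", "operator": "Equals", "values": ["InProgress"]},
    ],
}

response = requests.post(url, json=payload,
                         headers={"Authorization": f"Bearer {access_token}"})
response.raise_for_status()
runs = response.json()["value"]

# Step #8 in ADF: @greater(length(activity('Get ADF Execution Metadata').output.value), 1)
# When the check runs from inside the parent pipeline, the parent's own InProgress
# run is included in the results, so more than one result means a concurrent run.
if len(runs) > 1:
    print(f"{pipeline_name} already has another run in progress")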
Step #8: Add an If Condition activity to the pipeline named "If Pipeline is Already Running" 
I have started to build a code library for ADF that will make a good blog post -- you know, the every-day things we can do in T-SQL in our sleep but have to learn anew in ADF.  One of these every-day coding items is Count(), which, when you are working with a collection in ADF, becomes the length() function.  Refer to Microsoft's documentation of ADF expression syntax here.  For now, to figure out if a pipeline is already running, you need something similar to this: @greater(length(activity('Get ADF Execution Metadata').output.value), 1)
Step #9: Add a Set Variable activity to the "If Pipeline is Already Running / If True Activities"
From here on, it is completely your call.  My motivation for pulling ADF log data was to make sure that a single pipeline would never execute concurrently.  This is surely not the only use for querying the ADF log.  However, to do this, I purposely failed this ADF get-metadata pipeline so that my calling "parent" pipeline would not continue.  ADF does not have a "quit" activity or a "raise error" activity, so we are left to manually make some activity throw an error.  I opted for a Set Variable activity that tries to write a string value to a boolean variable.  Mission accomplished.
Step #10: Connect the pipeline activities all with green Success arrows.
I am including a screen shot of the completed pipeline so that you have a visual of how everything comes together.
[Screen print: the completed pipeline]
Step #11: Add an Execute Pipeline activity to each orchestrator pipeline
Continuing with my single use case, the final step is to add an Execute Pipeline activity to the "parent" or "calling" pipeline.  Basically, the "parent" is asking, "Am I already running?  If so, return an error and I'll stop my execution."  Be sure to pass the "parent" pipeline name to the ADF Get Metadata pipeline.  The calling pipeline will now only continue if the ADF Get Metadata pipeline -- or whatever you called it -- completes successfully.

Note: I only put this concurrency check in my orchestrator pipelines which control the data flow of multiple data ingestion pipelines.  Every activity has a duration price, so I don't want to be querying the ADF log unnecessarily.
Conclusion of the Matter: I admire people who can wax eloquent about Azure theory and great ideas.  Being a Microsoft tool stack mentor and consultant, I simply do not feel prepared unless I can say "I have a POC for that!"  I think of this as a "show me the money!" training technique.  This is exactly why I have just written an entire chapter in two blog posts.  Friend, "this is the money".  Go forth and query your ADF pipeline run metadata with confidence!
4 Comments
Jason
8/25/2021 08:46:10 am

This is extremely helpful. Thank you so much. I found two things when following the post.
- When you said "You then must give this service principal permissions in the Data Factory", this isn't very specific. I had to grant the service principal the Reader role through Access control (IAM) on the data factory.
- Your final IF condition checks for greater than 1 when I believe you intended it to be greater than 0.

Thank you again!

Lokesh N
10/11/2021 08:07:21 am

Thank you very much. Will try this !

Infylce- Azure-Training Chennai
5/4/2022 10:14:11 pm

Hi Team, this made good sense of Azure Data Factory (ADF) Pipeline Runs - Query By Factory and Avoid Concurrent Pipeline Executions. Much obliged for the valuable blog from your Team!!

Azure Training in chennai
5/24/2022 08:06:38 am

Your article about Azure Data Factory (ADF) Pipeline Runs - Query By Factory & Avoid Concurrent Pipeline Executions is really impressive: very clear and effective with simple steps.



