Azure Data Factory (ADFv1) JSON Example for Salesforce Initial Load RelationalSource Pipeline

4/25/2017

This blog post is intended for developers who are new to Azure Data Factory (ADF) and just want a working JSON example.  In another blog post here, I've given you the 10,000-foot view of how data flows through ADF from a developer's perspective.  This blog post assumes you understand ADF data flows and now simply want a JSON example of a full initial data load into [somewhere].  The JSON provided here pulls data from Salesforce and creates output files in an Azure Data Lake.

Prerequisites:
1.  An established Azure subscription
2.  An Azure Data Factory resource
3.  An Azure Data Lake resource
4.  An Azure Active Directory application that has been given permissions in your Azure Data Lake
     Short version to create the AAD application:  Azure Active Directory --> App registrations --> New application registration
     Short version to give ADL permissions:  ADL --> Data Explorer --> click on the top / root folder --> Access


JSON Scripts:
To extract data from Salesforce using ADF, we first need to create a linked service for Salesforce in ADF.
{
  "name": "SalesforceDataConnection",
  "properties": {
    "type": "Salesforce",
    "typeProperties": {
      "username": "dbradish@pragmaticworks.com",
      "password": "MyPassword",
      "securityToken": "MySecurityTokenObtainedFromSalesforceUI",
      "environmentUrl": "test.salesforce.com"
    }
  }
}

Note: If you are pulling from a production Salesforce org rather than a sandbox, your "environmentUrl" will be "login.salesforce.com".

We next need a linked service for Azure Data Lake.  
AAD = Azure Active Directory
ADL = Azure Data Lake

{
  "name": "ADLakeConnection",
  "properties": {
    "type": "AzureDataLakeStore",
    "typeProperties": {
      "dataLakeStoreUri": "https://MyDataLake.azuredatalakestore.net",
      "servicePrincipalId": "MyAAD-ApplicationID-ServicePrincipalId-ClientId-WhichAreAllTheSameThing",
      "servicePrincipalKey": "MyAAD-ApplicationID-key",
      "tenant": "MyAAD-DirectoryID",
      "subscriptionId": "MyADL-SubscriptionId",
      "resourceGroupName": "MyADL-ResourceGroup"
    }
  }
}

The ADF copy wizard will generate this JSON for you, but you still need to know your IDs, so let's talk about where to find each of these values.  Assuming that you are currently logged in to the Azure Portal ...

1  "datalakeStoreUri": Your Azure Data Lake --> Overview --> URL

​2  "servicePrincipalId": Your Azure Active Directory Application --> App registrations --> YourApplicationName --> (click on the application name to see properties) --> ApplicationID

3  "servicePrincipalKey": 
You must have recorded this when you created  the Azure Active Directory application!

4  "tenant": Your Azure Active Directory Application --> Properties --> Directory ID

5  "subscriptionId": Your Azure Data Lake --> Overview --> Subscription ID

6  "resourceGroupName": Your Azure Data Lake --> Overview --> Resource group

Here are a couple of screen prints to help you out.
[Screen print from Azure Data Lake --> Overview]
[Screen print from Azure Active Directory --> App registrations]
[Screen print from Azure Active Directory --> Properties]
Note: your Azure Active Directory (AAD) application key (ADF's "servicePrincipalKey", i.e. the client secret; not to be confused with the Application ID / Client ID) must be recorded at the time it is created.  There is a pop-up message in AAD which clearly reads, "Copy the key value.  You won't be able to retrieve after you leave this blade."

Okay, did we survive the creation of an Azure Data Lake linked service?  If so, let's move on to creating an ADF input dataset for Salesforce.  This JSON uses the Salesforce Account table as an example.
{
  "name": "InputDataset-Account",
  "properties": {
    "structure": [
      {
        "name": "Id",
        "type": "String"
      },
      {
        "name": "IsDeleted",
        "type": "Boolean"
      },
      {
        "name": "MasterRecordId",
        "type": "String"
      },
      {
        "name": "Name",
        "type": "String"
      },
      {
        "name": "Type",
        "type": "String"
      },
<a plethora of more Account columns here.  Remove this line from your JSON script>
      {
        "name": "Is_Processed_by_DP__c",
        "type": "Boolean"
      }
    ],
    "published": false,
    "type": "RelationalTable",
    "linkedServiceName": "SalesforceDataConnection",
    "typeProperties": {
      "tableName": "Account"
    },
    "availability": {
      "frequency": "Day",
      "interval": 1
    },
    "external": true,
    "policy": {}
  }
}
You will need one input (and output) dataset for EVERY Salesforce table.  Furthermore, pipelines cannot share datasets, so if you want both an initial and an incremental load pipeline, you will need TWO input datasets for every Salesforce table.  Personally, I feel this is an ADF travesty, but once I understood pipelines and SliceStart and SliceEnd times, it became understandable, albeit a terrific pain!  Okay, here is a work-around tip for you (see the incremental-load sketch after this list):
1.  Use a Visual Studio Azure Data Factory solution for source code control.
2.  In the VS solution, have one JSON script for the initial load and a second JSON script for the incremental load.
3.  In ADF, delete one pipeline before trying to create the other; in effect, you are toggling between the two JSON scripts, using VS as your source of truth.
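
For reference, here is a minimal sketch of how the "source" section of the incremental-load pipeline might differ from the initial-load pipeline shown further below.  This is my own illustration, not one of the downloadable scripts: it assumes the standard Salesforce LastModifiedDate audit field and uses ADF's $$Text.Format macro with the SliceStart and SliceEnd system variables so that each daily slice pulls only the rows changed within its window.

          "source": {
            "type": "RelationalSource",
            "query": "$$Text.Format('SELECT * FROM Account WHERE LastModifiedDate >= {0:yyyy-MM-ddTHH:mm:ssZ} AND LastModifiedDate < {1:yyyy-MM-ddTHH:mm:ssZ}', SliceStart, SliceEnd)"
          },

Everything else in the pipeline JSON (sink, inputs, outputs, policy, scheduler) stays the same; only the query changes.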


Moving on, we now need an ADF output dataset.
{
  "name": "OutputDataset-Account",
  "properties": {
    "structure": [
      {
        "name": "Id",
        "type": "String"
      },
      {
        "name": "IsDeleted",
        "type": "Boolean"
      },
      {
        "name": "MasterRecordId",
        "type": "String"
      },
      {
        "name": "Name",
        "type": "String"
      },
      {
        "name": "Type",
        "type": "String"
      },
    <a plethora of more Account columns here.  Remove this line from your JSON script>
      {
        "name": "Is_Processed_by_DP__c",
        "type": "Boolean"
      }
    ],
    "published": false,
    "type": "AzureDataLakeStore",
    "linkedServiceName": "ADLakeConnection",
    "typeProperties": {
      "fileName": "Account {Year}{Month}{Day} {Hour}{Minute}.avro",
      "folderPath": "Salesforce/{Year}{Month}{Day}",
      "format": {
        "type": "AvroFormat"
      },
      "partitionedBy": [
        {
          "name": "Year",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "yyyy"
          }
        },
        {
          "name": "Month",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "MM"
          }
        },
        {
          "name": "Day",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "dd"
          }
        },
        {
          "name": "Hour",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "hh"
          }
        },
        {
          "name": "Minute",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "mm"
          }
        }
      ]
    },
    "availability": {
      "frequency": "Day",
      "interval": 1
    },
    "external": false,
    "policy": {}
  }
}
Why an AVRO file format?  At the writing of this blog post, the SSIS Azure Feature Pack contains an Azure Data Lake Source component, but Microsoft forgot to include a text qualifier property, which makes delimited text files risky to read back out of the lake; Avro avoids that problem.  Oops!  No one is perfect.

Why partitions?  This is how ADF maps each time slice to a folder and file name.  In this example, I wanted my Salesforce Account ADL files to sort into nice little MyADL\Salesforce\YYYYMMDD\Account YYYYMMDD HHMM.avro buckets.
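
As a worked example (the date is mine, not output copied from ADF): for the daily slice whose SliceStart is 2017-04-25T00:00 UTC, the partition variables above expand so that the output dataset resolves to roughly the following (using the 24-hour "HH" format for the hour):

      "folderPath": "Salesforce/20170425",
      "fileName": "Account 20170425 0000.avro"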

Now for the grand finale, let's create an ADF pipeline.  We are almost home!
{
  "name": "InboundPipeline-SalesforceInitialLoad",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "RelationalSource",
            "query": "select * from Account"
          },
          "sink": {
            "type": "AzureDataLakeStoreSink",
            "writeBatchSize": 0,
            "writeBatchTimeout": "00:00:00"
          }
        },
        "inputs": [
          {
            "name": "InputDataset-Account"
          }
        ],
        "outputs": [
          {
            "name": "OutputDataset-Account"
          }
        ],
        "policy": {
          "timeout": "1.00:00:00",
          "concurrency": 1,
          "executionPriorityOrder": "NewestFirst",
          "style": "StartOfInterval",
          "retry": 3,
          "longRetry": 0,
          "longRetryInterval": "00:00:00"
        },
        "scheduler": {
          "frequency": "Day",
          "interval": 1
        },
        "name": "Activity-Account"
      },
     <Including Salesforce Opportunity here for your amazement or amusement. Remove this line from your JSON script>
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "RelationalSource",
            "query": "select * from Opportunity"
          },
          "sink": {
            "type": "AzureDataLakeStoreSink",
            "writeBatchSize": 0,
            "writeBatchTimeout": "00:00:00"
          }
        },
        "inputs": [
          {
            "name": "InputDataset-Opportunity"
          }
        ],
        "outputs": [
          {
            "name": "OutputDataset-Opportunity"
          }
        ],
        "policy": {
          "timeout": "1.00:00:00",
          "concurrency": 1,
          "executionPriorityOrder": "NewestFirst",
          "style": "StartOfInterval",
          "retry": 3,
          "longRetry": 0,
          "longRetryInterval": "00:00:00"
        },
        "scheduler": {
          "frequency": "Day",
          "interval": 1
        },
        "name": "Activity-Opportunity"
      }
    ],
    "start": "2017-04-17T19:48:55.667Z",
    "end": "2099-12-31T05:00:00Z",
    "isPaused": false,
    "pipelineMode": "Scheduled"
  }
}


Getting your new JSON scripts to execute on demand is another blog post.  Hint: the "start" UTC time of your pipeline controls this!
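
A minimal sketch of the idea (the dates below are illustrative, not taken from the download files): because the pipeline above is Scheduled with a daily frequency, deploying it with a "start"/"end" window that already lies in the past causes ADF to create those past slices and run them right away, which is one way to kick off an immediate initial load.

    "start": "2017-04-24T00:00:00Z",
    "end": "2017-04-25T00:00:00Z",
    "isPaused": false,
    "pipelineMode": "Scheduled"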

Download all JSON scripts below.

Enjoy!

salesforcedataconnection.json
adlakeconnection.json
inputdataset-salesforceaccount.json
outputdataset-salesforceaccount.json
inboundpipeline-salesforceinitialload.json
