Data Factory
  • 31 Oct 2024
  • 2 Minutes to read
  • Dark
    Light
  • PDF

Data Factory

  • Dark
    Light
  • PDF

Article summary

In this sample we will create a BAM transaction based on a Data Factory Pipeline. In this case the pipeline processes files we receive from branch offices.

The BAM view shows users how many times the pipeline has run and how many times there has been an error.

image.png

Walk Thru Video

In the video you will see how we set up this scenario and some of the features that are available.

Parent Transaction Query

The query below gets the latest log row for each run of this pipeline and projects only the columns I want.

// Input parameters: identify the Data Factory pipeline whose runs are listed
let inputSubscriptionId = "08a281b8-3b07-4219-a517-b11230e9b34f";
let inputResourceGroupName = "EAI_APP_DAILYFILEREPLICATION";
let inputDataFactory = "KV-EAI-FILEMOVES-DF";
let inputPipelineName = "MoveFiles";
// Select the PipelineRuns diagnostic rows logged for this pipeline
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.DATAFACTORY"
    and Category == "PipelineRuns"
    and SubscriptionId == inputSubscriptionId
    and ResourceGroup == inputResourceGroupName
    and Resource == inputDataFactory
    and pipelineName_s == inputPipelineName
// Keep only the latest record for each pipeline run
| summarize arg_max(TimeGenerated, *) by CorrelationId
// Friendly, mappable status: anything other than Succeeded/Failed is reported as In Progress.
// The remaining columns are the common Data Factory columns followed by my custom pipeline parameters.
| extend
    OverallStatus = iff(status_s in ('Succeeded', 'Failed'), status_s, 'In Progress'),
    StartTime = SystemParameters_ExecutionStart_t,
    EndTime = end_t,
    PipelineName = pipelineName_s,
    RunId = runId_g,
    SourceLocation = Parameters_SourceStore_Location_s,
    SourceDirectory = Parameters_SourceStore_Directory_s,
    DestinationLocation = Parameters_DestinationStore_Location_s,
    DestinationDirectory = Parameters_DestinationStore_Directory_s
// Duration reads StartTime and EndTime, so it is computed in its own extend step
| extend Duration = EndTime - StartTime
// Project out just the columns I want, newest record first
| project
    TimeGenerated,
    Resource,
    OperationName,
    OverallStatus,
    CorrelationId,
    RunId,
    PipelineName,
    StartTime,
    EndTime,
    SourceLocation,
    SourceDirectory,
    DestinationLocation,
    DestinationDirectory,
    Duration
| sort by TimeGenerated desc

Stage in Transaction Query

// Input parameters for this query.
// NOTE(review): {CorrelationId} looks like a placeholder that is substituted before the query runs - confirm.
let inputCorrelationId = {CorrelationId};
let inputSubscriptionId = "08a281b8-3b07-4219-a517-b11230e9b34f";
let inputResourceGroupName = "EAI_APP_DAILYFILEREPLICATION";
let inputDataFactory = "KV-EAI-FILEMOVES-DF";
let inputPipelineName = "MoveFiles";
// Select the PipelineRuns diagnostic rows for the one run identified by inputCorrelationId.
// Custom properties can also be added if needed,
// for example | extend sourceLocation = Parameters_SourceStore_Location_s
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.DATAFACTORY"
    and Category == "PipelineRuns"
    and SubscriptionId == inputSubscriptionId
    and ResourceGroup == inputResourceGroupName
    and Resource == inputDataFactory
    and pipelineName_s == inputPipelineName
    and CorrelationId == inputCorrelationId
// Keep the most recent record, which will be the success or failure once the run has finished
| summarize arg_max(TimeGenerated, *) by CorrelationId
// Map the status (anything other than Succeeded/Failed is reported as In Progress)
// and build a URL for opening this run in the Azure portal
| extend
    OverallStatus = iff(status_s in ('Succeeded', 'Failed'), status_s, 'In Progress'),
    PortalUrl = strcat(
        "https://adf.azure.com/en/monitoring/pipelineruns/",
        runId_g,
        "?factory=%2Fsubscriptions%2F",
        inputSubscriptionId,
        "%2FresourceGroups%2F",
        inputResourceGroupName,
        "%2Fproviders%2FMicrosoft.DataFactory%2Ffactories%2F",
        inputDataFactory)

Was this article helpful?

What's Next