Transformera data i molnet med hjälp av Spark-aktivitet i Azure Data Factory

GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics

Tips

Data Factory i Microsoft Fabric är nästa generations Azure Data Factory, med en enklare arkitektur, inbyggd AI och nya funktioner. Om dataintegrering är nytt för dig börjar du med Fabric Data Factory. Befintliga ADF-arbetsbelastningar kan uppgraderas till Fabric för att få åtkomst till nya funktioner inom datavetenskap, realtidsanalys och rapportering.

I den här självstudien använder du Azure PowerShell för att skapa en Data Factory-pipeline som transformerar data med Spark-aktivitet och en länkad HDInsight-tjänst på begäran. I de här självstudierna går du igenom följande steg:

  • Skapa en datafabrik.
  • Skapa och distribuera länkade tjänster.
  • Skapa och distribuera en pipeline.
  • Starta en pipelineskörning.
  • Övervaka pipelinekörningen.

Om du inte har en Azure-prenumeration skapar du ett free-konto innan du börjar.

Förutsättningar

Kommentar

Vi rekommenderar att du använder modulen Azure Az PowerShell för att interagera med Azure. Kom igång genom att läsa Installera Azure PowerShell. Information om hur du migrerar till Az PowerShell-modulen finns i Migrera Azure PowerShell från AzureRM till Az.

  • Azure Storage konto. Du skapar ett Python skript och en indatafil och laddar upp dem till Azure lagring. Spark-programmets utdata lagras på det här lagringskontot. Spark-klustret på begäran använder samma lagringskonto som den primära lagringen.
  • Azure PowerShell. Följ anvisningarna i Så här installerar och konfigurerar du Azure PowerShell.

Ladda upp Python skript till ditt Blob Storage-konto

  1. Skapa en Python fil med namnet WordCount_Spark.py med följande innehåll:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Ersätt <storageAccountName> med namnet på ditt Azure Storage konto. Spara sedan filen.

  3. I Azure Blob Storage skapar du en container med namnet adftutorial om den inte finns.

  4. Skapa en mapp med namnet spark.

  5. Skapa en undermapp med namnet script under mappen spark.

  6. Överför filen WordCount_Spark.py till undermappen script.

Överföra indatafilen

  1. Skapa en fil med namnet minecraftstory.txt med lite text. Spark-programmet räknar antalet ord i texten.
  2. Skapa en undermapp med namnet inputfiles i mappen spark.
  3. Överför minecraftstory.txt till mappen inputfiles.

Skapa länkade tjänster

Du har skapat två länkade tjänster i det här avsnittet:

  • En Azure Storage länkad tjänst som länkar ett Azure Storage-konto till datafabriken. Den här lagringen används av HDInsight-kluster på begäran. Den innehåller också Spark-skriptet som ska köras.
  • En på begäran länkad HDInsight-tjänst. Azure Data Factory skapar automatiskt ett HDInsight-kluster, kör Spark-programmet och tar sedan bort HDInsight-klustret när det har varit inaktivt under en förkonfigurerad tid.

Azure Storage länkad tjänst

Skapa en JSON-fil med den redigerare du föredrar, kopiera följande JSON-definition av en Azure Storage länkad tjänst och spara sedan filen som MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Uppdatera <storageAccountName> och <storageAccountKey> med namnet och nyckeln för ditt Azure Storage konto.

På begäran länkad HDInsight-tjänst

Skapa en JSON-fil med den redigerare du föredrar, kopiera följande JSON-definition av en Azure HDInsight länkad tjänst och spara filen som MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Uppdatera värden för följande egenskaper i definitionen för den länkade tjänsten:

  • hostSubscriptionId. Ersätt <subscriptionID> med ID:t för din Azure-prenumeration. Klustret HDInsight på begäran skapas i den här prenumerationen.
  • klient. Ersätt <tenantID> med ID för din Azure klientorganisation.
  • servicePrincipalId, servicePrincipalKey. Ersätt <servicePrincipalID> och <servicePrincipalKey> med ID och nyckeln för tjänstens huvudnamn i Microsoft Entra ID. Tjänstens huvudnamn måste vara medlem av deltagarrollen för prenumerationen eller resursgruppen där klustret har skapats. Mer information finns i skapa Microsoft Entra program och tjänstens huvudnamn. Tjänstens huvudnamns-ID motsvarar program-ID :t och en nyckel för tjänstens huvudnamn motsvarar värdet för en klienthemlighet.
  • clusterResourceGroup. Ersätt <resourceGroupOfHDICluster> med namnet på resursgruppen som HDInsight-klustret ska skapas i.

Kommentar

Azure HDInsight har begränsningar för det totala antalet kärnor som du kan använda i varje Azure region som stöds. För den länkade HDInsight-tjänsten på begäran skapas HDInsight-klustret på samma plats som den Azure Storage som används som primär lagringsplats. Se till att du har tillräckligt med kärnkvoter så att klustret kan skapas framgångsrikt. Mer information finns i Set up clusters in HDInsight with Hadoop, Spark, Kafka, and more (Konfigurera kluster i HDInsight med Hadoop, Spark, Kafka med mera).

Skapa en pipeline

I det här steget kan du skapa en ny pipeline med en Spark-aktivitet. Aktiviteten använder exemplet ordräkning. Hämta innehållet från den här platsen om du inte redan gjort det.

Skapa en JSON-fil med önskat redigeringsprogram, kopiera följande JSON-definition för en pipelinedefinition och spara filen som MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Observera följande:

  • rootPath pekar på Spark-mappen i containern adftutorial.
  • entryFilePath pekar på filen WordCount_Spark.py i skriptets undermapp i Spark-mappen.

Skapa en datafabrik

Du har skapat definitioner för länkad tjänst och pipeline i JSON-filer. Nu ska vi skapa en datafabrik och distribuera de länkade JSON-filerna för tjänsten och pipelinen med hjälp av PowerShell-cmdletar. Kör följande PowerShell-kommandon ett i taget:

  1. Ange variabler en i taget.

    Namn på resursgrupp

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Namn på datafabrik. Måste vara globalt unikt

    $dataFactoryName = "MyDataFactory09102017"
    

    Namn på pipeline

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Starta PowerShell. Håll Azure PowerShell öppna till slutet av den här snabbstarten. Om du stänger och öppnar det igen måste du köra kommandona en gång till. Om du vill ha en lista över Azure regioner där Data Factory är tillgängligt för närvarande väljer du de regioner som intresserar dig på följande sida och expanderar sedan Analytics för att hitta Data Factory: Produkter tillgängliga per region. Datalager (Azure Storage, Azure SQL Database osv.) och beräkningar (HDInsight osv.) som används av datafabriken kan finnas i andra regioner.

    Kör följande kommando och ange det användarnamn och lösenord som du använder för att logga in på Azure-portalen:

    Connect-AzAccount
    

    Kör följande kommando för att visa alla prenumerationer för det här kontot:

    Get-AzSubscription
    

    Kör följande kommando för att välja den prenumeration som du vill arbeta med. Ersätt SubscriptionId med ID:t för din Azure-prenumeration:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Skapa resursgruppen: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Skapa datafabriken.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Kör följande kommando för att se utdata:

    $df
    
  5. Växla till mappen där du skapade JSON-filer och kör följande kommando för att distribuera en Azure Storage länkad tjänst:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Kör följande kommando för att distribuera en länkad Spark-tjänst på begäran:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Kör följande kommando för att distribuera en pipeline:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Starta och övervaka en pipelinekörning

  1. Starta en pipelineskörning. Den samlar även in pipelinekörningens ID för kommande övervakning.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Kör följande skript för att kontinuerligt kontrollera pipelinekörningens status tills den är klar.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Här är utdata från exempelkörningen:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Bekräfta att en mapp med namnet outputfiles har skapats i mappen spark i adftutorial-containern med utdata från Spark-programmet.

Pipeline i det här exemplet kopierar data från en plats till en annan i en Azure Blob-lagring. Du har lärt dig att:

  • Skapa en datafabrik.
  • Skapa och distribuera länkade tjänster.
  • Skapa och distribuera en pipeline.
  • Starta en pipelineskörning.
  • Övervaka pipelinekörningen.

Gå vidare till nästa självstudie för att lära dig hur du transformerar data genom att köra Hive-skript på ett Azure HDInsight kluster som finns i ett virtuellt nätverk.