다음을 통해 공유


Azure Data Factory Spark 작업을 사용하여 클라우드에서 데이터 변환

적용 대상: Azure Data Factory Azure Synapse Analytics

Microsoft Fabric의 데이터 팩토리는 더 간단한 아키텍처, 기본 제공 AI 및 새로운 기능을 갖춘 차세대 Azure 데이터 팩토리입니다. 데이터 통합을 접하는 경우 Fabric Data Factory부터 시작합니다. 기존 ADF 워크로드는 Fabric 업그레이드하여 데이터 과학, 실시간 분석 및 보고 전반에 걸쳐 새로운 기능에 액세스할 수 있습니다.

이 자습서에서는 Azure PowerShell 사용하여 Spark 작업 및 주문형 HDInsight 연결된 서비스를 사용하여 데이터를 변환하는 Data Factory 파이프라인을 만듭니다. 이 자습서에서 수행하는 단계는 다음과 같습니다.

  • 데이터 팩터리를 만듭니다.
  • 연결된 서비스를 작성하고 배포합니다.
  • 파이프라인을 작성하고 배포합니다.
  • 파이프라인 실행을 시작합니다.
  • 파이프라인 실행 현황을 모니터링합니다.

Azure 구독이 없는 경우 시작하기 전에 free 계정을 만듭니다.

사전 요구 사항

참고 사항

Azure Az PowerShell 모듈을 사용하여 Azure 상호 작용하는 것이 좋습니다. 시작하려면 Azure PowerShell을 설치하세요. Az PowerShell 모듈로 마이그레이션하는 방법을 알아보려면 Azure PowerShell을 AzureRM에서 Az로 마이그레이션을 참조하세요.

  • Azure Storage 계정. Python 스크립트 및 입력 파일을 만들고 Azure 스토리지에 업로드합니다. Spark 프로그램의 출력은 이 스토리지 계정에 저장됩니다. 주문형 Spark 클러스터는 기본 스토리지와 동일한 스토리지 계정을 사용합니다.
  • Azure PowerShell. Azure PowerShell 지침을 따릅니다.

Blob Storage 계정에 Python 스크립트 업로드

  1. 다음 콘텐츠를 사용하여 WordCount_Spark.py라는 Python 파일을 만듭니다.

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. <storageAccountName>을 Azure Storage 계정의 이름으로 바꿉니다. 그런 다음 파일을 저장합니다.

  3. 귀하의 Azure Blob Storage에 adftutorial이라는 컨테이너가 없는 경우 만들기하십시오.

  4. spark라는 폴더를 만듭니다.

  5. spark 폴더 아래에 script라는 하위 폴더를 만듭니다.

  6. script 하위 폴더에 WordCount_Spark.py 파일을 업로드합니다.

입력 파일 업로드

  1. 일부 텍스트가 포함된 minecraftstory.txt라는 파일을 만듭니다. Spark 프로그램은 이 텍스트의 단어 수를 계산합니다.
  2. inputfiles 폴더에 spark이라는 하위 폴더를 만듭니다.
  3. minecraftstory.txt 하위 폴더에 inputfiles를 업로드합니다.

저자 연결된 서비스

이 섹션에서는 두 개의 연결된 서비스를 작성합니다.

  • 데이터 팩터리에 Azure Storage 계정을 연결하는 Azure Storage 연결 서비스입니다. 이 스토리지는 주문형 HDInsight 클러스터에서 사용됩니다. 실행될 Spark 스크립트도 포함되어 있습니다.
  • 온디맨드 HDInsight 연결 서비스. Azure Data Factory 자동으로 HDInsight 클러스터를 만들고, Spark 프로그램을 실행한 다음, 미리 구성된 시간 동안 유휴 상태인 후 HDInsight 클러스터를 삭제합니다.

연결된 서비스 Azure Storage

원하는 편집기를 사용하여 JSON 파일을 만들고, Azure Storage 연결된 서비스의 다음 JSON 정의를 복사한 다음, 파일을 MyStorageLinkedService.json 저장합니다.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

<storageAccountName> 및 <storageAccountKey> Azure Storage 계정의 이름과 키로 업데이트합니다.

주문형 HDInsight 링크된 서비스

원하는 편집기를 사용하여 JSON 파일을 만들고, Azure HDInsight 연결된 서비스의 다음 JSON 정의를 복사하고, 파일을 MyOnDemandSparkLinkedService.json 저장합니다.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

연결된 서비스 정의에서 다음 속성 값을 업데이트합니다.

  • hostSubscriptionId <subscriptionID> Azure 구독의 ID로 바꿉니다. 이 구독에 주문형 HDInsight 클러스터가 만들어집니다.
  • 테넌트입니다. <tenantID> Azure 테넌트의 ID로 대체합니다.
  • servicePrincipalId, servicePrincipalKey - <servicePrincipalID> 및 <servicePrincipalKey> Microsoft Entra ID 서비스 주체의 ID 및 키로 바꿉니다. 이 서비스 주체는 클러스터를 만든 구독 또는 리소스 그룹의 참가자 역할의 구성원이어야 합니다. 자세한 내용은 Microsoft Entra 애플리케이션 및 서비스 주체 만들기 참조하세요. 서비스 주체 ID는 ‘애플리케이션 ID’와 동일하고, 서비스 주체 키는 ‘클라이언트 비밀’ 값과 동일합니다.
  • clusterResourceGroup - <resourceGroupOfHDICluster>를 HDInsight 클러스터를 만들어야 하는 리소스 그룹의 이름으로 바꿉니다.

참고 사항

Azure HDInsight 지원하는 각 Azure 지역에서 사용할 수 있는 총 코어 수에 제한이 있습니다. 주문형 HDInsight 연결된 서비스의 경우 HDInsight 클러스터는 기본 스토리지로 사용되는 Azure Storage 동일한 위치에 만들어집니다. 클러스터를 성공적으로 만드는 데 충분한 코어 할당량이 있는지 확인합니다. 자세한 내용은 Hadoop, Spark, Kafka 등으로 HDInsight에서 클러스터 설정을 참조하세요.

파이프라인 작성

이 단계에서는 Spark 작업이 있는 새 파이프라인을 만듭니다. 이 작업은 단어 개수 샘플을 사용합니다. 아직 다운로드하지 않았으면 이 위치에서 콘텐츠를 다운로드합니다.

원하는 편집기에서 JSON 파일을 만들고, 다음과 같은 파이프라인 정의에 대한 JSON 정의를 복사하고, MySparkOnDemandPipeline.json으로 저장합니다.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

다음 사항에 유의하세요.

  • rootPath는 adftutorial 컨테이너의 spark 폴더를 가리킵니다.
  • entryFilePath는 spark 폴더의 script 하위 폴더에 있는 WordCount_Spark.py 파일을 가리킵니다.

데이터 팩터리 만들기

JSON 파일에서 연결된 서비스 및 파이프라인 정의를 작성했습니다. 이제 데이터 팩터리를 만들고 PowerShell cmdlet을 사용하여 연결된 서비스 및 파이프라인 JSON 파일을 배포해 보겠습니다. 다음 PowerShell 명령을 개별적으로 실행합니다.

  1. 개별적으로 변수를 설정합니다.

    리소스 그룹 이름

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Data Factory 이름. 전역적으로 고유해야 합니다.

    $dataFactoryName = "MyDataFactory09102017"
    

    파이프라인 이름

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. PowerShell을 시작합니다. 이 빠른 시작이 끝날 때까지 Azure PowerShell 열어 두세요. 닫은 후 다시 여는 경우 명령을 다시 실행해야 합니다. Data Factory를 현재 사용할 수 있는 Azure 지역 목록의 경우 다음 페이지에서 관심 있는 지역을 선택한 다음 Analytics 확장하여 데이터 팩터리: 지역별 사용 가능한 프로덕트를 찾습니다. 데이터 팩터리에서 사용하는 데이터 저장소(Azure Storage, Azure SQL Database 등) 및 컴퓨팅(HDInsight 등)은 다른 지역에 있을 수 있습니다.

    다음 명령을 실행하고 Azure 포털에 로그인하는 데 사용하는 사용자 이름과 암호를 입력합니다.

    Connect-AzAccount
    

    다음 명령을 실행하여 이 계정의 모든 구독을 확인합니다.

    Get-AzSubscription
    

    다음 명령을 실행하여 사용하려는 구독을 선택합니다. SubscriptionId를 Azure 구독의 ID로 바꿉니다.

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. ADFTutorialResourceGroup 리소스 그룹을 만듭니다.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. 데이터 팩터리를 만듭니다.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    출력을 보려면 다음 명령을 실행합니다.

    $df
    
  5. JSON 파일을 만든 폴더로 전환하고 다음 명령을 실행하여 Azure Storage 연결된 서비스를 배포합니다.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. 다음 명령을 실행하여 주문형 Spark 연결된 서비스를 배포합니다.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. 다음 명령을 실행하여 파이프라인을 배포합니다.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

파이프라인 실행 시작 및 모니터링

  1. 파이프라인 실행을 시작합니다. 또한 향후 모니터링을 위해 파이프라인 실행 ID를 캡처합니다.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. 다음 스크립트를 실행하여 완료될 때까지 파이프라인 실행 상태를 계속 확인합니다.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. 샘플 실행의 출력은 다음과 같습니다.

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Spark 프로그램의 출력을 사용하여 outputfiles라는 폴더가 adftutorial 컨테이너의 spark 폴더에 만들어졌는지 확인합니다.

이 샘플의 파이프라인은 Azure Blob Storage의 한 위치에서 다른 위치로 데이터를 복사합니다. 다음 방법에 대해 알아보았습니다.

  • 데이터 팩터리를 만듭니다.
  • 연결된 서비스를 작성하고 배포합니다.
  • 파이프라인을 작성하고 배포합니다.
  • 파이프라인 실행을 시작합니다.
  • 파이프라인 실행 현황을 모니터링합니다.

가상 네트워크에 있는 Azure HDInsight 클러스터에서 Hive 스크립트를 실행하여 데이터를 변환하는 방법을 알아보려면 다음 자습서로 진행합니다.

Tutorial: Azure Virtual Network Hive를 사용하여 데이터를 변환합니다.