Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
APPLIES TO:
Azure Data Factory
Azure Synapse Analytics
Tip
Data Factory in Microsoft Fabric is de volgende generatie van Azure Data Factory, met een eenvoudigere architectuur, ingebouwde AI en nieuwe functies. Als u nieuw bent in gegevensintegratie, begint u met Fabric Data Factory. Bestaande ADF-workloads kunnen upgraden naar Fabric om toegang te krijgen tot nieuwe mogelijkheden voor gegevenswetenschap, realtime analyses en rapportage.
In deze quickstart maakt u een data factory met behulp van Python. De pijplijn in deze data factory kopieert gegevens van de ene map naar een andere map in Azure Blob Storage.
Azure Data Factory is een cloudservice voor gegevensintegratie waarmee u gegevensgestuurde werkstromen kunt maken voor het organiseren en automatiseren van gegevensverplaatsing en gegevenstransformatie. Met Azure Data Factory kunt u gegevensgestuurde werkstromen maken en plannen, pijplijnen genoemd.
Pijplijnen kunnen gegevens uit verschillende gegevensopslagplaatsen opnemen. Pijplijnen verwerken of transformeren gegevens met behulp van rekenservices zoals Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics en Azure Machine Learning. Pijplijnen publiceren uitvoergegevens naar gegevensarchieven zoals Azure Synapse Analytics voor BI-toepassingen (Business Intelligence).
Vereisten
Een Azure-account met een actief abonnement. Maak gratis een account.
Azure Storage Explorer (optioneel).
Een toepassing in Microsoft Entra ID. Maak de toepassing door de stappen in deze koppeling te volgen, verificatieoptie 2 (toepassingsgeheim) te gebruiken en de toepassing toe te wijzen aan de rol Inzender door de instructies in hetzelfde artikel te volgen. Noteer de volgende waarden, zoals wordt weergegeven in het artikel om in latere stappen te gebruiken: Toepassings-id (client), clientgeheimwaarde en tenant-id.
Een invoerbestand maken en uploaden
Start Kladblok. Kopieer de volgende tekst en sla deze op schijf op in het bestand input.txt.
John|Doe Jane|DoeGebruik hulpprogramma's zoals Azure Storage Explorer om de container adfv2tutorial en input in de container te maken. Vervolgens kunt u het bestand input.txt uploaden naar de map input.
Het Python-pakket installeren
Open een terminal of opdrachtprompt met beheerdersbevoegdheden.
Installeer eerst het Python-pakket voor Azure beheerbronnen:
pip install azure-mgmt-resourceVoer de volgende opdracht uit om het Python-pakket voor Data Factory te installeren:
pip install azure-mgmt-datafactoryDe Python SDK voor Data Factory ondersteunt Python 2.7 en 3.6+.
Voer de volgende opdracht uit om het Python-pakket voor Azure-identiteitsverificatie te installeren:
pip install azure-identityNotitie
Het pakket 'azure-identity' bevat mogelijk conflicten met 'azure-cli' voor enkele algemene afhankelijkheden. Als u een verificatieprobleem ondervindt, verwijdert u 'azure-cli' en de bijbehorende afhankelijkheden, of gebruik een schone machine zonder het pakket 'azure-cli' te installeren om het te laten werken. Voor onafhankelijke clouds moet u de juiste cloudspecifieke constanten gebruiken. Raadpleeg Connect to all regions using Azure libraries for Python Multi-cloud | Microsoft Docs voor instructies om verbinding te maken met Python in onafhankelijke clouds.
Een data factory-client maken
Maak een bestand met de naam datafactory.py. Voeg de volgende instructies toe om verwijzingen naar naamruimten toe te voegen.
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import timeVoeg de volgende functies voor het afdrukken van informatie toe.
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))Voeg de volgende code toe aan de methode Main om een instantie van de klasse DataFactoryManagementClient te maken. U gebruikt dit object om de data factory, een gekoppelde service, gegevenssets en een pijplijn te maken. U kunt dit object ook gebruiken om de details van de pijplijnuitvoering te controleren. Stel subscription_id variabele in op de id van uw Azure-abonnement. Voor een lijst met Azure regio's waarin Data Factory momenteel beschikbaar is, selecteert u de regio's die u interesseren op de volgende pagina en vouwt u vervolgens Analytics uit om Data Factory uit te vouwen: Products beschikbaar per regio. De gegevensarchieven (Azure Storage, Azure SQL Database, enzovoort) en berekeningen (HDInsight, enzovoort) die door data factory worden gebruikt, kunnen zich in andere regio's bevinden.
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
Een data factory maken
Voeg de volgende code toe aan de methode Main om een data factory te maken. Als uw resourcegroep al bestaat, zet dan de eerste create_or_update-instructie in commentaar.
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
Een gekoppelde service maken
Voeg de volgende code toe aan de methode Main waarmee een Azure Storage gekoppelde service wordt gemaakt.
U maakt gekoppelde services in een gegevensfabriek om uw gegevensopslagplaatsen en compute-services met de gegevensfabriek te verbinden. In deze quickstart hoeft u slechts één Azure Storage-gekoppelde service te maken die zowel als kopieerbron als doelopslag fungeert, met de naam 'AzureStorageLinkedService'. Vervang <storageaccountname> en <storageaccountkey> door de naam en sleutel van uw Azure Storage-account.
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
Gegevenssets maken
In deze sectie maakt u twee gegevenssets: één voor de bron en de andere voor de sink.
Een gegevensset maken voor bron-Azure-blob
Voeg de volgende code toe aan de Main-methode waarmee een Azure blobgegevensset wordt gemaakt. Zie Azure blobconnector voor meer informatie over eigenschappen van Azure Blob-gegevensset.
U definieert een gegevensset die de brongegevens in Azure Blob vertegenwoordigt. Deze Blob-gegevensset verwijst naar de Azure Storage gekoppelde service die u in de vorige stap hebt gemaakt.
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
Een gegevensset maken voor sink Azure Blob
Voeg de volgende code toe aan de Main-methode waarmee een Azure blobgegevensset wordt gemaakt. Zie Azure blobconnector voor meer informatie over eigenschappen van Azure Blob-gegevensset.
U definieert een gegevensset die de brongegevens in Azure Blob vertegenwoordigt. Deze Blob-gegevensset verwijst naar de Azure Storage gekoppelde service die u in de vorige stap hebt gemaakt.
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
Een pipeline maken
Voeg de volgende code toe aan de methode Main om een pijplijn met een kopieeractiviteit te maken.
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
Een pijplijnrun starten
Voeg de volgende code toe aan de methode Main om een pijplijnuitvoering te activeren.
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
Een pijplijnuitvoering controleren
Als u de uitvoering van de pijplijn wilt volgen, voegt u de volgende code toe aan de methode Main:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
Voeg nu de volgende instructie toe om de methode Main aan te roepen wanneer het programma wordt uitgevoerd:
# Start the main method
main()
Het volledige script
Dit is de volledige Python code:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
De code uitvoeren
Bouw en start de toepassing en controleer vervolgens de uitvoering van de pijplijn.
In de console wordt de voortgang weergegeven van het maken van een datafabriek, een gekoppelde dienst, datareeksen, een pijplijn en een pijplijnuitvoering. Wacht totdat u details ziet van de uitvoering van de kopieeractiviteit, waaronder de omvang van de gelezen/weggeschreven gegevens. Gebruik vervolgens hulpprogramma's zoals Azure Storage explorer om te controleren of de blob(s) is gekopieerd naar 'outputBlobPath' van 'inputBlobPath' zoals u hebt opgegeven in variabelen.
Hier volgt een voorbeeld van uitvoer:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
Hulpbronnen opschonen
Als u de data factory wilt verwijderen, voegt u de volgende code toe aan het programma:
adf_client.factories.delete(rg_name, df_name)
Gerelateerde inhoud
De pijplijn in dit voorbeeld kopieert gegevens van de ene locatie naar een andere in Azure Blob Storage. Doorloop de zelfstudies voor meer informatie over het gebruiken van Data Factory in andere scenario's.