Gegevens incrementeel laden uit meerdere tabellen in SQL Server naar Azure SQL Database door middel van PowerShell

Van toepassing op: Azure Data Factory Azure Synapse Analytics

Tip

Data Factory in Microsoft Fabric is de volgende generatie van Azure Data Factory, met een eenvoudigere architectuur, ingebouwde AI en nieuwe functies. Als u nieuw bent in gegevensintegratie, begint u met Fabric Data Factory. Bestaande ADF-workloads kunnen upgraden naar Fabric om toegang te krijgen tot nieuwe mogelijkheden voor gegevenswetenschap, realtime analyses en rapportage.

In deze handleiding maakt u een Azure Data Factory met een pijplijn waarmee deltagegevens uit meerdere tabellen in een SQL Server-database worden geladen naar een Azure SQL Database.

In deze zelfstudie voert u de volgende stappen uit:

  • Bereid de bron- en doelgegevensopslag voor.
  • Een data factory maken.
  • Een zelf-hostende Integration Runtime maken.
  • De Integration Runtime installeren.
  • Maak gekoppelde services.
  • Maak bron-, afvoer- en tijdsstempelgegevenssets.
  • Maak, voer uit en controleer een pijplijn.
  • Controleer de resultaten.
  • Gegevens in brontabellen toevoegen of bijwerken.
  • De pijplijn opnieuw uitvoeren en controleren.
  • De eindresultaten bekijken.

Overzicht

Dit zijn de belangrijke stappen voor het maken van deze oplossing:

  1. Selecteer de watermerkkolom.

    Selecteer één kolom voor elke tabel in de brongegevensopslag, waarmee u de nieuwe of bijgewerkte records kunt identificeren elke keer dat het wordt uitgevoerd. Normaal gesproken nemen de gegevens in deze geselecteerde kolom (bijvoorbeeld, last_modify_time of id) toe wanneer de rijen worden gemaakt of bijgewerkt. De maximale waarde in deze kolom wordt gebruikt als grenswaarde.

  2. Bereid een gegevensopslag voor om de watermerkwaarde in op te slaan.

    In deze zelfstudie slaat u de watermerkwaarde op in een SQL-database.

  3. Maak een pijplijn met de volgende activiteiten:

    1. Maak een ForEach-activiteit die door een lijst met namen van gegevensbrontabellen loopt, die als parameter is doorgegeven aan de pijplijn. Voor elke brontabel roept deze de volgende activiteiten voor het laden van de deltagegevens voor deze tabel op.

    2. Maak twee opzoekactiviteiten. Gebruik de eerste lookup-activiteit om de laatste watermerkwaarde op te halen. Gebruik de tweede opzoekactiviteit om de nieuwe grenswaarde op te halen. Deze watermerkwaarden worden doorgegeven aan de Copy activity.

    3. Maak een Copy activity waarmee rijen uit het brongegevensarchief worden gekopieerd met de waarde van de watermerkkolom groter dan de oude grenswaarde en kleiner dan of gelijk aan de nieuwe watermerkwaarde. Vervolgens worden de deltagegevens uit het brongegevensarchief gekopieerd naar Azure Blob-opslag als een nieuw bestand.

    4. Maak een Stored Procedure-activiteit waarmee de watermark-waarde wordt bijgewerkt voor de pijplijn die de volgende keer uitgevoerd wordt.

    Hier volgt de diagramoplossing op hoog niveau:

    Stapsgewijs gegevens laden

Als u geen Azure-abonnement hebt, maakt u een free-account voordat u begint.

Vereisten

  • SQL Server. In deze zelfstudie gebruikt u een SQL Server-database als brongegevensarchief.
  • Azure SQL Database. U gebruikt een database in Azure SQL Database als de sinkgegevensopslag. Als u geen SQL-database hebt, raadpleegt u Maak een database in Azure SQL Database voor stappen om er een te maken.

Brontabellen maken in uw SQL Server-database

  1. Open SQL Server Management Studio (SSMS) of Visual Studio Code en maak verbinding met uw SQL Server-database.

  2. Klik in Server Explorer (SSMS) of in het deelvenster Connections (Visual Studio Code) met de rechtermuisknop op de database en kies Nieuwe query.

  3. Voer de volgende SQL-opdracht uit op uw database om tabellen te maken met de naam customer_table en project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Doeltabellen maken in uw Azure SQL Database

  1. Open SQL Server Management Studio (SSMS) of Visual Studio Code en maak verbinding met uw SQL Server-database.

  2. Klik in Server Explorer (SSMS) of in het deelvenster Connections (Visual Studio Code) met de rechtermuisknop op de database en kies Nieuwe query.

  3. Voer de volgende SQL-opdracht uit op uw database om tabellen te maken met de naam customer_table en project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Een andere tabel maken in Azure SQL Database om de hoge grenswaarde op te slaan

  1. Voer de volgende SQL-opdracht uit op uw database om een tabel met de naam watermarktable te maken om de bovengrenswaarde op te slaan:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Initiële watermerkwaarden voor beide brontabellen in de watermerktabel invoegen.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Een opgeslagen procedure maken in de Azure SQL Database

Voer de volgende opdracht uit om een opgeslagen procedure te maken in uw database. Deze opgeslagen procedure werkt de bovengrenswaarde bij elke pijplijnuitvoering bij.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Gegevenstypen en aanvullende opgeslagen procedures maken in Azure SQL Database

Voer de volgende query uit om twee opgeslagen procedures en twee gegevenstypen in uw database te maken. Deze worden gebruikt voor het samenvoegen van de gegevens uit de brontabellen in doeltabellen.

Om de reis gemakkelijk te beginnen, gebruiken we direct deze Stored Procedures waarbij de deltagegevens worden doorgegeven via een tabelvariabele en vervolgens samengevoegd in de doellocatie. Let op dat er niet een 'groot' aantal deltarijen (meer dan 100) wordt verwacht bij de tabelvariabele.

Als u een groot aantal deltarijen in het doelarchief moet samenvoegen, stellen we voor dat u de kopieeractiviteit gebruikt om alle deltagegevens naar een tijdelijke "voorbereidingstabel" in het doelarchief te kopiëren. Vervolgens moet u uw eigen opgeslagen procedure maken zonder een tabelvariabele te gebruiken om ze te combineren van de "voorbereidingstabel" naar de "definitieve tabel".

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Installeer de nieuwste Azure PowerShell-modules door de instructies in Install and configure Azure PowerShell te volgen.

Een data factory maken

  1. Definieer een variabele voor de naam van de resourcegroep die u later gaat gebruiken in PowerShell-opdrachten. Kopieer de volgende opdrachttekst naar PowerShell, geef een naam op voor de Azure resourcegroep tussen dubbele aanhalingstekens en voer de opdracht uit. Een voorbeeld is "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Als de resourcegroep al bestaat, wilt u waarschijnlijk niet deze overschrijven. Wijs een andere waarde toe aan de $resourceGroupName-variabele en voer de opdracht opnieuw uit.

  2. Definieer een variabele voor de locatie van de data factory.

    $location = "East US"
    
  3. Voer de volgende opdracht uit om de Azure resourcegroep te maken:

    New-AzResourceGroup $resourceGroupName $location
    

    Als de resourcegroep al bestaat, wilt u waarschijnlijk niet deze overschrijven. Wijs een andere waarde toe aan de $resourceGroupName-variabele en voer de opdracht opnieuw uit.

  4. Definieer een variabele voor de naam van de data factory.

    Belangrijk

    Werk de naam van de data-factory zodanig bij dat deze wereldwijd uniek is. Bijvoorbeeld: ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Voer de volgende cmdlet Set AzDataFactoryV2 uit om de data factory te maken:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Let op de volgende punten:

  • De naam van de datafabriek moet wereldwijd uniek zijn. Als de volgende fout zich voordoet, wijzigt u de naam en probeert u het opnieuw:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Als u Data Factory-exemplaren wilt maken, moet het gebruikersaccount dat u gebruikt om u aan te melden bij Azure lid zijn van inzender- of eigenaarsrollen, of een beheerder van het Azure-abonnement.

  • Voor een lijst met Azure regio's waarin Data Factory momenteel beschikbaar is, selecteert u de regio's die u interesseren op de volgende pagina en vouwt u vervolgens Analytics uit om Data Factory uit te vouwen: Products beschikbaar per regio. De gegevensarchieven (Azure Storage, SQL Database, SQL Managed Instance, enzovoort) en berekeningen (Azure HDInsight, enzovoort) die door de data factory worden gebruikt, kunnen zich in andere regio's bevinden.

Zelf gehoste Integration Runtime maken

In deze sectie maakt u een zelf-hostende Integration Runtime en koppelt u deze aan een on-premises machine aan de SQL Server-database. De zelf-hostende Integration Runtime is het onderdeel dat gegevens kopieert van SQL Server op uw computer naar Azure SQL Database.

  1. Maak een variabele voor de naam van de Integration Runtime. Gebruik een unieke naam en noteer deze. U gaat deze verderop in de zelfstudie gebruiken.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Een zelf-hostende Integration Runtime maken.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Hier is een voorbeelduitvoer:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Voer de volgende opdracht uit om de status van de gemaakte Integration Runtime op te halen. Controleer of de waarde van de status eigenschap is ingesteld op NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Hier is een voorbeelduitvoer:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Voer de volgende opdracht uit om de verificatiesleutels op te halen die worden gebruikt voor het registreren van de zelf-hostende Integration Runtime met Azure Data Factory-service in de cloud:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Hier is een voorbeelduitvoer:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Kopieer een van de sleutels (zonder de dubbele aanhalingstekens) voor de registratie van de zelf-gehoste integratie-runtime, die u op uw computer zal installeren in de volgende stappen.

Het hulpprogramma Integration Runtime installeren

  1. Als u de Integration Runtime al op uw computer heeft, verwijdert u deze met behulp van Programma's toevoegen of verwijderen.

  2. Download de zelf-hostende Integration Runtime op een lokale Windows computer. Voer de installatie uit.

  3. Selecteer op de pagina Welcome to Microsoft Integration Runtime Setup de optie Next.

  4. Ga op de pagina Gebruiksrechtovereenkomst akkoord met de voorwaarden en de gebruiksrechtovereenkomst en selecteer Volgende.

  5. Selecteer Volgende op de pagina Doelmap.

  6. Selecteer op de pagina Klaar om Microsoft Integration Runtime te installerenInstalleren.

  7. Selecteer op de pagina Completed the Microsoft Integration Runtime SetupFinish.

  8. Plak op de pagina Register Integration Runtime (zelf-hostend) de sleutel die u in de vorige sectie hebt opgeslagen en selecteer Register.

    De Integration Runtime registreren

  9. Selecteer op de pagina Nieuw Integration Runtime (zelf-hostend) knooppuntFinish.

  10. Als de zelf-gehoste Integration Runtime succesvol is geregistreerd, ziet u het volgende bericht:

    Registratie is voltooid

  11. Selecteer op de pagina Register Integration Runtime (zelfgehost)Launch Configuration Manager.

  12. Wanneer het knooppunt is verbonden met de cloudservice, ziet u de volgende pagina:

    De pagina 'Knooppunt is verbonden'

  13. Test nu de connectiviteit met uw SQL Server-database.

    Tabblad Diagnostische gegevens

    a. Ga op de pagina Configuration Manager naar het tabblad Diagnostics.

    b. Selecteer SqlServer als het type gegevensbron.

    c. Voer de naam van de server in.

    d. Voer de naam van de database in.

    e. Selecteer de verificatiemethode.

    f. Voer de gebruikersnaam in.

    g. Voer het wachtwoord in dat bij de gebruikersnaam hoort.

    h. Selecteer Test om te bevestigen dat de Integration Runtime verbinding kan maken met SQL Server. U ziet een groen vinkje als het gelukt is om verbinding te maken. U ziet een foutbericht als er geen verbinding kan worden gemaakt. Los eventuele problemen op en zorg ervoor dat de integration runtime verbinding kan maken met SQL Server.

    Notitie

    Noteer de waarden voor het verificatietype, de server, de database, de gebruiker en het wachtwoord. Je gebruikt ze verderop in deze zelfstudie.

Gekoppelde services maken

Je maakt gekoppelde services in een datafactory om je gegevensopslagplaatsen en rekenservices aan de datafactory te koppelen. In deze sectie maakt u gekoppelde services aan uw SQL Server-database en uw database in Azure SQL Database.

De gekoppelde SQL Server-service maken

In deze stap koppelt u uw SQL Server-database aan de data factory.

  1. Maak een JSON-bestand met de naam SqlServerLinkedService.json in de map C:\ADFTutorials\IncCopyMultiTableTutorial (maak de lokale mappen als deze nog niet bestaan) met de volgende inhoud. Selecteer de juiste sectie op basis van de verificatie die u gebruikt om verbinding te maken met SQL Server.

    Belangrijk

    Selecteer de juiste sectie op basis van de verificatie die u gebruikt om verbinding te maken met SQL Server.

    Als u gebruikmaakt van SQL-verificatie, moet u de volgende JSON-definitie kopiëren:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Als u Windows authentication gebruikt, kopieert u de volgende JSON-definitie:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Belangrijk

    • Selecteer de juiste sectie op basis van de verificatie die u gebruikt om verbinding te maken met SQL Server.
    • Vervang <integratieruntime-naam> met de naam van uw integratieruntime.
    • Vervang <servername>, <databasenaam>, <username> en <password> met waarden van uw SQL Server-database voordat u het bestand opslaat.
    • Als u een slash wilt gebruiken (\) in de naam van uw gebruikersaccount of server, moet u het escapeteken (\) gebruiken. Een voorbeeld is mydomain\\myuser.
  2. Voer in PowerShell de volgende cmdlet uit om over te schakelen naar de map C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Voer de cmdlet Set-AzDataFactoryV2LinkedService uit om de gekoppelde service AzureStorageLinkedService te maken. In het volgende voorbeeld geeft u de waarden door voor de parameters ResourceGroupName en DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Hier is een voorbeelduitvoer:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

De gekoppelde SQL Database-service maken

  1. Maak een JSON-bestand met de naam AzureSQLDatabaseLinkedService.json in de map C:\ADFTutorials\IncCopyMultiTableTutorial met de volgende inhoud. (Maak de map ADF als deze nog niet bestaat.) Vervang <servername>, <databasenaam>, <gebruikernaam> en <password> met de naam van uw SQL Server-database, naam van uw database, gebruikersnaam en wachtwoord voordat u het bestand opslaat.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. Voer in PowerShell de cmdlet Set-AzDataFactoryV2LinkedService uit om de gekoppelde service AzureSQLDatabaseLinkedService te maken.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Hier is een voorbeelduitvoer:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Gegevenssets maken

In deze stap maakt u gegevenssets die de gegevensbron, het gegevensdoel en de locatie voor het opslaan van het watermerk vertegenwoordigen.

Een brongegevensset maken

  1. Maak een JSON-bestand met de naam SourceDataset.json in dezelfde map met de volgende inhoud:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    De Copy activity in de pijplijn maakt gebruik van een SQL-query om de gegevens te laden in plaats van de hele tabel te laden.

  2. Voer de cmdlet Set-AzDataFactoryV2Dataset uit om de gegevensset SourceDataset te maken.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Hier volgt een uitvoervoorbeeld van de cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Een sinkgegevensset maken

  1. Maak een JSON-bestand met de naam SinkDataset.json in dezelfde map met de volgende inhoud. Het element tableName wordt in runtime dynamisch ingesteld door de pijplijn. De ForEach-activiteit in de pijplijn doorloopt een lijst met namen van tabellen en geeft de tabelnaam door aan deze gegevensset in elke iteratie.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Voer de cmdlet Set-AzDataFactoryV2Dataset uit om de gegevensset SinkDataset te maken.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Hier volgt een uitvoervoorbeeld van de cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Een gegevensset maken voor een watermerk

In deze stap maakt u een gegevensset voor het opslaan van een bovengrenswaarde.

  1. Maak een JSON-bestand met de naam WatermarkDataset.json in dezelfde map met de volgende inhoud:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Voer de cmdlet Set-AzDataFactoryV2Dataset uit om de gegevensset WatermarkDataset te maken.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Hier volgt een uitvoervoorbeeld van de cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Een pipeline maken

In deze pijplijn wordt een lijst met tabelnamen gebruikt als parameter. De ForEach-activiteit doorloopt de lijst met namen van tabellen en voert de volgende bewerkingen uit:

  1. Gebruik de opzoekactiviteit voor het ophalen van de oude bovengrenswaarde (de initiële waarde, of de waarde die is gebruikt in de laatste iteratie).

  2. Gebruik de opzoekactiviteit voor het ophalen van de nieuwe bovengrenswaarde (de maximale waarde van de bovengrenskolom in de brontabel).

  3. Gebruik de Copy activity om gegevens tussen deze twee watermerkwaarden van de brondatabase naar de doeldatabase te kopiëren.

  4. Gebruik de opgeslagen-procedureactiviteit voor het bijwerken van de oude bovengrenswaarde die in de eerste stap van de volgende iteratie moet worden gebruikt.

Maak de pijplijn

  1. Maak een JSON-bestand met de naam IncrementalCopyPipeline.json in dezelfde map met de volgende inhoud:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Voer de cdmlet Set-AzDataFactoryV2Pipeline uit om de pijplijn IncrementalCopyPipeline te maken.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Hier is een voorbeelduitvoer:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

De pijplijn uitvoeren

  1. Maak een parameterbestand met de naam Parameters.json in dezelfde map met de volgende inhoud:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Voer de pijplijn IncrementalCopyPipeline uit met behulp van de cmdlet Invoke-AzDataFactoryV2Pipeline. Vervang plaatsaanduidingen door de namen van uw eigen resourcegroep en data factory.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

De pijplijn bewaken

  1. Meld u aan bij de Azure-portal.

  2. Selecteer Alle services, zoek met het trefwoord Datafactories en selecteer Datafactories.

  3. Zoek naar uw data factory in de lijst met data factory’s, en selecteer deze om de pagina Data Factory te openen.

  4. Selecteer op de pagina Data factoryOpen op de tegel Open Azure Data Factory Studio om Azure Data Factory op een afzonderlijk tabblad te starten.

  5. Selecteer op de startpagina van Azure Data Factory Monitor aan de linkerkant.

    Schermafbeelding toont de startpagina voor Azure Data Factory.

  6. U kunt alle pijplijnactiviteiten en hun status zien. Merk op dat in het volgende voorbeeld de status van de pijplijnuitvoering Geslaagd is. U kunt parameters controleren die zijn doorgegeven aan de pijplijn door de koppeling in de kolom Parameters te selecteren. Als er een fout is opgetreden, ziet u een koppeling in de kolom Fout.

    Schermopname toont pijplijnuitvoeringen voor een datafabriek, inclusief uw pijplijn.

  7. Wanneer u de koppeling in de kolom Acties selecteert, ziet u alle uitvoeringen van activiteiten voor de pijplijn.

  8. Ga terug naar de weergave Pijplijnuitvoeringen door Alle pijplijnuitvoeringen te selecteren.

De resultaten bekijken

Voer in SQL Server Management Studio de volgende query's uit op de DOEL-SQL-database om te controleren of de gegevens zijn gekopieerd van brontabellen naar doeltabellen:

Query

select * from customer_table

Uitvoer

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Query

select * from project_table

Uitvoer

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Query

select * from watermarktable

Uitvoer

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

U ziet dat de bovengrenswaarden voor beide tabellen zijn bijgewerkt.

Meer gegevens toevoegen aan de brontabellen

Voer de volgende query uit op de brondatabase SQL Server om een bestaande rij in customer_table bij te werken. Voeg een nieuwe rij in project_table in.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Voer de pijplijn opnieuw uit

  1. Voer nu de pijplijn opnieuw uit door de volgende PowerShell-opdracht te gebruiken:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Volg de pijplijnuitvoeringen met behulp van de instructies in de sectie De pijplijn bewaken. Als de pijplijnstatus In uitvoering is, ziet u een andere actiekoppeling onder Acties om de pijplijn te annuleren.

  3. Klik op Vernieuwen om de lijst te vernieuwen totdat de uitvoering van de pijplijn is voltooid.

  4. Selecteer desgewenst de koppeling Uitvoeringen van activiteit weergeven onder Acties om alle activiteitsuitvoeringen te bekijken die gekoppeld zijn aan deze pijplijnuitvoering.

De eindresultaten bekijken

Voer in SQL Server Management Studio de volgende query's uit op de doeldatabase om te controleren of de bijgewerkte/nieuwe gegevens zijn gekopieerd van brontabellen naar doeltabellen.

Query

select * from customer_table

Uitvoer

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Let op de nieuwe waarden van Name en LastModifytime voor de PersonID voor nummer 3.

Query

select * from project_table

Uitvoer

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Let erop dat de invoer van NewProject toegevoegd is aan project_table.

Query

select * from watermarktable

Uitvoer

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

U ziet dat de bovengrenswaarden voor beide tabellen zijn bijgewerkt.

In deze zelfstudie hebt u de volgende stappen uitgevoerd:

  • Bereid de bron- en doelgegevensopslag voor.
  • Een data factory maken.
  • Een zelf-gehoste integration runtime (IR) maken.
  • De Integration Runtime installeren.
  • Maak gekoppelde services.
  • Maak bron-, afvoer- en tijdsstempelgegevenssets.
  • Maak, voer uit en controleer een pijplijn.
  • Controleer de resultaten.
  • Gegevens in brontabellen toevoegen of bijwerken.
  • De pijplijn opnieuw uitvoeren en controleren.
  • De eindresultaten bekijken.

Ga naar de volgende zelfstudie voor meer informatie over het transformeren van gegevens met behulp van een Spark-cluster op Azure: