Incrementeel gegevens laden van Azure SQL Database naar Azure Blob Storage met behulp van PowerShell

Van toepassing op: Azure Data Factory Azure Synapse Analytics

Tip

Data Factory in Microsoft Fabric is de volgende generatie van Azure Data Factory, met een eenvoudigere architectuur, ingebouwde AI en nieuwe functies. Als u nieuw bent in gegevensintegratie, begint u met Fabric Data Factory. Bestaande ADF-workloads kunnen upgraden naar Fabric om toegang te krijgen tot nieuwe mogelijkheden voor gegevenswetenschap, realtime analyses en rapportage.

In deze zelfstudie gebruikt u Azure Data Factory om een pijplijn te maken waardoor deltagegevens uit een tabel in Azure SQL Database worden geladen naar de Azure Blob Storage.

In deze zelfstudie voert u de volgende stappen uit:

  • Bereid de gegevensopslag voor om de watermerkwaarde in op te slaan.
  • Een data factory maken.
  • Maak gekoppelde services.
  • Maak bron-, sink- en grenswaardegegevenssets.
  • Een pipeline maken.
  • Voer de pijplijn uit.
  • Controleer de pijplijnuitvoering.

Overzicht

Hier volgt de diagramoplossing op hoog niveau:

Stapsgewijs gegevens laden

Dit zijn de belangrijke stappen voor het maken van deze oplossing:

  1. Selecteer de watermerkkolom. Selecteer één kolom in de brongegevensopslag die kan worden gebruikt om de nieuwe of bijgewerkte records voor elke uitvoering te segmenteren. Normaal gesproken nemen de gegevens in deze geselecteerde kolom (bijvoorbeeld, last_modify_time of id) toe wanneer de rijen worden gemaakt of bijgewerkt. De maximale waarde in deze kolom wordt gebruikt als grenswaarde.

  2. Bereid een gegevensopslag voor om de watermerkwaarde op te slaan.
    In deze zelfstudie slaat u de watermerkwaarde op in een SQL-database.

  3. Maak een pijplijn met de volgende werkstroom:

    De pijplijn in deze oplossing heeft de volgende activiteiten:

    • Maak twee opzoekactiviteiten. Gebruik de eerste Lookup-activiteit om de laatste watermerkwaarde op te halen. Gebruik de tweede opzoekactiviteit om de nieuwe grenswaarde op te halen. Deze watermerkwaarden worden doorgegeven aan de Copy activity.
    • Maak een Copy activity waarmee rijen uit het brongegevensarchief worden gekopieerd met de waarde van de watermerkkolom groter dan de oude grenswaarde en kleiner dan of gelijk aan de nieuwe watermerkwaarde. Vervolgens worden de deltagegevens uit de brongegevensopslag als een nieuw bestand gekopieerd naar een Blob-opslag.
    • Maak een StoredProcedure-activiteit die de watermarkwaarde bijwerkt voor de pijplijn die de volgende keer wordt uitgevoerd.

Als u geen Azure-abonnement hebt, maakt u een free-account voordat u begint.

Vereisten

Notitie

U wordt aangeraden de Azure Az PowerShell-module te gebruiken om te communiceren met Azure. Zie Install Azure PowerShell om aan de slag te gaan. Zie Migrate Azure PowerShell van AzureRM naar Az voor meer informatie over het migreren naar de Az PowerShell-module.

  • Azure SQL Database. U gebruikt de database als de brongegevensopslag. Als u geen database in Azure SQL Database hebt, raadpleegt u Database maken in Azure SQL Database voor stappen om er een te maken.
  • Azure Storage. U gebruikt de Blob-opslag als de sinkgegevensopslag. Als u geen opslagaccount hebt, raadpleegt u het artikel Een opslagaccount maken voor de stappen om er een te maken. Maak een container met de naam adftutorial.
  • Azure PowerShell. Volg de instructies in Azure PowerShell installeren en configureren.

Een gegevensbrontabel maken in uw SQL-database

  1. Open SQL Server Management Studio. Klik in Server Explorer met de rechtermuisknop op de database en kies Nieuwe query.

  2. Voer de volgende SQL-opdracht uit voor de SQL-database om een tabel met de naam data_source_table te maken als de gegevensbronopslag:

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
    

    In deze zelfstudie gebruikt u LastModifytime als de watermerkkolom. De gegevens in de brongegevensopslag worden weergegeven in de volgende tabel:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    

Nog een tabel in uw SQL-database maken om de bovengrenswaarde op te slaan

  1. Voer de volgende SQL-opdracht uit op de SQL-database om een tabel met de naam watermarktable te maken om de grenswaarde op te slaan:

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. Stel de standaardwaarde van de bovengrens in met de tabelnaam van de brongegevensopslag. In deze handleiding heet de tabel data_source_table.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Controleer de gegevens in de tabel watermarktable.

    Select * from watermarktable
    

    Uitvoer:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

Een opgeslagen procedure maken in uw SQL-database

Voer de volgende opdracht uit om een opgeslagen procedure te maken in uw SQL-database:

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Een data factory maken

  1. Definieer een variabele voor de naam van de resourcegroep die u later gaat gebruiken in PowerShell-opdrachten. Kopieer de volgende opdrachttekst naar PowerShell, geef een naam op voor de Azure resourcegroep tussen dubbele aanhalingstekens en voer de opdracht uit. Een voorbeeld is "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Als de resourcegroep al bestaat, wilt u waarschijnlijk niet dat deze overschreven wordt. Wijs een andere waarde toe aan de $resourceGroupName-variabele en voer de opdracht opnieuw uit.

  2. Definieer een variabele voor de locatie van de data factory.

    $location = "East US"
    
  3. Voer de volgende opdracht uit om de Azure resourcegroep te maken:

    New-AzResourceGroup $resourceGroupName $location
    

    Als de resourcegroep al bestaat, wilt u waarschijnlijk niet dat deze overschreven wordt. Wijs een andere waarde toe aan de $resourceGroupName-variabele en voer de opdracht opnieuw uit.

  4. Definieer een variabele voor de naam van de data factory.

    Belangrijk

    Werk de naam van de gegevensfabriek zodanig bij dat deze wereldwijd uniek is. Bijvoorbeeld: ADFTutorialFactorySP1127.

    $dataFactoryName = "ADFIncCopyTutorialFactory";
    
  5. Voer de volgende cmdlet Set AzDataFactoryV2 uit om de data factory te maken:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
    

Let op de volgende punten:

  • De naam van de data factory moet wereldwijd uniek zijn. Als de volgende fout zich voordoet, wijzigt u de naam en probeert u het opnieuw:

    The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
    
  • Als u Data Factory-exemplaren wilt maken, moet het gebruikersaccount dat u gebruikt om u aan te melden bij Azure lid zijn van inzender- of eigenaarsrollen, of een beheerder van het Azure-abonnement.

  • Voor een lijst met Azure regio's waarin Data Factory momenteel beschikbaar is, selecteert u de regio's die u interesseren op de volgende pagina en vouwt u vervolgens Analytics uit om Data Factory uit te vouwen: Products beschikbaar per regio. De gegevensarchieven (Storage, SQL Database, Azure SQL Managed Instance, enzovoort) en berekeningen (Azure HDInsight, enzovoort) die door de data factory worden gebruikt, kunnen zich in andere regio's bevinden.

Gekoppelde services maken

U maakt gekoppelde services in een Data Factory om uw datastores en compute-services aan de Data Factory te koppelen. In deze sectie maakt u gekoppelde services met uw opslagaccount en SQL Database.

Een gekoppelde Storage-service maken

  1. Maak een JSON-bestand met de naam AzureStorageLinkedService.json in de map C:\ADF folder met de volgende inhoud. (Maak de map ADF als deze nog niet bestaat.) Vervang en <accountName> door <accountKey> de naam en sleutel van uw opslagaccount voordat u het bestand opslaat.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. Schakel in PowerShell over naar de map ADF.

  3. Voer de cmdlet Set-AzDataFactoryV2LinkedService uit om de gekoppelde service AzureStorageLinkedService te maken. In het volgende voorbeeld geeft u de waarden door voor de parameters ResourceGroupName en DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Hier volgt een voorbeeld van uitvoer:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

Een gekoppelde SQL Database-service maken

  1. Maak een JSON-bestand met de naam AzureSQLDatabaseLinkedService.json in de map C:\ADF folder met de volgende inhoud. (Maak de map ADF als deze nog niet bestaat.) Vervang <uw servernaam> en <uw databasenaam> door de naam van uw server en database voordat u het bestand opslaat. U moet uw Azure SQL-server ook configureren om toegang te verlenen tot de beheerde identiteit van uw data factory.

    {
    "name": "AzureSqlDatabaseLinkedService",
    "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;"
            },
            "authenticationType": "ManagedIdentity",
            "annotations": []
        }
    }
    
  2. Schakel in PowerShell over naar de map ADF.

  3. Voer de cmdlet Set-AzDataFactoryV2LinkedService uit om de gekoppelde service AzureSQLDatabaseLinkedService te maken.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Hier volgt een voorbeeld van uitvoer:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    ProvisioningState :
    

Gegevenssets maken

In deze stap maakt u gegevenssets om de bron- en sinkgegevens te vertegenwoordigen.

Een brongegevensset maken

  1. Maak een JSON-bestand met de naam SourceDataset.json in dezelfde map met de volgende inhoud:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    
    

    In deze zelfstudie wordt de tabelnaam data_source_table gebruikt. Vervang deze naam als u een tabel gebruikt met een andere naam.

  2. Voer de cmdlet Set-AzDataFactoryV2Dataset uit om de gegevensset SourceDataset te maken.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Hier volgt een uitvoervoorbeeld van de cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Een 'sink dataset' maken

  1. Maak een JSON-bestand met de naam SinkDataset.json in dezelfde map met de volgende inhoud:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incrementalcopy",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    

    Belangrijk

    In dit fragment wordt ervan uitgegaan dat u een blobcontainer hebt met de naam adftutorial in uw blobopslag. Maak de container als deze bestaat niet of stel deze in op de naam van een bestaande container. De uitvoermap incrementalcopy wordt automatisch gemaakt als deze niet bestaat in de container. In deze zelfstudie wordt de bestandsnaam dynamisch gegenereerd met behulp van de expressie @CONCAT('Incremental-', pipeline().RunId, '.txt').

  2. Voer de cmdlet Set-AzDataFactoryV2Dataset uit om de gegevensset SinkDataset te maken.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Hier volgt een uitvoervoorbeeld van de cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset    
    

Een gegevensset voor een watermerk maken

In deze stap maakt u een gegevensset voor het opslaan van een bovengrenswaarde.

  1. Maak een JSON-bestand met de naam WatermarkDataset.json in dezelfde map met de volgende inhoud:

    {
        "name": " WatermarkDataset ",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. Voer de cmdlet Set-AzDataFactoryV2Dataset uit om de gegevensset WatermarkDataset te maken.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Hier volgt een uitvoervoorbeeld van de cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

Een pipeline maken

In deze zelfstudie maakt u een pijplijn met twee Lookup-activiteiten, één Kopieer-activiteit en één StoredProcedure-activiteit die in één pijplijn is gekoppeld.

  1. Maak een JSON-bestand met de naam IncrementalCopyPipeline.json in dezelfde map met de volgende inhoud:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupOldWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from watermarktable"
                        },
    
                        "dataset": {
                        "referenceName": "WatermarkDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupNewWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
                        },
    
                        "dataset": {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
    
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupNewWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupOldWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
    
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
    
                {
                    "name": "StoredProceduretoWriteWatermarkActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
    
                        "storedProcedureName": "usp_write_watermark",
                        "storedProcedureParameters": {
                            "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
                            "TableName":  { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
                        }
                    },
    
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
    
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
    
        }
    }
    
  2. Voer de cdmlet Set-AzDataFactoryV2Pipeline uit om de pijplijn IncrementalCopyPipeline te maken.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Hier volgt een voorbeeld van uitvoer:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADF
     DataFactoryName   : incrementalloadingADF
     Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
     Parameters        :
    

De pijplijn uitvoeren

  1. Voer de pijplijn IncrementalCopyPipeline uit met behulp van de cmdlet Invoke-AzDataFactoryV2Pipeline. Vervang plaatsaanduidingen door de namen van uw eigen resourcegroep en data factory.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  2. Controleer de status van de pijplijn door de cmdlet Get-AzDataFactoryV2ActivityRun uit te voeren totdat alle activiteiten worden uitgevoerd. Vervang tijdelijke aanduidingen door uw eigen juiste tijd voor de parameters RunStartedAfter en RunStartedBefore. In deze zelfstudie gaat u -RunStartedAfter "2017/09/14" en -RunStartedBefore "2017/09/15" gebruiken.

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Hier volgt een voorbeeld van uitvoer:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:42:50 AM
    DurationInMs      : 7777
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:43:07 AM
    DurationInMs      : 25437
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:10 AM
    ActivityRunEnd    : 9/14/2017 7:43:29 AM
    DurationInMs      : 19769
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:32 AM
    ActivityRunEnd    : 9/14/2017 7:43:47 AM
    DurationInMs      : 14467
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    

De resultaten bekijken

  1. In de Blob-opslag (sinkopslag) ziet u dat de gegevens zijn gekopieerd naar het bestand dat is gedefinieerd in SinkDataset. In de huidige tutorial is de bestandsnaam Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt. Open het bestand. U ziet records in het bestand die hetzelfde zijn als de gegevens in de SQL-database.

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  2. Controleer de laatste waarde van watermarktable. U ziet dat de watermerkwaarde is bijgewerkt.

    Select * from watermarktable
    

    Hier volgt een voorbeeld van uitvoer:

    TableName WatermarkValue
    gegevensbron_tabel 2017-09-05 8:06:00.000

Gegevens invoegen in de gegevensbronopslag om te controleren of de deltagegevens worden geladen

  1. Voeg nieuwe gegevens in de SQL-database (brongegevensopslag) in.

    INSERT INTO data_source_table
    VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
    

    De bijgewerkte gegevens in de SQL-database zijn:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    6 | newdata | 2017-09-06 02:23:00.000
    7 | newdata | 2017-09-07 09:01:00.000
    
  2. Voer de pijplijn IncrementalCopyPipeline opnieuw uit met behulp van de cmdlet Invoke-AzDataFactoryV2Pipeline. Vervang plaatsaanduidingen door de namen van uw eigen resourcegroep en data factory.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  3. Controleer de status van de pijplijn door de cmdlet Get-AzDataFactoryV2ActivityRun uit te voeren totdat alle activiteiten worden uitgevoerd. Vervang tijdelijke aanduidingen door uw eigen juiste tijd voor de parameters RunStartedAfter en RunStartedBefore. In deze zelfstudie gaat u -RunStartedAfter "2017/09/14" en -RunStartedBefore "2017/09/15" gebruiken.

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Hier volgt een voorbeeld van uitvoer:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:58 AM
    DurationInMs      : 31758
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:52 AM
    DurationInMs      : 25497
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:00 AM
    ActivityRunEnd    : 9/14/2017 8:53:20 AM
    DurationInMs      : 20194
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:23 AM
    ActivityRunEnd    : 9/14/2017 8:53:41 AM
    DurationInMs      : 18502
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    
  4. In de Blob-opslag ziet u dat een ander bestand is gemaakt. In deze tutorial is de nieuwe bestandsnaam Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt. Als u dit bestand opent, ziet u twee rijen met records.

  5. Controleer de laatste waarde van watermarktable. U ziet dat de watermerkwaarde opnieuw is bijgewerkt.

    Select * from watermarktable
    

    voorbeelduitvoer:

    TableName WatermarkValue
    gegevensbron_tabel 2017-09-07 09:01:00.000

In deze zelfstudie hebt u de volgende stappen uitgevoerd:

  • Bereid de gegevensopslag voor om de watermerkwaarde in op te slaan.
  • Een data factory maken.
  • Maak gekoppelde services.
  • Maak bron-, sink- en grenswaardegegevenssets.
  • Een pipeline maken.
  • Voer de pijplijn uit.
  • Controleer de pijplijnuitvoering.

In deze zelfstudie heeft de pijplijn gegevens uit één tabel in Azure SQL Database naar Blob Storage gekopieerd. Ga naar de volgende zelfstudie voor meer informatie over het kopiëren van gegevens uit meerdere tabellen in een SQL Server-database naar SQL Database.