Del via


Brug Livy-API'en til at sende og udføre sessionsjob

Gælder for:✅ Fabric Data Engineering og Data Science

Lær hvordan du indsender Spark-sessionsopgaver ved hjælp af Livy API for Fabric Data Engineering.

Forudsætninger

Livy-API'en definerer et samlet slutpunkt for handlinger. Erstat pladsholderne {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, {Fabric_LakehouseID} med de relevante værdier, når du følger eksemplerne i denne artikel.

Konfigurer Visual Studio Code til din Livy API-session

  1. Vælg Lakehouse Settings i din Fabric Lakehouse.

    Skærmbillede, der viser indstillingerne for Lakehouse.

  2. Gå til sektionen Livy-slutpunkt .

    skærmbillede, der viser Lakehouse Livy-endpoint og sessionjob forbindelsesstreng.

  3. Kopier sessionsjob-forbindelsesstreng (den første røde boks i billedet) til din kode.

  4. Navigér til Microsoft Entra-administrationscenter og kopier både applikations-(klient-)ID og Directory (tenant) ID til din kode.

    Skærmbillede, der viser oversigt over Livy API-appen i Microsoft Entra-administrationscenter.

Autentificér en Livy API Spark-session ved enten at bruge et Microsoft Entra-brugertoken eller et Microsoft Entra SPN-token

Autentificér en Livy API Spark-session ved hjælp af et Microsoft Entra SPN-token

  1. Opret en notesbog .ipynb i Visual Studio Code og indsæt følgende kode.

    import sys
    from msal import ConfidentialClientApplication
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Service Principal Application ID
    
    # Certificate paths - Update these paths to your certificate files
    certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem"      # Public certificate file
    private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem"      # Private key file
    certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint
    
    # OAuth settings
    audience = "https://analysis.windows.net/powerbi/api/.default"
    authority = f"https://login.windows.net/{tenant_id}"
    
    def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        This function uses certificate-based authentication which is more secure than client secrets.
    
        Args:
            client_id (str): The Service Principal's client ID  
            audience (str): The audience for the token (resource scope)
            authority (str): The OAuth authority URL
            certificate_path (str): Path to the certificate file (.pem format)
            private_key_path (str): Path to the private key file (.pem format)
            certificate_thumbprint (str): Certificate thumbprint (optional but recommended)
    
        Returns:
            str: The access token for API authentication
    
        Raises:
            Exception: If token acquisition fails
        """
        try:
            # Read the certificate from PEM file
            with open(certificate_path, "r", encoding="utf-8") as f:
                certificate_pem = f.read()
    
            # Read the private key from PEM file
            with open(private_key_path, "r", encoding="utf-8") as f:
                private_key_pem = f.read()
    
            # Create the confidential client application
            app = ConfidentialClientApplication(
                client_id=client_id,
                authority=authority,
                client_credential={
                    "private_key": private_key_pem,
                    "thumbprint": certificate_thumbprint,
                    "certificate": certificate_pem
                }
            )
    
            # Acquire token using client credentials flow
            token_response = app.acquire_token_for_client(scopes=[audience])
    
            if "access_token" in token_response:
                print("Successfully acquired access token")
                return token_response["access_token"]
            else:
                raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}")
    
        except FileNotFoundError as e:
            print(f"Certificate file not found: {e}")
            sys.exit(1)
        except Exception as e:
            print(f"Error retrieving token: {e}", file=sys.stderr)
            sys.exit(1)
    
    # Get the access token
    token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)
    
  2. Kør notebook-cellen. Du burde se Microsoft Entra-tokenet returneret.

    Skærmbillede, der viser det Microsoft Entra SPN-token, der blev returneret efter at have kørt cell.

Autentificér en Livy API Spark-session ved hjælp af et Microsoft Entra-brugertoken

  1. Opret en notesbog .ipynb i Visual Studio Code og indsæt følgende kode.

    from msal import PublicClientApplication
    import requests
    import time
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Application ID (can be the same as above or different)
    
    # Required scopes for Livy API access
    scopes = [
        "https://api.fabric.microsoft.com/Lakehouse.Execute.All",      # Required — execute operations in lakehouses
        "https://api.fabric.microsoft.com/Lakehouse.Read.All",         # Required — read lakehouse metadata
        "https://api.fabric.microsoft.com/Code.AccessFabric.All",      # Required — general Fabric API access from Spark Runtime
        "https://api.fabric.microsoft.com/Code.AccessStorage.All",     # Required — access OneLake and Azure storage from Spark Runtime
    ]
    
    # Optional scopes — add these only if your Spark jobs need access to the corresponding services:
    #    "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All"     # Optional — access Azure Key Vault from Spark Runtime
    #    "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All"     # Optional — access Azure Data Lake Storage Gen1 from Spark Runtime
    #    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All" # Optional — access Azure Data Explorer from Spark Runtime
    #    "https://api.fabric.microsoft.com/Code.AccessSQL.All"               # Optional — access Azure SQL audience tokens from Spark Runtime
    
    def get_access_token(tenant_id, client_id, scopes):
        """
        Get an access token using interactive authentication.
    
        This method will open a browser window for user authentication.
    
        Args:
            tenant_id (str): The Microsoft Entra tenant ID
            client_id (str): The application client ID
            scopes (list): List of required permission scopes
    
        Returns:
            str: The access token, or None if authentication fails
        """
        app = PublicClientApplication(
            client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}"
        )
    
        print("Opening browser for interactive authentication...")
        token_response = app.acquire_token_interactive(scopes=scopes)
    
        if "access_token" in token_response:
            print("Successfully authenticated")
            return token_response["access_token"]
        else:
            print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}")
            return None
    
    # Uncomment the lines below to use interactive authentication
    token = get_access_token(tenant_id, client_id, scopes)
    print("Access token acquired via interactive login")
    
  2. Kør notebook-cellen. Du burde se Microsoft Entra-tokenet returneret.

    Skærmbillede, der viser det Microsoft Entra brugertoken, der returneres efter at have kørt cell.

Forståelse af Code.* omfang for Livy API'en

Når dine Spark-jobs kører via Livy API'en, styrer scopes, Code.* hvilke eksterne tjenester Spark Runtime kan få adgang til på vegne af den autentificerede bruger. To er påkrævet; Resten er valgfrie afhængigt af din arbejdsbyrde.

Påkrævede Code.* scopes

Omfanget Beskrivelse
Code.AccessFabric.All Gør det muligt at få adgangstokens til Microsoft Fabric. Påkrævet for alle Livy API-operationer.
Code.AccessStorage.All Gør det muligt at få adgangstokene til OneLake og Azure-lagring. Påkrævet for at læse og skrive data i søhuse.

Valgfrie Code.* scopes

Tilføj kun disse scopes, hvis dine Spark-jobs har brug for adgang til de tilsvarende Azure-tjenester under runtime.

Omfanget Beskrivelse Hvornår skal du bruge?
Code.AccessAzureKeyvault.All Gør det muligt at få adgangstokene til Azure Key Vault. Din Spark-kode henter hemmeligheder, nøgler eller certifikater fra Azure Key Vault.
Code.AccessAzureDataLake.All Gør det muligt at få adgangstoks til Azure Data Lake Storage Gen1. Din Spark-kode læser fra eller skriver til Azure Data Lake Storage Gen1-konti.
Code.AccessAzureDataExplorer.All Gør det muligt at få adgangstokene til Azure Data Explorer (Kusto). Din Spark-kode forespørger eller indlæser data til/fra Azure Data Explorer-klynger.
Code.AccessSQL.All Gør det muligt at få adgangstokene til Azure SQL. Din Spark-kode skal kunne forbindes til Azure SQL-databaser.

Bemærkning

Og Lakehouse.Execute.AllLakehouse.Read.All skoperne er også påkrævet, men er ikke en del af Code.* familien. De giver tilladelse til at udføre operationer i og læse metadata fra Fabric søhuse.

Opret en Livy API Spark-session

Tips

Hvis din arbejdsbyrde kræver, at du udfører flere Spark-udsagn samtidigt, så overvej i stedet at bruge sessioner med høj samtidighed . HC-sessioner leverer uafhængige eksekveringskontekster, der kører parallelt, mens systemet håndterer genbrug af underliggende Livy-sessioner.

  1. Tilføj en anden notesbogcelle, og indsæt denne kode.

    import json
    import requests
    
    api_base_url = "https://api.fabric.microsoft.com/"  # Base URL for Fabric APIs
    
    # Fabric Resource IDs - Replace with your workspace and lakehouse IDs
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Construct the Livy API session URL
    # URL pattern: {base_url}/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyapi/versions/{api_version}/sessions
    livy_api_session_url = (f"{api_base_url}v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/"
                           f"livyapi/versions/2023-12-01/sessions")
    
    # Set up authentication headers
    headers = {"Authorization": f"Bearer {token}"}
    
    print(f"Livy API URL: {livy_api_session_url}")
    print("Creating Livy session...")
    
    try:
        # Create a new Livy session with default configuration
        create_livy_session = requests.post(livy_api_session_url, headers=headers, json={})
    
        # Check if the request was successful
        if create_livy_session.status_code == 202:
            session_info = create_livy_session.json()
            print('Livy session creation request submitted successfully')
            print(f'Session Info: {json.dumps(session_info, indent=2)}')
    
            # Extract session ID for future operations
            livy_session_id = session_info['id']
            livy_session_url = f"{livy_api_session_url}/{livy_session_id}"
    
            print(f"Session ID: {livy_session_id}")
            print(f"Session URL: {livy_session_url}")
    
        else:
            print(f"Failed to create session. Status code: {create_livy_session.status_code}")
            print(f"Response: {create_livy_session.text}")
    
    except requests.exceptions.RequestException as e:
        print(f"Network error occurred: {e}")
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Response text: {create_livy_session.text}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
  2. Kør notesbogcellen. Du kan se én linje udskrevet, når Livy-sessionen oprettes.

    Skærmbillede, der viser resultaterne af den første udførelse af notesbogens celle.

  3. Du kan bekræfte, at Livy-sessionen er oprettet ved hjælp af [Få vist dine job i overvågningshubben](#View dine job i overvågningshubben).

Integration med Fabric-miljøer

Denne Livy API-session kører som standard i forhold til standardstartgruppen for arbejdsområdet. Alternativt kan du bruge Fabric Environments Create, configur og bruge et miljø i Microsoft Fabric til at tilpasse Spark-poolen, som Livy API-sessionen bruger til disse Spark-jobs. For at bruge et Fabric Environment skal du opdatere den tidligere notebook-celle med denne json-payload.

create_livy_session = requests.post(livy_base_url, headers = headers, json = {
    "conf" : {
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
        }
)

Indsend en spark.sql-sætning ved hjælp af Livy API Spark-sessionen

  1. Tilføj en anden notesbogcelle, og indsæt denne kode.

        # call get session API
    import time
    
    table_name = "green_tripdata_2022"
    
    print("Checking session status...")
    
    # Get current session status
    get_session_response = requests.get(livy_session_url, headers=headers)
    session_status = get_session_response.json()
    print(f"Current session state: {session_status['state']}")
    
    # Wait for session to become idle (ready to accept statements)
    print("Waiting for session to become idle...")
    while session_status["state"] != "idle":
        print(f"   Session state: {session_status['state']} - waiting 5 seconds...")
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
        session_status = get_session_response.json()
    
    print("Session is now idle and ready to accept statements")
    
    # Execute a Spark SQL statement
    execute_statement_url = f"{livy_session_url}/statements"
    
    # Define your Spark SQL query - Replace with your actual table and query
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM {table_name} WHERE column_name = 'some_value' LIMIT 10\").show()",
        "kind": "spark"  # Type of code (spark, pyspark, sql, etc.)
    }
    
    print("Submitting Spark SQL statement...")
    print(f"Query: {payload_data['code']}")
    
    try:
        # Submit the statement for execution
        execute_statement_response = requests.post(execute_statement_url, headers=headers, json=payload_data)
    
        if execute_statement_response.status_code == 200:
            statement_info = execute_statement_response.json()
            print('Statement submitted successfully')
            print(f"Statement Info: {json.dumps(statement_info, indent=2)}")
    
            # Get statement ID for monitoring
            statement_id = str(statement_info['id'])
            get_statement_url = f"{livy_session_url}/statements/{statement_id}"
    
            print(f"Statement ID: {statement_id}")
    
            # Monitor statement execution
            print("Monitoring statement execution...")
            get_statement_response = requests.get(get_statement_url, headers=headers)
            statement_status = get_statement_response.json()
    
            while statement_status["state"] != "available":
                print(f"   Statement state: {statement_status['state']} - waiting 5 seconds...")
                time.sleep(5)
                get_statement_response = requests.get(get_statement_url, headers=headers)
                statement_status = get_statement_response.json()
    
            # Retrieve and display results
            print("Statement execution completed!")
            if 'output' in statement_status and 'data' in statement_status['output']:
                results = statement_status['output']['data']['text/plain']
                print("Query Results:")
                print(results)
            else:
                print("No output data available")
    
        else:
            print(f"Failed to submit statement. Status code: {execute_statement_response.status_code}")
            print(f"Response: {execute_statement_response.text}")
    
    except Exception as e:
        print(f"Error executing statement: {e}")
    
  2. Kør notesbogcellen. Du kan se flere trinvise linjer udskrevet, efterhånden som jobbet sendes, og resultaterne returneres.

    Skærmbillede, der viser resultaterne af den første notesbogcelle med Spark.sql udførelse.

Indsend endnu en spark.sql sætning ved hjælp af Livy API Spark-sessionen

  1. Tilføj en anden notesbogcelle, og indsæt denne kode.

    print("Executing additional Spark SQL statement...")
    
    # Wait for session to be idle again
    get_session_response = requests.get(livy_session_url, headers=headers)
    session_status = get_session_response.json()
    
    while session_status["state"] != "idle":
        print(f"   Waiting for session to be idle... Current state: {session_status['state']}")
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
        session_status = get_session_response.json()
    
    # Execute another statement - Replace with your actual query
    payload_data = {
        "code": f"spark.sql(\"SELECT COUNT(*) as total_records FROM {table_name}\").show()",
        "kind": "spark"
    }
    
    print(f"Executing query: {payload_data['code']}")
    
    try:
        # Submit the second statement
        execute_statement_response = requests.post(execute_statement_url, headers=headers, json=payload_data)
    
        if execute_statement_response.status_code == 200:
            statement_info = execute_statement_response.json()
            print('Second statement submitted successfully')
    
            statement_id = str(statement_info['id'])
            get_statement_url = f"{livy_session_url}/statements/{statement_id}"
    
            # Monitor execution
            print("Monitoring statement execution...")
            get_statement_response = requests.get(get_statement_url, headers=headers)
            statement_status = get_statement_response.json()
    
            while statement_status["state"] != "available":
                print(f"   Statement state: {statement_status['state']} - waiting 5 seconds...")
                time.sleep(5)
                get_statement_response = requests.get(get_statement_url, headers=headers)
                statement_status = get_statement_response.json()
    
            # Display results
            print("Second statement execution completed!")
            if 'output' in statement_status and 'data' in statement_status['output']:
                results = statement_status['output']['data']['text/plain']
                print("Query Results:")
                print(results)
            else:
                print("No output data available")
    
        else:
            print(f"Failed to submit second statement. Status code: {execute_statement_response.status_code}")
    
    except Exception as e:
        print(f"Error executing second statement: {e}")
    
  2. Kør notesbogcellen. Du kan se flere trinvise linjer udskrevet, efterhånden som jobbet sendes, og resultaterne returneres.

    Skærmbillede, der viser resultaterne af den anden udførelse af notesbogens celle.

Afslut Livius session

  1. Tilføj en anden notesbogcelle, og indsæt denne kode.

    print("Cleaning up Livy session...")
    
    try:
        # Check current session status before deletion
        get_session_response = requests.get(livy_session_url, headers=headers)
        if get_session_response.status_code == 200:
            session_info = get_session_response.json()
            print(f"Session state before deletion: {session_info.get('state', 'unknown')}")
    
        print(f"Deleting session at: {livy_session_url}")
    
        # Delete the session
        delete_response = requests.delete(livy_session_url, headers=headers)
    
        if delete_response.status_code == 200:
            print("Session deleted successfully")
        elif delete_response.status_code == 404:
            print("Session was already deleted or not found")
        else:
            print(f"Delete request completed with status code: {delete_response.status_code}")
            print(f"Response: {delete_response.text}")
    
        print(f"Delete response details: {delete_response}")
    
    except requests.exceptions.RequestException as e:
        print(f"Network error during session deletion: {e}")
    except Exception as e:
        print(f"Error during session cleanup: {e}")
    

Få vist dine job i overvågningshubben

Du kan få adgang til overvågningshubben for at få vist forskellige Apache Spark-aktiviteter ved at vælge Overvåg i navigationslinkene til venstre.

  1. Når sessionen er i gang eller er i fuldført tilstand, kan du få vist sessionsstatussen ved at gå til Overvågning.

    Skærmbillede, der viser tidligere Livy API-indsendelser i overvågningshubben.

  2. Vælg og åbn det seneste aktivitetsnavn.

    Skærmbillede, der viser den seneste Livy API-aktivitet i overvågningshubben.

  3. I denne Livy API-session kan du se dine tidligere sessionsindsendelser, køre detaljer, Spark-versioner og konfiguration. Læg mærke til den stoppet status øverst til højre.

    Skærmbillede, der viser de seneste Livy API-aktivitetsoplysninger i overvågningshubben.

For at opsummere hele processen har du brug for en fjernklient som Visual Studio Code, en Microsoft Entra app/SPN-token, Livy API endpoint URL, autentificering mod din Lakehouse og til sidst en Session Livy API.