Usar Carregador Automático com o Catálogo do Unity

O Auto Loader pode ingerir dados com segurança de locais externos configurados com o Unity Catalog. Para saber mais sobre como conectar o armazenamento com segurança ao Catálogo do Unity, consulte Conectar-se ao armazenamento de objetos de nuvem usando o Catálogo do Unity. O Carregador Automático depende do fluxo estruturado para processamento incremental. Para obter recomendações e limitações, consulte Usar o Catálogo do Unity com o fluxo estruturado.

Observação

No Databricks Runtime 11.3 LTS e posteriores, você pode usar o Carregador Automático com modos de acesso padrão ou dedicado (anteriores modos de acesso compartilhado e de usuário único).

Por padrão, há suporte ao modo de listagem de diretórios.

Especifique os locais para recursos do Auto Loader no Unity Catalog

O modelo de segurança do Catálogo do Unity considera que todos os locais de armazenamento referenciados em uma carga de trabalho são gerenciados pelo Catálogo do Unity. O Databricks recomenda sempre armazenar informações de ponto de verificação e evolução de esquema em locais de armazenamento gerenciados pelo Unity Catalog. O Catálogo do Unity não permite que você aninhe arquivos de inferência e evolução de esquema ou ponto de verificação no diretório da tabela.

Ingerir dados do armazenamento em nuvem usando o Catálogo do Unity

Os exemplos a seguir pressupõem que o usuário executor tem permissões READ FILES no local externo, privilégios de proprietário nas tabelas de destino e as configurações e concessões a seguir.

Observação

O Azure Data Lake Storage é o único tipo de armazenamento do Azure compatível com o Catálogo do Unity.

Local de armazenamento Conceder
abfss://autoloader-source@<storage-account>.dfs.core.windows.net/json-data READ FILES
abfss://dev-bucket@<storage-account>.dfs.core.windows.net READ FILES, WRITE FILES, CREATE TABLE

Usar o Auto Loader para carregar em uma tabela gerenciada pelo Unity Catalog

Os exemplos a seguir demonstram como usar o Carregador Automático para ingerir dados em uma tabela gerenciada do Catálogo do Unity.

Python

checkpoint_path = "abfss://dev-bucket@<storage-account>.dfs.core.windows.net/_checkpoint/dev_table"

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load("abfss://autoloader-source@<storage-account>.dfs.core.windows.net/json-data")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable("dev_catalog.dev_database.dev_table"))

SQL

CREATE OR REFRESH STREAMING TABLE dev_catalog.dev_database.dev_table
AS SELECT * FROM STREAM read_files(
  'abfss://autoloader-source@<storage-account>.dfs.core.windows.net/json-data',
  format => 'json'
);

Quando você usa read_files em uma instrução CREATE STREAMING TABLE dentro de Pipelines Declarativos do Lakeflow Spark, as localizações de pontos de verificação e esquemas são gerenciadas automaticamente.

Usar o Carregador Automático para carregar em uma tabela externa do Catálogo do Unity

Para manter os dados em um local de armazenamento específico, use uma tabela externa do Catálogo do Unity em vez de uma tabela gerenciada. Por exemplo, use uma tabela externa para compartilhar dados com clientes que não são do Databricks ou para registrar dados existentes. Com tabelas externas, você define o caminho de armazenamento. Consulte Trabalhar com tabelas externas.

Para usar o Carregador Automático com uma tabela externa do Catálogo do Unity, primeiro registre a tabela com CREATE TABLE ... LOCATION e depois transmita para ela usando o nome. O local da tabela deve estar dentro de um local externo em que você tenha CREATE EXTERNAL TABLE permissões. A localização do ponto de verificação também deve estar em um local externo gerenciado pelo Unity Catalog. Use um caminho separado dos dados da tabela.

checkpoint_path = "abfss://dev-bucket@<storage-account>.dfs.core.windows.net/_checkpoint/dev_table"
table_path = "abfss://dev-bucket@<storage-account>.dfs.core.windows.net/external/dev_table"

# One-time: register the external table in UC.
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS dev_catalog.dev_database.dev_table
  USING DELTA
  LOCATION '{table_path}'
""")

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load("abfss://autoloader-source@<storage-account>.dfs.core.windows.net/json-data")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable("dev_catalog.dev_database.dev_table"))