Leia e escreva ficheiros Avro

Apache Avro é um formato de serialização de dados baseado em linhas que fornece estruturas de dados ricas e uma codificação binária compacta e rápida. Os utilizadores do Azure Databricks encontram-na mais frequentemente ao ingerir dados de sistemas de streaming de eventos como o Apache Kafka e o Google Pub/Sub, onde o Avro é o formato dominante de serialização. O Azure Databricks suporta Avro tanto para leitura como para escrita com o Apache Spark, incluindo conversão automática de esquema entre tipos SQL Avro e Spark, particionamento, compressão e nomes personalizados de registos.

Se estiver a ler registos em formato Avro do Apache Kafka ou de outro barramento de mensagens, em vez de ficheiros, consulte Ler e escrever dados Avro em streaming, que aborda as funções from_avro e to_avro, utilizadas para desserialização em streaming.

Pré-requisitos

O Azure Databricks não requer configuração adicional para utilizar ficheiros Avro. No entanto, para transmitir ficheiros Avro, precisas do Auto Loader.

Opções

Use os métodos .option() e .options() de DataFrameReader e DataFrameWriter para configurar origens de dados Avro. Para uma lista completa de opções suportadas, veja DataFrameReader opções Avro e DataFrameWriter opções Avro.

Usage

Os exemplos seguintes utilizam o conjunto de dados Wanderbricks para demonstrar a leitura e escrita de ficheiros Avro usando a API Spark DataFrame e SQL.

Leia ficheiros Avro usando SQL

Para consultar ficheiros Avro sem registar uma tabela, use read_files. As permissões do Unity Catalog na localização externa aplicam-se automaticamente.

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_avro',
  format => 'avro'
)

Leia e escreva ficheiros Avro

Use a API Apache Spark DataFrame quando precisar de ler ou escrever ficheiros Avro para um sistema a jusante, aplicar transformações antes de carregar, ou controlar opções como particionamento e esquema no momento da escrita.

Os exemplos seguintes utilizam o conjunto de dados de exemplo do Wanderbricks .

Python

from pyspark.sql.functions import year, month

# Write wanderbricks reviews to Avro format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

# Read an Avro file into a DataFrame
df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
display(df)

# Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

# Read using a custom Avro schema to select specific fields
avro_schema = """
{
  "type": "record",
  "name": "Review",
  "fields": [
    {"name": "review_id", "type": "string"},
    {"name": "rating", "type": "int"},
    {"name": "comment", "type": ["null", "string"]}
  ]
}
"""
df = spark.read.format("avro").option("avroSchema", avro_schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

# Write partitioned Avro files by year and month
df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")

# Write with a custom record name and namespace for Schema Registry compatibility
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").options(
  recordName="Review",
  recordNamespace="com.wanderbricks"
).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

linguagem de programação Scala

import org.apache.spark.sql.functions.{year, month}

// Write wanderbricks reviews to Avro format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

// Read an Avro file into a DataFrame
val df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
df.show()

// Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

// Read using a custom Avro schema to select specific fields
val avroSchema = """
{
  "type": "record",
  "name": "Review",
  "fields": [
    {"name": "review_id", "type": "string"},
    {"name": "rating", "type": "int"},
    {"name": "comment", "type": ["null", "string"]}
  ]
}
"""
val filtered = spark.read.format("avro").option("avroSchema", avroSchema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

// Write partitioned Avro files by year and month
val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")

// Write with a custom record name and namespace for Schema Registry compatibility
reviews.write.format("avro").options(Map(
  "recordName" -> "Review",
  "recordNamespace" -> "com.wanderbricks"
)).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

SQL

-- Write wanderbricks reviews to Avro format
CREATE TABLE reviews_avro
USING AVRO
AS SELECT * FROM samples.wanderbricks.reviews;

-- Write partitioned Avro files by year and month
CREATE TABLE bookings_avro_partitioned
USING AVRO
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;

SELECT * FROM bookings_avro_partitioned;

Recursos adicionais

  • Leia e escreva ficheiros Parquet: Se a sua carga de trabalho for sobretudo analítica e centrada na leitura, em vez de processamento em fluxo ou de escrita intensiva, a estrutura colunar do Parquet oferece um desempenho nas consultas mais eficiente do que o armazenamento baseado em linhas do Avro.