Leer y escribir archivos avro

Apache Avro es un formato de serialización de datos basado en filas que proporciona estructuras de datos enriquecidas y una codificación binaria compacta y rápida. Los usuarios de Azure Databricks suelen encontrarlo al ingerir datos de sistemas de transmisión de eventos como Apache Kafka y Google Pub/Sub, donde Avro es el formato de serialización dominante. Azure Databricks admite Avro para leer y escribir con Apache Spark, incluida la conversión automática de esquemas entre tipos de Avro y Spark SQL, creación de particiones, compresión y nombres de registro personalizados.

Si está leyendo registros codificados con Avro desde Apache Kafka u otro bus de mensajes en lugar de desde archivos, consulte Lectura y escritura de datos de Avro de streaming, que cubre las from_avro funciones y to_avro que se usan para la deserialización de streaming.

Prerequisites

Azure Databricks no requiere configuración adicional para usar archivos Avro. Sin embargo, para transmitir archivos Avro, necesita cargador automático.

Opciones

Utilice los métodos .option() y .options() de DataFrameReader y DataFrameWriter para configurar fuentes de datos Avro. Para obtener una lista completa de las opciones admitidas, consulte DataFrameReader Opciones de Avro y DataFrameWriter Opciones de Avro.

Usage

En los ejemplos siguientes se usa el conjunto de datos de Wanderbricks para demostrar la lectura y escritura de archivos avro mediante la API DataFrame de Spark y SQL.

Lectura de archivos avro mediante SQL

Para consultar archivos avro sin registrar una tabla, use read_files. Los permisos del catálogo de Unity en la ubicación externa se aplican automáticamente.

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_avro',
  format => 'avro'
)

Leer y escribir archivos avro

Utilice la API de DataFrame de Apache Spark cuando necesite leer o escribir archivos Avro para un sistema posterior, aplicar transformaciones antes de la carga o controlar opciones, como la creación de particiones y el esquema, en el momento de la escritura.

En los ejemplos siguientes se usa el conjunto de datos de ejemplo de Wanderbricks .

Pitón

from pyspark.sql.functions import year, month

# Write wanderbricks reviews to Avro format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

# Read an Avro file into a DataFrame
df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
display(df)

# Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

# Read using a custom Avro schema to select specific fields
avro_schema = """
{
  "type": "record",
  "name": "Review",
  "fields": [
    {"name": "review_id", "type": "string"},
    {"name": "rating", "type": "int"},
    {"name": "comment", "type": ["null", "string"]}
  ]
}
"""
df = spark.read.format("avro").option("avroSchema", avro_schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

# Write partitioned Avro files by year and month
df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")

# Write with a custom record name and namespace for Schema Registry compatibility
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").options(
  recordName="Review",
  recordNamespace="com.wanderbricks"
).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

Scala

import org.apache.spark.sql.functions.{year, month}

// Write wanderbricks reviews to Avro format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

// Read an Avro file into a DataFrame
val df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
df.show()

// Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

// Read using a custom Avro schema to select specific fields
val avroSchema = """
{
  "type": "record",
  "name": "Review",
  "fields": [
    {"name": "review_id", "type": "string"},
    {"name": "rating", "type": "int"},
    {"name": "comment", "type": ["null", "string"]}
  ]
}
"""
val filtered = spark.read.format("avro").option("avroSchema", avroSchema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

// Write partitioned Avro files by year and month
val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")

// Write with a custom record name and namespace for Schema Registry compatibility
reviews.write.format("avro").options(Map(
  "recordName" -> "Review",
  "recordNamespace" -> "com.wanderbricks"
)).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

SQL

-- Write wanderbricks reviews to Avro format
CREATE TABLE reviews_avro
USING AVRO
AS SELECT * FROM samples.wanderbricks.reviews;

-- Write partitioned Avro files by year and month
CREATE TABLE bookings_avro_partitioned
USING AVRO
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;

SELECT * FROM bookings_avro_partitioned;

Recursos adicionales

  • Lectura y escritura de archivos Parquet: si la carga de trabajo es principalmente analítica y de lectura intensiva en lugar de streaming o con mucha escritura, el diseño de columnas de Parquet ofrece un rendimiento de consulta más eficaz que el almacenamiento basado en filas de Avro.