Avro 파일 읽기 및 쓰기

Apache Avro 는 풍부한 데이터 구조와 빠르고 빠른 이진 인코딩을 제공하는 행 기반 데이터 직렬화 형식입니다. Azure Databricks 사용자는 Apache Kafka 및 Google Pub/Sub와 같은 이벤트 스트리밍 시스템에서 데이터를 수집할 때 이를 가장 흔히 접하게 되며, 이때 Avro는 주된 직렬화 형식입니다. Azure Databricks Avro와 Spark SQL 형식 간의 자동 스키마 변환, 분할, 압축 및 사용자 지정 레코드 이름을 포함하여 Apache Spark를 사용하여 읽고 쓰는 데 모두 Avro를 지원합니다.

Avro로 인코딩된 레코드를 파일이 아니라 Apache Kafka 또는 다른 메시지 버스에서 읽는 경우, 스트리밍 역직렬화에 사용되는 from_avroto_avro 함수를 다루는 스트리밍 Avro 데이터 읽기 및 쓰기를 참조하세요.

사전 요구 사항

Azure Databricks Avro 파일을 사용하기 위해 추가 구성이 필요하지 않습니다. 그러나 Avro 파일을 스트리밍하려면 자동 로더가 필요합니다.

옵션

DataFrameReaderDataFrameWriter.option().options() 메서드를 사용하여 Avro 데이터 원본을 구성합니다. 지원되는 옵션의 전체 목록은 DataFrameReader Avro 옵션DataFrameWriter Avro 옵션을 참조하세요.

Usage

다음 예제에서는 Wanderbricks 데이터 세트를 사용하여 Spark DataFrame API 및 SQL을 사용하여 Avro 파일을 읽고 쓰는 방법을 보여 줍니다.

SQL을 사용하여 Avro 파일 읽기

테이블을 등록하지 않고 Avro 파일을 쿼리하려면 .를 사용합니다 read_files. 외부 위치에 대한 Unity 카탈로그 권한은 자동으로 적용됩니다.

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_avro',
  format => 'avro'
)

Avro 파일 읽기 및 쓰기

다운스트림 시스템에 대한 Avro 파일을 읽거나 쓰거나, 로드하기 전에 변환을 적용하거나, 쓰기 시 분할 및 스키마와 같은 옵션을 제어해야 하는 경우 Apache Spark DataFrame API를 사용합니다.

다음 예제에서는 Wanderbricks 샘플 데이터 세트를 사용합니다.

파이썬

from pyspark.sql.functions import year, month

# Write wanderbricks reviews to Avro format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

# Read an Avro file into a DataFrame
df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
display(df)

# Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

# Read using a custom Avro schema to select specific fields
avro_schema = """
{
  "type": "record",
  "name": "Review",
  "fields": [
    {"name": "review_id", "type": "string"},
    {"name": "rating", "type": "int"},
    {"name": "comment", "type": ["null", "string"]}
  ]
}
"""
df = spark.read.format("avro").option("avroSchema", avro_schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

# Write partitioned Avro files by year and month
df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")

# Write with a custom record name and namespace for Schema Registry compatibility
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").options(
  recordName="Review",
  recordNamespace="com.wanderbricks"
).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

스칼라

import org.apache.spark.sql.functions.{year, month}

// Write wanderbricks reviews to Avro format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

// Read an Avro file into a DataFrame
val df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
df.show()

// Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

// Read using a custom Avro schema to select specific fields
val avroSchema = """
{
  "type": "record",
  "name": "Review",
  "fields": [
    {"name": "review_id", "type": "string"},
    {"name": "rating", "type": "int"},
    {"name": "comment", "type": ["null", "string"]}
  ]
}
"""
val filtered = spark.read.format("avro").option("avroSchema", avroSchema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

// Write partitioned Avro files by year and month
val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")

// Write with a custom record name and namespace for Schema Registry compatibility
reviews.write.format("avro").options(Map(
  "recordName" -> "Review",
  "recordNamespace" -> "com.wanderbricks"
)).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")

SQL

-- Write wanderbricks reviews to Avro format
CREATE TABLE reviews_avro
USING AVRO
AS SELECT * FROM samples.wanderbricks.reviews;

-- Write partitioned Avro files by year and month
CREATE TABLE bookings_avro_partitioned
USING AVRO
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;

SELECT * FROM bookings_avro_partitioned;

추가 리소스

  • Parquet 파일 읽기 및 쓰기: 워크로드가 스트리밍 또는 쓰기가 많은 대신 주로 분석적이고 읽기가 많은 경우 Parquet의 열 형식 레이아웃은 Avro의 행 기반 스토리지보다 더 효율적인 쿼리 성능을 제공합니다.