Apache Avro 는 풍부한 데이터 구조와 빠르고 빠른 이진 인코딩을 제공하는 행 기반 데이터 직렬화 형식입니다. Azure Databricks 사용자는 Apache Kafka 및 Google Pub/Sub와 같은 이벤트 스트리밍 시스템에서 데이터를 수집할 때 이를 가장 흔히 접하게 되며, 이때 Avro는 주된 직렬화 형식입니다. Azure Databricks Avro와 Spark SQL 형식 간의 자동 스키마 변환, 분할, 압축 및 사용자 지정 레코드 이름을 포함하여 Apache Spark를 사용하여 읽고 쓰는 데 모두 Avro를 지원합니다.
Avro로 인코딩된 레코드를 파일이 아니라 Apache Kafka 또는 다른 메시지 버스에서 읽는 경우, 스트리밍 역직렬화에 사용되는 from_avro 및 to_avro 함수를 다루는 스트리밍 Avro 데이터 읽기 및 쓰기를 참조하세요.
사전 요구 사항
Azure Databricks Avro 파일을 사용하기 위해 추가 구성이 필요하지 않습니다. 그러나 Avro 파일을 스트리밍하려면 자동 로더가 필요합니다.
옵션
DataFrameReader 및 DataFrameWriter의 .option() 및 .options() 메서드를 사용하여 Avro 데이터 원본을 구성합니다. 지원되는 옵션의 전체 목록은 DataFrameReader Avro 옵션 및 DataFrameWriter Avro 옵션을 참조하세요.
Usage
다음 예제에서는 Wanderbricks 데이터 세트를 사용하여 Spark DataFrame API 및 SQL을 사용하여 Avro 파일을 읽고 쓰는 방법을 보여 줍니다.
SQL을 사용하여 Avro 파일 읽기
테이블을 등록하지 않고 Avro 파일을 쿼리하려면 .를 사용합니다 read_files. 외부 위치에 대한 Unity 카탈로그 권한은 자동으로 적용됩니다.
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_avro',
format => 'avro'
)
Avro 파일 읽기 및 쓰기
다운스트림 시스템에 대한 Avro 파일을 읽거나 쓰거나, 로드하기 전에 변환을 적용하거나, 쓰기 시 분할 및 스키마와 같은 옵션을 제어해야 하는 경우 Apache Spark DataFrame API를 사용합니다.
다음 예제에서는 Wanderbricks 샘플 데이터 세트를 사용합니다.
파이썬
from pyspark.sql.functions import year, month
# Write wanderbricks reviews to Avro format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
# Read an Avro file into a DataFrame
df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
display(df)
# Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
# Read using a custom Avro schema to select specific fields
avro_schema = """
{
"type": "record",
"name": "Review",
"fields": [
{"name": "review_id", "type": "string"},
{"name": "rating", "type": "int"},
{"name": "comment", "type": ["null", "string"]}
]
}
"""
df = spark.read.format("avro").option("avroSchema", avro_schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
# Write partitioned Avro files by year and month
df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")
# Write with a custom record name and namespace for Schema Registry compatibility
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").options(
recordName="Review",
recordNamespace="com.wanderbricks"
).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
스칼라
import org.apache.spark.sql.functions.{year, month}
// Write wanderbricks reviews to Avro format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
// Read an Avro file into a DataFrame
val df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
df.show()
// Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
// Read using a custom Avro schema to select specific fields
val avroSchema = """
{
"type": "record",
"name": "Review",
"fields": [
{"name": "review_id", "type": "string"},
{"name": "rating", "type": "int"},
{"name": "comment", "type": ["null", "string"]}
]
}
"""
val filtered = spark.read.format("avro").option("avroSchema", avroSchema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
// Write partitioned Avro files by year and month
val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")
// Write with a custom record name and namespace for Schema Registry compatibility
reviews.write.format("avro").options(Map(
"recordName" -> "Review",
"recordNamespace" -> "com.wanderbricks"
)).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
SQL
-- Write wanderbricks reviews to Avro format
CREATE TABLE reviews_avro
USING AVRO
AS SELECT * FROM samples.wanderbricks.reviews;
-- Write partitioned Avro files by year and month
CREATE TABLE bookings_avro_partitioned
USING AVRO
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;
SELECT * FROM bookings_avro_partitioned;
추가 리소스
- Parquet 파일 읽기 및 쓰기: 워크로드가 스트리밍 또는 쓰기가 많은 대신 주로 분석적이고 읽기가 많은 경우 Parquet의 열 형식 레이아웃은 Avro의 행 기반 스토리지보다 더 효율적인 쿼리 성능을 제공합니다.