读取和写入 JSON 文件

JSON(JavaScript 对象表示法)是用于数据交换和存储的广泛使用的半结构化格式。 Azure Databricks 支持使用 Apache Spark 读取和写入 JSON,包括单行和多行模式、自动推断架构以及救援数据。 可以使用 Spark 数据帧 API 或 SQL 从云存储读取 JSON 文件,并将数据帧写回到 JSON。

先决条件

Azure Databricks不需要其他配置才能使用 JSON 文件。

选项

使用 .option().options()DataFrameReaderDataFrameWriter 方法来配置 JSON 数据源。 有关支持选项的完整列表,请参阅 DataFrameReader JSON 选项DataFrameWriter JSON 选项

Usage

以下示例使用 Wanderbricks 示例数据集演示如何使用 Spark 数据帧 API 和 SQL 在单行和多行模式下读取和写入 JSON 文件。

写入和读取 JSON 文件

在单行模式(默认值)中,输出的每一行都包含一个完整的 JSON 对象。 将 Wanderbricks 评论写入 JSON 格式,然后将其读回。

Python

# Write wanderbricks reviews to JSON format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("json").save("/Volumes/<catalog>/<schema>/<volume>/reviews_json")

# Read the JSON files into a DataFrame
df = spark.read.format("json").load("/Volumes/<catalog>/<schema>/<volume>/reviews_json")
df.printSchema()
display(df)

Scala(编程语言)

// Write wanderbricks reviews to JSON format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("json").save("/Volumes/<catalog>/<schema>/<volume>/reviews_json")

// Read the JSON files into a DataFrame
val df = spark.read.format("json").load("/Volumes/<catalog>/<schema>/<volume>/reviews_json")
df.printSchema()
df.show()

读取多行 JSON 文件

在多行模式下,单个 JSON 对象可以跨越多行。 启用多行模式可读取跨多行格式化记录的 JSON 文件。

Python

mdf = spark.read.option("multiline", "true").format("json").load("/Volumes/<catalog>/<schema>/<volume>/multi-line.json")
mdf.show(truncate=False)

Scala(编程语言)

val mdf = spark.read.option("multiline", "true").format("json").load("/Volumes/<catalog>/<schema>/<volume>/multi-line.json")
mdf.show(false)

SQL

CREATE TEMPORARY VIEW multiLineJsonTable
USING json
OPTIONS (path="/Volumes/<catalog>/<schema>/<volume>/multi-line.json",multiline=true)

使用 SQL 读取 JSON 文件

可以使用 read_files SQL 中的表值函数 表值函数读取 JSON 文件。

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_json',
  format => 'json',
  multiLine => true)

还可以使用 USING JSON 读取 JSON 文件。 但是,Databricks 建议使用 read_files ,而不是 USING JSON 因为 read_files 允许规范架构和其他文件处理选项。

DROP TABLE IF EXISTS reviews_json_table;

CREATE TABLE reviews_json_table
USING JSON
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews_json", multiline true);

SELECT * FROM reviews_json_table;

指定字符编码

默认情况下,会自动检测输入文件的字符集。 可以使用 charset 选项显式指定字符集:

Python

spark.read.option("charset", "UTF-16BE").format("json").load("/Volumes/<catalog>/<schema>/<volume>/fileInUTF16.json")

Scala(编程语言)

spark.read.option("charset", "UTF-16BE").format("json").load("/Volumes/<catalog>/<schema>/<volume>/fileInUTF16.json")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/fileInUTF16.json',
  format => 'json',
  charset => 'UTF-16BE'
)

下面是一些受支持的字符集:UTF-8UTF-16BEUTF-16LEUTF-16UTF-32BEUTF-32LEUTF-32。 若要查看 Oracle Java SE 支持的字符集的完整列表,请参阅受支持的编码

启用已获救的数据列

已获救的数据列可确保在 ETL 期间永远不会丢失数据。 它捕获未分析的任何数据,因为记录中的一个或多个字段存在下列问题之一:

  • 不存在于提供的架构中。
  • 与提供的架构的数据类型不匹配。
  • 与提供的架构中的字段名称大小写不匹配。

已获救的数据列作为 JSON Blob 返回,其中包含已获救列和记录的源文件路径。

若要启用已获救的数据列,在读取时将 rescuedDataColumn 选项设置为列名:

Python

df = spark.read.option("rescuedDataColumn", "_rescued_data").format("json").load("/Volumes/<catalog>/<schema>/<volume>/reviews_json")

Scala(编程语言)

val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("json").load("/Volumes/<catalog>/<schema>/<volume>/reviews_json")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_json',
  format => 'json',
  rescuedDataColumn => '_rescued_data'
)

若要从已获救的数据列中删除源文件路径,请设置:

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

分析记录时,JSON 分析器支持三种模式:PERMISSIVEDROPMALFORMEDFAILFAST。 与以下规则一起使用 rescuedDataColumn时,适用以下规则:

  • DROPMALFORMED 模式下,数据类型不匹配不会导致记录被丢弃,也不会在 FAILFAST 模式下引发错误。
  • 只有损坏的记录(即不完整或格式错误的 JSON)会被删除或引发错误。
  • 如果使用此选项 badRecordsPath ,数据类型不匹配并不被视为错误的记录。 只有不完整的和格式错误的 JSON 记录才会存储在 badRecordsPath 中。

其他资源

  • 读取和写入 Parquet 文件:如果工作负荷主要是分析和读取密集型,Parquet 的列式布局比 JSON 基于行的文本格式更高效地提供查询性能。
  • 读取和写入 Avro 文件:如果要从事件流式处理系统(如 Apache Kafka)生成或使用 JSON,Avro 会提供更压缩的二进制编码,并提供架构演变支持。