读取和写入 XML 文件

重要

此功能目前以公共预览版提供。

可扩展标记语言 (XML)是一种标记语言,用于以文本格式设置、存储和共享数据。 它定义一组规则,用于序列化从文档到任意数据结构的数据。

Azure Databricks支持使用 Apache Spark 进行读取和写入的 XML,包括自动架构推理和演变、行标记配置、XSD 验证和 SQL 表达式,例如from_xml。 原生 XML 支持可与 Auto Loader、read_filesCOPY INTO 配合使用,而无需外部 jar 包。

先决条件

XML 文件格式支持需要 Databricks Runtime 14.3 及更高版本。

选项

使用 .option().options()DataFrameReaderDataFrameWriter 方法来配置 XML 数据源。 有关支持选项的完整列表,请参阅 DataFrameReader XML 选项DataFrameWriter XML 选项

分析 XML 记录

XML 规范要求结构必须是格式良好的。 但是,此规范不会立即映射到表格格式。 必须指定 rowTag 选项以指示映射到 DataFrameRow 的 XML 元素。 该 rowTag 元素将成为顶级 struct 元素。 rowTag 的子元素将成为顶级 struct 的字段。

可以为此记录指定架构,也可以自动推断该架构。 由于分析程序仅检查 rowTag 元素,因此会筛选掉 DTD 和外部实体。

以下示例演示了使用不同 rowTag 选项对 XML 文件的架构推理和分析:

Python

xmlString = """
  <reviews>
    <review id="r001">
      <author>Alice</author>
      <rating>5</rating>
      <comment>Amazing stay, highly recommend!</comment>
    </review>
    <review id="r002">
      <author>Bob</author>
      <rating>4</rating>
      <comment>Great location, very comfortable</comment>
    </review>
  </reviews>"""

xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala(编程语言)

val xmlString = """
  <reviews>
    <review id="r001">
      <author>Alice</author>
      <rating>5</rating>
      <comment>Amazing stay, highly recommend!</comment>
    </review>
    <review id="r002">
      <author>Bob</author>
      <rating>4</rating>
      <comment>Great location, very comfortable</comment>
    </review>
  </reviews>"""
val xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString)

使用 rowTag 选项读取 XML 文件,结果为 "reviews"

Python

df = spark.read.option("rowTag", "reviews").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala(编程语言)

val df = spark.read.option("rowTag", "reviews").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
  format => 'xml',
  rowTag => 'reviews'
)

输出:

root
|-- review: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- comment: string (nullable = true)
| | |-- rating: string (nullable = true)

+----------------------------------------------------------------------------------------+
|review                                                                                  |
+----------------------------------------------------------------------------------------+
|[{r001, Alice, Amazing stay, highly recommend!, 5}, {r002, Bob, Great location..., 4}] |
+----------------------------------------------------------------------------------------+

rowTag 作为 "review" 读取 XML 文件:

Python

df = spark.read.option("rowTag", "review").format("xml").load(xmlPath)
# Infers four top-level fields and parses `review` in separate rows:

Scala(编程语言)

val df = spark.read.option("rowTag", "review").xml(xmlPath)
// Infers four top-level fields and parses `review` in separate rows:

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
  format => 'xml',
  rowTag => 'review'
)

输出:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- comment: string (nullable = true)
|-- rating: string (nullable = true)

+----+------+--------------------------------+------+
|_id |author|comment                         |rating|
+----+------+--------------------------------+------+
|r001|Alice |Amazing stay, highly recommend! |5     |
|r002|Bob   |Great location, very comfortable|4     |
+----+------+--------------------------------+------+

使用 XSD 验证 XML 记录

可以选择性地通过 XML 架构定义(XSD)验证每个行级 XML 记录。 XSD 文件在 rowValidationXSDPath 选项中指定。 XSD 不会以其他方式影响已提供或推断的架构。 验证失败的记录被标记为“已损坏”,并根据选项部分中所述的损坏记录处理模式选项进行处理。

可用于 XSDToSchema 从 XSD 文件中提取 Spark 数据帧架构。 它仅支持简单类型、复杂类型和序列类型,仅支持基本 XSD 功能。

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="review">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="rating" type="xs:integer" />
          <xs:element name="comment" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

下表显示了将 XSD 数据类型转换为 Spark 数据类型:

XSD 数据类型 Spark 数据类型
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
shortunsignedByte ShortType
integernegativeIntegernonNegativeIntegernonPositiveIntegerpositiveIntegerunsignedShort IntegerType
longunsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

分析嵌套 XML

在现有 DataFrame 的字符串值列中,可以使用 schema_of_xml 解析 XML 数据,并将 from_xml 的架构和解析结果作为新的 struct 列返回。 作为自变量传递到 schema_of_xmlfrom_xml 的 XML 数据必须是单个格式标准的 XML 记录。

XML模式架构

用于 schema_of_xml 从 XML 字符串推断 Spark 架构。 将结果传递给 from_xml 以解析 XML 列。

语法schema_of_xml(xmlStr [, options])

论点 Required Description
xmlStr 指定单个格式良好的 XML 记录的 STRING 表达式。
options 指定指令的 MAP<STRING,STRING> 文本。

返回一个 STRING,该字符串包含具有 n 个字符串字段的结构的定义,其中列名派生自 XML 元素和属性名称。 这些字段值保存派生的格式化 SQL 类型。

from_xml

使用 from_xml 将包含 XML 记录的 STRING 列解析为结构体。 直接提供架构定义,或使用 schema_of_xml 的输出。

语法from_xml(xmlStr, schema [, options])

论点 Required Description
xmlStr 指定单个格式良好的 XML 记录的 STRING 表达式。
schema STRING 表达式或对 schema_of_xml 函数的调用。
options 指定指令的 MAP<STRING,STRING> 文本。

返回一个结构,其中包含与架构定义匹配的字段名称和类型。 模式必须定义为逗号分隔的列名和数据类型对,例如在CREATE TABLE中使用。 “ 选项” 部分中显示的大多数选项都适用,但有以下例外:

  • rowTag:由于只有一条 XML 记录,因此 rowTag 选项不适用。
  • mode(默认值为 PERMISSIVE):允许采用在分析期间处理损坏记录的模式。
    • PERMISSIVE:遇到损坏的记录时,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式错误的字段设置为 null。 若要保留损坏的记录,可以设置以用户定义的架构命名 columnNameOfCorruptRecord 的字符串类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 推理架构时,它会在输出架构中隐式添加 columnNameOfCorruptRecord 字段。
    • FAILFAST:遇到损坏的记录时抛出异常。

示例

若要分析 XML 字符串列,请使用 schema_of_xml 推断架构,然后将其传递给 from_xml

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <review id="r001">
    <author>Alice</author>
    <rating>5</rating>
    <comment>Amazing stay, highly recommend!</comment>
  </review>
"""

df = spark.createDataFrame([(1, xml_data)], ["review_id", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala(编程语言)

import org.apache.spark.sql.functions.{from_xml, schema_of_xml, lit}

val xmlData = """
  <review id="r001">
    <author>Alice</author>
    <rating>5</rating>
    <comment>Amazing stay, highly recommend!</comment>
  </review>""".stripMargin

val df = Seq((1, xmlData)).toDF("review_id", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

在 SQL 中分析内联 XML:

SELECT from_xml('
  <review id="r001">
    <author>Alice</author>
    <rating>5</rating>
    <comment>Amazing stay, highly recommend!</comment>
  </review>',
  schema_of_xml('
  <review id="r001">
    <author>Alice</author>
    <rating>5</rating>
    <comment>Amazing stay, highly recommend!</comment>
  </review>')
);

在 XML 和数据帧结构之间进行转换

由于 DataFrame 和 XML 之间存在结构差异,因此对于从 XML 数据转换为 DataFrame 以及从 DataFrame 转换为 XML 数据来说,有一些转换规则。 请注意,可以使用选项 excludeAttribute 禁用处理属性。

将 XML 转换为 DataFrame

读取 XML 时,Azure Databricks根据以下规则将 XML 元素和属性映射到 DataFrame 字段。

属性将转换为具有标题前缀 attributePrefix的字段。

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

这会生成以下架构:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

包含属性或子元素的元素中的字符数据将被解析到 valueTag 字段中。 如果字符数据多次出现,则 valueTag 字段将转换为类型 array

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

这会生成以下架构:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _myTwoAttrib: string (nullable = true)
 |-- three: string (nullable = true)

从 DataFrame 转换为 XML

将数据帧写入 XML 时,由于 DataFrame 和 XML 数据模型之间的差异,某些嵌套结构需要特殊处理。

如果 DataFrame 包含一个 ArrayType 字段,且该字段的元素类型也是 ArrayType,则将其写入 XML 时会多生成一层嵌套,而在对 XML 文件执行往返转换时不会出现这种情况。 这只会影响 XML 外部源的数据帧 — 读取和写入 XML 文件会保留原始结构。

例如,具有以下架构的数据帧:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

以及以下数据:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

生成以下 XML 输出:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

选项 DataFrame 指定 arrayElementName 中未命名数组的元素名称(默认值:item)。

启用已获救的数据列

已获救的数据列可确保在 ETL 期间永远不会丢失数据。 它捕获未分析的任何数据,因为记录中的一个或多个字段存在下列问题之一:

  • 不存在于提供的架构中。
  • 与提供的架构的数据类型不匹配。
  • 与提供的架构中的字段名称大小写不匹配。

已获救的数据列作为 JSON 文档返回,其中包含已获救的列和记录的源文件路径。

若要启用已获救的数据列,在读取时将 rescuedDataColumn 选项设置为列名:

Python

df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")

Scala(编程语言)

val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_xml',
  format => 'xml',
  rowTag => 'review',
  rescuedDataColumn => '_rescued_data'
)

若要从已获救的数据列中删除源文件路径,请设置:

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

分析记录时,XML 分析程序支持三种模式:PERMISSIVEDROPMALFORMEDFAILFAST。 与 rescuedDataColumn 一起使用时,数据类型不匹配不会导致在 DROPMALFORMED 模式下删除记录,或者在 FAILFAST 模式下引发错误。 只有损坏的记录(即不完整或格式错误的 XML)会被删除或引发错误。

使用自动加载程序推断和改进架构

关于此主题以及适用选项的详细讨论,请参阅 自动加载器中的架构推理和演变配置。 可以将自动加载程序配置为自动检测已加载的 XML 数据的架构,使你能够初始化表,而无需显式声明数据架构,并在引入新列时改进表架构。 这样就无需一直手动跟踪和应用架构更改。

默认情况下,自动加载程序架构推理会试图避免由于类型不匹配而出现的架构演变问题。 对于不编码数据类型(JSON、CSV 和 XML)的格式,自动加载程序会将所有列推断为字符串,包括 XML 文件中的嵌套字段。 Apache Spark DataFrameReader 使用不同的行为进行架构推理,根据示例数据为 XML 源中的列选择数据类型。 若要使用自动加载程序实现此行为,请将选项 cloudFiles.inferColumnTypes 设置为 true

自动加载程序在处理数据时会检测是否添加了新列。 当自动加载程序检测到新列时,流会停止并出现 UnknownFieldException。 在流引发此错误之前,自动加载程序会在最新的数据微批上执行架构推理,并通过将新列合并到架构末尾来使用最新架构更新架构位置。 现有列的数据类型将保持不变。 自动加载程序支持 架构演变的不同模式,可在选项 cloudFiles.schemaEvolutionMode中设置这些模式。

可以使用 架构提示 来确保您已知和期望的架构信息应用于推断的架构。 如果知道某列是特定的数据类型,或者想选择更通用的数据类型(例如选择双精度而不是整数),则可以使用 SQL 模式规范语法,以字符串形式为列数据类型提供任意数量的提示。 启用补救数据列时,以非架构所用的大小写形式命名的字段将加载到 _rescued_data 列。 你可以通过将选项 readerCaseSensitive 设置为 false 来更改此行为,在这种情况下自动加载程序将以不区分大小写的方式读取数据。

Usage

以下示例使用 Wanderbricks 数据集 演示如何使用 Spark 数据帧 API 和 SQL 读取和写入 XML 文件。

读取和写入 XML

使用 DataFrame API 将 Wanderbricks 评论写入 XML 并读回。

Python

# Write Wanderbricks reviews to XML
df = spark.read.table("samples.wanderbricks.reviews")
df.write \
  .format("xml") \
  .option("rootTag", "reviews") \
  .option("rowTag", "review") \
  .save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")

# Read the XML file back
df_read = spark.read \
  .format("xml") \
  .option("rowTag", "review") \
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df_read.show()

Scala(编程语言)

// Write Wanderbricks reviews to XML
val df = spark.read.table("samples.wanderbricks.reviews")
df.write
  .format("xml")
  .option("rootTag", "reviews")
  .option("rowTag", "review")
  .save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")

// Read the XML file back
val dfRead = spark.read
  .format("xml")
  .option("rowTag", "review")
  .xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
dfRead.show()

R

df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")

读取数据时,可以手动指定架构:

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("rating", IntegerType(), True),
    StructField("comment", StringType(), True)
])
df = spark.read.options(rowTag='review').xml('/Volumes/<catalog>/<schema>/<volume>/reviews.xml', schema=custom_schema)
df.show()

Scala(编程语言)

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true)))
val df = spark.read.option("rowTag", "review").schema(customSchema).xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.show()

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("rating", "integer"),
  structField("comment", "string"))

df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", schema = customSchema, rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")

使用 SQL 读取和写入 XML

使用 SQL DDL 从 XML 文件创建表。 Azure Databricks自动推断列类型。

DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
SELECT * FROM reviews;

还可以在 DDL 中指定列名和类型。 在这种情况下,不会自动推断架构。

DROP TABLE IF EXISTS reviews;

CREATE TABLE reviews (_id string, author string, rating integer, comment string)
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");

使用 COPY INTO 加载 XML

用于 COPY INTO 将 XML 文件从云存储加载到 Delta 表中。

DROP TABLE IF EXISTS reviews;
CREATE TABLE IF NOT EXISTS reviews;

COPY INTO reviews
FROM "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'review')
COPY_OPTIONS ('mergeSchema' = 'true');

读取 XML 并进行行验证

使用选项 rowValidationXSDPath 在读取时针对 XSD 架构验证每行。

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "review")
    .option("rowValidationXSDPath", xsdPath)
    .load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml"))
df.printSchema()

Scala(编程语言)

val df = spark.read
  .option("rowTag", "review")
  .option("rowValidationXSDPath", xsdPath)
  .xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.printSchema

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
  format => 'xml',
  rowTag => 'review',
  rowValidationXSDPath => '/Volumes/<catalog>/<schema>/<volume>/reviews.xsd'
)

使用自动加载程序加载 XML

使用自动加载程序将 XML 文件从云存储持续引入到具有自动架构推理和演变的 Delta 表中。

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "review")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("reviews")
)

Scala(编程语言)

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "review")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())
  .toTable("reviews")

其他资源