CSV(逗号分隔值)是一种纯文本表格格式,通常用于数据交换、ETL 管道和常规用途数据存储。 Azure Databricks 支持使用 Apache Spark 读取和写入 CSV 文件,包括架构推断、压缩、格式错误记录处理和救援数据。
注意
Databricks 建议 SQL 用户使用read_files来读取 CSV 文件。
read_files 在 Databricks Runtime 13.3 LTS 及更高版本中可用。
还可以使用临时视图。 如果使用 SQL 直接读取 CSV 数据而不使用临时视图或 read_files,则存在以下限制:
先决条件
Azure Databricks不需要其他配置才能使用 CSV 文件。 但是,若要流式传输 CSV 文件,需要 自动加载程序。
选项
使用.option()和.options()的DataFrameReader和DataFrameWriter方法来配置 CSV 数据源。 有关支持选项的完整列表,请参阅 DataFrameReader CSV 选项 和 DataFrameWriter CSV 选项。
Usage
以下示例演示如何读取和写入 CSV 文件、指定架构以及处理格式不正确的记录。
阅读 CSV 文件
以下示例使用 Wanderbricks 示例数据集。 它将审阅数据写入 CSV,然后将其读回。
Python
# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
# Read the CSV file into a DataFrame
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()
Scala
// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
// Read the CSV file into a DataFrame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()
R
df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)
使用 SQL 读取 CSV 文件
以下 SQL 示例使用 read_files 读取 CSV 文件。
-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
'abfss://<bucket>@<storage-account>.dfs.core.windows.net/<path>/<file>.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
指定架构
当 CSV 文件的架构已知时,可以用 schema 选项向 CSV 读取器指定所需的架构。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int, comment string'
)
读取列的子集
CSV 分析程序的行为取决于读取哪些列。 如果指定的架构与文件布局不匹配,则结果可能会有很大差异,具体取决于访问哪些列。 CSV 没有列名元数据,因此 Spark 按位置将架构字段映射到列 - 不匹配的架构会将值转移到错误的字段中。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Read only a subset of columns by specifying a partial schema
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int'
)
处理格式不正确的 CSV 记录
使用指定的架构读取 CSV 文件时,文件中的数据可能与架构不匹配。 例如,包含城市名称的字段将不会分析为整数。 结果取决于分析程序运行的模式:
-
PERMISSIVE(默认):对于无法正确分析的字段,插入 null -
DROPMALFORMED:删除包含无法分析的字段的行 -
FAILFAST:如果发现任何格式错误的数据,则中止读取
若要设置模式,请使用 mode 选项。
Python
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
Scala
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE'
)
在 PERMISSIVE 模式下,可以使用以下方法之一检查无法正确分析的行:
- 你可以提供指向选项
badRecordsPath的自定义路径,以将损坏的记录记录到文件中。 - 你可以将列
_corrupt_record添加到提供给 DataFrameReader 的架构中,以查看生成的 DataFrame 中的损坏的记录。
注意
badRecordsPath 选项优先于 _corrupt_record,这意味着写入所提供路径的格式错误的行不会在生成的 DataFrame 中显示。
使用补救数据列时,格式错误的记录的默认行为会发生变化。
若要使用 _corrupt_record 检查格式错误的行,请将其添加到架构中,并筛选出非 null 的值:
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true),
StructField("_corrupt_record", StringType, nullable = true)
))
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.filter(df("_corrupt_record").isNotNull).show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL
启用已获救的数据列
注意
Databricks Runtime 8.3 及更高版本中支持此功能。
使用 PERMISSIVE 模式时,可以启用已获救数据列来捕获未分析的任何数据,因为记录中的一个或多个字段存在下列问题之一:
- 不存在于提供的架构中。
- 与提供的架构的数据类型不匹配。
- 与提供的架构中的字段名称大小写不匹配。
补救数据列以 JSON 文档形式返回,其中包含已补救的列和记录的源文件路径。
若要启用已获救的数据列,在读取时将 rescuedDataColumn 选项设置为列名:
Python
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
Scala
val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
rescuedDataColumn => '_rescued_data'
)
若要从已获救的数据列中删除源文件路径,请设置:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
分析记录时,CSV 分析器支持三种模式:PERMISSIVE、DROPMALFORMED 和 FAILFAST。 与 rescuedDataColumn 一起使用时,数据类型不匹配不会导致在 DROPMALFORMED 模式下删除记录,或者在 FAILFAST 模式下引发错误。 只有损坏的记录(即不完整或格式错误的 CSV)会被删除或引发错误。
在 rescuedDataColumn 模式下使用 PERMISSIVE 时,以下规则适用于损坏的记录:
- 文件的第一行(标题行或数据行)设置预期的行长度。
- 具有不同列数的行视为不完整。
- 数据类型不匹配不视为记录受损。
- 只有不完整和格式错误的 CSV 记录会视为损坏并记录到
_corrupt_record列或badRecordsPath。
其他资源
- 读取和写入 Parquet 文件:如果工作负荷需要更好的查询性能或更高效的存储,Parquet 的列式布局比 CSV 的纯文本格式具有显著优势。