读取和写入 CSV 文件

CSV(逗号分隔值)是一种纯文本表格格式,通常用于数据交换、ETL 管道和常规用途数据存储。 Azure Databricks 支持使用 Apache Spark 读取和写入 CSV 文件,包括架构推断、压缩、格式错误记录处理和救援数据。

注意

Databricks 建议 SQL 用户使用read_files来读取 CSV 文件。 read_files 在 Databricks Runtime 13.3 LTS 及更高版本中可用。

还可以使用临时视图。 如果使用 SQL 直接读取 CSV 数据而不使用临时视图或 read_files,则存在以下限制:

先决条件

Azure Databricks不需要其他配置才能使用 CSV 文件。 但是,若要流式传输 CSV 文件,需要 自动加载程序

选项

使用.option().options()DataFrameReaderDataFrameWriter方法来配置 CSV 数据源。 有关支持选项的完整列表,请参阅 DataFrameReader CSV 选项DataFrameWriter CSV 选项

Usage

以下示例演示如何读取和写入 CSV 文件、指定架构以及处理格式不正确的记录。

阅读 CSV 文件

以下示例使用 Wanderbricks 示例数据集。 它将审阅数据写入 CSV,然后将其读回。

Python

# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

# Read the CSV file into a DataFrame
df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()

Scala

// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

// Read the CSV file into a DataFrame
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()

R

df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)

使用 SQL 读取 CSV 文件

以下 SQL 示例使用 read_files 读取 CSV 文件。

-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
  'abfss://<bucket>@<storage-account>.dfs.core.windows.net/<path>/<file>.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

指定架构

当 CSV 文件的架构已知时,可以用 schema 选项向 CSV 读取器指定所需的架构。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
  StructField("review_id", StringType(), True),
  StructField("rating", IntegerType(), True),
  StructField("comment", StringType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true)
))

val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  schema => 'review_id string, rating int, comment string'
)

读取列的子集

CSV 分析程序的行为取决于读取哪些列。 如果指定的架构与文件布局不匹配,则结果可能会有很大差异,具体取决于访问哪些列。 CSV 没有列名元数据,因此 Spark 按位置将架构字段映射到列 - 不匹配的架构会将值转移到错误的字段中。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Read only a subset of columns by specifying a partial schema
schema = StructType([
  StructField("review_id", StringType(), True),
  StructField("rating", IntegerType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true)
))

val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  schema => 'review_id string, rating int'
)

处理格式不正确的 CSV 记录

使用指定的架构读取 CSV 文件时,文件中的数据可能与架构不匹配。 例如,包含城市名称的字段将不会分析为整数。 结果取决于分析程序运行的模式:

  • PERMISSIVE(默认):对于无法正确分析的字段,插入 null
  • DROPMALFORMED:删除包含无法分析的字段的行
  • FAILFAST:如果发现任何格式错误的数据,则中止读取

若要设置模式,请使用 mode 选项。

Python

df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)

Scala

val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE'
)

PERMISSIVE 模式下,可以使用以下方法之一检查无法正确分析的行:

  • 你可以提供指向选项 badRecordsPath 的自定义路径,以将损坏的记录记录到文件中。
  • 你可以将列 _corrupt_record 添加到提供给 DataFrameReader 的架构中,以查看生成的 DataFrame 中的损坏的记录。

注意

badRecordsPath 选项优先于 _corrupt_record,这意味着写入所提供路径的格式错误的行不会在生成的 DataFrame 中显示。

使用补救数据列时,格式错误的记录的默认行为会发生变化。

若要使用 _corrupt_record 检查格式错误的行,请将其添加到架构中,并筛选出非 null 的值:

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
  StructField("review_id", StringType(), True),
  StructField("rating", IntegerType(), True),
  StructField("comment", StringType(), True),
  StructField("_corrupt_record", StringType(), True)
])

df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)
))

val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

df.filter(df("_corrupt_record").isNotNull).show()

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL

启用已获救的数据列

注意

Databricks Runtime 8.3 及更高版本中支持此功能。

使用 PERMISSIVE 模式时,可以启用已获救数据列来捕获未分析的任何数据,因为记录中的一个或多个字段存在下列问题之一:

  • 不存在于提供的架构中。
  • 与提供的架构的数据类型不匹配。
  • 与提供的架构中的字段名称大小写不匹配。

补救数据列以 JSON 文档形式返回,其中包含已补救的列和记录的源文件路径。

若要启用已获救的数据列,在读取时将 rescuedDataColumn 选项设置为列名:

Python

df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

Scala

val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  rescuedDataColumn => '_rescued_data'
)

若要从已获救的数据列中删除源文件路径,请设置:

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

分析记录时,CSV 分析器支持三种模式:PERMISSIVEDROPMALFORMEDFAILFAST。 与 rescuedDataColumn 一起使用时,数据类型不匹配不会导致在 DROPMALFORMED 模式下删除记录,或者在 FAILFAST 模式下引发错误。 只有损坏的记录(即不完整或格式错误的 CSV)会被删除或引发错误。

rescuedDataColumn 模式下使用 PERMISSIVE 时,以下规则适用于损坏的记录

  • 文件的第一行(标题行或数据行)设置预期的行长度。
  • 具有不同列数的行视为不完整。
  • 数据类型不匹配不视为记录受损。
  • 只有不完整和格式错误的 CSV 记录会视为损坏并记录到 _corrupt_record 列或 badRecordsPath

其他资源

  • 读取和写入 Parquet 文件:如果工作负荷需要更好的查询性能或更高效的存储,Parquet 的列式布局比 CSV 的纯文本格式具有显著优势。