Informazioni di riferimento sulle opzioni dell'API Spark

Questa pagina elenca le opzioni di input e output disponibili per le API Spark che leggono e scrivono dati.

Opzioni DataFrameReader

Usare queste opzioni con DataFrameReader.option(), DataFrameReader.options(), read_files, COPY INTO e Auto Loader per controllare come Azure Databricks legge i file di dati.

Example

L'esempio seguente imposta su multiLineTrue per la lettura di file JSON:

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)

Comune

Le opzioni seguenti si applicano a tutti i formati di file.

Chiave Predefinito Valori validi Description
ignoreCorruptFiles false true, false Indica se ignorare i file danneggiati. Se true, i processi Spark continueranno a essere eseguiti quando si verificano file danneggiati e il contenuto letto verrà comunque restituito. Per COPY INTOè possibile osservare i file danneggiati ignorati come numSkippedCorruptFiles nella operationMetrics colonna della cronologia delta Lake. Disponibile in Databricks Runtime 11.3 LTS e versioni successive.
ignoreMissingFiles false per caricatore automatico, true per COPY INTO (legacy) true, false Indica se ignorare i file mancanti. Se true, i processi Spark continuano a essere eseguiti quando vengono rilevati file mancanti e il contenuto viene comunque restituito. Disponibile in Databricks Runtime 11.3 LTS e versioni successive.
modifiedAfter None Stringa timestamp Timestamp facoltativo come filtro per inserire solo i file con un timestamp di modifica dopo il timestamp specificato.
modifiedBefore None Stringa timestamp Timestamp facoltativo come filtro per inserire solo i file con un timestamp di modifica prima del timestamp specificato.
pathGlobFilter oppure fileNamePattern None Stringa di pattern glob Un potenziale modello GLOB per la scelta dei file. Equivalente a PATTERN in COPY INTO (legacy). fileNamePattern può essere usato in read_files.
recursiveFileLookup false true, false Quando true, questa opzione esegue la ricerca nelle directory nidificate anche se i nomi non seguono uno schema di denominazione di partizione come date=2019-07-01.

Avro

Quando si leggono i file Avro, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
avroSchema None Stringa dello schema Avro Schema facoltativo specificato da un utente in formato Avro. Quando si legge Avro, questa opzione può essere impostata su uno schema evoluto compatibile ma diverso dallo schema Avro effettivo. Lo schema di deserializzazione è coerente con lo schema evoluto. Ad esempio, se si imposta uno schema evoluto contenente una colonna aggiuntiva con un valore predefinito, il risultato di lettura contiene anche la nuova colonna.
avroSchemaEvolutionMode none none, restart Come gestire l'evoluzione dello schema quando si usa un registro schemi. none ignora le modifiche dello schema e continua il processo. restart genera un oggetto UnknownFieldException quando vengono rilevate modifiche dello schema e richiede un riavvio del processo.
datetimeRebaseMode LEGACY EXCEPTION, LEGACY, CORRECTED Controlla la ricalibrazione dei valori DATE e TIMESTAMP tra i calendari giuliano e gregoriano prolettico.
enableStableIdentifiersForUnionType false true, false Indica se usare nomi di campo stabili per i tipi di Unione Avro. Se abilitata, i nomi dei campi dei tipi di unione vengono derivati dai relativi nomi di tipo in lettere minuscole , ad esempio member_int, member_string. Genera un'eccezione se due nomi di tipo sono identici dopo la minuscola.
mergeSchema false true, false Indica se dedurre lo schema tra più file e unire lo schema di ogni file. mergeSchema per Avro non consente l'allentamento dei tipi di dati.
mode FAILFAST FAILFAST, PERMISSIVE, DROPMALFORMED Modalità parser per la gestione dei record danneggiati. FAILFAST genera un'eccezione. PERMISSIVE imposta campi in formato non valido su Null. DROPMALFORMED invisibile all'utente elimina i record negativi.
readerCaseSensitive true true, false Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole.
recursiveFieldMaxDepth None 0 a 15 Profondità massima di ricorsione per i campi Avro ricorsivi. Impostare su per 1 troncare tutti i campi ricorsivi, 2 per consentire un livello di ricorsione e così via fino a 15. Se non è impostato o 0, i campi ricorsivi non sono consentiti.
rescuedDataColumn None Stringa del nome di colonna Indica se raccogliere tutti i dati che non possono essere elaborati a causa di: mancata corrispondenza del tipo di dati e disallineamento dello schema (incluso il formato delle lettere delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico.
COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.
Per altri dettagli, fare riferimento a Che cos'è la colonna di dati salvata?.
stableIdentifierPrefixForUnionType member_ Qualsiasi stringa Prefisso da usare per i nomi dei campi del tipo di unione stabile quando enableStableIdentifiersForUnionType=true.

CSV

Quando si leggono i file CSV, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
badRecordsPath None Stringa di percorso Percorso in cui archiviare i file per registrare le informazioni sui record CSV non validi.
charToEscapeQuoteEscaping \0 Un singolo carattere Carattere utilizzato per sfuggire il carattere usato per sfuggire le virgolette. Ad esempio, per i seguenti record [ " a\\", b ]:
  • Se il carattere da usare come escape per '\' è indefinito, il record non verrà analizzato. Il parser leggerà i caratteri: [a],[\],["],[,],[ ],[b] e genererà un errore perché non riesce a trovare un'virgoletta di chiusura.
  • Se il carattere di escape del '\' viene definito come '\', il record verrà letto con 2 valori: [a\] e [b].
columnNameOfCorruptRecord _corrupt_record Stringa del nome di colonna Supportato per Auto Loader. Non supportato per COPY INTO (obsoleto).
Colonna per l'archiviazione di record malformati e non analizzabili. Se il parametro mode per il parsing è impostato su DROPMALFORMED, questa colonna sarà vuota.
comment \0 Un singolo carattere Definisce il carattere che rappresenta un commento di riga quando viene trovato all'inizio di una riga di testo. Usare '\0' per disabilitare l'ignoramento dei commenti.
dateFormat yyyy-MM-dd Stringa di formato data Formato per l'analisi delle stringhe di data.
emptyValue Stringa vuota Qualsiasi stringa Rappresentazione in forma di stringa di un valore di vuoto.
enableDateTimeParsingFallback false true, false Indica se eseguire il fallback al comportamento di analisi di data e timestamp legacy quando non è possibile analizzare un valore con il formato specificato. Quando false, gli errori di analisi generano un errore o producono null a seconda di mode.
encoding oppure charset UTF-8 Nome java.nio.charset.Charset Nome della codifica dei file CSV. Vederejava.nio.charset.Charset per l'elenco delle opzioni. UTF-16 e UTF-32 non possono essere usati quando multiline è true.
enforceSchema true true, false Indica se applicare forzatamente lo schema specificato o dedotto ai file CSV. Se l'opzione è abilitata, le intestazioni dei file CSV vengono ignorate. Questa opzione viene ignorata per impostazione predefinita quando si usa il caricatore automatico per salvare i dati e consentire l'evoluzione dello schema.
escape \ Un singolo carattere Carattere di escape da utilizzare durante l'analisi dei dati.
extension csv Stringa di estensione di file Estensione del nome file prevista per le letture. I file senza questa estensione vengono filtrati.
failOnUnknownFields false true, false Indica se non riuscire quando il record CSV contiene colonne non presenti nello schema. Quando false, le colonne non riconosciute vengono eliminate o salvate automaticamente a seconda di rescuedDataColumn.
failOnWidenedFields false true, false Indica se non è possibile eseguire l'analisi di un valore di campo come tipo di schema dichiarato senza ampliare. Quando false, i valori con estensione di tipo vengono salvati automaticamente a seconda di rescuedDataColumn. L'impostazione failOnUnknownFields=true può mascherare gli effetti di questa opzione.
header false true, false Indica se i file CSV contengono un'intestazione. Auto Loader presuppone che i file abbiano intestazioni quando si deduce lo schema.
ignoreLeadingWhiteSpace false true, false Indica se ignorare gli spazi vuoti iniziali per ogni valore analizzato.
ignoreTrailingWhiteSpace false true, false Indica se ignorare gli spazi vuoti finali per ogni valore analizzato.
inferSchema false true, false Stabilire se dedurre i tipi di dati dei record CSV analizzati o assumere che tutte le colonne siano di StringType. Richiede un passaggio aggiuntivo sui dati se impostato su true. Per il caricatore automatico, usare cloudFiles.inferColumnTypes invece.
inputBufferSize 1048576 (1 MB) Numeri interi positivi Dimensioni del buffer in byte per il parser CSV. Utile per ottimizzare l'utilizzo della memoria durante l'analisi di file CSV di grandi dimensioni.
lineSep Nessuno, che copre \r, \r\ne \n Una stringa Una stringa di testo tra due record CSV consecutivi.
locale US Identificatore java.util.Locale Una Java impostazioni locali identificate che influiscono sull'analisi predefinita di data, timestamp e decimale all'interno del file CSV.
maxCharsPerColumn -1 Numeri interi positivi o -1 per illimitati Numero massimo di caratteri previsti da un valore da analizzare. Può essere usato per evitare errori di memoria. Il valore predefinito è -1, ovvero illimitato.
maxColumns 20480 Numeri interi positivi Limite rigido del numero di colonne che un record può avere.
mergeSchema false true, false Indica se dedurre lo schema tra più file e unire lo schema di ogni file. Abilitato per impostazione predefinita per l'Auto Loader quando si deduce lo schema.
mode PERMISSIVE PERMISSIVE, DROPMALFORMED, FAILFAST Modalità parser per la gestione di record malformati.
multiLine false true, false Indica se i record CSV si estendono su più righe.
nanValue NaN Qualsiasi stringa Rappresentazione di stringa di un valore non numerico durante l'analisi delle colonne FloatType e DoubleType.
negativeInf -Inf Qualsiasi stringa La rappresentazione sotto forma di stringa dell'infinito negativo durante l'analisi delle colonne FloatType o DoubleType.
nullValue Stringa vuota Qualsiasi stringa Rappresentazione in forma di stringa del valore null.
parserCaseSensitive (obsoleto) false true, false Durante la lettura dei file, decidere se allineare le colonne dichiarate nell'intestazione con lo schema rispettando la distinzione tra maiuscole e minuscole. Questa è true l'impostazione predefinita per Auto Loader. Le colonne che differiscono per caso verranno salvate nel rescuedDataColumn se abilitato. Questa opzione è stata deprecata a favore di readerCaseSensitive.
positiveInf Inf Qualsiasi stringa La rappresentazione in forma di stringa dell'infinito positivo mentre si analizzano le colonne FloatType o DoubleType.
preferDate true true, false Quando possibile, tenta di interpretare le stringhe come date anziché come timestamp. È anche necessario usare l'inferenza dello schema, abilitando inferSchema o usando cloudFiles.inferColumnTypes con il caricatore automatico.
quote " Un singolo carattere Carattere utilizzato per escludere i valori quando il delimitatore di campo fa parte del valore.
readerCaseSensitive true true, false Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole.
rescuedDataColumn None Stringa del nome di colonna Indica se raccogliere tutti i dati che non possono essere elaborati a causa di: mancata corrispondenza del tipo di dati e disallineamento dello schema (incluso il formato delle lettere delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altri dettagli, fare riferimento a Che cos'è la colonna di dati salvata?.
COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.
sep oppure delimiter , Una stringa Stringa di separazione tra le colonne.
singleVariantColumn None Stringa del nome di colonna Se impostato su un nome di colonna, legge l'intero record CSV in una singola VariantType colonna con tale nome anziché analizzare ogni campo nella propria colonna. Richiede header=true.
skipRows 0 Numeri interi positivi o 0 Numero di righe dall'inizio del file CSV da ignorare, incluse le righe commentate e vuote. Se header è vero, l'intestazione sarà la prima riga non ignorata e non commentata.
timeFormat HH:mm:ss Stringa di formato ora Formato per l'analisi dei valori delle TimeType colonne.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato timestamp Formato per l'analisi delle stringhe di timestamp.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato timestamp Formato per l'analisi del timestamp senza stringhe di fuso orario (TimestampNTZType).
timeZone None Stringa java.time.ZoneId Oggetto java.time.ZoneId da utilizzare durante l'analisi di timestamp e date.
unescapedQuoteHandling STOP_AT_DELIMITER STOP_AT_CLOSING_QUOTE, BACK_TO_DELIMITER, STOP_AT_DELIMITER, SKIP_VALUERAISE_ERROR La strategia per il trattamento delle virgolette non scappate. Il comportamento di ogni opzione consentita è il seguente:
  • STOP_AT_CLOSING_QUOTE: se nell'input vengono trovate virgolette senza caratteri di escape, accumulare il carattere della virgoletta e continuare ad analizzare il valore come valore tra virgolette, fino a quando non viene trovata una virgoletta di chiusura.
  • BACK_TO_DELIMITER: Se nell'input si trovano virgolette non precedute da caratteri di escape, considerare il valore come un valore non quotato. In questo modo il parser accumula tutti i caratteri del valore analizzato corrente fino a quando non viene trovato il delimitatore definito da sep. Se non viene trovato alcun delimitatore nel valore, il parser continuerà ad accumulare caratteri dall'input fino a quando non viene trovato un delimitatore o una terminazione di riga.
  • STOP_AT_DELIMITER: Se nell'input si trovano virgolette non precedute da caratteri di escape, considerare il valore come un valore non quotato. In questo modo il parser accumula tutti i caratteri fino a quando il delimitatore definito da sep o una fine di riga viene trovata nell'input.
  • SKIP_VALUE: se nell'input vengono trovate virgolette non precedute da caratteri di escape, il contenuto analizzato per il valore specificato verrà ignorato (finché non viene trovato il delimitatore successivo) e il valore impostato in nullValue verrà prodotto.
  • RAISE_ERROR: se nell'input vengono trovate virgolette senza caratteri di escape, verrà generata un'eccezione TextParsingException .

Excel

Quando si legge Excel file, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
dataAddress None Stringa di un intervallo di celle o di un foglio Intervallo di celle da leggere nella sintassi Excel. Se omesso, legge tutte le celle valide dal primo foglio. Utilizzare SheetName!C5:H10 per leggere un intervallo da un foglio denominato, C5:H10 per leggere un intervallo dal primo foglio o SheetName per leggere tutti i dati da un foglio specifico.
headerRows 0 0, 1 Numero di righe iniziali da usare come intestazioni del nome di colonna. Quando dataAddress viene specificato, questo vale all'interno dell'intervallo di celle. Quando 0, i nomi di colonna vengono generati automaticamente come _c1, , _c3_c2e così via.
ignoreMissingSheet false true, false Indica se ignorare automaticamente i file che non contengono il foglio specificato da dataAddress. Quando false, viene generato un errore se manca un file nel foglio richiesto. Si applica solo quando viene specificato un nome di foglio in dataAddress.
includePhoneticRuns false true, false Indica se includere annotazioni fonetiche (ad esempio pinyin o furigana) concatenate ai valori di stringa delle celle durante la lettura di file XLSX.
operation readSheet readSheet, listSheets Operazione da eseguire nella cartella di lavoro Excel. readSheet legge i dati da un foglio. listSheets restituisce uno struct con campi sheetIndex: long e sheetName: String per ogni foglio.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato timestamp Stringa di formato personalizzata per i valori timestamp-without-timezone archiviati come stringhe in Excel. I formati di data personalizzati seguono i formati dei modelli di data e ora.
dateFormat yyyy-MM-dd Stringa di formato data Stringa di formato personalizzata per i valori stringa letti come Date. I formati di data personalizzati seguono i formati dei modelli di data e ora.

JSON

Quando si leggono i file JSON, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
allowBackslashEscapingAnyCharacter false true, false Indica se consentire alle barre rovesciate di eseguire l'escape di qualsiasi carattere che abbia esito positivo. Se non è abilitata, solo i caratteri specificatamente elencati dalla specifica JSON possono essere sottoposti a escape.
allowComments false true, false Indica se consentire o meno l'uso dei commenti di stile Java, C e C++ ('/', '*' e '//' ) all'interno del contenuto analizzato.
allowNonNumericNumbers true true, false Indica se consentire l'insieme di token non numerici (NaN) come valori numerici in virgola mobile legali.
allowNumericLeadingZeros false true, false Indica se consentire ai numeri integrali di iniziare con zeli aggiuntivi (ignorabili), (ad esempio 000001).
allowSingleQuotes true true, false Se consentire l'uso di virgolette singole (apostrofo, carattere '\') per quotare stringhe (nomi e valori di tipo String).
allowUnquotedControlChars false true, false Indica se consentire alle stringhe JSON di contenere caratteri di controllo senza escape (caratteri ASCII con valore minore di 32, inclusi i caratteri di tabulazione e avanzamento riga) o meno.
allowUnquotedFieldNames false true, false Indica se consentire l'uso di nomi di campo senza virgo chiave, consentiti da JavaScript, ma non dalla specifica JSON.
alternateVariantEncoding None Z85 Codifica usata per i valori Variant nel codice JSON di origine. Impostare su per Z85 decodificare i valori Variant con codifica Base85 anziché archiviati come JSON inline.
badRecordsPath None Stringa di percorso Percorso per archiviare i file per registrare le informazioni sui record JSON non validi.
L'uso dell'opzione badRecordsPath in un'origine dati basata su file presenta le limitazioni seguenti:
  • Non transazionale e può causare risultati incoerenti.
  • Gli errori temporanei vengono considerati errori.
columnNameOfCorruptRecord _corrupt_record Stringa del nome di colonna Colonna per l'archiviazione di record malformati e non analizzabili. Se il parametro mode per il parsing è impostato su DROPMALFORMED, questa colonna sarà vuota.
dateFormat yyyy-MM-dd Stringa di formato data Formato per l'analisi delle stringhe di data.
dropFieldIfAllNull false true, false Indica se ignorare colonne con tutti i valori nulli o matrici e strutture vuote durante l'inferenza dello schema.
encoding oppure charset UTF-8 Nome java.nio.charset.Charset Nome della codifica dei file JSON. Vedere java.nio.charset.Charset per l'elenco delle opzioni. Non è possibile usare UTF-16 e UTF-32 quando multiline è true.
inferTimestamp false true, false Indica se provare a dedurre stringhe di timestamp come TimestampType. Se impostato su true, l'inferenza dello schema potrebbe richiedere molto più tempo. È necessario abilitare cloudFiles.inferColumnTypes per l’uso con il caricatore automatico.
lineSep Nessuno, che copre \r, \r\ne \n Una stringa Una stringa tra due record JSON consecutivi.
locale US Identificatore java.util.Locale Identificatore delle impostazioni locali Java che influisce sull'analisi predefinita di data, timestamp e decimale all'interno di JSON.
maxNestingDepth 500 Numeri interi positivi Profondità massima consentita di annidamento per oggetti e matrici JSON. Aumentare questo valore per i documenti annidati in modo approfondito.
maxNumLen 1000 Numeri interi positivi Lunghezza massima dei token numerici nell'input JSON. Aumentare questo valore per JSON con valori letterali numerici di grandi dimensioni.
maxStringLen illimitata Numeri interi positivi Lunghezza massima dei valori stringa nell'input JSON. Impostare per limitare l'utilizzo della memoria durante l'analisi di JSON con stringhe di grandi dimensioni.
mode PERMISSIVE PERMISSIVE, DROPMALFORMED, FAILFAST Modalità parser per la gestione di record malformati.
multiLine false true, false Indica se i record JSON si estendono su più righe.
prefersDecimal false true, false Tenta di dedurre stringhe come DecimalType invece di tipo float o double, quando possibile. È anche necessario usare l'inferenza dello schema, abilitando inferSchema o usando cloudFiles.inferColumnTypes con il caricatore automatico.
primitivesAsString false true, false Stabilisce se dedurre tipi primitivi come i numeri e i booleani sotto forma di StringType.
readerCaseSensitive true true, false Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole. Disponibile in Databricks Runtime 13.3 e versioni successive.
rescuedDataColumn None Stringa del nome di colonna Indica se raccogliere tutti i dati che non possono essere analizzati a causa di una mancata corrispondenza del tipo di dati o di una mancata corrispondenza dello schema (inclusa la combinazione di maiuscole e minuscole) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altre informazioni, vedere Che cos'è la colonna di dati salvata?
COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.
singleVariantColumn None Stringa del nome di colonna Indica se inserire l'intero documento JSON, analizzato in una singola colonna Variant con la stringa specificata come nome della colonna. Se non è impostato, i campi JSON vengono inseriti nelle proprie colonne.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato timestamp Formato per l'analisi delle stringhe di timestamp.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato timestamp Formato per l'analisi del timestamp senza stringhe di fuso orario (TimestampNTZType).
timeZone None Stringa java.time.ZoneId Oggetto java.time.ZoneId da utilizzare durante l'analisi di timestamp e date.
upgradeExceptionAsBadRecord false true, false Indica se considerare le eccezioni di aggiornamento del tipo (ad esempio, quando un valore non può essere ampliato al tipo di colonna dichiarato) come record non validi anziché generare un'eccezione.

Kafka

Per l'elenco completo delle opzioni del lettore Kafka, vedere Opzioni Kafka di DataStreamReader. Le opzioni seguenti si applicano solo alle letture batch tramite spark.read.format("kafka").

Chiave Predefinito Valori validi Description
endingOffsets latest latesto una stringa di offset JSON Dove interrompere la lettura. Nella stringa -1 JSON è l'offset più recente. -2, ovvero l'offset meno recente, non è consentito come offset finale. Si tratta di una stringa di offset JSON di esempio: {"topicA":{"0":50,"1":-1}}.
endingOffsetsByTimestamp None Stringa di timestamp JSON Offset finali per partizione specificati come timestamp in millisecondi. Ad esempio: {"topicA":{"0":2000,"1":3000}}.
endingTimestamp None Numeri interi positivi o 0 Timestamp finale globale in millisecondi applicato a tutte le partizioni.

ORCO

Durante la lettura dei file ORC si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
mergeSchema false true, false Indica se dedurre lo schema tra più file e unire lo schema di ogni file.

Parquet

Quando si leggono i file Parquet, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
datetimeRebaseMode LEGACY EXCEPTION, LEGACY, CORRECTED Controlla la ricalibrazione dei valori DATE e TIMESTAMP tra i calendari giuliano e gregoriano prolettico.
int96RebaseMode LEGACY EXCEPTION, LEGACY, CORRECTED Controlla la ricalibrazione dei valori di timestamp INT96 tra i calendari Julian e Gregoriano prolettico.
mergeSchema false true, false Indica se dedurre lo schema tra più file e unire lo schema di ogni file.
readerCaseSensitive true true, false Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole.
rescuedDataColumn None Stringa del nome di colonna Indica se raccogliere tutti i dati che non possono essere elaborati a causa di: mancata corrispondenza del tipo di dati e disallineamento dello schema (incluso il formato delle lettere delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altri dettagli, fare riferimento a Che cos'è la colonna di dati salvata?.
COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.

Archivio stati

Usare queste opzioni con spark.read.format("statestore") o la read_statestore funzione con valori di tabella per leggere i dati dello stato structured streaming. Vedi Leggi le informazioni sullo stato di Structured Streaming.

Chiave Predefinito Valori validi Description
batchId ID batch più recente Numeri interi positivi o 0 Batch di destinazione da cui leggere. Usare per eseguire query su uno stato precedente della query. Il batch deve essere sottoposto a commit ma non ancora pulito.
operatorId 0 Numeri interi positivi o 0 Operatore di destinazione da cui leggere. Usare quando la query dispone di più operatori con stato.
storeName DEFAULT Qualsiasi stringa Nome dell'archivio stati di destinazione da cui leggere. Usare quando l'operatore con stato ha più istanze dell'archivio stati. È necessario specificare storeName o joinSide per un join di flusso, ma non entrambi.
joinSide None left, right Lato di destinazione da cui leggere per un join di flusso.The target side to read from for a stream-stream join. È necessario specificare storeName o joinSide per un join di flusso, ma non entrambi.
snapshotStartBatchId None Numeri interi positivi o 0 ID batch dello snapshot da usare come punto iniziale durante la lettura dello stato. Il lettore ricompila lo stato riproducendo le modifiche da questo snapshot fino a batchId. Utile quando uno snapshot è danneggiato. Deve specificare insieme a snapshotPartitionId. Non è possibile usare con readChangeFeed. Supporta l'archivio stati supportato da HDFS e l'archivio stati RocksDB con checkpoint del log delle modifiche abilitati. Disponibile in Databricks Runtime 15.4 LTS e versioni successive.
snapshotPartitionId None Numeri interi positivi o 0 Se specificato, la query legge solo questa partizione. Deve specificare insieme a snapshotStartBatchId. Non è possibile usare con readChangeFeed. Disponibile in Databricks Runtime 15.4 LTS e versioni successive.
readChangeFeed false true, false Quando true, restituisce le modifiche di stato in un intervallo specificato di batch tra changeStartBatchId e changeEndBatchId. Richiede changeStartBatchId. Impossibile usare con joinSide, batchIdsnapshotStartBatchId, o snapshotPartitionId. Disponibile in Databricks Runtime 16.4 LTS e versioni successive.
Per informazioni dettagliate, vedere Leggere le modifiche dello stato di Structured Streaming.
changeStartBatchId None Numeri interi positivi o 0 ID batch iniziale per l'intervallo di feed di modifiche. Obbligatorio quando readChangeFeed è true. Si applica solo quando readChangeFeed è impostato su true. Disponibile in Databricks Runtime 16.4 LTS e versioni successive.
changeEndBatchId ID batch più recente Numeri interi positivi o 0 ID batch finale per l'intervallo di feed di modifiche. Deve essere maggiore o uguale a changeStartBatchId. Si applica solo quando readChangeFeed è impostato su true. Disponibile in Databricks Runtime 16.4 LTS e versioni successive.
stateVarName None Qualsiasi stringa Nome della variabile di stato da leggere. Il nome della variabile di stato è il nome univoco di ogni variabile all'interno della init funzione di un StatefulProcessor oggetto usato dall'operatore transformWithState . Obbligatorio quando si usa l'operatore transformWithState . Disponibile in Databricks Runtime 16.4 LTS e versioni successive.
readRegisteredTimers false true, false Quando true, legge i timer registrati usati dall'operatore transformWithState . Si applica solo all'operatore transformWithState . Disponibile in Databricks Runtime 16.4 LTS e versioni successive.
flattenCollectionTypes true true, false Quando true, rende flat i record restituiti per le variabili di stato della mappa e dell'elenco. Quando false, restituisce i record come Spark SQL Array o Map. Si applica solo all'operatore transformWithState . Disponibile in Databricks Runtime 16.4 LTS e versioni successive.

Testo

Durante la lettura dei file di testo si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
encoding UTF-8 Nome java.nio.charset.Charset Nome della codifica del separatore di riga del file TEXT. Il contenuto del file non è interessato da questa opzione e viene letto as-is.
lineSep Nessuno, che copre \r, \r\n e \n Una stringa Una stringa tra due record TEXT consecutivi.
wholeText false true, false Indica se leggere un file come singolo record.

XML (Extensible Markup Language)

Durante la lettura dei file XML si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
rowTag None Qualsiasi stringa Il tag di riga dei file XML da trattare come una riga. Nell'esempio XML <book> <page><page>...<book>, il valore appropriato è page. Si tratta di un'opzione obbligatoria.
samplingRatio 1.0 0.0 a 1.0 Definisce una frazione di righe utilizzate per l'inferenza dello schema. Le funzioni predefinite XML ignorano questa opzione.
excludeAttribute false true, false Indica se escludere gli attributi negli elementi.
mode None PERMISSIVE, DROPMALFORMED, FAILFAST Modalità per gestire i record danneggiati durante l'analisi.
  • PERMISSIVE: per i record danneggiati, inserisce la stringa in formato non valido in un campo configurato da columnNameOfCorruptRecord e imposta i campi in formato non valido su null. Per mantenere i record danneggiati, è possibile impostare un campo di tipo string denominato columnNameOfCorruptRecord in uno schema definito dall'utente. Se il campo non è presente in uno schema, i record danneggiati vengono eliminati durante l'analisi. Quando si deduce uno schema, il parser aggiunge in modo implicito un campo columnNameOfCorruptRecord in uno schema di output.
  • DROPMALFORMED: ignora i record danneggiati. Questa modalità non è supportata per le funzioni predefinite XML.
  • FAILFAST: genera un'eccezione quando il parser incontra i record danneggiati.
inferSchema true true, false Se true, tenta di dedurre un tipo appropriato per ogni colonna DataFrame risultante. Se false, tutte le colonne risultanti sono di tipo string. Le funzioni predefinite XML ignorano questa opzione.
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord Stringa del nome di colonna Consente di rinominare il nuovo campo contenente una stringa in formato non valido creata dalla PERMISSIVE modalità.
attributePrefix None Qualsiasi stringa Prefisso per gli attributi per distinguere gli attributi dagli elementi. Questo sarà il prefisso per i nomi dei campi. Il valore predefinito è _. Può essere vuoto per la lettura del codice XML, ma non per la scrittura. Si applica anche alle opzioni XML DataFrameWriter.
valueTag _VALUE Qualsiasi stringa Tag utilizzato per i dati di caratteri all'interno di elementi che hanno anche attributi o elementi figlio. L'utente può specificare il campo valueTag nello schema oppure verrà aggiunto automaticamente durante l'inferenza dello schema quando i dati di tipo carattere sono presenti in elementi con altri elementi o attributi. Si applica anche alle opzioni XML DataFrameWriter.
encoding UTF-8 Nome java.nio.charset.Charset Per la lettura, decodifica i file XML in base al tipo di codifica specificato. Per la scrittura, specifica la codifica (charset) dei file XML salvati. Le funzioni predefinite XML ignorano questa opzione. Si applica anche alle opzioni XML DataFrameWriter.
ignoreSurroundingSpaces true true, false Indica se gli spazi vuoti che circondano i valori devono essere ignorati. I dati di caratteri composti solo da spazi bianchi vengono ignorati.
rowValidationXSDPath None Stringa di percorso del file Percorso di un file XSD facoltativo utilizzato per convalidare il codice XML per ogni riga singolarmente. Le righe che non riescono a convalidare vengono considerate come errori di analisi. L'XSD non influisce in caso contrario sullo schema, indipendentemente dal fatto che sia specificato o dedotto.
ignoreNamespace false true, false Se true, i prefissi degli spazi dei nomi sugli elementi e gli attributi XML vengono ignorati. I tag <abc:author> e <def:author>, ad esempio, vengono considerati come se entrambi siano solo <author>. I namespace non possono essere ignorati nell'elemento rowTag, solo nei suoi figli leggibili. L'analisi XML non è sensibile ai namespace anche se false.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato timestamp Stringa di formato timestamp personalizzata che segue il formato del modello datetime pattern. Questo vale per il tipo timestamp. Si applica anche alle opzioni XML DataFrameWriter.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato timestamp Stringa di formato personalizzata per timestamp senza fuso orario che segue il formato del modello datetime. Questo vale per il tipo TimestampNTZType. Si applica anche alle opzioni XML DataFrameWriter.
dateFormat yyyy-MM-dd Stringa di formato data Stringa di formato data personalizzata che segue il modello datetime. Questo vale per il tipo di data. Si applica anche alle opzioni XML DataFrameWriter.
locale en-US Tag di lingua IETF BCP 47 Imposta un locale come tag di lingua nel formato IETF BCP 47. Ad esempio, locale viene usato durante l'analisi di date e timestamp.
nullValue stringa null Qualsiasi stringa Imposta la rappresentazione in forma di stringa del valore null. Quando si tratta di null, il parser non scrive attributi ed elementi per i campi. Si applica anche alle opzioni XML DataFrameWriter.
readerCaseSensitive true true, false Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole.
rescuedDataColumn None Stringa del nome di colonna Indica se raccogliere tutti i dati che non possono essere analizzati a causa di una mancata corrispondenza di tipo di dati o di schema (inclusa una differenza nel caso delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altri dettagli, vedere Che cos'è la colonna di dati salvata?. COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.
singleVariantColumn none Stringa del nome di colonna Specifica il nome della singola colonna variante. Se questa opzione viene specificata per la lettura, analizzare l'intero record XML in una singola colonna Variant con il valore della stringa di opzione specificato come nome della colonna. Se questa opzione viene specificata per la scrittura, scrivere il valore della singola colonna Variant nei file XML. Si applica anche alle opzioni XML DataFrameWriter.
useLegacyXMLParser true true, false Indica se usare il parser XML legacy. Il parser legacy ha una convalida meno rigorosa per il contenuto in formato non valido, ma è meno efficiente per la memoria. Impostare su per false acconsentire esplicitamente al parser predefinito più restrittivo.
wildcardColName xs_any Stringa del nome di colonna Nome della colonna utilizzato per acquisire elementi XML che corrispondono all'elemento dello schema con caratteri jolly (xs:any). Non può essere usato insieme a rescuedDataColumn.

Opzioni DataStreamReader

Usare queste opzioni con DataStreamReader.option() per configurare le letture di streaming dalle tabelle Delta Lake e da altre origini basate su file.

Per le opzioni di formato di file (JSON, CSV, Parquet e altri), vedere Opzioni DataFrameReader.

Per le opzioni del caricatore automatico (cloudFiles.*), vedere Caricamento automatico.

Example

L'esempio seguente imposta maxFilesPerTrigger su 10 per un flusso di tabelle Delta Lake:

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")

Comune

Le opzioni seguenti si applicano alle tabelle Delta Lake e ad altre origini di streaming basate su file.

Chiave Predefinito Valori validi Description
cleanSource off off, delete, archive Come gestire i file di origine dopo l'elaborazione da parte del flusso. off non esegue alcuna azione. delete elimina definitivamente il file di origine. archive sposta il file in sourceArchiveDir. Se impostato su archive, sourceArchiveDir deve essere impostato anche . Non si applica allo streaming di tabelle Delta Lake.
fileNameOnly false true, false Indica se identificare i file già elaborati in base al nome file solo anziché al percorso completo. Quando true, i file in percorsi diversi con lo stesso nome file vengono considerati come lo stesso file e non vengono rielaborati. Non si applica allo streaming di tabelle Delta Lake.
latestFirst false true, false Indica se elaborare prima i file modificati più di recente all'interno di ogni micro batch. Utile quando si desidera elaborare i dati più recenti il più rapidamente possibile. Quando true e maxFilesPerTrigger o maxBytesPerTrigger è impostato, maxFileAge viene ignorato. Non si applica allo streaming di tabelle Delta Lake.
maxBytesPerTrigger None Numeri interi positivi Valore massimo flessibile per la quantità di dati elaborati per ogni micro batch. Un batch può elaborare più del limite se l'unità di input più piccola lo supera. Se usato insieme a maxFilesPerTrigger, il micro batch elabora i dati fino a quando non viene raggiunto uno dei limiti.
Per il caricatore automatico, usare cloudFiles.maxBytesPerTrigger invece. Vedere Common( Comune).
maxCachedFiles 10000 Numeri interi positivi o 0 Numero massimo di file non elaborati da memorizzare nella cache per i micro batch successivi. Impostare su 0 per disattivare la memorizzazione nella cache. Aumentare questo valore quando la directory di origine contiene molti nuovi file per ogni trigger. Non si applica allo streaming di tabelle Delta Lake.
maxFileAge 7d Stringa di durata, 7d ad esempio o 4h Validità massima dei file considerati per l'elaborazione, rispetto al timestamp del file modificato più di recente anziché all'ora di sistema corrente. I file precedenti a questa soglia vengono ignorati. Ignorato quando latestFirst è true e maxFilesPerTrigger o maxBytesPerTrigger è impostato. Non si applica allo streaming di tabelle Delta Lake.
maxFilesPerTrigger 1000 per Delta Lake e Auto Loader. Nessun valore massimo per altre origini basate su file. Numeri interi positivi Limite superiore per il numero di nuovi file elaborati in ogni micro batch. Se usato insieme a maxBytesPerTrigger, il micro batch elabora i dati fino a quando non viene raggiunto uno dei limiti.
Per il caricatore automatico, usare cloudFiles.maxFilesPerTrigger invece. Vedere Common( Comune).
sourceArchiveDir None Stringa di percorso Percorso della directory di archiviazione quando cleanSource è impostato su archive. I file di origine vengono spostati in questo percorso dopo l'elaborazione, mantenendo la struttura di directory relativa. Non si applica allo streaming di tabelle Delta Lake.

Caricatore automatico

Usare queste opzioni con l'origine cloudFiles per configurare il caricatore automatico per l'inserimento in streaming dall'archiviazione cloud. Le opzioni specifiche dell'origine cloudFiles sono precedute cloudFiles dal prefisso per mantenerle in uno spazio dei nomi separato da altre opzioni di origine Structured Streaming .

Comune

Le opzioni seguenti si applicano a tutte le configurazioni del caricatore automatico.

Chiave Predefinito Valori validi Description
cloudFiles.allowOverwrites false true, false Indica se consentire modifiche al file della directory di input per sovrascrivere i dati esistenti.
Per le avvertenze di configurazione, vedere Il caricatore automatico elabora nuovamente il file quando il file viene aggiunto o sovrascritto?.
cloudFiles.backfillInterval None Stringa di durata, 1 day ad esempio o 1 week Il caricatore automatico può attivare i backfill asincroni a un determinato intervallo. Per altre informazioni, vedere Attivare i backfill regolari usando cloudFiles.backfillInterval.
Non usare quando cloudFiles.useManagedFileEvents è impostato su true.
cloudFiles.cleanSource OFF OFF, DELETE, MOVE Se eliminare o spostare automaticamente i file elaborati dalla directory di input. Se impostato su OFF (impostazione predefinita), non vengono eliminati file.
Se impostato su DELETE, Il caricatore automatico elimina automaticamente i file 30 giorni dopo l'elaborazione. A tale scopo, il caricatore automatico deve disporre delle autorizzazioni di scrittura per la directory di origine.
Se impostato su MOVE, il caricatore automatico sposta automaticamente i file nel percorso specificato in cloudFiles.cleanSource.moveDestination 30 giorni dopo l'elaborazione. A tale scopo, Auto Loader deve disporre delle autorizzazioni di scrittura per la directory sorgente e per la destinazione di spostamento.
Un file viene considerato elaborato quando ha un valore non Null per commit_time nel risultato della cloud_files_state funzione con valori di tabella. Vedere cloud_files_state funzione con valori di tabella. L'attesa aggiuntiva di 30 giorni dopo l'elaborazione può essere configurata tramite cloudFiles.cleanSource.retentionDuration.
Esaminare le considerazioni seguenti prima di abilitare cloudFiles.cleanSource:
  • Azure Databricks non consiglia di usare questa opzione se sono presenti più flussi che utilizzano dati dal percorso di origine perché il consumer più veloce eliminerà i file e non verrà inserito nelle origini più lente.
  • L'abilitazione di questa funzionalità richiede il caricamento automatico per mantenere uno stato aggiuntivo nel relativo checkpoint, che comporta un sovraccarico delle prestazioni, ma consente una migliore osservabilità tramite la cloud_files_state funzione con valori di tabella. Vedere cloud_files_state funzione con valori di tabella.
  • cleanSource usa l'impostazione corrente per decidere se scegliere MOVE o DELETE un determinato file. Si supponga, ad esempio, che l'impostazione fosse MOVE quando il file è stato elaborato originariamente, ma è stato modificato in DELETE quando il file è diventato un candidato per la pulizia 30 giorni dopo. In questo caso cleanSource eliminerà il file.
  • Non è garantito che i file vengano puliti non appena retentionDuration scade. Per ridurre i costi, il caricatore automatico elimina i file simultaneamente con l'elaborazione del flusso e termina non appena l'elaborazione del flusso viene completata o terminata. I file candidati per la pulizia, ma non possono essere puliti durante l'elaborazione del flusso verranno prelevati alla successiva esecuzione del caricatore automatico.

Disponibile in Databricks Runtime 16.4 e versioni successive.
cloudFiles.cleanSource.retentionDuration 30 days Stringa CalendarInterval , 14 daysad esempio , 2 weekso 1 month Tempo di attesa prima che i file elaborati diventino candidati per l'archiviazione con cleanSource. Deve essere maggiore di 7 giorni per DELETE. Nessuna restrizione minima per MOVE.
Disponibile in Databricks Runtime 16.4 e versioni successive.
cloudFiles.cleanSource.moveDestination None Percorso del volume di Archiviazione cloud o Catalogo Unity Percorso in cui archiviare i file elaborati quando cloudFiles.cleanSource è impostato su MOVE. Può trattarsi di un percorso di archiviazione cloud o di un percorso del volume del catalogo Unity (ad esempio, /Volumes/my_catalog/my_schema/my_volume/archive/).
Il percorso di spostamento deve:
  • Non essere un elemento figlio della directory di origine. Se si inserisce la destinazione di spostamento all'interno della directory di origine, i file archiviati vengono nuovamente inseriti.
  • Trovarsi nello stesso percorso esterno, volume o montaggio DBFS dell'origine. Gli spostamenti tra bucket e contenitori diversi non sono supportati e generano un errore.

Il caricatore automatico deve disporre delle autorizzazioni di scrittura per questa directory.
Disponibile in Databricks Runtime 16.4 e versioni successive.
cloudFiles.format Nessuno (opzione obbligatoria) avro, binaryFile, csv, jsonorc, parquet, , textxml Formato del file di dati nel percorso di origine. I valori validi includono:
cloudFiles.includeExistingFiles true true, false Indica se includere file esistenti nel percorso di input di elaborazione del flusso o elaborare solo i nuovi file in arrivo dopo l'installazione iniziale. Questa opzione viene valutata solo quando avvii uno streaming per la prima volta. La modifica di questa opzione dopo il riavvio del flusso non ha alcun effetto.
cloudFiles.inferColumnTypes false true, false Stabilisce se dedurre tipologie esatte di colonna quando si sfrutta l'inferenza dello schema. Per impostazione predefinita, le colonne vengono dedotte come stringhe durante l'inferenza di set di dati JSON e CSV. Per altri dettagli, vedere Inferenza dello schema .
cloudFiles.maxBytesPerTrigger None Stringa di byte, ad esempio 10g Numero massimo di nuovi byte da elaborare in ogni trigger. Questo è un massimo morbido. Se sono presenti file di 3 GB ciascuno, Azure Databricks elabora 12 GB in un micro batch. Un singolo file non viene mai suddiviso tra micro batch; viene sempre elaborato completamente all'interno di un singolo, anche quando le dimensioni superano questo limite. Se usato insieme a cloudFiles.maxFilesPerTrigger, Azure Databricks consuma fino al limite inferiore di cloudFiles.maxFilesPerTrigger o cloudFiles.maxBytesPerTrigger, a seconda di quale valore viene raggiunto per primo. Questa opzione non ha alcun effetto se usata con Trigger.Once() (Trigger.Once() è deprecata).
In Databricks Runtime 18.0 e versioni successive, questa opzione viene configurata dinamicamente e non deve essere impostata manualmente.
cloudFiles.maxFileAge None Stringa di durata Per quanto tempo viene rilevato un evento di file a scopo di deduplicazione. Databricks sconsiglia di ottimizzare questo parametro a meno che non si stiano inserendo dati nell'ordine di milioni di file all'ora. Per altri dettagli, vedere la sezione relativa al rilevamento degli eventi file .
L'ottimizzazione troppo aggressiva di cloudFiles.maxFileAge può causare problemi di qualità dei dati, ad esempio l'inserimento duplicato o i file mancanti. Di conseguenza, Databricks consiglia un'impostazione conservativa per cloudFiles.maxFileAge, ad esempio 90 giorni, che è simile a quella consigliata per le soluzioni di inserimento dati simili.
cloudFiles.maxFilesPerTrigger 1000 Numeri interi positivi Numero massimo di nuovi file da elaborare in ogni trigger. Se usato insieme a cloudFiles.maxBytesPerTrigger, Azure Databricks consuma fino al limite inferiore di cloudFiles.maxFilesPerTrigger o cloudFiles.maxBytesPerTrigger, a seconda di quale valore viene raggiunto per primo. Questa opzione non ha alcun effetto se usata con Trigger.Once() (deprecato).
In Databricks Runtime 18.0 e versioni successive, questa opzione viene configurata dinamicamente e non deve essere impostata manualmente.
cloudFiles.partitionColumns None Elenco delimitato da virgole di nomi di colonna Elenco delimitato da virgole di colonne di partizione in stile Hive che si desidera dedurre dalla struttura di directory dei file. Le colonne di partizione in stile Hive sono coppie chiave-valore combinate da un segno di uguaglianza, <base-path>/a=x/b=1/c=y/file.formatad esempio . In questo esempio le colonne di partizione sono a, b e c. Per impostazione predefinita, queste colonne vengono aggiunte automaticamente allo schema se si usa l'inferenza dello schema e si specifica l'oggetto <base-path> da cui caricare i dati. Se si specifica uno schema, il caricatore automatico prevede che queste colonne vengano incluse nello schema. Se non si desidera che queste colonne facciano parte dello schema, è possibile specificare "" per ignorare queste colonne. Inoltre, è possibile usare questa opzione quando si vuole dedurre il percorso del file in strutture di directory complesse, come nell'esempio seguente:
<base-path>/year=2022/week=1/file1.csv
<base-path>/year=2022/month=2/day=3/file2.csv
<base-path>/year=2022/month=2/day=4/file3.csv
Specificando cloudFiles.partitionColumns come year,month,day restituisce year=2022 per file1.csv, ma le colonne month e day sono null.
month e day vengono analizzati correttamente per file2.csv e file3.csv.
cloudFiles.schemaEvolutionMode addNewColumnsquando non viene specificato uno schema; in caso contrario, none addNewColumns, none, rescuefailOnNewColumns La modalità per l'evoluzione dello schema man mano che vengono individuate nuove colonne nei dati. Per impostazione predefinita, le colonne vengono dedotte come stringhe durante l'inferenza di set di dati JSON. Per altri dettagli, vedere Evoluzione dello schema .
cloudFiles.schemaHints None Stringa dello schema Informazioni sullo schema specificate per il caricamento automatico durante l'inferenza dello schema. Vedere hint di schema per maggiori dettagli.
cloudFiles.schemaLocation Nessuno (obbligatorio per dedurre lo schema) Stringa di percorso La posizione in cui archiviare lo schema dedotto e le modifiche successive. Per altri dettagli, vedere Inferenza dello schema .
cloudFiles.useStrictGlobber false true, false Indica se usare un globber rigoroso che si allinei con il comportamento globbing predefinito di altre risorse di file in Apache Spark. Per altri dettagli, vedere Modelli di caricamento dei dati comuni . Disponibile in Databricks Runtime 12.2 LTS e versioni successive.
cloudFiles.validateOptions true true, false Indica se convalidare le opzioni del caricatore automatico e restituire un errore per opzioni sconosciute o incoerenti.

Directory

L'opzione seguente si applica quando si usa la modalità elenco directory.

Chiave Predefinito Valori validi Description
cloudFiles.useIncrementalListing (obsoleto) auto in Databricks Runtime 17.2 e versioni successive in false Databricks Runtime 17.3 e versioni successive auto, true, false Questa funzionalità è stata deprecata. Databricks consiglia di usare la modalità di notifica file con eventi di file anziché cloudFiles.useIncrementalListing.
Indicare se utilizzare l'elenco incrementale anziché quello completo nella modalità elenco directory. Per impostazione predefinita, il caricatore automatico fa il massimo sforzo per rilevare automaticamente se una determinata directory è applicabile per l'elenco incrementale. È possibile usare in modo esplicito l'elenco incrementale o usare l'elenco completo della directory impostandolo rispettivamente come true o false.
L'abilitazione errata dell'elenco incrementale in una directory non lessicalmente ordinata impedisce al caricatore automatico di individuare nuovi file.
Funziona con Azure Data Lake Storage (abfss://), S3 (s3://) e GCS (gs://).
Disponibile in Databricks Runtime 9.1 LTS e versioni successive.

Notifica file

Per informazioni sulla configurazione della modalità di notifica file, incluse le autorizzazioni cloud necessarie, le istruzioni di installazione e i metodi di autenticazione, vedere Configurare i flussi del caricatore automatico in modalità di notifica file.

Chiave Predefinito Valori validi Description
cloudFiles.fetchParallelism 1 Numeri interi positivi Numero di thread da usare per il recupero di messaggi dal servizio di accodamento.
Non usare quando cloudFiles.useManagedFileEvents è impostato su true.
cloudFiles.pathRewrites None Stringa mappa JSON Obbligatorio solo se si specifica un queueUrl oggetto che riceve notifiche di file da più bucket S3 e si vuole usare i punti di montaggio configurati per l'accesso ai dati in questi contenitori. Usa questa opzione per riscrivere il prefisso del percorso bucket/key con il punto di montaggio. È possibile riscrivere solo i prefissi. Ad esempio, per la configurazione {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, il percorso s3://<databricks-mounted-bucket>/path/2017/08/fileA.json viene riscritto in dbfs:/mnt/data-warehouse/2017/08/fileA.json.
Non usare quando cloudFiles.useManagedFileEvents è impostato su true.
cloudFiles.resourceTag None Stringhe di tag chiave-valore Una serie di coppie di tag chiave-valore che consentono di associare e identificare le risorse correlate, ad esempio:
cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")
Non usare quando cloudFiles.useManagedFileEvents è impostato su true. Impostare invece i tag delle risorse usando la console del provider di servizi cloud.
Per altre informazioni, vedere Tag delle risorse del provider di servizi cloud.
cloudFiles.useManagedFileEvents false true, false Se impostato su true, Il caricatore automatico usa il servizio eventi file per individuare i file nel percorso esterno. È possibile usare questa opzione solo se il percorso di caricamento si trova in un percorso esterno con eventi di file abilitati. Vedere Usare la modalità di notifica file con gli eventi di file.
Gli eventi di file offrono prestazioni a livello di notifiche nell'individuazione file, perché il caricatore automatico può individuare nuovi file dopo l'ultima esecuzione. A differenza dell'elenco di directory, questo processo non deve elencare tutti i file nella directory.
Ci sono alcune situazioni in cui il caricatore automatico usa l'elenco di directory anche se l'opzione degli eventi di file è abilitata:
  • Durante il caricamento iniziale, quando includeExistingFiles è impostato su true, viene eseguito un elenco di directory completo per individuare tutti i file presenti nella directory prima dell'avvio del caricatore automatico.
  • Il servizio eventi file ottimizza l'individuazione dei file memorizzando nella cache i file creati più di recente. Se Auto Loader viene eseguito raramente, questa cache può scadere e Auto Loader ritorna all'elenco di directory per individuare i file e aggiornare la cache. Per evitare questo scenario, richiamare il caricatore automatico almeno una volta ogni sette giorni.

Vedere Quando il caricatore automatico con gli eventi di file usa l'elenco di directory? per un elenco completo delle situazioni in cui il caricatore automatico usa l'elenco di directory con questa opzione.
Disponibile in Databricks Runtime 14.3 LTS e versioni successive.
cloudFiles.listOnStart false true, false Se impostato su true, Il caricatore automatico esegue un elenco di directory completo all'avvio del flusso, anziché iniziare con il token di continuazione nel checkpoint. Usare questa opzione per eseguire il ripristino da errori, ad esempio CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN. Vedere How do I recover from a CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN error?.
cloudFiles.useNotifications false true, false Indica se usare la modalità di notifica file per determinare quando sono presenti nuovi file. Se false, usare la modalità elenco di directory. Consulta Confronta le modalità di rilevamento dei file del caricatore automatico.
Non usare quando cloudFiles.useManagedFileEvents è impostato su true.
Tag delle risorse del provider di servizi cloud

Il caricatore automatico aggiunge per impostazione predefinita le coppie di tag chiave-valore seguenti per impostazione predefinita:

  • vendor: Databricks
  • path: percorso da cui vengono caricati i dati. Non disponibile in GCP a causa di limitazioni di etichettatura.
  • checkpointLocation: La posizione del checkpoint dello stream. Non disponibile in GCP a causa di limitazioni di etichettatura.
  • streamId: un identificatore univoco globale per il flusso.

Databricks riserva questi nomi di chiave e non è possibile sovrascriverne i valori.

Per altre informazioni su Azure, vedere Denominazione di code e metadati e la trattazione di properties.labels nelle sottoscrizioni di eventi. Il caricatore automatico archivia queste coppie di tag chiave-valore in JSON come etichette.

Specifico del cloud

Il caricatore automatico include opzioni per la configurazione dell'infrastruttura cloud per la modalità di notifica file. Per le autorizzazioni cloud necessarie e le istruzioni di configurazione, vedere Configurare flussi del caricatore automatico in modalità di notifica file.

Azure

È necessario specificare i valori per tutte le opzioni seguenti se si specifica cloudFiles.useNotifications = true e si vuole che il caricatore automatico configuri automaticamente i servizi di notifica:

Chiave Predefinito Valori validi Description
cloudFiles.resourceGroup None Qualsiasi stringa Gruppo di risorse Azure in cui viene creato l'account di archiviazione.
cloudFiles.subscriptionId None Qualsiasi stringa ID sottoscrizione Azure in cui viene creato il gruppo di risorse.
databricks.serviceCredential None Qualsiasi stringa Il nome della credenziale di servizio Databricks . Disponibile in Databricks Runtime 16.1 e versioni successive.

Se una credenziale del servizio Databricks non è disponibile, è possibile specificare le opzioni di autenticazione seguenti:

Chiave Predefinito Valori validi Description
cloudFiles.clientId None Qualsiasi stringa ID cliente o ID applicazione del principale del servizio.
cloudFiles.clientSecret None Qualsiasi stringa Segreto del client principale di servizio.
cloudFiles.connectionString None Una stringa di connessione La stringa di connessione per l'account di archiviazione, in base alla chiave di accesso dell'account o alla firma di accesso condiviso (SAS).
cloudFiles.tenantId None Qualsiasi stringa ID tenant Azure in cui viene creata l'entità servizio.

Specificare l'opzione seguente solo se si imposta cloudFiles.useNotifications = true e si vuole che il caricatore automatico usi una coda esistente:

Chiave Predefinito Valori validi Description
cloudFiles.queueName None Qualsiasi stringa Nome della coda di Azure. Se specificato, l'origine dei file cloud utilizza direttamente gli eventi di questa coda invece di configurare i propri servizi di archiviazione Griglia di eventi di Azure e di archiviazione code. In tal caso, il databricks.serviceCredential o cloudFiles.connectionString richiede solo autorizzazioni di lettura per la coda.

Delta Lake

Quando si esegue la lettura da una tabella Delta Lake tramite , si applicano le opzioni seguenti.spark.readStream

Chiave Predefinito Valori validi Description
allowSourceColumnDrop None Numero di versione o always Impostare su un numero di versione della tabella Delta o always per consentire al flusso di continuare dopo l'eliminazione delle colonne dallo schema della tabella di origine. Se impostato su un numero di versione, riconosce tutte le modifiche dello schema fino a tale versione. Richiede schemaTrackingLocation. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake.
allowSourceColumnRename None Numero di versione o always Impostare su un numero di versione della tabella Delta o always per consentire al flusso di continuare dopo la ridenominazione delle colonne nella tabella di origine. Se impostato su un numero di versione, riconosce tutte le modifiche dello schema fino a tale versione. Richiede schemaTrackingLocation. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake.
allowSourceColumnTypeChange None Numero di versione o always Impostare su un numero di versione della tabella Delta o always per consentire al flusso di continuare dopo la modifica dei tipi di colonna nella tabella di origine. Se impostato su un numero di versione, riconosce tutte le modifiche dello schema fino a tale versione. Richiede schemaTrackingLocation. Vedere Estensione del tipo.
excludeRegex None Stringa regex Java Modello di espressione regolare. File i cui percorsi corrispondono al modello vengono esclusi dalla lettura in streaming. Utile per filtrare i file che non sono conformi alla convenzione di denominazione prevista.
failOnDataLoss true true, false Indica se la query di streaming non riesce se i dati di origine sono stati eliminati a causa della conservazione dei log (logRetentionDuration). Impostare su false per ignorare i dati mancanti e continuare l'elaborazione. Vedere Configurare la conservazione dei dati per le query di spostamento cronologico.
ignoreChanges (obsoleto) false true, false Disponibile in Databricks Runtime 11.3 LTS e versioni precedenti. Ricrea i file di dati riscritti dopo le operazioni di modifica, ad UPDATEesempio , MERGE INTODELETE, o OVERWRITE. Le righe non modificate possono essere generate insieme a nuove righe, pertanto i consumer downstream devono gestire duplicati. Le operazioni di eliminazione non vengono propagate a valle. Sostituito da skipChangeCommits in Databricks Runtime 12.2 LTS e versioni successive.
ignoreDeletes (obsoleto) false true, false Ignora le transazioni che eliminano i dati ai limiti della partizione (solo la partizione completa viene eliminata). Non gestisce eliminazioni, aggiornamenti o altre modifiche non di partizione. Utilizzare invece skipChangeCommits.
readChangeFeed oppure readChangeData false true, false Indica se abilitare la lettura del feed di dati delle modifiche per la query di streaming. Se abilitato, il flusso genera modifiche a livello di riga (inserimenti, aggiornamenti ed eliminazioni) con colonne di metadati aggiuntive. Vedere Usare il feed di dati delle modifiche in Azure Databricks.
schemaTrackingLocation None Stringa di percorso Percorso di una directory in cui Delta Lake tiene traccia delle modifiche dello schema per la lettura in streaming. Obbligatorio quando si esegue lo streaming da tabelle con mapping di colonne abilitato e si usano allowSourceColumn* le opzioni per gestire l'evoluzione dello schema. Deve trovarsi all'interno checkpointLocation della query di streaming. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake.
skipChangeCommits false true, false Ignora le transazioni che eliminano o modificano i record e i processi esistenti vengono aggiunti solo. Databricks consiglia questa opzione per la maggior parte dei carichi di lavoro che non usano feed di dati delle modifiche. Disponibile in Databricks Runtime 12.2 LTS e versioni successive. Vedere Ignorare i commit delle modifiche upstream con skipChangeCommits.
startingTimestamp Ultima versione disponibile Stringa timestamp, 2019-01-01T00:00:00.000Z ad esempio o stringa di data, ad esempio 2019-01-01 Timestamp da cui iniziare la lettura. Il flusso legge tutte le modifiche apportate alla tabella di cui è stato eseguito il commit o dopo il timestamp specificato. Se il timestamp precede tutti i commit della tabella disponibili, il flusso inizia dal commit meno recente disponibile. Non può essere usato insieme a startingVersion. Ignorato se il checkpoint di streaming esiste già.
startingVersion Ultima versione disponibile Intero positivo, 0o latest Versione della tabella Delta da cui iniziare la lettura. Il flusso legge tutte le modifiche di cui è stato eseguito il commit in o dopo la versione specificata. Specificare latest per iniziare solo dalle modifiche più recenti. Non può essere usato insieme a startingTimestamp. Ignorato se il checkpoint di streaming esiste già. Consulta Lavora con la cronologia delle tabelle.
withEventTimeOrder false true, false Divide lo snapshot della tabella iniziale in bucket di tempo dell'evento per impedire che i record vengano contrassegnati erroneamente come eventi tardivi e eliminati in query con stato con filigrane. Non può essere modificato dopo l'avvio dell'elaborazione iniziale degli snapshot senza eliminare il checkpoint. Disponibile in Databricks Runtime 11.3 LTS e versioni successive. Vedere Elaborare lo snapshot iniziale senza eliminare i dati.

Kafka

Usare queste opzioni con spark.readStream.format("kafka") o spark.read.format("kafka"):

Chiave Predefinito Valori validi Description
assign None Stringa JSON, ad esempio {"topicA":[0,1],"topicB":[2,4]} Partizioni specifiche da utilizzare. È necessario specificare esattamente una delle subscribeopzioni , subscribePatterno assign .
failOnDataLoss true true, false Indica se la query non riesce se i dati potrebbero essere andati persi, ad esempio a causa di argomenti eliminati o troncamento offset. Impostare su false per ignorare i dati mancanti e continuare.
Databricks stima in modo conservativo se i dati potrebbero essere andati persi. Tuttavia, questo potrebbe causare falsi allarmi.
fetchoffset.numretries 3 Numeri interi positivi o 0 Numero di tentativi durante il recupero degli offset Kafka non riesce.
fetchoffset.retryintervalms 1000 Numeri interi positivi o 0 Intervallo in millisecondi tra tentativi di recupero offset.
groupIdPrefix spark-kafka-source (streaming), spark-kafka-relation (batch) Qualsiasi stringa Prefisso personalizzato da usare per l'ID gruppo di consumer Kafka generato automaticamente. Se kafka.group.id è impostato in modo esplicito, il connettore ignora questa opzione.
kafka.group.id None Qualsiasi stringa ID del gruppo di consumer Kafka da usare durante la lettura. Prestare attenzione: le query che condividono lo stesso ID gruppo interferiscono tra loro e potrebbero leggere solo dati parziali. Ciò può verificarsi quando si eseguono carichi di lavoro batch e di streaming simultanei o quando si riavviano rapidamente le query. Se impostato, groupIdPrefix viene ignorato. Per ridurre al minimo i problemi, impostare la configurazione session.timeout.ms del consumer Kafka su un valore ridotto.
includeHeaders false true, false Indica se includere intestazioni di messaggio Kafka come colonna nell'output.
kafkaconsumer.polltimeoutms None Numeri interi positivi Timeout in millisecondi per la chiamata al consumer poll() Kafka.
kafka.bootstrap.servers None Elenco delimitato da virgole di host:port stringhe Elenco delimitato da virgole di indirizzi host:port per i broker Kafka. Imposta la proprietà del bootstrap.servers client Kafka.
Se non sono presenti dati di Kafka, controllare l'elenco di indirizzi del broker per gli indirizzi non corretti. Se l'elenco di indirizzi del broker non è corretto, potrebbero non esserci errori. I client Kafka presuppongono che i broker saranno disponibili alla fine e riprovare per sempre quando ricevono errori di rete.
maxRecordsPerPartition None Numeri interi positivi Numero massimo di record per ogni partizione Spark. Se impostato, il connettore divide le partizioni Kafka in modo che ogni partizione Spark legga al massimo questo numero di record.
È anche possibile usare questa opzione con minPartitions. Quando vengono impostate entrambe le opzioni, Spark usa qualsiasi opzione restituisca più partizioni.
minPartitions None Numeri interi positivi Numero minimo di partizioni Spark da leggere da Kafka. Quando impostato, il connettore divide le partizioni Kafka di grandi dimensioni per aumentare il parallelismo. Quando non è impostato, Spark crea una partizione per ogni partizione di argomento Kafka. Utile per gestire l'asimmetria o il picco dei carichi di dati.
Questa opzione reinizializza i consumer Kafka per ogni trigger, che potrebbero influire sulle prestazioni con SSL.
startingOffsets latest (streaming), earliest (batch) earliest, latesto una stringa di offset JSON Offset da cui la query inizia la lettura. Nella stringa -1 JSON è l'offset più recente. -2 è l'offset meno recente. Ad esempio: {"topicA":{"0":23,"1":-2}}.
Per le query di streaming, questa opzione si applica solo all'avvio di una nuova query. Le query riprese usano sempre il checkpoint. Durante una query, le nuove partizioni iniziano a leggere al primo offset.
Per le query batch, latest non è consentito.
startingOffsetsByTimestamp None Stringa di timestamp JSON, ad esempio {"topicA":{"0":1000,"1":2000}} Elenco di offset iniziali per ogni partizione, specificato come timestamp in millisecondi. Quando non esiste alcun offset per un timestamp, il comportamento della query viene determinato da startingOffsetsByTimestampStrategy.
Per le query di streaming, questa opzione si applica solo all'avvio di una nuova query. Le query riprese usano sempre il checkpoint. Durante una query, le nuove partizioni iniziano a leggere al primo offset.
startingOffsetsByTimestampStrategy error error, latest Strategia da usare quando non viene trovato alcun offset per un timestamp specificato in startingOffsetsByTimestamp o startingTimestamp. error genera un'eccezione. latest utilizza l'offset disponibile più recente.
startingTimestamp None Numeri interi positivi o 0 Timestamp iniziale globale in millisecondi che si applica a tutte le partizioni. Quando non esiste alcun offset per il timestamp, il comportamento viene controllato da startingOffsetsByTimestampStrategy.
subscribe None Elenco delimitato da virgole di nomi di argomenti Argomenti da sottoscrivere. È necessario specificare esattamente una delle subscribeopzioni , subscribePatterno assign .
subscribePattern None Stringa regex Java Modello utilizzato per sottoscrivere argomenti. È necessario specificare esattamente una delle subscribeopzioni , subscribePatterno assign . Ad esempio: topic.*.

Le opzioni seguenti si applicano solo alle letture in streaming con spark.readStream.format("kafka"):

Chiave Predefinito Valori validi Description
bytesEstimateWindowLength 300s Stringhe di durata, ad 10m esempio o 600s Intervallo di tempo usato per stimare i byte rimanenti per la estimatedTotalBytesBehindLatest metrica. Consultare Recupero delle metriche Kafka.
maxOffsetsPerTrigger None Numeri interi positivi Numero massimo di offset da elaborare per intervallo di trigger. Gli offset vengono distribuiti proporzionalmente tra le partizioni degli argomenti.
maxTriggerDelay 15m Stringhe di durata, ad 10m esempio o 600s Tempo massimo di attesa per minOffsetsPerTrigger l'accumulo prima dell'attivazione.
minOffsetsPerTrigger None Numeri interi positivi Numero minimo di offset da accumulare prima di attivare un micro batch. Quando maxTriggerDelay viene raggiunto, il micro batch viene eseguito indipendentemente.

Per le opzioni di offset applicabili solo alle letture batch con spark.read.format("kafka"), vedere Opzioni Kafka di DataFrameReader.

Autenticazione

Databricks consiglia di usare una credenziale del servizio Catalogo Unity per l'autenticazione ai servizi Kafka gestiti dal cloud (AWS MSK, Hub eventi di Azure o Google Cloud Managed Kafka).

Chiave Predefinito Valori validi Description
databricks.serviceCredential None Qualsiasi stringa Nome di una credenziale del servizio Catalogo Unity per l'autenticazione nei servizi Kafka gestiti dal cloud. Disponibile in Databricks Runtime 16.1 e versioni successive.
databricks.serviceCredential.scope None Qualsiasi stringa Ambito OAuth per le credenziali del servizio. Impostare questa impostazione solo quando Azure Databricks non può dedurre automaticamente l'ambito per il servizio Kafka.

Quando una credenziale del servizio non è disponibile, usare le opzioni SASL/SSL (passate come kafka.* proprietà). Quando si usano credenziali del servizio, non è necessario specificare kafka.sasl.mechanism, kafka.sasl.jaas.configo kafka.security.protocol.

Chiave Predefinito Valori validi Description
kafka.security.protocol None Stringa del protocollo di sicurezza, ad esempio SASL_SSL, , SSLPLAINTEXT Protocollo di sicurezza per la comunicazione broker.
kafka.sasl.mechanism None Stringa di meccanismo SASL, ad esempio PLAIN, SCRAM-SHA-256SCRAM-SHA-512, , OAUTHBEARER,AWS_MSK_IAM Meccanismo SASL.
kafka.sasl.jaas.config None Stringa di configurazione JAAS Stringa di configurazione dell'account di accesso JAAS.
kafka.sasl.login.callback.handler.class None Nome completo della classe Nome completo della classe di un gestore di callback di accesso per l'autenticazione SASL.
kafka.sasl.client.callback.handler.class None Nome completo della classe Nome completo della classe di un gestore di callback client per l'autenticazione SASL.
kafka.ssl.truststore.location None Stringa di percorso del file Percorso del file dell'archivio attendibilità SSL.
kafka.ssl.truststore.password None Qualsiasi stringa Password per il file dell'archivio attendibilità SSL.
kafka.ssl.keystore.location None Stringa di percorso del file Percorso del file dell'archivio chiavi SSL.
kafka.ssl.keystore.password None Qualsiasi stringa Password per il file dell'archivio chiavi SSL.

Per istruzioni complete sull'installazione dell'autenticazione, vedere Autenticazione.

Pub/Sub

Usa queste opzioni con spark.readStream.format("pubsub") per sottoscrivere Google Pub/Sub. Sono necessarie le opzioni subscriptionId, topicIde projectId .

Chiave Predefinito Valori validi Description
subscriptionId None Qualsiasi stringa Obbligatorio. ID sottoscrizione Pub/Sub. Se non esiste, il connettore crea la sottoscrizione.
topicId None Qualsiasi stringa Obbligatorio. ID argomento Pub/Sub.
projectId None Qualsiasi stringa Obbligatorio. ID progetto Google Cloud.
numFetchPartitions Metà del numero di executor disponibili durante l'inizializzazione del flusso Numeri interi positivi Numero di attività Spark parallele che recuperano righe dalla sottoscrizione.
maxBytesPerTrigger None Numeri interi positivi Limite flessibile per il numero di byte da elaborare per micro batch.
maxRecordsPerFetch 1000 Numeri interi positivi Numero di righe da recuperare per ogni attività prima dell'elaborazione.
maxFetchPeriod 10s Stringa di durata, 1s ad esempio o 1m Intervallo di tempo per ogni attività dedicato al recupero prima di elaborare le righe. Azure Databricks consiglia di usare il valore predefinito.
deleteSubscriptionOnStreamStop false true, false Quando true, la sottoscrizione, da subscriptionId, viene eliminata al termine della query di streaming.
serviceCredential None Qualsiasi stringa Nome di una credenziale del servizio Azure Databricks per l'autenticazione in Pub/Sub. Disponibile in Databricks Runtime 16.1 e versioni successive.
clientEmail None Stringa di indirizzo di posta elettronica Indirizzo di posta elettronica dell'account del servizio Google. Obbligatorio quando non si usano credenziali del servizio.
clientId None Qualsiasi stringa ID client dell'account del servizio Google. Obbligatorio quando non si usano credenziali del servizio.
privateKey None Stringa di chiave privata Chiave privata per l'account del servizio Google. Obbligatorio quando non si usano credenziali del servizio.
privateKeyId None Qualsiasi stringa ID chiave privata per l'account del servizio Google. Obbligatorio quando non si usano credenziali del servizio.

Per altre informazioni su Pub/Sub, vedi Iscriviti a Google Pub/Sub.

Pulsar

Usare queste opzioni con spark.readStream.format("pulsar") per eseguire lo streaming da Apache Pulsar. Disponibile in Databricks Runtime 14.1 e versioni successive.

Sono necessarie le opzioni seguenti. È necessario specificare esattamente uno di topic, topicso topicsPattern.

Chiave Predefinito Valori validi Description
service.url None Stringa dell'URL del servizio Pulsar Pulsar per il servizio Pulsar serviceURL , ad esempio pulsar://broker.example.com:6650.
topic None Qualsiasi stringa Nome di un singolo argomento da utilizzare.
topics None Elenco delimitato da virgole di nomi di argomenti Elenco delimitato da virgole di nomi di argomenti da utilizzare.
topicsPattern None Stringa regex Java Stringa Java regex in modo che corrisponda ai nomi degli argomenti.

Sono supportate anche le opzioni seguenti:

Chiave Predefinito Valori validi Description
admin.url None Stringa URL URL HTTP del servizio di amministrazione pulsar. Obbligatorio quando maxBytesPerTrigger è impostato.
allowDifferentTopicSchemas false true, false Se vengono letti più argomenti con schemi diversi, usare questa opzione per disattivare la deserializzazione automatica del valore dell'argomento basato su schema. Quando questo è true, vengono restituiti solo i valori grezzi.
failOnDataLoss true true, false Indica se la query non riesce quando i dati vengono persi. Ad esempio, la perdita di dati può verificarsi quando gli argomenti vengono eliminati o i messaggi scadono a causa dei criteri di conservazione.
maxBytesPerTrigger None Numeri interi positivi Limite flessibile per il numero di byte da elaborare per micro batch. Richiede admin.url.
pollTimeoutMs 120000 Numeri interi positivi Tempo di attesa per la lettura dei messaggi provenienti da Pulsar, in millisecondi.
predefinedSubscription None Qualsiasi stringa Nome di sottoscrizione predefinito usato dal connettore per tenere traccia dello stato dell'applicazione Spark.
startingOffsets latest latest, earliesto una stringa di offset JSON Da dove iniziare la lettura.
subscriptionPrefix None Qualsiasi stringa Prefisso usato dal connettore per generare una sottoscrizione casuale per tenere traccia dello stato dell'applicazione Spark.
waitingForNonExistedTopic false true, false Indica se il connettore attende fino a quando non vengono creati gli argomenti desiderati.

È possibile specificare configurazioni aggiuntive di client, amministratore e lettore Pulsar usando i modelli di opzione seguenti:

Modello Opzioni di configurazione
pulsar.admin.* Configurazione dell'amministratore pulsar
pulsar.client.* Configurazione del client Pulsar, incluse le opzioni di autenticazione, ad pulsar.client.authPluginClassName esempio e pulsar.client.authParams.
pulsar.reader.* Configurazione del lettore Pulsar

Per altre informazioni sulle opzioni di autenticazione del client Pulsar e dell'amministratore, vedere Autenticazione.

Autenticazione

Azure Databricks supporta l'autenticazione tramite truststore e keystore per Pulsar. Azure Databricks consiglia di usare i segreti per archiviare i dettagli di autenticazione. Vedere Gestione dei segreti.

Chiave Predefinito Valori validi Description
pulsar.client.authPluginClassName None Nome completo della classe Nome completo della classe del plug-in di autenticazione. Ad esempio: org.apache.pulsar.client.impl.auth.AuthenticationTls.
pulsar.client.authParams None Stringa di credenziali Credenziali di autenticazione passate al plug-in di autenticazione come stringa. Ad esempio: tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem.
pulsar.client.useKeyStoreTls false true, false Quando true, abilita la configurazione TLS basata su KeyStore anziché i file in formato PEM.
pulsar.client.tlsTrustStoreType None Qualsiasi stringa Formato del file dell'archivio attendibilità TLS. Ad esempio: JKS.
pulsar.client.tlsTrustStorePath None Stringa di percorso del file Percorso del file dell'archivio attendibiliTÀ TLS contenente certificati CA attendibili. Obbligatorio quando pulsar.client.useKeyStoreTls è true.
pulsar.client.tlsTrustStorePassword None Qualsiasi stringa Password per il file dell'archivio attendibilità TLS.

Se il flusso usa un PulsarAdmin, è anche possibile impostare le opzioni seguenti:

Chiave Predefinito Valori validi Description
pulsar.admin.authPluginClassName None Nome completo della classe Nome completo della classe del plug-in di autenticazione per il client amministratore pulsar.
pulsar.admin.authParams None Stringa di credenziali Credenziali di autenticazione per il plug-in di autenticazione client amministratore Pulsar.
pulsar.admin.useTls None true, false Indica se usare TLS per la connessione client di amministrazione Pulsar.
pulsar.admin.tlsAllowInsecureConnection None true, false Indica se consentire connessioni TLS non sicure per il client di amministrazione pulsar.
pulsar.admin.tlsTrustCertsFilePath None Stringa di percorso del file Percorso del file di certificato TLS attendibile per il client amministratore Pulsar.
pulsar.admin.useKeyStoreTls None true, false Indica se usare TLS basato su KeyStore per il client di amministrazione pulsar.
pulsar.admin.tlsTrustStoreType None Qualsiasi stringa Formato dell'archivio attendibilità TLS per il client di amministrazione pulsar. Ad esempio: JKS.
pulsar.admin.tlsTrustStorePath None Stringa di percorso del file Percorso del file dell'archivio attendibilità TLS per il client di amministrazione pulsar. Obbligatorio quando pulsar.admin.useKeyStoreTls è true.
pulsar.admin.tlsTrustStorePassword None Qualsiasi stringa Password per l'archivio attendibilità TLS del client amministratore Pulsar.

Per esempi di autenticazione, vedere Eseguire l'autenticazione a Pulsar.

Opzioni DataFrameWriter

Usare queste opzioni con DataFrameWriter.option() e DataFrameWriterV2.option() per controllare come Azure Databricks scrive i dati.

Example

Nell'esempio seguente viene impostato su mergeSchemaTrue per la scrittura di una tabella Delta Lake:

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")

Avro

Quando si scrivono file Avro, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
avroSchema None Stringa dello schema JSON Schema Avro completo come stringa JSON. Usare questa opzione per convertire i tipi SPARK SQL in tipi Avro specifici. Si applica ai file Avro in lettura e scrittura.
avroSchemaUrl None Stringa URL URL che punta a un file di schema Avro. Usare anziché avroSchema quando lo schema viene archiviato esternamente. Si escludono con avroSchema a vicenda. Si applica ai file Avro in lettura e scrittura.
compression snappy uncompressed, deflate, snappy (default), bzip2, xzzstandard Codec di compressione da usare durante la scrittura. Si applica ai file Avro in lettura e scrittura.
recordName topLevelRecord Qualsiasi stringa Nome del record di primo livello nello schema Avro di output. Si applica ai file Avro in lettura e scrittura.
positionalFieldMatching false true, false Indica se trovare una corrispondenza tra lo schema Spark e lo schema Avro in base alla posizione del campo anziché in base al nome. Si applica ai file Avro in lettura e scrittura.
recordNamespace Stringa vuota Qualsiasi stringa Spazio dei nomi per il record di primo livello nello schema Avro di output. Si applica ai file Avro in lettura e scrittura.

Delta Lake e Apache Iceberg

Quando si scrivono tabelle Delta Lake e Apache Iceberg, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
clusterByAuto false true, false Indica se abilitare il clustering liquido automatico, in cui Azure Databricks seleziona le colonne di clustering in base ai modelli di query. Valido solo con mode("overwrite"). Non può essere usato con append la modalità . Disponibile in Databricks Runtime 16.4 e versioni successive. Si applica a Usa clustering liquido per le tabelle.
mergeSchema None true, false Indica se abilitare l'evoluzione dello schema per l'operazione di scrittura. Le nuove colonne nel dataframe di origine vengono aggiunte allo schema della tabella di destinazione. Si applica alle accodamenti batch e streaming. Si applica allo schema della tabella di aggiornamento.
overwriteSchema None true, false Indica se sostituire lo schema della tabella e il partizionamento durante la sovrascrittura. Richiede mode("overwrite") senza replaceWhere. Non è possibile usare con partitionOverwriteMode. Si applica allo schema della tabella di aggiornamento.
partitionOverwriteMode None static, dynamic Modalità di sovrascrittura della partizione. Impostare su su dynamic per sovrascrivere solo le partizioni contenenti nuovi dati, lasciando invariate tutte le altre partizioni. Modalità legacy, non supportata nel calcolo serverless o in Databricks SQL. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
replaceOn None Stringa di espressione booleana Espressione booleana che corrisponde alle righe nella tabella di destinazione da sostituire con le righe della query di origine. Può fare riferimento a colonne sia dalla tabella di destinazione che dalla query di origine. Le righe nella destinazione che corrispondono a una riga di origine vengono eliminate e sostituite. Se l'origine è vuota, non vengono eseguite eliminazioni. Usare targetAlias per evitare ambiguità nei riferimenti alle colonne. Disponibile in Databricks Runtime 17.1 e versioni successive. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
replaceUsing None Elenco delimitato da virgole di nomi di colonna Elenco delimitato da virgole di nomi di colonna usati per trovare le corrispondenze tra la tabella di destinazione e la query di origine. Sia la destinazione che l'origine devono contenere tutte le colonne elencate. Le righe nella destinazione che corrispondono a una riga di origine nel confronto di uguaglianza vengono eliminate e sostituite. NULL i valori vengono considerati non uguali e non corrispondono. Disponibile in Databricks Runtime 16.3 e versioni successive. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
replaceWhere None Stringa di espressione predicato Espressione di predicato. Sovrascrive in modo atomico solo i record che corrispondono al predicato. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
targetAlias None Qualsiasi stringa Alias stringa per la tabella di destinazione. Usare con replaceOn o replaceWhere per disambiguare i riferimenti a colonne quando la condizione fa riferimento a colonne sia dalla tabella di destinazione che dalla query di origine. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
txnAppId None Qualsiasi stringa Stringa univoca che identifica l'applicazione per le scritture idempotenti nelle foreachBatch operazioni. Usare insieme txnVersion a per assicurarsi che le operazioni di scrittura esattamente una sola volta in più tabelle Delta Lake. Si applica a Use for idempotent table writes(Usa foreachBatch per le scritture di tabelle idempotenti).
txnVersion None Intero che aumenta in modo monotonico Numero che aumenta in modo monotonico usato come versione della transazione per le scritture idempotenti nelle foreachBatch operazioni. Usare insieme txnAppId a per assicurarsi che le operazioni di scrittura esattamente una sola volta in più tabelle Delta Lake. Si applica a Use for idempotent table writes(Usa foreachBatch per le scritture di tabelle idempotenti).
optimizeWrite None true, false Indica se abilitare l'opzione Ottimizzazione automatica scrittura per questa operazione di scrittura. Esegue l'override della spark.databricks.delta.optimizeWrite.enabled configurazione. Si applica a Che è Delta Lake in Azure Databricks?.
userMetadata None Qualsiasi stringa Stringa definita dall'utente aggiunta ai metadati di commit per l'operazione di scrittura. Visibile nell'output di DESCRIBE HISTORY. Si applica alle tabelle Enrich con metadati personalizzati.

CSV

Quando si scrivono file CSV, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
charToEscapeQuoteEscaping \0 (non abilitato) Un singolo carattere Carattere utilizzato per eseguire l'escape del carattere di escape quando differisce dal carattere di virgolette. Si applica a csv (DataFrameWriter).
compression none none (default), bzip2, gzip, lz4snappy, , deflatezstd Codec di compressione da usare durante la scrittura. Si applica a csv (DataFrameWriter).
dateFormat yyyy-MM-dd Stringa di formato data Formattare la stringa per i valori delle colonne di data. Si applica a csv (DataFrameWriter).
emptyValue Stringa vuota Qualsiasi stringa Stringa scritta per valori vuoti (non Null). Si applica a csv (DataFrameWriter).
encoding UTF-8 Nome java.nio.charset.Charset Codifica dei caratteri per i file di output. Si applica a csv (DataFrameWriter).
escape \ Un singolo carattere Carattere utilizzato per eseguire l'escape dei valori tra virgolette. Si applica a csv (DataFrameWriter).
escapeQuotes true true, false Indica se inserire caratteri di escape tra virgolette all'interno di valori di campo tra virgolette. Si applica a csv (DataFrameWriter).
header false true, false Indica se scrivere nomi di colonna come prima riga dell'output. Si applica a csv (DataFrameWriter).
ignoreLeadingWhiteSpace false true, false Indica se tagliare gli spazi vuoti iniziali dai valori durante la scrittura. Si applica a csv (DataFrameWriter).
ignoreTrailingWhiteSpace false true, false Indica se tagliare gli spazi vuoti finali dai valori durante la scrittura. Si applica a csv (DataFrameWriter).
lineSep \n Una stringa Stringa separatore di riga utilizzata tra i record. Si applica a csv (DataFrameWriter).
locale en-US Identificatore java.util.Locale Un identificatore java.util.Locale. Una Java impostazioni locali identificate che influiscono sull'analisi predefinita di data, timestamp e decimale all'interno del file CSV.
nullValue Stringa vuota Qualsiasi stringa Stringa scritta per i valori Null. Si applica a csv (DataFrameWriter).
quote " Un singolo carattere Carattere utilizzato per virgolettere i valori di campo che contengono il separatore. Si applica a csv (DataFrameWriter).
quoteAll false true, false Indica se racchiudere tutti i valori di campo tra virgolette indipendentemente dal contenuto. Si applica a csv (DataFrameWriter).
sep , Una stringa Carattere delimitatore di campo. Si applica a csv (DataFrameWriter).
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato timestamp Stringa di formato per i valori di colonna timestamp. Si applica a csv (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato timestamp Stringa di formato per timestamp senza valori di colonna fuso orario (TimestampNTZType).

Excel

Quando si scrivono file di Excel, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
dataAddress None Un nome di foglio o una stringa di riferimento di cella Nome del foglio o cella iniziale per la scrittura. Se omesso, scrive in un foglio denominato Sheet1 a partire dalla cella A1. Accetta un nome di foglio (SheetName) o un riferimento a una singola cella (SheetName!A1). Gli intervalli di celle non sono supportati per le scritture.
dateFormatInWrite yyyy-mm-dd Stringa di formato data Excel Excel stringa di formato cella applicata alle colonne Date. Usa Excel sintassi del formato.
headerRows 0 0, 1 Indica se scrivere nomi di colonna come prima riga.
timestampNTZFormat yyyy-mm-dd hh:mm:ss Stringa di formato timestamp Excel Excel stringa di formato cella applicata alle colonne TimestampNTZ e Timestamp. Usa Excel sintassi del formato.
version xlsx xlsx, xls Versione del formato di file Excel da scrivere.

JSON

Quando si scrivono file JSON, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
compression none none, bzip2, gzip, lz4snappy, , deflatezstd Codec di compressione da usare durante la scrittura. Si applica a json (DataFrameWriter).
dateFormat yyyy-MM-dd Stringa di formato data Formattare la stringa per i valori delle colonne di data. Si applica a json (DataFrameWriter).
encoding UTF-8 Nome java.nio.charset.Charset Codifica dei caratteri per i file di output. Si applica a json (DataFrameWriter).
ignoreNullFields valore di spark.sql.jsonGenerator.ignoreNullFields true, false Indica se omettere campi con valori Null dall'output JSON. Si applica a json (DataFrameWriter).
lineSep \n Una stringa Stringa separatore di riga utilizzata tra i record. Si applica a json (DataFrameWriter).
locale en-US Identificatore java.util.Locale Identificatore delle impostazioni locali Java che influisce sull'analisi predefinita di data, timestamp e decimale all'interno di JSON.
pretty false true, false Indica se abilitare l'output JSON con rientro, multilinea.
sortKeys false true, false Indica se ordinare le chiavi degli oggetti JSON in ordine alfabetico nell'output. Utile per produrre output deterministico.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato timestamp Stringa di formato per i valori di colonna timestamp. Si applica a json (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato timestamp Stringa di formato per timestamp senza valori di colonna fuso orario (TimestampNTZType).
writeNonAsciiCharacterAsCodePoint false true, false Indica se codificare caratteri non ASCII come \uXXXX sequenze di escape Unicode anziché caratteri UTF-8 letterali nell'output.

ORCO

Quando si scrivono file ORC, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
compression zstd none, uncompressed, snappy, zliblzo, zstd, , lz4brotli Codec di compressione da usare durante la scrittura. Si applica a orc (DataFrameWriter).

Parquet

Quando si scrivono file Parquet, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
compression snappy none, uncompressed, snappy, gzip, lzobrotli, lz4, , lz4_rawzstd Codec di compressione da usare durante la scrittura. Si applica a parquet (DataFrameWriter).
spark.sql.parquet.outputTimestampType INT96 INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS Tipo fisico utilizzato per codificare le colonne timestamp. Usare INT96 per la compatibilità con i lettori Parquet legacy che non supportano i tipi di timestamp standard.

Testo

Quando si scrivono file di testo, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
compression none none, bzip2, gzip, lz4snappy, , deflatezstd Codec di compressione da usare durante la scrittura. Si applica al testo (DataFrameWriter).
encoding UTF-8 Nome java.nio.charset.Charset Codifica dei caratteri per i file di output.
lineSep \n Una stringa Stringa separatore di riga utilizzata tra i record. Si applica al testo (DataFrameWriter).

XML (Extensible Markup Language)

Quando si scrivono file XML, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
arrayElementName item Qualsiasi stringa Nome dell'elemento per gli elementi della matrice che non hanno un nome esplicito. Si applica a xml (DataFrameWriter).
attributePrefix _ Qualsiasi stringa Prefisso anteporto ai nomi di campo che corrispondono agli attributi XML. Si applica a xml (DataFrameWriter).
compression none none, bzip2, gzip, lz4snappy, , deflatezstd Codec di compressione da usare durante la scrittura. Si applica a xml (DataFrameWriter).
dateFormat yyyy-MM-dd Stringa di formato data Formattare la stringa per i valori delle colonne di data. Si applica a xml (DataFrameWriter).
declaration version="1.0" encoding="UTF-8" standalone="yes" Stringa di dichiarazione XML o stringa vuota da eliminare Stringa di dichiarazione XML scritta all'inizio di ogni file di output. Impostare su una stringa vuota per eliminare la dichiarazione. Si applica a xml (DataFrameWriter).
encoding UTF-8 Nome java.nio.charset.Charset Codifica dei caratteri per i file di output. Si applica a xml (DataFrameWriter).
indent 4 spazi Qualsiasi stringa Stringa utilizzata per impostare il rientro degli elementi figlio nell'output. Impostare su una stringa vuota per disattivare il rientro e scrivere ogni riga su una singola riga.
locale en-US Identificatore java.util.Locale Identificatore delle impostazioni locali Java che influisce sulla formattazione predefinita di data, timestamp e decimale all'interno del codice XML.
nullValue null Qualsiasi stringa Stringa scritta per i valori Null. Se impostato su null, gli attributi e gli elementi figlio per i campi Null vengono omessi. Si applica a xml (DataFrameWriter).
rootTag ROWS Qualsiasi stringa Tag dell'elemento radice che esegue il wrapping di tutti gli elementi di riga nell'output. Si applica a xml (DataFrameWriter).
rowTag ROW Qualsiasi stringa Tag dell'elemento che rappresenta una riga nell'output. Si applica a xml (DataFrameWriter).
singleVariantColumn None Stringa del nome di colonna Nome della singola colonna Variant da scrivere in file XML. Si applica a xml (DataFrameWriter).
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato timestamp Stringa di formato per i valori di colonna timestamp. Si applica a xml (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato timestamp Stringa di formato per timestamp senza valori di colonna fuso orario. Si applica a xml (DataFrameWriter).
validateName true true, false Indica se generare un'eccezione se un nome di colonna non è un identificatore di elemento XML valido. Si applica a xml (DataFrameWriter).
valueTag _VALUE Qualsiasi stringa Nome del campo utilizzato per i dati di tipo carattere negli elementi XML che dispongono anche di attributi o elementi figlio. Si applica a xml (DataFrameWriter).

Opzioni dataStreamWriter

Usare queste opzioni con DataStreamWriter.option() per configurare le scritture di streaming.

Example

Nell'esempio seguente viene impostata la posizione del checkpoint per un flusso:

Python
(df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table"))
Scala
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table")

Comune

Le opzioni seguenti si applicano a tutte le operazioni di scrittura di streaming.

Chiave Predefinito Valori validi Description
checkpointLocation Nessuno (obbligatorio) Stringa di percorso Percorso della directory del checkpoint per la query di streaming. Obbligatorio per garantire la tolleranza di errore e le garanzie di elaborazione esattamente una volta. Ogni query di streaming deve usare un percorso di checkpoint univoco. Databricks consiglia di archiviare i checkpoint in un volume di Unity Catalog o in un percorso di archiviazione cloud. Consulta Checkpoint di Structured Streaming.
path None Stringa di percorso Percorso di output per sink di streaming basati su file, ad esempio Parquet. Si applica solo ai formati basati su file.

Sink della console

Quando si scrivono flussi nel sink della console, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
numRows 20 Numeri interi positivi Numero di righe da visualizzare per ogni micro batch durante la scrittura nel sink della console.
truncate true true, false Indica se troncare stringhe lunghe durante la visualizzazione delle righe. Impostare su per false visualizzare i valori stringa completi.

Delta Lake

Quando si scrive un flusso in una tabella Delta Lake tramite , si applicano le opzioni seguenti.format("delta") Le opzioni di sola sovrascrittura, overwriteSchemaad esempio , replaceWheree partitionOverwriteMode non sono supportate per le scritture in streaming.

Chiave Predefinito Valori validi Description
mergeSchema false true, false Se evolvere lo schema della tabella Delta Lake quando il dataframe di streaming contiene nuove colonne. Si applica solo alla modalità di output di accodamento. Si applica allo schema della tabella di aggiornamento.
userMetadata None Qualsiasi stringa Stringa definita dall'utente aggiunta ai metadati di commit per l'operazione di scrittura. Visibile nell'output di DESCRIBE HISTORY. Si applica alle tabelle Enrich con metadati personalizzati.

Sink di file

L'opzione seguente si applica quando si scrive un flusso in formati basati su file (Parquet, JSON, CSV, ORC, text). Per le opzioni specifiche del formato, vedere Opzioni DataFrameWriter.

Chiave Predefinito Valori validi Description
retention None Stringa temporale, ad 7 days esempio o 24 hours Per quanto tempo conservare i file di metadati sink usati per la tolleranza di errore e la compattazione. Quando non è impostato, i file di metadati vengono conservati a tempo indeterminato.

Sink Kafka

Quando si scrive in Kafka, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
kafka.bootstrap.servers None Elenco delimitato da virgole di host:port stringhe Obbligatorio. Elenco delimitato da virgole degli indirizzi del broker host:port Kafka.
topic None Qualsiasi stringa Argomento Kafka di destinazione per tutte le righe. Obbligatorio se il dataframe non include una topic colonna.
kafka.* None Qualsiasi valore di configurazione del producer Kafka Qualsiasi configurazione del producer Kafka preceduta da kafka.. Ad esempio: kafka.compression.type.

Sink di memoria

Quando si scrivono flussi nel sink di memoria, si applicano le opzioni seguenti.

Chiave Predefinito Valori validi Description
queryName Nessuno (obbligatorio) Qualsiasi stringa Nome della tabella in memoria in cui scrive la query. Obbligatorio per il sink di memoria. Configurabile anche tramite .queryName().
mode exactlyonce exactlyonce, atleastonce Garanzia di recapito per il sink di memoria. exactlyonce usa la modalità micro batch con semantica exactly-once. atleastonce usa la modalità continua con semantica at-least-once.

Opzioni della funzione Spark

Alcune funzioni predefinite di Spark SQL accettano una options mappa che controlla il comportamento di analisi o serializzazione. Passare le opzioni come Python dict o scala Map[String, String].

Example

L'esempio seguente analizza una colonna JSON durante l'eliminazione di record in formato non valido:

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))

Avro

Le funzioni Avro accettano le stesse opzioni delle opzioni del dataframe corrispondenti:

Example

L'esempio seguente decodifica una colonna Avro con l'evoluzione dello schema abilitata:

Python
from pyspark.sql.functions import from_avro

df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro

val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))

Inoltre, le varianti del Registro schemi di from_avro e to_avro accettano le opzioni seguenti:

Chiave Predefinito Valori validi Description
schemaId None Intero ID schema ID schema dal Registro di sistema dello schema confluent da usare per decodificare i dati Avro codificati con uno schema incompatibile con jsonFormatSchema. Si applica solo a from_avro .
confluent.schema.registry.* None Qualsiasi valore della proprietà client di Confluent SR Proprietà di configurazione del client del Registro di sistema dello schema confluent. Passare qualsiasi proprietà client di Confluent SR usando questo prefisso, ad esempio confluent.schema.registry.basic.auth.user.info per le credenziali di autenticazione di base. Obbligatorio per le varianti del Registro di sistema dello schema di from_avro e to_avro.

CSV

Le funzioni CSV accettano le stesse opzioni delle opzioni del dataframe corrispondenti:

Example

L'esempio seguente legge CSV con un separatore personalizzato e NULL un valore:

Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))

JSON

Le funzioni JSON accettano le stesse opzioni delle opzioni del dataframe corrispondenti:

Example

L'esempio seguente scrive JSON con NULL campi ignorati e piuttosto formattati abilitati:

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json

val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))

Protobuf

from_protobuf e to_protobuf non usano un'origine dati basata su file. I dati protobuf vengono sempre letti e scritti come colonne binarie usando queste funzioni. Le opzioni vengono passate come e Map[String, String] fanno distinzione tra maiuscole e minuscole.

Example

L'esempio seguente decodifica una colonna Protobuf usando la modalità PERMISSIVE:

Python
from pyspark.sql.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
    {"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf

val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
    Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))

Le funzioni Protobuf usano le opzioni seguenti:

Chiave Predefinito Valori validi Description
mode FAILFAST FAILFAST, PERMISSIVE Come gestire i record danneggiati. FAILFAST genera un'eccezione. PERMISSIVE imposta campi in formato non valido su Null. Si applica a from_protobuf.
recursive.fields.max.depth -1 (disabilitato) 0 a 10 Profondità massima di ricorsione per i campi Protobuf ricorsivi. Impostare su 0 per disattivare il supporto dei campi ricorsivi. Si applica a from_protobuf.
convert.any.fields.to.json false true, false Indica se convertire i campi Protobuf Any in una stringa JSON anziché in un oggetto STRUCT. Si applica a from_protobuf.
emit.default.values false true, false Indica se generare campi con valori zero o predefiniti (semantica proto3). Quando false, i campi con valori predefiniti vengono omessi dall'output. Si applica a from_protobuf.
enums.as.ints false true, false Indica se eseguire il rendering dei campi enum come valori interi anziché come stringhe. Si applica a from_protobuf.
upcast.unsigned.ints false true, false Se eseguire l'upcast uint32 a Long e uint64 per evitare Decimal(20,0) l'overflow di integer. Si applica a from_protobuf.
unwrap.primitive.wrapper.types false true, false Indica se annullare il wrapping google.protobuf dei tipi wrapper (ad esempio e Int32ValueStringValue) nei tipi Spark primitivi corrispondenti. Si applica a from_protobuf.
retain.empty.message.types false true, false Indica se mantenere i tipi di messaggio Protobuf vuoti nello schema di output inserendo una colonna fittizia. Si applica a from_protobuf.
schema.registry.subject None Qualsiasi stringa Nome soggetto registro schemi. Obbligatorio quando si usano le varianti del Registro di sistema dello schema di from_protobuf e to_protobuf.
schema.registry.address None Stringa host:port Indirizzo del Registro di sistema dello schema (host e porta). Obbligatorio quando si usano le varianti del Registro di sistema dello schema di from_protobuf e to_protobuf.
schema.registry.protobuf.name None Qualsiasi stringa Specifica il messaggio Protobuf da usare quando l'oggetto del Registro di sistema dello schema contiene più messaggi. Optional.
schema.registry.schema.evolution.mode "restart" "restart", "none" Modalità di gestione delle modifiche dello schema quando viene rilevato un ID schema più recente in un record in ingresso. "restart" termina la query con un UnknownFieldException; configurare i processi per il riavvio in caso di errore durante la selezione delle modifiche. "none" ignora le modifiche di schema-ID e analizza i record più recenti con lo schema originale.
confluent.schema.registry.<option> Qualsiasi valore valido dell'opzione client del Registro di sistema dello schema di Confluent Passare qualsiasi opzione client del Registro di sistema dello schema confluente usando il prefisso "confluent.schema.registry". Ad esempio, impostare su "confluent.schema.registry.basic.auth.credentials.source""USER_INFO" e "confluent.schema.registry.basic.auth.user.info" su per "<KEY>:<SECRET>" configurare l'autenticazione di base.

XML (Extensible Markup Language)

Le funzioni XML accettano le stesse opzioni delle opzioni del dataframe corrispondenti:

Example

Nell'esempio seguente viene scritto xml con tag radice e riga personalizzati:

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml

val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))