Informazioni di riferimento sulle opzioni dell'API Spark

Questa pagina elenca le opzioni di input e output disponibili per le API Spark che leggono e scrivono dati.

Opzioni DataFrameReader

Usare queste opzioni con DataFrameReader.option(), DataFrameReader.options(), read_files, COPY INTO e Auto Loader per controllare come Azure Databricks legge i file di dati.

Example

L'esempio seguente imposta su multiLineTrue per la lettura di file JSON:

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)

Comune

Le opzioni seguenti si applicano a tutti i formati di file.

Chiave Predefinito Description
ignoreCorruptFiles false Indica se ignorare i file danneggiati. Se true, i processi Spark continueranno a essere eseguiti quando si verificano file danneggiati e il contenuto letto verrà comunque restituito. Per COPY INTOè possibile osservare i file danneggiati ignorati come numSkippedCorruptFiles nella operationMetrics colonna della cronologia delta Lake. Disponibile in Databricks Runtime 11.3 LTS e versioni successive.
ignoreMissingFiles false per caricatore automatico, true per COPY INTO (legacy) Indica se ignorare i file mancanti. Se true, i processi Spark continuano a essere eseguiti quando vengono rilevati file mancanti e il contenuto viene comunque restituito. Disponibile in Databricks Runtime 11.3 LTS e versioni successive.
modifiedAfter None Timestamp facoltativo come filtro per inserire solo i file con un timestamp di modifica dopo il timestamp specificato.
modifiedBefore None Timestamp facoltativo come filtro per inserire solo i file con un timestamp di modifica prima del timestamp specificato.
pathGlobFilter oppure fileNamePattern None Un potenziale modello glob da fornire per la scelta dei file. Equivalente a PATTERN in COPY INTO (legacy). fileNamePattern può essere usato in read_files.
recursiveFileLookup false Quando true, questa opzione esegue la ricerca nelle directory nidificate anche se i nomi non seguono uno schema di denominazione di partizione come date=2019-07-01.

Avro

Chiave Predefinito Description
avroSchema None Schema facoltativo fornito da un utente in formato Avro. Quando si legge Avro, questa opzione può essere impostata su uno schema evoluto compatibile ma diverso dallo schema Avro effettivo. Lo schema di deserializzazione è coerente con lo schema evoluto. Ad esempio, se si imposta uno schema evoluto contenente una colonna aggiuntiva con un valore predefinito, il risultato di lettura contiene anche la nuova colonna.
avroSchemaEvolutionMode none Come gestire l'evoluzione dello schema quando si usa un registro schemi. Valori validi: none (ignorare le modifiche dello schema e continuare il processo), restart (quando vengono rilevate modifiche dello schema, genera un e UnknownFieldException richiede un riavvio del processo).
datetimeRebaseMode LEGACY Controlla la ricalibrazione dei valori DATE e TIMESTAMP tra i calendari giuliano e gregoriano prolettico. Valori validi: EXCEPTION, LEGACY e CORRECTED.
enableStableIdentifiersForUnionType false Indica se usare nomi di campo stabili per i tipi di Unione Avro. Se abilitata, i nomi dei campi dei tipi di unione vengono derivati dai relativi nomi di tipo in lettere minuscole , ad esempio member_int, member_string. Genera un'eccezione se due nomi di tipo sono identici dopo la minuscola.
mergeSchema false Indica se dedurre lo schema tra più file e unire lo schema di ogni file. mergeSchema per Avro non consente l'allentamento dei tipi di dati.
mode FAILFAST Modalità parser per la gestione dei record danneggiati. Valori validi: FAILFAST (genera un'eccezione), PERMISSIVE (imposta campi in formato non valido su Null), DROPMALFORMED (elimina automaticamente i record non validi).
readerCaseSensitive true Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole.
recursiveFieldMaxDepth None Profondità massima di ricorsione per i campi Avro ricorsivi. Impostare su per 1 troncare tutti i campi ricorsivi, 2 per consentire un livello di ricorsione e così via fino a 15. Se non è impostato o 0, i campi ricorsivi non sono consentiti. Valori validi: 0 per 15.
rescuedDataColumn None Indica se raccogliere tutti i dati che non possono essere elaborati a causa di: mancata corrispondenza del tipo di dati e disallineamento dello schema (incluso il formato delle lettere delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico.
COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.
Per altri dettagli, fare riferimento a Che cos'è la colonna di dati salvata?.
stableIdentifierPrefixForUnionType member_ Prefisso da usare per i nomi dei campi del tipo di unione stabile quando enableStableIdentifiersForUnionType=true.

CSV

Chiave Predefinito Description
badRecordsPath None Percorso in cui archiviare i file per registrare le informazioni sui record CSV non validi.
charToEscapeQuoteEscaping \0 Carattere utilizzato per sfuggire il carattere usato per sfuggire le virgolette. Ad esempio, per i seguenti record [ " a\\", b ]:
  • Se il carattere da usare come escape per '\' è indefinito, il record non verrà analizzato. Il parser leggerà i caratteri: [a],[\],["],[,],[ ],[b] e genererà un errore perché non riesce a trovare un'virgoletta di chiusura.
  • Se il carattere di escape del '\' viene definito come '\', il record verrà letto con 2 valori: [a\] e [b].
columnNameOfCorruptRecord _corrupt_record Supportato per Auto Loader. Non supportato per COPY INTO (obsoleto).
Colonna per l'archiviazione di record malformati e non analizzabili. Se il parametro mode per il parsing è impostato su DROPMALFORMED, questa colonna sarà vuota.
comment \0 Definisce il carattere che rappresenta un commento di riga quando viene trovato all'inizio di una riga di testo. Usare '\0' per disabilitare l'ignoramento dei commenti.
dateFormat yyyy-MM-dd Formato per l'analisi delle stringhe di data.
emptyValue Stringa vuota Rappresentazione in forma di stringa di un valore di vuoto.
enableDateTimeParsingFallback false Indica se eseguire il fallback al comportamento di analisi di data e timestamp legacy quando non è possibile analizzare un valore con il formato specificato. Quando false, gli errori di analisi generano un errore o producono null a seconda di mode.
encoding oppure charset UTF-8 Nome della codifica dei file CSV. Vederejava.nio.charset.Charset per l'elenco delle opzioni. UTF-16 e UTF-32 non possono essere usati quando multiline è true.
enforceSchema true Indica se applicare forzatamente lo schema specificato o dedotto ai file CSV. Se l'opzione è abilitata, le intestazioni dei file CSV vengono ignorate. Questa opzione viene ignorata per impostazione predefinita quando si usa il caricatore automatico per salvare i dati e consentire l'evoluzione dello schema.
escape \ Carattere di escape da utilizzare durante l'analisi dei dati.
extension csv Estensione del nome file prevista. I file senza questa estensione vengono filtrati durante le letture.
failOnUnknownFields false Indica se non riuscire quando il record CSV contiene colonne non presenti nello schema. Quando false, le colonne non riconosciute vengono eliminate o salvate automaticamente a seconda di rescuedDataColumn.
failOnWidenedFields false Indica se non è possibile eseguire l'analisi di un valore di campo come tipo di schema dichiarato senza ampliare. Quando false, i valori con estensione di tipo vengono salvati automaticamente a seconda di rescuedDataColumn. L'impostazione failOnUnknownFields=true può mascherare gli effetti di questa opzione.
header false Indica se i file CSV contengono un'intestazione. Auto Loader presuppone che i file abbiano intestazioni quando si deduce lo schema.
ignoreLeadingWhiteSpace false Indica se ignorare gli spazi vuoti iniziali per ogni valore analizzato.
ignoreTrailingWhiteSpace false Indica se ignorare gli spazi vuoti finali per ogni valore analizzato.
inferSchema false Stabilire se dedurre i tipi di dati dei record CSV analizzati o assumere che tutte le colonne siano di StringType. Richiede un passaggio aggiuntivo sui dati se impostato su true. Per il caricatore automatico, usare cloudFiles.inferColumnTypes invece.
inputBufferSize 1048576 (1 MB) Dimensioni del buffer in byte per il parser CSV. Utile per ottimizzare l'utilizzo della memoria durante l'analisi di file CSV di grandi dimensioni. Valori validi: numeri interi positivi.
lineSep Nessuno, che copre \r, \r\ne \n Una stringa di testo tra due record CSV consecutivi.
locale US Un identificatore java.util.Locale. Influenza l'interpretazione delle date, dei timestamp e dei decimali predefinita all'interno del file di tipo CSV.
maxCharsPerColumn -1 Numero massimo di caratteri previsti da un valore da analizzare. Può essere usato per evitare errori di memoria. Il valore predefinito è -1, ovvero illimitato. Valori validi: numeri interi positivi o -1 illimitati.
maxColumns 20480 Limite rigido del numero di colonne che un record può avere. Valori validi: numeri interi positivi.
mergeSchema false Indica se dedurre lo schema tra più file e unire lo schema di ogni file. Abilitato per impostazione predefinita per l'Auto Loader quando si deduce lo schema.
mode PERMISSIVE Modalità parser per la gestione di record malformati. Valori validi: PERMISSIVE, DROPMALFORMED, FAILFAST.
multiLine false Indica se i record CSV si estendono su più righe.
nanValue NaN Rappresentazione di stringa di un valore non numerico durante l'analisi delle colonne FloatType e DoubleType.
negativeInf -Inf La rappresentazione sotto forma di stringa dell'infinito negativo durante l'analisi delle colonne FloatType o DoubleType.
nullValue Stringa vuota Rappresentazione in forma di stringa del valore null.
parserCaseSensitive (obsoleto) false Durante la lettura dei file, decidere se allineare le colonne dichiarate nell'intestazione con lo schema rispettando la distinzione tra maiuscole e minuscole. Questa è true l'impostazione predefinita per Auto Loader. Le colonne che differiscono per caso verranno salvate nel rescuedDataColumn se abilitato. Questa opzione è stata deprecata a favore di readerCaseSensitive.
positiveInf Inf La rappresentazione in forma di stringa dell'infinito positivo mentre si analizzano le colonne FloatType o DoubleType.
preferDate true Quando possibile, tenta di interpretare le stringhe come date anziché come timestamp. È anche necessario usare l'inferenza dello schema, abilitando inferSchema o usando cloudFiles.inferColumnTypes con il caricatore automatico.
quote " Carattere utilizzato per escludere i valori quando il delimitatore di campo fa parte del valore.
readerCaseSensitive true Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole.
rescuedDataColumn None Indica se raccogliere tutti i dati che non possono essere elaborati a causa di: mancata corrispondenza del tipo di dati e disallineamento dello schema (incluso il formato delle lettere delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altri dettagli, fare riferimento a Che cos'è la colonna di dati salvata?.
COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.
sep oppure delimiter , Stringa di separazione tra le colonne.
singleVariantColumn None Se impostato su un nome di colonna, legge l'intero record CSV in una singola VariantType colonna con tale nome anziché analizzare ogni campo nella propria colonna. Richiede header=true.
skipRows 0 Numero di righe dall'inizio del file CSV che devono essere ignorate (incluse le righe commentate e vuote). Se header è vero, l'intestazione sarà la prima riga non ignorata e non commentata. Valori validi: numeri interi positivi o 0.
timeFormat HH:mm:ss Formato per l'analisi dei valori delle TimeType colonne.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Formato per l'analisi delle stringhe di timestamp.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Formato per l'analisi del timestamp senza stringhe di fuso orario (TimestampNTZType).
timeZone None Oggetto java.time.ZoneId da utilizzare durante l'analisi di timestamp e date.
unescapedQuoteHandling STOP_AT_DELIMITER La strategia per il trattamento delle virgolette non scappate. Opzioni consentite:
  • STOP_AT_CLOSING_QUOTE: se nell'input vengono trovate virgolette senza caratteri di escape, accumulare il carattere della virgoletta e continuare ad analizzare il valore come valore tra virgolette, fino a quando non viene trovata una virgoletta di chiusura.
  • BACK_TO_DELIMITER: Se nell'input si trovano virgolette non precedute da caratteri di escape, considerare il valore come un valore non quotato. In questo modo il parser accumula tutti i caratteri del valore analizzato corrente fino a quando non viene trovato il delimitatore definito da sep. Se non viene trovato alcun delimitatore nel valore, il parser continuerà ad accumulare caratteri dall'input fino a quando non viene trovato un delimitatore o una terminazione di riga.
  • STOP_AT_DELIMITER: Se nell'input si trovano virgolette non precedute da caratteri di escape, considerare il valore come un valore non quotato. In questo modo il parser accumula tutti i caratteri fino a quando il delimitatore definito da sep o una fine di riga viene trovata nell'input.
  • SKIP_VALUE: se nell'input vengono trovate virgolette non precedute da caratteri di escape, il contenuto analizzato per il valore specificato verrà ignorato (finché non viene trovato il delimitatore successivo) e il valore impostato in nullValue verrà prodotto.
  • RAISE_ERROR: se nell'input vengono trovate virgolette senza caratteri di escape, verrà generata un'eccezione TextParsingException .

Excel

Chiave Predefinito Description
dataAddress None Intervallo di celle da leggere nella sintassi Excel. Se omesso, legge tutte le celle valide dal primo foglio. Utilizzare "SheetName!C5:H10" per leggere un intervallo da un foglio denominato, "C5:H10" per leggere un intervallo dal primo foglio o "SheetName" per leggere tutti i dati da un foglio specifico.
headerRows 0 Numero di righe iniziali da usare come intestazioni del nome di colonna. Quando dataAddress viene specificato, questo vale all'interno dell'intervallo di celle. Quando 0, i nomi di colonna vengono generati automaticamente come _c1, _c2, _c3e così via. Valori validi: 0, 1.
ignoreMissingSheet false Indica se ignorare automaticamente i file che non contengono il foglio specificato da dataAddress. Quando false, viene generato un errore se manca un file nel foglio richiesto. Si applica solo quando viene specificato un nome di foglio in dataAddress. Valori validi: true, false.
includePhoneticRuns false Indica se includere annotazioni fonetiche (ad esempio pinyin o furigana) concatenate ai valori di stringa delle celle durante la lettura di file XLSX. Valori validi: true, false.
operation readSheet Operazione da eseguire nella cartella di lavoro Excel. Valori validi: readSheet (legge i dati da un foglio), listSheets (restituisce uno struct con campi sheetIndex: long e sheetName: String per ogni foglio).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato personalizzata per i valori timestamp-without-timezone archiviati come stringhe in Excel. I formati di data personalizzati seguono i formati dei modelli di data e ora.
dateFormat yyyy-MM-dd Stringa di formato personalizzata per i valori stringa letti come Date. I formati di data personalizzati seguono i formati dei modelli di data e ora.

JSON

Chiave Predefinito Description
allowBackslashEscapingAnyCharacter false Indica se consentire alle barre rovesciate di eseguire l'escape di qualsiasi carattere che abbia esito positivo. Se non è abilitata, solo i caratteri specificatamente elencati dalla specifica JSON possono essere sottoposti a escape.
allowComments false Indica se consentire o meno l'uso dei commenti di stile Java, C e C++ ('/', '*' e '//' ) all'interno del contenuto analizzato.
allowNonNumericNumbers true Indica se consentire l'insieme di token non numerici (NaN) come valori numerici in virgola mobile legali.
allowNumericLeadingZeros false Indica se consentire ai numeri integrali di iniziare con zeli aggiuntivi (ignorabili), (ad esempio 000001).
allowSingleQuotes true Se consentire l'uso di virgolette singole (apostrofo, carattere '\') per quotare stringhe (nomi e valori di tipo String).
allowUnquotedControlChars false Indica se consentire alle stringhe JSON di contenere caratteri di controllo senza escape (caratteri ASCII con valore minore di 32, inclusi i caratteri di tabulazione e avanzamento riga) o meno.
allowUnquotedFieldNames false Indica se consentire l'uso di nomi di campo senza virgo chiave, consentiti da JavaScript, ma non dalla specifica JSON.
alternateVariantEncoding None Codifica usata per i valori Variant nel codice JSON di origine. Impostare su per Z85 decodificare i valori Variant con codifica Base85 anziché archiviati come JSON inline.
badRecordsPath None Percorso per archiviare i file per registrare le informazioni sui record JSON non validi.
L'uso dell'opzione badRecordsPath in un'origine dati basata su file presenta le limitazioni seguenti:
  • Non transazionale e può causare risultati incoerenti.
  • Gli errori temporanei vengono considerati errori.
columnNameOfCorruptRecord _corrupt_record Colonna per l'archiviazione di record malformati e non analizzabili. Se il parametro mode per il parsing è impostato su DROPMALFORMED, questa colonna sarà vuota.
dateFormat yyyy-MM-dd Formato per l'analisi delle stringhe di data.
dropFieldIfAllNull false Indica se ignorare colonne con tutti i valori nulli o matrici e strutture vuote durante l'inferenza dello schema.
encoding oppure charset UTF-8 Nome della codifica dei file JSON. Vedere java.nio.charset.Charset per l'elenco delle opzioni. Non è possibile usare UTF-16 e UTF-32 quando multiline è true.
inferTimestamp false Indica se provare a dedurre stringhe di timestamp come TimestampType. Se impostato su true, l'inferenza dello schema potrebbe richiedere molto più tempo. È necessario abilitare cloudFiles.inferColumnTypes per l’uso con il caricatore automatico.
lineSep Nessuno, che copre \r, \r\ne \n Una stringa tra due record JSON consecutivi.
locale US Un identificatore java.util.Locale. Influenza l'analisi predefinita di date, timestamp e numeri decimali all'interno del JSON.
maxNestingDepth 500 Profondità massima consentita di annidamento per oggetti e matrici JSON. Aumentare questo valore per i documenti annidati in modo approfondito. Valori validi: numeri interi positivi.
maxNumLen 1000 Lunghezza massima dei token numerici nell'input JSON. Aumentare questo valore per JSON con valori letterali numerici di grandi dimensioni. Valori validi: numeri interi positivi.
maxStringLen illimitata Lunghezza massima dei valori stringa nell'input JSON. Impostare per limitare l'utilizzo della memoria durante l'analisi di JSON con stringhe di grandi dimensioni. Valori validi: numeri interi positivi.
mode PERMISSIVE Modalità parser per la gestione di record malformati. Valori validi: PERMISSIVE, DROPMALFORMED, FAILFAST.
multiLine false Indica se i record JSON si estendono su più righe.
prefersDecimal false Tenta di dedurre stringhe come DecimalType invece di tipo float o double, quando possibile. È anche necessario usare l'inferenza dello schema, abilitando inferSchema o usando cloudFiles.inferColumnTypes con il caricatore automatico.
primitivesAsString false Stabilisce se dedurre tipi primitivi come i numeri e i booleani sotto forma di StringType.
readerCaseSensitive true Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole. Disponibile in Databricks Runtime 13.3 e versioni successive.
rescuedDataColumn None Indica se raccogliere tutti i dati che non possono essere analizzati a causa di una mancata corrispondenza del tipo di dati o di una mancata corrispondenza dello schema (inclusa la combinazione di maiuscole e minuscole) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altre informazioni, vedere Che cos'è la colonna di dati salvata?
COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.
singleVariantColumn None Indica se inserire l'intero documento JSON, analizzato in una singola colonna Variant con la stringa specificata come nome della colonna. Se non è impostato, i campi JSON vengono inseriti nelle proprie colonne. Valori validi: qualsiasi stringa.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Formato per l'analisi delle stringhe di timestamp.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Formato per l'analisi del timestamp senza stringhe di fuso orario (TimestampNTZType).
timeZone None Oggetto java.time.ZoneId da utilizzare durante l'analisi di timestamp e date.
upgradeExceptionAsBadRecord false Indica se considerare le eccezioni di aggiornamento del tipo (ad esempio, quando un valore non può essere ampliato al tipo di colonna dichiarato) come record non validi anziché generare un'eccezione.

Kafka

Per l'elenco completo delle opzioni del lettore Kafka, vedere Opzioni Kafka di DataStreamReader. Le opzioni seguenti si applicano solo alle letture batch tramite spark.read.format("kafka").

Chiave Predefinito Description
endingOffsets latest Dove interrompere la lettura. Valori validi: latesto una stringa JSON di offset per ogni partizione, {"topicA":{"0":50,"1":-1}}ad esempio .
Nella stringa -1 JSON è l'offset più recente. -2, ovvero l'offset meno recente, non è consentito come offset finale.
endingOffsetsByTimestamp None Offset finali per partizione specificati come timestamp in millisecondi. Valori validi: stringa JSON di timestamp per ogni partizione, ad esempio {"topicA":{"0":2000,"1":3000}}.
endingTimestamp None Timestamp finale globale in millisecondi applicato a tutte le partizioni. Valori validi: numeri interi non negativi.

ORCO

Chiave Predefinito Description
mergeSchema false Indica se dedurre lo schema tra più file e unire lo schema di ogni file.

Parquet

Chiave Predefinito Description
datetimeRebaseMode LEGACY Controlla la ricalibrazione dei valori DATE e TIMESTAMP tra i calendari giuliano e gregoriano prolettico. Valori validi: EXCEPTION, LEGACY e CORRECTED.
int96RebaseMode LEGACY Controlla la ricalibrazione dei valori di timestamp INT96 tra i calendari Julian e Gregoriano prolettico. Valori validi: EXCEPTION, LEGACY e CORRECTED.
mergeSchema false Indica se dedurre lo schema tra più file e unire lo schema di ogni file.
readerCaseSensitive true Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole.
rescuedDataColumn None Indica se raccogliere tutti i dati che non possono essere elaborati a causa di: mancata corrispondenza del tipo di dati e disallineamento dello schema (incluso il formato delle lettere delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altri dettagli, fare riferimento a Che cos'è la colonna di dati salvata?.
COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.

Archivio stati

Usare queste opzioni con spark.read.format("statestore") o la read_statestore funzione con valori di tabella per leggere i dati dello stato structured streaming. Vedi Leggi le informazioni sullo stato di Structured Streaming.

Chiave Predefinito Description
batchId ID batch più recente Batch di destinazione da cui leggere. Usare per eseguire query su uno stato precedente della query. Il batch deve essere sottoposto a commit ma non ancora pulito. Valori validi: numeri interi non negativi.
operatorId 0 Operatore di destinazione da cui leggere. Usare quando la query dispone di più operatori con stato. Valori validi: numeri interi non negativi.
storeName DEFAULT Nome dell'archivio stati di destinazione da cui leggere. Usare quando l'operatore con stato ha più istanze dell'archivio stati. È necessario specificare storeName o joinSide per un join di flusso, ma non entrambi. Valori validi: qualsiasi stringa.
joinSide None Lato di destinazione da cui leggere per un join di flusso.The target side to read from for a stream-stream join. È necessario specificare storeName o joinSide per un join di flusso, ma non entrambi. Valori validi: left, right.
snapshotStartBatchId None ID batch dello snapshot da usare come punto iniziale durante la lettura dello stato. Il lettore ricompila lo stato riproducendo le modifiche da questo snapshot fino a batchId. Utile quando uno snapshot è danneggiato. Deve specificare insieme a snapshotPartitionId. Non è possibile usare con readChangeFeed. Supporta l'archivio stati supportato da HDFS e l'archivio stati RocksDB con checkpoint del log delle modifiche abilitati. Disponibile in Databricks Runtime 15.4 LTS e versioni successive. Valori validi: numeri interi non negativi.
snapshotPartitionId None Se specificato, la query legge solo questa partizione. Deve specificare insieme a snapshotStartBatchId. Non è possibile usare con readChangeFeed. Disponibile in Databricks Runtime 15.4 LTS e versioni successive. Valori validi: numeri interi non negativi.
readChangeFeed false Quando true, restituisce le modifiche di stato in un intervallo specificato di batch tra changeStartBatchId e changeEndBatchId. Richiede changeStartBatchId. Impossibile usare con joinSide, batchIdsnapshotStartBatchId, o snapshotPartitionId. Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: true, false.
Per informazioni dettagliate, vedere Leggere le modifiche dello stato di Structured Streaming.
changeStartBatchId None ID batch iniziale per l'intervallo di feed di modifiche. Obbligatorio quando readChangeFeed è true. Si applica solo quando readChangeFeed è impostato su true. Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: numeri interi non negativi.
changeEndBatchId ID batch più recente ID batch finale per l'intervallo di feed di modifiche. Deve essere maggiore o uguale a changeStartBatchId. Si applica solo quando readChangeFeed è impostato su true. Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: numeri interi non negativi.
stateVarName None Nome della variabile di stato da leggere. Il nome della variabile di stato è il nome univoco di ogni variabile all'interno della init funzione di un StatefulProcessor oggetto usato dall'operatore transformWithState . Obbligatorio quando si usa l'operatore transformWithState . Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: qualsiasi stringa.
readRegisteredTimers false Quando true, legge i timer registrati usati dall'operatore transformWithState . Si applica solo all'operatore transformWithState . Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: true, false.
flattenCollectionTypes true Quando true, rende flat i record restituiti per le variabili di stato della mappa e dell'elenco. Quando false, restituisce i record come Spark SQL Array o Map. Si applica solo all'operatore transformWithState . Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: true, false.

Testo

Chiave Predefinito Description
encoding UTF-8 Nome della codifica del separatore di riga del file TEXT. Per un elenco delle opzioni, vedere java.nio.charset.Charset. Il contenuto del file non è interessato da questa opzione e viene letto as-is.
lineSep Nessuno, che copre \r, \r\n e \n Una stringa tra due record TEXT consecutivi.
wholeText false Indica se leggere un file come singolo record.

XML

Chiave Predefinito Description
rowTag None Il tag di riga dei file XML da trattare come una riga. Nell'esempio XML <books> <book><book>...<books>, il valore appropriato è book. Si tratta di un'opzione obbligatoria.
samplingRatio 1.0 Definisce una frazione di righe utilizzate per l'inferenza dello schema. Le funzioni predefinite XML ignorano questa opzione. Valori validi: 0.0 per 1.0.
excludeAttribute false Indica se escludere gli attributi negli elementi.
mode None Modalità per gestire i record danneggiati durante l'analisi. PERMISSIVE: per i record danneggiati, inserisce la stringa in formato non valido in un campo configurato da columnNameOfCorruptRecord e imposta i campi in formato non valido su null. Per mantenere i record danneggiati, è possibile impostare un campo di tipo string denominato columnNameOfCorruptRecord in uno schema definito dall'utente. Se il campo non è presente in uno schema, i record danneggiati vengono eliminati durante l'analisi. Quando si deduce uno schema, il parser aggiunge in modo implicito un campo columnNameOfCorruptRecord in uno schema di output. DROPMALFORMED: ignora i record danneggiati. Questa modalità non è supportata per le funzioni predefinite XML. FAILFAST: genera un'eccezione quando il parser incontra i record danneggiati.
inferSchema true Se true, tenta di dedurre un tipo appropriato per ogni colonna DataFrame risultante. Se false, tutte le colonne risultanti sono di tipo string. Le funzioni predefinite XML ignorano questa opzione.
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord Consente di rinominare il nuovo campo contenente una stringa in formato non valido creata dalla PERMISSIVE modalità.
attributePrefix None Prefisso per gli attributi per distinguere gli attributi dagli elementi. Questo sarà il prefisso per i nomi dei campi. Il valore predefinito è _. Può essere vuoto per la lettura del codice XML, ma non per la scrittura. Si applica anche alle opzioni XML DataFrameWriter.
valueTag _VALUE Tag utilizzato per i dati di caratteri all'interno di elementi che hanno anche attributi o elementi figlio. L'utente può specificare il campo valueTag nello schema oppure verrà aggiunto automaticamente durante l'inferenza dello schema quando i dati di tipo carattere sono presenti in elementi con altri elementi o attributi. Si applica anche alle opzioni XML DataFrameWriter.
encoding UTF-8 Per la lettura, decodifica i file XML in base al tipo di codifica specificato. Per la scrittura, specifica la codifica (charset) dei file XML salvati. Le funzioni predefinite XML ignorano questa opzione. Si applica anche alle opzioni XML DataFrameWriter.
ignoreSurroundingSpaces true Indica se gli spazi vuoti che circondano i valori devono essere ignorati. I dati di caratteri composti solo da spazi bianchi vengono ignorati.
rowValidationXSDPath None Percorso di un file XSD facoltativo utilizzato per convalidare il codice XML per ogni riga singolarmente. Le righe che non riescono a convalidare vengono considerate come errori di analisi. L'XSD non influisce in caso contrario sullo schema, indipendentemente dal fatto che sia specificato o dedotto.
ignoreNamespace false Se true, i prefissi degli spazi dei nomi sugli elementi e gli attributi XML vengono ignorati. I tag <abc:author> e <def:author>, ad esempio, vengono considerati come se entrambi siano solo <author>. I namespace non possono essere ignorati nell'elemento rowTag, solo nei suoi figli leggibili. L'analisi XML non è sensibile ai namespace anche se false.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato timestamp personalizzata che segue il formato del modello datetime pattern. Questo vale per il tipo timestamp. Si applica anche alle opzioni XML DataFrameWriter.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato personalizzata per timestamp senza fuso orario che segue il formato del modello datetime. Questo vale per il tipo TimestampNTZType. Si applica anche alle opzioni XML DataFrameWriter.
dateFormat yyyy-MM-dd Stringa di formato data personalizzata che segue il modello datetime. Questo vale per il tipo di data. Si applica anche alle opzioni XML DataFrameWriter.
locale en-US Imposta un locale come tag di lingua nel formato IETF BCP 47. Ad esempio, locale viene usato durante l'analisi di date e timestamp.
nullValue stringa null Imposta la rappresentazione in forma di stringa del valore null. Quando si tratta di null, il parser non scrive attributi ed elementi per i campi. Si applica anche alle opzioni XML DataFrameWriter.
readerCaseSensitive true Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole.
rescuedDataColumn None Indica se raccogliere tutti i dati che non possono essere analizzati a causa di una mancata corrispondenza di tipo di dati o di schema (inclusa una differenza nel caso delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altri dettagli, vedere Che cos'è la colonna di dati salvata?. COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.
singleVariantColumn none Specifica il nome della singola colonna variante. Se questa opzione viene specificata per la lettura, analizzare l'intero record XML in una singola colonna Variant con il valore della stringa di opzione specificato come nome della colonna. Se questa opzione viene specificata per la scrittura, scrivere il valore della singola colonna Variant nei file XML. Si applica anche alle opzioni XML DataFrameWriter.
useLegacyXMLParser true Indica se usare il parser XML legacy. Il parser legacy ha una convalida meno rigorosa per il contenuto in formato non valido, ma è meno efficiente per la memoria. Impostare su per false acconsentire esplicitamente al parser predefinito più restrittivo.
wildcardColName xs_any Nome della colonna utilizzato per acquisire elementi XML che corrispondono all'elemento dello schema con caratteri jolly (xs:any). Non può essere usato insieme a rescuedDataColumn.

Opzioni DataStreamReader

Usare queste opzioni con DataStreamReader.option() per configurare le letture di streaming dalle tabelle Delta Lake e da altre origini basate su file.

Per le opzioni di formato di file (JSON, CSV, Parquet e altri), vedere Opzioni DataFrameReader.

Per le opzioni del caricatore automatico (cloudFiles.*), vedere Caricamento automatico.

Example

L'esempio seguente imposta maxFilesPerTrigger su 10 per un flusso di tabelle Delta Lake:

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")

Comune

Le opzioni seguenti si applicano alle tabelle Delta Lake e ad altre origini di streaming basate su file.

Chiave Predefinito Description
cleanSource off Come gestire i file di origine dopo l'elaborazione da parte del flusso. Valori validi: off (nessuna azione), delete (eliminare definitivamente il file di origine), archive (passare a sourceArchiveDir). Se impostato su archive, sourceArchiveDir deve essere impostato anche . Non si applica allo streaming di tabelle Delta Lake.
fileNameOnly false Indica se identificare i file già elaborati in base al nome file solo anziché al percorso completo. Quando true, i file in percorsi diversi con lo stesso nome file vengono considerati come lo stesso file e non vengono rielaborati. Non si applica allo streaming di tabelle Delta Lake.
latestFirst false Indica se elaborare prima i file modificati più di recente all'interno di ogni micro batch. Utile quando si desidera elaborare i dati più recenti il più rapidamente possibile. Quando true e maxFilesPerTrigger o maxBytesPerTrigger è impostato, maxFileAge viene ignorato. Non si applica allo streaming di tabelle Delta Lake.
maxBytesPerTrigger None Valore massimo flessibile per la quantità di dati elaborati per ogni micro batch. Un batch può elaborare più del limite se l'unità di input più piccola lo supera. Se usato insieme a maxFilesPerTrigger, il micro batch elabora i dati fino a quando non viene raggiunto uno dei limiti. Valori validi: numeri interi positivi.
Per il caricatore automatico, usare cloudFiles.maxBytesPerTrigger invece. Vedere Common( Comune).
maxCachedFiles 10000 Numero massimo di file non elaborati da memorizzare nella cache per i micro batch successivi. Impostare su 0 per disattivare la memorizzazione nella cache. Aumentare questo valore quando la directory di origine contiene molti nuovi file per ogni trigger. Non si applica allo streaming di tabelle Delta Lake. Valori validi: numeri interi positivi o 0.
maxFileAge 7d Validità massima dei file considerati per l'elaborazione, rispetto al timestamp del file modificato più di recente anziché all'ora di sistema corrente. I file precedenti a questa soglia vengono ignorati. Accetta stringhe di durata, 7d ad esempio o 4h. Ignorato quando latestFirst è true e maxFilesPerTrigger o maxBytesPerTrigger è impostato. Non si applica allo streaming di tabelle Delta Lake.
maxFilesPerTrigger 1000 per Delta Lake e Auto Loader. Nessun valore massimo per altre origini basate su file. Limite superiore per il numero di nuovi file elaborati in ogni micro batch. Se usato insieme a maxBytesPerTrigger, il micro batch elabora i dati fino a quando non viene raggiunto uno dei limiti. Valori validi: numeri interi positivi.
Per il caricatore automatico, usare cloudFiles.maxFilesPerTrigger invece. Vedere Common( Comune).
sourceArchiveDir None Percorso della directory di archiviazione quando cleanSource è impostato su archive. I file di origine vengono spostati in questo percorso dopo l'elaborazione, mantenendo la struttura di directory relativa. Non si applica allo streaming di tabelle Delta Lake.

Caricatore automatico

Usare queste opzioni con l'origine cloudFiles per configurare il caricatore automatico per l'inserimento in streaming dall'archiviazione cloud. Le opzioni specifiche dell'origine cloudFiles sono precedute cloudFiles dal prefisso per mantenerle in uno spazio dei nomi separato da altre opzioni di origine Structured Streaming .

Comune

Chiave Predefinito Description
cloudFiles.allowOverwrites false Indica se consentire modifiche al file della directory di input per sovrascrivere i dati esistenti.
Per le avvertenze di configurazione, vedere Il caricatore automatico elabora nuovamente il file quando il file viene aggiunto o sovrascritto?.
cloudFiles.backfillInterval None Il caricatore automatico può attivare i backfill asincroni a un determinato intervallo. Ad esempio 1 day , per il riempimento giornaliero o 1 week per il riempimento settimanale. Per altre informazioni, vedere Attivare i backfill regolari usando cloudFiles.backfillInterval.
Non usare quando cloudFiles.useManagedFileEvents è impostato su true.
cloudFiles.cleanSource OFF Indica se eliminare automaticamente i file elaborati dalla directory di input. Se impostato su OFF (impostazione predefinita), non vengono eliminati file.
Se impostato su DELETE, Il caricatore automatico elimina automaticamente i file 30 giorni dopo l'elaborazione. A tale scopo, il caricatore automatico deve disporre delle autorizzazioni di scrittura per la directory di origine.
Se impostato su MOVE, il caricatore automatico sposta automaticamente i file nel percorso specificato in cloudFiles.cleanSource.moveDestination 30 giorni dopo l'elaborazione. A tale scopo, Auto Loader deve disporre delle autorizzazioni di scrittura per la directory sorgente e per la destinazione di spostamento.
Un file viene considerato elaborato quando ha un valore non Null per commit_time nel risultato della cloud_files_state funzione con valori di tabella. Vedere cloud_files_state funzione con valori di tabella. L'attesa aggiuntiva di 30 giorni dopo l'elaborazione può essere configurata tramite cloudFiles.cleanSource.retentionDuration.
Esaminare le considerazioni seguenti prima di abilitare cloudFiles.cleanSource:
  • Azure Databricks non consiglia di usare questa opzione se sono presenti più flussi che utilizzano dati dal percorso di origine perché il consumer più veloce eliminerà i file e non verrà inserito nelle origini più lente.
  • L'abilitazione di questa funzionalità richiede il caricamento automatico per mantenere uno stato aggiuntivo nel relativo checkpoint, che comporta un sovraccarico delle prestazioni, ma consente una migliore osservabilità tramite la cloud_files_state funzione con valori di tabella. Vedere cloud_files_state funzione con valori di tabella.
  • cleanSource usa l'impostazione corrente per decidere se scegliere MOVE o DELETE un determinato file. Si supponga, ad esempio, che l'impostazione fosse MOVE quando il file è stato elaborato originariamente, ma è stato modificato in DELETE quando il file è diventato un candidato per la pulizia 30 giorni dopo. In questo caso cleanSource eliminerà il file.
  • Non è garantito che i file vengano puliti non appena retentionDuration scade. Per ridurre i costi, il caricatore automatico elimina i file simultaneamente con l'elaborazione del flusso e termina non appena l'elaborazione del flusso viene completata o terminata. I file candidati per la pulizia, ma non possono essere puliti durante l'elaborazione del flusso verranno prelevati alla successiva esecuzione del caricatore automatico.

Disponibile in Databricks Runtime 16.4 e versioni successive.
cloudFiles.cleanSource.retentionDuration 30 days Tempo di attesa prima che i file elaborati diventino candidati per l'archiviazione con cleanSource. Deve essere maggiore di 7 giorni per DELETE. Nessuna restrizione minima per MOVE.
Il valore è una stringa CalendarInterval . Ad esempio, "14 days", "30 days", "2 weeks"o "1 month".
Disponibile in Databricks Runtime 16.4 e versioni successive.
cloudFiles.cleanSource.moveDestination None Percorso in cui archiviare i file elaborati quando cloudFiles.cleanSource è impostato su MOVE. Può trattarsi di un percorso di archiviazione cloud o di un percorso del volume del catalogo Unity (ad esempio, /Volumes/my_catalog/my_schema/my_volume/archive/).
Il percorso di spostamento deve:
  • Non essere un elemento figlio della directory di origine. Se si inserisce la destinazione di spostamento all'interno della directory di origine, i file archiviati vengono nuovamente inseriti.
  • Trovarsi nello stesso percorso esterno, volume o montaggio DBFS dell'origine. Gli spostamenti tra bucket e contenitori diversi non sono supportati e generano un errore.

Il caricatore automatico deve disporre delle autorizzazioni di scrittura per questa directory.
Disponibile in Databricks Runtime 16.4 e versioni successive.
cloudFiles.format Nessuno (opzione obbligatoria) Formato del file di dati nel percorso di origine. I valori validi includono:
cloudFiles.includeExistingFiles true Indica se includere file esistenti nel percorso di input di elaborazione del flusso o elaborare solo i nuovi file in arrivo dopo l'installazione iniziale. Questa opzione viene valutata solo quando avvii uno streaming per la prima volta. La modifica di questa opzione dopo il riavvio del flusso non ha alcun effetto.
cloudFiles.inferColumnTypes false Stabilisce se dedurre tipologie esatte di colonna quando si sfrutta l'inferenza dello schema. Per impostazione predefinita, le colonne vengono dedotte come stringhe durante l'inferenza di set di dati JSON e CSV. Per altri dettagli, vedere Inferenza dello schema .
cloudFiles.maxBytesPerTrigger None Numero massimo di nuovi byte da elaborare in ogni trigger. È possibile specificare una stringa di byte, ad esempio 10g per limitare ogni microbatch a 10 GB di dati. Questo è un massimo morbido. Se sono presenti file di 3 GB ciascuno, Azure Databricks elabora 12 GB in un microbatch. Se usato insieme a cloudFiles.maxFilesPerTrigger, Azure Databricks consuma fino al limite inferiore di cloudFiles.maxFilesPerTrigger o cloudFiles.maxBytesPerTrigger, a seconda di quale valore viene raggiunto per primo. Questa opzione non ha alcun effetto se usata con Trigger.Once() (Trigger.Once() è deprecata).
In Databricks Runtime 18.0 e versioni successive, questa opzione viene configurata dinamicamente e non deve essere impostata manualmente.
cloudFiles.maxFileAge None Per quanto tempo viene rilevato un evento di file a scopo di deduplicazione. Databricks sconsiglia di ottimizzare questo parametro a meno che non si stiano inserendo dati nell'ordine di milioni di file all'ora. Per altri dettagli, vedere la sezione relativa al rilevamento degli eventi file .
L'ottimizzazione troppo aggressiva di cloudFiles.maxFileAge può causare problemi di qualità dei dati, ad esempio l'inserimento duplicato o i file mancanti. Di conseguenza, Databricks consiglia un'impostazione conservativa per cloudFiles.maxFileAge, ad esempio 90 giorni, che è simile a quella consigliata per le soluzioni di inserimento dati simili.
cloudFiles.maxFilesPerTrigger 1000 Numero massimo di nuovi file da elaborare in ogni trigger. Se usato insieme a cloudFiles.maxBytesPerTrigger, Azure Databricks consuma fino al limite inferiore di cloudFiles.maxFilesPerTrigger o cloudFiles.maxBytesPerTrigger, a seconda di quale valore viene raggiunto per primo. Questa opzione non ha alcun effetto se usata con Trigger.Once() (deprecato).
In Databricks Runtime 18.0 e versioni successive, questa opzione viene configurata dinamicamente e non deve essere impostata manualmente.
cloudFiles.partitionColumns None Elenco delimitato da virgole di colonne di partizione in stile Hive che si desidera dedurre dalla struttura di directory dei file. Le colonne di partizione in stile Hive sono coppie chiave-valore combinate da un segno di uguaglianza, <base-path>/a=x/b=1/c=y/file.formatad esempio . In questo esempio le colonne di partizione sono a, b e c. Per impostazione predefinita, queste colonne vengono aggiunte automaticamente allo schema se si usa l'inferenza dello schema e si specifica l'oggetto <base-path> da cui caricare i dati. Se si specifica uno schema, il caricatore automatico prevede che queste colonne vengano incluse nello schema. Se non si desidera che queste colonne facciano parte dello schema, è possibile specificare "" per ignorare queste colonne. Inoltre, è possibile usare questa opzione quando si vuole dedurre il percorso del file in strutture di directory complesse, come nell'esempio seguente:
<base-path>/year=2022/week=1/file1.csv
<base-path>/year=2022/month=2/day=3/file2.csv
<base-path>/year=2022/month=2/day=4/file3.csv
Specificando cloudFiles.partitionColumns come year,month,day restituisce year=2022 per file1.csv, ma le colonne month e day sono null.
month e day vengono analizzati correttamente per file2.csv e file3.csv.
cloudFiles.schemaEvolutionMode addNewColumnsquando non viene specificato uno schema; in caso contrario, none La modalità per l'evoluzione dello schema man mano che vengono individuate nuove colonne nei dati. Per impostazione predefinita, le colonne vengono dedotte come stringhe durante l'inferenza di set di dati JSON. Per altri dettagli, vedere Evoluzione dello schema .
cloudFiles.schemaHints None Informazioni sullo schema fornite ad Auto Loader durante l'inferenza dello schema. Vedere hint di schema per maggiori dettagli.
cloudFiles.schemaLocation Nessuno (obbligatorio per dedurre lo schema) La posizione in cui archiviare lo schema dedotto e le modifiche successive. Per altri dettagli, vedere Inferenza dello schema .
cloudFiles.useStrictGlobber false Indica se usare un globber rigoroso che si allinei con il comportamento globbing predefinito di altre risorse di file in Apache Spark. Per altri dettagli, vedere Modelli di caricamento dei dati comuni . Disponibile in Databricks Runtime 12.2 LTS e versioni successive.
cloudFiles.validateOptions true Indica se convalidare le opzioni del caricatore automatico e restituire un errore per opzioni sconosciute o incoerenti.

Directory

Chiave Predefinito Description
cloudFiles.useIncrementalListing (obsoleto) auto in Databricks Runtime 17.2 e versioni successive in false Databricks Runtime 17.3 e versioni successive Questa funzionalità è stata deprecata. Databricks consiglia di usare la modalità di notifica file con eventi di file anziché cloudFiles.useIncrementalListing.
Indicare se utilizzare l'elenco incrementale anziché quello completo nella modalità elenco directory. Per impostazione predefinita, il caricatore automatico fa il massimo sforzo per rilevare automaticamente se una determinata directory è applicabile per l'elenco incrementale. È possibile usare in modo esplicito l'elenco incrementale o usare l'elenco completo della directory impostandolo rispettivamente come true o false.
L'abilitazione errata dell'elenco incrementale in una directory non lessicalmente ordinata impedisce al caricatore automatico di individuare nuovi file.
Funziona con Azure Data Lake Storage (abfss://), S3 (s3://) e GCS (gs://).
Disponibile in Databricks Runtime 9.1 LTS e versioni successive.
Valori disponibili: auto, true, false

Notifica file

Per informazioni sulla configurazione della modalità di notifica file, incluse le autorizzazioni cloud necessarie, le istruzioni di installazione e i metodi di autenticazione, vedere Configurare i flussi del caricatore automatico in modalità di notifica file.

Chiave Predefinito Description
cloudFiles.fetchParallelism 1 Numero di thread da usare per il recupero di messaggi dal servizio di accodamento.
Non usare quando cloudFiles.useManagedFileEvents è impostato su true.
cloudFiles.pathRewrites None Obbligatorio solo se si specifica un queueUrl oggetto che riceve notifiche di file da più bucket S3 e si vuole usare i punti di montaggio configurati per l'accesso ai dati in questi contenitori. Usa questa opzione per riscrivere il prefisso del percorso bucket/key con il punto di montaggio. È possibile riscrivere solo i prefissi. Ad esempio, per la configurazione {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, il percorso s3://<databricks-mounted-bucket>/path/2017/08/fileA.json viene riscritto in dbfs:/mnt/data-warehouse/2017/08/fileA.json.
Non usare quando cloudFiles.useManagedFileEvents è impostato su true.
cloudFiles.resourceTag None Una serie di coppie di tag chiave-valore che consentono di associare e identificare le risorse correlate, ad esempio:
cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")
Per altre informazioni su AWS, vedere Tag di allocazione dei costi di Amazon SQS e Configurazione dei tag per un argomento Amazon SNS. (1)
Per altre informazioni su Azure, vedere Denominazione di code e metadati e la trattazione di properties.labels nelle sottoscrizioni di eventi. Il caricatore automatico archivia queste coppie di tag chiave-valore in JSON come etichette. (1)
Per altre informazioni su GCP, vedere Creazione di report sull'utilizzo con le etichette. (1)
Non usare quando cloudFiles.useManagedFileEvents è impostato su true. Impostare invece i tag delle risorse usando la console del provider di servizi cloud.
cloudFiles.useManagedFileEvents false Se impostato su true, Il caricatore automatico usa il servizio eventi file per individuare i file nel percorso esterno. È possibile usare questa opzione solo se il percorso di caricamento si trova in un percorso esterno con eventi di file abilitati. Vedere Usare la modalità di notifica file con gli eventi di file.
Gli eventi di file offrono prestazioni a livello di notifiche nell'individuazione file, perché il caricatore automatico può individuare nuovi file dopo l'ultima esecuzione. A differenza dell'elenco di directory, questo processo non deve elencare tutti i file nella directory.
Ci sono alcune situazioni in cui il caricatore automatico usa l'elenco di directory anche se l'opzione degli eventi di file è abilitata:
  • Durante il caricamento iniziale, quando includeExistingFiles è impostato su true, viene eseguito un elenco di directory completo per individuare tutti i file presenti nella directory prima dell'avvio del caricatore automatico.
  • Il servizio eventi file ottimizza l'individuazione dei file memorizzando nella cache i file creati più di recente. Se Auto Loader viene eseguito raramente, questa cache può scadere e Auto Loader ritorna all'elenco di directory per individuare i file e aggiornare la cache. Per evitare questo scenario, richiamare il caricatore automatico almeno una volta ogni sette giorni.

Vedere Quando il caricatore automatico con gli eventi di file usa l'elenco di directory? per un elenco completo delle situazioni in cui il caricatore automatico usa l'elenco di directory con questa opzione.
Disponibile in Databricks Runtime 14.3 LTS e versioni successive.
cloudFiles.listOnStart false Se impostato su true, Il caricatore automatico esegue un elenco di directory completo all'avvio del flusso, anziché iniziare con il token di continuazione nel checkpoint. Usare questa opzione per eseguire il ripristino da errori, ad esempio CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN. Vedere How do I recover from a CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN error?.
cloudFiles.useNotifications false Indica se usare la modalità di notifica file per determinare quando sono presenti nuovi file. Se false, usare la modalità elenco di directory. Consulta Confronta le modalità di rilevamento dei file del caricatore automatico.
Non usare quando cloudFiles.useManagedFileEvents è impostato su true.

(1) Il caricatore automatico aggiunge per impostazione predefinita le seguenti coppie di tag chiave-valore su base migliorativa:

  • vendor: Databricks
  • path: percorso da cui vengono caricati i dati. Non disponibile in GCP a causa di limitazioni di etichettatura.
  • checkpointLocation: La posizione del checkpoint dello stream. Non disponibile in GCP a causa di limitazioni di etichettatura.
  • streamId: un identificatore univoco globale per il flusso.

Databricks riserva questi nomi di chiave e non è possibile sovrascriverne i valori.

Specifico del cloud

Il caricatore automatico offre opzioni per la configurazione dell'infrastruttura cloud per la modalità di notifica file. Per le autorizzazioni cloud necessarie e le istruzioni di configurazione, vedere Configurare flussi del caricatore automatico in modalità di notifica file.

AWS

Specificare le opzioni seguenti solo se si sceglie cloudFiles.useNotifications = true e si vuole che il caricatore automatico configuri automaticamente i servizi di notifica:

Chiave Predefinito Description
cloudFiles.region Area dell'istanza EC2 L'area in cui risiede il bucket S3 di origine e dove si vogliono creare i servizi AWS SNS e SQS.
Chiave Predefinito Description
cloudFiles.restrictNotificationSetupToSameAWSAccountId false Consentire solo le notifiche di eventi dai bucket AWS S3 nello stesso account del topic SNS. Quando è vero, Auto Loader accetta solo le notifiche degli eventi dai bucket di AWS S3 nello stesso account del topic SNS.
Quando false, i criteri di accesso non limitano le configurazioni di bucket tra diversi account e topic SNS. Ciò è utile quando l'argomento SNS e il percorso del bucket sono associati a account diversi.
Disponibile in Databricks Runtime 17.2 e versioni successive.

Specificare l'opzione seguente solo se si sceglie cloudFiles.useNotifications = true e si vuole che il caricatore automatico usi una coda già configurata:

Chiave Predefinito Description
cloudFiles.queueUrl None L'URL della coda SQS. Se specificato, Auto Loader consuma direttamente gli eventi da questa coda invece di configurare i propri servizi AWS SNS e SQS.

Opzioni di autenticazione di AWS

Fornire l'opzione di autenticazione seguente per usare una credenziale del servizio Databricks:

Chiave Predefinito Description
databricks.serviceCredential None Il nome della credenziale di servizio Databricks . Disponibile in Databricks Runtime 16.1 e versioni successive.

Quando le credenziali del servizio Databricks o i ruoli IAM non sono disponibili, è invece possibile fornire le opzioni di autenticazione seguenti:

Chiave Predefinito Description
cloudFiles.awsAccessKey None La chiave di accesso ID AWS per l’utente. Deve essere fornito con cloudFiles.awsSecretKey.
cloudFiles.awsSecretKey None La chiave di accesso segreta AWS per l’utente. Deve essere fornito con cloudFiles.awsAccessKey.
cloudFiles.roleArn None L'ARN di un ruolo IAM da assumere, se necessario. Il ruolo può essere assunto dal profilo dell'istanza del cluster o fornendo le credenziali con cloudFiles.awsAccessKey e cloudFiles.awsSecretKey.
cloudFiles.roleExternalId None Identificatore da specificare durante l'assunzione di un ruolo tramite cloudFiles.roleArn.
cloudFiles.roleSessionName None Nome di sessione facoltativo da usare durante l'assunzione di un ruolo tramite cloudFiles.roleArn.
cloudFiles.stsEndpoint None Endpoint facoltativo da fornire per l'accesso ad AWS STS quando si presuppone un ruolo usando cloudFiles.roleArn.
Azure

È necessario specificare i valori per tutte le opzioni seguenti se si specifica cloudFiles.useNotifications = true e si vuole che il caricatore automatico configuri automaticamente i servizi di notifica:

Chiave Predefinito Description
cloudFiles.resourceGroup None Gruppo di risorse Azure in cui viene creato l'account di archiviazione.
cloudFiles.subscriptionId None ID sottoscrizione Azure in cui viene creato il gruppo di risorse.
databricks.serviceCredential None Il nome della credenziale di servizio Databricks . Disponibile in Databricks Runtime 16.1 e versioni successive.

Se una credenziale del servizio Databricks non è disponibile, è possibile fornire invece le opzioni di autenticazione seguenti:

Chiave Predefinito Description
cloudFiles.clientId None ID cliente o ID applicazione del principale del servizio.
cloudFiles.clientSecret None Segreto del client principale di servizio.
cloudFiles.connectionString None La stringa di connessione per l'account di archiviazione, in base alla chiave di accesso dell'account o alla firma di accesso condiviso (SAS).
cloudFiles.tenantId None ID tenant Azure in cui viene creata l'entità servizio.

Specificare l'opzione seguente solo se si imposta cloudFiles.useNotifications = true e si vuole che il caricatore automatico usi una coda esistente:

Chiave Predefinito Description
cloudFiles.queueName None Nome della coda di Azure. Se specificato, l'origine dei file cloud utilizza direttamente gli eventi di questa coda invece di configurare i propri servizi Griglia di eventi di Azure e Queue Storage. In tal caso, il databricks.serviceCredential o cloudFiles.connectionString richiede solo autorizzazioni di lettura per la coda.
GCP

Auto Loader può configurare automaticamente i servizi di notifica sfruttando le credenziali del servizio Databricks . L'account del servizio creato con le credenziali del servizio Databricks richiederà le autorizzazioni specificate in Configurare flussi del caricatore automatico in modalità di notifica file.

Chiave Predefinito Description
cloudFiles.projectId None ID del progetto in cui si trova il bucket GCS. Anche la sottoscrizione google Cloud Pub/Sub viene creata all'interno di questo progetto.
databricks.serviceCredential None Il nome della credenziale di servizio Databricks . Disponibile in Databricks Runtime 16.1 e versioni successive.

Se le credenziali di un servizio Databricks non sono disponibili, è possibile usare direttamente gli account del servizio Google. È possibile configurare il cluster per presupporre un account del servizio seguendo la configurazione del servizio Google oppure specificare direttamente le opzioni di autenticazione seguenti:

Chiave Predefinito Description
cloudFiles.client None ID client dell'account del servizio Google.
cloudFiles.clientEmail None Indirizzo di posta elettronica dell'account del servizio Google.
cloudFiles.privateKey None Chiave privata generata per l'account del servizio Google.
cloudFiles.privateKeyId None ID della chiave privata generata per l'account del servizio Google.

Specificare l'opzione seguente solo se si sceglie cloudFiles.useNotifications = true e si vuole che il caricatore automatico usi una coda già configurata:

Chiave Predefinito Description
cloudFiles.subscription None Nome della sottoscrizione Google Cloud Pub/Sub. Se specificato, l'origine dei file cloud utilizza gli eventi di questa coda invece di configurare i propri servizi GCS Notification e Google Cloud Pub/Sub.

Delta Lake

Quando si esegue la lettura da una tabella Delta Lake tramite , si applicano le opzioni seguenti.spark.readStream

Chiave Predefinito Description
allowSourceColumnDrop None Impostare su un numero di versione della tabella Delta o "always" per consentire al flusso di continuare dopo l'eliminazione delle colonne dallo schema della tabella di origine. Se impostato su un numero di versione, riconosce tutte le modifiche dello schema fino a tale versione. Richiede schemaTrackingLocation. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake.
allowSourceColumnRename None Impostare su un numero di versione della tabella Delta o "always" per consentire al flusso di continuare dopo la ridenominazione delle colonne nella tabella di origine. Se impostato su un numero di versione, riconosce tutte le modifiche dello schema fino a tale versione. Richiede schemaTrackingLocation. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake.
allowSourceColumnTypeChange None Impostare su un numero di versione della tabella Delta o "always" per consentire al flusso di continuare dopo la modifica dei tipi di colonna nella tabella di origine. Se impostato su un numero di versione, riconosce tutte le modifiche dello schema fino a tale versione. Richiede schemaTrackingLocation. Vedere Estensione del tipo.
excludeRegex None Modello di espressione regolare. File i cui percorsi corrispondono al modello vengono esclusi dalla lettura in streaming. Utile per filtrare i file che non sono conformi alla convenzione di denominazione prevista.
failOnDataLoss true Indica se la query di streaming non riesce se i dati di origine sono stati eliminati a causa della conservazione dei log (logRetentionDuration). Impostare su false per ignorare i dati mancanti e continuare l'elaborazione. Vedere Configurare la conservazione dei dati per le query di spostamento cronologico.
ignoreChanges (obsoleto) false Disponibile in Databricks Runtime 11.3 LTS e versioni precedenti. Ricrea i file di dati riscritti dopo le operazioni di modifica, ad UPDATEesempio , MERGE INTODELETE, o OVERWRITE. Le righe non modificate possono essere generate insieme a nuove righe, pertanto i consumer downstream devono gestire duplicati. Le operazioni di eliminazione non vengono propagate a valle. Sostituito da skipChangeCommits in Databricks Runtime 12.2 LTS e versioni successive.
ignoreDeletes (obsoleto) false Ignora le transazioni che eliminano i dati ai limiti della partizione (solo la partizione completa viene eliminata). Non gestisce eliminazioni, aggiornamenti o altre modifiche non di partizione. Utilizzare invece skipChangeCommits.
readChangeFeed oppure readChangeData false Indica se abilitare la lettura del feed di dati delle modifiche per la query di streaming. Se abilitato, il flusso genera modifiche a livello di riga (inserimenti, aggiornamenti ed eliminazioni) con colonne di metadati aggiuntive. Consulta Utilizzare il feed di dati delle modifiche di Delta Lake su Azure Databricks.
schemaTrackingLocation None Percorso di una directory in cui Delta Lake tiene traccia delle modifiche dello schema per la lettura in streaming. Obbligatorio quando si esegue lo streaming da tabelle con mapping di colonne abilitato e si usano allowSourceColumn* le opzioni per gestire l'evoluzione dello schema. Deve trovarsi all'interno checkpointLocation della query di streaming. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake.
skipChangeCommits false Ignora le transazioni che eliminano o modificano i record e i processi esistenti vengono aggiunti solo. Databricks consiglia questa opzione per la maggior parte dei carichi di lavoro che non usano feed di dati delle modifiche. Disponibile in Databricks Runtime 12.2 LTS e versioni successive. Vedere Ignorare i commit delle modifiche upstream con skipChangeCommits.
startingTimestamp Ultima versione disponibile Timestamp da cui iniziare la lettura. Il flusso legge tutte le modifiche apportate alla tabella di cui è stato eseguito il commit o dopo il timestamp specificato. Se il timestamp precede tutti i commit della tabella disponibili, il flusso inizia dal commit meno recente disponibile. Non può essere usato insieme a startingVersion. Ignorato se il checkpoint di streaming esiste già.
Valori validi: una stringa di timestamp, "2019-01-01T00:00:00.000Z" ad esempio o una stringa di data, "2019-01-01"ad esempio .
startingVersion Ultima versione disponibile Versione della tabella Delta da cui iniziare la lettura. Il flusso legge tutte le modifiche di cui è stato eseguito il commit in o dopo la versione specificata. Specificare "latest" per iniziare solo dalle modifiche più recenti. Non può essere usato insieme a startingTimestamp. Ignorato se il checkpoint di streaming esiste già. Consulta Lavora con la cronologia delle tabelle.
withEventTimeOrder false Divide lo snapshot della tabella iniziale in bucket di tempo dell'evento per impedire che i record vengano contrassegnati erroneamente come eventi tardivi e eliminati in query con stato con filigrane. Non può essere modificato dopo l'avvio dell'elaborazione iniziale degli snapshot senza eliminare il checkpoint. Disponibile in Databricks Runtime 11.3 LTS e versioni successive. Vedere Elaborare lo snapshot iniziale senza eliminare i dati.

Kafka

Usare queste opzioni con spark.readStream.format("kafka") o spark.read.format("kafka"):

Chiave Predefinito Description
assign None Partizioni specifiche da utilizzare. È necessario specificare esattamente una delle subscribeopzioni , subscribePatterno assign . Valori validi: stringa JSON, ad esempio {"topicA":[0,1],"topicB":[2,4]}.
failOnDataLoss true Indica se la query non riesce se i dati potrebbero essere andati persi, ad esempio a causa di argomenti eliminati o troncamento offset. Impostare su false per ignorare i dati mancanti e continuare. Valori validi: true, false.
Databricks stima in modo conservativo se i dati potrebbero essere andati persi. Tuttavia, questo potrebbe causare falsi allarmi.
fetchoffset.numretries 3 Numero di tentativi durante il recupero degli offset Kafka non riesce. Valori validi: numeri interi non negativi.
fetchoffset.retryintervalms 1000 Intervallo in millisecondi tra tentativi di recupero offset. Valori validi: numeri interi non negativi.
groupIdPrefix spark-kafka-source (streaming), spark-kafka-relation (batch) Prefisso personalizzato da usare per l'ID gruppo di consumer Kafka generato automaticamente. Se kafka.group.id è impostato in modo esplicito, il connettore ignora questa opzione. Valori validi: qualsiasi stringa.
includeHeaders false Indica se includere intestazioni di messaggio Kafka come colonna nell'output. Valori validi: true, false.
kafkaconsumer.polltimeoutms None Timeout in millisecondi per la chiamata al consumer poll() Kafka. Valori validi: numeri interi positivi.
kafka.bootstrap.servers None Elenco delimitato da virgole di indirizzi host:port per i broker Kafka. Imposta la proprietà del bootstrap.servers client Kafka.
Se non sono presenti dati di Kafka, controllare l'elenco di indirizzi del broker per gli indirizzi non corretti. Se l'elenco di indirizzi del broker non è corretto, potrebbero non esserci errori. I client Kafka presuppongono che i broker saranno disponibili alla fine e riprovare per sempre quando ricevono errori di rete.
maxRecordsPerPartition None Numero massimo di record per ogni partizione Spark. Se impostato, il connettore divide le partizioni Kafka in modo che ogni partizione Spark legga al massimo questo numero di record. Valori validi: numeri interi positivi.
È anche possibile usare questa opzione con minPartitions. Quando vengono impostate entrambe le opzioni, Spark usa qualsiasi opzione restituisca più partizioni.
minPartitions None Numero minimo di partizioni Spark da leggere da Kafka. Quando impostato, il connettore divide le partizioni Kafka di grandi dimensioni per aumentare il parallelismo. Quando non è impostato, Spark crea una partizione per ogni partizione di argomento Kafka. Utile per gestire l'asimmetria o il picco dei carichi di dati. Valori validi: numeri interi positivi.
Questa opzione reinizializza i consumer Kafka per ogni trigger, che potrebbero influire sulle prestazioni con SSL.
startingOffsets latest (streaming), earliest (batch) Offset da cui la query inizia la lettura. Valori validi: earliest, latesto una stringa JSON di offset per ogni partizione, {"topicA":{"0":23,"1":-2}}ad esempio . Nella stringa -1 JSON è l'offset più recente. -2 è l'offset meno recente.
Per le query di streaming, questa opzione si applica solo all'avvio di una nuova query. Le query riprese usano sempre il checkpoint. Durante una query, le nuove partizioni iniziano a leggere al primo offset.
Per le query batch, latest non è consentito.
startingOffsetsByTimestamp None Elenco di offset iniziali per ogni partizione, specificato come timestamp in millisecondi. Quando non esiste alcun offset per un timestamp, il comportamento della query viene determinato da startingOffsetsByTimestampStrategy. Valori validi: stringa JSON di timestamp per ogni partizione, ad esempio {"topicA":{"0":1000,"1":2000}}.
Per le query di streaming, questa opzione si applica solo all'avvio di una nuova query. Le query riprese usano sempre il checkpoint. Durante una query, le nuove partizioni iniziano a leggere al primo offset.
startingOffsetsByTimestampStrategy error Strategia da usare quando non viene trovato alcun offset per un timestamp specificato in startingOffsetsByTimestamp o startingTimestamp. Valori validi: error (genera un'eccezione), latest (usa l'offset disponibile più recente).
startingTimestamp None Timestamp iniziale globale in millisecondi che si applica a tutte le partizioni. Quando non esiste alcun offset per il timestamp, il comportamento viene controllato da startingOffsetsByTimestampStrategy. Valori validi: numeri interi non negativi.
subscribe None Argomenti da sottoscrivere. È necessario specificare esattamente una delle subscribeopzioni , subscribePatterno assign . Valori validi: elenco delimitato da virgole di nomi di argomenti.
subscribePattern None Modello utilizzato per sottoscrivere argomenti. È necessario specificare esattamente una delle subscribeopzioni , subscribePatterno assign . Ad esempio: topic.*. Valori validi: qualsiasi stringa regex Java.

Le opzioni seguenti si applicano solo alle letture in streaming con spark.readStream.format("kafka"):

Chiave Predefinito Description
bytesEstimateWindowLength 300s Intervallo di tempo usato per stimare i byte rimanenti per la estimatedTotalBytesBehindLatest metrica. Valori validi: stringhe di durata come 10m o 600s. Consultare Recupero delle metriche Kafka.
maxOffsetsPerTrigger None Numero massimo di offset da elaborare per intervallo di trigger. Gli offset vengono distribuiti proporzionalmente tra le partizioni degli argomenti. Valori validi: numeri interi positivi.
maxTriggerDelay 15m Tempo massimo di attesa per minOffsetsPerTrigger l'accumulo prima dell'attivazione. Valori validi: stringhe di durata come 10m o 600s.
minOffsetsPerTrigger None Numero minimo di offset da accumulare prima di attivare un micro batch. Quando maxTriggerDelay viene raggiunto, il micro batch viene eseguito indipendentemente. Valori validi: numeri interi positivi.

Per le opzioni di offset applicabili solo alle letture batch con spark.read.format("kafka"), vedere Opzioni Kafka di DataFrameReader.

Per le opzioni del client Kafka (kafka.*) e dell'autenticazione, vedere Opzioni.

Opzioni DataFrameWriter

Usare queste opzioni con DataFrameWriter.option() e DataFrameWriterV2.option() per controllare come Azure Databricks scrive i dati.

Example

Nell'esempio seguente viene impostato su mergeSchemaTrue per la scrittura di una tabella Delta Lake:

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")

Avro

Chiave Predefinito Description
avroSchema None Schema Avro completo come stringa JSON. Usare questa opzione per convertire i tipi SPARK SQL in tipi Avro specifici. Si applica al file Avro.
avroSchemaUrl None URL che punta a un file di schema Avro. Usare anziché avroSchema quando lo schema viene archiviato esternamente. Si escludono con avroSchema a vicenda. Si applica al file Avro.
compression snappy Codec di compressione da usare durante la scrittura. Valori validi: uncompressed, deflate, snappybzip2, xz, , . zstandard Si applica al file Avro.
recordName topLevelRecord Nome del record di primo livello nello schema Avro di output. Si applica al file Avro.
positionalFieldMatching false Indica se trovare una corrispondenza tra lo schema Spark e lo schema Avro in base alla posizione del campo anziché in base al nome. Si applica al file Avro.
recordNamespace Stringa vuota Spazio dei nomi per il record di primo livello nello schema Avro di output. Si applica al file Avro.

Delta Lake e Apache Iceberg

Chiave Predefinito Description
clusterByAuto false Indica se abilitare il clustering liquido automatico, in cui Azure Databricks seleziona le colonne di clustering in base ai modelli di query. Valido solo con mode("overwrite"). Non può essere usato con append la modalità . Disponibile in Databricks Runtime 16.4 e versioni successive. Si applica a Usa clustering liquido per le tabelle.
mergeSchema None Indica se abilitare l'evoluzione dello schema per l'operazione di scrittura. Le nuove colonne nel dataframe di origine vengono aggiunte allo schema della tabella di destinazione. Si applica alle accodamenti batch e streaming. Si applica allo schema della tabella di aggiornamento.
overwriteSchema None Indica se sostituire lo schema della tabella e il partizionamento durante la sovrascrittura. Richiede mode("overwrite") senza replaceWhere. Non è possibile usare con partitionOverwriteMode. Si applica allo schema della tabella di aggiornamento.
partitionOverwriteMode None Modalità di sovrascrittura della partizione. Impostare su su dynamic per sovrascrivere solo le partizioni contenenti nuovi dati, lasciando invariate tutte le altre partizioni. Modalità legacy, non supportata nel calcolo serverless o in Databricks SQL. Valori validi: static, dynamic. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
replaceOn None Espressione booleana che corrisponde alle righe nella tabella di destinazione da sostituire con le righe della query di origine. Può fare riferimento a colonne sia dalla tabella di destinazione che dalla query di origine. Le righe nella destinazione che corrispondono a una riga di origine vengono eliminate e sostituite. Se l'origine è vuota, non vengono eseguite eliminazioni. Usare targetAlias per evitare ambiguità nei riferimenti alle colonne. Disponibile in Databricks Runtime 17.1 e versioni successive. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
replaceUsing None Elenco delimitato da virgole di nomi di colonna usati per trovare le corrispondenze tra la tabella di destinazione e la query di origine. Sia la destinazione che l'origine devono contenere tutte le colonne elencate. Le righe nella destinazione che corrispondono a una riga di origine nel confronto di uguaglianza vengono eliminate e sostituite. NULL i valori vengono considerati non uguali e non corrispondono. Disponibile in Databricks Runtime 16.3 e versioni successive. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
replaceWhere None Espressione di predicato. Sovrascrive in modo atomico solo i record che corrispondono al predicato. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
targetAlias None Alias stringa per la tabella di destinazione. Usare con replaceOn o replaceWhere per disambiguare i riferimenti a colonne quando la condizione fa riferimento a colonne sia dalla tabella di destinazione che dalla query di origine. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake.
txnAppId None Stringa univoca che identifica l'applicazione per le scritture idempotenti nelle foreachBatch operazioni. Usare insieme txnVersion a per assicurarsi che le operazioni di scrittura esattamente una sola volta in più tabelle Delta Lake. Si applica a Use for idempotent table writes(Usa foreachBatch per le scritture di tabelle idempotenti).
txnVersion None Numero che aumenta in modo monotonico usato come versione della transazione per le scritture idempotenti nelle foreachBatch operazioni. Usare insieme txnAppId a per assicurarsi che le operazioni di scrittura esattamente una sola volta in più tabelle Delta Lake. Si applica a Use for idempotent table writes(Usa foreachBatch per le scritture di tabelle idempotenti).
optimizeWrite None Indica se abilitare l'opzione Ottimizzazione automatica scrittura per questa operazione di scrittura. Esegue l'override della spark.databricks.delta.optimizeWrite.enabled configurazione. Si applica a Che è Delta Lake in Azure Databricks?.
userMetadata None Stringa definita dall'utente aggiunta ai metadati di commit per l'operazione di scrittura. Visibile nell'output di DESCRIBE HISTORY. Si applica alle tabelle Enrich con metadati personalizzati.

CSV

Chiave Predefinito Description
charToEscapeQuoteEscaping \0 (non abilitato) Carattere utilizzato per eseguire l'escape del carattere di escape quando differisce dal carattere di virgolette. Si applica a csv (DataFrameWriter).
compression none Codec di compressione da usare durante la scrittura. Valori validi: none, bzip2, gziplz4, snappy, deflate. zstd Si applica a csv (DataFrameWriter).
dateFormat yyyy-MM-dd Formattare la stringa per i valori delle colonne di data. Si applica a csv (DataFrameWriter).
emptyValue Stringa vuota Stringa scritta per valori vuoti (non Null). Si applica a csv (DataFrameWriter).
encoding UTF-8 Codifica dei caratteri per i file di output. Si applica a csv (DataFrameWriter).
escape \ Carattere utilizzato per eseguire l'escape dei valori tra virgolette. Si applica a csv (DataFrameWriter).
escapeQuotes true Indica se inserire caratteri di escape tra virgolette all'interno di valori di campo tra virgolette. Si applica a csv (DataFrameWriter).
header false Indica se scrivere nomi di colonna come prima riga dell'output. Si applica a csv (DataFrameWriter).
ignoreLeadingWhiteSpace false Indica se tagliare gli spazi vuoti iniziali dai valori durante la scrittura. Si applica a csv (DataFrameWriter).
ignoreTrailingWhiteSpace false Indica se tagliare gli spazi vuoti finali dai valori durante la scrittura. Si applica a csv (DataFrameWriter).
lineSep \n Stringa separatore di riga utilizzata tra i record. Si applica a csv (DataFrameWriter).
locale en-US Un identificatore java.util.Locale. Influenza la formattazione dei valori di data e timestamp durante la scrittura.
nullValue Stringa vuota Stringa scritta per i valori Null. Si applica a csv (DataFrameWriter).
quote " Carattere utilizzato per virgolettere i valori di campo che contengono il separatore. Si applica a csv (DataFrameWriter).
quoteAll false Indica se racchiudere tutti i valori di campo tra virgolette indipendentemente dal contenuto. Si applica a csv (DataFrameWriter).
sep , Carattere delimitatore di campo. Si applica a csv (DataFrameWriter).
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato per i valori di colonna timestamp. Si applica a csv (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato per timestamp senza valori di colonna fuso orario (TimestampNTZType).

Excel

Chiave Predefinito Description
dataAddress None Nome del foglio o cella iniziale per la scrittura. Se omesso, scrive in un foglio denominato Sheet1 a partire dalla cella A1. Accetta un nome di foglio ("SheetName") o un riferimento a una singola cella ("SheetName!A1"). Gli intervalli di celle non sono supportati per le scritture.
dateFormatInWrite yyyy-mm-dd Excel stringa di formato cella applicata alle colonne Date. Usa Excel sintassi del formato.
headerRows 0 Indica se scrivere nomi di colonna come prima riga. Valori validi: 0, 1.
timestampNTZFormat yyyy-mm-dd hh:mm:ss Excel stringa di formato cella applicata alle colonne TimestampNTZ e Timestamp. Usa Excel sintassi del formato.
version xlsx Versione del formato di file Excel da scrivere. Valori validi: xlsx, xls.

JSON

Chiave Predefinito Description
compression none Codec di compressione da usare durante la scrittura. Valori validi: none, bzip2, gziplz4, snappy, deflate. zstd Si applica a json (DataFrameWriter).
dateFormat yyyy-MM-dd Formattare la stringa per i valori delle colonne di data. Si applica a json (DataFrameWriter).
encoding UTF-8 Codifica dei caratteri per i file di output. Si applica a json (DataFrameWriter).
ignoreNullFields valore di spark.sql.jsonGenerator.ignoreNullFields Indica se omettere campi con valori Null dall'output JSON. Si applica a json (DataFrameWriter).
lineSep \n Stringa separatore di riga utilizzata tra i record. Si applica a json (DataFrameWriter).
locale en-US Un identificatore java.util.Locale. Influenza la formattazione dei valori di data e timestamp durante la scrittura.
pretty false Indica se abilitare l'output JSON con rientro, multilinea.
sortKeys false Indica se ordinare le chiavi degli oggetti JSON in ordine alfabetico nell'output. Utile per produrre output deterministico.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato per i valori di colonna timestamp. Si applica a json (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato per timestamp senza valori di colonna fuso orario (TimestampNTZType).
writeNonAsciiCharacterAsCodePoint false Indica se codificare caratteri non ASCII come \uXXXX sequenze di escape Unicode anziché caratteri UTF-8 letterali nell'output.

ORCO

Chiave Predefinito Description
compression zstd Codec di compressione da usare durante la scrittura. Valori validi: none, uncompressed, snappyzlib, lzo, zstd, lz4brotli. Si applica a orc (DataFrameWriter).

Parquet

Chiave Predefinito Description
compression snappy Codec di compressione da usare durante la scrittura. Valori validi: none, uncompressedsnappy, , gziplzo, brotli, lz4, lz4_rawzstd. Si applica a parquet (DataFrameWriter).
spark.sql.parquet.outputTimestampType INT96 Tipo fisico utilizzato per codificare le colonne timestamp. Valori validi: INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS. Usare INT96 per la compatibilità con i lettori Parquet legacy che non supportano i tipi di timestamp standard.

Testo

Chiave Predefinito Description
compression none Codec di compressione da usare durante la scrittura. Valori validi: none, bzip2, gziplz4, snappy, deflate. zstd Si applica al testo (DataFrameWriter).
encoding UTF-8 Codifica dei caratteri per i file di output.
lineSep \n Stringa separatore di riga utilizzata tra i record. Si applica al testo (DataFrameWriter).

XML

Chiave Predefinito Description
arrayElementName item Nome dell'elemento per gli elementi della matrice che non hanno un nome esplicito. Si applica a xml (DataFrameWriter).
attributePrefix _ Prefisso anteporto ai nomi di campo che corrispondono agli attributi XML. Si applica a xml (DataFrameWriter).
compression none Codec di compressione da usare durante la scrittura. Valori validi: none, bzip2, gziplz4, snappy, deflate. zstd Si applica a xml (DataFrameWriter).
dateFormat yyyy-MM-dd Formattare la stringa per i valori delle colonne di data. Si applica a xml (DataFrameWriter).
declaration version="1.0" encoding="UTF-8" standalone="yes" Stringa di dichiarazione XML scritta all'inizio di ogni file di output. Impostare su una stringa vuota per eliminare la dichiarazione. Si applica a xml (DataFrameWriter).
encoding UTF-8 Codifica dei caratteri per i file di output. Si applica a xml (DataFrameWriter).
indent 4 spazi Stringa utilizzata per impostare il rientro degli elementi figlio nell'output. Impostare su una stringa vuota per disattivare il rientro e scrivere ogni riga su una singola riga.
locale en-US Un identificatore java.util.Locale. Influenza la formattazione dei valori di data e timestamp durante la scrittura.
nullValue null Stringa scritta per i valori Null. Se impostato su null, gli attributi e gli elementi figlio per i campi Null vengono omessi. Si applica a xml (DataFrameWriter).
rootTag ROWS Tag dell'elemento radice che esegue il wrapping di tutti gli elementi di riga nell'output. Si applica a xml (DataFrameWriter).
rowTag ROW Tag dell'elemento che rappresenta una riga nell'output. Si applica a xml (DataFrameWriter).
singleVariantColumn None Nome della singola colonna Variant da scrivere in file XML. Si applica a xml (DataFrameWriter).
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Stringa di formato per i valori di colonna timestamp. Si applica a xml (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Stringa di formato per timestamp senza valori di colonna fuso orario. Si applica a xml (DataFrameWriter).
validateName true Indica se generare un'eccezione se un nome di colonna non è un identificatore di elemento XML valido. Si applica a xml (DataFrameWriter).
valueTag _VALUE Nome del campo utilizzato per i dati di tipo carattere negli elementi XML che dispongono anche di attributi o elementi figlio. Si applica a xml (DataFrameWriter).

Opzioni dataStreamWriter

Usare queste opzioni con DataStreamWriter.option() per configurare le scritture di streaming.

Example

Nell'esempio seguente viene impostata la posizione del checkpoint per un flusso:

Python
(df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table"))
Scala
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table")

Comune

Chiave Predefinito Description
checkpointLocation Nessuno (obbligatorio) Percorso della directory del checkpoint per la query di streaming. Obbligatorio per garantire la tolleranza di errore e le garanzie di elaborazione esattamente una volta. Ogni query di streaming deve usare un percorso di checkpoint univoco. Databricks consiglia di archiviare i checkpoint in un volume di Unity Catalog o in un percorso di archiviazione cloud. Consulta Checkpoint di Structured Streaming.
path None Percorso di output per sink di streaming basati su file, ad esempio Parquet. Si applica solo ai formati basati su file.

Sink della console

Chiave Predefinito Description
numRows 20 Numero di righe da visualizzare per ogni micro batch durante la scrittura nel sink della console.
truncate true Indica se troncare stringhe lunghe durante la visualizzazione delle righe. Impostare su per false visualizzare i valori stringa completi.

Delta Lake

Quando si scrive un flusso in una tabella Delta Lake tramite , si applicano le opzioni seguenti.format("delta") Le opzioni di sola sovrascrittura, overwriteSchemaad esempio , replaceWheree partitionOverwriteMode non sono supportate per le scritture in streaming.

Chiave Predefinito Description
mergeSchema false Se evolvere lo schema della tabella Delta Lake quando il dataframe di streaming contiene nuove colonne. Si applica solo alla modalità di output di accodamento. Si applica allo schema della tabella di aggiornamento.
userMetadata None Stringa definita dall'utente aggiunta ai metadati di commit per l'operazione di scrittura. Visibile nell'output di DESCRIBE HISTORY. Si applica alle tabelle Enrich con metadati personalizzati.

Sink di file

L'opzione seguente si applica quando si scrive un flusso in formati basati su file (Parquet, JSON, CSV, ORC, text). Per le opzioni specifiche del formato, vedere Opzioni DataFrameWriter.

Chiave Predefinito Description
retention None Per quanto tempo conservare i file di metadati sink usati per la tolleranza di errore e la compattazione. Accetta una stringa temporale, 7 days ad esempio o 24 hours. Quando non è impostato, i file di metadati vengono conservati a tempo indeterminato.

Sink Kafka

Per un elenco completo delle opzioni per la scrittura di flussi in Kafka, vedere Opzioni.

Chiave Predefinito Description
kafka.bootstrap.servers None Obbligatorio. Elenco delimitato da virgole degli indirizzi del broker host:port Kafka.
topic None Argomento Kafka di destinazione per tutte le righe. Obbligatorio se il dataframe non include una topic colonna.
kafka.* None Qualsiasi configurazione del producer Kafka preceduta da kafka.. Ad esempio: kafka.compression.type.

Sink di memoria

Chiave Predefinito Description
queryName Nessuno (obbligatorio) Nome della tabella in memoria in cui scrive la query. Obbligatorio per il sink di memoria. Configurabile anche tramite .queryName().
mode exactlyonce Garanzia di recapito per il sink di memoria. exactlyonce usa la modalità micro batch con semantica exactly-once. atleastonce usa la modalità continua con semantica at-least-once. Valori validi: exactlyonce, atleastonce.

Opzioni della funzione Spark

Alcune funzioni predefinite di Spark SQL accettano una options mappa che controlla il comportamento di analisi o serializzazione. Passare le opzioni come Python dict o scala Map[String, String].

Example

L'esempio seguente analizza una colonna JSON durante l'eliminazione di record in formato non valido:

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))

Avro

Le funzioni Avro accettano le stesse opzioni delle opzioni del dataframe corrispondenti:

Example

L'esempio seguente decodifica una colonna Avro con l'evoluzione dello schema abilitata:

Python
from pyspark.sql.functions import from_avro

df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro

val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))

Inoltre, le varianti del Registro schemi di from_avro e to_avro accettano le opzioni seguenti:

Chiave Predefinito Description
schemaId None ID schema dal Registro di sistema dello schema confluent da usare per decodificare i dati Avro codificati con uno schema incompatibile con jsonFormatSchema. Si applica solo a from_avro .
confluent.schema.registry.* None Proprietà di configurazione del client del Registro di sistema dello schema confluent. Passare qualsiasi proprietà client di Confluent SR usando questo prefisso, ad esempio confluent.schema.registry.basic.auth.user.info per le credenziali di autenticazione di base. Obbligatorio per le varianti del Registro di sistema dello schema di from_avro e to_avro.

CSV

Le funzioni CSV accettano le stesse opzioni delle opzioni del dataframe corrispondenti:

Example

L'esempio seguente legge CSV con un separatore personalizzato e NULL un valore:

Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))

JSON

Le funzioni JSON accettano le stesse opzioni delle opzioni del dataframe corrispondenti:

Example

L'esempio seguente scrive JSON con NULL campi ignorati e piuttosto formattati abilitati:

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json

val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))

Protobuf

from_protobuf e to_protobuf non usano un'origine dati basata su file. I dati protobuf vengono sempre letti e scritti come colonne binarie usando queste funzioni. Le opzioni vengono passate come e Map[String, String] fanno distinzione tra maiuscole e minuscole.

Example

L'esempio seguente decodifica una colonna Protobuf usando la modalità PERMISSIVE:

Python
from pyspark.sql.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
    {"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf

val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
    Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))

Le funzioni Protobuf usano le opzioni seguenti:

Chiave Predefinito Description
mode FAILFAST Come gestire i record danneggiati. FAILFAST genera un'eccezione. PERMISSIVE imposta campi in formato non valido su Null. Valori validi: FAILFAST, PERMISSIVE. Si applica a from_protobuf.
recursive.fields.max.depth -1 (disabilitato) Profondità massima di ricorsione per i campi Protobuf ricorsivi. Impostare su 0 per disattivare il supporto dei campi ricorsivi. Valori validi: 0 per 10. Si applica a from_protobuf.
convert.any.fields.to.json false Indica se convertire i campi Protobuf Any in una stringa JSON anziché in un oggetto STRUCT. Si applica a from_protobuf.
emit.default.values false Indica se generare campi con valori zero o predefiniti (semantica proto3). Quando false, i campi con valori predefiniti vengono omessi dall'output. Si applica a from_protobuf.
enums.as.ints false Indica se eseguire il rendering dei campi enum come valori interi anziché come stringhe. Si applica a from_protobuf.
upcast.unsigned.ints false Se eseguire l'upcast uint32 a Long e uint64 per evitare Decimal(20,0) l'overflow di integer. Si applica a from_protobuf.
unwrap.primitive.wrapper.types false Indica se annullare il wrapping google.protobuf dei tipi wrapper (ad esempio e Int32ValueStringValue) nei tipi Spark primitivi corrispondenti. Si applica a from_protobuf.
retain.empty.message.types false Indica se mantenere i tipi di messaggio Protobuf vuoti nello schema di output inserendo una colonna fittizia. Si applica a from_protobuf.
schema.registry.subject None Nome soggetto registro schemi. Obbligatorio quando si usano le varianti del Registro di sistema dello schema di from_protobuf e to_protobuf.
schema.registry.address None Indirizzo del Registro di sistema dello schema (host e porta). Obbligatorio quando si usano le varianti del Registro di sistema dello schema di from_protobuf e to_protobuf.
schema.registry.protobuf.name None Specifica il messaggio Protobuf da usare quando l'oggetto del Registro di sistema dello schema contiene più messaggi. Optional.

XML

Le funzioni XML accettano le stesse opzioni delle opzioni del dataframe corrispondenti:

Example

Nell'esempio seguente viene scritto xml con tag radice e riga personalizzati:

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml

val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))