Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa pagina elenca le opzioni di input e output disponibili per le API Spark che leggono e scrivono dati.
Opzioni DataFrameReader
Usare queste opzioni con DataFrameReader.option(), DataFrameReader.options(), read_files, COPY INTO e Auto Loader per controllare come Azure Databricks legge i file di dati.
Example
L'esempio seguente imposta su multiLineTrue per la lettura di file JSON:
Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
Comune
Le opzioni seguenti si applicano a tutti i formati di file.
| Chiave | Predefinito | Description |
|---|---|---|
ignoreCorruptFiles |
false |
Indica se ignorare i file danneggiati. Se true, i processi Spark continueranno a essere eseguiti quando si verificano file danneggiati e il contenuto letto verrà comunque restituito. Per COPY INTOè possibile osservare i file danneggiati ignorati come numSkippedCorruptFiles nella operationMetrics colonna della cronologia delta Lake. Disponibile in Databricks Runtime 11.3 LTS e versioni successive. |
ignoreMissingFiles |
false per caricatore automatico, true per COPY INTO (legacy) |
Indica se ignorare i file mancanti. Se true, i processi Spark continuano a essere eseguiti quando vengono rilevati file mancanti e il contenuto viene comunque restituito. Disponibile in Databricks Runtime 11.3 LTS e versioni successive. |
modifiedAfter |
None | Timestamp facoltativo come filtro per inserire solo i file con un timestamp di modifica dopo il timestamp specificato. |
modifiedBefore |
None | Timestamp facoltativo come filtro per inserire solo i file con un timestamp di modifica prima del timestamp specificato. |
pathGlobFilter oppure fileNamePattern |
None | Un potenziale modello glob da fornire per la scelta dei file. Equivalente a PATTERN in COPY INTO (legacy).
fileNamePattern può essere usato in read_files. |
recursiveFileLookup |
false |
Quando true, questa opzione esegue la ricerca nelle directory nidificate anche se i nomi non seguono uno schema di denominazione di partizione come date=2019-07-01. |
Avro
| Chiave | Predefinito | Description |
|---|---|---|
avroSchema |
None | Schema facoltativo fornito da un utente in formato Avro. Quando si legge Avro, questa opzione può essere impostata su uno schema evoluto compatibile ma diverso dallo schema Avro effettivo. Lo schema di deserializzazione è coerente con lo schema evoluto. Ad esempio, se si imposta uno schema evoluto contenente una colonna aggiuntiva con un valore predefinito, il risultato di lettura contiene anche la nuova colonna. |
avroSchemaEvolutionMode |
none |
Come gestire l'evoluzione dello schema quando si usa un registro schemi. Valori validi: none (ignorare le modifiche dello schema e continuare il processo), restart (quando vengono rilevate modifiche dello schema, genera un e UnknownFieldException richiede un riavvio del processo). |
datetimeRebaseMode |
LEGACY |
Controlla la ricalibrazione dei valori DATE e TIMESTAMP tra i calendari giuliano e gregoriano prolettico. Valori validi: EXCEPTION, LEGACY e CORRECTED. |
enableStableIdentifiersForUnionType |
false |
Indica se usare nomi di campo stabili per i tipi di Unione Avro. Se abilitata, i nomi dei campi dei tipi di unione vengono derivati dai relativi nomi di tipo in lettere minuscole , ad esempio member_int, member_string. Genera un'eccezione se due nomi di tipo sono identici dopo la minuscola. |
mergeSchema |
false |
Indica se dedurre lo schema tra più file e unire lo schema di ogni file.
mergeSchema per Avro non consente l'allentamento dei tipi di dati. |
mode |
FAILFAST |
Modalità parser per la gestione dei record danneggiati. Valori validi: FAILFAST (genera un'eccezione), PERMISSIVE (imposta campi in formato non valido su Null), DROPMALFORMED (elimina automaticamente i record non validi). |
readerCaseSensitive |
true |
Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole. |
recursiveFieldMaxDepth |
None | Profondità massima di ricorsione per i campi Avro ricorsivi. Impostare su per 1 troncare tutti i campi ricorsivi, 2 per consentire un livello di ricorsione e così via fino a 15. Se non è impostato o 0, i campi ricorsivi non sono consentiti. Valori validi: 0 per 15. |
rescuedDataColumn |
None | Indica se raccogliere tutti i dati che non possono essere elaborati a causa di: mancata corrispondenza del tipo di dati e disallineamento dello schema (incluso il formato delle lettere delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico.COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento.Per altri dettagli, fare riferimento a Che cos'è la colonna di dati salvata?. |
stableIdentifierPrefixForUnionType |
member_ |
Prefisso da usare per i nomi dei campi del tipo di unione stabile quando enableStableIdentifiersForUnionType=true. |
CSV
| Chiave | Predefinito | Description |
|---|---|---|
badRecordsPath |
None | Percorso in cui archiviare i file per registrare le informazioni sui record CSV non validi. |
charToEscapeQuoteEscaping |
\0 |
Carattere utilizzato per sfuggire il carattere usato per sfuggire le virgolette. Ad esempio, per i seguenti record [ " a\\", b ]:
|
columnNameOfCorruptRecord |
_corrupt_record |
Supportato per Auto Loader. Non supportato per COPY INTO (obsoleto).Colonna per l'archiviazione di record malformati e non analizzabili. Se il parametro mode per il parsing è impostato su DROPMALFORMED, questa colonna sarà vuota. |
comment |
\0 |
Definisce il carattere che rappresenta un commento di riga quando viene trovato all'inizio di una riga di testo. Usare '\0' per disabilitare l'ignoramento dei commenti. |
dateFormat |
yyyy-MM-dd |
Formato per l'analisi delle stringhe di data. |
emptyValue |
Stringa vuota | Rappresentazione in forma di stringa di un valore di vuoto. |
enableDateTimeParsingFallback |
false |
Indica se eseguire il fallback al comportamento di analisi di data e timestamp legacy quando non è possibile analizzare un valore con il formato specificato. Quando false, gli errori di analisi generano un errore o producono null a seconda di mode. |
encoding oppure charset |
UTF-8 |
Nome della codifica dei file CSV. Vederejava.nio.charset.Charset per l'elenco delle opzioni.
UTF-16 e UTF-32 non possono essere usati quando multiline è true. |
enforceSchema |
true |
Indica se applicare forzatamente lo schema specificato o dedotto ai file CSV. Se l'opzione è abilitata, le intestazioni dei file CSV vengono ignorate. Questa opzione viene ignorata per impostazione predefinita quando si usa il caricatore automatico per salvare i dati e consentire l'evoluzione dello schema. |
escape |
\ |
Carattere di escape da utilizzare durante l'analisi dei dati. |
extension |
csv |
Estensione del nome file prevista. I file senza questa estensione vengono filtrati durante le letture. |
failOnUnknownFields |
false |
Indica se non riuscire quando il record CSV contiene colonne non presenti nello schema. Quando false, le colonne non riconosciute vengono eliminate o salvate automaticamente a seconda di rescuedDataColumn. |
failOnWidenedFields |
false |
Indica se non è possibile eseguire l'analisi di un valore di campo come tipo di schema dichiarato senza ampliare. Quando false, i valori con estensione di tipo vengono salvati automaticamente a seconda di rescuedDataColumn. L'impostazione failOnUnknownFields=true può mascherare gli effetti di questa opzione. |
header |
false |
Indica se i file CSV contengono un'intestazione. Auto Loader presuppone che i file abbiano intestazioni quando si deduce lo schema. |
ignoreLeadingWhiteSpace |
false |
Indica se ignorare gli spazi vuoti iniziali per ogni valore analizzato. |
ignoreTrailingWhiteSpace |
false |
Indica se ignorare gli spazi vuoti finali per ogni valore analizzato. |
inferSchema |
false |
Stabilire se dedurre i tipi di dati dei record CSV analizzati o assumere che tutte le colonne siano di StringType. Richiede un passaggio aggiuntivo sui dati se impostato su true. Per il caricatore automatico, usare cloudFiles.inferColumnTypes invece. |
inputBufferSize |
1048576 (1 MB) |
Dimensioni del buffer in byte per il parser CSV. Utile per ottimizzare l'utilizzo della memoria durante l'analisi di file CSV di grandi dimensioni. Valori validi: numeri interi positivi. |
lineSep |
Nessuno, che copre \r, \r\ne \n |
Una stringa di testo tra due record CSV consecutivi. |
locale |
US |
Un identificatore java.util.Locale. Influenza l'interpretazione delle date, dei timestamp e dei decimali predefinita all'interno del file di tipo CSV. |
maxCharsPerColumn |
-1 |
Numero massimo di caratteri previsti da un valore da analizzare. Può essere usato per evitare errori di memoria. Il valore predefinito è -1, ovvero illimitato. Valori validi: numeri interi positivi o -1 illimitati. |
maxColumns |
20480 |
Limite rigido del numero di colonne che un record può avere. Valori validi: numeri interi positivi. |
mergeSchema |
false |
Indica se dedurre lo schema tra più file e unire lo schema di ogni file. Abilitato per impostazione predefinita per l'Auto Loader quando si deduce lo schema. |
mode |
PERMISSIVE |
Modalità parser per la gestione di record malformati. Valori validi: PERMISSIVE, DROPMALFORMED, FAILFAST. |
multiLine |
false |
Indica se i record CSV si estendono su più righe. |
nanValue |
NaN |
Rappresentazione di stringa di un valore non numerico durante l'analisi delle colonne FloatType e DoubleType. |
negativeInf |
-Inf |
La rappresentazione sotto forma di stringa dell'infinito negativo durante l'analisi delle colonne FloatType o DoubleType. |
nullValue |
Stringa vuota | Rappresentazione in forma di stringa del valore null. |
parserCaseSensitive (obsoleto) |
false |
Durante la lettura dei file, decidere se allineare le colonne dichiarate nell'intestazione con lo schema rispettando la distinzione tra maiuscole e minuscole. Questa è true l'impostazione predefinita per Auto Loader. Le colonne che differiscono per caso verranno salvate nel rescuedDataColumn se abilitato. Questa opzione è stata deprecata a favore di readerCaseSensitive. |
positiveInf |
Inf |
La rappresentazione in forma di stringa dell'infinito positivo mentre si analizzano le colonne FloatType o DoubleType. |
preferDate |
true |
Quando possibile, tenta di interpretare le stringhe come date anziché come timestamp. È anche necessario usare l'inferenza dello schema, abilitando inferSchema o usando cloudFiles.inferColumnTypes con il caricatore automatico. |
quote |
" |
Carattere utilizzato per escludere i valori quando il delimitatore di campo fa parte del valore. |
readerCaseSensitive |
true |
Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole. |
rescuedDataColumn |
None | Indica se raccogliere tutti i dati che non possono essere elaborati a causa di: mancata corrispondenza del tipo di dati e disallineamento dello schema (incluso il formato delle lettere delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altri dettagli, fare riferimento a Che cos'è la colonna di dati salvata?.COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento. |
sep oppure delimiter |
, |
Stringa di separazione tra le colonne. |
singleVariantColumn |
None | Se impostato su un nome di colonna, legge l'intero record CSV in una singola VariantType colonna con tale nome anziché analizzare ogni campo nella propria colonna. Richiede header=true. |
skipRows |
0 |
Numero di righe dall'inizio del file CSV che devono essere ignorate (incluse le righe commentate e vuote). Se header è vero, l'intestazione sarà la prima riga non ignorata e non commentata. Valori validi: numeri interi positivi o 0. |
timeFormat |
HH:mm:ss |
Formato per l'analisi dei valori delle TimeType colonne. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Formato per l'analisi delle stringhe di timestamp. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Formato per l'analisi del timestamp senza stringhe di fuso orario (TimestampNTZType). |
timeZone |
None | Oggetto java.time.ZoneId da utilizzare durante l'analisi di timestamp e date. |
unescapedQuoteHandling |
STOP_AT_DELIMITER |
La strategia per il trattamento delle virgolette non scappate. Opzioni consentite:
|
Excel
| Chiave | Predefinito | Description |
|---|---|---|
dataAddress |
None | Intervallo di celle da leggere nella sintassi Excel. Se omesso, legge tutte le celle valide dal primo foglio. Utilizzare "SheetName!C5:H10" per leggere un intervallo da un foglio denominato, "C5:H10" per leggere un intervallo dal primo foglio o "SheetName" per leggere tutti i dati da un foglio specifico. |
headerRows |
0 |
Numero di righe iniziali da usare come intestazioni del nome di colonna. Quando dataAddress viene specificato, questo vale all'interno dell'intervallo di celle. Quando 0, i nomi di colonna vengono generati automaticamente come _c1, _c2, _c3e così via. Valori validi: 0, 1. |
ignoreMissingSheet |
false |
Indica se ignorare automaticamente i file che non contengono il foglio specificato da dataAddress. Quando false, viene generato un errore se manca un file nel foglio richiesto. Si applica solo quando viene specificato un nome di foglio in dataAddress. Valori validi: true, false. |
includePhoneticRuns |
false |
Indica se includere annotazioni fonetiche (ad esempio pinyin o furigana) concatenate ai valori di stringa delle celle durante la lettura di file XLSX. Valori validi: true, false. |
operation |
readSheet |
Operazione da eseguire nella cartella di lavoro Excel. Valori validi: readSheet (legge i dati da un foglio), listSheets (restituisce uno struct con campi sheetIndex: long e sheetName: String per ogni foglio). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Stringa di formato personalizzata per i valori timestamp-without-timezone archiviati come stringhe in Excel. I formati di data personalizzati seguono i formati dei modelli di data e ora. |
dateFormat |
yyyy-MM-dd |
Stringa di formato personalizzata per i valori stringa letti come Date. I formati di data personalizzati seguono i formati dei modelli di data e ora. |
JSON
| Chiave | Predefinito | Description |
|---|---|---|
allowBackslashEscapingAnyCharacter |
false |
Indica se consentire alle barre rovesciate di eseguire l'escape di qualsiasi carattere che abbia esito positivo. Se non è abilitata, solo i caratteri specificatamente elencati dalla specifica JSON possono essere sottoposti a escape. |
allowComments |
false |
Indica se consentire o meno l'uso dei commenti di stile Java, C e C++ ('/', '*' e '//' ) all'interno del contenuto analizzato. |
allowNonNumericNumbers |
true |
Indica se consentire l'insieme di token non numerici (NaN) come valori numerici in virgola mobile legali. |
allowNumericLeadingZeros |
false |
Indica se consentire ai numeri integrali di iniziare con zeli aggiuntivi (ignorabili), (ad esempio 000001). |
allowSingleQuotes |
true |
Se consentire l'uso di virgolette singole (apostrofo, carattere '\') per quotare stringhe (nomi e valori di tipo String). |
allowUnquotedControlChars |
false |
Indica se consentire alle stringhe JSON di contenere caratteri di controllo senza escape (caratteri ASCII con valore minore di 32, inclusi i caratteri di tabulazione e avanzamento riga) o meno. |
allowUnquotedFieldNames |
false |
Indica se consentire l'uso di nomi di campo senza virgo chiave, consentiti da JavaScript, ma non dalla specifica JSON. |
alternateVariantEncoding |
None | Codifica usata per i valori Variant nel codice JSON di origine. Impostare su per Z85 decodificare i valori Variant con codifica Base85 anziché archiviati come JSON inline. |
badRecordsPath |
None | Percorso per archiviare i file per registrare le informazioni sui record JSON non validi. L'uso dell'opzione badRecordsPath in un'origine dati basata su file presenta le limitazioni seguenti:
|
columnNameOfCorruptRecord |
_corrupt_record |
Colonna per l'archiviazione di record malformati e non analizzabili. Se il parametro mode per il parsing è impostato su DROPMALFORMED, questa colonna sarà vuota. |
dateFormat |
yyyy-MM-dd |
Formato per l'analisi delle stringhe di data. |
dropFieldIfAllNull |
false |
Indica se ignorare colonne con tutti i valori nulli o matrici e strutture vuote durante l'inferenza dello schema. |
encoding oppure charset |
UTF-8 |
Nome della codifica dei file JSON. Vedere java.nio.charset.Charset per l'elenco delle opzioni. Non è possibile usare UTF-16 e UTF-32 quando multiline è true. |
inferTimestamp |
false |
Indica se provare a dedurre stringhe di timestamp come TimestampType. Se impostato su true, l'inferenza dello schema potrebbe richiedere molto più tempo. È necessario abilitare cloudFiles.inferColumnTypes per l’uso con il caricatore automatico. |
lineSep |
Nessuno, che copre \r, \r\ne \n |
Una stringa tra due record JSON consecutivi. |
locale |
US |
Un identificatore java.util.Locale. Influenza l'analisi predefinita di date, timestamp e numeri decimali all'interno del JSON. |
maxNestingDepth |
500 |
Profondità massima consentita di annidamento per oggetti e matrici JSON. Aumentare questo valore per i documenti annidati in modo approfondito. Valori validi: numeri interi positivi. |
maxNumLen |
1000 |
Lunghezza massima dei token numerici nell'input JSON. Aumentare questo valore per JSON con valori letterali numerici di grandi dimensioni. Valori validi: numeri interi positivi. |
maxStringLen |
illimitata | Lunghezza massima dei valori stringa nell'input JSON. Impostare per limitare l'utilizzo della memoria durante l'analisi di JSON con stringhe di grandi dimensioni. Valori validi: numeri interi positivi. |
mode |
PERMISSIVE |
Modalità parser per la gestione di record malformati. Valori validi: PERMISSIVE, DROPMALFORMED, FAILFAST. |
multiLine |
false |
Indica se i record JSON si estendono su più righe. |
prefersDecimal |
false |
Tenta di dedurre stringhe come DecimalType invece di tipo float o double, quando possibile. È anche necessario usare l'inferenza dello schema, abilitando inferSchema o usando cloudFiles.inferColumnTypes con il caricatore automatico. |
primitivesAsString |
false |
Stabilisce se dedurre tipi primitivi come i numeri e i booleani sotto forma di StringType. |
readerCaseSensitive |
true |
Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole. Disponibile in Databricks Runtime 13.3 e versioni successive. |
rescuedDataColumn |
None | Indica se raccogliere tutti i dati che non possono essere analizzati a causa di una mancata corrispondenza del tipo di dati o di una mancata corrispondenza dello schema (inclusa la combinazione di maiuscole e minuscole) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altre informazioni, vedere Che cos'è la colonna di dati salvata?COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento. |
singleVariantColumn |
None | Indica se inserire l'intero documento JSON, analizzato in una singola colonna Variant con la stringa specificata come nome della colonna. Se non è impostato, i campi JSON vengono inseriti nelle proprie colonne. Valori validi: qualsiasi stringa. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Formato per l'analisi delle stringhe di timestamp. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Formato per l'analisi del timestamp senza stringhe di fuso orario (TimestampNTZType). |
timeZone |
None | Oggetto java.time.ZoneId da utilizzare durante l'analisi di timestamp e date. |
upgradeExceptionAsBadRecord |
false |
Indica se considerare le eccezioni di aggiornamento del tipo (ad esempio, quando un valore non può essere ampliato al tipo di colonna dichiarato) come record non validi anziché generare un'eccezione. |
Kafka
Per l'elenco completo delle opzioni del lettore Kafka, vedere Opzioni Kafka di DataStreamReader. Le opzioni seguenti si applicano solo alle letture batch tramite spark.read.format("kafka").
| Chiave | Predefinito | Description |
|---|---|---|
endingOffsets |
latest |
Dove interrompere la lettura. Valori validi: latesto una stringa JSON di offset per ogni partizione, {"topicA":{"0":50,"1":-1}}ad esempio .Nella stringa -1 JSON è l'offset più recente.
-2, ovvero l'offset meno recente, non è consentito come offset finale. |
endingOffsetsByTimestamp |
None | Offset finali per partizione specificati come timestamp in millisecondi. Valori validi: stringa JSON di timestamp per ogni partizione, ad esempio {"topicA":{"0":2000,"1":3000}}. |
endingTimestamp |
None | Timestamp finale globale in millisecondi applicato a tutte le partizioni. Valori validi: numeri interi non negativi. |
ORCO
| Chiave | Predefinito | Description |
|---|---|---|
mergeSchema |
false |
Indica se dedurre lo schema tra più file e unire lo schema di ogni file. |
Parquet
| Chiave | Predefinito | Description |
|---|---|---|
datetimeRebaseMode |
LEGACY |
Controlla la ricalibrazione dei valori DATE e TIMESTAMP tra i calendari giuliano e gregoriano prolettico. Valori validi: EXCEPTION, LEGACY e CORRECTED. |
int96RebaseMode |
LEGACY |
Controlla la ricalibrazione dei valori di timestamp INT96 tra i calendari Julian e Gregoriano prolettico. Valori validi: EXCEPTION, LEGACY e CORRECTED. |
mergeSchema |
false |
Indica se dedurre lo schema tra più file e unire lo schema di ogni file. |
readerCaseSensitive |
true |
Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole. |
rescuedDataColumn |
None | Indica se raccogliere tutti i dati che non possono essere elaborati a causa di: mancata corrispondenza del tipo di dati e disallineamento dello schema (incluso il formato delle lettere delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altri dettagli, fare riferimento a Che cos'è la colonna di dati salvata?.COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento. |
Archivio stati
Usare queste opzioni con spark.read.format("statestore") o la read_statestore funzione con valori di tabella per leggere i dati dello stato structured streaming. Vedi Leggi le informazioni sullo stato di Structured Streaming.
| Chiave | Predefinito | Description |
|---|---|---|
batchId |
ID batch più recente | Batch di destinazione da cui leggere. Usare per eseguire query su uno stato precedente della query. Il batch deve essere sottoposto a commit ma non ancora pulito. Valori validi: numeri interi non negativi. |
operatorId |
0 |
Operatore di destinazione da cui leggere. Usare quando la query dispone di più operatori con stato. Valori validi: numeri interi non negativi. |
storeName |
DEFAULT |
Nome dell'archivio stati di destinazione da cui leggere. Usare quando l'operatore con stato ha più istanze dell'archivio stati. È necessario specificare storeName o joinSide per un join di flusso, ma non entrambi. Valori validi: qualsiasi stringa. |
joinSide |
None | Lato di destinazione da cui leggere per un join di flusso.The target side to read from for a stream-stream join. È necessario specificare storeName o joinSide per un join di flusso, ma non entrambi. Valori validi: left, right. |
snapshotStartBatchId |
None | ID batch dello snapshot da usare come punto iniziale durante la lettura dello stato. Il lettore ricompila lo stato riproducendo le modifiche da questo snapshot fino a batchId. Utile quando uno snapshot è danneggiato. Deve specificare insieme a snapshotPartitionId. Non è possibile usare con readChangeFeed. Supporta l'archivio stati supportato da HDFS e l'archivio stati RocksDB con checkpoint del log delle modifiche abilitati. Disponibile in Databricks Runtime 15.4 LTS e versioni successive. Valori validi: numeri interi non negativi. |
snapshotPartitionId |
None | Se specificato, la query legge solo questa partizione. Deve specificare insieme a snapshotStartBatchId. Non è possibile usare con readChangeFeed. Disponibile in Databricks Runtime 15.4 LTS e versioni successive. Valori validi: numeri interi non negativi. |
readChangeFeed |
false |
Quando true, restituisce le modifiche di stato in un intervallo specificato di batch tra changeStartBatchId e changeEndBatchId. Richiede changeStartBatchId. Impossibile usare con joinSide, batchIdsnapshotStartBatchId, o snapshotPartitionId. Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: true, false.Per informazioni dettagliate, vedere Leggere le modifiche dello stato di Structured Streaming. |
changeStartBatchId |
None | ID batch iniziale per l'intervallo di feed di modifiche. Obbligatorio quando readChangeFeed è true. Si applica solo quando readChangeFeed è impostato su true. Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: numeri interi non negativi. |
changeEndBatchId |
ID batch più recente | ID batch finale per l'intervallo di feed di modifiche. Deve essere maggiore o uguale a changeStartBatchId. Si applica solo quando readChangeFeed è impostato su true. Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: numeri interi non negativi. |
stateVarName |
None | Nome della variabile di stato da leggere. Il nome della variabile di stato è il nome univoco di ogni variabile all'interno della init funzione di un StatefulProcessor oggetto usato dall'operatore transformWithState . Obbligatorio quando si usa l'operatore transformWithState . Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: qualsiasi stringa. |
readRegisteredTimers |
false |
Quando true, legge i timer registrati usati dall'operatore transformWithState . Si applica solo all'operatore transformWithState . Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: true, false. |
flattenCollectionTypes |
true |
Quando true, rende flat i record restituiti per le variabili di stato della mappa e dell'elenco. Quando false, restituisce i record come Spark SQL Array o Map. Si applica solo all'operatore transformWithState . Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Valori validi: true, false. |
Testo
| Chiave | Predefinito | Description |
|---|---|---|
encoding |
UTF-8 |
Nome della codifica del separatore di riga del file TEXT. Per un elenco delle opzioni, vedere java.nio.charset.Charset. Il contenuto del file non è interessato da questa opzione e viene letto as-is. |
lineSep |
Nessuno, che copre \r, \r\n e \n |
Una stringa tra due record TEXT consecutivi. |
wholeText |
false |
Indica se leggere un file come singolo record. |
XML
| Chiave | Predefinito | Description |
|---|---|---|
rowTag |
None | Il tag di riga dei file XML da trattare come una riga. Nell'esempio XML <books> <book><book>...<books>, il valore appropriato è book. Si tratta di un'opzione obbligatoria. |
samplingRatio |
1.0 |
Definisce una frazione di righe utilizzate per l'inferenza dello schema. Le funzioni predefinite XML ignorano questa opzione. Valori validi: 0.0 per 1.0. |
excludeAttribute |
false |
Indica se escludere gli attributi negli elementi. |
mode |
None | Modalità per gestire i record danneggiati durante l'analisi.
PERMISSIVE: per i record danneggiati, inserisce la stringa in formato non valido in un campo configurato da columnNameOfCorruptRecord e imposta i campi in formato non valido su null. Per mantenere i record danneggiati, è possibile impostare un campo di tipo string denominato columnNameOfCorruptRecord in uno schema definito dall'utente. Se il campo non è presente in uno schema, i record danneggiati vengono eliminati durante l'analisi. Quando si deduce uno schema, il parser aggiunge in modo implicito un campo columnNameOfCorruptRecord in uno schema di output.
DROPMALFORMED: ignora i record danneggiati. Questa modalità non è supportata per le funzioni predefinite XML.
FAILFAST: genera un'eccezione quando il parser incontra i record danneggiati. |
inferSchema |
true |
Se true, tenta di dedurre un tipo appropriato per ogni colonna DataFrame risultante. Se false, tutte le colonne risultanti sono di tipo string. Le funzioni predefinite XML ignorano questa opzione. |
columnNameOfCorruptRecord |
spark.sql.columnNameOfCorruptRecord |
Consente di rinominare il nuovo campo contenente una stringa in formato non valido creata dalla PERMISSIVE modalità. |
attributePrefix |
None | Prefisso per gli attributi per distinguere gli attributi dagli elementi. Questo sarà il prefisso per i nomi dei campi. Il valore predefinito è _. Può essere vuoto per la lettura del codice XML, ma non per la scrittura. Si applica anche alle opzioni XML DataFrameWriter. |
valueTag |
_VALUE |
Tag utilizzato per i dati di caratteri all'interno di elementi che hanno anche attributi o elementi figlio. L'utente può specificare il campo valueTag nello schema oppure verrà aggiunto automaticamente durante l'inferenza dello schema quando i dati di tipo carattere sono presenti in elementi con altri elementi o attributi. Si applica anche alle opzioni XML DataFrameWriter. |
encoding |
UTF-8 |
Per la lettura, decodifica i file XML in base al tipo di codifica specificato. Per la scrittura, specifica la codifica (charset) dei file XML salvati. Le funzioni predefinite XML ignorano questa opzione. Si applica anche alle opzioni XML DataFrameWriter. |
ignoreSurroundingSpaces |
true |
Indica se gli spazi vuoti che circondano i valori devono essere ignorati. I dati di caratteri composti solo da spazi bianchi vengono ignorati. |
rowValidationXSDPath |
None | Percorso di un file XSD facoltativo utilizzato per convalidare il codice XML per ogni riga singolarmente. Le righe che non riescono a convalidare vengono considerate come errori di analisi. L'XSD non influisce in caso contrario sullo schema, indipendentemente dal fatto che sia specificato o dedotto. |
ignoreNamespace |
false |
Se true, i prefissi degli spazi dei nomi sugli elementi e gli attributi XML vengono ignorati. I tag <abc:author> e <def:author>, ad esempio, vengono considerati come se entrambi siano solo <author>. I namespace non possono essere ignorati nell'elemento rowTag, solo nei suoi figli leggibili. L'analisi XML non è sensibile ai namespace anche se false. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Stringa di formato timestamp personalizzata che segue il formato del modello datetime pattern. Questo vale per il tipo timestamp. Si applica anche alle opzioni XML DataFrameWriter. |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Stringa di formato personalizzata per timestamp senza fuso orario che segue il formato del modello datetime. Questo vale per il tipo TimestampNTZType. Si applica anche alle opzioni XML DataFrameWriter. |
dateFormat |
yyyy-MM-dd |
Stringa di formato data personalizzata che segue il modello datetime. Questo vale per il tipo di data. Si applica anche alle opzioni XML DataFrameWriter. |
locale |
en-US |
Imposta un locale come tag di lingua nel formato IETF BCP 47. Ad esempio, locale viene usato durante l'analisi di date e timestamp. |
nullValue |
stringa null |
Imposta la rappresentazione in forma di stringa del valore null. Quando si tratta di null, il parser non scrive attributi ed elementi per i campi. Si applica anche alle opzioni XML DataFrameWriter. |
readerCaseSensitive |
true |
Specifica il comportamento di distinzione tra maiuscole e minuscole quando rescuedDataColumn è abilitato. Se true, salvare le colonne di dati i cui nomi differiscono per maiuscole e minuscole rispetto allo schema. Se false, leggere i dati senza distinzione tra maiuscole e minuscole. |
rescuedDataColumn |
None | Indica se raccogliere tutti i dati che non possono essere analizzati a causa di una mancata corrispondenza di tipo di dati o di schema (inclusa una differenza nel caso delle colonne) in una colonna separata. Questa colonna è inclusa per impostazione predefinita quando si usa il caricatore automatico. Per altri dettagli, vedere Che cos'è la colonna di dati salvata?.
COPY INTO (legacy) non supporta la colonna di dati salvata perché non è possibile impostare manualmente lo schema usando COPY INTO. Databricks consiglia di usare il caricatore automatico per la maggior parte degli scenari di inserimento. |
singleVariantColumn |
none |
Specifica il nome della singola colonna variante. Se questa opzione viene specificata per la lettura, analizzare l'intero record XML in una singola colonna Variant con il valore della stringa di opzione specificato come nome della colonna. Se questa opzione viene specificata per la scrittura, scrivere il valore della singola colonna Variant nei file XML. Si applica anche alle opzioni XML DataFrameWriter. |
useLegacyXMLParser |
true |
Indica se usare il parser XML legacy. Il parser legacy ha una convalida meno rigorosa per il contenuto in formato non valido, ma è meno efficiente per la memoria. Impostare su per false acconsentire esplicitamente al parser predefinito più restrittivo. |
wildcardColName |
xs_any |
Nome della colonna utilizzato per acquisire elementi XML che corrispondono all'elemento dello schema con caratteri jolly (xs:any). Non può essere usato insieme a rescuedDataColumn. |
Opzioni DataStreamReader
Usare queste opzioni con DataStreamReader.option() per configurare le letture di streaming dalle tabelle Delta Lake e da altre origini basate su file.
Per le opzioni di formato di file (JSON, CSV, Parquet e altri), vedere Opzioni DataFrameReader.
Per le opzioni del caricatore automatico (cloudFiles.*), vedere Caricamento automatico.
Example
L'esempio seguente imposta maxFilesPerTrigger su 10 per un flusso di tabelle Delta Lake:
Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
Comune
Le opzioni seguenti si applicano alle tabelle Delta Lake e ad altre origini di streaming basate su file.
| Chiave | Predefinito | Description |
|---|---|---|
cleanSource |
off |
Come gestire i file di origine dopo l'elaborazione da parte del flusso. Valori validi: off (nessuna azione), delete (eliminare definitivamente il file di origine), archive (passare a sourceArchiveDir). Se impostato su archive, sourceArchiveDir deve essere impostato anche . Non si applica allo streaming di tabelle Delta Lake. |
fileNameOnly |
false |
Indica se identificare i file già elaborati in base al nome file solo anziché al percorso completo. Quando true, i file in percorsi diversi con lo stesso nome file vengono considerati come lo stesso file e non vengono rielaborati. Non si applica allo streaming di tabelle Delta Lake. |
latestFirst |
false |
Indica se elaborare prima i file modificati più di recente all'interno di ogni micro batch. Utile quando si desidera elaborare i dati più recenti il più rapidamente possibile. Quando true e maxFilesPerTrigger o maxBytesPerTrigger è impostato, maxFileAge viene ignorato. Non si applica allo streaming di tabelle Delta Lake. |
maxBytesPerTrigger |
None | Valore massimo flessibile per la quantità di dati elaborati per ogni micro batch. Un batch può elaborare più del limite se l'unità di input più piccola lo supera. Se usato insieme a maxFilesPerTrigger, il micro batch elabora i dati fino a quando non viene raggiunto uno dei limiti. Valori validi: numeri interi positivi.Per il caricatore automatico, usare cloudFiles.maxBytesPerTrigger invece. Vedere Common( Comune). |
maxCachedFiles |
10000 |
Numero massimo di file non elaborati da memorizzare nella cache per i micro batch successivi. Impostare su 0 per disattivare la memorizzazione nella cache. Aumentare questo valore quando la directory di origine contiene molti nuovi file per ogni trigger. Non si applica allo streaming di tabelle Delta Lake. Valori validi: numeri interi positivi o 0. |
maxFileAge |
7d |
Validità massima dei file considerati per l'elaborazione, rispetto al timestamp del file modificato più di recente anziché all'ora di sistema corrente. I file precedenti a questa soglia vengono ignorati. Accetta stringhe di durata, 7d ad esempio o 4h. Ignorato quando latestFirst è true e maxFilesPerTrigger o maxBytesPerTrigger è impostato. Non si applica allo streaming di tabelle Delta Lake. |
maxFilesPerTrigger |
1000 per Delta Lake e Auto Loader. Nessun valore massimo per altre origini basate su file. |
Limite superiore per il numero di nuovi file elaborati in ogni micro batch. Se usato insieme a maxBytesPerTrigger, il micro batch elabora i dati fino a quando non viene raggiunto uno dei limiti. Valori validi: numeri interi positivi.Per il caricatore automatico, usare cloudFiles.maxFilesPerTrigger invece. Vedere Common( Comune). |
sourceArchiveDir |
None | Percorso della directory di archiviazione quando cleanSource è impostato su archive. I file di origine vengono spostati in questo percorso dopo l'elaborazione, mantenendo la struttura di directory relativa. Non si applica allo streaming di tabelle Delta Lake. |
Caricatore automatico
Usare queste opzioni con l'origine cloudFiles per configurare il caricatore automatico per l'inserimento in streaming dall'archiviazione cloud. Le opzioni specifiche dell'origine cloudFiles sono precedute cloudFiles dal prefisso per mantenerle in uno spazio dei nomi separato da altre opzioni di origine Structured Streaming .
Comune
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.allowOverwrites |
false |
Indica se consentire modifiche al file della directory di input per sovrascrivere i dati esistenti. Per le avvertenze di configurazione, vedere Il caricatore automatico elabora nuovamente il file quando il file viene aggiunto o sovrascritto?. |
cloudFiles.backfillInterval |
None | Il caricatore automatico può attivare i backfill asincroni a un determinato intervallo. Ad esempio 1 day , per il riempimento giornaliero o 1 week per il riempimento settimanale. Per altre informazioni, vedere Attivare i backfill regolari usando cloudFiles.backfillInterval.Non usare quando cloudFiles.useManagedFileEvents è impostato su true. |
cloudFiles.cleanSource |
OFF |
Indica se eliminare automaticamente i file elaborati dalla directory di input. Se impostato su OFF (impostazione predefinita), non vengono eliminati file.Se impostato su DELETE, Il caricatore automatico elimina automaticamente i file 30 giorni dopo l'elaborazione. A tale scopo, il caricatore automatico deve disporre delle autorizzazioni di scrittura per la directory di origine.Se impostato su MOVE, il caricatore automatico sposta automaticamente i file nel percorso specificato in cloudFiles.cleanSource.moveDestination 30 giorni dopo l'elaborazione. A tale scopo, Auto Loader deve disporre delle autorizzazioni di scrittura per la directory sorgente e per la destinazione di spostamento.Un file viene considerato elaborato quando ha un valore non Null per commit_time nel risultato della cloud_files_state funzione con valori di tabella. Vedere cloud_files_state funzione con valori di tabella. L'attesa aggiuntiva di 30 giorni dopo l'elaborazione può essere configurata tramite cloudFiles.cleanSource.retentionDuration.Esaminare le considerazioni seguenti prima di abilitare cloudFiles.cleanSource:
Disponibile in Databricks Runtime 16.4 e versioni successive. |
cloudFiles.cleanSource.retentionDuration |
30 days |
Tempo di attesa prima che i file elaborati diventino candidati per l'archiviazione con cleanSource. Deve essere maggiore di 7 giorni per DELETE. Nessuna restrizione minima per MOVE.Il valore è una stringa CalendarInterval . Ad esempio, "14 days", "30 days", "2 weeks"o "1 month".Disponibile in Databricks Runtime 16.4 e versioni successive. |
cloudFiles.cleanSource.moveDestination |
None | Percorso in cui archiviare i file elaborati quando cloudFiles.cleanSource è impostato su MOVE. Può trattarsi di un percorso di archiviazione cloud o di un percorso del volume del catalogo Unity (ad esempio, /Volumes/my_catalog/my_schema/my_volume/archive/).Il percorso di spostamento deve:
Il caricatore automatico deve disporre delle autorizzazioni di scrittura per questa directory. Disponibile in Databricks Runtime 16.4 e versioni successive. |
cloudFiles.format |
Nessuno (opzione obbligatoria) |
Formato del file di dati nel percorso di origine. I valori validi includono:
|
cloudFiles.includeExistingFiles |
true |
Indica se includere file esistenti nel percorso di input di elaborazione del flusso o elaborare solo i nuovi file in arrivo dopo l'installazione iniziale. Questa opzione viene valutata solo quando avvii uno streaming per la prima volta. La modifica di questa opzione dopo il riavvio del flusso non ha alcun effetto. |
cloudFiles.inferColumnTypes |
false |
Stabilisce se dedurre tipologie esatte di colonna quando si sfrutta l'inferenza dello schema. Per impostazione predefinita, le colonne vengono dedotte come stringhe durante l'inferenza di set di dati JSON e CSV. Per altri dettagli, vedere Inferenza dello schema . |
cloudFiles.maxBytesPerTrigger |
None | Numero massimo di nuovi byte da elaborare in ogni trigger. È possibile specificare una stringa di byte, ad esempio 10g per limitare ogni microbatch a 10 GB di dati. Questo è un massimo morbido. Se sono presenti file di 3 GB ciascuno, Azure Databricks elabora 12 GB in un microbatch. Se usato insieme a cloudFiles.maxFilesPerTrigger, Azure Databricks consuma fino al limite inferiore di cloudFiles.maxFilesPerTrigger o cloudFiles.maxBytesPerTrigger, a seconda di quale valore viene raggiunto per primo. Questa opzione non ha alcun effetto se usata con Trigger.Once() (Trigger.Once() è deprecata).In Databricks Runtime 18.0 e versioni successive, questa opzione viene configurata dinamicamente e non deve essere impostata manualmente. |
cloudFiles.maxFileAge |
None | Per quanto tempo viene rilevato un evento di file a scopo di deduplicazione. Databricks sconsiglia di ottimizzare questo parametro a meno che non si stiano inserendo dati nell'ordine di milioni di file all'ora. Per altri dettagli, vedere la sezione relativa al rilevamento degli eventi file . L'ottimizzazione troppo aggressiva di cloudFiles.maxFileAge può causare problemi di qualità dei dati, ad esempio l'inserimento duplicato o i file mancanti. Di conseguenza, Databricks consiglia un'impostazione conservativa per cloudFiles.maxFileAge, ad esempio 90 giorni, che è simile a quella consigliata per le soluzioni di inserimento dati simili. |
cloudFiles.maxFilesPerTrigger |
1000 |
Numero massimo di nuovi file da elaborare in ogni trigger. Se usato insieme a cloudFiles.maxBytesPerTrigger, Azure Databricks consuma fino al limite inferiore di cloudFiles.maxFilesPerTrigger o cloudFiles.maxBytesPerTrigger, a seconda di quale valore viene raggiunto per primo. Questa opzione non ha alcun effetto se usata con Trigger.Once() (deprecato).In Databricks Runtime 18.0 e versioni successive, questa opzione viene configurata dinamicamente e non deve essere impostata manualmente. |
cloudFiles.partitionColumns |
None | Elenco delimitato da virgole di colonne di partizione in stile Hive che si desidera dedurre dalla struttura di directory dei file. Le colonne di partizione in stile Hive sono coppie chiave-valore combinate da un segno di uguaglianza, <base-path>/a=x/b=1/c=y/file.formatad esempio . In questo esempio le colonne di partizione sono a, b e c. Per impostazione predefinita, queste colonne vengono aggiunte automaticamente allo schema se si usa l'inferenza dello schema e si specifica l'oggetto <base-path> da cui caricare i dati. Se si specifica uno schema, il caricatore automatico prevede che queste colonne vengano incluse nello schema. Se non si desidera che queste colonne facciano parte dello schema, è possibile specificare "" per ignorare queste colonne. Inoltre, è possibile usare questa opzione quando si vuole dedurre il percorso del file in strutture di directory complesse, come nell'esempio seguente:<base-path>/year=2022/week=1/file1.csv<base-path>/year=2022/month=2/day=3/file2.csv<base-path>/year=2022/month=2/day=4/file3.csvSpecificando cloudFiles.partitionColumns come year,month,day restituisce year=2022 per file1.csv, ma le colonne month e day sono null.month e day vengono analizzati correttamente per file2.csv e file3.csv. |
cloudFiles.schemaEvolutionMode |
addNewColumnsquando non viene specificato uno schema; in caso contrario, none |
La modalità per l'evoluzione dello schema man mano che vengono individuate nuove colonne nei dati. Per impostazione predefinita, le colonne vengono dedotte come stringhe durante l'inferenza di set di dati JSON. Per altri dettagli, vedere Evoluzione dello schema . |
cloudFiles.schemaHints |
None | Informazioni sullo schema fornite ad Auto Loader durante l'inferenza dello schema. Vedere hint di schema per maggiori dettagli. |
cloudFiles.schemaLocation |
Nessuno (obbligatorio per dedurre lo schema) | La posizione in cui archiviare lo schema dedotto e le modifiche successive. Per altri dettagli, vedere Inferenza dello schema . |
cloudFiles.useStrictGlobber |
false |
Indica se usare un globber rigoroso che si allinei con il comportamento globbing predefinito di altre risorse di file in Apache Spark. Per altri dettagli, vedere Modelli di caricamento dei dati comuni . Disponibile in Databricks Runtime 12.2 LTS e versioni successive. |
cloudFiles.validateOptions |
true |
Indica se convalidare le opzioni del caricatore automatico e restituire un errore per opzioni sconosciute o incoerenti. |
Directory
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.useIncrementalListing (obsoleto) |
auto in Databricks Runtime 17.2 e versioni successive in false Databricks Runtime 17.3 e versioni successive |
Questa funzionalità è stata deprecata. Databricks consiglia di usare la modalità di notifica file con eventi di file anziché cloudFiles.useIncrementalListing.Indicare se utilizzare l'elenco incrementale anziché quello completo nella modalità elenco directory. Per impostazione predefinita, il caricatore automatico fa il massimo sforzo per rilevare automaticamente se una determinata directory è applicabile per l'elenco incrementale. È possibile usare in modo esplicito l'elenco incrementale o usare l'elenco completo della directory impostandolo rispettivamente come true o false.L'abilitazione errata dell'elenco incrementale in una directory non lessicalmente ordinata impedisce al caricatore automatico di individuare nuovi file. Funziona con Azure Data Lake Storage ( abfss://), S3 (s3://) e GCS (gs://).Disponibile in Databricks Runtime 9.1 LTS e versioni successive. Valori disponibili: auto, true, false |
Notifica file
Per informazioni sulla configurazione della modalità di notifica file, incluse le autorizzazioni cloud necessarie, le istruzioni di installazione e i metodi di autenticazione, vedere Configurare i flussi del caricatore automatico in modalità di notifica file.
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.fetchParallelism |
1 |
Numero di thread da usare per il recupero di messaggi dal servizio di accodamento. Non usare quando cloudFiles.useManagedFileEvents è impostato su true. |
cloudFiles.pathRewrites |
None | Obbligatorio solo se si specifica un queueUrl oggetto che riceve notifiche di file da più bucket S3 e si vuole usare i punti di montaggio configurati per l'accesso ai dati in questi contenitori. Usa questa opzione per riscrivere il prefisso del percorso bucket/key con il punto di montaggio. È possibile riscrivere solo i prefissi. Ad esempio, per la configurazione {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, il percorso s3://<databricks-mounted-bucket>/path/2017/08/fileA.json viene riscritto in dbfs:/mnt/data-warehouse/2017/08/fileA.json.Non usare quando cloudFiles.useManagedFileEvents è impostato su true. |
cloudFiles.resourceTag |
None | Una serie di coppie di tag chiave-valore che consentono di associare e identificare le risorse correlate, ad esempio:cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")Per altre informazioni su AWS, vedere Tag di allocazione dei costi di Amazon SQS e Configurazione dei tag per un argomento Amazon SNS. (1) Per altre informazioni su Azure, vedere Denominazione di code e metadati e la trattazione di properties.labels nelle sottoscrizioni di eventi. Il caricatore automatico archivia queste coppie di tag chiave-valore in JSON come etichette.
(1)Per altre informazioni su GCP, vedere Creazione di report sull'utilizzo con le etichette. (1) Non usare quando cloudFiles.useManagedFileEvents è impostato su true. Impostare invece i tag delle risorse usando la console del provider di servizi cloud. |
cloudFiles.useManagedFileEvents |
false |
Se impostato su true, Il caricatore automatico usa il servizio eventi file per individuare i file nel percorso esterno. È possibile usare questa opzione solo se il percorso di caricamento si trova in un percorso esterno con eventi di file abilitati. Vedere Usare la modalità di notifica file con gli eventi di file.Gli eventi di file offrono prestazioni a livello di notifiche nell'individuazione file, perché il caricatore automatico può individuare nuovi file dopo l'ultima esecuzione. A differenza dell'elenco di directory, questo processo non deve elencare tutti i file nella directory. Ci sono alcune situazioni in cui il caricatore automatico usa l'elenco di directory anche se l'opzione degli eventi di file è abilitata:
Vedere Quando il caricatore automatico con gli eventi di file usa l'elenco di directory? per un elenco completo delle situazioni in cui il caricatore automatico usa l'elenco di directory con questa opzione. Disponibile in Databricks Runtime 14.3 LTS e versioni successive. |
cloudFiles.listOnStart |
false |
Se impostato su true, Il caricatore automatico esegue un elenco di directory completo all'avvio del flusso, anziché iniziare con il token di continuazione nel checkpoint. Usare questa opzione per eseguire il ripristino da errori, ad esempio CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN. Vedere How do I recover from a CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN error?. |
cloudFiles.useNotifications |
false |
Indica se usare la modalità di notifica file per determinare quando sono presenti nuovi file. Se false, usare la modalità elenco di directory. Consulta Confronta le modalità di rilevamento dei file del caricatore automatico.Non usare quando cloudFiles.useManagedFileEvents è impostato su true. |
(1) Il caricatore automatico aggiunge per impostazione predefinita le seguenti coppie di tag chiave-valore su base migliorativa:
-
vendor:Databricks -
path: percorso da cui vengono caricati i dati. Non disponibile in GCP a causa di limitazioni di etichettatura. -
checkpointLocation: La posizione del checkpoint dello stream. Non disponibile in GCP a causa di limitazioni di etichettatura. -
streamId: un identificatore univoco globale per il flusso.
Databricks riserva questi nomi di chiave e non è possibile sovrascriverne i valori.
Specifico del cloud
Il caricatore automatico offre opzioni per la configurazione dell'infrastruttura cloud per la modalità di notifica file. Per le autorizzazioni cloud necessarie e le istruzioni di configurazione, vedere Configurare flussi del caricatore automatico in modalità di notifica file.
AWS
Specificare le opzioni seguenti solo se si sceglie cloudFiles.useNotifications = true e si vuole che il caricatore automatico configuri automaticamente i servizi di notifica:
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.region |
Area dell'istanza EC2 | L'area in cui risiede il bucket S3 di origine e dove si vogliono creare i servizi AWS SNS e SQS. |
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.restrictNotificationSetupToSameAWSAccountId |
false |
Consentire solo le notifiche di eventi dai bucket AWS S3 nello stesso account del topic SNS. Quando è vero, Auto Loader accetta solo le notifiche degli eventi dai bucket di AWS S3 nello stesso account del topic SNS. Quando false, i criteri di accesso non limitano le configurazioni di bucket tra diversi account e topic SNS. Ciò è utile quando l'argomento SNS e il percorso del bucket sono associati a account diversi.Disponibile in Databricks Runtime 17.2 e versioni successive. |
Specificare l'opzione seguente solo se si sceglie cloudFiles.useNotifications = true e si vuole che il caricatore automatico usi una coda già configurata:
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.queueUrl |
None | L'URL della coda SQS. Se specificato, Auto Loader consuma direttamente gli eventi da questa coda invece di configurare i propri servizi AWS SNS e SQS. |
Opzioni di autenticazione di AWS
Fornire l'opzione di autenticazione seguente per usare una credenziale del servizio Databricks:
| Chiave | Predefinito | Description |
|---|---|---|
databricks.serviceCredential |
None | Il nome della credenziale di servizio Databricks . Disponibile in Databricks Runtime 16.1 e versioni successive. |
Quando le credenziali del servizio Databricks o i ruoli IAM non sono disponibili, è invece possibile fornire le opzioni di autenticazione seguenti:
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.awsAccessKey |
None | La chiave di accesso ID AWS per l’utente. Deve essere fornito con cloudFiles.awsSecretKey. |
cloudFiles.awsSecretKey |
None | La chiave di accesso segreta AWS per l’utente. Deve essere fornito con cloudFiles.awsAccessKey. |
cloudFiles.roleArn |
None | L'ARN di un ruolo IAM da assumere, se necessario. Il ruolo può essere assunto dal profilo dell'istanza del cluster o fornendo le credenziali con cloudFiles.awsAccessKey e cloudFiles.awsSecretKey. |
cloudFiles.roleExternalId |
None | Identificatore da specificare durante l'assunzione di un ruolo tramite cloudFiles.roleArn. |
cloudFiles.roleSessionName |
None | Nome di sessione facoltativo da usare durante l'assunzione di un ruolo tramite cloudFiles.roleArn. |
cloudFiles.stsEndpoint |
None | Endpoint facoltativo da fornire per l'accesso ad AWS STS quando si presuppone un ruolo usando cloudFiles.roleArn. |
Azure
È necessario specificare i valori per tutte le opzioni seguenti se si specifica cloudFiles.useNotifications = true e si vuole che il caricatore automatico configuri automaticamente i servizi di notifica:
Se una credenziale del servizio Databricks non è disponibile, è possibile fornire invece le opzioni di autenticazione seguenti:
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.clientId |
None | ID cliente o ID applicazione del principale del servizio. |
cloudFiles.clientSecret |
None | Segreto del client principale di servizio. |
cloudFiles.connectionString |
None | La stringa di connessione per l'account di archiviazione, in base alla chiave di accesso dell'account o alla firma di accesso condiviso (SAS). |
cloudFiles.tenantId |
None | ID tenant Azure in cui viene creata l'entità servizio. |
Specificare l'opzione seguente solo se si imposta cloudFiles.useNotifications = true e si vuole che il caricatore automatico usi una coda esistente:
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.queueName |
None | Nome della coda di Azure. Se specificato, l'origine dei file cloud utilizza direttamente gli eventi di questa coda invece di configurare i propri servizi Griglia di eventi di Azure e Queue Storage. In tal caso, il databricks.serviceCredential o cloudFiles.connectionString richiede solo autorizzazioni di lettura per la coda. |
GCP
Auto Loader può configurare automaticamente i servizi di notifica sfruttando le credenziali del servizio Databricks . L'account del servizio creato con le credenziali del servizio Databricks richiederà le autorizzazioni specificate in Configurare flussi del caricatore automatico in modalità di notifica file.
Se le credenziali di un servizio Databricks non sono disponibili, è possibile usare direttamente gli account del servizio Google. È possibile configurare il cluster per presupporre un account del servizio seguendo la configurazione del servizio Google oppure specificare direttamente le opzioni di autenticazione seguenti:
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.client |
None | ID client dell'account del servizio Google. |
cloudFiles.clientEmail |
None | Indirizzo di posta elettronica dell'account del servizio Google. |
cloudFiles.privateKey |
None | Chiave privata generata per l'account del servizio Google. |
cloudFiles.privateKeyId |
None | ID della chiave privata generata per l'account del servizio Google. |
Specificare l'opzione seguente solo se si sceglie cloudFiles.useNotifications = true e si vuole che il caricatore automatico usi una coda già configurata:
| Chiave | Predefinito | Description |
|---|---|---|
cloudFiles.subscription |
None | Nome della sottoscrizione Google Cloud Pub/Sub. Se specificato, l'origine dei file cloud utilizza gli eventi di questa coda invece di configurare i propri servizi GCS Notification e Google Cloud Pub/Sub. |
Delta Lake
Quando si esegue la lettura da una tabella Delta Lake tramite , si applicano le opzioni seguenti.spark.readStream
| Chiave | Predefinito | Description |
|---|---|---|
allowSourceColumnDrop |
None | Impostare su un numero di versione della tabella Delta o "always" per consentire al flusso di continuare dopo l'eliminazione delle colonne dallo schema della tabella di origine. Se impostato su un numero di versione, riconosce tutte le modifiche dello schema fino a tale versione. Richiede schemaTrackingLocation. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake. |
allowSourceColumnRename |
None | Impostare su un numero di versione della tabella Delta o "always" per consentire al flusso di continuare dopo la ridenominazione delle colonne nella tabella di origine. Se impostato su un numero di versione, riconosce tutte le modifiche dello schema fino a tale versione. Richiede schemaTrackingLocation. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake. |
allowSourceColumnTypeChange |
None | Impostare su un numero di versione della tabella Delta o "always" per consentire al flusso di continuare dopo la modifica dei tipi di colonna nella tabella di origine. Se impostato su un numero di versione, riconosce tutte le modifiche dello schema fino a tale versione. Richiede schemaTrackingLocation. Vedere Estensione del tipo. |
excludeRegex |
None | Modello di espressione regolare. File i cui percorsi corrispondono al modello vengono esclusi dalla lettura in streaming. Utile per filtrare i file che non sono conformi alla convenzione di denominazione prevista. |
failOnDataLoss |
true |
Indica se la query di streaming non riesce se i dati di origine sono stati eliminati a causa della conservazione dei log (logRetentionDuration). Impostare su false per ignorare i dati mancanti e continuare l'elaborazione. Vedere Configurare la conservazione dei dati per le query di spostamento cronologico. |
ignoreChanges (obsoleto) |
false |
Disponibile in Databricks Runtime 11.3 LTS e versioni precedenti. Ricrea i file di dati riscritti dopo le operazioni di modifica, ad UPDATEesempio , MERGE INTODELETE, o OVERWRITE. Le righe non modificate possono essere generate insieme a nuove righe, pertanto i consumer downstream devono gestire duplicati. Le operazioni di eliminazione non vengono propagate a valle. Sostituito da skipChangeCommits in Databricks Runtime 12.2 LTS e versioni successive. |
ignoreDeletes (obsoleto) |
false |
Ignora le transazioni che eliminano i dati ai limiti della partizione (solo la partizione completa viene eliminata). Non gestisce eliminazioni, aggiornamenti o altre modifiche non di partizione. Utilizzare invece skipChangeCommits. |
readChangeFeed oppure readChangeData |
false |
Indica se abilitare la lettura del feed di dati delle modifiche per la query di streaming. Se abilitato, il flusso genera modifiche a livello di riga (inserimenti, aggiornamenti ed eliminazioni) con colonne di metadati aggiuntive. Consulta Utilizzare il feed di dati delle modifiche di Delta Lake su Azure Databricks. |
schemaTrackingLocation |
None | Percorso di una directory in cui Delta Lake tiene traccia delle modifiche dello schema per la lettura in streaming. Obbligatorio quando si esegue lo streaming da tabelle con mapping di colonne abilitato e si usano allowSourceColumn* le opzioni per gestire l'evoluzione dello schema. Deve trovarsi all'interno checkpointLocation della query di streaming. Vedi Rinominare ed eliminare colonne con la mappatura delle colonne di Delta Lake. |
skipChangeCommits |
false |
Ignora le transazioni che eliminano o modificano i record e i processi esistenti vengono aggiunti solo. Databricks consiglia questa opzione per la maggior parte dei carichi di lavoro che non usano feed di dati delle modifiche. Disponibile in Databricks Runtime 12.2 LTS e versioni successive. Vedere Ignorare i commit delle modifiche upstream con skipChangeCommits. |
startingTimestamp |
Ultima versione disponibile | Timestamp da cui iniziare la lettura. Il flusso legge tutte le modifiche apportate alla tabella di cui è stato eseguito il commit o dopo il timestamp specificato. Se il timestamp precede tutti i commit della tabella disponibili, il flusso inizia dal commit meno recente disponibile. Non può essere usato insieme a startingVersion. Ignorato se il checkpoint di streaming esiste già.Valori validi: una stringa di timestamp, "2019-01-01T00:00:00.000Z" ad esempio o una stringa di data, "2019-01-01"ad esempio . |
startingVersion |
Ultima versione disponibile | Versione della tabella Delta da cui iniziare la lettura. Il flusso legge tutte le modifiche di cui è stato eseguito il commit in o dopo la versione specificata. Specificare "latest" per iniziare solo dalle modifiche più recenti. Non può essere usato insieme a startingTimestamp. Ignorato se il checkpoint di streaming esiste già. Consulta Lavora con la cronologia delle tabelle. |
withEventTimeOrder |
false |
Divide lo snapshot della tabella iniziale in bucket di tempo dell'evento per impedire che i record vengano contrassegnati erroneamente come eventi tardivi e eliminati in query con stato con filigrane. Non può essere modificato dopo l'avvio dell'elaborazione iniziale degli snapshot senza eliminare il checkpoint. Disponibile in Databricks Runtime 11.3 LTS e versioni successive. Vedere Elaborare lo snapshot iniziale senza eliminare i dati. |
Kafka
Usare queste opzioni con spark.readStream.format("kafka") o spark.read.format("kafka"):
| Chiave | Predefinito | Description |
|---|---|---|
assign |
None | Partizioni specifiche da utilizzare. È necessario specificare esattamente una delle subscribeopzioni , subscribePatterno assign . Valori validi: stringa JSON, ad esempio {"topicA":[0,1],"topicB":[2,4]}. |
failOnDataLoss |
true |
Indica se la query non riesce se i dati potrebbero essere andati persi, ad esempio a causa di argomenti eliminati o troncamento offset. Impostare su false per ignorare i dati mancanti e continuare. Valori validi: true, false.Databricks stima in modo conservativo se i dati potrebbero essere andati persi. Tuttavia, questo potrebbe causare falsi allarmi. |
fetchoffset.numretries |
3 |
Numero di tentativi durante il recupero degli offset Kafka non riesce. Valori validi: numeri interi non negativi. |
fetchoffset.retryintervalms |
1000 |
Intervallo in millisecondi tra tentativi di recupero offset. Valori validi: numeri interi non negativi. |
groupIdPrefix |
spark-kafka-source (streaming), spark-kafka-relation (batch) |
Prefisso personalizzato da usare per l'ID gruppo di consumer Kafka generato automaticamente. Se kafka.group.id è impostato in modo esplicito, il connettore ignora questa opzione. Valori validi: qualsiasi stringa. |
includeHeaders |
false |
Indica se includere intestazioni di messaggio Kafka come colonna nell'output. Valori validi: true, false. |
kafkaconsumer.polltimeoutms |
None | Timeout in millisecondi per la chiamata al consumer poll() Kafka. Valori validi: numeri interi positivi. |
kafka.bootstrap.servers |
None | Elenco delimitato da virgole di indirizzi host:port per i broker Kafka. Imposta la proprietà del bootstrap.servers client Kafka.Se non sono presenti dati di Kafka, controllare l'elenco di indirizzi del broker per gli indirizzi non corretti. Se l'elenco di indirizzi del broker non è corretto, potrebbero non esserci errori. I client Kafka presuppongono che i broker saranno disponibili alla fine e riprovare per sempre quando ricevono errori di rete. |
maxRecordsPerPartition |
None | Numero massimo di record per ogni partizione Spark. Se impostato, il connettore divide le partizioni Kafka in modo che ogni partizione Spark legga al massimo questo numero di record. Valori validi: numeri interi positivi. È anche possibile usare questa opzione con minPartitions. Quando vengono impostate entrambe le opzioni, Spark usa qualsiasi opzione restituisca più partizioni. |
minPartitions |
None | Numero minimo di partizioni Spark da leggere da Kafka. Quando impostato, il connettore divide le partizioni Kafka di grandi dimensioni per aumentare il parallelismo. Quando non è impostato, Spark crea una partizione per ogni partizione di argomento Kafka. Utile per gestire l'asimmetria o il picco dei carichi di dati. Valori validi: numeri interi positivi. Questa opzione reinizializza i consumer Kafka per ogni trigger, che potrebbero influire sulle prestazioni con SSL. |
startingOffsets |
latest (streaming), earliest (batch) |
Offset da cui la query inizia la lettura. Valori validi: earliest, latesto una stringa JSON di offset per ogni partizione, {"topicA":{"0":23,"1":-2}}ad esempio . Nella stringa -1 JSON è l'offset più recente.
-2 è l'offset meno recente.Per le query di streaming, questa opzione si applica solo all'avvio di una nuova query. Le query riprese usano sempre il checkpoint. Durante una query, le nuove partizioni iniziano a leggere al primo offset. Per le query batch, latest non è consentito. |
startingOffsetsByTimestamp |
None | Elenco di offset iniziali per ogni partizione, specificato come timestamp in millisecondi. Quando non esiste alcun offset per un timestamp, il comportamento della query viene determinato da startingOffsetsByTimestampStrategy. Valori validi: stringa JSON di timestamp per ogni partizione, ad esempio {"topicA":{"0":1000,"1":2000}}.Per le query di streaming, questa opzione si applica solo all'avvio di una nuova query. Le query riprese usano sempre il checkpoint. Durante una query, le nuove partizioni iniziano a leggere al primo offset. |
startingOffsetsByTimestampStrategy |
error |
Strategia da usare quando non viene trovato alcun offset per un timestamp specificato in startingOffsetsByTimestamp o startingTimestamp. Valori validi: error (genera un'eccezione), latest (usa l'offset disponibile più recente). |
startingTimestamp |
None | Timestamp iniziale globale in millisecondi che si applica a tutte le partizioni. Quando non esiste alcun offset per il timestamp, il comportamento viene controllato da startingOffsetsByTimestampStrategy. Valori validi: numeri interi non negativi. |
subscribe |
None | Argomenti da sottoscrivere. È necessario specificare esattamente una delle subscribeopzioni , subscribePatterno assign . Valori validi: elenco delimitato da virgole di nomi di argomenti. |
subscribePattern |
None | Modello utilizzato per sottoscrivere argomenti. È necessario specificare esattamente una delle subscribeopzioni , subscribePatterno assign . Ad esempio: topic.*. Valori validi: qualsiasi stringa regex Java. |
Le opzioni seguenti si applicano solo alle letture in streaming con spark.readStream.format("kafka"):
| Chiave | Predefinito | Description |
|---|---|---|
bytesEstimateWindowLength |
300s |
Intervallo di tempo usato per stimare i byte rimanenti per la estimatedTotalBytesBehindLatest metrica. Valori validi: stringhe di durata come 10m o 600s. Consultare Recupero delle metriche Kafka. |
maxOffsetsPerTrigger |
None | Numero massimo di offset da elaborare per intervallo di trigger. Gli offset vengono distribuiti proporzionalmente tra le partizioni degli argomenti. Valori validi: numeri interi positivi. |
maxTriggerDelay |
15m |
Tempo massimo di attesa per minOffsetsPerTrigger l'accumulo prima dell'attivazione. Valori validi: stringhe di durata come 10m o 600s. |
minOffsetsPerTrigger |
None | Numero minimo di offset da accumulare prima di attivare un micro batch. Quando maxTriggerDelay viene raggiunto, il micro batch viene eseguito indipendentemente. Valori validi: numeri interi positivi. |
Per le opzioni di offset applicabili solo alle letture batch con spark.read.format("kafka"), vedere Opzioni Kafka di DataFrameReader.
Per le opzioni del client Kafka (kafka.*) e dell'autenticazione, vedere Opzioni.
Opzioni DataFrameWriter
Usare queste opzioni con DataFrameWriter.option() e DataFrameWriterV2.option() per controllare come Azure Databricks scrive i dati.
Example
Nell'esempio seguente viene impostato su mergeSchemaTrue per la scrittura di una tabella Delta Lake:
Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
Avro
| Chiave | Predefinito | Description |
|---|---|---|
avroSchema |
None | Schema Avro completo come stringa JSON. Usare questa opzione per convertire i tipi SPARK SQL in tipi Avro specifici. Si applica al file Avro. |
avroSchemaUrl |
None | URL che punta a un file di schema Avro. Usare anziché avroSchema quando lo schema viene archiviato esternamente. Si escludono con avroSchema a vicenda. Si applica al file Avro. |
compression |
snappy |
Codec di compressione da usare durante la scrittura. Valori validi: uncompressed, deflate, snappybzip2, xz, , . zstandard Si applica al file Avro. |
recordName |
topLevelRecord |
Nome del record di primo livello nello schema Avro di output. Si applica al file Avro. |
positionalFieldMatching |
false |
Indica se trovare una corrispondenza tra lo schema Spark e lo schema Avro in base alla posizione del campo anziché in base al nome. Si applica al file Avro. |
recordNamespace |
Stringa vuota | Spazio dei nomi per il record di primo livello nello schema Avro di output. Si applica al file Avro. |
Delta Lake e Apache Iceberg
| Chiave | Predefinito | Description |
|---|---|---|
clusterByAuto |
false |
Indica se abilitare il clustering liquido automatico, in cui Azure Databricks seleziona le colonne di clustering in base ai modelli di query. Valido solo con mode("overwrite"). Non può essere usato con append la modalità . Disponibile in Databricks Runtime 16.4 e versioni successive. Si applica a Usa clustering liquido per le tabelle. |
mergeSchema |
None | Indica se abilitare l'evoluzione dello schema per l'operazione di scrittura. Le nuove colonne nel dataframe di origine vengono aggiunte allo schema della tabella di destinazione. Si applica alle accodamenti batch e streaming. Si applica allo schema della tabella di aggiornamento. |
overwriteSchema |
None | Indica se sostituire lo schema della tabella e il partizionamento durante la sovrascrittura. Richiede mode("overwrite") senza replaceWhere. Non è possibile usare con partitionOverwriteMode. Si applica allo schema della tabella di aggiornamento. |
partitionOverwriteMode |
None | Modalità di sovrascrittura della partizione. Impostare su su dynamic per sovrascrivere solo le partizioni contenenti nuovi dati, lasciando invariate tutte le altre partizioni. Modalità legacy, non supportata nel calcolo serverless o in Databricks SQL. Valori validi: static, dynamic. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake. |
replaceOn |
None | Espressione booleana che corrisponde alle righe nella tabella di destinazione da sostituire con le righe della query di origine. Può fare riferimento a colonne sia dalla tabella di destinazione che dalla query di origine. Le righe nella destinazione che corrispondono a una riga di origine vengono eliminate e sostituite. Se l'origine è vuota, non vengono eseguite eliminazioni. Usare targetAlias per evitare ambiguità nei riferimenti alle colonne. Disponibile in Databricks Runtime 17.1 e versioni successive. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake. |
replaceUsing |
None | Elenco delimitato da virgole di nomi di colonna usati per trovare le corrispondenze tra la tabella di destinazione e la query di origine. Sia la destinazione che l'origine devono contenere tutte le colonne elencate. Le righe nella destinazione che corrispondono a una riga di origine nel confronto di uguaglianza vengono eliminate e sostituite.
NULL i valori vengono considerati non uguali e non corrispondono. Disponibile in Databricks Runtime 16.3 e versioni successive. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake. |
replaceWhere |
None | Espressione di predicato. Sovrascrive in modo atomico solo i record che corrispondono al predicato. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake. |
targetAlias |
None | Alias stringa per la tabella di destinazione. Usare con replaceOn o replaceWhere per disambiguare i riferimenti a colonne quando la condizione fa riferimento a colonne sia dalla tabella di destinazione che dalla query di origine. Si applica a Sovrascrivi in modo selettivo i dati con Delta Lake. |
txnAppId |
None | Stringa univoca che identifica l'applicazione per le scritture idempotenti nelle foreachBatch operazioni. Usare insieme txnVersion a per assicurarsi che le operazioni di scrittura esattamente una sola volta in più tabelle Delta Lake. Si applica a Use for idempotent table writes(Usa foreachBatch per le scritture di tabelle idempotenti). |
txnVersion |
None | Numero che aumenta in modo monotonico usato come versione della transazione per le scritture idempotenti nelle foreachBatch operazioni. Usare insieme txnAppId a per assicurarsi che le operazioni di scrittura esattamente una sola volta in più tabelle Delta Lake. Si applica a Use for idempotent table writes(Usa foreachBatch per le scritture di tabelle idempotenti). |
optimizeWrite |
None | Indica se abilitare l'opzione Ottimizzazione automatica scrittura per questa operazione di scrittura. Esegue l'override della spark.databricks.delta.optimizeWrite.enabled configurazione. Si applica a Che è Delta Lake in Azure Databricks?. |
userMetadata |
None | Stringa definita dall'utente aggiunta ai metadati di commit per l'operazione di scrittura. Visibile nell'output di DESCRIBE HISTORY. Si applica alle tabelle Enrich con metadati personalizzati. |
CSV
| Chiave | Predefinito | Description |
|---|---|---|
charToEscapeQuoteEscaping |
\0 (non abilitato) |
Carattere utilizzato per eseguire l'escape del carattere di escape quando differisce dal carattere di virgolette. Si applica a csv (DataFrameWriter). |
compression |
none |
Codec di compressione da usare durante la scrittura. Valori validi: none, bzip2, gziplz4, snappy, deflate. zstd Si applica a csv (DataFrameWriter). |
dateFormat |
yyyy-MM-dd |
Formattare la stringa per i valori delle colonne di data. Si applica a csv (DataFrameWriter). |
emptyValue |
Stringa vuota | Stringa scritta per valori vuoti (non Null). Si applica a csv (DataFrameWriter). |
encoding |
UTF-8 |
Codifica dei caratteri per i file di output. Si applica a csv (DataFrameWriter). |
escape |
\ |
Carattere utilizzato per eseguire l'escape dei valori tra virgolette. Si applica a csv (DataFrameWriter). |
escapeQuotes |
true |
Indica se inserire caratteri di escape tra virgolette all'interno di valori di campo tra virgolette. Si applica a csv (DataFrameWriter). |
header |
false |
Indica se scrivere nomi di colonna come prima riga dell'output. Si applica a csv (DataFrameWriter). |
ignoreLeadingWhiteSpace |
false |
Indica se tagliare gli spazi vuoti iniziali dai valori durante la scrittura. Si applica a csv (DataFrameWriter). |
ignoreTrailingWhiteSpace |
false |
Indica se tagliare gli spazi vuoti finali dai valori durante la scrittura. Si applica a csv (DataFrameWriter). |
lineSep |
\n |
Stringa separatore di riga utilizzata tra i record. Si applica a csv (DataFrameWriter). |
locale |
en-US |
Un identificatore java.util.Locale. Influenza la formattazione dei valori di data e timestamp durante la scrittura. |
nullValue |
Stringa vuota | Stringa scritta per i valori Null. Si applica a csv (DataFrameWriter). |
quote |
" |
Carattere utilizzato per virgolettere i valori di campo che contengono il separatore. Si applica a csv (DataFrameWriter). |
quoteAll |
false |
Indica se racchiudere tutti i valori di campo tra virgolette indipendentemente dal contenuto. Si applica a csv (DataFrameWriter). |
sep |
, |
Carattere delimitatore di campo. Si applica a csv (DataFrameWriter). |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Stringa di formato per i valori di colonna timestamp. Si applica a csv (DataFrameWriter). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Stringa di formato per timestamp senza valori di colonna fuso orario (TimestampNTZType). |
Excel
| Chiave | Predefinito | Description |
|---|---|---|
dataAddress |
None | Nome del foglio o cella iniziale per la scrittura. Se omesso, scrive in un foglio denominato Sheet1 a partire dalla cella A1. Accetta un nome di foglio ("SheetName") o un riferimento a una singola cella ("SheetName!A1"). Gli intervalli di celle non sono supportati per le scritture. |
dateFormatInWrite |
yyyy-mm-dd |
Excel stringa di formato cella applicata alle colonne Date. Usa Excel sintassi del formato. |
headerRows |
0 |
Indica se scrivere nomi di colonna come prima riga. Valori validi: 0, 1. |
timestampNTZFormat |
yyyy-mm-dd hh:mm:ss |
Excel stringa di formato cella applicata alle colonne TimestampNTZ e Timestamp. Usa Excel sintassi del formato. |
version |
xlsx |
Versione del formato di file Excel da scrivere. Valori validi: xlsx, xls. |
JSON
| Chiave | Predefinito | Description |
|---|---|---|
compression |
none |
Codec di compressione da usare durante la scrittura. Valori validi: none, bzip2, gziplz4, snappy, deflate. zstd Si applica a json (DataFrameWriter). |
dateFormat |
yyyy-MM-dd |
Formattare la stringa per i valori delle colonne di data. Si applica a json (DataFrameWriter). |
encoding |
UTF-8 |
Codifica dei caratteri per i file di output. Si applica a json (DataFrameWriter). |
ignoreNullFields |
valore di spark.sql.jsonGenerator.ignoreNullFields |
Indica se omettere campi con valori Null dall'output JSON. Si applica a json (DataFrameWriter). |
lineSep |
\n |
Stringa separatore di riga utilizzata tra i record. Si applica a json (DataFrameWriter). |
locale |
en-US |
Un identificatore java.util.Locale. Influenza la formattazione dei valori di data e timestamp durante la scrittura. |
pretty |
false |
Indica se abilitare l'output JSON con rientro, multilinea. |
sortKeys |
false |
Indica se ordinare le chiavi degli oggetti JSON in ordine alfabetico nell'output. Utile per produrre output deterministico. |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Stringa di formato per i valori di colonna timestamp. Si applica a json (DataFrameWriter). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Stringa di formato per timestamp senza valori di colonna fuso orario (TimestampNTZType). |
writeNonAsciiCharacterAsCodePoint |
false |
Indica se codificare caratteri non ASCII come \uXXXX sequenze di escape Unicode anziché caratteri UTF-8 letterali nell'output. |
ORCO
| Chiave | Predefinito | Description |
|---|---|---|
compression |
zstd |
Codec di compressione da usare durante la scrittura. Valori validi: none, uncompressed, snappyzlib, lzo, zstd, lz4brotli. Si applica a orc (DataFrameWriter). |
Parquet
| Chiave | Predefinito | Description |
|---|---|---|
compression |
snappy |
Codec di compressione da usare durante la scrittura. Valori validi: none, uncompressedsnappy, , gziplzo, brotli, lz4, lz4_rawzstd. Si applica a parquet (DataFrameWriter). |
spark.sql.parquet.outputTimestampType |
INT96 |
Tipo fisico utilizzato per codificare le colonne timestamp. Valori validi: INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS. Usare INT96 per la compatibilità con i lettori Parquet legacy che non supportano i tipi di timestamp standard. |
Testo
| Chiave | Predefinito | Description |
|---|---|---|
compression |
none |
Codec di compressione da usare durante la scrittura. Valori validi: none, bzip2, gziplz4, snappy, deflate. zstd Si applica al testo (DataFrameWriter). |
encoding |
UTF-8 |
Codifica dei caratteri per i file di output. |
lineSep |
\n |
Stringa separatore di riga utilizzata tra i record. Si applica al testo (DataFrameWriter). |
XML
| Chiave | Predefinito | Description |
|---|---|---|
arrayElementName |
item |
Nome dell'elemento per gli elementi della matrice che non hanno un nome esplicito. Si applica a xml (DataFrameWriter). |
attributePrefix |
_ |
Prefisso anteporto ai nomi di campo che corrispondono agli attributi XML. Si applica a xml (DataFrameWriter). |
compression |
none |
Codec di compressione da usare durante la scrittura. Valori validi: none, bzip2, gziplz4, snappy, deflate. zstd Si applica a xml (DataFrameWriter). |
dateFormat |
yyyy-MM-dd |
Formattare la stringa per i valori delle colonne di data. Si applica a xml (DataFrameWriter). |
declaration |
version="1.0" encoding="UTF-8" standalone="yes" |
Stringa di dichiarazione XML scritta all'inizio di ogni file di output. Impostare su una stringa vuota per eliminare la dichiarazione. Si applica a xml (DataFrameWriter). |
encoding |
UTF-8 |
Codifica dei caratteri per i file di output. Si applica a xml (DataFrameWriter). |
indent |
4 spazi | Stringa utilizzata per impostare il rientro degli elementi figlio nell'output. Impostare su una stringa vuota per disattivare il rientro e scrivere ogni riga su una singola riga. |
locale |
en-US |
Un identificatore java.util.Locale. Influenza la formattazione dei valori di data e timestamp durante la scrittura. |
nullValue |
null |
Stringa scritta per i valori Null. Se impostato su null, gli attributi e gli elementi figlio per i campi Null vengono omessi. Si applica a xml (DataFrameWriter). |
rootTag |
ROWS |
Tag dell'elemento radice che esegue il wrapping di tutti gli elementi di riga nell'output. Si applica a xml (DataFrameWriter). |
rowTag |
ROW |
Tag dell'elemento che rappresenta una riga nell'output. Si applica a xml (DataFrameWriter). |
singleVariantColumn |
None | Nome della singola colonna Variant da scrivere in file XML. Si applica a xml (DataFrameWriter). |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
Stringa di formato per i valori di colonna timestamp. Si applica a xml (DataFrameWriter). |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Stringa di formato per timestamp senza valori di colonna fuso orario. Si applica a xml (DataFrameWriter). |
validateName |
true |
Indica se generare un'eccezione se un nome di colonna non è un identificatore di elemento XML valido. Si applica a xml (DataFrameWriter). |
valueTag |
_VALUE |
Nome del campo utilizzato per i dati di tipo carattere negli elementi XML che dispongono anche di attributi o elementi figlio. Si applica a xml (DataFrameWriter). |
Opzioni dataStreamWriter
Usare queste opzioni con DataStreamWriter.option() per configurare le scritture di streaming.
Example
Nell'esempio seguente viene impostata la posizione del checkpoint per un flusso:
Python
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
Scala
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
Comune
| Chiave | Predefinito | Description |
|---|---|---|
checkpointLocation |
Nessuno (obbligatorio) | Percorso della directory del checkpoint per la query di streaming. Obbligatorio per garantire la tolleranza di errore e le garanzie di elaborazione esattamente una volta. Ogni query di streaming deve usare un percorso di checkpoint univoco. Databricks consiglia di archiviare i checkpoint in un volume di Unity Catalog o in un percorso di archiviazione cloud. Consulta Checkpoint di Structured Streaming. |
path |
None | Percorso di output per sink di streaming basati su file, ad esempio Parquet. Si applica solo ai formati basati su file. |
Sink della console
| Chiave | Predefinito | Description |
|---|---|---|
numRows |
20 |
Numero di righe da visualizzare per ogni micro batch durante la scrittura nel sink della console. |
truncate |
true |
Indica se troncare stringhe lunghe durante la visualizzazione delle righe. Impostare su per false visualizzare i valori stringa completi. |
Delta Lake
Quando si scrive un flusso in una tabella Delta Lake tramite , si applicano le opzioni seguenti.format("delta") Le opzioni di sola sovrascrittura, overwriteSchemaad esempio , replaceWheree partitionOverwriteMode non sono supportate per le scritture in streaming.
| Chiave | Predefinito | Description |
|---|---|---|
mergeSchema |
false |
Se evolvere lo schema della tabella Delta Lake quando il dataframe di streaming contiene nuove colonne. Si applica solo alla modalità di output di accodamento. Si applica allo schema della tabella di aggiornamento. |
userMetadata |
None | Stringa definita dall'utente aggiunta ai metadati di commit per l'operazione di scrittura. Visibile nell'output di DESCRIBE HISTORY. Si applica alle tabelle Enrich con metadati personalizzati. |
Sink di file
L'opzione seguente si applica quando si scrive un flusso in formati basati su file (Parquet, JSON, CSV, ORC, text). Per le opzioni specifiche del formato, vedere Opzioni DataFrameWriter.
| Chiave | Predefinito | Description |
|---|---|---|
retention |
None | Per quanto tempo conservare i file di metadati sink usati per la tolleranza di errore e la compattazione. Accetta una stringa temporale, 7 days ad esempio o 24 hours. Quando non è impostato, i file di metadati vengono conservati a tempo indeterminato. |
Sink Kafka
Per un elenco completo delle opzioni per la scrittura di flussi in Kafka, vedere Opzioni.
| Chiave | Predefinito | Description |
|---|---|---|
kafka.bootstrap.servers |
None | Obbligatorio. Elenco delimitato da virgole degli indirizzi del broker host:port Kafka. |
topic |
None | Argomento Kafka di destinazione per tutte le righe. Obbligatorio se il dataframe non include una topic colonna. |
kafka.* |
None | Qualsiasi configurazione del producer Kafka preceduta da kafka.. Ad esempio: kafka.compression.type. |
Sink di memoria
| Chiave | Predefinito | Description |
|---|---|---|
queryName |
Nessuno (obbligatorio) | Nome della tabella in memoria in cui scrive la query. Obbligatorio per il sink di memoria. Configurabile anche tramite .queryName(). |
mode |
exactlyonce |
Garanzia di recapito per il sink di memoria.
exactlyonce usa la modalità micro batch con semantica exactly-once.
atleastonce usa la modalità continua con semantica at-least-once. Valori validi: exactlyonce, atleastonce. |
Opzioni della funzione Spark
Alcune funzioni predefinite di Spark SQL accettano una options mappa che controlla il comportamento di analisi o serializzazione. Passare le opzioni come Python dict o scala Map[String, String].
Example
L'esempio seguente analizza una colonna JSON durante l'eliminazione di record in formato non valido:
Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
Avro
Le funzioni Avro accettano le stesse opzioni delle opzioni del dataframe corrispondenti:
-
from_avroeschema_of_avrousare le opzioni DataFrameReader Avro. -
to_avrousa le opzioni Avro dataFrameWriter.
Example
L'esempio seguente decodifica una colonna Avro con l'evoluzione dello schema abilitata:
Python
from pyspark.sql.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro
val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))
Inoltre, le varianti del Registro schemi di from_avro e to_avro accettano le opzioni seguenti:
| Chiave | Predefinito | Description |
|---|---|---|
schemaId |
None | ID schema dal Registro di sistema dello schema confluent da usare per decodificare i dati Avro codificati con uno schema incompatibile con jsonFormatSchema. Si applica solo a from_avro . |
confluent.schema.registry.* |
None | Proprietà di configurazione del client del Registro di sistema dello schema confluent. Passare qualsiasi proprietà client di Confluent SR usando questo prefisso, ad esempio confluent.schema.registry.basic.auth.user.info per le credenziali di autenticazione di base. Obbligatorio per le varianti del Registro di sistema dello schema di from_avro e to_avro. |
CSV
Le funzioni CSV accettano le stesse opzioni delle opzioni del dataframe corrispondenti:
-
from_csveschema_of_csvusare le opzioni CSV DataFrameReader. -
to_csvusa le opzioni CSV DataFrameWriter.
Example
L'esempio seguente legge CSV con un separatore personalizzato e NULL un valore:
Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
JSON
Le funzioni JSON accettano le stesse opzioni delle opzioni del dataframe corrispondenti:
-
from_jsoneschema_of_jsonusare le opzioni JSON di DataFrameReader. -
to_jsonusa le opzioni JSON DataFrameWriter.
Example
L'esempio seguente scrive JSON con NULL campi ignorati e piuttosto formattati abilitati:
Python
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json
val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf e to_protobuf non usano un'origine dati basata su file. I dati protobuf vengono sempre letti e scritti come colonne binarie usando queste funzioni. Le opzioni vengono passate come e Map[String, String] fanno distinzione tra maiuscole e minuscole.
Example
L'esempio seguente decodifica una colonna Protobuf usando la modalità PERMISSIVE:
Python
from pyspark.sql.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf
val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))
Le funzioni Protobuf usano le opzioni seguenti:
| Chiave | Predefinito | Description |
|---|---|---|
mode |
FAILFAST |
Come gestire i record danneggiati.
FAILFAST genera un'eccezione.
PERMISSIVE imposta campi in formato non valido su Null. Valori validi: FAILFAST, PERMISSIVE. Si applica a from_protobuf. |
recursive.fields.max.depth |
-1 (disabilitato) |
Profondità massima di ricorsione per i campi Protobuf ricorsivi. Impostare su 0 per disattivare il supporto dei campi ricorsivi. Valori validi: 0 per 10. Si applica a from_protobuf. |
convert.any.fields.to.json |
false |
Indica se convertire i campi Protobuf Any in una stringa JSON anziché in un oggetto STRUCT. Si applica a from_protobuf. |
emit.default.values |
false |
Indica se generare campi con valori zero o predefiniti (semantica proto3). Quando false, i campi con valori predefiniti vengono omessi dall'output. Si applica a from_protobuf. |
enums.as.ints |
false |
Indica se eseguire il rendering dei campi enum come valori interi anziché come stringhe. Si applica a from_protobuf. |
upcast.unsigned.ints |
false |
Se eseguire l'upcast uint32 a Long e uint64 per evitare Decimal(20,0) l'overflow di integer. Si applica a from_protobuf. |
unwrap.primitive.wrapper.types |
false |
Indica se annullare il wrapping google.protobuf dei tipi wrapper (ad esempio e Int32ValueStringValue) nei tipi Spark primitivi corrispondenti. Si applica a from_protobuf. |
retain.empty.message.types |
false |
Indica se mantenere i tipi di messaggio Protobuf vuoti nello schema di output inserendo una colonna fittizia. Si applica a from_protobuf. |
schema.registry.subject |
None | Nome soggetto registro schemi. Obbligatorio quando si usano le varianti del Registro di sistema dello schema di from_protobuf e to_protobuf. |
schema.registry.address |
None | Indirizzo del Registro di sistema dello schema (host e porta). Obbligatorio quando si usano le varianti del Registro di sistema dello schema di from_protobuf e to_protobuf. |
schema.registry.protobuf.name |
None | Specifica il messaggio Protobuf da usare quando l'oggetto del Registro di sistema dello schema contiene più messaggi. Optional. |
XML
Le funzioni XML accettano le stesse opzioni delle opzioni del dataframe corrispondenti:
-
from_xmleschema_of_xmlusare le opzioni XML dataframereader. -
to_xmlutilizza le opzioni XML DataFrameWriter.
Example
Nell'esempio seguente viene scritto xml con tag radice e riga personalizzati:
Python
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml
val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))