Controllo della concorrenza per le tabelle Delta

Quando più notebook, pipeline o processi Spark di Fabric scrivono contemporaneamente nella stessa tabella Delta, Delta Lake usa il controllo della concorrenza ottimistica (OCC) per garantire la coerenza della tabella. Ogni transazione legge uno snapshot, scrive nuovi file, quindi verifica che nel frattempo non sia avvenuto alcun commit in conflitto. Se viene rilevato un conflitto, la transazione ha esito negativo con un'eccezione anziché danneggiare i dati.

Questo articolo illustra i modelli pratici per la gestione delle scritture simultanee in Fabric. Per una specifica completa del protocollo OCC, vedere Controllo della concorrenza Delta Lake (documentazione open source).

Livelli di isolamento

Tutte le tabelle Delta usano il livello di isolamento serializzabile . Serializzabile è il livello più rigoroso e l'unico supportato. Garantisce che il risultato delle transazioni simultanee sia identico a un ordine di esecuzione sequenziale.

Delta Lake usa anche un livello snapshotisolation interno per le operazioni che non modificano i dati logici , ad esempio OPTIMIZE. SnapshotIsolation salta il controllo delle aggiunte concorrenti, consentendo alla compattazione di procedere senza entrare in conflitto con inserimenti concorrenti. Non si configura direttamente SnapshotIsolation. Delta Lake lo applica automaticamente quando appropriato.

Con l'isolamento Serializable, un append cieco concorrente (INSERT INTO) può entrare in conflitto con un MERGE o UPDATE che legge la stessa partizione.

Quali operazioni sono in conflitto

Non tutte le scritture simultanee sono in conflitto. Il fattore chiave è se due operazioni toccano gli stessi file sottostanti.

Coppia concorrente Conflitto? Perché
Due INSERT operazioni (aggiunta) No Ognuno aggiunge nuovi file senza leggere quelli esistenti (accodamento cieco).
INSERT + OPTIMIZE No OPTIMIZE esegue il commit in SnapshotIsolation perché non modifica i dati logici, quindi ignora completamente il controllo di accodamento simultaneo. Le operazioni di append aggiungono nuovi file che non si sovrappongono ai file in fase di compattazione.
Due UPDATEoperazioni , DELETEo MERGE Sì, se leggono o modificano file sovrapposti Ciascun writer riscrive i file, quindi lo snapshot del secondo writer è obsoleto.
OPTIMIZE + UPDATE/DELETE/MERGE Sì, se toccano gli stessi file OPTIMIZE rimuove e legge i file (con dataChange=false). Se un'operazione di modifica dei dati legge anche gli stessi file, viene generato un oggetto ConcurrentDeleteReadException .
Due OPTIMIZE esecuzioni Sì, se selezionano gli stessi file Entrambi tentano di rimuovere e riscrivere lo stesso insieme di file, innescando un ConcurrentDeleteDeleteException.
INSERT + MERGE/UPDATE/DELETE Sì, se l'operazione di modifica dei dati legge la stessa partizione In Serializable, un accodamento cieco può essere in conflitto con modifiche simultanee ai dati se l'operazione legge una partizione a cui ha scritto l'accodamento.

Tip

Le pipeline di sola aggiunta (INSERT INTO, df.write.mode("append")) sono il modo più semplice per evitare del tutto i conflitti. Se il carico di lavoro può aggiungere dati in coda prima e riconciliarli in un secondo momento, si elimina la contenzione tra scritture.

Isolare i writer tramite partizionamento

Il modo più comune per eseguire DML simultaneo sulla stessa tabella senza conflitti consiste nel partizionare la tabella in base alla colonna che separa i writer, quindi includere tale colonna in ogni condizione dell'operazione. Quando ogni writer scrive su una partizione diversa, le operazioni interessano insiemi distinti di file e non entrano in conflitto.

Uno scenario tipico: più pipeline elaborano ciascuna i dati di una diversa business unit o di un tenant diverso. Suddividere in base a quella dimensione e aggiungere l'elemento MERGE di ogni pipeline alla relativa partizione.

-- Each pipeline targets its own partition, so concurrent runs don't conflict
MERGE INTO events AS target
USING staged AS source
ON target.event_id = source.event_id
    AND target.business_unit = 'EMEA'
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Importante

La colonna della partizione deve essere visualizzata nella condizione di unione stessa, non solo nei dati di origine. Senza di esso, Delta Lake non è in grado di determinare in fase di convalida che le due operazioni hanno toccato set di file non contigui e il controllo dei conflitti considera l'operazione come una lettura di tabella completa.

Per altre informazioni sulle strategie di partizionamento, vedere Partizionamento per le tabelle Delta.

Ripetizione del commit integrata

Delta Lake ritenta automaticamente un commit quando rileva che viene eseguito il commit di un'altra transazione per prima. A ogni tentativo, legge il commit vincente, esegue il controllo dei conflitti e, se non esiste alcun conflitto logico, tenta nuovamente il commit nella successiva versione disponibile. Questo processo viene ripetuto in modo trasparente senza alcuna azione dal codice.

Un conflitto logico (ad esempio, due operazioni che riscrivono lo stesso file) non può essere risolto automaticamente. Il tentativo solleva una delle eccezioni elencate in Eccezioni di conflitto comuni. Tuttavia, molti conflitti di versione temporanei, ad esempio due accodamenti ciechi per lo stesso slot di versione, vengono risolti automaticamente e non vengono mai visualizzati nell'applicazione.

Eccezioni di conflitto comuni

Quando viene rilevato un conflitto, Delta Lake genera un'eccezione specifica. Comprendere l'eccezione visualizzata consente di identificare la causa radice.

Eccezione Cos'è successo
ConcurrentAppendException Un altro writer ha aggiunto dei file in una partizione (o un insieme di file) che l'operazione stava leggendo. È comune quando un MERGE viene eseguito su una partizione che riceve anche operazioni di inserimento da un'altra pipeline. Con isolamento Serializable, anche gli append ciechi (semplici operazioni INSERT) possono generare questa eccezione.
ConcurrentDeleteReadException Un altro writer ha eliminato o riscritto un file letto dall'operazione. Tipico quando OPTIMIZE compatta file che anche un processo in contemporanea UPDATE o MERGE stava leggendo, oppure quando due operazioni di modifica dei dati si sovrappongono sulle stesse righe.
ConcurrentDeleteDeleteException Entrambe le operazioni hanno tentato di eliminare o riscrivere lo stesso file. Spesso causato da esecuzioni sovrapposte OPTIMIZE o due pipeline che riscriveno la stessa partizione contemporaneamente.
ConcurrentWriteException Conflitto generico segnalato quando un'altra transazione è stata sottoposta a commit sulla stessa versione della tabella prima che fosse possibile eseguire la risoluzione del conflitto, ad esempio durante un aggiornamento da filesystem a commit gestiti.
MetadataChangedException Lo schema o le proprietà della tabella sono state modificate a metà transazione, ad esempio una scrittura simultanea ALTER TABLE o di evoluzione dello schema.
ConcurrentTransactionException Due query di Structured Streaming con la stessa posizione di checkpoint hanno scritto contemporaneamente nella tabella. Eliminare i duplicati nei processi di streaming o utilizzare percorsi di checkpoint distinti.
ProtocolChangedException Una transazione concorrente ha aggiornato o eseguito il downgrade del protocollo della tabella mentre la transazione corrente tentava anch'essa di modificare il protocollo. Può verificarsi anche quando una funzionalità di tabella viene eliminata contemporaneamente.

Strategie comuni per evitare conflitti di scrittura

Abilitare la compattazione automatica

La compattazione automatica viene eseguita in modo sincrono come parte delle operazioni di scrittura. La compattazione sincrona impedisce ai job di compattazione pianificati separatamente di sovrapporsi alle operazioni di modifica dei dati, causando così eccezioni di scrittura simultanea.

Pianificare la manutenzione all'esterno delle finestre di scrittura

OPTIMIZE e VACUUM possono essere in conflitto con le operazioni di modifica dei dati simultanee. In Fabric, pianifica processi del notebook o attività della pipeline per la compattazione delle tabelle e VACUUM durante periodi di bassa attività, ad esempio dopo il completamento dell'acquisizione notturna anziché durante l'acquisizione stessa.

Usare i modelli di accodamento e unione

Per l'ingestione ad alta concorrenza, carica i dati grezzi con scritture solo in append in una tabella di staging (nessun conflitto possibile), quindi esegui un singolo job MERGE per riconciliare i dati nella tabella di destinazione. Lo schema serializza l'operazione soggetta a conflitti mantenendo l'acquisizione dei dati completamente parallela.

Aggiungere la logica di ripetizione dei tentativi per i conflitti logici

Il meccanismo integrato di nuovo tentativo del commit gestisce automaticamente le collisioni di versione transitorie, ma i conflitti logici, in cui due operazioni si sovrappongono realmente, sollevano un'eccezione. Poiché Delta Lake non produce mai scritture parziali, una transazione non riuscita è sicura per riprovare a livello di applicazione. Nelle pipeline in cui si prevedono occasionali conflitti logici, implementare una logica di ritentativo attorno all’operazione di scrittura:

from delta.exceptions import ConcurrentAppendException
import time

# Retry with backoff on transient concurrent write conflicts
max_retries = 3
for attempt in range(max_retries):
    try:
        spark.sql("MERGE INTO target USING source ON ...")
        break
    except ConcurrentAppendException:
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)
        else:
            raise

Scegliere la strategia di layout corretta

Il clustering liquido e il partizionamento risolvono problemi diversi. Il clustering liquido ottimizza il layout dei file per le prestazioni di lettura. Il partizionamento crea limiti fisici che impediscono conflitti di scrittura simultanei. Se il carico di lavoro richiede entrambi, partiziona in base alla colonna di isolamento dello scrittore e usa Z-Order all'interno di ogni partizione per migliorare le prestazioni di lettura.