Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Le tabelle sincronizzate consentono di gestire i dati lakehouse tramite Lakebase Postgres. Le tabelle del catalogo Unity vengono sincronizzate in Postgres in modo che le applicazioni possano eseguire query sui dati lakehouse direttamente con bassa latenza. Questo processo è comunemente noto come ETL inverso. Il lakehouse è ottimizzato per l'analisi e l'arricchimento, mentre Lakebase è progettato per carichi di lavoro operativi che richiedono query in stile ricerca veloce e coerenza transazionale.
Che cosa sono le tabelle sincronizzate?
Le tabelle sincronizzate consentono di gestire dati di livello di analisi da Unity Catalog tramite Lakebase Postgres, rendendoli disponibili per le applicazioni che necessitano di query a bassa latenza e transazioni ACID complete. Consentono di colmare il divario tra l'archiviazione analitica e i sistemi operativi mantenendo i dati pronti per la gestione in applicazioni in tempo reale.
Fonti supportate
Le tabelle sincronizzate supportano i tipi di origine del catalogo Unity seguenti:
- Tabelle Delta gestite ed esterne
- Tabelle Di Iceberg gestite ed esterne
- Viste e viste materializzate
Come funziona
Le tabelle sincronizzate di Databricks creano una copia gestita dei dati di Unity Catalog in Lakebase. Quando si crea una tabella sincronizzata, si ottiene:
- Tabella sincronizzata in Unity Catalog che fa riferimento alla pipeline di sincronizzazione
- Una tabella Postgres in Lakebase (di sola lettura, su cui è possibile eseguire query dalle applicazioni)
Ad esempio, è possibile sincronizzare tabelle gold, funzionalità ingegneriate o output ml da analytics.gold.user_profiles in una nuova tabella analytics.gold.user_profiles_syncedsincronizzata. In Postgres il nome dello schema del catalogo Unity diventa il nome dello schema Postgres, quindi viene visualizzato come gold.user_profiles_synced:
SELECT * FROM gold.user_profiles_synced WHERE user_id = 12345;
Le applicazioni si connettono con i driver Postgres standard ed eseguono query sui dati sincronizzati insieme al proprio stato operativo.
Avviso
Sebbene sia possibile modificare una tabella sincronizzata direttamente in Postgres, Azure Databricks consiglia di eseguire solo query di lettura per proteggere l'integrità dei dati con l'origine. Per le operazioni supportate sulle tabelle sincronizzate, vedere Operazioni consentite nelle tabelle sincronizzate in Postgres.
Le pipeline di sincronizzazione utilizzano le Lakeflow Spark Declarative Pipelines gestite per aggiornare continuamente sia la tabella sincronizzata di Unity Catalog che la tabella Postgres con le modifiche dalla tabella di origine. Ogni sincronizzazione può usare fino a 16 connessioni al database Lakebase.
Lakebase Postgres supporta fino a 1.000 connessioni simultanee con garanzie transazionali, in modo che le applicazioni possano leggere dati arricchiti gestendo al tempo stesso inserimenti, aggiornamenti ed eliminazioni nello stesso database.
Modalità di sincronizzazione
Scegliere la modalità di sincronizzazione corretta in base alle esigenze dell'applicazione:
| Modalità | Descrizione | Quando utilizzare | Prestazioni |
|---|---|---|---|
| Snapshot | Copia monouso di tutti i dati | Le modifiche >cambiano il 10% delle righe per ciclo o la sorgente non supporta CDF (viste, tabelle Iceberg) | 10 volte più efficiente se si modificano >10% di dati di origine |
| Attivato | Aggiornamenti pianificati eseguiti su richiesta o a intervalli | Le righe di origine cambiano in base a una frequenza nota. Gli inserimenti, gli aggiornamenti e le eliminazioni vengono propagati ogni volta che si aggiorna. | Buon rapporto costo/ritardo. Costoso se si eseguono <intervalli di 5 minuti |
| Continuo | Streaming in tempo reale con secondi di latenza | Le modifiche devono essere visualizzate in Lakebase quasi in tempo reale | Ritardo più basso, costo più alto. Intervalli minimi di 15 secondi |
Le modalità attivate e continue richiedono l'abilitazione del feed di dati delle modifiche (CDF) nella tabella di origine. Se CDF non è abilitato, verrà visualizzato un avviso nell'interfaccia utente con il comando esatto ALTER TABLE da eseguire. Per altre informazioni sul feed di dati delle modifiche, vedere Usare il feed di dati delle modifiche Delta Lake in Databricks.
Annotazioni
Le origini che non supportano CDF (ad esempio viste, viste materializzate e tabelle Iceberg) possono essere sincronizzate solo in modalità snapshot . Per la modalità snapshot, l'origine deve supportare SELECT *.
Casi d'uso di esempio
È possibile usare tabelle sincronizzate per i casi d'uso di gestione dei dati, ad esempio:
- Motori di personalizzazione che servono profili utente aggiornati alle app di Databricks
- Applicazioni che gestiscono stime del modello o valori di funzionalità calcolati nella lakehouse
- Dashboard rivolti ai clienti che servono gli indicatori KPI in tempo reale
- Servizi di rilevamento delle frodi che forniscono punteggi di rischio per un intervento immediato
- Strumenti di supporto che forniscono i record dei clienti arricchiti dai dati del lakehouse
Creare una tabella sincronizzata
Prerequisiti
È necessario:
- Un'area di lavoro di Databricks con Lakebase abilitata.
- Un progetto Lakebase (vedere Creare un progetto).
- Tabella del catalogo Unity da sincronizzare.
- Autorizzazioni per creare tabelle sincronizzate. È necessario USE_SCHEMA e CREATE_TABLE in qualsiasi schema usato.
Per le modalità attivate o continue , è necessario abilitare il feed di dati di modifica nella tabella di origine:
ALTER TABLE your_catalog.your_schema.your_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Per la pianificazione della capacità e la compatibilità dei tipi di dati, vedere Tipi di dati e compatibilità e pianificazione della capacità.
INTERFACCIA UTENTE
Passare a Catalogo nella barra laterale dell'area di lavoro e selezionare la tabella del catalogo Unity da sincronizzare.
Fare clic su Crea>tabella sincronizzata nella vista dettagli della tabella.
Nella finestra di dialogo Crea tabella sincronizzata :
Gli elenchi di catalogo e schema includono solo schemi del catalogo Unity in cui l'utente corrente ha USE_SCHEMA e CREATE_TABLE privilegi. Se non viene visualizzato uno schema previsto, confermare le autorizzazioni con l'amministratore del catalogo.
Nome tabella: immettere un nome per la tabella sincronizzata(viene creato nello stesso catalogo e schema della tabella di origine). In questo modo viene creata sia una tabella sincronizzata del catalogo Unity che una tabella Postgres su cui è possibile eseguire query.
Tipo di database: scegliere Lakebase Serverless (scalabilità automatica).
Modalità di sincronizzazione: scegliere Snapshot, Attivato o Continuo in base alle proprie esigenze (vedere le modalità di sincronizzazione precedenti).
Configurare le selezioni di progetto, ramo e database.
Verificare che la chiave primaria sia corretta (in genere rilevata automaticamente).
Importante
Le colonne nella chiave primaria non sono annullabili nella tabella sincronizzata. Le righe con valori Null nelle colonne chiave primaria vengono escluse dalla sincronizzazione.
(Facoltativo) Se due righe possono condividere la stessa chiave primaria nella tabella di origine, selezionare una chiave Timeseries per configurare la deduplicazione. Quando viene specificata una chiave timeseries, la tabella sincronizzata contiene solo la riga con il valore della chiave timeseries più recente per ogni chiave primaria. Per la modalità di guasto senza una chiave di serie temporale, consultare Chiavi duplicate.
Se hai scelto la modalità Triggered o Continua e non hai ancora abilitato il Feed dati delle modifiche, verrà visualizzato un avviso con il comando esatto da eseguire. Per domande sulla compatibilità dei tipi di dati, vedere Tipi di dati e compatibilità.
Fare clic su Crea per creare la tabella sincronizzata.
Monitorare la tabella sincronizzata in Catalog. La scheda Panoramica mostra lo stato di sincronizzazione, la configurazione, lo stato della pipeline e il timestamp dell'ultima sincronizzazione. Usare Sincronizzazione ora per l'aggiornamento manuale.
PYTHON SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.postgres import (
SyncedTable,
SyncedTableSyncedTableSpec,
SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy,
)
w = WorkspaceClient()
synced_table = w.postgres.create_synced_table(
synced_table=SyncedTable(spec=SyncedTableSyncedTableSpec(
source_table_full_name="main.sales.orders",
branch="projects/my-project/branches/production",
primary_key_columns=["order_id"],
scheduling_policy=SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy.SNAPSHOT,
postgres_database="mydb",
create_database_objects_if_missing=True,
)),
synced_table_id="my-catalog.sales.orders",
).wait()
print(f"Synced table created: {synced_table.name}")
synced_table_id Usa il formato catalog.schema.table e diventa il nome della tabella sincronizzata di Unity Catalog. In Postgres la tabella {table} viene creata nello schema {schema}, all'interno del database impostato con postgres_database (qui, mydb).
JAVA SDK
import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.service.postgres.*;
import java.util.List;
WorkspaceClient w = new WorkspaceClient();
SyncedTable syncedTable = w.postgres().createSyncedTable(
new CreateSyncedTableRequest()
.setSyncedTableId("my-catalog.sales.orders")
.setSyncedTable(new SyncedTable()
.setSpec(new SyncedTableSyncedTableSpec()
.setSourceTableFullName("main.sales.orders")
.setBranch("projects/my-project/branches/production")
.setPrimaryKeyColumns(List.of("order_id"))
.setSchedulingPolicy(SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy.SNAPSHOT)
.setPostgresDatabase("mydb")
.setCreateDatabaseObjectsIfMissing(true))))
.waitForCompletion();
System.out.println("Synced table created: " + syncedTable.getName());
curva
curl -X POST "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables?synced_table_id=my-catalog.sales.orders" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
"spec": {
"source_table_full_name": "main.sales.orders",
"branch": "projects/my-project/branches/production",
"primary_key_columns": ["order_id"],
"scheduling_policy": "SNAPSHOT",
"postgres_database": "mydb",
"create_database_objects_if_missing": true
}
}'
Viene restituita un'operazione a esecuzione prolungata. Eseguire il polling del campo restituito name fino a done: true. Vedere Operazioni a esecuzione prolungata. Per la configurazione dell'autenticazione, vedere Autenticazione.
Pianificare o attivare le sincronizzazioni successive
Alla creazione, lo snapshot iniziale viene eseguito automaticamente. Per le modalità snapshot e attivate , le sincronizzazioni successive devono essere attivate in modo esplicito. La modalità continua si gestisce automaticamente.
Attività di sincronizzazione della tabella del database nella pipeline
L'attività della pipeline di sincronizzazione delle tabelle del database nei Processi Lakeflow esegue la pipeline di una tabella sincronizzata come passaggio nel flusso di lavoro. Configurare l'attività con un trigger di aggiornamento tabella o una pianificazione.
Trigger per gli aggiornamenti delle tabelle di origine
Genera il compito quando viene aggiornata la tabella del Catalogo Unity di origine. Con la modalità attivata , vengono applicate in modo incrementale solo le nuove modifiche, offrendo aggiornamenti quasi in tempo reale senza il costo sempre attivo della modalità continua.
- Nella barra laterale fare clic su Flussi di lavoro.
- Fare clic su Crea processo o aprire un processo esistente.
- Nella scheda Attività fare clic su + Aggiungi un altro tipo di attività.
- In Inserimento e trasformazione selezionare Pipeline di sincronizzazione tabelle di database.
- Nel campo Pipeline selezionare la pipeline associata alla tabella sincronizzata.
- In Pianificazioni e trigger fare clic su Aggiungi trigger.
- Selezionare Aggiornamento tabella come tipo di trigger.
- In Tabelle selezionare la tabella del catalogo Unity di origine da monitorare.
- Fare clic su Salva.
Generare attività in base a una pianificazione
Esegue la sincronizzazione a cadenza fissa. Ideale per la modalità snapshot , in cui un aggiornamento completo notturno o settimanale è in genere il modello più efficiente.
- Seguire i passaggi da 1 a 5 precedenti per aggiungere un'attività della pipeline di sincronizzazione tabelle di database a un processo.
- In Pianificazioni e trigger fare clic su Aggiungi trigger.
- Scegli Pianificato come tipo di trigger.
- Impostare la pianificazione cron e il fuso orario, quindi fare clic su Salva.
Controllare lo stato di sincronizzazione
Per controllare lo stato corrente e l'ora dell'ultima sincronizzazione di una tabella sincronizzata:
INTERFACCIA UTENTE
In Catalogo passare alla tabella sincronizzata e selezionare la scheda Panoramica . Mostra lo stato di sincronizzazione corrente, lo stato della pipeline e il timestamp dell'ultima sincronizzazione.
PYTHON SDK
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.postgres.get_synced_table("synced_tables/my-catalog.sales.orders")
print(f"State: {table.status.detailed_state}")
print(f"Last sync: {table.status.last_sync_time}")
print(f"Message: {table.status.message}")
JAVA SDK
import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.service.postgres.SyncedTable;
WorkspaceClient w = new WorkspaceClient();
SyncedTable table = w.postgres().getSyncedTable("synced_tables/my-catalog.sales.orders");
System.out.println("State: " + table.getStatus().getDetailedState());
System.out.println("Last sync: " + table.getStatus().getLastSyncTime());
System.out.println("Message: " + table.getStatus().getMessage());
curva
curl "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables/my-catalog.sales.orders" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}"
Tipi di dati e compatibilità
I tipi di dati di Unity Catalog vengono mappati ai tipi Postgres durante la creazione di tabelle sincronizzate. I tipi complessi (ARRAY, MAP, STRUCT) vengono archiviati come JSONB in Postgres.
| Tipo di colonna di origine | Tipo di colonna Postgres |
|---|---|
| BIGINT | BIGINT |
| BINARY | BYTEA |
| BOOLEAN | BOOLEAN |
| DATTERO | DATTERO |
| DECIMAL(p,s) | NUMERICO |
| DOPPIO | PRECISIONE DOPPIA |
| FLOAT | REALE |
| INT | INTEGER |
| INTERVAL | INTERVAL |
| SMALLINT | SMALLINT |
| filo | TESTO |
| TIMESTAMP | TIMESTAMP CON FUSO ORARIO |
| TIMESTAMP_NTZ | TIMESTAMP SENZA FUSO ORARIO |
| TINYINT | SMALLINT |
| ARRAY<tipoElemento> | JSONB |
| MAP<tipoChiave,tipoValore> | JSONB |
| STRUCT<NomeCampo:TipoCampo[, ...]> | JSONB |
Annotazioni
I tipi GEOGRAPHY, GEOMETRY, VARIANT e OBJECT non sono supportati.
Gestire caratteri non validi
Alcuni caratteri come byte Null (0x00) sono consentiti nelle colonne STRING, ARRAY, MAP o STRUCT del catalogo Unity, ma non supportate nelle colonne POSTGRES TEXT o JSONB. Ciò può causare errori di sincronizzazione con errori come:
ERROR: invalid byte sequence for encoding "UTF8": 0x00
ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text
- Il primo errore si verifica quando compare un byte nullo in una colonna stringa di primo livello, che esegue il mapping diretto a Postgres
TEXT. - Il secondo errore si verifica quando un byte Null viene visualizzato in una stringa annidata all'interno di un tipo complesso (
STRUCT,ARRAYoMAP), serializzato comeJSONB. Durante la serializzazione, tutte le stringhe vengono convertite in PostgresTEXT, dove\u0000non è consentito.
Soluzioni:
Purificare i campi stringa: rimuovere i caratteri non supportati prima della sincronizzazione. Per i byte Null nelle colonne STRING:
SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_tableConverti in BINARY: per le colonne STRING in cui è necessario conservare i byte non elaborati, convertire in tipo BINARY.
Pianificazione della capacità
Quando si pianifica l'implementazione delle tabelle sincronizzate, prendere in considerazione i requisiti delle risorse seguenti:
- Utilizzo connessione: ogni tabella sincronizzata usa fino a 16 connessioni al database Lakebase, che vengono conteggiate per il limite di connessione dell'istanza.
- Limiti di dimensioni: il limite totale delle dimensioni dei dati logici in tutte le tabelle sincronizzate è di 8 TB. Le singole tabelle non hanno limiti, ma Databricks consiglia di non superare 1 TB per le tabelle che richiedono aggiornamenti.
- Dimensioni di aggiornamento completo: quando si attiva un aggiornamento completo, la versione precedente in Postgres non viene eliminata fino al completamento della nuova sincronizzazione. Entrambe le versioni vengono conteggiate temporaneamente al limite di dimensioni del database logico durante l'aggiornamento.
- Tabelle per origine: una singola tabella di origine può avere fino a 20 tabelle sincronizzate.
-
Requisiti di denominazione: i nomi di database, schema e tabella possono contenere solo caratteri alfanumerici e caratteri di sottolineatura (
[A-Za-z0-9_]+). - Indicazioni sull'identificatore di origine: evitare di usare lettere maiuscole o caratteri speciali nei nomi di colonna o tabella nella tabella del catalogo Unity di origine. Se li si mantiene, è necessario citare tali identificatori quando si fa riferimento a tali identificatori in Postgres.
- Evoluzione dello schema: sono supportate solo le modifiche dello schema additive (ad esempio l'aggiunta di colonne) per le modalità attivate e continue.
- Chiavi duplicate: se due righe hanno la stessa chiave primaria nella tabella di origine, la pipeline di sincronizzazione ha esito negativo a meno che non si configuri la deduplicazione usando una chiave timeseries.
- Idempotenza delle API: le API delle tabelle sincronizzate sono idempotenti, quindi, in caso di errori temporanei, ripetere il tentativo per garantire operazioni puntuali.
- Frequenza di aggiornamento: per la scalabilità automatica di Lakebase la pipeline di sincronizzazione supporta scritture continue e attivate a circa 150 righe al secondo per unità di capacità (CU) e scritture snapshot fino a 2.000 righe al secondo per cu.
Operazioni consentite nelle tabelle sincronizzate in Postgres
Azure Databricks consiglia di eseguire solo le operazioni seguenti in Postgres per le tabelle sincronizzate per evitare sovrascrizioni accidentali o incoerenze di dati:
- Query di sola lettura
- Creazione di indici
- Eliminazione della tabella (per liberare spazio dopo la rimozione della tabella sincronizzata dal catalogo unity)
Anche se è possibile modificare le tabelle sincronizzate in Postgres in altri modi, interferisce con la pipeline di sincronizzazione.
Proprietà e autorizzazioni
Se si crea un nuovo database, uno schema o una tabella Postgres, la proprietà postgres viene impostata come segue:
- La proprietà viene assegnata all'utente che crea il database, lo schema o la tabella, se il relativo account di accesso Azure Databricks esiste come ruolo in Postgres. Per aggiungere un ruolo di identità di Azure Databricks in Postgres, vedere Ruoli di Postgres.
- In caso contrario, la proprietà viene assegnata al proprietario dell'oggetto padre in Postgres (in genere ).
databricks_superuser
Gestire l'accesso alle tabelle sincronizzate
Dopo aver creato una tabella sincronizzata, può databricks_superuser leggere una tabella sincronizzata da Postgres. Il databricks_superuser ha pg_read_all_data, che consente a questo ruolo di leggere da tutte le tabelle. Ha anche il pg_write_all_data privilegio, che consente a questo ruolo di scrivere su tutte le tabelle. Ciò significa che un databricks_superuser oggetto può anche scrivere in una tabella sincronizzata in Postgres. Lakebase supporta questo comportamento di scrittura nel caso in cui sia necessario apportare modifiche urgenti nella tabella di destinazione. Tuttavia, Azure Databricks consiglia di apportare correzioni nella tabella di origine.
Il
databricks_superuserpuò anche concedere questi privilegi ad altri utenti:GRANT USAGE ON SCHEMA synced_table_schema TO user;GRANT SELECT ON synced_table_name TO user;databricks_superuserpuò revocare questi privilegi:REVOKE USAGE ON SCHEMA synced_table_schema FROM user;REVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
Gestire le operazioni delle tabelle sincronizzate
databricks_superuser Può gestire gli utenti autorizzati a eseguire operazioni specifiche su una tabella sincronizzata. Le operazioni supportate per le tabelle sincronizzate sono:
CREATE INDEXALTER INDEXDROP INDEXDROP TABLE
Tutte le altre operazioni DDL vengono negate per le tabelle sincronizzate.
Per concedere questi privilegi ad altri utenti, è databricks_superuser necessario innanzitutto creare un'estensione in databricks_auth:
CREATE EXTENSION IF NOT EXISTS databricks_auth;
databricks_superuser Può quindi aggiungere un utente per gestire una tabella sincronizzata:
SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');
Può databricks_superuser rimuovere un utente dalla gestione di una tabella sincronizzata:
SELECT databricks_synced_table_remove_manager('[table]', '[user]');
Il sistema databricks_superuser può visualizzare tutti i responsabili:
SELECT * FROM databricks_synced_table_managers;
Eliminare una tabella sincronizzata
L'eliminazione di una tabella sincronizzata da Unity Catalog elimina anche la tabella Postgres corrispondente.
INTERFACCIA UTENTE
In Catalogo individuare la tabella sincronizzata, fare clic selezionare Elimina.
PYTHON SDK
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
w.postgres.delete_synced_table("synced_tables/my-catalog.sales.orders").wait()
JAVA SDK
import com.databricks.sdk.WorkspaceClient;
WorkspaceClient w = new WorkspaceClient();
w.postgres().deleteSyncedTable("synced_tables/my-catalog.sales.orders").waitForCompletion();
curva
curl -X DELETE "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables/my-catalog.sales.orders" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}"
Ulteriori informazioni
| Attività | Descrizione |
|---|---|
| Creare un progetto | Configurare un progetto Lakebase |
| Connettersi al database | Informazioni sulle opzioni di connessione per Lakebase |
| Registrare il database nel catalogo unity | Rendere visibili i dati di Lakebase in Unity Catalog per una governance unificata e le query tra più origini. |
| Integrazione del catalogo Unity | Informazioni sulla governance e sulle autorizzazioni |
Integrazione del catalogo
- Duplicazione del catalogo: La creazione di una tabella sincronizzata in un catalogo standard destinata a un database Postgres registrato anche come catalogo di database separato fa sì che la tabella sincronizzata venga visualizzata in Unity Catalog sia con i cataloghi standard che con i cataloghi di database.
Altre opzioni
Per la sincronizzazione dei dati in sistemi non Databricks, vedere Soluzioni ETL inverse di Partner Connect , ad esempio Census o Hightouch.