您将创建并部署一个管道,使用 Lakeflow Spark 声明式管道(SDP)和 Auto Loader 进行数据编排,以摄取 GPS 数据、将坐标转换为原生空间类型,并与仓库地理围栏进行关联,以跟踪到达情况。 本教程使用 Databricks 本机空间类型(GEOMETRY、GEOGRAPHY)和内置空间函数(例如ST_PointST_GeomFromWKT,以及ST_Contains),以便无需外部库即可大规模运行地理空间工作流。
在本教程中,你将:
- 在 Unity Catalog 数据卷中创建一个管道,并生成 GPS 和地理围栏示例数据。
- 使用 Auto Loader 将原始 GPS 数据增量导入青铜级流式处理表。
- 构建一个银级流式处理表,将经纬度转换为原生
GEOMETRY点类型。 - 从 WKT 多边形创建仓库地理栅栏的物化视图。
- 执行空间连接,生成仓库到达记录表(即哪个设备进入了哪个地理围栏)。
最终形成一种“奖章式”管道:青铜级(原始 GPS)、银级(作为几何体的点)和金级 (地理围栏和到达事件)。 更多信息请参阅什么是勋章式湖屋架构?。
要求
若要完成此教程,必须满足以下要求:
- 登录到 Azure Databricks 工作区。
- 为工作区启用 Unity 目录 。
- 若要使用 Serverless Lakeflow Spark 声明式管道(在配备 Unity Catalog 的工作区中默认启用),请确保您的工作区中已启用无服务器计算。 如果无法使用无服务器计算,这些步骤也可与您工作区的默认计算方式配合使用。
- 有权 创建计算资源 或访问计算资源。
- 有权 在目录中创建新架构。 所需的权限是
USE CATALOG和CREATE SCHEMA。 - 有权在现有架构中创建新卷。 所需的权限是
USE SCHEMA和CREATE VOLUME。 - 使用支持本机空间类型和空间函数的运行时。
- 有关创建、运行、刷新和查看管道及其输出所需的完整权限集,请参阅 管理管道的标识、权限和权限。
步骤 1:创建管道
创建新的 ETL 管道,并为表设置默认目录和架构。
在工作区中,单击
在边栏中新建,然后选择“ETL 管道”。 这将打开包含默认管道名称的管道编辑器,如下所示
New Pipeline <date> <time>。选择名称并输入描述性名称,例如
Spatial pipeline tutorial。在名称右侧,单击目录和架构以选择您具有写入权限的默认项。
如果未在代码中指定目录或架构,则默认使用此目录和架构。 用您在此处选择的值替换以下步骤中的
<catalog>和<schema>。(可选)在为你创建的
my_transformation源文件中,从语言下拉列表中选择 Python 或 SQL 以设置文件的语言。单击
使用示例代码。
Lakeflow 管道编辑器打开时,您的管道中会包含示例文件。 接下来,创建示例 GPS 和地理围栏数据。
步骤 2:创建示例 GPS 和地理围栏数据
此步骤将在一个卷中生成示例数据:原始 GPS 定位数据(JSON)和仓库地理围栏(包含 WKT 多边形的 JSON)。 GPS 坐标点生成在与两个仓库多边形重叠的边界框内,因此后续步骤中的空间连接将返回到达记录。 如果卷或表中已有自己的数据,则可以跳过此步骤。
在 Lakeflow 管道编辑器的资产浏览器中,单击
、添加,然后选择 探索。
将“名称
Setup spatial data”设置为“,选择”Python“,并保留默认目标文件夹。单击 “创建” 。
在新笔记本中,粘贴以下代码。 将
<catalog>和<schema>替换为您在步骤 1 中设置的默认目录和模式。在笔记本中使用以下代码生成 GPS 和地理围栏数据。
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")运行笔记本单元格(Shift + Enter)。
运行完成后,该卷将包含 gps(原始定位数据)和 geofences(WKT 格式的多边形)。 在下一步中,您将 GPS 数据导入到 Bronze 表中。
步骤 3:将 GPS 数据引入青铜流数据表
使用 Auto Loader 增量导入卷中的原始 GPS JSON 数据,并写入 Bronze 流式表。
在资产浏览器中,点击
添加,然后点击转换。
将名称
gps_bronze设置为,选择 SQL 或 Python,然后单击“创建”。将文件内容替换为以下内容(使用与语言匹配的选项卡)。 将
<catalog>和<schema>替换为默认目录和架构。SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )单击“
运行文件 或 运行管道 以运行更新。
更新完成后,管道图会显示 gps_bronze 表。 接下来,添加一个将坐标转换为本机几何点的银级表。
步骤 4:添加包含几何点的银级流式表
创建一个流式表,该表从铜级表读取数据,并使用 ST_Point(longitude, latitude) 添加 GEOMETRY 列。
在资产浏览器中,点击
添加,然后点击转换。
将名称
raw_gps_silver设置为,选择 SQL 或 Python,然后单击“创建”。将以下代码粘贴到新文件中。
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )单击“
运行文件 或 运行管道。
管道图现在显示 gps_bronze 和 raw_gps_silver。 接下来,将仓库地理围栏作为具体化视图添加。
步骤 5:创建仓库地理围栏黄金表
创建一个具体化视图,该视图从数据集读取地理围栏,并使用 ST_GeomFromWKT 将 WKT 列转换为 GEOMETRY 列。
在资产浏览器中,点击
添加,然后点击转换。
将名称
warehouse_geofences_gold设置为,选择 SQL 或 Python,然后单击“创建”。粘贴以下代码。 将
<catalog>和<schema>替换为默认目录和架构。SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )单击“
运行文件 或 运行管道。
现在,管道中已包含地理围栏表。 接下来,添加空间连接以计算仓库收货。
步骤 6:使用空间联接创建仓库到达表
添加一个物化视图,通过 ST_Contains(boundary_geom, point_geom) 将银色 GPS 点与地理围栏联接,以确定设备何时处于仓库多边形内。
在资产浏览器中,点击
添加,然后点击转换。
将名称
warehouse_arrivals设置为,选择 SQL 或 Python,然后单击“创建”。粘贴以下代码。
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )单击“
运行文件 或 运行管道。
更新完成后,管道图将显示所有四个数据集: gps_bronze、 raw_gps_silver、 warehouse_geofences_gold和 warehouse_arrivals。
验证空间联接
确认空间连接生成了行:位于地理围栏内的“silver”表中的点会出现在 warehouse_arrivals 中。 在笔记本或 SQL 编辑器中运行以下命令之一(使用与管道目标相同的目录和架构)。
按仓库计算到达次数(SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
应该看到 Warehouse_A 和 Warehouse_B 的非零计数(示例 GPS 数据与这两个多边形重叠)。 用于检查的示例行:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
在 Python(笔记本)中的相同验证:
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
如果在warehouse_arrivals中看到行,说明ST_Contains(boundary_geom, point_geom)连接正常工作。
步骤 7:计划管道(可选)
为了在数据集中有新的 GPS 数据到达时保持管道的最新状态,请创建一个任务,按计划运行该管道。
- 在编辑器顶部,选择“ 计划 ”按钮。
- 如果出现“ 计划 ”对话框,请选择“ 添加计划”。
- (可选)为作业命名。
- 默认情况下,计划每天运行一次。 可以接受此项或自行设置。 选择 “高级 ”可设置特定时间; 通过更多选项 ,可以添加运行通知。
- 选择“ 创建 ”以应用计划。
有关作业运行的详细信息,请参阅 Lakeflow 作业的监视和可观测性 。