教程:构建一个地理空间管道,并使用原生空间类型

您将创建并部署一个管道,使用 Lakeflow Spark 声明式管道(SDP)和 Auto Loader 进行数据编排,以摄取 GPS 数据、将坐标转换为原生空间类型,并与仓库地理围栏进行关联,以跟踪到达情况。 本教程使用 Databricks 本机空间类型(GEOMETRYGEOGRAPHY)和内置空间函数(例如ST_PointST_GeomFromWKT,以及ST_Contains),以便无需外部库即可大规模运行地理空间工作流。

在本教程中,你将:

  • 在 Unity Catalog 数据卷中创建一个管道,并生成 GPS 和地理围栏示例数据。
  • 使用 Auto Loader 将原始 GPS 数据增量导入青铜级流式处理表。
  • 构建一个银级流式处理表,将经纬度转换为原生 GEOMETRY 点类型。
  • 从 WKT 多边形创建仓库地理栅栏的物化视图。
  • 执行空间连接,生成仓库到达记录表(即哪个设备进入了哪个地理围栏)。

最终形成一种“奖章式”管道:青铜级(原始 GPS)、银级(作为几何体的点)和金级 (地理围栏和到达事件)。 更多信息请参阅什么是勋章式湖屋架构?

要求

若要完成此教程,必须满足以下要求:

步骤 1:创建管道

创建新的 ETL 管道,并为表设置默认目录和架构。

  1. 在工作区中,单击“加号”图标。在边栏中新建,然后选择“ETL 管道”。 这将打开包含默认管道名称的管道编辑器,如下所示 New Pipeline <date> <time>

  2. 选择名称并输入描述性名称,例如 Spatial pipeline tutorial

  3. 在名称右侧,单击目录和架构以选择您具有写入权限的默认项。

    如果未在代码中指定目录或架构,则默认使用此目录和架构。 用您在此处选择的值替换以下步骤中的<catalog><schema>

  4. (可选)在为你创建的 my_transformation 源文件中,从语言下拉列表中选择 PythonSQL 以设置文件的语言。

  5. 单击 “代码”图标。使用示例代码

Lakeflow 管道编辑器打开时,您的管道中会包含示例文件。 接下来,创建示例 GPS 和地理围栏数据。

步骤 2:创建示例 GPS 和地理围栏数据

此步骤将在一个卷中生成示例数据:原始 GPS 定位数据(JSON)和仓库地理围栏(包含 WKT 多边形的 JSON)。 GPS 坐标点生成在与两个仓库多边形重叠的边界框内,因此后续步骤中的空间连接将返回到达记录。 如果卷或表中已有自己的数据,则可以跳过此步骤。

  1. 在 Lakeflow 管道编辑器的资产浏览器中,单击 加号图标添加,然后选择 探索

  2. “名称Setup spatial data”设置为“,选择”Python“,并保留默认目标文件夹。

  3. 单击 “创建”

  4. 在新笔记本中,粘贴以下代码。 将 <catalog><schema> 替换为您在步骤 1 中设置的默认目录和模式。

    在笔记本中使用以下代码生成 GPS 和地理围栏数据。

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. 运行笔记本单元格(Shift + Enter)。

运行完成后,该卷将包含 gps(原始定位数据)和 geofences(WKT 格式的多边形)。 在下一步中,您将 GPS 数据导入到 Bronze 表中。

步骤 3:将 GPS 数据引入青铜流数据表

使用 Auto Loader 增量导入卷中的原始 GPS JSON 数据,并写入 Bronze 流式表。

  1. 在资产浏览器中,点击加号图标。添加,然后点击转换

  2. 名称gps_bronze设置为,选择 SQLPython,然后单击“创建”。

  3. 将文件内容替换为以下内容(使用与语言匹配的选项卡)。 将<catalog><schema>替换为默认目录和架构。

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. 单击“ 播放”图标。运行文件运行管道 以运行更新。

更新完成后,管道图会显示 gps_bronze 表。 接下来,添加一个将坐标转换为本机几何点的银级表。

步骤 4:添加包含几何点的银级流式表

创建一个流式表,该表从铜级表读取数据,并使用 ST_Point(longitude, latitude) 添加 GEOMETRY 列。

  1. 在资产浏览器中,点击加号图标。添加,然后点击转换

  2. 名称raw_gps_silver设置为,选择 SQLPython,然后单击“创建”。

  3. 将以下代码粘贴到新文件中。

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. 单击“ 播放”图标。运行文件运行管道

管道图现在显示 gps_bronzeraw_gps_silver。 接下来,将仓库地理围栏作为具体化视图添加。

步骤 5:创建仓库地理围栏黄金表

创建一个具体化视图,该视图从数据集读取地理围栏,并使用 ST_GeomFromWKT 将 WKT 列转换为 GEOMETRY 列。

  1. 在资产浏览器中,点击加号图标。添加,然后点击转换

  2. 名称warehouse_geofences_gold设置为,选择 SQLPython,然后单击“创建”。

  3. 粘贴以下代码。 将<catalog><schema>替换为默认目录和架构。

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. 单击“ 播放”图标。运行文件运行管道

现在,管道中已包含地理围栏表。 接下来,添加空间连接以计算仓库收货。

步骤 6:使用空间联接创建仓库到达表

添加一个物化视图,通过 ST_Contains(boundary_geom, point_geom) 将银色 GPS 点与地理围栏联接,以确定设备何时处于仓库多边形内。

  1. 在资产浏览器中,点击加号图标。添加,然后点击转换

  2. 名称warehouse_arrivals设置为,选择 SQLPython,然后单击“创建”。

  3. 粘贴以下代码。

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. 单击“ 播放”图标。运行文件运行管道

更新完成后,管道图将显示所有四个数据集: gps_bronzeraw_gps_silverwarehouse_geofences_goldwarehouse_arrivals

验证空间联接

确认空间连接生成了行:位于地理围栏内的“silver”表中的点会出现在 warehouse_arrivals 中。 在笔记本或 SQL 编辑器中运行以下命令之一(使用与管道目标相同的目录和架构)。

按仓库计算到达次数(SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

应该看到 Warehouse_AWarehouse_B 的非零计数(示例 GPS 数据与这两个多边形重叠)。 用于检查的示例行:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

在 Python(笔记本)中的相同验证:

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

如果在warehouse_arrivals中看到行,说明ST_Contains(boundary_geom, point_geom)连接正常工作。

步骤 7:计划管道(可选)

为了在数据集中有新的 GPS 数据到达时保持管道的最新状态,请创建一个任务,按计划运行该管道。

  1. 在编辑器顶部,选择“ 计划 ”按钮。
  2. 如果出现“ 计划 ”对话框,请选择“ 添加计划”。
  3. (可选)为作业命名。
  4. 默认情况下,计划每天运行一次。 可以接受此项或自行设置。 选择 “高级 ”可设置特定时间; 通过更多选项 ,可以添加运行通知。
  5. 选择“ 创建 ”以应用计划。

有关作业运行的详细信息,请参阅 Lakeflow 作业的监视和可观测性

其他资源