矢量搜索Python SDK 示例用法

此笔记本演示如何使用矢量搜索Python SDK,该 SDK 提供 VectorSearchClient 作为使用矢量搜索的主要 API。

或者,可以直接调用 REST API。

要求

此笔记本假定存在名为 databricks-gte-large-en “模型服务”终结点。 若要创建该端点,请参阅笔记文件 使用马赛克 AI 模型服务调用 GTE 嵌入模型

%pip install --upgrade --force-reinstall databricks-vectorsearch langchain
dbutils.library.restartPython()
from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient()
help(VectorSearchClient)

将玩具数据集加载到源 Delta 表中

下面将创建源 Delta 表。


# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.

catalog_name = "main"
schema_name = "default"

source_table_name = "en_wiki"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
# Uncomment if you want to start from scratch.

# spark.sql(f"DROP TABLE {source_table_fullname}")
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)
source_df.write.format("delta").option("delta.enableChangeDataFeed", "true").saveAsTable(source_table_fullname)
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))

创建矢量搜索终结点

vector_search_endpoint_name = "vector-search-demo-endpoint"
vsc.create_endpoint(
    name=vector_search_endpoint_name,
    endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
)
endpoint = vsc.get_endpoint(
  name=vector_search_endpoint_name)
endpoint

创建矢量索引

# Vector index
vs_index = "en_wiki_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"

embedding_model_endpoint = "databricks-gte-large-en"
index = vsc.create_delta_sync_index(
  endpoint_name=vector_search_endpoint_name,
  source_table_name=source_table_fullname,
  index_name=vs_index_fullname,
  pipeline_type='TRIGGERED',
  primary_key="id",
  embedding_source_column="text",
  embedding_model_endpoint_name=embedding_model_endpoint
)
index.describe()

获取向量索引

使用 get_index() 通过向量索引名称检索向量索引对象。 还可以在 describe() 索引对象上查看索引的配置信息的摘要。

index = vsc.get_index(endpoint_name=vector_search_endpoint_name, index_name=vs_index_fullname)

index.describe()
# Wait for index to come online. Expect this command to take several minutes.
import time
while not index.describe().get('status').get('detailed_state').startswith('ONLINE'):
  print("Waiting for index to be ONLINE...")
  time.sleep(5)
print("Index is ONLINE")
index.describe()

查询矢量索引以查找类似的文档。

# Returns [col1, col2, ...]
# You can set this to any subset of the columns.
all_columns = spark.table(source_table_fullname).columns

results = index.similarity_search(
  query_text="Greek myths",
  columns=all_columns,
  num_results=2)

results
# Search with a filter. Note that the syntax depends on the endpoint type.

# Standard endpoint syntax
results = index.similarity_search(
  query_text="Greek myths",
  columns=all_columns,
  filters={"id NOT": ("13770", "88231")},
  num_results=2)

# Storage-optimized endpoint syntax
# results = index.similarity_search(
#   query_text="Greek myths",
#   columns=all_columns,
#   filters='id NOT IN ("13770", "88231")',
#   num_results=2)

results

将结果转换为 LangChain 文档

检索到的第一列将加载到 page_content 中,其余列将加载到元数据中。

from langchain_core.documents import Document
from typing import List

def convert_vector_search_to_documents(results) -> List[Document]:
  column_names = []
  for column in results["manifest"]["columns"]:
      column_names.append(column)

  langchain_docs = []
  for item in results["result"]["data_array"]:
      metadata = {}
      score = item[-1]
      # print(score)
      i = 1
      for field in item[1:-1]:
          # print(field + "--")
          metadata[column_names[i]["name"]] = field
          i = i + 1
      doc = Document(page_content=item[0], metadata=metadata)  # , 9)
      langchain_docs.append(doc)
  return langchain_docs

langchain_docs = convert_vector_search_to_documents(results)

langchain_docs

删除向量索引

vsc.delete_index(index_name=vs_index_fullname)

示例笔记本

矢量搜索Python SDK 示例用法

获取笔记本