This notebook demonstrates how to use the Vector Search Python SDK, which provides a VectorSearchClient class as the primary API for working with Vector Search.
Alternatively, you can call the REST API directly.
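For example, listing vector search endpoints over REST might look like the following sketch. DATABRICKS_HOST and DATABRICKS_TOKEN are assumed environment variables holding the workspace URL and a personal access token; adjust as needed.
# A minimal sketch of calling the REST API directly, assuming the workspace
# URL and a personal access token are available as environment variables.
import os
import requests

host = os.environ["DATABRICKS_HOST"]
token = os.environ["DATABRICKS_TOKEN"]

resp = requests.get(
    f"{host}/api/2.0/vector-search/endpoints",
    headers={"Authorization": f"Bearer {token}"},
)
print(resp.json())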
Requirements
This notebook assumes that a Model Serving endpoint named databricks-gte-large-en already exists. To create that endpoint, see the notebook "Call GTE embedding models using Mosaic AI Model Serving".
%pip install --upgrade --force-reinstall databricks-vectorsearch langchain
dbutils.library.restartPython()
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient()
help(VectorSearchClient)
Load a toy dataset into a source Delta table
The following cell creates the source Delta table.
# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.
catalog_name = "main"
schema_name = "default"
source_table_name = "en_wiki"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
# Uncomment if you want to start from scratch.
# spark.sql(f"DROP TABLE {source_table_fullname}")
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)
source_df.write.format("delta").option("delta.enableChangeDataFeed", "true").saveAsTable(source_table_fullname)
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))
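Delta Sync indexes require Change Data Feed on the source table. The write above enables it at table creation; for a pre-existing table, it can be enabled after the fact:
# For an existing table, enable Change Data Feed with TBLPROPERTIES
# (not needed here, since the write above sets it at creation time).
# spark.sql(f"ALTER TABLE {source_table_fullname} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")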
Create a vector search endpoint
vector_search_endpoint_name = "vector-search-demo-endpoint"
vsc.create_endpoint(
    name=vector_search_endpoint_name,
    endpoint_type="STANDARD"  # or "STORAGE_OPTIMIZED"
)
endpoint = vsc.get_endpoint(name=vector_search_endpoint_name)
endpoint
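Endpoint provisioning can take a few minutes. The following polling sketch assumes the returned dict exposes the state under endpoint_status.state, mirroring the REST API response; adjust the keys if your SDK version returns a different shape.
# Optional: wait for the endpoint to provision before creating an index.
# The endpoint_status.state keys are an assumption based on the REST API.
import time
while vsc.get_endpoint(name=vector_search_endpoint_name).get("endpoint_status", {}).get("state") != "ONLINE":
    print("Waiting for endpoint to be ONLINE...")
    time.sleep(30)
print("Endpoint is ONLINE")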
Create a vector index
# Vector index
vs_index = "en_wiki_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"
embedding_model_endpoint = "databricks-gte-large-en"
index = vsc.create_delta_sync_index(
    endpoint_name=vector_search_endpoint_name,
    source_table_name=source_table_fullname,
    index_name=vs_index_fullname,
    pipeline_type="TRIGGERED",
    primary_key="id",
    embedding_source_column="text",
    embedding_model_endpoint_name=embedding_model_endpoint
)
index.describe()
Get the vector index
Use get_index() to retrieve the vector index object by its name. You can also call describe() on the index object to see a summary of the index's configuration.
index = vsc.get_index(endpoint_name=vector_search_endpoint_name, index_name=vs_index_fullname)
index.describe()
# Wait for index to come online. Expect this command to take several minutes.
import time
while not index.describe().get('status').get('detailed_state').startswith('ONLINE'):
    print("Waiting for index to be ONLINE...")
    time.sleep(5)
print("Index is ONLINE")
index.describe()
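Because the index was created with pipeline_type="TRIGGERED", it does not track the source table automatically. After the source table changes, you can trigger an incremental update on the index object:
# Trigger an incremental sync after updating the source table
# (TRIGGERED pipelines only; CONTINUOUS pipelines sync automatically).
# index.sync()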
Similarity search
Query the vector index to find similar documents.
# all_columns is the list of column names in the source table.
# You can set this to any subset of the columns.
all_columns = spark.table(source_table_fullname).columns
results = index.similarity_search(
    query_text="Greek myths",
    columns=all_columns,
    num_results=2)
results
# Search with a filter. Note that the syntax depends on the endpoint type.
# Standard endpoint syntax
results = index.similarity_search(
    query_text="Greek myths",
    columns=all_columns,
    filters={"id NOT": ("13770", "88231")},
    num_results=2)

# Storage-optimized endpoint syntax
# results = index.similarity_search(
#     query_text="Greek myths",
#     columns=all_columns,
#     filters='id NOT IN ("13770", "88231")',
#     num_results=2)
results
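Recent SDK versions also support hybrid keyword-plus-vector retrieval through a query_type parameter. A sketch, to be checked against your installed SDK version:
# Hybrid (keyword + vector) search sketch; requires a recent SDK version.
# results_hybrid = index.similarity_search(
#     query_text="Greek myths",
#     columns=all_columns,
#     query_type="HYBRID",
#     num_results=2)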
Convert the results to LangChain documents
The first retrieved column is loaded into page_content, and the remaining columns are loaded into the metadata.
from langchain_core.documents import Document
from typing import List
def convert_vector_search_to_documents(results) -> List[Document]:
    # Column order in the manifest matches the field order in each result row.
    column_names = []
    for column in results["manifest"]["columns"]:
        column_names.append(column)

    langchain_docs = []
    for item in results["result"]["data_array"]:
        metadata = {}
        score = item[-1]  # the last field of each row is the similarity score
        i = 1
        for field in item[1:-1]:
            metadata[column_names[i]["name"]] = field
            i = i + 1
        doc = Document(page_content=item[0], metadata=metadata)
        langchain_docs.append(doc)
    return langchain_docs
langchain_docs = convert_vector_search_to_documents(results)
langchain_docs
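As an alternative to the manual conversion above, the langchain-community integration can wrap the index as a LangChain vector store. A sketch, assuming the langchain-community package is installed and the index uses Databricks-managed embeddings as configured here:
# Alternative sketch: wrap the index as a LangChain vector store.
# Assumes langchain-community is installed (%pip install langchain-community).
# from langchain_community.vectorstores import DatabricksVectorSearch
# dvs = DatabricksVectorSearch(index)
# dvs.similarity_search("Greek myths", k=2)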
Delete the vector index
vsc.delete_index(index_name=vs_index_fullname)
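To clean up completely, the endpoint created earlier can be deleted as well:
# Optional cleanup: delete the endpoint once the index is gone.
# vsc.delete_endpoint(name=vector_search_endpoint_name)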