此笔记本演示如何使用矢量搜索 Python SDK。该 SDK 提供 VectorSearchClient,作为使用矢量搜索的主要 API。
此笔记本借助 Databricks 对外部模型的支持来访问 OpenAI 嵌入模型,以生成嵌入。
# Upgrade and force-reinstall the Vector Search SDK and tiktoken for this notebook.
%pip install --upgrade --force-reinstall databricks-vectorsearch tiktoken
# Restart the Python process (presumably so the imports below pick up the
# freshly installed packages -- confirm against Databricks library docs).
dbutils.library.restartPython()
from databricks.vector_search.client import VectorSearchClient
# Client object used for every Vector Search call made later in this notebook.
vsc = VectorSearchClient(disable_notice=True)
# Display help for the Vector Search Client
help(VectorSearchClient)
将玩具数据集加载到源 Delta 表中
下面将创建源 Delta 表。
# Unity Catalog location of the source table. You need the USE_CATALOG
# privilege on the catalog, plus USE_SCHEMA and CREATE_TABLE on the schema.
# Edit the names below to target a different catalog or schema.
catalog_name = "main"
schema_name = "default"
source_table_name = "wiki_articles_demo"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"

# To start from scratch, uncomment the following line.
# spark.sql(f"DROP TABLE {source_table_fullname}")

# Read the sample Wikipedia articles and keep only ten rows for the demo.
wikipedia_parquet_path = "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet"
wikipedia_articles = spark.read.parquet(wikipedia_parquet_path)
source_df = wikipedia_articles.limit(10)
display(source_df)
对示例数据集进行分块
对示例数据集进行分块有助于避免超出嵌入模型的上下文限制。OpenAI 模型最多支持 8192 个令牌。不过,Databricks 建议将数据拆分为更小的上下文区块,以便向 RAG 应用程序的推理模型提供更多样的示例。
import tiktoken
import pandas as pd
# Upper bound on the number of tokens placed in a single text chunk.
max_chunk_tokens = 1024
# tiktoken encoding used to tokenize and detokenize text while chunking.
# NOTE(review): cl100k_base is presumably chosen to match the OpenAI embedding
# model registered later in this notebook -- confirm.
encoding = tiktoken.get_encoding("cl100k_base")
def chunk_text(text, max_tokens=None, encoder=None):
    """Split ``text`` into consecutive chunks of at most ``max_tokens`` tokens.

    The text is tokenized once, the token list is cut into fixed-size
    windows, and each window is decoded back into a string.

    Args:
        text: The string to split.
        max_tokens: Maximum number of tokens per chunk. When omitted, the
            notebook-level ``max_chunk_tokens`` is used.
        encoder: Object exposing ``encode(text)`` returning a token sequence
            and ``decode(tokens)`` returning a string. When omitted, the
            notebook-level ``encoding`` is used.

    Returns:
        A list of decoded text chunks in their original order. The list is
        empty when ``text`` encodes to no tokens.

    Raises:
        ValueError: If the effective chunk size is not positive; such a size
            could never consume the token list.
    """
    # Resolve the defaults at call time so later changes to the notebook
    # globals are honored.
    if max_tokens is None:
        max_tokens = max_chunk_tokens
    if encoder is None:
        encoder = encoding
    if max_tokens <= 0:
        raise ValueError(f"max_tokens must be positive, got {max_tokens}")

    tokens = encoder.encode(text)
    # Step through the token list by index instead of repeatedly re-slicing
    # the remainder, which would copy the tail on every iteration.
    return [
        encoder.decode(tokens[start:start + max_tokens])
        for start in range(0, len(tokens), max_tokens)
    ]
# Expand each source row into one output row per text chunk.
pandas_df = source_df.toPandas()
processed_data = []
for _, row in pandas_df.iterrows():
    for chunk_no, chunk in enumerate(chunk_text(row['text'])):
        # Copy every column of the source row, then overwrite the id with a
        # unique per-chunk id and the text with the chunk itself.
        row_data = row.to_dict()
        row_data['id'] = f"{row['id']}_{chunk_no}"
        row_data['text'] = chunk
        processed_data.append(row_data)

# Convert the collected rows back into a Spark DataFrame.
chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)
# Replace any existing copy of the source table with the chunked data,
# writing it as a Delta table with the change data feed option enabled.
spark.sql(f"DROP TABLE IF EXISTS {source_table_fullname}")
delta_writer = chunked_spark_df.write.format("delta").option("delta.enableChangeDataFeed", "true")
delta_writer.saveAsTable(source_table_fullname)

# Show the contents of the newly written table.
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))
创建矢量搜索终结点
vector_search_endpoint_name = "vector-search-demo-endpoint"

# Create the endpoint; "STANDARD" may be swapped for "STORAGE_OPTIMIZED".
vsc.create_endpoint(name=vector_search_endpoint_name, endpoint_type="STANDARD")

# Fetch the endpoint that was just requested.
vsc.get_endpoint(name=vector_search_endpoint_name)
注册 OpenAI 嵌入模型终结点
有关详细用法,请参阅有关配置 OpenAI 终结点的外部模型文档。
若要提供凭据,请使用 Databricks 机密管理器。
embedding_model_endpoint_name = "openai-embedding-endpoint"

import mlflow.deployments

# Store the OpenAI API key in the secret manager first, then point the
# secret reference below at the correct scope and key name.
openai_external_model = {
    "name": "text-embedding-ada-002",
    "provider": "openai",
    "task": "llm/v1/embeddings",
    "openai_config": {
        "openai_api_key": "{{secrets/demo/openai-api-key}}"  # CHANGE ME
    },
}

# Register a serving endpoint whose only served entity is the external model.
mlflow_deploy_client = mlflow.deployments.get_deploy_client("databricks")
mlflow_deploy_client.create_endpoint(
    name=embedding_model_endpoint_name,
    config={"served_entities": [{"external_model": openai_external_model}]},
)
创建矢量索引
# Name of the vector index, placed in the same catalog and schema as the
# source table.
vs_index = f"{source_table_name}_openai_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"

# Create a Delta Sync index over the "text" column of the source table,
# keyed by "id" and embedded through the registered model endpoint.
index = vsc.create_delta_sync_index(
    index_name=vs_index_fullname,
    endpoint_name=vector_search_endpoint_name,
    source_table_name=source_table_fullname,
    primary_key="id",
    embedding_source_column="text",
    embedding_model_endpoint_name=embedding_model_endpoint_name,
    pipeline_type="TRIGGERED",
)

# Show the status message reported for the new index.
index_status = index.describe()['status']
index_status['message']
# Wait for index to come online. Expect this command to take several minutes.
# You can also track the status of the index build in Catalog Explorer in the
# Overview tab for the vector index.
import time

index = vsc.get_index(endpoint_name=vector_search_endpoint_name, index_name=vs_index_fullname)

# Bound the wait so a build that never becomes ready does not spin forever.
max_wait_seconds = 60 * 60
poll_interval_seconds = 30
deadline = time.monotonic() + max_wait_seconds

while True:
    # Treat a missing or partial status as "not ready yet" rather than
    # failing on a missing key.
    status = index.describe().get('status') or {}
    if status.get('ready'):
        break
    if time.monotonic() >= deadline:
        raise TimeoutError(
            f"Index {vs_index_fullname} was not ready after {max_wait_seconds} seconds; "
            f"last status message: {status.get('message')}"
        )
    print("Waiting for index to be ready...")
    time.sleep(poll_interval_seconds)

print("Index is ready!")
index.describe()
相似性搜索
以下单元格演示如何查询矢量索引以查找类似的文档。
def _print_search_results(results, max_text_chars=32):
    """Print one line per hit of a similarity search response.

    Args:
        results: Response dictionary returned by ``index.similarity_search``.
            Each entry of ``results['result']['data_array']`` is unpacked as
            ``(id, text, title, score)``, matching the columns requested in
            the searches below with the score in the last position.
        max_text_chars: Text longer than this many characters is trimmed and
            suffixed with ``"..."`` for readability.
    """
    # A response without a data array is treated as having no hits.
    rows = results['result'].get('data_array') or []
    # The loop variable is doc_id rather than id so the builtin id() is not
    # shadowed at notebook scope.
    for doc_id, text, title, score in rows:
        if len(text) > max_text_chars:
            # trim text output for readability
            text = text[:max_text_chars] + "..."
        print(f"id: {doc_id} title: {title} text: '{text}' score: {score}")


results = index.similarity_search(
    query_text="Greek myths",
    columns=["id", "text", "title"],
    num_results=5
)
_print_search_results(results)

# Search with a filter. Note that the syntax depends on the endpoint type.

# Standard endpoint syntax
results = index.similarity_search(
    query_text="Greek myths",
    columns=["id", "text", "title"],
    num_results=5,
    filters={"title NOT": "Hercules"}
)

# Storage-optimized endpoint syntax
# results = index.similarity_search(
#     query_text="Greek myths",
#     columns=["id", "text", "title"],
#     num_results=5,
#     filters='title != "Hercules"'
# )

_print_search_results(results)
删除矢量索引
# Remove the demo vector index from the endpoint.
vsc.delete_index(index_name=vs_index_fullname, endpoint_name=vector_search_endpoint_name)