이 자습서에서는 Spark에 청구서를 로드하고, Azure 문서 인텔리전스를 사용하여 구조화된 데이터를 추출하고, 텍스트를 번역하고, Azure OpenAI로 보강하고, 쿼리할 수 있는 Azure AI Search 인덱스에 결과를 씁니다. 자습서를 완료하는 데 약 30분이 걸립니다.
필수 조건
- Microsoft Fabric 구독. 또는 무료 Microsoft Fabric 평가판 등록합니다.
- 레이크하우스에 연결된 패브릭 노트북
- Azure AI services(Foundry 도구) 리소스 및 키
- Azure Translator 리소스 및 키
- Azure AI Search 서비스 및 관리 키
-
(또는 해당) 배포가 있는
gpt-4o-mini리소스
종속성 설정
패키지를 가져오고 이 워크플로에 사용되는 Azure 리소스에 연결합니다.
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
cognitive_key = find_secret("Foundry-resource-key") # replace with your Azure AI services key
cognitive_location = "eastus"
translator_key = find_secret("translator-key") # replace with your Azure Translator resource key
translator_location = "eastus"
search_key = find_secret("azure-search-key") # replace with your Azure AI Search key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"
openai_key = find_secret("openai-api-key") # replace with your Azure OpenAI key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-4o-mini"
openai_url = f"https://{openai_service_name}.openai.azure.com/"
Spark에 데이터 로드
이 코드는 데모용으로 사용되는 Azure 스토리지 계정에서 몇 개의 외부 파일을 로드합니다. 파일은 다양한 청구서이며 코드는 이를 데이터 프레임으로 읽습니다.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def blob_to_url(blob):
[prefix, postfix] = blob.split("@")
container = prefix.split("/")[-1]
split_postfix = postfix.split("/")
account = split_postfix[0]
filepath = "/".join(split_postfix[1:])
return "https://{}/{}/{}".format(account, container, filepath)
df2 = (
spark.read.format("binaryFile")
.load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
.select("path")
.limit(10)
.select(udf(blob_to_url, StringType())("path").alias("url"))
.cache()
)
display(df2)
문서 인텔리전스 적용
이 코드는 AnalyzeInvoices 변환기를 로드하고 청구서가 포함된 데이터 프레임에 대한 참조를 전달합니다. Azure Document Intelligence의 미리 빌드된 청구서 모델을 호출합니다.
from synapse.ml.cognitive import AnalyzeInvoices
analyzed_df = (
AnalyzeInvoices()
.setSubscriptionKey(cognitive_key)
.setLocation(cognitive_location)
.setImageUrlCol("url")
.setOutputCol("invoices")
.setErrorCol("errors")
.setConcurrency(5)
.transform(df2)
.cache()
)
display(analyzed_df)
문서 인텔리전스 출력 간소화
FormOntologyLearner 변환기는 동적 AnalyzeInvoices 출력에서 테이블 형식 구조를 유추하여 더 간단한 다운스트림 분석을 위해 열과 행으로 구성합니다.
from synapse.ml.cognitive import FormOntologyLearner
organized_df = (
FormOntologyLearner()
.setInputCol("invoices")
.setOutputCol("extracted")
.fit(analyzed_df)
.transform(analyzed_df)
.select("url", "extracted.*")
.cache()
)
display(organized_df)
테이블 형식 데이터 프레임을 사용하면 SparkSQL을 사용하여 양식에 있는 중첩 테이블을 평면화할 수 있습니다.
from pyspark.sql.functions import explode, col
itemized_df = (
organized_df.select("*", explode(col("Items")).alias("Item"))
.drop("Items")
.select("Item.*", "*")
.drop("Item")
)
display(itemized_df)
번역 추가
이 코드는 Foundry Tools 서비스의 Azure Translator 호출하는 변환기인 Translate를 로드합니다. "설명" 열에 영어로 된 원본 텍스트는 다양한 언어로 컴퓨터 번역됩니다. 모든 출력은 "output.translations" 배열로 통합됩니다.
from synapse.ml.cognitive import Translate
translated_df = (
Translate()
.setSubscriptionKey(translator_key)
.setLocation(translator_location)
.setTextCol("Description")
.setErrorCol("TranslationError")
.setOutputCol("output")
.setToLanguage(["zh-Hans", "fr", "ru", "cy"])
.setConcurrency(5)
.transform(itemized_df)
.withColumn("Translations", col("output.translations")[0])
.drop("output", "TranslationError")
.cache()
)
display(translated_df)
OpenAI를 사용하여 제품을 이모지로 변환
from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split
emoji_template = """
Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
Two Ducks: 🦆🦆,
Light Bulb: 💡,
Three Peaches: 🍑🍑🍑,
Two kitchen stoves: ♨️♨️,
A red car: 🚗,
A person and a cat: 🧍🐈,
A {Description}: """
prompter = (
OpenAIPrompt()
.setSubscriptionKey(openai_key)
.setDeploymentName(openai_deployment_name)
.setUrl(openai_url)
.setMaxTokens(5)
.setPromptTemplate(emoji_template)
.setErrorCol("error")
.setOutputCol("Emoji")
)
emoji_df = (
prompter.transform(translated_df)
.withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(emoji_df.select("Description", "Emoji"))
OpenAI를 사용하여 공급업체 주소의 대륙을 유추하기
continent_template = """
Which continent does the following address belong to?
Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica.
Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.
Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""
continent_df = (
prompter.setOutputCol("Continent")
.setPromptTemplate(continent_template)
.transform(emoji_df)
.withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(continent_df.select("VendorAddress", "Continent"))
양식에 대한 Azure AI Search 인덱스 만들기
from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit
(
continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
.withColumn("SearchAction", lit("upload"))
.writeToAzureSearch(
subscriptionKey=search_key,
actionCol="SearchAction",
serviceName=search_service,
indexName=search_index,
keyCol="DocID",
)
)
검색 쿼리 사용해 보기
import requests
search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2024-07-01".format(
search_service, search_index
)
requests.post(
search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()
Azure AI Search 도구로 사용할 수 있는 챗봇 빌드
import json
from openai import AzureOpenAI
client = AzureOpenAI(
api_key=openai_key,
api_version="2024-10-21",
azure_endpoint=openai_url,
)
chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:
{continent_df.columns}
If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""
def search_query_prompt(question):
return f"""
Given the search engine above, what would you search for to answer the following question?
Question: "{question}"
Please output a json in the form of {{"query": "example_query"}}
"""
def search_result_prompt(query):
search_results = requests.post(
search_url, json={"search": query}, headers={"api-key": search_key}
).json()
return f"""
You previously ran a search for "{query}" which returned the following results:
{search_results}
You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem.
"""
def prompt_gpt(messages):
response = client.chat.completions.create(
model=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
)
return response.choices[0].message.content
def custom_chatbot(question):
while True:
try:
query = json.loads(
prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "user", "content": search_query_prompt(question)},
]
)
)["query"]
return prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "system", "content": search_result_prompt(query)},
{"role": "user", "content": question},
]
)
except Exception as e:
raise e
챗봇에 질문하기
custom_chatbot("What did Luke Diaz buy?")
결과 확인
display(
continent_df.where(col("CustomerName") == "Luke Diaz")
.select("Description")
.distinct()
)