자습서: 실시간 스트리밍 워크로드 실행

실시간 모드를 사용하면 엔드 투 엔드 대기 시간이 5밀리초의 짧은 매우 짧은 대기 시간 스트리밍을 가능하게 하므로 사기 감지 및 실시간 개인 설정과 같은 운영 워크로드에 이상적입니다. 이 자습서에서는 간단한 예제를 사용하여 첫 번째 실시간 스트리밍 쿼리를 설정하는 방법에 대해 설명합니다.

실시간 모드, 사용 시기 및 지원되는 기능에 대한 개념 정보는 구조적 스트리밍의 실시간 모드를 참조하세요. 구성 요구 사항은 실시간 모드 설정을 참조하세요.

요구 사항

시작하기 전에 실시간 모드 설정에 지정된 구성을 사용하는 클래식 컴퓨팅 클러스터를 만들 수 있는 권한이 있는지 확인합니다. 또는 작업 영역 관리자에게 연락하여 실시간 모드 클러스터를 생성해 달라고 요청하십시오.

1단계: Notebook 만들기

Notebook은 스트리밍 쿼리를 개발하고 테스트하기 위한 대화형 환경을 제공합니다. 이 Notebook을 사용하여 실시간 쿼리를 작성하고 결과가 지속적으로 업데이트되는지 확인합니다.

노트북을 만들려면 다음 단계를 수행하십시오.

  1. 사이드바에서 새로 만들기를 클릭한 다음 전자 필기장 아이콘을 클릭합니다.전자 필기장.
  2. 컴퓨팅 드롭다운 메뉴에서 실시간 모드 클러스터를 선택합니다.
  3. 기본 언어로 Python 또는 Scala를 선택합니다.

2단계: 실시간 모드 쿼리 실행

다음 코드를 복사하여 Notebook 셀에 붙여넣고 실행합니다. 이 예제에서는 지정된 속도로 행을 생성하고 결과를 실시간으로 표시하는 속도 원본을 사용합니다.

메모

트리거가 display 있는 realTime 함수는 Databricks Runtime 17.1 이상에서 사용할 수 있습니다.

Python

inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

Scala

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())

코드를 실행한 후 새 행이 생성될 때 실시간으로 업데이트되는 테이블이 표시됩니다. 표에는 각 행마다 증가하는 timestamp 열과 value 열이 표시됩니다.

코드 이해

위의 코드는 실시간 스트리밍 쿼리의 필수 구성 요소를 보여 줍니다. 다음 표에서는 주요 매개 변수와 해당 매개 변수가 제어하는 내용을 설명합니다.

Python

매개 변수 설명
format("rate") 구성 가능한 속도로 행을 생성하는 기본 제공 원본인 속도 원본을 사용합니다. 외부 종속성 없이 테스트하는 데 유용합니다.
numPartitions 생성된 데이터의 파티션 수를 설정합니다.
rowsPerSecond 초당 생성되는 행 수를 제어합니다.
realTime="5 minutes" 실시간 모드를 사용하도록 설정합니다. 간격은 쿼리 검사점 진행 빈도를 지정합니다. 간격이 길면 검사점의 빈도가 낮아지지만 실패 후 복구 시간이 길어질 수 있습니다.
outputMode="update" 실시간 모드에는 업데이트 출력 모드가 필요합니다.

Scala

매개 변수 설명
format("rate") 구성 가능한 속도로 행을 생성하는 기본 제공 원본인 속도 원본을 사용합니다. 외부 종속성 없이 테스트하는 데 유용합니다.
numPartitions 생성된 데이터의 파티션 수를 설정합니다.
rowsPerSecond 초당 생성되는 행 수를 제어합니다.
Trigger.RealTime() 기본 검사점 간격을 사용하여 실시간 모드를 사용하도록 설정합니다. 예를 들어 Trigger.RealTime("5 minutes")간격을 지정할 수도 있습니다.
OutputMode.Update() 실시간 모드에는 업데이트 출력 모드가 필요합니다.

3단계: 결과 유효성 검사

쿼리를 실행하면 이 함수는 display 속도 원본이 새 행을 생성할 때 실시간으로 업데이트되는 테이블을 만듭니다. 각 행에는 다음이 포함됩니다.

  • 속도 원본에서 행을 생성한 시간에 대한 타임스탬프입니다.
  • 새 행이 추가될 때마다 증가하는 단조 증가 카운터입니다.

테이블은 최소 대기 시간으로 지속적으로 업데이트되며, 실시간 모드에서 데이터를 사용할 수 있게 되는 즉시 데이터를 처리하는 방법을 보여 줍니다. 이는 일괄 처리를 기다리지 않고 데이터를 즉시 보고 작업할 수 있는 실시간 모드의 핵심 이점입니다.

추가 리소스

이제 첫 번째 실시간 쿼리를 실행했으므로 다음 리소스를 탐색하여 Kafka, Kinesis 및 기타 지원되는 원본을 사용하여 프로덕션 스트리밍 애플리케이션을 빌드합니다.