다음을 통해 공유


CREATE STREAMING TABLE (파이프라인)

스트리밍 테이블은 스트리밍 또는 증분 데이터 처리를 지원하는 테이블입니다. 스트리밍 테이블은 파이프라인에서 지원됩니다. 스트리밍 테이블을 새로 고칠 때마다 원본 테이블에 추가된 데이터가 스트리밍 테이블에 추가됩니다. 수동으로 또는 일정에 따라 스트리밍 테이블을 새로 고칠 수 있습니다.

새로 고침을 수행하거나 예약하는 방법에 대한 자세한 내용은 파이프라인 업데이트 실행을 참조하세요.

Syntax

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ {flow_clause | AS query} ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ column_constraint ] [, ...]
    [ , table_constraint ] [...] )

   column_properties
      { NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
  { USING DELTA
    PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    LOCATION path |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    WITH { ROW FILTER clause } } [ ... ]
   } [ ... ]

flow_clause
  FLOW { { INSERT [ONCE] BY NAME query } |
  { AUTO CDC auto_cdc_flow_spec } }

매개 변수

  • REFRESH

    지정한 경우 테이블을 만들거나 기존 테이블과 해당 내용을 업데이트합니다.

  • 개인

    프라이빗 스트리밍 테이블을 만듭니다.

    • 카탈로그에 추가되지 않으며 정의 파이프라인 내에서만 액세스할 수 있습니다.
    • 카탈로그의 기존 개체와 동일한 이름을 가질 수 있습니다. 파이프라인 내에서 프라이빗 스트리밍 테이블과 카탈로그의 개체 이름이 같으면 이름에 대한 참조가 프라이빗 스트리밍 테이블로 확인됩니다.
    • 프라이빗 스트리밍 테이블은 단일 업데이트가 아니라 파이프라인의 수명 동안만 유지됩니다.

    프라이빗 스트리밍 테이블은 이전에 매개 변수를 사용하여 TEMPORARY 만들어졌습니다.

  • table_name

    새로 만든 테이블의 이름입니다. 정규화된 테이블 이름은 고유해야 합니다.

  • 테이블_규격

    이 선택적 절은 열 목록, 해당 형식, 속성, 설명 및 열 제약 조건을 정의합니다.

  • 테이블 제약조건

    스키마를 지정할 때 기본 키와 외장 키를 정의할 수 있습니다. 이러한 제약 조건은 정보 제공용일 뿐이며 적용되지 않습니다. SQL 언어 참조의 CONSTRAINT 절 참조하세요.

    비고

    테이블 제약 조건을 정의하려면 파이프라인이 Unity 카탈로그 사용 파이프라인이어야 합니다.

  • 테이블_조항

    필요에 따라 테이블에 대한 분할, 주석 및 사용자 정의 속성을 지정합니다. 각 하위 절은 한 번만 지정할 수 있습니다.

    • DELTA 사용

      데이터 형식을 지정합니다. 유일한 옵션은 DELTA입니다.

      이 절은 선택 사항이며 기본적으로 DELTA로 설정됩니다.

    • 기준으로 분할됨

      테이블의 분할에 사용할 하나 이상의 열의 선택적 목록입니다. CLUSTER BY와 상호 배타적입니다.

      Liquid 클러스터링에서는 클러스터링을 위한 유연하고 최적화된 솔루션을 제공합니다. CLUSTER BY 대신 PARTITIONED BY 사용하는 것을 고려하세요.

    • CLUSTER BY

      테이블에서 액체 클러스터링을 사용하도록 설정하고 클러스터링 키로 사용할 열을 정의합니다. 자동 액체 클러스터링을 CLUSTER BY AUTO사용하고 Databricks는 클러스터링 키를 지능적으로 선택하여 쿼리 성능을 최적화합니다. PARTITIONED BY와 상호 배타적입니다.

      테이블에 대한 액체 클러스터링 사용을 참조하세요.

    • 위치

      테이블 데이터의 선택적 스토리지 위치입니다. 설정하지 않으면 시스템은 기본적으로 파이프라인 스토리지 위치로 설정됩니다.

    • 주석

      선택적인 STRING 리터럴로 테이블을 설명합니다.

    • TBLPROPERTIES

      테이블을 위한 테이블 속성의 선택적 목록입니다.

    • 와 함께 ROW FILTER

    테이블에 행 필터 함수를 추가합니다. 해당 테이블에 대한 이후 쿼리는 함수가 TRUE로 평가되는 행의 하위 집합을 받습니다. 이는 함수가 호출하는 사용자의 ID 및 그룹 멤버 자격을 검사하여 특정 행을 필터링할지 여부를 결정할 수 있으므로 세분화된 액세스 제어에 유용합니다.

    ROW FILTER 조항참조하세요.

    • 흐름

      필요에 따라 테이블을 만든 흐름 인라인을 정의합니다. 흐름은 테이블의 내용을 새로 고치는 상태 저장 쿼리입니다. 지정하지 않은 경우 FLOW 대신 사용 AS query 하거나 .와 함께 흐름을 별도로 CREATE FLOW정의할 수 있습니다. 다음 흐름 유형 중 하나를 지정할 수 있습니다.

      • 이름별 INSERT

        열 이름을 사용하여 테이블에 데이터를 삽입합니다. ONCE 옵션이 제공되지 않으면 쿼리는 스트리밍 쿼리여야 합니다. 키워드 STREAM를 사용하여 스트리밍 의미 체계로 원본에서 읽기 작업을 수행합니다. 기존 레코드에 대한 변경이나 삭제가 발생하면 읽기 작업 중 오류가 발생합니다. 가장 안전한 방법은 정적 또는 추가 전용 소스에서 읽는 것입니다.

        비고

        FLOW INSERT BY NAME 는 .를 사용하는 AS query것과 같습니다. 다음 두 문은 동일한 동작을 가합니다.

        CREATE OR REFRESH STREAMING TABLE raw_data
        AS SELECT * FROM STREAM read_files('abfss://my_path');
        
        CREATE OR REFRESH STREAMING TABLE raw_data
        FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');
        
      • 한번

        필요에 따라 흐름을 백필과 같은 일회성 흐름으로 정의합니다. ONCE 제공된 경우 쿼리는 스트리밍 쿼리가 아니며 흐름은 기본적으로 한 번 실행됩니다. 전체 새로 고침을 사용하여 테이블을 새로 고치면 흐름이 ONCE 다시 실행되어 데이터를 다시 만듭니다. ONCE 흐름에 INSERT BY NAME 만 적용됩니다.

      • AUTO CDC

        중요합니다

        Databricks Runtime 17.3 이상 및 파이프라인 채널에서 PREVIEW 사용할 수 있습니다.

        AUTO CDC CDC(변경 데이터 캡처) 레코드를 원본에서 테이블로 처리하는 흐름을 정의합니다. 원본 데이터에 CDC 의미 체계가 포함된 경우 사용합니다 AUTO CDC . AUTO CDC API: 파이프라인을 사용하여 변경 데이터 캡처 간소화를 참조하세요.

  • AS 쿼리

    이 절은 query데이터를 사용하여 테이블을 채웁니다. 이 쿼리는 스트리밍 쿼리여야 합니다. 소스에서 읽기 위해 스트리밍 의미론을 사용하려면 STREAM 키워드를 사용하세요. 기존 레코드에 대한 변경이나 삭제가 발생하면 읽기 작업 중 오류가 발생합니다. 가장 안전한 방법은 정적 또는 추가 전용 소스에서 읽는 것입니다. 변경 커밋이 있는 데이터를 수집하려면 오류를 처리하는 읽기 옵션을 추가할 SkipChangeCommits 수 있습니다.

    query table_specification 함께 지정하면 table_specification 지정된 테이블 스키마에 query반환된 모든 열이 포함되어야 합니다. 그렇지 않으면 오류가 발생합니다. table_specification 지정되었지만 query 반환되지 않은 열은 쿼리할 때 null 값을 반환합니다.

    스트리밍 데이터에 대한 자세한 내용은 파이프라인 사용하여 데이터 변환참조하세요.

    • 읽기 옵션

      쿼리에서 읽기 옵션을 지정하여 원본에서 데이터를 읽는 방법을 구성할 수 있습니다. 예를 들어 원본 데이터의 변경 커밋을 건너뛰도록 지정할 skipChangeCommits 수 있습니다. 읽기 옵션은 쿼리 절에 WITH 지도로 지정됩니다. 다음은 그 예입니다.

      SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS=TRUE, STARTINGVERSION=X)
      

      선택 =TRUE 사항이므로 다음과 같이 부울 옵션을 지정할 수도 있습니다.

      SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS)
      

      비고

      읽기 옵션은 Databricks Runtime 17.3 이상에서만 지원됩니다.

      각 옵션에 대한 자세한 내용은 Delta 테이블 스트리밍 읽기 및 쓰기를 참조하여 델타에 대해 아래 읽기 옵션이 지원됩니다.

      • maxFilesPerTrigger
      • maxBytesPerTrigger
      • startingVersion
      • startingTimestamp
      • readChangeFeed
      • withEventTimeOrder
      • skipChangeCommits

필요한 권한

파이프라인에 대한 실행 사용자에게는 다음 권한이 있어야 합니다.

  • SELECT 스트리밍 테이블에서 참조하는 기본 테이블에 대한 권한입니다.
  • 부모 카탈로그에는 USE CATALOG 권한이, 부모 스키마에는 USE SCHEMA 권한이 필요합니다.
  • CREATE MATERIALIZED VIEW 스트리밍 테이블의 스키마에 대한 권한입니다.

사용자가 스트리밍 테이블이 정의된 파이프라인을 업데이트하려면 다음이 필요합니다.

  • 부모 카탈로그에는 USE CATALOG 권한이, 부모 스키마에는 USE SCHEMA 권한이 필요합니다.
  • 스트리밍 테이블의 소유권 또는 REFRESH 스트리밍 테이블에 대한 권한입니다.
  • 스트리밍 테이블의 소유자는 스트리밍 테이블에서 참조하는 기본 테이블에 대한 권한이 있어야 합니다 SELECT .

사용자가 결과 스트리밍 테이블을 쿼리할 수 있도록 하려면 다음이 필요합니다.

  • 부모 카탈로그에는 USE CATALOG 권한이, 부모 스키마에는 USE SCHEMA 권한이 필요합니다.
  • SELECT 스트리밍 테이블에 대한 권한입니다.

제한점

  • 테이블 소유자만 스트리밍 테이블을 새로 고쳐 최신 데이터를 가져올 수 있습니다.
  • ALTER TABLE 명령은 스트리밍 테이블에서 허용되지 않습니다. 테이블의 정의 및 속성은 CREATE OR REFRESH 또는 ALTER STREAMING TABLE 문을 통해 변경해야 합니다.
  • INSERT INTOMERGE 같은 DML 명령을 통해 테이블 스키마를 발전시키는 것은 지원되지 않습니다.
  • 스트리밍 테이블에서는 다음 명령이 지원되지 않습니다.
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • 테이블 이름을 바꾸거나 소유자를 변경하는 것은 지원되지 않습니다.
  • 생성된 열, ID 열 및 기본 열은 지원되지 않습니다.

예시

-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;

-- Define a streaming table with an inline append flow:
CREATE OR REFRESH STREAMING TABLE raw_data
FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');

-- Define a streaming table with an inline AUTO CDC flow:
CREATE OR REFRESH STREAMING TABLE target
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;