Post

[Spark] Streaming

[Spark] Streaming

๐Ÿ“Œ Spark Streaming

Spark Streaming ์€ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ Spark Core API์˜ ํ™•์žฅ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์ด๋‹ค. ์ด๋ฒคํŠธ ๋‹จ์œ„ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ๊ฐ€ ์•„๋‹ˆ๋ผ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ ๋ฐฉ์‹์œผ๋กœ ๋™์ž‘ํ•œ๋‹ค.

์ด๋ฒคํŠธ ๋‹จ์œ„ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์‹œ์Šคํ…œ์— ๋„์ฐฉํ•˜๋Š” ์ฆ‰์‹œ ์ด๋ฒคํŠธ๋ฅผ ๋ฐ”๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. ๋ฐ˜๋ฉด ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์งง์€ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ ๋™์•ˆ ์ˆ˜์ง‘ํ•˜๊ณ , ํ•ด๋‹น ๋ฐฐ์น˜๋ฅผ ํ•˜๋‚˜์˜ ๋‹จ์œ„๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค.

Spark Streaming์˜ ๊ฐ€์žฅ ํ•ต์‹ฌ ๊ฐœ๋…์€ Discretized Stream, DStream ์ด๋‹ค. DStream์€ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ๋‚˜ํƒ€๋‚ด๋Š” ๊ณ ์ˆ˜์ค€ ์ถ”์ƒํ™”๋กœ, ๋‚ด๋ถ€์ ์œผ๋กœ ์—ฐ์†์ ์ธ RDD์˜ ์‹œํ€€์Šค๋กœ ๊ตฌ์„ฑ๋œ๋‹ค.

Spark Streaming์€ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ์งง์€ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์œผ๋กœ ์ž๋ฅด๊ณ , ๊ฐ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ ๋™์•ˆ ์ˆ˜์ง‘๋œ ๋ฐ์ดํ„ฐ๋Š” ํ•˜๋‚˜์˜ RDD๋กœ ๋ณ€ํ™˜๋œ๋‹ค. ์ด๋Ÿฌํ•œ RDD๋“ค์ด ์—ฐ์†์ ์œผ๋กœ ์ด์–ด์ ธ ์žˆ๋Š” ๊ฒƒ์ด DStream์ด๋‹ค. ์ด๋Š” ๊ณง DStream์— RDD ์—ฐ์‚ฐ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•œ๋‹ค.

๋™์ž‘ ๊ณผ์ •

  1. Kafka, HDFS, S3 ๋“ฑ๊ณผ ๊ฐ™์€ ๋ฐ์ดํ„ฐ ์†Œ์Šค๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ˆ˜์‹ ํ•œ๋‹ค.
  2. ์ˆ˜์‹ ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์ผ์ •ํ•œ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์œผ๋กœ ๋ฌถ์— RDD๋ฅผ ์ƒ์„ฑํ•˜๊ณ  RDD๋“ค์˜ ์—ฐ์†์ธ DStream์„ ๋งŒ๋“ ๋‹ค.
  3. DStream์˜ ๊ฐ RDD์— ๋Œ€ํ•ด transformation ์—ฐ์‚ฐ์„ ๋ณ‘๋ ฌ๋กœ ์ง„ํ–‰ํ•œ๋‹ค.
  4. ์ฒ˜๋ฆฌ๋œ ๊ฒฐ๊ณผ๋ฅผ ๋‚ด๋ณด๋‚ธ๋‹ค.

๐Ÿ“Œ Structured Streaming

Structured Streaming ์€ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ โ€˜๋ฌดํ•œ ํ…Œ์ด๋ธ”โ€™๋กœ ๊ฐ„์ฃผํ•œ๋‹ค. ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๊ฐ€ ๋„์ฐฉํ•  ๋•Œ๋งˆ๋‹ค ์ด ๋ฐ์ดํ„ฐ๋Š” ๋งˆ์น˜ ๋ฌดํ•œ ํ…Œ์ด๋ธ”์— ์ƒˆ๋กœ์šด ํ–‰์œผ๋กœ ์ถ”๊ฐ€๋˜๋Š” ๊ฒƒ์ฒ˜๋Ÿผ ์ทจ๊ธ‰๋œ๋‹ค. ์ด ๊ฐœ๋…์€ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ์— DataFrame ๋ฐ SQL ์—ฐ์‚ฐ์„ ๊ทธ๋Œ€๋กœ ์ ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ์˜๋ฏธ์ด๋‹ค.

๋™์ž‘ ๊ณผ์ •

  1. ์ •ํ•ด์ง„ ํŠธ๋ฆฌ๊ฑฐ ๊ฐ„๊ฒฉ๋งˆ๋‹ค ์ŠคํŒŒํฌ๋Š” ๋ฐ์ดํ„ฐ ์†Œ์Šค์—์„œ ๋งˆ์ง€๋ง‰์œผ๋กœ ํ™•์ธํ•œ ์‹œ์  ์ดํ›„ ๋„์ฐฉํ•œ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๊ฐ€ ์žˆ๋Š”์ง€ ํ™•์ธํ•œ๋‹ค.
  2. ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๋Š” ํ•˜๋‚˜์˜ ๋ฐฐ์น˜๋กœ ๊ฐ„์ฃผ๋˜๋ฉฐ ์‚ฌ์šฉ์ž๊ฐ€ ์ •์˜ํ•œ ์ฟผ๋ฆฌ๊ฐ€ ์ด ๋ฐฐ์น˜์— ์ ์šฉ๋œ๋‹ค.
  3. ์ฟผ๋ฆฌ๊ฐ€ ์‹คํ–‰๋˜๋ฉด ์ค‘๊ฐ„ ์—ฐ์‚ฐ ๊ฒฐ๊ณผ๊ฐ€ ๋‹ด๊ธฐ๊ฒŒ ๋˜๋ฉฐ, ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”์ด ์—…๋ฐ์ดํŠธ๋œ๋‹ค.
  4. ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”์—์„œ ๋ณ€๊ฒฝ๋œ ๋ถ€๋ถ„์„ ๋‚ด๋ณด๋‚ธ๋‹ค.

Event Time ๊ธฐ๋ฐ˜ ์ฒ˜๋ฆฌ

Structured Streaming์€ ๋ฐ์ดํ„ฐ๊ฐ€ ์‹œ์Šคํ…œ์— ๋„์ฐฉํ•œ ์‹œ๊ฐ„์ด ์•„๋‹Œ ์ด๋ฒคํŠธ๊ฐ€ ์‹ค์ œ๋กœ ๋ฐœ์ƒํ•œ ์‹œ๊ฐ„์„ ๊ธฐ์ค€์œผ๋กœ ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

image.png

โ€˜์œˆ๋„์šฐ ์—ฐ์‚ฐโ€™์€ ํŠน์ • ์‹œ๊ฐ„ ๋‹จ์œ„๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๊ทธ๋ฃนํ™”ํ•˜์—ฌ ์ง‘๊ณ„ํ•˜๋Š” ๊ธฐ๋Šฅ์œผ๋กœ, ์ฃผ๋กœ sliding window ๊ธฐ๋ฒ•์„ ์‚ฌ์šฉํ•œ๋‹ค. ์‹œ๊ณ„์—ด ๋ฐ์ดํ„ฐ๋ฅผ ๋ถ„์„ํ•  ๋•Œ ํŠนํžˆ ์œ ์šฉํ•˜๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด 5๋ถ„ ํฌ๊ธฐ์˜ ์œˆ๋„์šฐ๋ฅผ 1๋ถ„ ๊ฐ„๊ฒฉ์œผ๋กœ ์Šฌ๋ผ์ด๋”ฉํ•˜๋Š” ์ƒํ™ฉ์€ ์œ„ ๊ทธ๋ฆผ๊ณผ ๊ฐ™๋‹ค.

๋„คํŠธ์›Œํฌ ์ง€์—ฐ ๋“ฑ์œผ๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœ์ƒ๋œ ์ˆœ์„œ๋Œ€๋กœ ๋„์ฐฉํ•˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ๋Š”๋ฐ, Structured Streaming์€ Watermarking ์ด๋ผ๋Š” ๋ฐฉ์‹์œผ๋กœ ์ด ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•œ๋‹ค. Watermark ๋Š” ์œˆ๋„์šฐ ๊ณ„์‚ฐ์„ ์œ„ํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ๊ธฐ๋‹ค๋ฆด ์ตœ๋Œ€ ์‹œ๊ฐ„์ด๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ์›Œํ„ฐ๋งˆํฌ๋ฅผ 10๋ถ„์œผ๋กœ ์„ค์ •ํ•˜๋ฉด, ํ•ด๋‹น ์œˆ๋„์šฐ ๊ณ„์‚ฐ์— 10๋ถ„๊นŒ์ง€ ๋Šฆ๊ฒŒ ๋„์ฐฉํ•˜๋Š” ๋ฐ์ดํ„ฐ๋Š” ๊ฒฐ๊ณผ์— ํฌํ•จํ•œ๋‹ค.

๊ด€๋ จ ๋ฉ”์„œ๋“œ

1
2
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 10)

StreamingContext ๋Š” ์ŠคํŒŒํฌ ์ŠคํŠธ๋ฆฌ๋ฐ์˜ entrypoint๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.

๋‘ ๋ฒˆ์งธ ์ธ์ž๋Š” ๋ฐฐ์น˜ ๊ฐ„๊ฒฉ์„ ์˜๋ฏธํ•˜๋Š”๋ฐ, 10์œผ๋กœ ์„ค์ •ํ•˜๋ฉด โ€˜๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ 10์ดˆ ๊ฐ„๊ฒฉ์œผ๋กœ ์ž๋ฅด๊ฒ ๋‹คโ€™๋Š” ์˜๋ฏธ์ด๋‹ค.

1
lines = ssc.socketTextStream("127.0.0.1", 9999)

socketTextStream ์€ ์ง€์ •๋œ TCP ์†Œ์ผ“์œผ๋กœ๋ถ€ํ„ฐ ํ…์ŠคํŠธ ๋ฐ์ดํ„ฐ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ˆ˜์‹ ํ•˜์—ฌ DStream์„ ์ƒ์„ฑํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

์ฒซ ๋ฒˆ์งธ ์ธ์ž๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋‚ผ ์„œ๋ฒ„์˜ IP ์ฃผ์†Œ, ๋‘ ๋ฒˆ์งธ ์ธ์ž๋Š” ์„œ๋ฒ„์˜ ํฌํŠธ ๋ฒˆํ˜ธ์ด๋‹ค.

socketTextStream ์ด ํ˜ธ์ถœ๋˜๊ณ  ์ŠคํŠธ๋ฆฌ๋ฐ ์ปจํ…์ŠคํŠธ๊ฐ€ ์‹œ์ž‘(ssc.start())๋˜๋ฉด, ์ŠคํŒŒํฌ๋Š” ํด๋Ÿฌ์Šคํ„ฐ์˜ executor ์ค‘ ํ•˜๋‚˜์— Receiver ๋ผ๋Š” ํƒœ์Šคํฌ๋ฅผ ์‹คํ–‰์‹œํ‚จ๋‹ค. ์ด ๋ฆฌ์‹œ๋ฒ„๋Š” ์ง€์ •๋œ ํ˜ธ์ŠคํŠธ์™€ TCP ์—ฐ๊ฒฐ์„ ๋งบ๊ณ , ์ˆ˜์‹  ๋Œ€๊ธฐํ•œ๋‹ค. ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด์˜ค๋ฉด ๋ฆฌ์‹œ๋ฒ„๋Š” ์ด๋ฅผ ์ˆ˜์ง‘ํ•˜์—ฌ ๋ฉ”๋ชจ๋ฆฌ์— Block ํ˜•ํƒœ๋กœ ์ €์žฅํ•œ๋‹ค. ์„ค์ •๋œ ๋ฐฐ์น˜ ๊ฐ„๊ฒฉ๋งˆ๋‹ค ์ˆ˜์ง‘๋œ ๋ฐ์ดํ„ฐ ๋ธ”๋ก๋“ค์€ ํ•˜๋‚˜์˜ RDD๋กœ ๋ณ€ํ™˜๋˜๊ณ , RDD๋“ค์˜ ์—ฐ์†์ ์ธ ์ŠคํŠธ๋ฆผ์ด DStream์ด ๋œ๋‹ค.

1
ssc.awaitTermination()

awaitTermination ์€ ์ŠคํŒŒํฌ ์ŠคํŠธ๋ฆฌ๋ฐ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์˜ ๋“œ๋ผ์ด๋ฒ„๊ฐ€ ๋ฐ”๋กœ ์ข…๋ฃŒ๋˜๋Š” ๊ฒƒ์„ ๋ง‰๊ณ  ์ŠคํŠธ๋ฆฌ๋ฐ ์ž‘์—…์ด ๊ณ„์† ์ง„ํ–‰๋˜๋„๋ก ๋Œ€๊ธฐ์‹œํ‚ค๋Š” ์—ญํ• ์„ ํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. ์ŠคํŠธ๋ฆฌ๋ฐ์€ ๋ช…์‹œ์ ์œผ๋กœ ์ค‘๋‹จ๋˜๊ธฐ ์ „๊นŒ์ง€๋Š” ์ง€์†์ ์œผ๋กœ ์‹คํ–‰๋˜์–ด์•ผ ํ•˜๋Š”๋ฐ, ์ผ๋ฐ˜์ ์ธ ํ”„๋กœ๊ทธ๋žจ์€ ๋งˆ์ง€๋ง‰ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•˜๋ฉด ์ž๋™์œผ๋กœ ์ข…๋ฃŒ๋œ๋‹ค. ์ด๋Ÿฌํ•œ ๋ฌธ์ œ๋ฅผ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด awaitTermination ์€ ํ˜„์žฌ ์“ฐ๋ ˆ๋“œ๋ฅผ ์ฐจ๋‹จํ•˜์—ฌ ์‚ฌ์šฉ์ž๊ฐ€ ์ง์ ‘ ์ค‘๋‹จํ•˜๊ฑฐ๋‚˜, ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š๋Š” ์ด์ƒ ๊ณ„์† ๋Œ€๊ธฐํ•œ๋‹ค.

timeout ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์„ค์ •ํ•˜์—ฌ ๋Œ€๊ธฐํ•  ์ตœ๋Œ€ ์‹œ๊ฐ„์„ ๋ช…์‹œํ•  ์ˆ˜ ์—†๋‹ค. ๋งŒ์•ฝ ์—†๋‹ค๋ฉด ๋ฌดํ•œ์ • ๋Œ€๊ธฐํ•œ๋‹ค.

1
2
3
4
5
6
7
8
9
10
11
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

readStream ์€ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๊ธฐ ์œ„ํ•œ ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๋ฆฌํ„ดํ•œ๋‹ค.

format ์€ ๋ฐ์ดํ„ฐ ์†Œ์Šค์˜ ํ˜•์‹์„ ์ง€์ •ํ•œ๋‹ค. socket ์€ ์ง€์ •๋œ TCP ์†Œ์ผ“์„ ํ†ตํ•ด ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์œผ๋ฉฐ, rate ๋Š” ์‹ค์ œ ์™ธ๋ถ€ ๋ฐ์ดํ„ฐ ์†Œ์Šค ์—†์ด ํด๋Ÿฌ์ŠคํŠธ์˜ ์ŠคํŠธ๋ฆฌ๋ฐ ์„ฑ๋Šฅ์„ ๋ฒค์น˜๋งˆํ‚นํ•  ๋•Œ ์„ค์ •ํ•œ๋‹ค. file ์€ ํŒŒ์ผ ๋‹จ์œ„๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ์ฃผ๊ธฐ์ ์œผ๋กœ ์ƒ์„ฑ๋  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค. kafka ๋Š” ์ด๋ฆ„ ๊ทธ๋Œ€๋กœ ์นดํ”„์นด๋กœ๋ถ€ํ„ฐ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ ํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค.

์ดํ›„ option ์„ ํ†ตํ•ด ๋ฐ์ดํ„ฐ ์†Œ์Šค ์„œ๋ฒ„์˜ IP ์ฃผ์†Œ์™€ ํ˜ธํŠธ ๋ฒˆํ˜ธ๋ฅผ ๋ช…์‹œํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, load ๋ฅผ ํ†ตํ•ด ์ •์˜๋œ ์„ค์ •์„ ๋ฐ”ํƒ•์œผ๋กœ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ƒ์„ฑํ•œ๋‹ค.

1
2
3
4
5
6
7
query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

query.awaitTermination()

writeStream ์€ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์“ฐ๊ธฐ ์œ„ํ•œ ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๋ฆฌํ„ดํ•œ๋‹ค.

outputMode ๋Š” ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”์ด ์—…๋ฐ์ดํŠธ๋  ๋•Œ๋งˆ๋‹ค ์–ด๋–ค ๋ฐ์ดํ„ฐ๋ฅผ ์™ธ๋ถ€๋กœ ๋‚ด๋ณด๋‚ผ์ง€๋ฅผ ๊ฒฐ์ •ํ•œ๋‹ค. complete ๋กœ ์„ค์ •๋˜๋ฉด ๋งค ํŠธ๋ฆฌ๊ฑฐ๋งˆ๋‹ค ์ „์ฒด ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”์˜ ๋ชจ๋“  ๋‚ด์šฉ์„ ๋‚ด๋ณด๋‚ด๋ฉฐ, append ๋กœ ์„ค์ •๋˜๋ฉด ์ƒˆ๋กญ๊ฒŒ ์ถ”๊ฐ€๋œ ํ–‰๋งŒ, update ๋กœ ์„ค์ •๋˜๋ฉด ๋‚ด์šฉ์ด ๋ณ€๊ฒฝ๋œ ํ–‰๋งŒ ๋‚ด๋ณด๋‚ธ๋‹ค.

start ๋Š” ์ •์˜๋œ ๋ชจ๋“  ์„ค์ •์„ ๋ฐ”ํƒ•์œผ๋กœ ์‹ค์ œ ์ฟผ๋ฆฌ ์‹คํ–‰์„ ์‹œ์ž‘ํ•˜๋Š” action ์—ฐ์‚ฐ์ด๋‹ค.

Processing Model

Structured Streaming์˜ ์ „์ฒด์ ์ธ ๋™์ž‘ ๊ณผ์ •์— ๋Œ€ํ•ด ์•Œ์•„๋ณด์ž.

  1. ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ๋˜๋Š” SQL API๋ฅผ ํ†ตํ•ด ์ŠคํŠธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ๋ฅผ ์ž‘์„ฑํ•œ๋‹ค.
  2. ์ž‘์„ฑ๋œ ์ฟผ๋ฆฌ๋Š” catalyst optimizer๋ฅผ ํฌํ•จํ•˜๊ณ  ์žˆ๋Š” ์ŠคํŒŒํฌ SQL ์—”์ง„์œผ๋กœ ์ „๋‹ฌ๋œ๋‹ค.
  3. ์ŠคํŒŒํฌ SQL ์—”์ง„์€ ์ด๋ฅผ ์ปดํŒŒ์ผํ•˜๊ณ  ๋…ผ๋ฆฌ์  ๋ฐ ๋ฌผ๋ฆฌ์  ๊ณ„ํš์„ ์ˆ˜๋ฆฝํ•œ๋‹ค.
  4. start ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ์‹ค์ œ๋กœ ์ŠคํŠธ๋ฆฌ๋ฐ ์ฒ˜๋ฆฌ๊ฐ€ ์ด๋ฃจ์–ด์ง„๋‹ค. ๋“œ๋ผ์ด๋ฒ„ ํ”„๋กœ์„ธ์Šค ๋‚ด ๋ณ„๋„์˜ ์“ฐ๋ ˆ๋“œ๊ฐ€ ์ƒ์„ฑ๋œ๋‹ค.
  5. ์ƒ์„ฑ๋œ ๋ณ„๋„์˜ ์“ฐ๋ ˆ๋“œ๋Š” ์ •ํ•ด์ง„ ํŠธ๋ฆฌ๊ฑฐ์— ๋”ฐ๋ผ ๋ฐ˜๋ณต์ ์œผ๋กœ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. ์ด ๊ณผ์ •์€ ์ด์ „ Streaming์˜ ๋™์ž‘ ๊ณผ์ •๊ณผ ๋™์ผํ•˜๋‹ค(๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๊ณ , ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๊ฐ€ ์žˆ๋Š”์ง€ ํ™•์ธํ•˜๋Š” ๋™์ž‘ ๋“ฑ).

ํŠธ๋ฆฌ๊ฑฐ ์„ค์ •

Trigger ๋Š” ์–ธ์ œ, ์–ผ๋งˆ๋‚˜ ์ž์ฃผ ์ŠคํŠธ๋ฆฌ๋ฐ ์†Œ์Šค์˜ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜์—ฌ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜๋ฅผ ์‹คํ–‰ํ• ์ง€ ๊ฒฐ์ •ํ•˜๋Š” ์„ค์ •์ด๋‹ค. ํŠธ๋ฆฌ๊ฑฐ ์„ค์ • ์ข…๋ฅ˜๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

  • Unspecified: ํ•˜๋‚˜์˜ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜๊ฐ€ ์™„๋ฃŒ๋˜๋ฉด ์ฆ‰์‹œ ๋‹ค์Œ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜๋ฅผ ์‹œ์ž‘ํ•œ๋‹ค. ์ง€์—ฐ ์‹œ๊ฐ„์„ ๊ฐ€๋Šฅํ•œ ๋‚ฎ๊ฒŒ ์œ ์ง€ํ•˜๋ฉด์„œ ํด๋Ÿฌ์Šคํ„ฐ์˜ ๋ชจ๋“  ๊ฐ€์šฉ ๋ฆฌ์†Œ์Šค๋ฅผ ํ™œ์šฉํ•˜์—ฌ ์ตœ๋Œ€ ์ฒ˜๋ฆฌ๋Ÿ‰์„ ๋‚ด๊ณ  ์‹ถ์„ ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค.
  • Fixed Interval: ๊ณ ์ •๋œ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ๋งˆ๋‹ค ์ƒˆ๋กœ์šด ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜๋ฅผ ์‹คํ–‰ํ•˜๋„๋ก ์„ค์ •ํ•œ๋‹ค. ์ด์ „ ๋ฐฐ์น˜์˜ ์ฒ˜๋ฆฌ ์‹œ๊ฐ„์ด ์„ค์ •๋œ ๊ฐ„๊ฒฉ๋ณด๋‹ค ์งง์œผ๋ฉด ๋‹ค์Œ ๊ฐ„๊ฒฉ์ด ๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐํ•˜๋ฉฐ, ์ด์ „ ๋ฐฐ์น˜์˜ ์ฒ˜๋ฆฌ ์‹œ๊ฐ„์ด ๊ฐ„๊ฒฉ๋ณด๋‹ค ๊ธธ์–ด์ง€๋ฉด ์ฆ‰์‹œ ๋‹ค์Œ ๋ฐฐ์น˜๋ฅผ ์‹œ์ž‘ํ•œ๋‹ค.
  • One-Time: ์ฟผ๋ฆฌ๊ฐ€ ์‹œ์ž‘๋  ๋•Œ ๊ทธ ์‹œ์ ์— ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฅผ ๋‹จ ํ•˜๋‚˜์˜ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜๋กœ ์ฒ˜๋ฆฌํ•œ ํ›„ ์ฟผ๋ฆฌ๋ฅผ ์ž๋™์œผ๋กœ ์ค‘๋‹จํ•œ๋‹ค. ๋ฐ์ดํ„ฐ๊ฐ€ ๋„ˆ๋ฌด ๋งŽ์œผ๋ฉด ๋ฉ”๋ชจ๋ฆฌ ๊ด€๋ จ ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋‹ค.
  • Available Now: One-Time ๊ณผ ์ „์ฒด์ ์œผ๋กœ ์œ ์‚ฌํ•˜๋‚˜, ๋ฐ์ดํ„ฐ๋ฅผ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜๋กœ ๋‚˜๋ˆ„์–ด ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.
  • Continuous Processing: ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜ ๊ฐœ๋…์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹Œ ์ด๋ฒคํŠธ ๋‹จ์œ„ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ์™€ ์œ ์‚ฌํ•˜๊ฒŒ ๋ฐ์ดํ„ฐ๊ฐ€ ๋„์ฐฉํ•˜๋Š” ์ฆ‰์‹œ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. ์ง€์—ฐ ์‹œ๊ฐ„์„ ๊ทนํ•œ๊นŒ์ง€ ๋‚ฎ์ถœ ์ˆ˜ ์žˆ๋‹ค. ์ตœ์†Œ ํ•œ ๋ฒˆ์˜ ์ฒ˜๋ฆฌ ๋ณด์žฅ์„ ์ง€์›ํ•œ๋‹ค.

1
2
3
4
5
6
7
spark = SparkSession \
    .builder \
    .appName("StructuredStreamingSum") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .config("maxFilesPerTrigger", 1) \
    .getOrCreate()

spark.streaming.stopGracefullyOnShutdown ์„ true ๋กœ ์„ค์ •ํ•˜๋ฉด ์ŠคํŒŒํฌ ์ŠคํŠธ๋ฆฌ๋ฐ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ์ข…๋ฃŒ๋  ๋•Œ ๋ชจ๋“  ์‹คํ–‰ ์ค‘์ธ ๋ฐฐ์น˜ ์—ฐ์‚ฐ์ด ๋๋‚œ ๋’ค gracefullyํ•˜๊ฒŒ ์ข…๋ฃŒ๋˜๋„๋ก ํ•œ๋‹ค.

spark.sql.streaming.schemaInference ๋ฅผ true ๋กœ ์„ค์ •ํ•˜๋ฉด ์ŠคํŠธ๋ฆฌ๋ฐ ์†Œ์Šค๋ฅผ ์ฝ์„ ๋•Œ ์ž๋™์œผ๋กœ ์Šคํ‚ค๋งˆ๋ฅผ ์ถ”๋ก ํ•œ๋‹ค.

maxFilesPerTrigger ์— ํ•˜๋‚˜์˜ ํŠธ๋ฆฌ๊ฑฐ์— ์ฒ˜๋ฆฌํ•  ์ตœ๋Œ€ ํŒŒ์ผ ๊ฐœ์ˆ˜๋ฅผ ์ง€์ •ํ•œ๋‹ค.

1
2
3
4
5
6
7
8
9
10
query = shorten_df \
            .writeStream \
            .format("json") \
            .option("path", "streaming_output") \
            .option("checkpointLocation", "checkpoint") \
            .outputMode("append") \
            .trigger(processingTime='5 seconds') \
            .start()

query.awaitTermination()

checkpointLocation ์€ ์ฒดํฌํฌ์ธํŠธ๋ฅผ ํ™œ์„ฑํ™”ํ•˜๊ณ  ๊ด€๋ จ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•  ๋””๋ ‰ํ„ฐ๋ฆฌ๋ฅผ ์ง€์ •ํ•˜๋Š” ์˜ต์…˜์ด๋‹ค. ์–ด๋””๊นŒ์ง€ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์—ˆ๋Š”์ง€์— ๋Œ€ํ•œ ์ •๋ณด์ธ ์˜คํ”„์…‹, ์–ด๋–ค ๋ฐฐ์น˜๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ฒ˜๋ฆฌ๋˜์—ˆ๋Š”์ง€์— ๋Œ€ํ•œ ์ •๋ณด์ธ ์ปค๋ฐ‹, ์ค‘๊ฐ„ ์ง‘๊ณ„ ๊ฐ’์ด๋‚˜ ์ƒํƒœ ์ •๋ณด๋ฅผ ์ €์žฅํ•œ๋‹ค.

์ฒดํฌํฌ์ธํŠธ๋ž€ Structured Streaming์ด ์ฟผ๋ฆฌ์˜ ์ง„ํ–‰ ์ƒํ™ฉ๊ณผ ์ƒํƒœ๋ฅผ ์ €์žฅ์†Œ์— ์ง€์†์ ์œผ๋กœ ์ €์žฅํ•˜๋Š” ๋ฉ”์ปค๋‹ˆ์ฆ˜์ด๋‹ค.

trigger ๋Š” ์ƒˆ๋กœ์šด ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜๋ฅผ ์–ธ์ œ, ์–ผ๋งˆ๋‚˜ ์ž์ฃผ ์‹คํ–‰ํ• ์ง€ ๊ฒฐ์ •ํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. processingTime ์˜ต์…˜์— interval์„ ์„ค์ •ํ•œ๋‹ค.

Fault Tolerance

Structured Streaming์˜ ๊ถ๊ทน์ ์ธ ๋ชฉํ‘œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

  1. ๋ชจ๋“  ์ž…๋ ฅ ๋ฐ์ดํ„ฐ๋Š” ์ตœ์†Œ ํ•œ ๋ฒˆ ์ด์ƒ ์ฒ˜๋ฆฌ๋œ๋‹ค.
  2. ์ถœ๋ ฅ ์‹œ์Šคํ…œ์—๋Š” ๋™์ผํ•œ ๊ฒฐ๊ณผ๊ฐ€ ๋‘ ๋ฒˆ ์ด์ƒ ๊ธฐ๋ก๋˜์ง€ ์•Š๋Š”๋‹ค.

์ด๋ฅผ ๋งŒ์กฑํ•˜๊ธฐ ์œ„ํ•ด ๋ฐ์ดํ„ฐ ์†Œ์Šค๋Š” ํŠน์ • ์˜คํ”„์…‹์œผ๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์‹œ ์ฝ์–ด์˜ฌ ์ˆ˜ ์žˆ์–ด์•ผ ํ•˜๋ฉฐ, ์ŠคํŒŒํฌ๋Š” checkpoint ์™€ Write-Ahead Logs, WAL ๋ฅผ ํ†ตํ•ด ๊ฐ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜์˜ ์˜คํ”„์…‹์„ ์ €์žฅ์†Œ์— ์ €์žฅํ•œ๋‹ค.

์ŠคํŒŒํฌ๊ฐ€ ํŠน์ • ๋ฐฐ์น˜๋ฅผ ์‹œ์ž‘ํ•  ๋•Œ ์ฒ˜๋ฆฌํ•  ๋ฐ์ดํ„ฐ์˜ ๋ฒ”์œ„๋ฅผ WAL์— ๊ธฐ๋กํ•œ ํ›„, ์„ฑ๊ณต์ ์œผ๋กœ ์™„๋ฃŒ๋˜๋ฉด ํ•ด๋‹น ์˜คํ”„์…‹ ๋ฒ”์œ„๋ฅผ ์ฒดํฌํฌ์ธํŠธ์— ์ปค๋ฐ‹ํ•œ๋‹ค.

๋งŒ์•ฝ ์ฒ˜๋ฆฌ ๋„์ค‘ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ์‹คํŒจํ•˜๋ฉด ์žฌ์‹œ์ž‘๋œ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์€ ์ฒดํฌํฌ์ธํŠธ๋ฅผ ํ™•์ธํ•˜๊ณ , ์„ฑ๊ณต์ ์œผ๋กœ ์™„๋ฃŒ๋œ ๋ฐฐ์น˜์˜ ์˜คํ”„์…‹์„ ํ™•์ธํ•˜๊ณ , ํ•ด๋‹น ์˜คํ”„์…‹ ๋‹ค์Œ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์‹œ ์š”์ฒญํ•œ๋‹ค.

๋™์ผํ•œ ์ž…๋ ฅ์— ๋Œ€ํ•ด ์žฌ์ฒ˜๋ฆฌ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๋•Œ, ์ด์ „๊ณผ ๋™์ผํ•œ ๊ฒฐ๊ณผ๊ฐ€ ๋‚˜์™€์•ผ ํ•˜๋ฉฐ ๋™์ผํ•œ ์—ฐ์‚ฐ์„ ์—ฌ๋Ÿฌ ๋ฒˆ ์ˆ˜ํ–‰ํ•˜๋”๋ผ๋„ ๊ทธ ๊ฒฐ๊ณผ๊ฐ€ ๋‹จ ํ•œ ๋ฒˆ ์ˆ˜ํ–‰ํ–ˆ์„ ๋•Œ์™€ ๋™์ผํ•˜๊ฒŒ ์œ ์ง€๋˜์–ด์•ผ ํ•œ๋‹ค.

๐Ÿ“Œ ์ฐธ๊ณ 

https://docs.newrelic.com/kr/docs/nrql/using-nrql/create-smoother-charts-sliding-windows/

This post is licensed under CC BY 4.0 by the author.