[Spark] Streaming
๐ Spark Streaming
Spark Streaming
์ ์ค์๊ฐ์ผ๋ก ๋ค์ด์ค๋ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๊ธฐ ์ํ Spark Core
API์ ํ์ฅ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ด๋ค. ์ด๋ฒคํธ ๋จ์ ์คํธ๋ฆผ ์ฒ๋ฆฌ๊ฐ ์๋๋ผ ๋ง์ดํฌ๋ก ๋ฐฐ์น ์ฒ๋ฆฌ ๋ฐฉ์์ผ๋ก ๋์ํ๋ค.
์ด๋ฒคํธ ๋จ์ ์คํธ๋ฆผ ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ๊ฐ ์์คํ ์ ๋์ฐฉํ๋ ์ฆ์ ์ด๋ฒคํธ๋ฅผ ๋ฐ๋ก ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ์ด๋ค. ๋ฐ๋ฉด ๋ง์ดํฌ๋ก ๋ฐฐ์น ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ๋ฅผ ์งง์ ์๊ฐ ๊ฐ๊ฒฉ ๋์ ์์งํ๊ณ , ํด๋น ๋ฐฐ์น๋ฅผ ํ๋์ ๋จ์๋ก ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ์ด๋ค.
Spark Streaming์ ๊ฐ์ฅ ํต์ฌ ๊ฐ๋
์ Discretized Stream, DStream
์ด๋ค. DStream์ ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ๋ํ๋ด๋ ๊ณ ์์ค ์ถ์ํ๋ก, ๋ด๋ถ์ ์ผ๋ก ์ฐ์์ ์ธ RDD์ ์ํ์ค๋ก ๊ตฌ์ฑ๋๋ค.
Spark Streaming์ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์งง์ ์๊ฐ ๊ฐ๊ฒฉ์ผ๋ก ์๋ฅด๊ณ , ๊ฐ ์๊ฐ ๊ฐ๊ฒฉ ๋์ ์์ง๋ ๋ฐ์ดํฐ๋ ํ๋์ RDD๋ก ๋ณํ๋๋ค. ์ด๋ฌํ RDD๋ค์ด ์ฐ์์ ์ผ๋ก ์ด์ด์ ธ ์๋ ๊ฒ์ด DStream์ด๋ค. ์ด๋ ๊ณง DStream์ RDD ์ฐ์ฐ์ ์ฌ์ฉํ ์ ์๋ค๋ ๊ฒ์ ์๋ฏธํ๋ค.
๋์ ๊ณผ์
- Kafka, HDFS, S3 ๋ฑ๊ณผ ๊ฐ์ ๋ฐ์ดํฐ ์์ค๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์์ ํ๋ค.
- ์์ ๋ ๋ฐ์ดํฐ๋ฅผ ์ผ์ ํ ์๊ฐ ๊ฐ๊ฒฉ์ผ๋ก ๋ฌถ์ RDD๋ฅผ ์์ฑํ๊ณ RDD๋ค์ ์ฐ์์ธ DStream์ ๋ง๋ ๋ค.
- DStream์ ๊ฐ RDD์ ๋ํด transformation ์ฐ์ฐ์ ๋ณ๋ ฌ๋ก ์งํํ๋ค.
- ์ฒ๋ฆฌ๋ ๊ฒฐ๊ณผ๋ฅผ ๋ด๋ณด๋ธ๋ค.
๐ Structured Streaming
Structured Streaming
์ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ โ๋ฌดํ ํ
์ด๋ธโ๋ก ๊ฐ์ฃผํ๋ค. ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ๋์ฐฉํ ๋๋ง๋ค ์ด ๋ฐ์ดํฐ๋ ๋ง์น ๋ฌดํ ํ
์ด๋ธ์ ์๋ก์ด ํ์ผ๋ก ์ถ๊ฐ๋๋ ๊ฒ์ฒ๋ผ ์ทจ๊ธ๋๋ค. ์ด ๊ฐ๋
์ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ์ DataFrame ๋ฐ SQL ์ฐ์ฐ์ ๊ทธ๋๋ก ์ ์ฉํ ์ ์๋ค๋ ์๋ฏธ์ด๋ค.
๋์ ๊ณผ์
- ์ ํด์ง ํธ๋ฆฌ๊ฑฐ ๊ฐ๊ฒฉ๋ง๋ค ์คํํฌ๋ ๋ฐ์ดํฐ ์์ค์์ ๋ง์ง๋ง์ผ๋ก ํ์ธํ ์์ ์ดํ ๋์ฐฉํ ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ์๋์ง ํ์ธํ๋ค.
- ์๋ก์ด ๋ฐ์ดํฐ๋ ํ๋์ ๋ฐฐ์น๋ก ๊ฐ์ฃผ๋๋ฉฐ ์ฌ์ฉ์๊ฐ ์ ์ํ ์ฟผ๋ฆฌ๊ฐ ์ด ๋ฐฐ์น์ ์ ์ฉ๋๋ค.
- ์ฟผ๋ฆฌ๊ฐ ์คํ๋๋ฉด ์ค๊ฐ ์ฐ์ฐ ๊ฒฐ๊ณผ๊ฐ ๋ด๊ธฐ๊ฒ ๋๋ฉฐ, ๊ฒฐ๊ณผ ํ ์ด๋ธ์ด ์ ๋ฐ์ดํธ๋๋ค.
- ๊ฒฐ๊ณผ ํ ์ด๋ธ์์ ๋ณ๊ฒฝ๋ ๋ถ๋ถ์ ๋ด๋ณด๋ธ๋ค.
Event Time ๊ธฐ๋ฐ ์ฒ๋ฆฌ
Structured Streaming์ ๋ฐ์ดํฐ๊ฐ ์์คํ ์ ๋์ฐฉํ ์๊ฐ์ด ์๋ ์ด๋ฒคํธ๊ฐ ์ค์ ๋ก ๋ฐ์ํ ์๊ฐ์ ๊ธฐ์ค์ผ๋ก ์ฐ์ฐ์ ์ํํ ์๋ ์๋ค.
โ์๋์ฐ ์ฐ์ฐโ์ ํน์ ์๊ฐ ๋จ์๋ก ๋ฐ์ดํฐ๋ฅผ ๊ทธ๋ฃนํํ์ฌ ์ง๊ณํ๋ ๊ธฐ๋ฅ์ผ๋ก, ์ฃผ๋ก sliding window
๊ธฐ๋ฒ์ ์ฌ์ฉํ๋ค. ์๊ณ์ด ๋ฐ์ดํฐ๋ฅผ ๋ถ์ํ ๋ ํนํ ์ ์ฉํ๋ค.
์๋ฅผ ๋ค์ด 5๋ถ ํฌ๊ธฐ์ ์๋์ฐ๋ฅผ 1๋ถ ๊ฐ๊ฒฉ์ผ๋ก ์ฌ๋ผ์ด๋ฉํ๋ ์ํฉ์ ์ ๊ทธ๋ฆผ๊ณผ ๊ฐ๋ค.
๋คํธ์ํฌ ์ง์ฐ ๋ฑ์ผ๋ก ๋ฐ์ดํฐ๊ฐ ๋ฐ์๋ ์์๋๋ก ๋์ฐฉํ์ง ์์ ์ ์๋๋ฐ, Structured Streaming์ Watermarking
์ด๋ผ๋ ๋ฐฉ์์ผ๋ก ์ด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋ค. Watermark
๋ ์๋์ฐ ๊ณ์ฐ์ ์ํด ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ค๋ฆด ์ต๋ ์๊ฐ์ด๋ค. ์๋ฅผ ๋ค์ด ์ํฐ๋งํฌ๋ฅผ 10๋ถ์ผ๋ก ์ค์ ํ๋ฉด, ํด๋น ์๋์ฐ ๊ณ์ฐ์ 10๋ถ๊น์ง ๋ฆ๊ฒ ๋์ฐฉํ๋ ๋ฐ์ดํฐ๋ ๊ฒฐ๊ณผ์ ํฌํจํ๋ค.
๊ด๋ จ ๋ฉ์๋
1
2
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 10)
StreamingContext
๋ ์คํํฌ ์คํธ๋ฆฌ๋ฐ์ entrypoint๋ฅผ ์์ฑํ๋ค.
๋ ๋ฒ์งธ ์ธ์๋ ๋ฐฐ์น ๊ฐ๊ฒฉ์ ์๋ฏธํ๋๋ฐ, 10์ผ๋ก ์ค์ ํ๋ฉด โ๋ฐ์ดํฐ ์คํธ๋ฆผ์ 10์ด ๊ฐ๊ฒฉ์ผ๋ก ์๋ฅด๊ฒ ๋คโ๋ ์๋ฏธ์ด๋ค.
1
lines = ssc.socketTextStream("127.0.0.1", 9999)
socketTextStream
์ ์ง์ ๋ TCP ์์ผ์ผ๋ก๋ถํฐ ํ
์คํธ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์์ ํ์ฌ DStream์ ์์ฑํ๋ ๋ฉ์๋์ด๋ค.
์ฒซ ๋ฒ์งธ ์ธ์๋ ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ผ ์๋ฒ์ IP ์ฃผ์, ๋ ๋ฒ์งธ ์ธ์๋ ์๋ฒ์ ํฌํธ ๋ฒํธ์ด๋ค.
socketTextStream
์ด ํธ์ถ๋๊ณ ์คํธ๋ฆฌ๋ฐ ์ปจํ
์คํธ๊ฐ ์์(ssc.start()
)๋๋ฉด, ์คํํฌ๋ ํด๋ฌ์คํฐ์ executor ์ค ํ๋์ Receiver
๋ผ๋ ํ์คํฌ๋ฅผ ์คํ์ํจ๋ค. ์ด ๋ฆฌ์๋ฒ๋ ์ง์ ๋ ํธ์คํธ์ TCP ์ฐ๊ฒฐ์ ๋งบ๊ณ , ์์ ๋๊ธฐํ๋ค. ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๋ฉด ๋ฆฌ์๋ฒ๋ ์ด๋ฅผ ์์งํ์ฌ ๋ฉ๋ชจ๋ฆฌ์ Block ํํ๋ก ์ ์ฅํ๋ค. ์ค์ ๋ ๋ฐฐ์น ๊ฐ๊ฒฉ๋ง๋ค ์์ง๋ ๋ฐ์ดํฐ ๋ธ๋ก๋ค์ ํ๋์ RDD๋ก ๋ณํ๋๊ณ , RDD๋ค์ ์ฐ์์ ์ธ ์คํธ๋ฆผ์ด DStream์ด ๋๋ค.
1
ssc.awaitTermination()
awaitTermination
์ ์คํํฌ ์คํธ๋ฆฌ๋ฐ ์ ํ๋ฆฌ์ผ์ด์
์ ๋๋ผ์ด๋ฒ๊ฐ ๋ฐ๋ก ์ข
๋ฃ๋๋ ๊ฒ์ ๋ง๊ณ ์คํธ๋ฆฌ๋ฐ ์์
์ด ๊ณ์ ์งํ๋๋๋ก ๋๊ธฐ์ํค๋ ์ญํ ์ ํ๋ ๋ฉ์๋์ด๋ค. ์คํธ๋ฆฌ๋ฐ์ ๋ช
์์ ์ผ๋ก ์ค๋จ๋๊ธฐ ์ ๊น์ง๋ ์ง์์ ์ผ๋ก ์คํ๋์ด์ผ ํ๋๋ฐ, ์ผ๋ฐ์ ์ธ ํ๋ก๊ทธ๋จ์ ๋ง์ง๋ง ์ฝ๋๋ฅผ ์คํํ๋ฉด ์๋์ผ๋ก ์ข
๋ฃ๋๋ค. ์ด๋ฌํ ๋ฌธ์ ๋ฅผ ๋ฐฉ์งํ๊ธฐ ์ํด awaitTermination
์ ํ์ฌ ์ฐ๋ ๋๋ฅผ ์ฐจ๋จํ์ฌ ์ฌ์ฉ์๊ฐ ์ง์ ์ค๋จํ๊ฑฐ๋, ์์ธ๊ฐ ๋ฐ์ํ์ง ์๋ ์ด์ ๊ณ์ ๋๊ธฐํ๋ค.
timeout
ํ๋ผ๋ฏธํฐ๋ฅผ ์ค์ ํ์ฌ ๋๊ธฐํ ์ต๋ ์๊ฐ์ ๋ช
์ํ ์ ์๋ค. ๋ง์ฝ ์๋ค๋ฉด ๋ฌดํ์ ๋๊ธฐํ๋ค.
1
2
3
4
5
6
7
8
9
10
11
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
readStream
์ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ธฐ ์ํ ์ธํฐํ์ด์ค๋ฅผ ๋ฆฌํดํ๋ค.
format
์ ๋ฐ์ดํฐ ์์ค์ ํ์์ ์ง์ ํ๋ค. socket
์ ์ง์ ๋ TCP ์์ผ์ ํตํด ๋ค์ด์ค๋ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ผ๋ฉฐ, rate
๋ ์ค์ ์ธ๋ถ ๋ฐ์ดํฐ ์์ค ์์ด ํด๋ฌ์คํธ์ ์คํธ๋ฆฌ๋ฐ ์ฑ๋ฅ์ ๋ฒค์น๋งํนํ ๋ ์ค์ ํ๋ค. file
์ ํ์ผ ๋จ์๋ก ๋ฐ์ดํฐ๊ฐ ์ฃผ๊ธฐ์ ์ผ๋ก ์์ฑ๋ ๋ ์ฌ์ฉํ๋ค. kafka
๋ ์ด๋ฆ ๊ทธ๋๋ก ์นดํ์นด๋ก๋ถํฐ ์ค์๊ฐ ๋ฐ์ดํฐ๋ฅผ ์์ ํ ๋ ์ฌ์ฉํ๋ค.
์ดํ option
์ ํตํด ๋ฐ์ดํฐ ์์ค ์๋ฒ์ IP ์ฃผ์์ ํธํธ ๋ฒํธ๋ฅผ ๋ช
์ํ ์ ์์ผ๋ฉฐ, load
๋ฅผ ํตํด ์ ์๋ ์ค์ ์ ๋ฐํ์ผ๋ก ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐํ๋ ์์ ์์ฑํ๋ค.
1
2
3
4
5
6
7
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
writeStream
์ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ๋ฅผ ์ฐ๊ธฐ ์ํ ์ธํฐํ์ด์ค๋ฅผ ๋ฆฌํดํ๋ค.
outputMode
๋ ๊ฒฐ๊ณผ ํ
์ด๋ธ์ด ์
๋ฐ์ดํธ๋ ๋๋ง๋ค ์ด๋ค ๋ฐ์ดํฐ๋ฅผ ์ธ๋ถ๋ก ๋ด๋ณด๋ผ์ง๋ฅผ ๊ฒฐ์ ํ๋ค. complete
๋ก ์ค์ ๋๋ฉด ๋งค ํธ๋ฆฌ๊ฑฐ๋ง๋ค ์ ์ฒด ๊ฒฐ๊ณผ ํ
์ด๋ธ์ ๋ชจ๋ ๋ด์ฉ์ ๋ด๋ณด๋ด๋ฉฐ, append
๋ก ์ค์ ๋๋ฉด ์๋กญ๊ฒ ์ถ๊ฐ๋ ํ๋ง, update
๋ก ์ค์ ๋๋ฉด ๋ด์ฉ์ด ๋ณ๊ฒฝ๋ ํ๋ง ๋ด๋ณด๋ธ๋ค.
start
๋ ์ ์๋ ๋ชจ๋ ์ค์ ์ ๋ฐํ์ผ๋ก ์ค์ ์ฟผ๋ฆฌ ์คํ์ ์์ํ๋ action ์ฐ์ฐ์ด๋ค.
Processing Model
Structured Streaming์ ์ ์ฒด์ ์ธ ๋์ ๊ณผ์ ์ ๋ํด ์์๋ณด์.
- ๋ฐ์ดํฐํ๋ ์ ๋๋ SQL API๋ฅผ ํตํด ์คํธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ๋ฅผ ์์ฑํ๋ค.
- ์์ฑ๋ ์ฟผ๋ฆฌ๋ catalyst optimizer๋ฅผ ํฌํจํ๊ณ ์๋ ์คํํฌ SQL ์์ง์ผ๋ก ์ ๋ฌ๋๋ค.
- ์คํํฌ SQL ์์ง์ ์ด๋ฅผ ์ปดํ์ผํ๊ณ ๋ ผ๋ฆฌ์ ๋ฐ ๋ฌผ๋ฆฌ์ ๊ณํ์ ์๋ฆฝํ๋ค.
start
๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉด ์ค์ ๋ก ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ๊ฐ ์ด๋ฃจ์ด์ง๋ค. ๋๋ผ์ด๋ฒ ํ๋ก์ธ์ค ๋ด ๋ณ๋์ ์ฐ๋ ๋๊ฐ ์์ฑ๋๋ค.- ์์ฑ๋ ๋ณ๋์ ์ฐ๋ ๋๋ ์ ํด์ง ํธ๋ฆฌ๊ฑฐ์ ๋ฐ๋ผ ๋ฐ๋ณต์ ์ผ๋ก ์์ ์ ์ํํ๋ค. ์ด ๊ณผ์ ์ ์ด์ Streaming์ ๋์ ๊ณผ์ ๊ณผ ๋์ผํ๋ค(๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ , ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ์๋์ง ํ์ธํ๋ ๋์ ๋ฑ).
ํธ๋ฆฌ๊ฑฐ ์ค์
Trigger
๋ ์ธ์ , ์ผ๋ง๋ ์์ฃผ ์คํธ๋ฆฌ๋ฐ ์์ค์ ์๋ก์ด ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ์ฌ ๋ง์ดํฌ๋ก ๋ฐฐ์น๋ฅผ ์คํํ ์ง ๊ฒฐ์ ํ๋ ์ค์ ์ด๋ค. ํธ๋ฆฌ๊ฑฐ ์ค์ ์ข
๋ฅ๋ ๋ค์๊ณผ ๊ฐ๋ค.
Unspecified
: ํ๋์ ๋ง์ดํฌ๋ก ๋ฐฐ์น๊ฐ ์๋ฃ๋๋ฉด ์ฆ์ ๋ค์ ๋ง์ดํฌ๋ก ๋ฐฐ์น๋ฅผ ์์ํ๋ค. ์ง์ฐ ์๊ฐ์ ๊ฐ๋ฅํ ๋ฎ๊ฒ ์ ์งํ๋ฉด์ ํด๋ฌ์คํฐ์ ๋ชจ๋ ๊ฐ์ฉ ๋ฆฌ์์ค๋ฅผ ํ์ฉํ์ฌ ์ต๋ ์ฒ๋ฆฌ๋์ ๋ด๊ณ ์ถ์ ๋ ์ฌ์ฉํ๋ค.Fixed Interval
: ๊ณ ์ ๋ ์๊ฐ ๊ฐ๊ฒฉ๋ง๋ค ์๋ก์ด ๋ง์ดํฌ๋ก ๋ฐฐ์น๋ฅผ ์คํํ๋๋ก ์ค์ ํ๋ค. ์ด์ ๋ฐฐ์น์ ์ฒ๋ฆฌ ์๊ฐ์ด ์ค์ ๋ ๊ฐ๊ฒฉ๋ณด๋ค ์งง์ผ๋ฉด ๋ค์ ๊ฐ๊ฒฉ์ด ๋ ๋๊น์ง ๋๊ธฐํ๋ฉฐ, ์ด์ ๋ฐฐ์น์ ์ฒ๋ฆฌ ์๊ฐ์ด ๊ฐ๊ฒฉ๋ณด๋ค ๊ธธ์ด์ง๋ฉด ์ฆ์ ๋ค์ ๋ฐฐ์น๋ฅผ ์์ํ๋ค.One-Time
: ์ฟผ๋ฆฌ๊ฐ ์์๋ ๋ ๊ทธ ์์ ์ ์ฌ์ฉ ๊ฐ๋ฅํ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ๋จ ํ๋์ ๋ง์ดํฌ๋ก ๋ฐฐ์น๋ก ์ฒ๋ฆฌํ ํ ์ฟผ๋ฆฌ๋ฅผ ์๋์ผ๋ก ์ค๋จํ๋ค. ๋ฐ์ดํฐ๊ฐ ๋๋ฌด ๋ง์ผ๋ฉด ๋ฉ๋ชจ๋ฆฌ ๊ด๋ จ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ ์๋ค.Available Now
:One-Time
๊ณผ ์ ์ฒด์ ์ผ๋ก ์ ์ฌํ๋, ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ๊ฐ์ ๋ง์ดํฌ๋ก ๋ฐฐ์น๋ก ๋๋์ด ์ฒ๋ฆฌํ ์ ์๋ค.Continuous Processing
: ๋ง์ดํฌ๋ก ๋ฐฐ์น ๊ฐ๋ ์ ์ฌ์ฉํ๋ ๊ฒ์ด ์๋ ์ด๋ฒคํธ ๋จ์ ์คํธ๋ฆผ ์ฒ๋ฆฌ์ ์ ์ฌํ๊ฒ ๋ฐ์ดํฐ๊ฐ ๋์ฐฉํ๋ ์ฆ์ ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ์ด๋ค. ์ง์ฐ ์๊ฐ์ ๊ทนํ๊น์ง ๋ฎ์ถ ์ ์๋ค. ์ต์ ํ ๋ฒ์ ์ฒ๋ฆฌ ๋ณด์ฅ์ ์ง์ํ๋ค.
1
2
3
4
5
6
7
spark = SparkSession \
.builder \
.appName("StructuredStreamingSum") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.sql.streaming.schemaInference", "true") \
.config("maxFilesPerTrigger", 1) \
.getOrCreate()
spark.streaming.stopGracefullyOnShutdown
์ true
๋ก ์ค์ ํ๋ฉด ์คํํฌ ์คํธ๋ฆฌ๋ฐ ์ ํ๋ฆฌ์ผ์ด์
์ด ์ข
๋ฃ๋ ๋ ๋ชจ๋ ์คํ ์ค์ธ ๋ฐฐ์น ์ฐ์ฐ์ด ๋๋ ๋ค gracefullyํ๊ฒ ์ข
๋ฃ๋๋๋ก ํ๋ค.
spark.sql.streaming.schemaInference
๋ฅผ true
๋ก ์ค์ ํ๋ฉด ์คํธ๋ฆฌ๋ฐ ์์ค๋ฅผ ์ฝ์ ๋ ์๋์ผ๋ก ์คํค๋ง๋ฅผ ์ถ๋ก ํ๋ค.
maxFilesPerTrigger
์ ํ๋์ ํธ๋ฆฌ๊ฑฐ์ ์ฒ๋ฆฌํ ์ต๋ ํ์ผ ๊ฐ์๋ฅผ ์ง์ ํ๋ค.
1
2
3
4
5
6
7
8
9
10
query = shorten_df \
.writeStream \
.format("json") \
.option("path", "streaming_output") \
.option("checkpointLocation", "checkpoint") \
.outputMode("append") \
.trigger(processingTime='5 seconds') \
.start()
query.awaitTermination()
checkpointLocation
์ ์ฒดํฌํฌ์ธํธ๋ฅผ ํ์ฑํํ๊ณ ๊ด๋ จ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ ๋๋ ํฐ๋ฆฌ๋ฅผ ์ง์ ํ๋ ์ต์
์ด๋ค. ์ด๋๊น์ง ๋ฐ์ดํฐ๋ฅผ ์ฝ์๋์ง์ ๋ํ ์ ๋ณด์ธ ์คํ์
, ์ด๋ค ๋ฐฐ์น๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์ฒ๋ฆฌ๋์๋์ง์ ๋ํ ์ ๋ณด์ธ ์ปค๋ฐ, ์ค๊ฐ ์ง๊ณ ๊ฐ์ด๋ ์ํ ์ ๋ณด๋ฅผ ์ ์ฅํ๋ค.
์ฒดํฌํฌ์ธํธ๋ Structured Streaming์ด ์ฟผ๋ฆฌ์ ์งํ ์ํฉ๊ณผ ์ํ๋ฅผ ์ ์ฅ์์ ์ง์์ ์ผ๋ก ์ ์ฅํ๋ ๋ฉ์ปค๋์ฆ์ด๋ค.
trigger
๋ ์๋ก์ด ๋ง์ดํฌ๋ก ๋ฐฐ์น๋ฅผ ์ธ์ , ์ผ๋ง๋ ์์ฃผ ์คํํ ์ง ๊ฒฐ์ ํ๋ ๋ฉ์๋์ด๋ค. processingTime
์ต์
์ interval์ ์ค์ ํ๋ค.
Fault Tolerance
Structured Streaming์ ๊ถ๊ทน์ ์ธ ๋ชฉํ๋ ๋ค์๊ณผ ๊ฐ๋ค.
- ๋ชจ๋ ์ ๋ ฅ ๋ฐ์ดํฐ๋ ์ต์ ํ ๋ฒ ์ด์ ์ฒ๋ฆฌ๋๋ค.
- ์ถ๋ ฅ ์์คํ ์๋ ๋์ผํ ๊ฒฐ๊ณผ๊ฐ ๋ ๋ฒ ์ด์ ๊ธฐ๋ก๋์ง ์๋๋ค.
์ด๋ฅผ ๋ง์กฑํ๊ธฐ ์ํด ๋ฐ์ดํฐ ์์ค๋ ํน์ ์คํ์
์ผ๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๋ค์ ์ฝ์ด์ฌ ์ ์์ด์ผ ํ๋ฉฐ, ์คํํฌ๋ checkpoint
์ Write-Ahead Logs, WAL
๋ฅผ ํตํด ๊ฐ ๋ง์ดํฌ๋ก ๋ฐฐ์น์ ์คํ์
์ ์ ์ฅ์์ ์ ์ฅํ๋ค.
์คํํฌ๊ฐ ํน์ ๋ฐฐ์น๋ฅผ ์์ํ ๋ ์ฒ๋ฆฌํ ๋ฐ์ดํฐ์ ๋ฒ์๋ฅผ WAL์ ๊ธฐ๋กํ ํ, ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋๋ฉด ํด๋น ์คํ์ ๋ฒ์๋ฅผ ์ฒดํฌํฌ์ธํธ์ ์ปค๋ฐํ๋ค.
๋ง์ฝ ์ฒ๋ฆฌ ๋์ค ์ ํ๋ฆฌ์ผ์ด์ ์ด ์คํจํ๋ฉด ์ฌ์์๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ฒดํฌํฌ์ธํธ๋ฅผ ํ์ธํ๊ณ , ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋ ๋ฐฐ์น์ ์คํ์ ์ ํ์ธํ๊ณ , ํด๋น ์คํ์ ๋ค์๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๋ค์ ์์ฒญํ๋ค.
๋์ผํ ์ ๋ ฅ์ ๋ํด ์ฌ์ฒ๋ฆฌ๊ฐ ๋ฐ์ํ์ ๋, ์ด์ ๊ณผ ๋์ผํ ๊ฒฐ๊ณผ๊ฐ ๋์์ผ ํ๋ฉฐ ๋์ผํ ์ฐ์ฐ์ ์ฌ๋ฌ ๋ฒ ์ํํ๋๋ผ๋ ๊ทธ ๊ฒฐ๊ณผ๊ฐ ๋จ ํ ๋ฒ ์ํํ์ ๋์ ๋์ผํ๊ฒ ์ ์ง๋์ด์ผ ํ๋ค.
๐ ์ฐธ๊ณ
https://docs.newrelic.com/kr/docs/nrql/using-nrql/create-smoother-charts-sliding-windows/