[Spark] Kafka๋ฅผ ์ฌ์ฉํ Spark Streaming
๐ Apache Kafka๋?
Apache Kafka
๋ ์ค์๊ฐ์ผ๋ก ๋ฐ์ํ๋ ๋๊ท๋ชจ ์ด๋ฒคํธ ์คํธ๋ฆผ์ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ธฐ ์ํด ์ค๊ณ๋ ๋ถ์ฐ ์ด๋ฒคํธ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ์ด๋ค.
Pub/Sub
๋ชจ๋ธ์ ๋ฐ๋ฅธ๋ค. Producer
๋ ๋ฐ์ดํฐ๋ฅผ ์์ฐํ๊ณ Consumer
๋ ๋ฐ์ดํฐ๋ฅผ ์๋นํ๋ค. ์ด๋ค์ ์์ ํ ๋ถ๋ฆฌํ์ฌ ์๋ก ์ง์ ์ ์ผ๋ก ํต์ ํ ํ์๊ฐ ์๋๋ก ํ๋ค. Topic
์ ๋ฉ์์ง๋ฅผ ๋ถ๋ฅํ๋ ์ฑ๋์ด๋ค. producer์ consumer๋ ๊ฐ๊ฐ ํ ํฝ์ ๋ฉ์์ง๋ฅผ ๋ฐํํ๊ณ ๊ตฌ๋
ํ๋ค.
ํน์ง
- ๋จ์ผ ์๋ฒ๊ฐ ์๋ ์ฌ๋ฌ ๋
ธ๋์ ๋ถ์ฐ๋์ด ๋์๋๋๋ก ์ค๊ณ๋์๋ค. kafka ํด๋ฌ์คํฐ๋ฅผ ๊ตฌ์ฑํ๋ ๊ฐ๋ณ ๋
ธ๋๋ฅผ
Broker
๋ผ๊ณ ํ๋ฉฐ, ์ฌ๋ฌ ๊ฐ์ broker๊ฐ ๋ชจ์ฌ ํ๋์ ์นดํ์นด ํด๋ฌ์คํฐ๋ฅผ ๊ตฌ์ฑํ๋ค. - ํ๋์ ๊ฑฐ๋ํ ํ ํฝ์ ์ฌ๋ฌ ๊ฐ์ ํํฐ์ ์ผ๋ก ๋ถํ ํ์ฌ, ์ด๋ค์ ์ฌ๋ฌ ๋ธ๋ก์ปค์ ๋ถ์ฐํ์ฌ ์ ์ฅํ๋ค. ๋ง์ฝ ๋ฐ์ดํฐ์ ์์ด ๋์ด๋๋ฉด ํํฐ์ ์ ์๋ฅผ ๋๋ ค ๋ ๋ง์ ๋ธ๋ก์ปค์ ์์ ์ ๋ถ์ฐ์ํฌ ์ ์๋ค. consumer๋ ํํฐ์ ์ ๋ํ์ฌ ๋ณ๋ด ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๋ฏ๋ก, ๋น ๋ฅธ ์๋๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ค. ๋ํ ํ๋์ ํํฐ์ ๋ด์์๋ ๋ฉ์์ง์ ์์๋ฅผ ๋ณด์ฅํ๋ค.
- ๋ฐ์ดํฐ์ ์ ์ค์ ๋ฐฉ์งํ๊ธฐ ์ํด ๊ฐ ํํฐ์ ์ ๋ณต์ ๋ณธ์ ๋ค๋ฅธ ๋ธ๋ก์ปค์ ์ ์ฅํ๋ค.
- ์์ ํ ๋ฉ์์ง๋ฅผ ๋ฉ๋ชจ๋ฆฌ๊ฐ ์๋ ๋์คํฌ์ ์๊ตฌ์ ์ผ๋ก ์ ์ฅํ๋ค. consumer๊ฐ ๋ฉ์์ง๋ฅผ ์ฝ์ด๋ ๋ฐ์ดํฐ๋ฅผ ์ฆ์ ์ญ์ ๋์ง ์๊ณ ์ค์ ๋ ๊ธฐ๊ฐ ๋์ ๋์คํฌ์ ๋ณด๊ด๋๋ค. ์ด๋ ์ฅ์ ๋ฐ์ ์ ๋ฐ์ดํฐ ์ ์ค์ ๋ฐฉ์งํ๋ค. ์ด ๋๋ฌธ์ ๋ค๋ฅธ consumer๊ฐ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ๋ฒ ์ฝ์ด ์ฌ์ฉํ ์ ์๋ค.
- ๋ฐ์ดํฐ๋ฒ ์ด์ค, ๊ฒ์ ์์ง, ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค ๋ฑ๊ณผ ํตํฉ์ด ์ฝ๋ค.
- ์ํ์ ํ์ฅ์ด ์ฝ๋ค. ํด๋ฌ์คํฐ์ ์ฉ๋์ด ๋ถ์กฑํด์ง๋ฉด ํด๋ฌ์คํฐ์ ๋ธ๋ก์ปค๋ฅผ ์ถ๊ฐํ๋ฉด ๋๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spark = SparkSession \
.builder \
.appName("StructuredWordCount") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.sql.shuffle.partitions", "3") \
.getOrCreate()
events = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "quickstart") \
.option("startingOffsets", "earliest") \
.load()
spark.sql.shuffle.partitions
์์ ๋ฐ์ดํฐ ์
ํ๋ง์ ํํฐ์
์๋ฅผ ์ค์ ํ๋ค.
SparkSesion
์ ์์ฑํ๋ค๋ฉด ์ด์ ์นดํ์นด ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์ ์ํ๋ค.
format
์ kafka
๋ก ์ง์ ํ์ฌ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ๋ง๋ ๋ค.
kafka.bootstrap.servers
๋ ์คํํฌ๊ฐ ์นดํ์นด ํด๋ฌ์คํฐ์ ์ ์ํ๊ธฐ ์ํด ํ์ํ ๋ธ๋ก์ปค ๋ชฉ๋ก์ด๋ค. ์คํํฌ driver์ executor๋ ํด๋น ์ฃผ์๋ฅผ ํตํด ๋ธ๋ก์ปค์ ํต์ ์ ์งํํ๋ค.
subscribe
๋ ์ด๋ค ์นดํ์นด ํ ํฝ์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ฌ์ง ์ง์ ํ๋ค.
startingOffsets
๋ ์คํธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ๊ฐ ์์๋ ๋ ํ ํฝ์ ์ด๋ ์ง์ ๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ์ฝ์์ง ๊ฒฐ์ ํ๋ค. earliest
๋ก ์ค์ ํ๋ฉด ํ ํฝ์ ๊ฐ์ฅ ์ค๋๋ ๋ฐ์ดํฐ๋ถํฐ ์ฝ๋๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
schema = StructType([
StructField("city", StringType()),
StructField("domain", StringType()),
StructField("event", StringType())
])
value_df = events.select(
col('key'),
from_json(
col("value").cast("string"), schema
).alias("value")
)
์นดํ์นด๋ฅผ ํตํด JSON ํ์ผ์ ์ฝ๊ธฐ ์ํด์, ๋จผ์ JSON ์คํค๋ง๋ฅผ ์ ์ํด์ผ ํ๋ค. ์ดํ from_json
๋ฉ์๋๋ฅผ ํตํด JSON ๋ฌธ์์ด์ ์ ์ํ schema
์ ๋ฐ๋ผ ํ์ฑํ์ฌ ๊ตฌ์กฐ์ฒด๋ฅผ ๋ง๋ ๋ค.
1
2
3
4
5
output_df = concat_df.selectExpr(
'null',
"""
to_json(named_struct("lower_city", lower_city, "domain", domain, "behavior", behavior)) as value
""".strip())
์นดํ์นด ์ฑํฌ๋ ๊ธฐ๋ณธ์ ์ผ๋ก key์ value ๋ ๊ฐ์ ์ปฌ๋ผ์ ๊ธฐ๋ํ๋ค. named_struct
๋ฉ์๋๋ฅผ ํตํด ์ฌ๋ฌ ์ปฌ๋ผ์ ํ๋์ ๊ตฌ์กฐ์ฒด๋ก ๋ฌถ๊ณ , to_json
๋ฉ์๋๋ฅผ ํตํด ํ๋์ JSON ๋ฌธ์์ด๋ก ์ง๋ ฌํํ๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
file_writer = concat_df \
.writeStream \
.queryName("transformed json") \
.format("json") \
.outputMode("append") \
.option("path", "transformed") \
.option("checkpointLocation", "chk/json") \
.start()
kafka_writer = output_df \
.writeStream \
.queryName("transformed kafka") \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("checkpointLocation", "chk/kafka") \
.option("topic", "transformed") \
.outputMode("append") \
.start()
spark.streams.awaitAnyTermination()
๋ง์ฝ ๋ ๊ฐ ์ด์์ ์์ํ์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๊ฒฝ์ฐ ์์ ๊ฐ์ด ๋ฐ๋ก ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์ ์ํ๋ฉด ๋๋ค. ๋ค๋ง ์ฃผ์ํด์ผ ํ ์ ์, ๋ ์คํธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ๋ ๋ ผ๋ฆฌ์ ์ผ๋ก ๋ณ๊ฐ์ ์ฟผ๋ฆฌ์ด๋ฏ๋ก ๋ค๋ฅธ ์ฒดํฌํฌ์ธํธ ๋๋ ํฐ๋ฆฌ๋ฅผ ์ฌ์ฉํด์ผ ํ๋ค. ๋ง์ฝ ์ด๋ฅผ ๊ณต์ ํ๋ค๋ฉด ์ํธ ๊ฐ์ญ์ด ๋ฐ์ํ๊ฒ ๋๋ฉฐ, ์์์น ๋ชปํ ๊ฒฐ๊ณผ๊ฐ ๋์ค๊ฒ ๋๋ค.
๋ํ awaitTermination
๋์ awaitAnyTermination
๋ฅผ ์ฌ์ฉํ์๋๋ฐ, ์คํํฌ ์ธ์
๋ด์์ ์คํ ์ค์ธ ๋ชจ๋ ์คํธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ ์ค ์ด๋ ํ๋๋ผ๋ ์ข
๋ฃํ ๋๊น์ง ๋๊ธฐํ๋ ๋ฉ์๋์ด๋ค. ์ด๋ฅผ ํตํด ๋ ๊ฐ ์ด์์ ์ฟผ๋ฆฌ๋ฅผ ๋์๋ณ๋ ฌ์ ์ผ๋ก ์คํํ ์ ์๋ค.
๐ Stateless/Stateful transformation
์ฌ๊ธฐ์ state๋ ์ด์ ๋ฐ์ดํฐ์ ๋ํ ์ ๋ณด๋ ์ค๊ฐ ๊ณ์ฐ ๊ฒฐ๊ณผ๋ฅผ ๋งํ๋ค. ์คํธ๋ฆฌ๋ฐ ์ ํ๋ฆฌ์ผ์ด์ ์ด ์ด๋ฅผ ๊ธฐ์ตํ๋์ง ์ฌ๋ถ์ ๋ฐ๋ผ Stateless์ Stateful๋ก ๋๋๋ค.
Stateless transformation
์ ๊ฐ ๋ฐ์ดํฐ๋ฅผ ๋
๋ฆฝ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ ์ฐ์ฐ์ด๋ค. ํ์ฌ ์ฒ๋ฆฌ ์ค์ธ ๋ฐ์ดํฐ ์ธ ๋ค๋ฅธ ์ ๋ณด๋ ํ์ํ์ง ์๋ค. ์ํ๋ฅผ ๊ด๋ฆฌํ ํ์๊ฐ ์๊ธฐ ๋๋ฌธ์ ๊ฐ๋จํ๋ฉฐ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ด ์ ๊ณ ์ฒ๋ฆฌ ์๋๊ฐ ๋น ๋ฅด๋ค. ๊ฐ ๋ฐ์ดํฐ๊ฐ ๋
๋ฆฝ์ ์ด๋ฏ๋ก worker ๋
ธ๋๋ฅผ ์ถ๊ฐํ์ฌ ๋ณ๋ ฌ ์ฒ๋ฆฌ ์ฑ๋ฅ์ ๋์ผ ์ ์๋ค. select
, withColumn
, filter
, map
, explode
์ฐ์ฐ ๋ฑ์ด ์๋ค.
์ ์ฒด ๊ฒฐ๊ณผ๋ฅผ ์ถ๋ ฅํ๋ Complete
๋ชจ๋๋ ์ฌ์ฉํ ์ ์๋ค.
Stateful transformation
์ ๊ณผ๊ฑฐ ๋ฐ์ดํฐ์ ์ปจํ
์คํธ๋ฅผ ์ ์งํ๊ณ ์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ์ฐ์ฐ์ด๋ค. groupBy().agg()
์ ๊ฐ์ ์คํธ๋ฆฌ๋ฐ ์ง๊ณ, window
๋ฉ์๋์ ๊ฐ์ ์๋์ฐ ์ง๊ณ, ๋ ๊ฐ์ ์คํธ๋ฆผ์ ์กฐ์ธํ๋ ์ฐ์ฐ์ด ๋ํ์ ์ด๋ค. ์ํ๋ฅผ ์ ์ฅํด์ผ ํ๊ธฐ ๋๋ฌธ์ ์ถฉ๋ถํ ๋ฉ๋ชจ๋ฆฌ ๊ณต๊ฐ์ด ํ์์ ์ด๋ค. ์ํ ๊ด๋ฆฌ๋ฅผ ์คํํฌ๊ฐ ์๋์ผ๋ก ํ ์ ์์ง๋ง, ๊ฐ๋ฐ์๊ฐ ์ง์ ์ํ๋ฅผ ๊ด๋ฆฌํ ์๋ ์๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
events = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "pos") \
.option("startingOffsets", "earliest") \
.option("failOnDataLoss", "false") \
.load()
value_df = events.select(
col('key'),
from_json(
col("value").cast("string"), schema).alias("value"))
tf_df = value_df.selectExpr(
'value.product_id',
'value.amount')
์ ๊ณผ์ ์ Stateless๋ผ๊ณ ๋ณผ ์ ์๋ค. ๋ชจ๋ ์ฐ์ฐ์ ๊ฐ ํ์ ๊ธฐ์ค์ผ๋ก ๋ ๋ฆฝ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ธฐ ๋๋ฌธ์ด๋ค. ๋ค์ ๋งํด, ํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ๋ฐ ๊ณผ๊ฑฐ์ ๋ฐ์ดํฐ๊ฐ ํ์ํ์ง ์๋ค.
1
2
3
total_df = tf_df.select("product_id", "amount")\
.groupBy("product_id")\
.sum("amount").withColumnRenamed("sum(amount)", "total_amount")
๋ฐ๋ฉด ์ ๊ณผ์ ์ Stateful์ด๋ค. total_amount
๋ฅผ ๊ณ์ฐํ๊ธฐ ์ํด ์คํํฌ๋ ์ํ์ ํ๋งค์ก ํฉ๊ณ๊ฐ ์ผ๋ง์ธ์ง ๊ธฐ์ตํ๊ณ ์์ด์ผ ํ๊ธฐ ๋๋ฌธ์ด๋ค.
groupBy
๊ฐ ํธ์ถ๋๋ฉด ์คํํฌ๋ ๋ด๋ถ์ ์ผ๋ก ๋ฉ๋ชจ๋ฆฌ ๊ณต๊ฐ์ ์ค๋นํ๊ณ , ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๋ฉด ๋ฉ๋ชจ๋ฆฌ์์ ๋ฐ์ดํฐ์ ํค๋ฅผ ์ฐพ๊ณ , ์ํ๋ฅผ ์
๋ฐ์ดํธํ๋ค.
๐ ์๋์ฐ ์ง๊ณ ์ฐ์ฐ
ํฌ๊ฒ ๋ ๊ฐ์ง ์๋์ฐ ์ข ๋ฅ๊ฐ ์กด์ฌํ๋ค.
Tumbling Window
๋ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ๊ณ ์ ๋ ํฌ๊ธฐ๋ก ์๋ก ๊ฒน์น์น ์๊ฒ ๊ตฌ๊ฐ์ ๋๋๋ ๋ฐฉ๋ฒ์ด๋ค. ์ฆ, ๊ฐ ์๋์ฐ ์ฌ์ด์๋ ๋นํ์ด ์๋ค. ๊ฐ ์ด๋ฒคํธ๋ ํ๋์ ์๋์ฐ์๋ง ์ํ๋ค.
1
2
3
timestamp_format = "yyyy-MM-dd HH:mm:ss"
tf_df = value_df.select("value.*") \
.withColumn("create_date", to_timestamp("create_date", timestamp_format))
window
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด ์๊ฐ ๊ธฐ์ค์ด ๋๋ ์ปฌ๋ผ์ Timestamp
ํ์
์ด์ด์ผ ํ๋ค.
1
2
3
4
window_duration = "5 minutes"
window_agg_df = tf_df \
.groupBy(window(col("create_date"), window_duration)) \
.sum("amount").withColumnRenamed("sum(amount)", "total_amount")
window
๋ฉ์๋๋ฅผ ํตํด ์๋์ฐ๋ฅผ ์ ์ํ๋ค. col
๋ฉ์๋๋ฅผ ํตํด ์ด๋ค ์๊ฐ ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก ํ ์ง ์ง์ ํ๊ณ , window_duration
์ ํตํด ์๋์ฐ์ ํฌ๊ธฐ๋ฅผ ์ค์ ํ๋ค. sliding_duration
์ ์ง์ ํ์ง ์์์ผ๋ฏ๋ก ์๋์ผ๋ก ํ
๋ธ๋ง ์๋์ฐ๋ก ๋์ํ๋ค.
Sliding Window
๋ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ๊ณ ์ ๋ ํฌ๊ธฐ๋ก, ์๋ก ๊ฒน์น๋๋ก ๋๋๋ ๋ฐฉ์์ด๋ค. ์ฆ, ํ๋์ ์ด๋ฒคํธ๊ฐ ์ฌ๋ฌ ์๋์ฐ์ ๋์์ ํฌํจ๋ ์ ์๋ค.
๋ ๊ฐ์ง ํ๋ผ๋ฏธํฐ๊ฐ ์กด์ฌํ๋ค. โ์๋์ฐ ํฌ๊ธฐโ๋ ์ง๊ณ๊ฐ ์ํ๋ ์ ์ฒด ๊ตฌ๊ฐ์ ๊ธธ์ด์ด๋ค. โ์ฌ๋ผ์ด๋ ๊ฐ๊ฒฉโ์ ์๋์ฐ๊ฐ ๋ค์ ์์น๋ก ์ด๋ํ๋ ์๊ฐ ๊ฐ๊ฒฉ์ด๋ค. ๋ง์ฝ ์๋์ฐ ํฌ๊ธฐ์ ์ฌ๋ผ์ด๋ ๊ฐ๊ฒฉ์ด ๊ฐ๋ค๋ฉด ์๋์ฐ๋ ๊ฒน์น์ง ์๊ฒ ๋๋ฉฐ, ํ ๋ธ๋ง ์๋์ฐ์ ๋์ผํ๊ฒ ๋์ํ๋ค.
1
2
3
4
5
window_duration = "10 minutes"
sliding_duration = "5 minutes"
window_agg_df = tf_df \
.groupBy(window(col("create_date"), window_duration, sliding_duration)) \
.sum("amount").withColumnRenamed("sum(amount)", "total_amount")
window
๋ฉ์๋์ sliding_duration
์ ์ง์ ํ๋ฉด ์ฌ๋ผ์ด๋ฉ ์๋์ฐ๋ก ๋์ํ๋ค.
๐ Watermark
Watermark
๋ ๋ฐ์ดํฐ ์คํธ๋ฆผ์์ ์ผ๋ง๋ ๋ฆ๊ฒ ๋์ฐฉํ๋ ๋ฐ์ดํฐ๊น์ง ์ฒ๋ฆฌํ ๊ฒ์ธ๊ฐ๋ฅผ ์ ์ํ๋ ๊ธฐ์ค์ด๋ค.
์ํฐ๋งํฌ๋ฅผ ์ดํดํ๊ธฐ ์ํด ๋ ๊ฐ์ง ์๊ฐ ๊ฐ๋
์ด ๋ฑ์ฅํ๋ค. Event Time
์ ์ค์ ๋ก ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ ์๊ฐ์ด๋ฉฐ, Processing Time
์ ์ด๋ฒคํธ๊ฐ ์คํธ๋ฆฌ๋ฐ ์ ํ๋ฆฌ์ผ์ด์
์ ๋์ฐฉํ์ฌ ์ฒ๋ฆฌ๋๋ ์๊ฐ์ด๋ค. ํญ์ ์ด๋ฒคํธ ์๊ฐ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๊ฐ ์ ๋ ฌ๋์ด ๋ค์ด์ค๋ ๊ฒ์ด ์๋๋ค. ๋คํธ์ํฌ ์ง์ฐ์ด๋ ์์คํ
๋ถํ ๋ฑ ์ฌ๋ฌ๊ฐ์ง ์ด์ ๊ฐ ์กด์ฌํ๋ค.
์ํฐ๋งํฌ๋ ์ด๋ฒคํธ ์๊ฐ์ ๊ธฐ์ค์ผ๋ก ๋์ํ๋ค. Stateful transformation์์ ๋ฉ๋ชจ๋ฆฌ์ ์ค๊ฐ ์ง๊ณ ๊ฒฐ๊ณผ๋ฅผ ์ ์ฅํ ๋, ์ด๋ฅผ ์ ๋ฆฌํ๋ ๊ท์น์ด ์๋ค๋ฉด state๋ ๋ฌดํํ ์ปค์ง๊ฒ ๋๋ฉฐ, OOM ์ค๋ฅ๊ฐ ๋ฐ์ํ๊ฒ ๋๋ค.
๋ํ ์ํฐ๋งํฌ๋ ๋๋ฌด ๋ฆ๊ฒ ๋์ฐฉํ๋ ๋ฐ์ดํฐ๋ฅผ ๋ฌด์ํ๋ ๊ธฐ์ค์ด ๋๊ธฐ๋ ํ๋ค. ์ง์ฐ ์๊ณ๊ฐ๋ณด๋ค ๋ฆ๊ฒ ๋์ฐฉํ ๋ฐ์ดํฐ๋ ์ง๊ณ์ ํฌํจ์ํค์ง ์๋๋ค.
Output Mode
์ถ๋ ฅ ๋ชจ๋๋ ์คํธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ sink
์ ์ด๋ป๊ฒ ์ธ์ง๋ฅผ ๊ฒฐ์ ํ๋ค.
Complete
๋ชจ๋๋ ๋งค ํธ๋ฆฌ๊ฑฐ๋ง๋ค ์ ์ฒด ์ง๊ณ ๊ฒฐ๊ณผ ํ
์ด๋ธ์ ์ถ๋ ฅํ๋ค. ์ ์ฒด ๊ฒฐ๊ณผ๋ฅผ ๋ณด์ฌ์ค์ผ ํ๋ฏ๋ก ์ํฐ๋งํฌ์ ์ํด ์ค๋๋ ์๋์ฐ ์ํ๋ฅผ ์ ๋ฆฌํ ์ ์๋ค. ์ฆ, Complete ๋ชจ๋์์ ์ํฐ๋งํฌ๋ ๋ฆ๊ฒ ์จ ๋ฐ์ดํฐ ํํฐ๋ง ์ญํ ๋ง ํ์ง, ์ํ ์ ๋ฆฌ ์ญํ ์ ์ํํ ์ ์๋ค. ์ด๋ ๊ฒฐ๊ตญ OOM ์ค๋ฅ๋ฅผ ์ ๋ฐํ๊ฒ ๋๋ฏ๋ก, ์ฅ์๊ฐ ์คํ๋๋ Stateful ์คํธ๋ฆฌ๋ฐ์์๋ ์ฌ์ฉํ๋ฉด ์ ๋๋ค.
Update
๋ชจ๋๋ ๋ง์ง๋ง ํธ๋ฆฌ๊ฑฐ ์ดํ ๊ฒฐ๊ณผ๊ฐ ๋ณ๊ฒฝ๋ ํ๋ง ์ถ๋ ฅํ๋ค. ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ ์๋์ฐ์ ์ง๊ณ ๊ฐ์ด ์
๋ฐ์ดํธ๋๋ฉด ํด๋น ์๋์ฐ์ ๋ณ๊ฒฝ๋ ๊ฒฐ๊ณผ๊ฐ ์ฑํฌ๋ก ์ถ๋ ฅ๋๋ค. ์ํฐ๋งํฌ๋ก ์งํ๋์ด ํน์ ์๋์ฐ๊ฐ ๋ง๋ฃ๋๋ฉด ํด๋น ์๋์ฐ์ ์ํ๋ฅผ ์ญ์ ํ๋ค. Update ๋ชจ๋๋ ๊ธฐ์กด ๊ฒฐ๊ณผ ํ์ ์์ ํด์ผ ํ๋ฏ๋ก ์ฑํฌ๋ ํค๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ๋ฅผ ์ฐพ์ ์์ ํด์ผ ํ๋ค. ์ด๋ฅผ Upsert
๋ผ๊ณ ํ๋ค. ๋ฐ๋ผ์ ํค ๊ธฐ๋ฐ ์์ ์ด ๊ฐ๋ฅํ ์์คํ
์ ์ ํฉํ๋ค.
Append
๋ชจ๋๋ ํ์ ๋์ด ๋ค์๋ ๋ณ๊ฒฝ๋์ง ์์ ํ๋ง ์ถ๋ ฅํ๋ค. ์ํฐ๋งํฌ๊ฐ ์๋ ๊ฒฝ์ฐ ์ผ๋ฐ์ ์ธ ์ง๊ณ ์ฐ์ฐ์ ๊ณ์ฐ์ด ๊ณ์ ๋ฐ๋ ์ ์์ผ๋ฏ๋ก Append ๋ชจ๋๋ฅผ ์ฌ์ฉํ ์ ์๋ค. Append ๋ชจ๋์์ ์ํฐ๋งํฌ๋ ํน์ ์๋์ฐ๋ฅผ ์ต์ข
ํ์ ์ํค๋ ํจ๊ณผ๋ฅผ ๊ฐ์ ธ์จ๋ค. ์๋์ฐ์ ์ข
๋ฃ ์๊ฐ์ด ์ํฐ๋งํฌ๋ฅผ ์ง๋๋ฉด ํด๋น ์๋์ฐ์ ๊ฒฐ๊ณผ๋ ๋ ์ด์ ๋ณ๊ฒฝ๋์ง ์์ ๊ฒ์ด๋ผ๊ณ ๋ณด์ฅํ ์ ์๋ค. ๋จ, ์๋์ฐ์ ๊ฒฐ๊ณผ๋ ์ํฐ๋งํฌ์ ์ํด ๋ง๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ ค์ผ ๋ณผ ์ ์๋ค.
1
2
3
4
5
window_duration = "5 minutes"
window_agg_df = tf_df \
.withWatermark("create_date", "10 minutes") \
.groupBy(window(col("create_date"), window_duration)) \
.sum("amount").withColumnRenamed("sum(amount)", "total_amount")
withWatermark
๋ฉ์๋๋ฅผ ํตํด ์ํฐ๋งํฌ๋ฅผ ์ ์ํ๋ค. ์ฒซ ๋ฒ์งธ ํ๋ผ๋ฏธํฐ์ ์ํฐ๋งํฌ ๊ณ์ฐ ์ ์ฌ์ฉํ ๊ธฐ์ค ์๊ฐ ์ปฌ๋ผ, ๋ ๋ฒ์งธ ํ๋ผ๋ฏธํฐ์ ์ง์ฐ ์๊ณ๊ฐ์ ์ค์ ํ๋ค.
๐ Streaming - Static Join w/ Cassandra
Cassandra
Cassandra
๋ ์๋ง์ ๋ฐ์ดํฐ๋ฅผ ์ง์ฐ ์์ด ์ ์ฅํ๊ณ , ์ฌ์ฉ์ ์๊ฐ ๋์ด๋ ๋๋ง๋ค ์๋ฒ๋ฅผ ์ ์ฐํ๊ฒ ์ถ๊ฐํ ์ ์๊ณ , ํน์ ์๋ฒ์ ์ฅ์ ๊ฐ ๋ฐ์ํด๋ ์๋น์ค๊ฐ ์ค๋จ๋์ง ์๋๋ก ํ๋ ๋ชฉํ๋ฅผ ๊ฐ์ง๊ณ ๋ฑ์ฅํ ๋ถ์ฐ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ด๋ค.
์นด์ฐ๋๋ผ ํด๋ฌ์คํฐ์๋ ํน๋ณํ ์ญํ ์ ํ๋ ๋ง์คํฐ ๋
ธ๋๊ฐ ์กด์ฌํ์ง ์๊ณ , ๋ชจ๋ ๋
ธ๋๊ฐ ๋๋ฑํ ์ญํ ์ ์ํํ๋ค. ๋
ธ๋๋ค์ Gossip
์ด๋ผ๋ ํ๋กํ ์ฝ์ ํตํด ์๋ก์ ์ํ๋ฅผ ์ง์์ ์ผ๋ก ๊ตํํ๋ค. ๋ง์คํฐ ๋
ธ๋๊ฐ ์์ผ๋ฏ๋ก ์ด๋ค ๋
ธ๋๊ฐ ๋ค์ด๋์ด๋ ์ ์ฒด ํด๋ฌ์คํฐ๋ ๋ฉ์ถ์ง ์๋๋ค.
๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๋ฉด ์นด์ฐ๋๋ผ๋ ํด๋น ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ ๋ ธ๋๋ค์ ๊ฒฐ์ ํ๊ณ ํด๋น ๋ ธ๋๋ค๊ณผ ๋ ธ๋์ ๋ณต์ ๋ณธ์๊ฒ ๋ฐ์ดํฐ๋ฅผ ์ ์กํ๋ค. ๊ณ ๊ฐ์ฉ์ฑ์ ๋์ด๊ธฐ ์ํด ์ฝ๊ธฐ ๋ฐ ์ฐ๊ธฐ ์์ฒญ ์ ๋ช ๊ฐ์ ๋ ธ๋๋ก๋ถํฐ ์๋ต์ ๋ฐ์์ผ ์์ฒญ์ด ์ฑ๊ณต๋์๋ค๊ณ ๊ฐ์ฃผํ๋ค.
์นด์ฐ๋๋ผ๋ ์ํ์ ํ์ฅ์ ์ต์ ํ๋์ด ์๋ค. ๋ฐ์ดํฐ๋ฅผ ๋ถ์ฐ์ํค๊ธฐ ์ํด Hash Ring
์ด๋ผ๋ ๊ฐ์์ ๋ง์ ์ฌ์ฉํ๋ค. ๊ฐ ๋
ธ๋๋ ๋ง์ ํน์ ๊ตฌ๊ฐ์ ๋ด๋นํ๋ฉฐ, ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ ๋ ๋ฐ์ดํฐ์ PK๋ฅผ ํด์ฑํ์ฌ ๋ง ์์ ํน์ ์์น ๊ฐ์ ์ป๊ณ , ๋ฐ์ดํฐ๋ ํด๋น ์์น๊ฐ ์ํ ๊ตฌ๊ฐ์ ๋ด๋นํ๋ ๋
ธ๋์ ์ ์ฅ๋๋ค. ์ด ๊ณผ์ ์ ์๋น์ค ์ค๋จ ์์ด ์ด๋ฃจ์ด์ง๋ค.
๋ํ ์กฐ์ธ ์ฐ์ฐ์ ์ง์ํ์ง ์๋๋ค. ์ ํ๋ฆฌ์ผ์ด์ ์ด ์ด๋ป๊ฒ ๋ฐ์ดํฐ๋ฅผ ์กฐํํ ๊ฒ์ธ์ง ์ค๊ณํ๊ณ , ์๋์ ์ผ๋ก ๋น์ ๊ทํํ์ฌ ์กฐํ ์ฟผ๋ฆฌ์ ์ต์ ํ๋ ํ ์ด๋ธ์ ๋ง๋ ๋ค.
Cassandra Query Language, CQL
์ด๋ผ๋ ์ฟผ๋ฆฌ ์ธ์ด๋ฅผ ์ฌ์ฉํ๋๋ฐ, SQL๊ณผ ์ ์ฌํ ๋ฌธ๋ฒ์ ์ฌ์ฉํ๋ค. ๊ฐ์ฅ ํฐ ์ฐจ์ด์ ์ WHERE
์ ์ ์ฌ์ฉ์ด ์ ํ์ ์ด๋ผ๋ ์ ์ด๋ค. ์ฑ๋ฅ์ ๋ณด์ฅํ๊ธฐ ์ํด PK์ ํฌํจ๋ ์ปฌ๋ผ์ ์กฐ๊ฑด์ผ๋ก๋ง ์กฐํํ ์ ์๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
spark = SparkSession \
.builder \
.appName("WaterMark") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.sql.shuffle.partitions", "3") \
.config("spark.cassandra.connection.host", "cassandra") \
.config("spark.cassandra.connection.port", "9042") \
.config("spark.cassandra.auth.username", "cassandra") \
.config("spark.cassandra.auth.password", "cassandra") \
.config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
.config("spark.sql.catalog.lh", "com.datastax.spark.connector.datasource.CassandraCatalog") \
.getOrCreate()
spark-cassandra-connector
๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ์ฌ ์คํํฌ์ ์นด์ฐ๋๋ผ ํด๋ฌ์คํฐ๋ฅผ ์ฐ๊ฒฐํ๋ค.
1
2
3
4
join_df = event_df.join(
user_df,
event_df.login_id == user_df.login_id,
"inner").drop(user_df.login_id)
์คํํฌ๋ ์นดํ์นด๋ก๋ถํฐ ๋ง์ดํฌ๋ก ๋ฐฐ์น ๋จ์๋ก ์ด๋ฒคํธ๋ฅผ ๊ฐ์ ธ์จ๋ค. ๊ฐ ๋ง์ดํฌ๋ก ๋ฐฐ์น์ ๋ํด ๋ฉ๋ชจ๋ฆฌ์ ๋ก๋ํด ์ค ์ ์ ๋ฐ์ดํฐ์ ์กฐ์ธ์ ์ํํ๋ค. ์ต์ข ๊ฒฐ๊ณผ๋ ์ ์ ๋ฐ์ดํฐ๊ฐ ํฌํจ๋ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐํ๋ ์์ด ๋๋ค. ์ด์ ์ํ๋ฅผ ๊ธฐ์ตํ ํ์๊ฐ ์์ผ๋ฏ๋ก Stateless transformation์ด๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def cassandraWriter(batch_df, batch_id):
batch_df \
.write \
.format("org.apache.spark.sql.cassandra") \
.option("keyspace", "test_db") \
.option("table", "users") \
.mode("append") \
.save()
query = output_df \
.writeStream \
.foreachBatch(cassandraWriter) \
.outputMode("update") \
.trigger(processingTime="5 seconds") \
.start()
foreachBatch
๋ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐํ๋ ์์ ๋ง์ดํฌ๋ก ๋ฐฐ์น ๋จ์๋ก ์ฒ๋ฆฌํ ๋ ๊ฐ ๋ฐฐ์น๋ฅผ ์ผ๋ฐ์ ์ธ ์ ์ ๋ฐ์ดํฐํ๋ ์์ฒ๋ผ ๋ค๋ฃฐ ์ ์๊ฒ ํ๋ ๋ฉ์๋์ด๋ค.
๐ Streaming - Streaming Join
1
2
3
4
5
join_df = impressions_df.join(
clicks_df,
impressions_df.uuid == clicks_df.uuid,
"inner"
)
์คํํฌ๋ ์์ชฝ ์คํธ๋ฆผ ๊ฐ๊ฐ์ ๋ํด ์ํ ์ ์ฅ์๋ฅผ ์ ์งํ๋ค. ์กฐ์ธ ์กฐ๊ฑด์ด ์ผ์นํ๋ฉด ๋ ๋ฐ์ดํฐ๋ฅผ ํฉ์น ์๋ก์ด ํ์ ๋ง๋ค์ด ๊ฒฐ๊ณผ ์คํธ๋ฆผ์ผ๋ก ๋ด๋ณด๋ธ๋ค.
inner join์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ ์กฐ์ธ ์กฐ๊ฑด์ด ์ผ์นํ๋ ์๊ฐ ๊ฒฐ๊ณผ๊ฐ ์์ฑ๋์ง๋ง, outer join์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ ์ธ์ ๊น์ง ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ค๋ฆฌ๊ณ , ํด๋น ํ๋๋ฅผ null๋ก ์ค์ ํ ์ง์ ๋ํ ๊ธฐ์ค์ด ์์ด์ผ ํ๋ค.
1
2
3
4
5
6
7
8
9
10
impressions_df = impression_events.select(...) \
.withColumnRenamed("create_date", "impr_date") \
.withWatermark("impr_date", "5 minutes") \
.withColumnRenamed("uuid", "impr_uuid")
clicks_df = click_events.select(...) \
.withColumnRenamed("create_date", "click_date") \
.withWatermark("click_date", "5 minutes") \
.withColumnRenamed("uuid", "click_uuid")
์ด๋ฌํ ๋ฌธ์ ๋ฅผ ๋ฐฉ์งํ๊ธฐ ์ํด ์์ชฝ ์คํธ๋ฆผ์ ์ํฐ๋งํฌ๋ฅผ ์ค์ ํ๋ค.
1
2
3
4
5
6
7
8
9
join_df = impressions_df.join(
clicks_df,
expr("""
impr_uuid = click_uuid AND
click_date >= impr_date AND
click_date <= impr_date + interval 5 minutes
"""),
"leftOuter"
)
๊ทธ๋ฆฌ๊ณ expr
๋ฉ์๋๋ฅผ ํตํด ๋ช
์์ ์ผ๋ก ์กฐ์ธ ์กฐ๊ฑด์ ์ธ๋ถ์ ์ผ๋ก ์ค์ ํ๋ค.
์ ๋ฆฌํ๋ฉด, ์ํฐ๋งํฌ๋ฅผ ํตํด ์ธ์ ์ํ๋ฅผ ์ ๋ฆฌํด์ผ ํ ์ง ๊ฒฐ์ ํ ์ ์์ผ๋ฉฐ, expr
๋ฅผ ํตํ ์ธ๋ถ ์กฐ๊ฑด์ ํตํด ์ธ์ ๊น์ง์ ๋ฐ์ดํฐ๋ฅผ ์กฐ์ธํ ์ง ๊ฒฐ์ ํ ์ ์๋ค.
๐ ์ฐธ๊ณ
https://www.databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html