Post

[Spark] Kafka๋ฅผ ์‚ฌ์šฉํ•œ Spark Streaming

[Spark] Kafka๋ฅผ ์‚ฌ์šฉํ•œ Spark Streaming

๐Ÿ“Œ Apache Kafka๋ž€?

Apache Kafka ๋Š” ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋ฐœ์ƒํ•˜๋Š” ๋Œ€๊ทœ๋ชจ ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆผ์„ ํšจ์œจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ์„ค๊ณ„๋œ ๋ถ„์‚ฐ ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆฌ๋ฐ ํ”Œ๋žซํผ์ด๋‹ค.

Pub/Sub ๋ชจ๋ธ์„ ๋”ฐ๋ฅธ๋‹ค. Producer ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์‚ฐํ•˜๊ณ  Consumer ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์†Œ๋น„ํ•œ๋‹ค. ์ด๋“ค์„ ์™„์ „ํžˆ ๋ถ„๋ฆฌํ•˜์—ฌ ์„œ๋กœ ์ง์ ‘์ ์œผ๋กœ ํ†ต์‹ ํ•  ํ•„์š”๊ฐ€ ์—†๋„๋ก ํ•œ๋‹ค. Topic ์€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ถ„๋ฅ˜ํ•˜๋Š” ์ฑ„๋„์ด๋‹ค. producer์™€ consumer๋Š” ๊ฐ๊ฐ ํ† ํ”ฝ์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๊ณ  ๊ตฌ๋…ํ•œ๋‹ค.

ํŠน์ง•

  • ๋‹จ์ผ ์„œ๋ฒ„๊ฐ€ ์•„๋‹Œ ์—ฌ๋Ÿฌ ๋…ธ๋“œ์— ๋ถ„์‚ฐ๋˜์–ด ๋™์ž‘๋˜๋„๋ก ์„ค๊ณ„๋˜์—ˆ๋‹ค. kafka ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ๊ตฌ์„ฑํ•˜๋Š” ๊ฐœ๋ณ„ ๋…ธ๋“œ๋ฅผ Broker ๋ผ๊ณ  ํ•˜๋ฉฐ, ์—ฌ๋Ÿฌ ๊ฐœ์˜ broker๊ฐ€ ๋ชจ์—ฌ ํ•˜๋‚˜์˜ ์นดํ”„์นด ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ๊ตฌ์„ฑํ•œ๋‹ค.
  • ํ•˜๋‚˜์˜ ๊ฑฐ๋Œ€ํ•œ ํ† ํ”ฝ์„ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋ถ„ํ• ํ•˜์—ฌ, ์ด๋“ค์„ ์—ฌ๋Ÿฌ ๋ธŒ๋กœ์ปค์— ๋ถ„์‚ฐํ•˜์—ฌ ์ €์žฅํ•œ๋‹ค. ๋งŒ์•ฝ ๋ฐ์ดํ„ฐ์˜ ์–‘์ด ๋Š˜์–ด๋‚˜๋ฉด ํŒŒํ‹ฐ์…˜์˜ ์ˆ˜๋ฅผ ๋Š˜๋ ค ๋” ๋งŽ์€ ๋ธŒ๋กœ์ปค์— ์ž‘์—…์„ ๋ถ„์‚ฐ์‹œํ‚ฌ ์ˆ˜ ์žˆ๋‹ค. consumer๋Š” ํŒŒํ‹ฐ์…˜์— ๋Œ€ํ•˜์—ฌ ๋ณ‘๋Ÿด ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅํ•˜๋ฏ€๋กœ, ๋น ๋ฅธ ์†๋„๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค. ๋˜ํ•œ ํ•˜๋‚˜์˜ ํŒŒํ‹ฐ์…˜ ๋‚ด์—์„œ๋Š” ๋ฉ”์‹œ์ง€์˜ ์ˆœ์„œ๋ฅผ ๋ณด์žฅํ•œ๋‹ค.
  • ๋ฐ์ดํ„ฐ์˜ ์œ ์‹ค์„ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ๊ฐ ํŒŒํ‹ฐ์…˜์˜ ๋ณต์ œ๋ณธ์„ ๋‹ค๋ฅธ ๋ธŒ๋กœ์ปค์— ์ €์žฅํ•œ๋‹ค.
  • ์ˆ˜์‹ ํ•œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฉ”๋ชจ๋ฆฌ๊ฐ€ ์•„๋‹Œ ๋””์Šคํฌ์— ์˜๊ตฌ์ ์œผ๋กœ ์ €์žฅํ•œ๋‹ค. consumer๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์–ด๋„ ๋ฐ์ดํ„ฐ๋ฅผ ์ฆ‰์‹œ ์‚ญ์ œ๋˜์ง€ ์•Š๊ณ  ์„ค์ •๋œ ๊ธฐ๊ฐ„ ๋™์•ˆ ๋””์Šคํฌ์— ๋ณด๊ด€๋œ๋‹ค. ์ด๋Š” ์žฅ์•  ๋ฐœ์ƒ ์‹œ ๋ฐ์ดํ„ฐ ์œ ์‹ค์„ ๋ฐฉ์ง€ํ•œ๋‹ค. ์ด ๋•Œ๋ฌธ์— ๋‹ค๋ฅธ consumer๊ฐ€ ๊ฐ™์€ ๋ฐ์ดํ„ฐ๋ฅผ ์—ฌ๋Ÿฌ ๋ฒˆ ์ฝ์–ด ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค, ๊ฒ€์ƒ‰ ์—”์ง„, ๋ฐ์ดํ„ฐ ์›จ์–ดํ•˜์šฐ์Šค ๋“ฑ๊ณผ ํ†ตํ•ฉ์ด ์‰ฝ๋‹ค.
  • ์ˆ˜ํ‰์  ํ™•์žฅ์ด ์‰ฝ๋‹ค. ํด๋Ÿฌ์Šคํ„ฐ์˜ ์šฉ๋Ÿ‰์ด ๋ถ€์กฑํ•ด์ง€๋ฉด ํด๋Ÿฌ์Šคํ„ฐ์— ๋ธŒ๋กœ์ปค๋ฅผ ์ถ”๊ฐ€ํ•˜๋ฉด ๋œ๋‹ค.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spark = SparkSession \
    .builder \
    .appName("StructuredWordCount") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.shuffle.partitions", "3") \
    .getOrCreate()

events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "quickstart") \
    .option("startingOffsets", "earliest") \
    .load()

spark.sql.shuffle.partitions ์—์„œ ๋ฐ์ดํ„ฐ ์…”ํ”Œ๋ง์˜ ํŒŒํ‹ฐ์…˜ ์ˆ˜๋ฅผ ์„ค์ •ํ•œ๋‹ค.

SparkSesion ์„ ์ƒ์„ฑํ–ˆ๋‹ค๋ฉด ์ด์ œ ์นดํ”„์นด ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ์ •์˜ํ•œ๋‹ค.

format ์„ kafka ๋กœ ์ง€์ •ํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ๋งŒ๋“ ๋‹ค.

kafka.bootstrap.servers ๋Š” ์ŠคํŒŒํฌ๊ฐ€ ์นดํ”„์นด ํด๋Ÿฌ์Šคํ„ฐ์— ์ ‘์†ํ•˜๊ธฐ ์œ„ํ•ด ํ•„์š”ํ•œ ๋ธŒ๋กœ์ปค ๋ชฉ๋ก์ด๋‹ค. ์ŠคํŒŒํฌ driver์™€ executor๋Š” ํ•ด๋‹น ์ฃผ์†Œ๋ฅผ ํ†ตํ•ด ๋ธŒ๋กœ์ปค์™€ ํ†ต์‹ ์„ ์ง„ํ–‰ํ•œ๋‹ค.

subscribe ๋Š” ์–ด๋–ค ์นดํ”„์นด ํ† ํ”ฝ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ฌ์ง€ ์ง€์ •ํ•œ๋‹ค.

startingOffsets ๋Š” ์ŠคํŠธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ๊ฐ€ ์‹œ์ž‘๋  ๋•Œ ํ† ํ”ฝ์˜ ์–ด๋А ์ง€์ ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์„์ง€ ๊ฒฐ์ •ํ•œ๋‹ค. earliest ๋กœ ์„ค์ •ํ•˜๋ฉด ํ† ํ”ฝ์˜ ๊ฐ€์žฅ ์˜ค๋ž˜๋œ ๋ฐ์ดํ„ฐ๋ถ€ํ„ฐ ์ฝ๋Š”๋‹ค.

1
2
3
4
5
6
7
8
9
10
11
12
schema = StructType([
    StructField("city", StringType()),
    StructField("domain", StringType()),
    StructField("event", StringType())
])

value_df = events.select(
    col('key'),
    from_json(
        col("value").cast("string"), schema
    ).alias("value")
)

์นดํ”„์นด๋ฅผ ํ†ตํ•ด JSON ํŒŒ์ผ์„ ์ฝ๊ธฐ ์œ„ํ•ด์„œ, ๋จผ์ € JSON ์Šคํ‚ค๋งˆ๋ฅผ ์ •์˜ํ•ด์•ผ ํ•œ๋‹ค. ์ดํ›„ from_json ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด JSON ๋ฌธ์ž์—ด์„ ์ •์˜ํ•œ schema ์— ๋”ฐ๋ผ ํŒŒ์‹ฑํ•˜์—ฌ ๊ตฌ์กฐ์ฒด๋ฅผ ๋งŒ๋“ ๋‹ค.

1
2
3
4
5
output_df = concat_df.selectExpr(
                'null',
"""
    to_json(named_struct("lower_city", lower_city, "domain", domain, "behavior", behavior)) as value
""".strip())

์นดํ”„์นด ์‹ฑํฌ๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ key์™€ value ๋‘ ๊ฐœ์˜ ์ปฌ๋Ÿผ์„ ๊ธฐ๋Œ€ํ•œ๋‹ค. named_struct ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ์—ฌ๋Ÿฌ ์ปฌ๋Ÿผ์„ ํ•˜๋‚˜์˜ ๊ตฌ์กฐ์ฒด๋กœ ๋ฌถ๊ณ , to_json ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ํ•˜๋‚˜์˜ JSON ๋ฌธ์ž์—ด๋กœ ์ง๋ ฌํ™”ํ•œ๋‹ค.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
file_writer = concat_df \
                .writeStream \
                .queryName("transformed json") \
                .format("json") \
                .outputMode("append") \
                .option("path", "transformed") \
                .option("checkpointLocation", "chk/json") \
                .start()

kafka_writer = output_df \
                .writeStream \
                .queryName("transformed kafka") \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "kafka:9092") \
                .option("checkpointLocation", "chk/kafka") \
                .option("topic", "transformed") \
                .outputMode("append") \
                .start()

spark.streams.awaitAnyTermination()

๋งŒ์•ฝ ๋‘ ๊ฐœ ์ด์ƒ์˜ ์•„์›ƒํ’‹์— ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•˜๋Š” ๊ฒฝ์šฐ ์œ„์™€ ๊ฐ™์ด ๋”ฐ๋กœ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ์ •์˜ํ•˜๋ฉด ๋œ๋‹ค. ๋‹ค๋งŒ ์ฃผ์˜ํ•ด์•ผ ํ•  ์ ์€, ๋‘ ์ŠคํŠธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ๋Š” ๋…ผ๋ฆฌ์ ์œผ๋กœ ๋ณ„๊ฐœ์˜ ์ฟผ๋ฆฌ์ด๋ฏ€๋กœ ๋‹ค๋ฅธ ์ฒดํฌํฌ์ธํŠธ ๋””๋ ‰ํ„ฐ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค. ๋งŒ์•ฝ ์ด๋ฅผ ๊ณต์œ ํ•œ๋‹ค๋ฉด ์ƒํ˜ธ ๊ฐ„์„ญ์ด ๋ฐœ์ƒํ•˜๊ฒŒ ๋˜๋ฉฐ, ์˜ˆ์ƒ์น˜ ๋ชปํ•œ ๊ฒฐ๊ณผ๊ฐ€ ๋‚˜์˜ค๊ฒŒ ๋œ๋‹ค.

๋˜ํ•œ awaitTermination ๋Œ€์‹  awaitAnyTermination ๋ฅผ ์‚ฌ์šฉํ•˜์˜€๋Š”๋ฐ, ์ŠคํŒŒํฌ ์„ธ์…˜ ๋‚ด์—์„œ ์‹คํ–‰ ์ค‘์ธ ๋ชจ๋“  ์ŠคํŠธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ ์ค‘ ์–ด๋А ํ•˜๋‚˜๋ผ๋„ ์ข…๋ฃŒํ•  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ๋‘ ๊ฐœ ์ด์ƒ์˜ ์ฟผ๋ฆฌ๋ฅผ ๋™์‹œ๋ณ‘๋ ฌ์ ์œผ๋กœ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

๐Ÿ“Œ Stateless/Stateful transformation

์—ฌ๊ธฐ์„œ state๋ž€ ์ด์ „ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•œ ์ •๋ณด๋‚˜ ์ค‘๊ฐ„ ๊ณ„์‚ฐ ๊ฒฐ๊ณผ๋ฅผ ๋งํ•œ๋‹ค. ์ŠคํŠธ๋ฆฌ๋ฐ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ์ด๋ฅผ ๊ธฐ์–ตํ•˜๋Š”์ง€ ์—ฌ๋ถ€์— ๋”ฐ๋ผ Stateless์™€ Stateful๋กœ ๋‚˜๋‰œ๋‹ค.

Stateless transformation ์€ ๊ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋…๋ฆฝ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ์—ฐ์‚ฐ์ด๋‹ค. ํ˜„์žฌ ์ฒ˜๋ฆฌ ์ค‘์ธ ๋ฐ์ดํ„ฐ ์™ธ ๋‹ค๋ฅธ ์ •๋ณด๋Š” ํ•„์š”ํ•˜์ง€ ์•Š๋‹ค. ์ƒํƒœ๋ฅผ ๊ด€๋ฆฌํ•  ํ•„์š”๊ฐ€ ์—†๊ธฐ ๋•Œ๋ฌธ์— ๊ฐ„๋‹จํ•˜๋ฉฐ ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ๋Ÿ‰์ด ์ ๊ณ  ์ฒ˜๋ฆฌ ์†๋„๊ฐ€ ๋น ๋ฅด๋‹ค. ๊ฐ ๋ฐ์ดํ„ฐ๊ฐ€ ๋…๋ฆฝ์ ์ด๋ฏ€๋กœ worker ๋…ธ๋“œ๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ์„ฑ๋Šฅ์„ ๋†’์ผ ์ˆ˜ ์žˆ๋‹ค. select, withColumn, filter, map, explode ์—ฐ์‚ฐ ๋“ฑ์ด ์žˆ๋‹ค.

์ „์ฒด ๊ฒฐ๊ณผ๋ฅผ ์ถœ๋ ฅํ•˜๋Š” Complete ๋ชจ๋“œ๋Š” ์‚ฌ์šฉํ•  ์ˆ˜ ์—†๋‹ค.

Stateful transformation ์€ ๊ณผ๊ฑฐ ๋ฐ์ดํ„ฐ์˜ ์ปจํ…์ŠคํŠธ๋ฅผ ์œ ์ง€ํ•˜๊ณ  ์ด๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ˜„์žฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ์—ฐ์‚ฐ์ด๋‹ค. groupBy().agg() ์™€ ๊ฐ™์€ ์ŠคํŠธ๋ฆฌ๋ฐ ์ง‘๊ณ„, window ๋ฉ”์„œ๋“œ์™€ ๊ฐ™์€ ์œˆ๋„์šฐ ์ง‘๊ณ„, ๋‘ ๊ฐœ์˜ ์ŠคํŠธ๋ฆผ์„ ์กฐ์ธํ•˜๋Š” ์—ฐ์‚ฐ์ด ๋Œ€ํ‘œ์ ์ด๋‹ค. ์ƒํƒœ๋ฅผ ์ €์žฅํ•ด์•ผ ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์ถฉ๋ถ„ํ•œ ๋ฉ”๋ชจ๋ฆฌ ๊ณต๊ฐ„์ด ํ•„์ˆ˜์ ์ด๋‹ค. ์ƒํƒœ ๊ด€๋ฆฌ๋ฅผ ์ŠคํŒŒํฌ๊ฐ€ ์ž๋™์œผ๋กœ ํ•  ์ˆ˜ ์žˆ์ง€๋งŒ, ๊ฐœ๋ฐœ์ž๊ฐ€ ์ง์ ‘ ์ƒํƒœ๋ฅผ ๊ด€๋ฆฌํ•  ์ˆ˜๋„ ์žˆ๋‹ค.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "pos") \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()

value_df = events.select(
            col('key'),
            from_json(
                col("value").cast("string"), schema).alias("value"))

tf_df = value_df.selectExpr(
            'value.product_id',
            'value.amount')

์œ„ ๊ณผ์ •์€ Stateless๋ผ๊ณ  ๋ณผ ์ˆ˜ ์žˆ๋‹ค. ๋ชจ๋“  ์—ฐ์‚ฐ์€ ๊ฐ ํ–‰์„ ๊ธฐ์ค€์œผ๋กœ ๋…๋ฆฝ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. ๋‹ค์‹œ ๋งํ•ด, ํ˜„์žฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐ ๊ณผ๊ฑฐ์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ํ•„์š”ํ•˜์ง€ ์•Š๋‹ค.

1
2
3
total_df = tf_df.select("product_id", "amount")\
    .groupBy("product_id")\
    .sum("amount").withColumnRenamed("sum(amount)", "total_amount")

๋ฐ˜๋ฉด ์œ„ ๊ณผ์ •์€ Stateful์ด๋‹ค. total_amount ๋ฅผ ๊ณ„์‚ฐํ•˜๊ธฐ ์œ„ํ•ด ์ŠคํŒŒํฌ๋Š” ์ƒํ’ˆ์˜ ํŒ๋งค์•ก ํ•ฉ๊ณ„๊ฐ€ ์–ผ๋งˆ์ธ์ง€ ๊ธฐ์–ตํ•˜๊ณ  ์žˆ์–ด์•ผ ํ•˜๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

groupBy ๊ฐ€ ํ˜ธ์ถœ๋˜๋ฉด ์ŠคํŒŒํฌ๋Š” ๋‚ด๋ถ€์ ์œผ๋กœ ๋ฉ”๋ชจ๋ฆฌ ๊ณต๊ฐ„์„ ์ค€๋น„ํ•˜๊ณ , ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด์˜ค๋ฉด ๋ฉ”๋ชจ๋ฆฌ์—์„œ ๋ฐ์ดํ„ฐ์˜ ํ‚ค๋ฅผ ์ฐพ๊ณ , ์ƒํƒœ๋ฅผ ์—…๋ฐ์ดํŠธํ•œ๋‹ค.

๐Ÿ“Œ ์œˆ๋„์šฐ ์ง‘๊ณ„ ์—ฐ์‚ฐ

ํฌ๊ฒŒ ๋‘ ๊ฐ€์ง€ ์œˆ๋„์šฐ ์ข…๋ฅ˜๊ฐ€ ์กด์žฌํ•œ๋‹ค.

image.png

Tumbling Window๋Š” ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ๊ณ ์ •๋œ ํฌ๊ธฐ๋กœ ์„œ๋กœ ๊ฒน์น˜์น˜ ์•Š๊ฒŒ ๊ตฌ๊ฐ„์„ ๋‚˜๋ˆ„๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. ์ฆ‰, ๊ฐ ์œˆ๋„์šฐ ์‚ฌ์ด์—๋Š” ๋นˆํ‹ˆ์ด ์—†๋‹ค. ๊ฐ ์ด๋ฒคํŠธ๋Š” ํ•˜๋‚˜์˜ ์œˆ๋„์šฐ์—๋งŒ ์†ํ•œ๋‹ค.

1
2
3
timestamp_format = "yyyy-MM-dd HH:mm:ss"
tf_df = value_df.select("value.*") \
                .withColumn("create_date", to_timestamp("create_date", timestamp_format))

window ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด ์‹œ๊ฐ„ ๊ธฐ์ค€์ด ๋˜๋Š” ์ปฌ๋Ÿผ์€ Timestamp ํƒ€์ž…์ด์–ด์•ผ ํ•œ๋‹ค.

1
2
3
4
window_duration = "5 minutes"
window_agg_df = tf_df \
    .groupBy(window(col("create_date"), window_duration)) \
    .sum("amount").withColumnRenamed("sum(amount)", "total_amount")

window ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ์œˆ๋„์šฐ๋ฅผ ์ •์˜ํ•œ๋‹ค. col ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ์–ด๋–ค ์‹œ๊ฐ„ ์ปฌ๋Ÿผ์„ ๊ธฐ์ค€์œผ๋กœ ํ•  ์ง€ ์ง€์ •ํ•˜๊ณ , window_duration ์„ ํ†ตํ•ด ์œˆ๋„์šฐ์˜ ํฌ๊ธฐ๋ฅผ ์„ค์ •ํ•œ๋‹ค. sliding_duration ์„ ์ง€์ •ํ•˜์ง€ ์•Š์•˜์œผ๋ฏ€๋กœ ์ž๋™์œผ๋กœ ํ…€๋ธ”๋ง ์œˆ๋„์šฐ๋กœ ๋™์ž‘ํ•œ๋‹ค.

image.png

Sliding Window ๋Š” ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ๊ณ ์ •๋œ ํฌ๊ธฐ๋กœ, ์„œ๋กœ ๊ฒน์น˜๋„๋ก ๋‚˜๋ˆ„๋Š” ๋ฐฉ์‹์ด๋‹ค. ์ฆ‰, ํ•˜๋‚˜์˜ ์ด๋ฒคํŠธ๊ฐ€ ์—ฌ๋Ÿฌ ์›๋„์šฐ์— ๋™์‹œ์— ํฌํ•จ๋  ์ˆ˜ ์žˆ๋‹ค.

๋‘ ๊ฐ€์ง€ ํŒŒ๋ผ๋ฏธํ„ฐ๊ฐ€ ์กด์žฌํ•œ๋‹ค. โ€˜์œˆ๋„์šฐ ํฌ๊ธฐโ€™๋Š” ์ง‘๊ณ„๊ฐ€ ์ˆ˜ํ–‰๋  ์ „์ฒด ๊ตฌ๊ฐ„์˜ ๊ธธ์ด์ด๋‹ค. โ€˜์Šฌ๋ผ์ด๋“œ ๊ฐ„๊ฒฉโ€™์€ ์œˆ๋„์šฐ๊ฐ€ ๋‹ค์Œ ์œ„์น˜๋กœ ์ด๋™ํ•˜๋Š” ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์ด๋‹ค. ๋งŒ์•ฝ ์œˆ๋„์šฐ ํฌ๊ธฐ์™€ ์Šฌ๋ผ์ด๋“œ ๊ฐ„๊ฒฉ์ด ๊ฐ™๋‹ค๋ฉด ์œˆ๋„์šฐ๋Š” ๊ฒน์น˜์ง€ ์•Š๊ฒŒ ๋˜๋ฉฐ, ํ…€๋ธ”๋ง ์œˆ๋„์šฐ์™€ ๋™์ผํ•˜๊ฒŒ ๋™์ž‘ํ•œ๋‹ค.

1
2
3
4
5
window_duration = "10 minutes"
sliding_duration = "5 minutes"
window_agg_df = tf_df \
    .groupBy(window(col("create_date"), window_duration, sliding_duration)) \
    .sum("amount").withColumnRenamed("sum(amount)", "total_amount")

window ๋ฉ”์„œ๋“œ์— sliding_duration ์„ ์ง€์ •ํ•˜๋ฉด ์Šฌ๋ผ์ด๋”ฉ ์œˆ๋„์šฐ๋กœ ๋™์ž‘ํ•œ๋‹ค.

๐Ÿ“Œ Watermark

Watermark ๋Š” ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์—์„œ ์–ผ๋งˆ๋‚˜ ๋Šฆ๊ฒŒ ๋„์ฐฉํ•˜๋Š” ๋ฐ์ดํ„ฐ๊นŒ์ง€ ์ฒ˜๋ฆฌํ•  ๊ฒƒ์ธ๊ฐ€๋ฅผ ์ •์˜ํ•˜๋Š” ๊ธฐ์ค€์ด๋‹ค.

์›Œํ„ฐ๋งˆํฌ๋ฅผ ์ดํ•ดํ•˜๊ธฐ ์œ„ํ•ด ๋‘ ๊ฐ€์ง€ ์‹œ๊ฐ„ ๊ฐœ๋…์ด ๋“ฑ์žฅํ•œ๋‹ค. Event Time ์€ ์‹ค์ œ๋กœ ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•œ ์‹œ๊ฐ„์ด๋ฉฐ, Processing Time ์€ ์ด๋ฒคํŠธ๊ฐ€ ์ŠคํŠธ๋ฆฌ๋ฐ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์— ๋„์ฐฉํ•˜์—ฌ ์ฒ˜๋ฆฌ๋˜๋Š” ์‹œ๊ฐ„์ด๋‹ค. ํ•ญ์ƒ ์ด๋ฒคํŠธ ์‹œ๊ฐ„ ๊ธฐ์ค€์œผ๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ์ •๋ ฌ๋˜์–ด ๋“ค์–ด์˜ค๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋‹ค. ๋„คํŠธ์›Œํฌ ์ง€์—ฐ์ด๋‚˜ ์‹œ์Šคํ…œ ๋ถ€ํ•˜ ๋“ฑ ์—ฌ๋Ÿฌ๊ฐ€์ง€ ์ด์œ ๊ฐ€ ์กด์žฌํ•œ๋‹ค.

์›Œํ„ฐ๋งˆํฌ๋Š” ์ด๋ฒคํŠธ ์‹œ๊ฐ„์„ ๊ธฐ์ค€์œผ๋กœ ๋™์ž‘ํ•œ๋‹ค. Stateful transformation์—์„œ ๋ฉ”๋ชจ๋ฆฌ์— ์ค‘๊ฐ„ ์ง‘๊ณ„ ๊ฒฐ๊ณผ๋ฅผ ์ €์žฅํ•  ๋•Œ, ์ด๋ฅผ ์ •๋ฆฌํ•˜๋Š” ๊ทœ์น™์ด ์—†๋‹ค๋ฉด state๋Š” ๋ฌดํ•œํžˆ ์ปค์ง€๊ฒŒ ๋˜๋ฉฐ, OOM ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•˜๊ฒŒ ๋œ๋‹ค.

๋˜ํ•œ ์›Œํ„ฐ๋งˆํฌ๋Š” ๋„ˆ๋ฌด ๋Šฆ๊ฒŒ ๋„์ฐฉํ•˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ฌด์‹œํ•˜๋Š” ๊ธฐ์ค€์ด ๋˜๊ธฐ๋„ ํ•œ๋‹ค. ์ง€์—ฐ ์ž„๊ณ„๊ฐ’๋ณด๋‹ค ๋Šฆ๊ฒŒ ๋„์ฐฉํ•œ ๋ฐ์ดํ„ฐ๋Š” ์ง‘๊ณ„์— ํฌํ•จ์‹œํ‚ค์ง€ ์•Š๋Š”๋‹ค.

Output Mode

์ถœ๋ ฅ ๋ชจ๋“œ๋Š” ์ŠคํŠธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ์˜ ๊ฒฐ๊ณผ๋ฅผ sink ์— ์–ด๋–ป๊ฒŒ ์“ธ์ง€๋ฅผ ๊ฒฐ์ •ํ•œ๋‹ค.

Complete ๋ชจ๋“œ๋Š” ๋งค ํŠธ๋ฆฌ๊ฑฐ๋งˆ๋‹ค ์ „์ฒด ์ง‘๊ณ„ ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”์„ ์ถœ๋ ฅํ•œ๋‹ค. ์ „์ฒด ๊ฒฐ๊ณผ๋ฅผ ๋ณด์—ฌ์ค˜์•ผ ํ•˜๋ฏ€๋กœ ์›Œํ„ฐ๋งˆํฌ์— ์˜ํ•ด ์˜ค๋ž˜๋œ ์œˆ๋„์šฐ ์ƒํƒœ๋ฅผ ์ •๋ฆฌํ•  ์ˆ˜ ์—†๋‹ค. ์ฆ‰, Complete ๋ชจ๋“œ์—์„œ ์›Œํ„ฐ๋งˆํฌ๋Š” ๋Šฆ๊ฒŒ ์˜จ ๋ฐ์ดํ„ฐ ํ•„ํ„ฐ๋ง ์—ญํ• ๋งŒ ํ•˜์ง€, ์ƒํƒœ ์ •๋ฆฌ ์—ญํ• ์€ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์—†๋‹ค. ์ด๋Š” ๊ฒฐ๊ตญ OOM ์˜ค๋ฅ˜๋ฅผ ์œ ๋ฐœํ•˜๊ฒŒ ๋˜๋ฏ€๋กœ, ์žฅ์‹œ๊ฐ„ ์‹คํ–‰๋˜๋Š” Stateful ์ŠคํŠธ๋ฆฌ๋ฐ์—์„œ๋Š” ์‚ฌ์šฉํ•˜๋ฉด ์•ˆ ๋œ๋‹ค.

Update ๋ชจ๋“œ๋Š” ๋งˆ์ง€๋ง‰ ํŠธ๋ฆฌ๊ฑฐ ์ดํ›„ ๊ฒฐ๊ณผ๊ฐ€ ๋ณ€๊ฒฝ๋œ ํ–‰๋งŒ ์ถœ๋ ฅํ•œ๋‹ค. ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด์™€ ์œˆ๋„์šฐ์˜ ์ง‘๊ณ„ ๊ฐ’์ด ์—…๋ฐ์ดํŠธ๋˜๋ฉด ํ•ด๋‹น ์œˆ๋„์šฐ์˜ ๋ณ€๊ฒฝ๋œ ๊ฒฐ๊ณผ๊ฐ€ ์‹ฑํฌ๋กœ ์ถœ๋ ฅ๋œ๋‹ค. ์›Œํ„ฐ๋งˆํฌ๋กœ ์ง„ํ–‰๋˜์–ด ํŠน์ • ์œˆ๋„์šฐ๊ฐ€ ๋งŒ๋ฃŒ๋˜๋ฉด ํ•ด๋‹น ์œˆ๋„์šฐ์˜ ์ƒํƒœ๋ฅผ ์‚ญ์ œํ•œ๋‹ค. Update ๋ชจ๋“œ๋Š” ๊ธฐ์กด ๊ฒฐ๊ณผ ํ–‰์„ ์ˆ˜์ •ํ•ด์•ผ ํ•˜๋ฏ€๋กœ ์‹ฑํฌ๋Š” ํ‚ค๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ–‰๋ฅผ ์ฐพ์•„ ์ˆ˜์ •ํ•ด์•ผ ํ•œ๋‹ค. ์ด๋ฅผ Upsert ๋ผ๊ณ  ํ•œ๋‹ค. ๋”ฐ๋ผ์„œ ํ‚ค ๊ธฐ๋ฐ˜ ์ˆ˜์ •์ด ๊ฐ€๋Šฅํ•œ ์‹œ์Šคํ…œ์— ์ ํ•ฉํ•˜๋‹ค.

Append ๋ชจ๋“œ๋Š” ํ™•์ •๋˜์–ด ๋‹ค์‹œ๋Š” ๋ณ€๊ฒฝ๋˜์ง€ ์•Š์„ ํ–‰๋งŒ ์ถœ๋ ฅํ•œ๋‹ค. ์›Œํ„ฐ๋งˆํฌ๊ฐ€ ์—†๋Š” ๊ฒฝ์šฐ ์ผ๋ฐ˜์ ์ธ ์ง‘๊ณ„ ์—ฐ์‚ฐ์€ ๊ณ„์‚ฐ์ด ๊ณ„์† ๋ฐ”๋€” ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ Append ๋ชจ๋“œ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†๋‹ค. Append ๋ชจ๋“œ์—์„œ ์›Œํ„ฐ๋งˆํฌ๋Š” ํŠน์ • ์œˆ๋„์šฐ๋ฅผ ์ตœ์ข… ํ™•์ •์‹œํ‚ค๋Š” ํšจ๊ณผ๋ฅผ ๊ฐ€์ ธ์˜จ๋‹ค. ์›๋„์šฐ์˜ ์ข…๋ฃŒ ์‹œ๊ฐ„์ด ์›Œํ„ฐ๋งˆํฌ๋ฅผ ์ง€๋‚˜๋ฉด ํ•ด๋‹น ์œˆ๋„์šฐ์˜ ๊ฒฐ๊ณผ๋Š” ๋” ์ด์ƒ ๋ณ€๊ฒฝ๋˜์ง€ ์•Š์„ ๊ฒƒ์ด๋ผ๊ณ  ๋ณด์žฅํ•  ์ˆ˜ ์žˆ๋‹ค. ๋‹จ, ์›๋„์šฐ์˜ ๊ฒฐ๊ณผ๋Š” ์›Œํ„ฐ๋งˆํฌ์— ์˜ํ•ด ๋งŒ๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ค์•ผ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.


1
2
3
4
5
window_duration = "5 minutes"
window_agg_df = tf_df \
    .withWatermark("create_date", "10 minutes") \
    .groupBy(window(col("create_date"), window_duration)) \
    .sum("amount").withColumnRenamed("sum(amount)", "total_amount")

withWatermark ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ์›Œํ„ฐ๋งˆํฌ๋ฅผ ์ •์˜ํ•œ๋‹ค. ์ฒซ ๋ฒˆ์งธ ํŒŒ๋ผ๋ฏธํ„ฐ์— ์›Œํ„ฐ๋งˆํฌ ๊ณ„์‚ฐ ์‹œ ์‚ฌ์šฉํ•  ๊ธฐ์ค€ ์‹œ๊ฐ„ ์ปฌ๋Ÿผ, ๋‘ ๋ฒˆ์งธ ํŒŒ๋ผ๋ฏธํ„ฐ์— ์ง€์—ฐ ์ž„๊ณ„๊ฐ’์„ ์„ค์ •ํ•œ๋‹ค.

๐Ÿ“Œ Streaming - Static Join w/ Cassandra

Cassandra

Cassandra ๋Š” ์ˆ˜๋งŽ์€ ๋ฐ์ดํ„ฐ๋ฅผ ์ง€์—ฐ ์—†์ด ์ €์žฅํ•˜๊ณ , ์‚ฌ์šฉ์ž ์ˆ˜๊ฐ€ ๋Š˜์–ด๋‚  ๋•Œ๋งˆ๋‹ค ์„œ๋ฒ„๋ฅผ ์œ ์—ฐํ•˜๊ฒŒ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๊ณ , ํŠน์ • ์„œ๋ฒ„์— ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•ด๋„ ์„œ๋น„์Šค๊ฐ€ ์ค‘๋‹จ๋˜์ง€ ์•Š๋„๋ก ํ•˜๋Š” ๋ชฉํ‘œ๋ฅผ ๊ฐ€์ง€๊ณ  ๋“ฑ์žฅํ•œ ๋ถ„์‚ฐ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์ด๋‹ค.

์นด์‚ฐ๋“œ๋ผ ํด๋Ÿฌ์Šคํ„ฐ์—๋Š” ํŠน๋ณ„ํ•œ ์—ญํ• ์„ ํ•˜๋Š” ๋งˆ์Šคํ„ฐ ๋…ธ๋“œ๊ฐ€ ์กด์žฌํ•˜์ง€ ์•Š๊ณ , ๋ชจ๋“  ๋…ธ๋“œ๊ฐ€ ๋™๋“ฑํ•œ ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. ๋…ธ๋“œ๋“ค์€ Gossip ์ด๋ผ๋Š” ํ”„๋กœํ† ์ฝœ์„ ํ†ตํ•ด ์„œ๋กœ์˜ ์ƒํƒœ๋ฅผ ์ง€์†์ ์œผ๋กœ ๊ตํ™˜ํ•œ๋‹ค. ๋งˆ์Šคํ„ฐ ๋…ธ๋“œ๊ฐ€ ์—†์œผ๋ฏ€๋กœ ์–ด๋–ค ๋…ธ๋“œ๊ฐ€ ๋‹ค์šด๋˜์–ด๋„ ์ „์ฒด ํด๋Ÿฌ์Šคํ„ฐ๋Š” ๋ฉˆ์ถ”์ง€ ์•Š๋Š”๋‹ค.

๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด์˜ค๋ฉด ์นด์‚ฐ๋“œ๋ผ๋Š” ํ•ด๋‹น ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•  ๋…ธ๋“œ๋“ค์„ ๊ฒฐ์ •ํ•˜๊ณ  ํ•ด๋‹น ๋…ธ๋“œ๋“ค๊ณผ ๋…ธ๋“œ์˜ ๋ณต์ œ๋ณธ์—๊ฒŒ ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•œ๋‹ค. ๊ณ ๊ฐ€์šฉ์„ฑ์„ ๋†’์ด๊ธฐ ์œ„ํ•ด ์ฝ๊ธฐ ๋ฐ ์“ฐ๊ธฐ ์š”์ฒญ ์‹œ ๋ช‡ ๊ฐœ์˜ ๋…ธ๋“œ๋กœ๋ถ€ํ„ฐ ์‘๋‹ต์„ ๋ฐ›์•„์•ผ ์š”์ฒญ์ด ์„ฑ๊ณต๋˜์—ˆ๋‹ค๊ณ  ๊ฐ„์ฃผํ•œ๋‹ค.

์นด์‚ฐ๋“œ๋ผ๋Š” ์ˆ˜ํ‰์  ํ™•์žฅ์— ์ตœ์ ํ™”๋˜์–ด ์žˆ๋‹ค. ๋ฐ์ดํ„ฐ๋ฅผ ๋ถ„์‚ฐ์‹œํ‚ค๊ธฐ ์œ„ํ•ด Hash Ring ์ด๋ผ๋Š” ๊ฐ€์ƒ์˜ ๋ง์„ ์‚ฌ์šฉํ•œ๋‹ค. ๊ฐ ๋…ธ๋“œ๋Š” ๋ง์˜ ํŠน์ • ๊ตฌ๊ฐ„์„ ๋‹ด๋‹นํ•˜๋ฉฐ, ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•  ๋•Œ ๋ฐ์ดํ„ฐ์˜ PK๋ฅผ ํ•ด์‹ฑํ•˜์—ฌ ๋ง ์œ„์˜ ํŠน์ • ์œ„์น˜ ๊ฐ’์„ ์–ป๊ณ , ๋ฐ์ดํ„ฐ๋Š” ํ•ด๋‹น ์œ„์น˜๊ฐ€ ์†ํ•œ ๊ตฌ๊ฐ„์„ ๋‹ด๋‹นํ•˜๋Š” ๋…ธ๋“œ์— ์ €์žฅ๋œ๋‹ค. ์ด ๊ณผ์ •์€ ์„œ๋น„์Šค ์ค‘๋‹จ ์—†์ด ์ด๋ฃจ์–ด์ง„๋‹ค.

๋˜ํ•œ ์กฐ์ธ ์—ฐ์‚ฐ์„ ์ง€์›ํ•˜์ง€ ์•Š๋Š”๋‹ค. ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ์–ด๋–ป๊ฒŒ ๋ฐ์ดํ„ฐ๋ฅผ ์กฐํšŒํ•  ๊ฒƒ์ธ์ง€ ์„ค๊ณ„ํ•˜๊ณ , ์˜๋„์ ์œผ๋กœ ๋น„์ •๊ทœํ™”ํ•˜์—ฌ ์กฐํšŒ ์ฟผ๋ฆฌ์— ์ตœ์ ํ™”๋œ ํ…Œ์ด๋ธ”์„ ๋งŒ๋“ ๋‹ค.

Cassandra Query Language, CQL ์ด๋ผ๋Š” ์ฟผ๋ฆฌ ์–ธ์–ด๋ฅผ ์‚ฌ์šฉํ•˜๋Š”๋ฐ, SQL๊ณผ ์œ ์‚ฌํ•œ ๋ฌธ๋ฒ•์„ ์‚ฌ์šฉํ•œ๋‹ค. ๊ฐ€์žฅ ํฐ ์ฐจ์ด์ ์€ WHERE ์ ˆ์˜ ์‚ฌ์šฉ์ด ์ œํ•œ์ ์ด๋ผ๋Š” ์ ์ด๋‹ค. ์„ฑ๋Šฅ์„ ๋ณด์žฅํ•˜๊ธฐ ์œ„ํ•ด PK์— ํฌํ•จ๋œ ์ปฌ๋Ÿผ์„ ์กฐ๊ฑด์œผ๋กœ๋งŒ ์กฐํšŒํ•  ์ˆ˜ ์žˆ๋‹ค.


1
2
3
4
5
6
7
8
9
10
11
12
spark = SparkSession \
    .builder \
    .appName("WaterMark") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.shuffle.partitions", "3") \
    .config("spark.cassandra.connection.host", "cassandra") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.cassandra.auth.username", "cassandra") \
    .config("spark.cassandra.auth.password", "cassandra") \
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
    .config("spark.sql.catalog.lh", "com.datastax.spark.connector.datasource.CassandraCatalog") \
    .getOrCreate()

spark-cassandra-connector ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ŠคํŒŒํฌ์™€ ์นด์‚ฐ๋“œ๋ผ ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ์—ฐ๊ฒฐํ•œ๋‹ค.

1
2
3
4
join_df = event_df.join(
            user_df,
            event_df.login_id == user_df.login_id,
            "inner").drop(user_df.login_id)

์ŠคํŒŒํฌ๋Š” ์นดํ”„์นด๋กœ๋ถ€ํ„ฐ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜ ๋‹จ์œ„๋กœ ์ด๋ฒคํŠธ๋ฅผ ๊ฐ€์ ธ์˜จ๋‹ค. ๊ฐ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜์— ๋Œ€ํ•ด ๋ฉ”๋ชจ๋ฆฌ์— ๋กœ๋“œํ•ด ์ค€ ์ •์  ๋ฐ์ดํ„ฐ์™€ ์กฐ์ธ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. ์ตœ์ข… ๊ฒฐ๊ณผ๋Š” ์ •์  ๋ฐ์ดํ„ฐ๊ฐ€ ํฌํ•จ๋œ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์ด ๋œ๋‹ค. ์ด์ „ ์ƒํƒœ๋ฅผ ๊ธฐ์–ตํ•  ํ•„์š”๊ฐ€ ์—†์œผ๋ฏ€๋กœ Stateless transformation์ด๋‹ค.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def cassandraWriter(batch_df, batch_id):
    batch_df \
        .write \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "test_db") \
        .option("table", "users") \
        .mode("append") \
        .save()
    
query = output_df \
        .writeStream \
        .foreachBatch(cassandraWriter) \
        .outputMode("update") \
        .trigger(processingTime="5 seconds") \
        .start()

foreachBatch ๋Š” ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜ ๋‹จ์œ„๋กœ ์ฒ˜๋ฆฌํ•  ๋•Œ ๊ฐ ๋ฐฐ์น˜๋ฅผ ์ผ๋ฐ˜์ ์ธ ์ •์  ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์ฒ˜๋Ÿผ ๋‹ค๋ฃฐ ์ˆ˜ ์žˆ๊ฒŒ ํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

๐Ÿ“Œ Streaming - Streaming Join

1
2
3
4
5
join_df = impressions_df.join(
    clicks_df,
    impressions_df.uuid == clicks_df.uuid,
    "inner"
)

์ŠคํŒŒํฌ๋Š” ์–‘์ชฝ ์ŠคํŠธ๋ฆผ ๊ฐ๊ฐ์— ๋Œ€ํ•ด ์ƒํƒœ ์ €์žฅ์†Œ๋ฅผ ์œ ์ง€ํ•œ๋‹ค. ์กฐ์ธ ์กฐ๊ฑด์ด ์ผ์น˜ํ•˜๋ฉด ๋‘ ๋ฐ์ดํ„ฐ๋ฅผ ํ•ฉ์นœ ์ƒˆ๋กœ์šด ํ–‰์„ ๋งŒ๋“ค์–ด ๊ฒฐ๊ณผ ์ŠคํŠธ๋ฆผ์œผ๋กœ ๋‚ด๋ณด๋‚ธ๋‹ค.

inner join์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ ์กฐ์ธ ์กฐ๊ฑด์ด ์ผ์น˜ํ•˜๋Š” ์ˆœ๊ฐ„ ๊ฒฐ๊ณผ๊ฐ€ ์ƒ์„ฑ๋˜์ง€๋งŒ, outer join์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ ์–ธ์ œ๊นŒ์ง€ ๋ฐ์ดํ„ฐ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๊ณ , ํ•ด๋‹น ํ•„๋“œ๋ฅผ null๋กœ ์„ค์ •ํ• ์ง€์— ๋Œ€ํ•œ ๊ธฐ์ค€์ด ์žˆ์–ด์•ผ ํ•œ๋‹ค.

1
2
3
4
5
6
7
8
9
10
impressions_df = impression_events.select(...) \
    .withColumnRenamed("create_date", "impr_date") \
    .withWatermark("impr_date", "5 minutes") \
    .withColumnRenamed("uuid", "impr_uuid")

clicks_df = click_events.select(...) \
    .withColumnRenamed("create_date", "click_date") \
    .withWatermark("click_date", "5 minutes") \
    .withColumnRenamed("uuid", "click_uuid")

์ด๋Ÿฌํ•œ ๋ฌธ์ œ๋ฅผ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ์–‘์ชฝ ์ŠคํŠธ๋ฆผ์— ์›Œํ„ฐ๋งˆํฌ๋ฅผ ์„ค์ •ํ•œ๋‹ค.

1
2
3
4
5
6
7
8
9
join_df = impressions_df.join(
    clicks_df,
    expr("""
        impr_uuid = click_uuid AND
        click_date >= impr_date AND
        click_date <= impr_date + interval 5 minutes
        """),
    "leftOuter"
)

๊ทธ๋ฆฌ๊ณ  expr ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ๋ช…์‹œ์ ์œผ๋กœ ์กฐ์ธ ์กฐ๊ฑด์„ ์„ธ๋ถ€์ ์œผ๋กœ ์„ค์ •ํ•œ๋‹ค.

์ •๋ฆฌํ•˜๋ฉด, ์›Œํ„ฐ๋งˆํฌ๋ฅผ ํ†ตํ•ด ์–ธ์ œ ์ƒํƒœ๋ฅผ ์ •๋ฆฌํ•ด์•ผ ํ• ์ง€ ๊ฒฐ์ •ํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, expr ๋ฅผ ํ†ตํ•œ ์„ธ๋ถ€ ์กฐ๊ฑด์„ ํ†ตํ•ด ์–ธ์ œ๊นŒ์ง€์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์กฐ์ธํ• ์ง€ ๊ฒฐ์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

๐Ÿ“Œ ์ฐธ๊ณ 

https://www.databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html

This post is licensed under CC BY 4.0 by the author.