Post

[Spark] Spark DataFrame๊ณผ SparkSQL

[Spark] Spark DataFrame๊ณผ SparkSQL

๐Ÿ“Œ Spark DataFrame

Spark DataFrame ์€ RDD์˜ ๊ฐœ๋…์„ ํ™•์žฅํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ด๋ฆ„๊ณผ ํƒ€์ž…์„ ๊ฐ€์ง„ ์—ด๋กœ ๊ตฌ์„ฑ๋œ 2์ฐจ์› ํ…Œ์ด๋ธ”๋กœ ๋‹ค๋ฃฐ ์ˆ˜ ์žˆ๋„๋ก ํ•œ๋‹ค. ๋ฐ์ดํ„ฐ์˜ ์Šคํ‚ค๋งˆ๋ฅผ ๋‚ด๋ถ€์ ์œผ๋กœ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค.

ํŠน์ง•

  • ๋ฐ์ดํ„ฐ๋Š” ์ด๋ฆ„๊ณผ ํƒ€์ž…์ด ์ •์˜๋œ ์—ด๋กœ ๊ตฌ์„ฑ๋˜์—ˆ๊ธฐ ๋–„๋ฌธ์—, ์—ด์˜ ์ด๋ฆ„์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ์— ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ์Šคํ‚ค๋งˆ ์ •๋ณด๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ์œผ๋ฏ€๋กœ ์กด์žฌํ•˜์ง€ ์•Š๋Š” ์—ด์˜ ์ด๋ฆ„์„ ์‚ฌ์šฉํ•˜๊ฑฐ๋‚˜ ์ ์ ˆํ•˜์ง€ ์•Š์€ ์—ฐ์‚ฐ์„ ์‹œ๋„ํ•˜๋ฉด ์ปดํŒŒ์ผ ๋‹จ๊ณ„์—์„œ ์˜ค๋ฅ˜๋ฅผ ๊ฒ€์ถœํ•  ์ˆ˜ ์žˆ๋‹ค.
  • lazy evaluation ์„ ์ง€์›ํ•œ๋‹ค. action ์—ฐ์‚ฐ์ด ์ˆ˜ํ–‰๋˜๊ธฐ ์ „ transformation ์—ฐ์‚ฐ์€ ์ˆ˜ํ–‰๋˜์ง€ ์•Š๋Š”๋‹ค. action ์—ฐ์‚ฐ์ด ํ˜ธ์ถœ๋˜๋Š” ์ˆœ๊ฐ„ ์‹คํ–‰ plan์„ ์‚ดํŽด๋ณด๊ณ  catalyst optimizer ๋ฅผ ํ†ตํ•ด ํšจ์œจ์ ์ธ ์‹คํ–‰ ์ˆœ์„œ ๋ฐ ๋ฐฉ๋ฒ•์„ ๊ฒฐ์ •ํ•œ ํ›„ ํด๋Ÿฌ์Šคํ„ฐ์— ์ž‘์—…์„ ๋ถ„๋ฐฐํ•˜์—ฌ ์‹คํ–‰ํ•œ๋‹ค.
  • ์—ฌ๋Ÿฌ ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋‚˜๋‰˜์–ด ์—ฌ๋Ÿฌ ๋…ธ๋“œ์— ๋ถ„์‚ฐ ์ €์žฅ๋œ๋‹ค. ๋”ฐ๋ผ์„œ ๋ณ‘๋ ฌ ์—ฐ์‚ฐ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.
  • ์ผ๋ถ€ ํŒŒํ‹ฐ์…˜์ด ์œ ์‹ค๋˜์–ด๋„ ๊ธฐ๋ก๋œ ์‹คํ–‰ ๊ณ„ํš(lineage)๋ฅผ ํ†ตํ•ด ์ž๋™์œผ๋กœ ํ•ด๋‹น ํŒŒํ‹ฐ์…˜์„ ๋ณต๊ตฌํ•  ์ˆ˜ ์žˆ๋‹ค.

RDD vs. DataFrame

๊ตฌ๋ถ„RDDDataFrame
๋ฐ์ดํ„ฐ ๊ตฌ์กฐ๋น„๊ตฌ์กฐํ™” ๋ฐ์ดํ„ฐ(๋ฐ˜)๊ตฌ์กฐํ™” ๋ฐ์ดํ„ฐ
์ถ”์ƒํ™” ์ˆ˜์ค€์ €์ˆ˜์ค€๊ณ ์ˆ˜์ค€
์‚ฌ์šฉ ๋‚œ์ด๋„์กฐ๊ธˆ ์—ฌ๋Ÿฌ์›€์‰ฌ์›€
์„ฑ๋Šฅ๊ฐœ๋ฐœ์ž๊ฐ€ ์ง์ ‘ ์ตœ์ ํ™” ๋กœ์ง ์ž‘์„ฑcatalyst optimizer๋ฅผ ํ†ตํ•ด ์ตœ์ ํ™” ๊ฐ€๋Šฅ
์Šคํ‚ค๋งˆ์—†์Œ์žˆ์Œ

Spark DataFrame vs. Pandas DataFrame

๊ตฌ๋ถ„Spark DataFramePandas DataFrame
์•„ํ‚คํ…์ฒ˜๋ถ„์‚ฐ ์ปดํ“จํŒ…๋‹จ์ผ ๋…ธ๋“œ ์ธ๋ฉ”๋ชจ๋ฆฌ
ํ™•์žฅ์„ฑ์ˆ˜ํ‰์  ํ™•์žฅ์ˆ˜์ง์  ํ™•์žฅ
์‹คํ–‰ ๋ฐฉ์‹์ง€์—ฐ ์—ฐ์‚ฐ์ฆ‰์‹œ ์—ฐ์‚ฐ
๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ ๊ฐ€๋Šฅ ์—ฌ๋ถ€๋ถˆ๋ณ€์„ฑ๊ฐ€๋ณ€์„ฑ

๐Ÿ“Œ SparkSQL

SparkSQL ์€ DataFrame API์™€ ํ†ตํ•ฉ๋œ ํ•˜๋‚˜์˜ ํŒจํ‚ค์ง€๋กœ, SQL ์ฟผ๋ฆฌ์™€ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ์—ฐ์‚ฐ์„ ์ž์œ ๋กญ๊ฒŒ ์˜ค๊ฐ€๋ฉฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค. SQL ์ฟผ๋ฆฌ๋ฅผ ์ง์ ‘ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๊ณ , ๊ฒฐ๊ณผ ๋˜ํ•œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์œผ๋กœ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค.

SQL ์ฟผ๋ฆฌ๋กœ ๋ฌด์—‡์„ ํ• ์ง€ ์ •์˜ํ•˜๋ฉด catalyst optimizer ๊ฐ€ ์ฟผ๋ฆฌ๋ฅผ ๋ถ„์„ํ•˜์—ฌ ํšจ์œจ์ ์ธ plan์„ ์ž๋™์œผ๋กœ ์ˆ˜๋ฆฝํ•œ๋‹ค. JSON, Parquet ๋“ฑ๊ณผ ๊ฐ™์€ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃฐ ์ˆ˜ ์žˆ์œผ๋ฉฐ, BI ๋„๊ตฌ์˜ ๋ฐ์ดํ„ฐ ์†Œ์Šค๋กœ๋„ ํ™œ์šฉ๋  ์ˆ˜ ์žˆ๋‹ค. ๋˜ํ•œ Hive ์—์„œ ์‚ฌ์šฉํ•˜๋˜ ์ฟผ๋ฆฌ(HiveQL)๋„ ์ง€์›ํ•œ๋‹ค.

BI ๋„๊ตฌ๋ž€ ๋ฐ์ดํ„ฐ์—์„œ ์˜๋ฏธ ์žˆ๋Š” ์ •๋ณด๋‚˜ ์ธ์‚ฌ์ดํŠธ๋ฅผ ์ฐพ๋„๋ก ๋•๋Š” ์†Œํ”„ํŠธ์›จ์–ด์ด๋‹ค. ๋Œ€ํ‘œ์ ์œผ๋กœ Tableau์™€ Power BI๊ฐ€ ์žˆ๋‹ค.

๐Ÿ“Œ ๋ฉ”์„œ๋“œ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from pyspark.sql import (
    Row,
    SparkSession)
from pyspark.sql.functions import col, asc, desc

def parse_line(line: str):
    fields = line.split('|') # |
    return Row(
        name=str(fields[0]),
        country=str(fields[1]),
        email=str(fields[2]),
        compensation=int(fields[3]))

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
lines = spark.sparkContext.textFile("file:///home/jovyan/work/sample/income.txt")
income_data = lines.map(parse_line)

Row ๊ฐ์ฒด๋Š” SparkSQL์—์„œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ํ•œ ํ–‰์„ ํ‘œํ˜„ํ•˜๋Š” ๊ฐ์ฒด์ด๋‹ค. ํ•„๋“œ ์ด๋ฆ„๊ณผ ์ธ๋ฑ์Šค๋ฅผ ํ†ตํ•ด ๋ฐ์ดํ„ฐ์— ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋‹ค. Row ๊ฐ์ฒด๋กœ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ƒ์„ฑํ•˜๋ฉด ํ•„๋“œ ์ด๋ฆ„๊ณผ ํƒ€์ž…์„ ์ž๋™์œผ๋กœ ๋ถ„์„ํ•˜์—ฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ์Šคํ‚ค๋งˆ๋ฅผ ์ž๋™์œผ๋กœ ์ถ”๋ก ํ•œ๋‹ค.

1
schema_income = spark.createDataFrame(data=income_data).cache()

createDataFrame ์€ ๋ฐ์ดํ„ฐ๋กœ๋ถ€ํ„ฐ ์ŠคํŒŒํฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ƒ์„ฑํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. data ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ RDD, ๋ฆฌ์ŠคํŠธ์™€ ๊ฐ™์€ ์ปฌ๋ ‰์…˜์„ ๋ฐ›๋Š”๋‹ค. ์ž๋™์œผ๋กœ ๋ฐ์ดํ„ฐ์˜ ๊ตฌ์กฐ๋ฅผ ๋ถ„์„ํ•˜์—ฌ ์Šคํ‚ค๋งˆ๋ฅผ ์ž๋™์œผ๋กœ ์ถ”๋ก ํ•œ๋‹ค.

cache ๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ฒ˜์Œ ๊ณ„์‚ฐํ•œ ๋’ค ๊ฒฐ๊ณผ๋ฅผ ํด๋Ÿฌ์Šคํ„ฐ์˜ ๋ฉ”๋ชจ๋ฆฌ์— ์ €์žฅํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ์—ฐ์‚ฐ ์†๋„๋ฅผ ๋†’์ด๊ธฐ ์œ„ํ•ด ์‚ฌ์šฉํ•œ๋‹ค.

1
schema_income.createOrReplaceTempView("income")

createOrReplaceTempView ๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ SQL๋กœ ์ฟผ๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” view๋กœ ๋“ฑ๋กํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. ์ฆ‰, ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ๊ฐ์ฒด๋ฅผ DB ํ…Œ์ด๋ธ”์ฒ˜๋Ÿผ ์ทจ๊ธ‰ํ•˜์—ฌ SQL ์ฟผ๋ฆฌ๋ฅผ ์‹คํ–‰ํ•˜๊ฒŒ ํ•  ์ˆ˜ ์žˆ๋‹ค. ๋™์ผํ•œ ์ด๋ฆ„์˜ ๋ทฐ๊ฐ€ ์กด์žฌํ•˜๋ฉด ๊ธฐ์กด ๋ทฐ๋ฅผ ์‚ญ์ œํ•˜๊ณ  ๋ฎ์–ด์“ด๋‹ค. ์ƒ์„ฑ๋œ ๋ทฐ๋Š” ์„ธ์…˜ ๋ฒ”์œ„์—์„œ๋งŒ ์œ ํšจํ•˜๋ฉฐ ์ŠคํŒŒํฌ ์„ธ์…˜์ด ์ข…๋ฃŒ๋˜๋ฉด ์ž๋™์œผ๋กœ ์‚ฌ๋ผ์ง„๋‹ค.

1
2
3
medium_income_df = spark.sql(
    "SELECT * FROM income WHERE compensation >= 70000 AND compensation <= 100000")
medium_income_df.show()

sql ์€ ์ŠคํŒŒํฌ ์„ธ์…˜ ๋‚ด์—์„œ SQL ์ฟผ๋ฆฌ๋ฅผ ์‹คํ–‰ํ•˜๊ณ  ๊ฒฐ๊ณผ๋ฅผ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์œผ๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. ๋ฌธ์ž์—ด ํ˜•ํƒœ์˜ SQL ์ฟผ๋ฆฌ๋ฅผ ์ธ์ž๋กœ ๋ฐ›๋Š”๋‹ค. sql ๋ฉ”์„œ๋“œ ์ž์ฒด๋Š” transformation ์—ฐ์‚ฐ์ด๋ผ, ์‹ค์ œ action ์—ฐ์‚ฐ์ด ํ˜ธ์ถœ๋˜๊ธฐ ์ „๊นŒ์ง€ ์‹คํ–‰๋˜์ง€ ์•Š๋Š”๋‹ค.

show ๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ๋‚ด์šฉ์„ ์ถœ๋ ฅํ•˜๋Š” action ๋ฉ”์„œ๋“œ์ด๋‹ค.

1
2
for income_data in medium_income_df.collect():
    print(income_data)

collect ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ Row ๊ฐ์ฒด์— ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋‹ค.

1
data.printSchema()

printSchema ๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ์Šคํ‚ค๋งˆ๋ฅผ ํŠธ๋ฆฌ ํ˜•ํƒœ๋กœ ๋ณด์—ฌ์ฃผ๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. ์—ด์˜ ์ด๋ฆ„๊ณผ ๋ฐ์ดํ„ฐ ํƒ€์ž…, ๋„ ํ—ˆ์šฉ ์—ฌ๋ถ€๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

1
data.select("name", "age").show()

select ๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ํŠน์ • ์—ด์„ ์„ ํƒํ•˜์—ฌ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ƒ์„ฑํ•˜๋Š” transformation ๋ฉ”์„œ๋“œ์ด๋‹ค.

1
data.filter(data.age > 20).show()

filter ๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์—์„œ ํŠน์ • ์กฐ๊ฑด์„ ๋งŒ์กฑํ•˜๋Š” Row ๋งŒ ์„ ํƒํ•˜์—ฌ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ƒ์„ฑํ•˜๋Š” transformation ์—ฐ์‚ฐ์ด๋‹ค. SQL์˜ WHERE ์ ˆ๊ณผ ๋™์ผํ•œ ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•˜๋ฉฐ, where ๋ฉ”์„œ๋“œ์™€ filter ๋ฉ”์„œ๋“œ๋Š” ์ด๋ฆ„๋งŒ ๋‹ค๋ฅผ ๋ฟ ๊ธฐ๋Šฅ์ ์œผ๋กœ ์ฐจ์ด๊ฐ€ ์—†๋‹ค.

1
2
3
4
5
6
7
df = spark.createDataFrame([
        Row(a=1,
            intlist=[1,2,3],
            mapfield={"a": "b"}
           )])

df.select(functions.explode(df.intlist).alias("anInt")).collect()

explode ๋Š” ๋ฐฐ์—ด์ด๋‚˜ ๋งต ํ˜•ํƒœ์˜ ์—ด์„ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ํ–‰์œผ๋กœ ํŽผ์น˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

์œ„ ์˜ˆ์ œ์—์„œ explode ๋ฉ”์„œ๋“œ๋Š” ๋ฆฌ์ŠคํŠธ ํ˜•ํƒœ์ธ intlist ์˜ ๊ฐ ์š”์†Œ 1, 2, 3์— ๋Œ€ํ•ด ๊ฐ๊ฐ ์ƒˆ๋กœ์šด ํ–‰์„ ์ƒ์„ฑํ•˜์—ฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ๊ตฌ์„ฑํ•œ๋‹ค.

1
2
3
df = spark.createDataFrame([
        Row(word="hello world and pyspark")])
df.select(functions.split(df.word, ' ').alias("word")).collect()

split ๋ฉ”์„œ๋“œ๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ๋ฌธ์ž์—ด ํƒ€์ž… ์—ด์„ ๊ตฌ๋ถ„์ž๋กœ ๋‚˜๋ˆ„์–ด ๋ฐฐ์—ด ํƒ€์ž…์˜ ์ƒˆ๋กœ์šด ์—ด๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. ํŒŒ์ด์ฌ ๋‚ด์žฅ ํ•จ์ˆ˜์ธ split ๊ณผ ์œ ์‚ฌํ•˜๋‹ค.

1
2
3
4
table_schema = t.StructType([
    t.StructField("country", t.StringType(), True),
    t.StructField("temperature", t.FloatType(), True),
    t.StructField("observed_date", t.StringType(), True)])

StructType ์€ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ์Šคํ‚ค๋งˆ๋ฅผ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ฐฉ์‹์œผ๋กœ ์ •์˜ํ•˜๊ธฐ ์œ„ํ•œ ๊ฐ์ฒด์ด๋‹ค. StructType ์€ ์—ฌ๋Ÿฌ ๊ฐœ์˜ StructField ๊ฐ์ฒด๋กœ ๊ตฌ์„ฑ๋œ ๋ฆฌ์ŠคํŠธ์ด๋‹ค.

StructField ๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ๋‹จ์ผ ์—ด์— ๋Œ€ํ•œ ๋ชจ๋“  ์ •๋ณด๋ฅผ ๋‹ด๊ณ  ์žˆ์œผ๋ฉฐ, name, dataType, nullable ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค.

1
2
3
4
5
f_temperature = data.withColumn(
                    "temperature",
                    (f.col("temperature") * 9 / 5) + 32)\
                .select("country", "temperature")
f_temperature.show()

withColumn ์€ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ์ƒˆ๋กœ์šด ์—ด์„ ์ถ”๊ฐ€ํ•˜๊ฑฐ๋‚˜ ์ˆ˜์ •ํ•  ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. ์ฒซ ๋ฒˆ์งธ ์ธ์ž๋Š” ์ƒˆ๋กœ ๋งŒ๋“ค๊ฑฐ๋‚˜ ์ˆ˜์ •ํ•  ์—ด์˜ ์ด๋ฆ„, ๋‘ ๋ฒˆ์งธ ์ธ์ž๋Š” ํ•ด๋‹น ์—ด์— ๋“ค์–ด๊ฐˆ ๊ฐ’์„ ์ง€์ •ํ•œ๋‹ค.

1
2
3
4
5
6
7
8
meta = {
    "1100": "engineer",
    "2030": "developer",
    "3801": "painter",
    "3021": "chemistry teacher",
    "9382": "priest"
}
occupation_dict = spark.sparkContext.broadcast(meta)

broadcast ๋ฉ”์„œ๋“œ๋Š” ๋”•์…”๋„ˆ๋ฆฌ์™€ ๋ฆฌ์ŠคํŠธ๊ฐ™์€ ํŒŒ์ด์ฌ ๋ณ€์ˆ˜๋ฅผ ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ ๋ณ€์ˆ˜๋ผ๋Š” ํ˜•ํƒœ๋กœ ๊ฐ์‹ผ๋‹ค.

๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ ๋ณ€์ˆ˜๋Š” ์ฝ๊ธฐ ์ „์šฉ์ด๋ฉฐ, ๋“œ๋ผ์ด๋ฒ„์—์„œ ํด๋Ÿฌ์Šคํ„ฐ์˜ ๋ชจ๋“  executor ๋…ธ๋“œ๋กœ ๋ฐ์•„ํ„ฐ๋ฅผ ๋‹จ ํ•œ ๋ฒˆ์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•  ์ˆ˜ ์žˆ๋‹ค.

๋งŒ์•ฝ ๋ธŒ๋กœ๋“œ์บ์ŠคํŒ…์„ ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š”๋‹ค๋ฉด ์ „์†กํ•  ๋ฐ์ดํ„ฐ๋ฅผ ๋„คํŠธ์›Œํฌ๋ฅผ ํ†ตํ•ด ์—ฌ๋Ÿฌ ๋…ธ๋“œ์— ๋ฐ˜๋ณต์ ์œผ๋กœ ์ „์†กํ•ด์•ผ ํ•˜๋ฉฐ, ์ด๋Š” ํฐ ์˜ค๋ฒ„ํ—ค๋“œ์˜ ์›์ธ์ด ๋œ๋‹ค.

๊ทธ๋Ÿฌ๋‚˜ ๋ธŒ๋กœ๋“œ์บ์ŠคํŒ…์„ ์‚ฌ์šฉํ•˜๋ฉด ๋ฐ์ดํ„ฐ๋ฅผ ํ•œ ๋ฒˆ์— ๋ชจ๋“  executor์—๊ฒŒ ์ „์†กํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ๊ฐ๊ฐ์˜ executor๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ž์‹ ์˜ ๋ฉ”๋ชจ๋ฆฌ์— ์บ์‹œํ•œ๋‹ค.

์ฆ‰, broadcast ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ๋น„์‹ผ ์…”ํ”Œ๋ง ์—ฐ์‚ฐ์„ ๋ฐฉ์ง€ํ•  ์ˆ˜ ์žˆ๋‹ค.

1
occupation_lookup_udf = f.udf(get_occupation_name)

udf ๋Š” ์ผ๋ฐ˜ ํŒŒ์ด์ฌ ํ•จ์ˆ˜๋ฅผ ์ŠคํŒŒํฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์—์„œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก wrappingํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

๋‹ค๋งŒ ์ด๋ ‡๊ฒŒ ๋ณ€ํ™˜๋œ UDF๋Š” ์ŠคํŒŒํฌ ๋‚ด์žฅ ํ•จ์ˆ˜์— ๋น„ํ•ด ๋А๋ฆฌ๊ฒŒ ๋™์ž‘ํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ์ŠคํŒŒํฌ์˜ ๋‚ด์žฅ ํ•จ์ˆ˜๋ฅผ ์šฐ์„ ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์ข‹๋‹ค.

์ŠคํŒŒํฌ์˜ JVM ํ™˜๊ฒฝ๊ณผ ํŒŒ์ด์ฌ ํ”„๋กœ์„ธ์Šค ๊ฐ„ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ ๋ฐ›๋Š” (์—ญ)์ง๋ ฌํ™” ์˜ค๋ฒ„ํ—ค๋“œ๊ฐ€ ๊ฐ ํ–‰๋งˆ๋‹ค ๋ฐœ์ƒํ•˜๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

1
2
3
4
5
data = df.groupBy("hero1")\
            .agg(
                f.collect_set("hero2").alias("connection"))\
            .withColumnRenamed("hero1", "hero")
data.show()

collect_set ์€ ๊ทธ๋ฃนํ•‘ ํ›„ ๊ฐ ๊ทธ๋ฃน์— ์†ํ•œ ํŠน์ • ์—ด์˜ ๊ฐ’๋“ค์„ ๋ชจ์•„ ์ค‘๋ณต์„ ์ œ๊ฑฐํ•œ ํ•˜๋‚˜์˜ ๋ฐฐ์—ด๋กœ ๋งŒ๋“œ๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

collect_list ๋Š” ์ค‘๋ณต์„ ํ—ˆ์šฉํ•˜์—ฌ ๋ฐฐ์—ด์„ ๋งŒ๋“ ๋‹ค.

1
2
data = data.withColumn("connection", f.concat_ws(",", f.col("connection")))
data.show()

concat_ws ๋Š” ํŠน์ • ๊ตฌ๋ถ„์ž๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์—ฌ๋Ÿฌ ๋ฌธ์ž์—ด์ด๋‚˜ ๋ฐฐ์—ด์˜ ์›์†Œ๋“ค์„ ํ•˜๋‚˜๋กœ ํ•ฉ์ณ ๋‹จ์ผ ๋ฌธ์ž์—ด๋กœ ๋งŒ๋“œ๋Š” ํ•จ์ˆ˜์ด๋‹ค. ์ฒซ ๋ฒˆ์งธ ์ธ์ž๋Š” ๊ตฌ๋ถ„์ž, ๋‘ ๋ฒˆ์งธ ์ธ์ž๋Š” ํ•ฉ์น  ๋Œ€์ƒ์ด ๋˜๋Š” ์—ด์ด๋‹ค.

1
data.coalesce(1).write.option("header", True).csv("output")

coalesce ๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ํŒŒํ‹ฐ์…˜ ์ˆ˜๋ฅผ ์ค„์ด๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

repartition ์€ ํŒŒํ‹ฐ์…˜ ์ˆ˜๋ฅผ ๋Š˜๋ฆฌ๊ฑฐ๋‚˜ ์ค„์ผ ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. coalesce ๋ฉ”์„œ๋“œ๋Š” ์ „์ฒด ์…”ํ”Œ๋ง์„ ์ˆ˜ํ–‰ํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— ๋‹จ์ˆœํžˆ ํŒŒํ‹ฐ์…˜ ์ˆ˜๋ฅผ ์ค„์ด๋Š” ๊ฒƒ์ด ๋ชฉ์ ์ด๋ผ๋ฉด coalesce ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ํšจ์œจ์ ์ด๋‹ค.

1
2
3
4
5
df.na.drop(how="any").show()
df.na.drop(thresh=2).show()
df.na.drop(subset=["salary"]).show()

df.printSchema()

df.na ๋Š” null์„ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ๊ธฐ๋Šฅ์„ ๋ชจ์•„๋†“์€ ๊ฐ์ฒด์ด๋‹ค.

na.drop ์€ null ์ด ํฌํ•จ๋œ ํ–‰์„ ์ œ๊ฑฐํ•˜์—ฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ •์ œํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. how ํŒŒ๋ผ๋ฏธํ„ฐ๋Š” ํ–‰์„ ์ œ๊ฑฐํ•  ๋•Œ ์‚ฌ์šฉํ•  ๊ธฐ์ค€์ด๋‹ค. any ๋กœ ์„ค์ •๋˜๋ฉด ํ–‰์— ํ•˜๋‚˜๋ผ๋„ null ์ด ์กด์žฌํ•˜๋Š” ๊ฒฝ์šฐ, all ๋กœ ์„ค์ •๋˜๋ฉด ํ–‰์˜ ๋ชจ๋“  ์—ด์ด null ์ธ ๊ฒฝ์šฐ ํ–‰์„ ์ œ๊ฑฐํ•œ๋‹ค.

thresh ํŒŒ๋ผ๋ฏธํ„ฐ์—๋Š” ์œ ํšจํ•œ ๊ฐ’์˜ ์ตœ์†Œ ๊ฐœ์ˆ˜๋ฅผ ์ง€์ •ํ•œ๋‹ค. thresh=2 ๋Š” null ์ด ์•„๋‹Œ ๊ฐ’์ด ์ตœ์†Œ 2๊ฐœ ์ด์ƒ ์žˆ๋‹ค๋ฉด ํ–‰์„ ์œ ์ง€ํ•œ๋‹ค.

subset ํŒŒ๋ผ๋ฏธํ„ฐ๋Š” null ์—ฌ๋ถ€๋ฅผ subset ์œผ๋กœ ์ง€์ •๋œ ์—ด๋งŒ ํ™•์ธํ•˜๋„๋ก ํ•œ๋‹ค.

1
df.na.fill(0).show()

na.fill ์€ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ๊ฒฐ์ธก๊ฐ’์„ ์‚ฌ์šฉ์ž๊ฐ€ ์ง€์ •ํ•œ ๊ฐ’์œผ๋กœ ๋Œ€์ฒดํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

1
2
3
df_user.join(df_salary, 
               df_user.id == df_salary.id, 
               "fullouter").show()

๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ์—ญ์‹œ ์กฐ์ธ ์—ฐ์‚ฐ์ด ์กด์žฌํ•œ๋‹ค. join ๋ฉ”์„œ๋“œ๋Š” ๋‘ ๊ฐœ์˜ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ํŠน์ • ํ‚ค๋ฅผ ๊ธฐ์ค€์œผ๋กœ ๊ฒฐํ•ฉํ•˜์—ฌ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ƒ์„ฑํ•œ๋‹ค.

how ํŒŒ๋ผ๋ฏธํ„ฐ์— ์กฐ์ธ ์œ ํ˜•์„ ๋ช…์‹œํ•  ์ˆ˜ ์žˆ๋‹ค. ์กฐ์ธ ์œ ํ˜•์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

how ํŒŒ๋ผ๋ฏธํ„ฐ์กฐ์ธ ๊ฒฐ๊ณผ์˜ ํ–‰
inner (default)์–‘์ชฝ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ๋ชจ๋‘ ์ผ์น˜ํ•˜๋Š” ํ‚ค์˜ ํ–‰
leftouter์™ผ์ชฝ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ๋ชจ๋“  ํ–‰
rightouter์˜ค๋ฅธ์ชฝ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ๋ชจ๋“  ํ–‰
fullouter์–‘์ชฝ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ๋ชจ๋“  ํ–‰
leftsemi์˜ค๋ฅธ์ชฝ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ์ผ์น˜ํ•˜๋Š” ํ‚ค๊ฐ€ ์žˆ๋Š” ์™ผ์ชฝ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ํ–‰
leftanti์˜ค๋ฅธ์ชฝ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ์ผ์น˜ํ•˜๋Š” ํ‚ค๊ฐ€ ์—†๋Š” ์™ผ์ชฝ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ํ–‰
cross์–‘์ชฝ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ์นดํ…Œ์‹œ์•ˆ ๊ณฑ
This post is licensed under CC BY 4.0 by the author.