[Spark] Spark DataFrame๊ณผ SparkSQL
๐ Spark DataFrame
Spark DataFrame
์ RDD์ ๊ฐ๋
์ ํ์ฅํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ด๋ฆ๊ณผ ํ์
์ ๊ฐ์ง ์ด๋ก ๊ตฌ์ฑ๋ 2์ฐจ์ ํ
์ด๋ธ๋ก ๋ค๋ฃฐ ์ ์๋๋ก ํ๋ค. ๋ฐ์ดํฐ์ ์คํค๋ง๋ฅผ ๋ด๋ถ์ ์ผ๋ก ๊ฐ์ง๊ณ ์๋ค.
ํน์ง
- ๋ฐ์ดํฐ๋ ์ด๋ฆ๊ณผ ํ์ ์ด ์ ์๋ ์ด๋ก ๊ตฌ์ฑ๋์๊ธฐ ๋๋ฌธ์, ์ด์ ์ด๋ฆ์ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ์ ์ ๊ทผํ ์ ์๋ค.
- ์คํค๋ง ์ ๋ณด๋ฅผ ๊ฐ์ง๊ณ ์์ผ๋ฏ๋ก ์กด์ฌํ์ง ์๋ ์ด์ ์ด๋ฆ์ ์ฌ์ฉํ๊ฑฐ๋ ์ ์ ํ์ง ์์ ์ฐ์ฐ์ ์๋ํ๋ฉด ์ปดํ์ผ ๋จ๊ณ์์ ์ค๋ฅ๋ฅผ ๊ฒ์ถํ ์ ์๋ค.
lazy evaluation
์ ์ง์ํ๋ค. action ์ฐ์ฐ์ด ์ํ๋๊ธฐ ์ transformation ์ฐ์ฐ์ ์ํ๋์ง ์๋๋ค. action ์ฐ์ฐ์ด ํธ์ถ๋๋ ์๊ฐ ์คํ plan์ ์ดํด๋ณด๊ณcatalyst optimizer
๋ฅผ ํตํด ํจ์จ์ ์ธ ์คํ ์์ ๋ฐ ๋ฐฉ๋ฒ์ ๊ฒฐ์ ํ ํ ํด๋ฌ์คํฐ์ ์์ ์ ๋ถ๋ฐฐํ์ฌ ์คํํ๋ค.- ์ฌ๋ฌ ๊ฐ์ ํํฐ์ ์ผ๋ก ๋๋์ด ์ฌ๋ฌ ๋ ธ๋์ ๋ถ์ฐ ์ ์ฅ๋๋ค. ๋ฐ๋ผ์ ๋ณ๋ ฌ ์ฐ์ฐ์ด ๊ฐ๋ฅํ๋ค.
- ์ผ๋ถ ํํฐ์ ์ด ์ ์ค๋์ด๋ ๊ธฐ๋ก๋ ์คํ ๊ณํ(lineage)๋ฅผ ํตํด ์๋์ผ๋ก ํด๋น ํํฐ์ ์ ๋ณต๊ตฌํ ์ ์๋ค.
RDD vs. DataFrame
๊ตฌ๋ถ | RDD | DataFrame |
---|---|---|
๋ฐ์ดํฐ ๊ตฌ์กฐ | ๋น๊ตฌ์กฐํ ๋ฐ์ดํฐ | (๋ฐ)๊ตฌ์กฐํ ๋ฐ์ดํฐ |
์ถ์ํ ์์ค | ์ ์์ค | ๊ณ ์์ค |
์ฌ์ฉ ๋์ด๋ | ์กฐ๊ธ ์ฌ๋ฌ์ | ์ฌ์ |
์ฑ๋ฅ | ๊ฐ๋ฐ์๊ฐ ์ง์ ์ต์ ํ ๋ก์ง ์์ฑ | catalyst optimizer๋ฅผ ํตํด ์ต์ ํ ๊ฐ๋ฅ |
์คํค๋ง | ์์ | ์์ |
Spark DataFrame vs. Pandas DataFrame
๊ตฌ๋ถ | Spark DataFrame | Pandas DataFrame |
---|---|---|
์ํคํ ์ฒ | ๋ถ์ฐ ์ปดํจํ | ๋จ์ผ ๋ ธ๋ ์ธ๋ฉ๋ชจ๋ฆฌ |
ํ์ฅ์ฑ | ์ํ์ ํ์ฅ | ์์ง์ ํ์ฅ |
์คํ ๋ฐฉ์ | ์ง์ฐ ์ฐ์ฐ | ์ฆ์ ์ฐ์ฐ |
๋ฐ์ดํฐ ๋ณ๊ฒฝ ๊ฐ๋ฅ ์ฌ๋ถ | ๋ถ๋ณ์ฑ | ๊ฐ๋ณ์ฑ |
๐ SparkSQL
SparkSQL
์ DataFrame API์ ํตํฉ๋ ํ๋์ ํจํค์ง๋ก, SQL ์ฟผ๋ฆฌ์ ๋ฐ์ดํฐํ๋ ์ ์ฐ์ฐ์ ์์ ๋กญ๊ฒ ์ค๊ฐ๋ฉฐ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ค. SQL ์ฟผ๋ฆฌ๋ฅผ ์ง์ ๋ฐ์ดํฐํ๋ ์์ ์คํํ ์ ์๊ณ , ๊ฒฐ๊ณผ ๋ํ ๋ฐ์ดํฐํ๋ ์์ผ๋ก ๋ฐ์ ์ ์๋ค.
SQL ์ฟผ๋ฆฌ๋ก ๋ฌด์์ ํ ์ง ์ ์ํ๋ฉด catalyst optimizer
๊ฐ ์ฟผ๋ฆฌ๋ฅผ ๋ถ์ํ์ฌ ํจ์จ์ ์ธ plan์ ์๋์ผ๋ก ์๋ฆฝํ๋ค. JSON, Parquet ๋ฑ๊ณผ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ์ ์์ผ๋ฉฐ, BI ๋๊ตฌ์ ๋ฐ์ดํฐ ์์ค๋ก๋ ํ์ฉ๋ ์ ์๋ค. ๋ํ Hive
์์ ์ฌ์ฉํ๋ ์ฟผ๋ฆฌ(HiveQL
)๋ ์ง์ํ๋ค.
BI ๋๊ตฌ๋ ๋ฐ์ดํฐ์์ ์๋ฏธ ์๋ ์ ๋ณด๋ ์ธ์ฌ์ดํธ๋ฅผ ์ฐพ๋๋ก ๋๋ ์ํํธ์จ์ด์ด๋ค. ๋ํ์ ์ผ๋ก Tableau์ Power BI๊ฐ ์๋ค.
๐ ๋ฉ์๋
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from pyspark.sql import (
Row,
SparkSession)
from pyspark.sql.functions import col, asc, desc
def parse_line(line: str):
fields = line.split('|') # |
return Row(
name=str(fields[0]),
country=str(fields[1]),
email=str(fields[2]),
compensation=int(fields[3]))
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
lines = spark.sparkContext.textFile("file:///home/jovyan/work/sample/income.txt")
income_data = lines.map(parse_line)
Row
๊ฐ์ฒด๋ SparkSQL์์ ๋ฐ์ดํฐํ๋ ์์ ํ ํ์ ํํํ๋ ๊ฐ์ฒด์ด๋ค. ํ๋ ์ด๋ฆ๊ณผ ์ธ๋ฑ์ค๋ฅผ ํตํด ๋ฐ์ดํฐ์ ์ ๊ทผํ ์ ์๋ค. Row
๊ฐ์ฒด๋ก ๋ฐ์ดํฐํ๋ ์์ ์์ฑํ๋ฉด ํ๋ ์ด๋ฆ๊ณผ ํ์
์ ์๋์ผ๋ก ๋ถ์ํ์ฌ ๋ฐ์ดํฐํ๋ ์์ ์คํค๋ง๋ฅผ ์๋์ผ๋ก ์ถ๋ก ํ๋ค.
1
schema_income = spark.createDataFrame(data=income_data).cache()
createDataFrame
์ ๋ฐ์ดํฐ๋ก๋ถํฐ ์คํํฌ ๋ฐ์ดํฐํ๋ ์์ ์์ฑํ๋ ๋ฉ์๋์ด๋ค. data
ํ๋ผ๋ฏธํฐ๋ก RDD, ๋ฆฌ์คํธ์ ๊ฐ์ ์ปฌ๋ ์
์ ๋ฐ๋๋ค. ์๋์ผ๋ก ๋ฐ์ดํฐ์ ๊ตฌ์กฐ๋ฅผ ๋ถ์ํ์ฌ ์คํค๋ง๋ฅผ ์๋์ผ๋ก ์ถ๋ก ํ๋ค.
cache
๋ ๋ฐ์ดํฐํ๋ ์์ ์ฒ์ ๊ณ์ฐํ ๋ค ๊ฒฐ๊ณผ๋ฅผ ํด๋ฌ์คํฐ์ ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅํ๋ ๋ฉ์๋์ด๋ค. ๋ฐ์ดํฐํ๋ ์์ ์ฐ์ฐ ์๋๋ฅผ ๋์ด๊ธฐ ์ํด ์ฌ์ฉํ๋ค.
1
schema_income.createOrReplaceTempView("income")
createOrReplaceTempView
๋ ๋ฐ์ดํฐํ๋ ์์ SQL๋ก ์ฟผ๋ฆฌํ ์ ์๋ view๋ก ๋ฑ๋กํ๋ ๋ฉ์๋์ด๋ค. ์ฆ, ๋ฐ์ดํฐํ๋ ์ ๊ฐ์ฒด๋ฅผ DB ํ
์ด๋ธ์ฒ๋ผ ์ทจ๊ธํ์ฌ SQL ์ฟผ๋ฆฌ๋ฅผ ์คํํ๊ฒ ํ ์ ์๋ค. ๋์ผํ ์ด๋ฆ์ ๋ทฐ๊ฐ ์กด์ฌํ๋ฉด ๊ธฐ์กด ๋ทฐ๋ฅผ ์ญ์ ํ๊ณ ๋ฎ์ด์ด๋ค. ์์ฑ๋ ๋ทฐ๋ ์ธ์
๋ฒ์์์๋ง ์ ํจํ๋ฉฐ ์คํํฌ ์ธ์
์ด ์ข
๋ฃ๋๋ฉด ์๋์ผ๋ก ์ฌ๋ผ์ง๋ค.
1
2
3
medium_income_df = spark.sql(
"SELECT * FROM income WHERE compensation >= 70000 AND compensation <= 100000")
medium_income_df.show()
sql
์ ์คํํฌ ์ธ์
๋ด์์ SQL ์ฟผ๋ฆฌ๋ฅผ ์คํํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ์๋ก์ด ๋ฐ์ดํฐํ๋ ์์ผ๋ก ๋ณํํ๋ ๋ฉ์๋์ด๋ค. ๋ฌธ์์ด ํํ์ SQL ์ฟผ๋ฆฌ๋ฅผ ์ธ์๋ก ๋ฐ๋๋ค. sql
๋ฉ์๋ ์์ฒด๋ transformation ์ฐ์ฐ์ด๋ผ, ์ค์ action ์ฐ์ฐ์ด ํธ์ถ๋๊ธฐ ์ ๊น์ง ์คํ๋์ง ์๋๋ค.
show
๋ ๋ฐ์ดํฐํ๋ ์์ ๋ด์ฉ์ ์ถ๋ ฅํ๋ action ๋ฉ์๋์ด๋ค.
1
2
for income_data in medium_income_df.collect():
print(income_data)
collect
๋ฉ์๋๋ฅผ ํตํด ๋ฐ์ดํฐํ๋ ์์ Row
๊ฐ์ฒด์ ์ ๊ทผํ ์ ์๋ค.
1
data.printSchema()
printSchema
๋ ๋ฐ์ดํฐํ๋ ์์ ์คํค๋ง๋ฅผ ํธ๋ฆฌ ํํ๋ก ๋ณด์ฌ์ฃผ๋ ๋ฉ์๋์ด๋ค. ์ด์ ์ด๋ฆ๊ณผ ๋ฐ์ดํฐ ํ์
, ๋ ํ์ฉ ์ฌ๋ถ๋ฅผ ํ์ธํ ์ ์๋ค.
1
data.select("name", "age").show()
select
๋ ๋ฐ์ดํฐํ๋ ์์ ํน์ ์ด์ ์ ํํ์ฌ ์๋ก์ด ๋ฐ์ดํฐํ๋ ์์ ์์ฑํ๋ transformation ๋ฉ์๋์ด๋ค.
1
data.filter(data.age > 20).show()
filter
๋ ๋ฐ์ดํฐํ๋ ์์์ ํน์ ์กฐ๊ฑด์ ๋ง์กฑํ๋ Row
๋ง ์ ํํ์ฌ ์๋ก์ด ๋ฐ์ดํฐํ๋ ์์ ์์ฑํ๋ transformation ์ฐ์ฐ์ด๋ค. SQL์ WHERE
์ ๊ณผ ๋์ผํ ์ญํ ์ ์ํํ๋ฉฐ, where
๋ฉ์๋์ filter
๋ฉ์๋๋ ์ด๋ฆ๋ง ๋ค๋ฅผ ๋ฟ ๊ธฐ๋ฅ์ ์ผ๋ก ์ฐจ์ด๊ฐ ์๋ค.
1
2
3
4
5
6
7
df = spark.createDataFrame([
Row(a=1,
intlist=[1,2,3],
mapfield={"a": "b"}
)])
df.select(functions.explode(df.intlist).alias("anInt")).collect()
explode
๋ ๋ฐฐ์ด์ด๋ ๋งต ํํ์ ์ด์ ์ฌ๋ฌ ๊ฐ์ ํ์ผ๋ก ํผ์น๋ ๋ฉ์๋์ด๋ค.
์ ์์ ์์ explode
๋ฉ์๋๋ ๋ฆฌ์คํธ ํํ์ธ intlist
์ ๊ฐ ์์ 1, 2, 3์ ๋ํด ๊ฐ๊ฐ ์๋ก์ด ํ์ ์์ฑํ์ฌ ๋ฐ์ดํฐํ๋ ์์ ๊ตฌ์ฑํ๋ค.
1
2
3
df = spark.createDataFrame([
Row(word="hello world and pyspark")])
df.select(functions.split(df.word, ' ').alias("word")).collect()
split
๋ฉ์๋๋ ๋ฐ์ดํฐํ๋ ์์ ๋ฌธ์์ด ํ์
์ด์ ๊ตฌ๋ถ์๋ก ๋๋์ด ๋ฐฐ์ด ํ์
์ ์๋ก์ด ์ด๋ก ๋ณํํ๋ ๋ฉ์๋์ด๋ค. ํ์ด์ฌ ๋ด์ฅ ํจ์์ธ split
๊ณผ ์ ์ฌํ๋ค.
1
2
3
4
table_schema = t.StructType([
t.StructField("country", t.StringType(), True),
t.StructField("temperature", t.FloatType(), True),
t.StructField("observed_date", t.StringType(), True)])
StructType
์ ๋ฐ์ดํฐํ๋ ์์ ์คํค๋ง๋ฅผ ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ผ๋ก ์ ์ํ๊ธฐ ์ํ ๊ฐ์ฒด์ด๋ค. StructType
์ ์ฌ๋ฌ ๊ฐ์ StructField
๊ฐ์ฒด๋ก ๊ตฌ์ฑ๋ ๋ฆฌ์คํธ์ด๋ค.
StructField
๋ ๋ฐ์ดํฐํ๋ ์์ ๋จ์ผ ์ด์ ๋ํ ๋ชจ๋ ์ ๋ณด๋ฅผ ๋ด๊ณ ์์ผ๋ฉฐ, name
, dataType
, nullable
ํ๋ผ๋ฏธํฐ๋ฅผ ๊ฐ์ง๊ณ ์๋ค.
1
2
3
4
5
f_temperature = data.withColumn(
"temperature",
(f.col("temperature") * 9 / 5) + 32)\
.select("country", "temperature")
f_temperature.show()
withColumn
์ ๋ฐ์ดํฐํ๋ ์์ ์๋ก์ด ์ด์ ์ถ๊ฐํ๊ฑฐ๋ ์์ ํ ๋ ์ฌ์ฉํ๋ ๋ฉ์๋์ด๋ค. ์ฒซ ๋ฒ์งธ ์ธ์๋ ์๋ก ๋ง๋ค๊ฑฐ๋ ์์ ํ ์ด์ ์ด๋ฆ, ๋ ๋ฒ์งธ ์ธ์๋ ํด๋น ์ด์ ๋ค์ด๊ฐ ๊ฐ์ ์ง์ ํ๋ค.
1
2
3
4
5
6
7
8
meta = {
"1100": "engineer",
"2030": "developer",
"3801": "painter",
"3021": "chemistry teacher",
"9382": "priest"
}
occupation_dict = spark.sparkContext.broadcast(meta)
broadcast
๋ฉ์๋๋ ๋์
๋๋ฆฌ์ ๋ฆฌ์คํธ๊ฐ์ ํ์ด์ฌ ๋ณ์๋ฅผ ๋ธ๋ก๋์บ์คํธ ๋ณ์๋ผ๋ ํํ๋ก ๊ฐ์ผ๋ค.
๋ธ๋ก๋์บ์คํธ ๋ณ์๋ ์ฝ๊ธฐ ์ ์ฉ์ด๋ฉฐ, ๋๋ผ์ด๋ฒ์์ ํด๋ฌ์คํฐ์ ๋ชจ๋ executor ๋ ธ๋๋ก ๋ฐ์ํฐ๋ฅผ ๋จ ํ ๋ฒ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์กํ ์ ์๋ค.
๋ง์ฝ ๋ธ๋ก๋์บ์คํ ์ ์ฌ์ฉํ์ง ์๋๋ค๋ฉด ์ ์กํ ๋ฐ์ดํฐ๋ฅผ ๋คํธ์ํฌ๋ฅผ ํตํด ์ฌ๋ฌ ๋ ธ๋์ ๋ฐ๋ณต์ ์ผ๋ก ์ ์กํด์ผ ํ๋ฉฐ, ์ด๋ ํฐ ์ค๋ฒํค๋์ ์์ธ์ด ๋๋ค.
๊ทธ๋ฌ๋ ๋ธ๋ก๋์บ์คํ ์ ์ฌ์ฉํ๋ฉด ๋ฐ์ดํฐ๋ฅผ ํ ๋ฒ์ ๋ชจ๋ executor์๊ฒ ์ ์กํ ์ ์์ผ๋ฉฐ, ๊ฐ๊ฐ์ executor๋ ๋ฐ์ดํฐ๋ฅผ ์์ ์ ๋ฉ๋ชจ๋ฆฌ์ ์บ์ํ๋ค.
์ฆ, broadcast
๋ฉ์๋๋ฅผ ํตํด ๋น์ผ ์
ํ๋ง ์ฐ์ฐ์ ๋ฐฉ์งํ ์ ์๋ค.
1
occupation_lookup_udf = f.udf(get_occupation_name)
udf
๋ ์ผ๋ฐ ํ์ด์ฌ ํจ์๋ฅผ ์คํํฌ ๋ฐ์ดํฐํ๋ ์์์ ์ฌ์ฉํ ์ ์๋๋ก wrappingํ๋ ๋ฉ์๋์ด๋ค.
๋ค๋ง ์ด๋ ๊ฒ ๋ณํ๋ UDF๋ ์คํํฌ ๋ด์ฅ ํจ์์ ๋นํด ๋๋ฆฌ๊ฒ ๋์ํ ์ ์์ผ๋ฉฐ, ์คํํฌ์ ๋ด์ฅ ํจ์๋ฅผ ์ฐ์ ์ ์ผ๋ก ์ฌ์ฉํ๋ ๊ฒ์ด ์ข๋ค.
์คํํฌ์ JVM ํ๊ฒฝ๊ณผ ํ์ด์ฌ ํ๋ก์ธ์ค ๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋ (์ญ)์ง๋ ฌํ ์ค๋ฒํค๋๊ฐ ๊ฐ ํ๋ง๋ค ๋ฐ์ํ๊ธฐ ๋๋ฌธ์ด๋ค.
1
2
3
4
5
data = df.groupBy("hero1")\
.agg(
f.collect_set("hero2").alias("connection"))\
.withColumnRenamed("hero1", "hero")
data.show()
collect_set
์ ๊ทธ๋ฃนํ ํ ๊ฐ ๊ทธ๋ฃน์ ์ํ ํน์ ์ด์ ๊ฐ๋ค์ ๋ชจ์ ์ค๋ณต์ ์ ๊ฑฐํ ํ๋์ ๋ฐฐ์ด๋ก ๋ง๋๋ ๋ฉ์๋์ด๋ค.
collect_list
๋ ์ค๋ณต์ ํ์ฉํ์ฌ ๋ฐฐ์ด์ ๋ง๋ ๋ค.
1
2
data = data.withColumn("connection", f.concat_ws(",", f.col("connection")))
data.show()
concat_ws
๋ ํน์ ๊ตฌ๋ถ์๋ฅผ ์ฌ์ฉํ์ฌ ์ฌ๋ฌ ๋ฌธ์์ด์ด๋ ๋ฐฐ์ด์ ์์๋ค์ ํ๋๋ก ํฉ์ณ ๋จ์ผ ๋ฌธ์์ด๋ก ๋ง๋๋ ํจ์์ด๋ค. ์ฒซ ๋ฒ์งธ ์ธ์๋ ๊ตฌ๋ถ์, ๋ ๋ฒ์งธ ์ธ์๋ ํฉ์น ๋์์ด ๋๋ ์ด์ด๋ค.
1
data.coalesce(1).write.option("header", True).csv("output")
coalesce
๋ ๋ฐ์ดํฐํ๋ ์์ ํํฐ์
์๋ฅผ ์ค์ด๋ ๋ฉ์๋์ด๋ค.
repartition
์ ํํฐ์
์๋ฅผ ๋๋ฆฌ๊ฑฐ๋ ์ค์ผ ๋ ์ฌ์ฉํ๋ ๋ฉ์๋์ด๋ค. coalesce
๋ฉ์๋๋ ์ ์ฒด ์
ํ๋ง์ ์ํํ์ง ์๊ธฐ ๋๋ฌธ์ ๋จ์ํ ํํฐ์
์๋ฅผ ์ค์ด๋ ๊ฒ์ด ๋ชฉ์ ์ด๋ผ๋ฉด coalesce
๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ํจ์จ์ ์ด๋ค.
1
2
3
4
5
df.na.drop(how="any").show()
df.na.drop(thresh=2).show()
df.na.drop(subset=["salary"]).show()
df.printSchema()
df.na
๋ null
์ ์ฒ๋ฆฌํ๊ธฐ ์ํ ๊ธฐ๋ฅ์ ๋ชจ์๋์ ๊ฐ์ฒด์ด๋ค.
na.drop
์ null
์ด ํฌํจ๋ ํ์ ์ ๊ฑฐํ์ฌ ๋ฐ์ดํฐํ๋ ์์ ์ ์ ํ๋ ๋ฉ์๋์ด๋ค. how
ํ๋ผ๋ฏธํฐ๋ ํ์ ์ ๊ฑฐํ ๋ ์ฌ์ฉํ ๊ธฐ์ค์ด๋ค. any
๋ก ์ค์ ๋๋ฉด ํ์ ํ๋๋ผ๋ null
์ด ์กด์ฌํ๋ ๊ฒฝ์ฐ, all
๋ก ์ค์ ๋๋ฉด ํ์ ๋ชจ๋ ์ด์ด null
์ธ ๊ฒฝ์ฐ ํ์ ์ ๊ฑฐํ๋ค.
thresh
ํ๋ผ๋ฏธํฐ์๋ ์ ํจํ ๊ฐ์ ์ต์ ๊ฐ์๋ฅผ ์ง์ ํ๋ค. thresh=2
๋ null
์ด ์๋ ๊ฐ์ด ์ต์ 2๊ฐ ์ด์ ์๋ค๋ฉด ํ์ ์ ์งํ๋ค.
subset
ํ๋ผ๋ฏธํฐ๋ null
์ฌ๋ถ๋ฅผ subset
์ผ๋ก ์ง์ ๋ ์ด๋ง ํ์ธํ๋๋ก ํ๋ค.
1
df.na.fill(0).show()
na.fill
์ ๋ฐ์ดํฐํ๋ ์์ ๊ฒฐ์ธก๊ฐ์ ์ฌ์ฉ์๊ฐ ์ง์ ํ ๊ฐ์ผ๋ก ๋์ฒดํ๋ ๋ฉ์๋์ด๋ค.
1
2
3
df_user.join(df_salary,
df_user.id == df_salary.id,
"fullouter").show()
๋ฐ์ดํฐํ๋ ์ ์ญ์ ์กฐ์ธ ์ฐ์ฐ์ด ์กด์ฌํ๋ค. join
๋ฉ์๋๋ ๋ ๊ฐ์ ๋ฐ์ดํฐํ๋ ์์ ํน์ ํค๋ฅผ ๊ธฐ์ค์ผ๋ก ๊ฒฐํฉํ์ฌ ์๋ก์ด ๋ฐ์ดํฐํ๋ ์์ ์์ฑํ๋ค.
how
ํ๋ผ๋ฏธํฐ์ ์กฐ์ธ ์ ํ์ ๋ช
์ํ ์ ์๋ค. ์กฐ์ธ ์ ํ์ ๋ค์๊ณผ ๊ฐ๋ค.
how ํ๋ผ๋ฏธํฐ | ์กฐ์ธ ๊ฒฐ๊ณผ์ ํ |
---|---|
inner (default) | ์์ชฝ ๋ฐ์ดํฐํ๋ ์์ ๋ชจ๋ ์ผ์นํ๋ ํค์ ํ |
leftouter | ์ผ์ชฝ ๋ฐ์ดํฐํ๋ ์์ ๋ชจ๋ ํ |
rightouter | ์ค๋ฅธ์ชฝ ๋ฐ์ดํฐํ๋ ์์ ๋ชจ๋ ํ |
fullouter | ์์ชฝ ๋ฐ์ดํฐํ๋ ์์ ๋ชจ๋ ํ |
leftsemi | ์ค๋ฅธ์ชฝ ๋ฐ์ดํฐํ๋ ์์ ์ผ์นํ๋ ํค๊ฐ ์๋ ์ผ์ชฝ ๋ฐ์ดํฐํ๋ ์์ ํ |
leftanti | ์ค๋ฅธ์ชฝ ๋ฐ์ดํฐํ๋ ์์ ์ผ์นํ๋ ํค๊ฐ ์๋ ์ผ์ชฝ ๋ฐ์ดํฐํ๋ ์์ ํ |
cross | ์์ชฝ ๋ฐ์ดํฐํ๋ ์์ ์นดํ ์์ ๊ณฑ |