[Spark] MLlib
๐ MLlib์ด๋?
Spark MLlib
์ ๋น
๋ฐ์ดํฐ๋ฅผ ์ํ ๋ถ์ฐ ๋จธ์ ๋ฌ๋ ํ๋ ์์ํฌ์ด๋ค. scikit-learn
๊ณผ ๊ฐ์ด ๋จ์ผ ๋จธ์ ์์ ๋์ํ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ ์ฒ๋ฆฌํ ์ ์๋ ๋์ฉ๋์ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃจ๊ธฐ ์ํด ๋ฑ์ฅํ์๋ค.
ํน์ง
- MLlib์ ๊ตฌํ๋ ์๊ณ ๋ฆฌ์ฆ๋ค์ ๋๊ท๋ชจ ๋ณ๋ด ์ฒ๋ฆฌ์ ๋ง๊ฒ ์ค๊ณ๋์๋ค. ๋ฐ๋ผ์ ๋ณด๋ค ๋์ ์ฑ๋ฅ์ ๋ณด์ฌ์ค๋ค.
- ๋ฐ์ดํฐ๊ฐ ์ฆ๊ฐํ๋ ๊ฒฝ์ฐ ํด๋ฌ์คํฐ์ ๋ ๋ง์ ๋ ธ๋๋ฅผ ์ถ๊ฐํ๋ ์ํ์ ํ์ฅ ๋ฐฉ๋ฒ์ ์ ํํ๋ค. ๋ฐ๋ผ์ ๋ฐ์ดํฐ์ ์๊ฐ ์ฆ๊ฐํ์ฌ๋ ๊ฑฐ์ ์ ํ์ ์ธ ์ฑ๋ฅ์ ์ ์งํ ์ ์๋ค.
- ๊ฑฐ๋ํ ๋ฐ์ดํฐ์ ์ ์ฌ๋ฌ ํํฐ์ ์ผ๋ก ๋๋์ด ๊ฐ ๋ ธ๋์ ๋ถ์ฐ์ํจ๋ค. ๊ฐ ๋ ธ๋๋ ์์ ์ด ๋งก์ ํํฐ์ ์ ๋ํด์๋ง ์ฐ์ฐ์ ์งํํ๊ณ , ํ์ ์ ๋ค๋ฅธ ๋ ธ๋์ ์ค๊ฐ ๊ฒฐ๊ณผ๋ฅผ ๊ตํํ๋ค.
- ๋จธ์ ๋ฌ๋์ ์ ์ฒด ํ์ดํ์ฌ์ดํด์ ์คํํฌ ํ๋ ์์ํฌ ์์์ ํด๊ฒฐํ ์ ์๋ค.
- ๋จธ์ ๋ฌ๋ ๋ก์ง ์์ฒด์ ์ง์คํ ์ ์๋๋ก ํ๋ ๊ณ ์์ค API๋ฅผ ์ ๊ณตํ๋ค.
๊ตฌ์ฑ ์์
- Logistic Regression, Principal Component Analysis ๋ฑ๊ณผ ๊ฐ์ด ๋ค์ํ ๋จธ์ ๋ฌ๋ ์๊ณ ๋ฆฌ์ฆ์ด ์กด์ฌํ๋ค.
- OneHotEndocer, StandardScaler ๋ฑ๊ณผ ๊ฐ์ด ๋ค์ํ ํผ์ฒ ์์ง๋์ด๋ง ๋๊ตฌ๋ค์ด ์กด์ฌํ๋ค.
- ์ฌ๋ฌ ๋จ๊ณ๋ฅผ ํ๋์ ์ํฌํ๋ก์ฐ๋ก ๋ฌถ๋ ํ์ดํ๋ผ์ธ์ด ์๋ค.
- ์ ์ฒด ํ์ดํ๋ผ์ธ์ ๋์คํฌ์ ์ ์ฅํ๊ณ ๋ค์ ๋ถ๋ฌ์ค๋ ๊ธฐ๋ฅ์ด ์๋ค.
- RMSE, R-squared์ ๊ฐ์ ๋ค์ํ ํ๊ฐ ์งํ๊ฐ ์๋ค.
์ํคํ ์ฒ
๋ ๊ฐ์ง API๊ฐ ์กด์ฌํ๋๋ฐ, RDD ๊ธฐ๋ฐ๊ณผ DataFrame ๊ธฐ๋ฐ์ด๋ค.
RDD ๊ธฐ๋ฐ API(spark.mllib
)์ ํ์ฌ drprecated์ด๋ค. RDD๋ ๋ฐ์ดํฐ์ ์ปฌ๋ผ ์ด๋ฆ์ด ์์ผ๋ฏ๋ก ๊ฐ๋ฐ์๊ฐ ๋ฐ์ดํฐ์ ์์๋ ๋ด์ฉ์ ์ง์ ๊ด๋ฆฌํด์ผ ํ๊ณ , ๋ง์ ๋ถํธํจ์ด ์๊ฒผ๋ค. ๋ํ ํ์ดํ๋ผ์ธ API๋ฅผ ์ง์ํ์ง ์์ ํผ์ฒ ์์ง๋์ด๋ง, ๋ชจ๋ธ ํ๋ จ, ํ๊ฐ ๋ฑ์ ๊ณผ์ ์ ๊ฐ๋ฐ์๊ฐ ์๋์ผ๋ก ์ฐ๊ฒฐํด์ผ ํ๋ค.
์คํํฌ๊ฐ ๋ฐ์ ํ๋ฉด์ DataFrame์ด ๋์
๋์๊ณ , ์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก DataFrame ๊ธฐ๋ฐ API(spark.ml
)๊ฐ ๋ฑ์ฅํ์๋ค. ๊ฐ์ฅ ๋๋ ทํ ์ฐจ์ด์ ์ ํ์ดํ๋ผ์ธ API๋ฅผ ์ฌ์ฉํ ์ ์๋ค๋ ์ ์ด๋ค. ๋ชจ๋ ์ผ๋ จ์ ๋จธ์ ๋ฌ๋ ๋จ๊ณ๋ฅผ ํ๋์ ํ์ดํ๋ผ์ธ ๊ฐ์ฒด๋ก ๋ฌถ์ด ๊ด๋ฆฌํ ์ ์๋ค. ๋ํ Spark SQL, Structured Streaming ๋ฑ ๋ค๋ฅธ ์คํํฌ ์ปดํฌ๋ํธ์ ์ฐ๋์ด ๊ฐ๋ฅํ๋ค.
์ฅ์ ๊ณผ ํ๊ณ
๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ๋ ธ๋์ ๋ถ์ฐ์์ผ ๋ณ๋ ฌ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ธฐ ๋๋ฌธ์ ํ์ฅ์ฑ์ด ๋งค์ฐ ์ข๋ค. ๋ํ ๋ฐ์ดํฐ ์์ง๊ณผ ETL, ํ๋ จ, ์์ธก๊น์ง์ ์ ์ฒด ํ์ดํ๋ผ์ธ์ ์คํํฌ๋ผ๋ ํ๋์ ํ๋ ์์ํฌ ์์์ ๋๋ผ ์ ์๋ค. ๊ทธ๋ฆฌ๊ณ ๋ฐ์ดํฐ ๋ถ์์ ์ฌ์ฉ๋๋ ๋ค์ํ ์ธ์ด๋ฅผ ์ง์ํ์ฌ ์ ์ฐ์ฑ์ด ์ข๋ค.
๊ทธ๋ฌ๋ scikit-learn
์ ๋นํด ์๋์ ์ผ๋ก ์๊ณ ๋ฆฌ์ฆ์ ์๊ฐ ์ ์ผ๋ฉฐ, ๋ฅ๋ฌ๋ ๋ชจ๋ธ์ ์ง์ ์ค๊ณํ๊ณ ํ๋ จํ๊ธฐ ์ํ ํฌ๊ด์ ์ธ ๋๊ตฌ๋ฅผ ์ ๊ณตํ์ง ์๋๋ค. ๋ชจ๋ ๋ถ์ฐ ์์คํ
์ด ๊ทธ๋ ๋ฏ์ด, ๋๋ฒ๊น
์ด ์ด๋ ต๋ค.
Linear Regression
์ ํ ํ๊ท๋ฅผ ํตํด MLlib์ ์ฌ์ฉํ ์ ์ฒด์ ์ธ ๋จธ์ ๋ฌ๋ ์ํฌํ๋ก์ฐ๋ฅผ ์ดํด๋ณด์.
1
2
df = spark.read.format("libsvm")\
.load("file:///home/jovyan/work/sample/sample_linear_regression_data.txt")
libsvm
์ ํผ์ฒ๊ฐ ๋งค์ฐ ๋ง์ง๋ง ๋๋ถ๋ถ์ ๊ฐ์ด 0์ธ ํฌ์ ๋ฐ์ดํฐ๋ฅผ ํจ์จ์ ์ผ๋ก ํํํ๋ ๋ฐ ์ ํฉํ๋ค. ์๋์ผ๋ก Double
ํ์
์ label
๊ณผ Vector
ํ์
์ features
์ปฌ๋ผ์ ๊ฐ์ง ๋ฐ์ดํฐํ๋ ์์ ์์ฑํ๋ค.
1
2
3
4
5
6
7
8
lr = LinearRegression(
featuresCol='features',
labelCol='label',
predictionCol='prediction',
maxIter=10,
regParam=0.3,
elasticNetParam=0.8
)
์ ์ฝ๋์ ๊ฐ์ด Estimator๋ฅผ ์ ์ํ๋ค.
maxIter
๋ ์ต์ ์ ๊ฐ์ค์น๋ฅผ ์ฐพ๊ธฐ ์ํ ์ต์ ํ ์๊ณ ๋ฆฌ์ฆ์ ์ต๋ ๋ฐ๋ณต ํ์์ด๋ค.
elasticNetParam
์ Elastic Net ์ ๊ทํ์ ํผํฉ ๋น์จ์ ๊ฒฐ์ ํ๋ค. ์ค์ ๋๋ ๊ฐ์ L1 ์ ๊ทํ์ ๋น์จ์ด๋ค. ์ฆ, ์ฌ๊ธฐ์๋ L1์ ๋น์ค์ 80%, L2์ ๋น์ค์ 20%๋ก ๋๋ค.
1
lr_model = lr.fit(training)
fit
๋ฉ์๋๋ฅผ ํตํด ์ค์ ๋ก ๋ชจ๋ธ์ ํ๋ จํ๋ค.
1
2
3
test_result = lr_model.evaluate(test)
test_result.residuals.show()
evaluate
๋ฉ์๋๋ฅผ ํตํด ๋ค์ํ ์ฑ๋ฅ ์งํ๋ฅผ ๊ณ์ฐํ๋ค.
evaluate
๋ฉ์๋์ ๋ฆฌํด ๊ฐ์ Summary
๊ฐ์ฒด์ด๊ณ , ์ฌ๋ฌ๊ฐ์ง ์ฑ๋ฅ ์งํ๋ฅผ ๊ฐ์ง๊ณ ์๋ค.
1
2
3
print(f"coefficients: {lr_model.coefficients}\n")
print(f"intercept: {lr_model.intercept}\n")
๋ชจ๋ธ ๊ฐ์ฒด๋ ํ๋ผ๋ฏธํฐ๋ฅผ ์ง์ ์กฐํํ ์ ์๋ coefficients
์ intercept
์์ฑ์ ๊ฐ์ง๊ณ ์๋ค.
1
2
predictions = lr_model.transform(test)
predictions.show()
๋ชจ๋ธ ๊ฐ์ฒด๋ Transformer
๋ก, transform
๋ฉ์๋๋ฅผ ๊ฐ์ง๊ณ ์๋ค. ์ด ๋ฉ์๋๋ ํ
์คํธ ๋ฐ์ดํฐ์ ๋ํด ์์ธก์ ์ํํ๊ณ prediction
์ปฌ๋ผ์ด ์ถ๊ฐ๋ ๋ฐ์ดํฐํ๋ ์์ ์์ฑํ๋ค.
1
2
3
4
5
6
7
8
9
asbl = VectorAssembler(
inputCols=[
'avg_session',
'time_on_app',
'time_on_website',
'membership_period'],
outputCol='feature_vectors')
tr_data = asbl.transform(df)
VectorAssembler
๋ ์ฌ๋ฌ ๊ฐ์ ์ซ์ ์ปฌ๋ผ๋ค์ ํ๋์ ํผ์ฒ ๋ฒกํฐ ์ปฌ๋ผ์ผ๋ก ํฉ์น๋ transformer์ด๋ค. MLlib์ ๋ชจ๋ ์๊ณ ๋ฆฌ์ฆ์ ๋ ๊ฐ์ ์ปฌ๋ผ์ ๊ธฐ๋ํ๋๋ฐ, label ์ปฌ๋ผ๊ณผ features ์ปฌ๋ผ์ด๋ค. features ์ปฌ๋ผ์๋ ์์ธก์ ์ฌ์ฉ๋ ๋ชจ๋ ๋
๋ฆฝ ๋ณ์๋ค์ ๋ด๊ณ ์๋ ํ๋์ Vector
ํ์
์ปฌ๋ผ์ด์ด์ผ ํ๋ค.
inputCols
์ ๋ฒกํฐ๋ก ํฉ์น๊ณ ์ถ์ ์
๋ ฅ ์ปฌ๋ผ๋ค์ ๋ชฉ๋ก์ ์ง์ ํ๋ค. outputCol
์ ์๋ก ์์ฑ๋ ๋ฒกํฐ ์ปฌ๋ผ์ ์ด๋ฆ์ ์ง์ ํ๋ค.
Alternating Least Square (ALS)
Alternating Least Square
์ ์ถ์ฒ ์์คํ
์ ์์ฃผ ์ฌ์ฉ๋๋ ์๊ณ ๋ฆฌ์ฆ์ด๋ค. ALS๋ Collaborative FIltering
์ด๋ผ๋ ์ถ์ฒ ๊ธฐ๋ฒ์ ๊ตฌํํ ์๊ณ ๋ฆฌ์ฆ์ธ๋ฐ, ๋ค๋ฅธ ์ฌ๋๋ค์ ์ ํธ ์์ดํ
์ ํตํด ๋์ ์ถ์ฒ ์์ดํ
์ ์์ธกํ๋ ๊ฒ์ด๋ค.
1
2
3
4
5
6
7
8
9
als = ALS(
maxIter=5,
regParam=0.01,
userCol="user_id",
itemCol="movie_id",
ratingCol="rating",
coldStartStrategy="drop",
nonnegative=True
)
coldStartStrategy
์๋ ์ฝ๋ ์คํํธ ๋ฌธ์ ์ ๋ํ ์ฒ๋ฆฌ ์ ๋ต์ ์ ์ํ๋ค. ์ฝ๋ ์คํํธ๋ ๋ชจ๋ธ์ ํ๊ฐํ๊ฑฐ๋ ์์ธกํ ๋ ํ๋ จ ๋ฐ์ดํฐ์๋ ์์๋ ์๋ก์ด ์์ดํ
์ด ๋ฑ์ฅํ๋ ์ํฉ์ ๋งํ๋ค. drop
์ผ๋ก ์ค์ ํ๋ฉด ํด๋น ํ์ ๊ฒฐ๊ณผ์์ ์ ๊ฑฐํ๋ค. nan
์ผ๋ก ์ค์ ํ๋ฉด ์์ธก ๊ฐ์ NaN
๋ก ๋ฆฌํดํ๋ค.
nonnegative
๋ ๋ฒกํฐ์ ๋ชจ๋ ๊ฐ์ด ์์๊ฐ ์๋๋๋ก ํ๋ ์ต์
์ด๋ค.
1
2
userRecs = model.recommendForAllUsers(10)
userRecs.show()
recommendForAllUsers
์ ๋ชจ๋ ์ฌ์ฉ์์๊ฒ ๊ฐ์ฅ ๋์ ์ ์๋ฅผ ๊ฐ์ง N๊ฐ์ ์์ดํ
์ ์ถ์ฒํ๋ ๋ฉ์๋์ด๋ค.
1
2
3
4
5
evaluator = RegressionEvaluator(
metricName="rmse",
labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
์ด์ ์๋ ๋จ์ํ evaluate
๋ฉ์๋๋ง ํธ์ถํ์์ง๋ง, ์ด ๋ํ ๋ด๋ถ์ ์ผ๋ก RegressionEvaluator
๊ฐ์ฒด๋ฅผ ์์ฑํ๊ณ ์คํํ๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
param_grid = ParamGridBuilder() \
.addGrid(als.rank, [10, 100]) \
.addGrid(als.regParam, [.1]) \
.addGrid(als.maxIter, [10]) \
.build()
cv = CrossValidator(
estimator=als,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3,
parallelism=6)
์ด์ ์๋ ํ์ดํผํ๋ผ๋ฏธํฐ๋ฅผ ์์๋ก ๊ฒฐ์ ํด์ ๋ฃ์๋ค๋ฉด, ParamGridBuilder
์ CrossValidator
๋ฅผ ํตํด ์ต์ ์ ํ์ดํผํ๋ผ๋ฏธํฐ ์กฐํฉ์ ์ฐพ์ ์ ์๋ค.
ParamGridBuilder
๋ ํ
์คํธํ ํ๋ผ๋ฏธํฐ์ ๋ฒ์๋ฅผ ์ง์ ํ๋ ๋น๋ ๋๊ตฌ์ด๋ค. addGrid
๋ฉ์๋๋ฅผ ํตํด ํ
์คํธํ ํ๋ผ๋ฏธํฐ๋ฅผ ๋ฑ๋กํ ์ ์๋ค. build
๋ฉ์๋๋ฅผ ํธ์ถํ๊ฒ ๋๋ฉด ์ค์ ํ ๋ชจ๋ ๊ทธ๋ฆฌ๋๋ฅผ ์กฐํฉํ์ฌ ํ
์คํธํ ๋ชจ๋ ํ๋ผ๋ฏธํฐ ์กฐํฉ ๋ฆฌ์คํธ๋ฅผ ์์ฑํ๋ค.
CrossValidator
๋ ParamGridBuilder
๋ฅผ ํตํด ์์ฑ๋ ํ๋ผ๋ฏธํฐ ๋ฆฌ์คํธ๋ฅผ ํตํด ์ต์ ์ ํ์ดํผํ๋ผ๋ฏธํฐ๋ฅผ ์ฐพ๋๋ค. estimator
์ต์
์ ํตํด ํ๋ํ ๋์์ ์ง์ ํ๋ค. estimatorParamMaps
์ ํ
์คํธํ ํ๋ผ๋ฏธํฐ ๋ฆฌ์คํธ๋ฅผ ์ ๋ฌํ๊ณ , evaluator
์ ๊ธฐ์ค์ ์ค์ ํ๋ค. numFolds
๋ฅผ ์์ฑํ๋ฉด K-Fold Cross Validation
์ ์ํํ๊ฒ ๋๋ฉฐ, ์ค์ ๋ ๊ฐ์ด k
์ด๋ค. parallelism
์ ๋ช ๊ฐ์ ์์
์ ๋ณ๋ ฌ๋ก ์ํํ ์ง ์ค์ ํ๋ ์ต์
์ด๋ค. ํ์ฌ ํ๋ผ๋ฏธํฐ์ ์กฐํฉ์ 2๊ฐ์ด๊ณ , k๋ 3์ด๋ฏ๋ก ์ด 6๊ฐ์ ๋ชจ๋ธ์ ํ๋ จํด์ผ ํ๋ค. parallelism
์ 6์ผ๋ก ์ค์ ํ๊ฒ ๋๋ฉด ์ผ๋ฐ์ ์ผ๋ก 1๊ฐ์ ๋ชจ๋ธ์ ํ๋ จํ๋ ์๊ฐ๊ณผ ๋น์ทํ๊ฒ ๊ฑธ๋ฆฌ๊ฒ ๋๋ค.
1
2
3
best_model = model.bestModel
predictions = best_model.transform(test)
bestModel
์์ฑ์ ํตํด ๊ฐ์ฅ ์ข์ ์ฑ๋ฅ์ ๋ณด์ธ ๋ชจ๋ธ์ ์ ํํ ์ ์๋ค.
1
2
3
4
5
print(f"Rank = {best_model._java_obj.parent().getRank()}")
print(f"MaxIter = {best_model._java_obj.parent().getMaxIter()}")
print(f"RegParam = {best_model._java_obj.parent().getRegParam()}")
rank
๋ ์ต์ ์ ๋ชจ๋ธ์ ๋ง๋๋ ๋ฐ ์ฌ์ฉ๋ ์ ์ฌ ์์ธ์ ์์ด๋ค.
์ฌ๊ธฐ์ best_model
์ Transformer์ธ๋ฐ, rank
, maxIter
์ ๊ฐ์ ์์ฑ์ ๋ชจ๋ธ์ ์์ฑํ Estimator์ ์์ฑ์ด๋ค. Transformer์ parent
์์ฑ์ Estimator
๊ฐ์ฒด๋ฅผ ๊ฐ๋ฆฌํค๋ฏ๋ก, ์์ ๊ฐ์ด ์์ฑ๋ ๊ฒ์ด๋ค.