[Spark] MLlib
📌 MLlib이란?
Spark MLlib 은 빅데이터를 위한 분산 머신러닝 프레임워크이다. scikit-learn 과 같이 단일 머신에서 동작하는 라이브러리가 처리할 수 없는 대용량의 데이터를 다루기 위해 등장하였다.
특징
- MLlib에 구현된 알고리즘들은 대규모 병럴 처리에 맞게 설계되었다. 따라서 보다 높은 성능을 보여준다.
- 데이터가 증가하는 경우 클러스터에 더 많은 노드를 추가하는 수평적 확장 방법을 선택한다. 따라서 데이터의 수가 증가하여도 거의 선형적인 성능을 유지할 수 있다.
- 거대한 데이터셋을 여러 파티션으로 나누어 각 노드에 분산시킨다. 각 노드는 자신이 맡은 파티션에 대해서만 연산을 진행하고, 필요 시 다른 노드와 중간 결과를 교환한다.
- 머신러닝의 전체 파이프사이클을 스파크 프레임워크 안에서 해결할 수 있다.
- 머신러닝 로직 자체에 집중할 수 있도록 하는 고수준 API를 제공한다.
구성 요소
- Logistic Regression, Principal Component Analysis 등과 같이 다양한 머신러닝 알고리즘이 존재한다.
- OneHotEndocer, StandardScaler 등과 같이 다양한 피처 엔지니어링 도구들이 존재한다.
- 여러 단계를 하나의 워크플로우로 묶는 파이프라인이 있다.
- 전체 파이프라인을 디스크에 저장하고 다시 불러오는 기능이 있다.
- RMSE, R-squared와 같은 다양한 평가 지표가 있다.
아키텍처
두 가지 API가 존재하는데, RDD 기반과 DataFrame 기반이다.
RDD 기반 API(spark.mllib)은 현재 drprecated이다. RDD는 데이터에 컬럼 이름이 없으므로 개발자가 데이터의 순서나 내용을 직접 관리해야 했고, 많은 불편함이 생겼다. 또한 파이프라인 API를 지원하지 않아 피처 엔지니어링, 모델 훈련, 평가 등의 과정을 개발자가 수동으로 연결해야 했다.
스파크가 발전하면서 DataFrame이 도입되었고, 이를 기반으로 DataFrame 기반 API(spark.ml)가 등장하였다. 가장 뚜렷한 차이점은 파이프라인 API를 사용할 수 있다는 점이다. 모든 일련의 머신러닝 단계를 하나의 파이프라인 객체로 묶어 관리할 수 있다. 또한 Spark SQL, Structured Streaming 등 다른 스파크 컴포넌트와 연동이 가능하다.
장점과 한계
데이터를 여러 노드에 분산시켜 병렬적으로 처리하기 때문에 확장성이 매우 좋다. 또한 데이터 수집과 ETL, 훈련, 예측까지의 전체 파이프라인을 스파크라는 하나의 프레임워크 안에서 끝낼 수 있다. 그리고 데이터 분석에 사용되는 다양한 언어를 지원하여 유연성이 좋다.
그러나 scikit-learn 에 비해 상대적으로 알고리즘의 수가 적으며, 딥러닝 모델을 직접 설계하고 훈련하기 위한 포괄적인 도구를 제공하지 않는다. 모든 분산 시스템이 그렇듯이, 디버깅이 어렵다.
Linear Regression
선형 회귀를 통해 MLlib을 사용한 전체적인 머신러닝 워크플로우를 살펴보자.
1
2
df = spark.read.format("libsvm")\
.load("file:///home/jovyan/work/sample/sample_linear_regression_data.txt")
libsvm 은 피처가 매우 많지만 대부분의 값이 0인 희소 데이터를 효율적으로 표현하는 데 적합하다. 자동으로 Double 타입의 label 과 Vector 타입의 features 컬럼을 가진 데이터프레임을 생성한다.
1
2
3
4
5
6
7
8
lr = LinearRegression(
featuresCol='features',
labelCol='label',
predictionCol='prediction',
maxIter=10,
regParam=0.3,
elasticNetParam=0.8
)
위 코드와 같이 Estimator를 정의한다.
maxIter 는 최적의 가중치를 찾기 위한 최적화 알고리즘의 최대 반복 횟수이다.
elasticNetParam 은 Elastic Net 정규화의 혼합 비율을 결정한다. 설정되는 값은 L1 정규화의 비율이다. 즉, 여기서는 L1의 비중을 80%, L2의 비중을 20%로 둔다.
1
lr_model = lr.fit(training)
fit 메서드를 통해 실제로 모델을 훈련한다.
1
2
3
test_result = lr_model.evaluate(test)
test_result.residuals.show()
evaluate 메서드를 통해 다양한 성능 지표를 계산한다.
evaluate 메서드의 리턴 값은 Summary 객체이고, 여러가지 성능 지표를 가지고 있다.
1
2
3
print(f"coefficients: {lr_model.coefficients}\n")
print(f"intercept: {lr_model.intercept}\n")
모델 객체는 파라미터를 직접 조회할 수 있는 coefficients 와 intercept 속성을 가지고 있다.
1
2
predictions = lr_model.transform(test)
predictions.show()
모델 객체는 Transformer 로, transform 메서드를 가지고 있다. 이 메서드는 테스트 데이터에 대해 예측을 수행하고 prediction 컬럼이 추가된 데이터프레임을 생성한다.
1
2
3
4
5
6
7
8
9
asbl = VectorAssembler(
inputCols=[
'avg_session',
'time_on_app',
'time_on_website',
'membership_period'],
outputCol='feature_vectors')
tr_data = asbl.transform(df)
VectorAssembler 는 여러 개의 숫자 컬럼들을 하나의 피처 벡터 컬럼으로 합치는 transformer이다. MLlib의 모든 알고리즘은 두 개의 컬럼을 기대하는데, label 컬럼과 features 컬럼이다. features 컬럼에는 예측에 사용될 모든 독립 변수들을 담고 있는 하나의 Vector 타입 컬럼이어야 한다.
inputCols 에 벡터로 합치고 싶은 입력 컬럼들의 목록을 지정한다. outputCol 은 새로 생성될 벡터 컬럼의 이름을 지정한다.
Alternating Least Square (ALS)
Alternating Least Square 은 추천 시스템에 자주 사용되는 알고리즘이다. ALS는 Collaborative FIltering 이라는 추천 기법을 구현한 알고리즘인데, 다른 사람들의 선호 아이템을 통해 나의 추천 아이템을 예측하는 것이다.
1
2
3
4
5
6
7
8
9
als = ALS(
maxIter=5,
regParam=0.01,
userCol="user_id",
itemCol="movie_id",
ratingCol="rating",
coldStartStrategy="drop",
nonnegative=True
)
coldStartStrategy 에는 콜드 스타트 문제에 대한 처리 전략을 정의한다. 콜드 스타트는 모델을 평가하거나 예측할 때 훈련 데이터에는 없었던 새로운 아이템이 등장하는 상황을 말한다. drop 으로 설정하면 해당 행을 결과에서 제거한다. nan 으로 설정하면 예측 값을 NaN 로 리턴한다.
nonnegative 는 벡터의 모든 값이 음수가 아니도록 하는 옵션이다.
1
2
userRecs = model.recommendForAllUsers(10)
userRecs.show()
recommendForAllUsers 은 모든 사용자에게 가장 높은 점수를 가진 N개의 아이템을 추천하는 메서드이다.
1
2
3
4
5
evaluator = RegressionEvaluator(
metricName="rmse",
labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
이전에는 단순히 evaluate 메서드만 호출하였지만, 이 또한 내부적으로 RegressionEvaluator 객체를 생성하고 실행한다.
1
2
3
4
5
6
7
8
9
10
11
12
param_grid = ParamGridBuilder() \
.addGrid(als.rank, [10, 100]) \
.addGrid(als.regParam, [.1]) \
.addGrid(als.maxIter, [10]) \
.build()
cv = CrossValidator(
estimator=als,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3,
parallelism=6)
이전에는 하이퍼파라미터를 임의로 결정해서 넣었다면, ParamGridBuilder 와 CrossValidator 를 통해 최적의 하이퍼파라미터 조합을 찾을 수 있다.
ParamGridBuilder 는 테스트할 파라미터의 범위를 지정하는 빌더 도구이다. addGrid 메서드를 통해 테스트할 파라미터를 등록할 수 있다. build 메서드를 호출하게 되면 설정한 모든 그리드를 조합하여 테스트할 모든 파라미터 조합 리스트를 생성한다.
CrossValidator 는 ParamGridBuilder 를 통해 생성된 파라미터 리스트를 통해 최적의 하이퍼파라미터를 찾는다. estimator 옵션을 통해 튜닝할 대상을 지정한다. estimatorParamMaps 에 테스트할 파라미터 리스트를 전달하고, evaluator 에 기준을 설정한다. numFolds 를 작성하면 K-Fold Cross Validation 을 수행하게 되며, 설정된 값이 k이다. parallelism 은 몇 개의 작업을 병렬로 수행할지 설정하는 옵션이다. 현재 파라미터의 조합은 2개이고, k는 3이므로 총 6개의 모델을 훈련해야 한다. parallelism 을 6으로 설정하게 되면 일반적으로 1개의 모델을 훈련하는 시간과 비슷하게 걸리게 된다.
1
2
3
best_model = model.bestModel
predictions = best_model.transform(test)
bestModel 속성을 통해 가장 좋은 성능을 보인 모델을 선택할 수 있다.
1
2
3
4
5
print(f"Rank = {best_model._java_obj.parent().getRank()}")
print(f"MaxIter = {best_model._java_obj.parent().getMaxIter()}")
print(f"RegParam = {best_model._java_obj.parent().getRegParam()}")
rank는 최적의 모델을 만드는 데 사용된 잠재 요인의 수이다.
여기서 best_model 은 Transformer인데, rank, maxIter 와 같은 속성은 모델을 생성한 Estimator의 속성이다. Transformer의 parent 속성은 Estimator 객체를 가리키므로, 위와 같이 작성된 것이다.