Post

[Spark] Apache Spark๋ž€?

[Spark] Apache Spark๋ž€?

๐Ÿ“Œ Apache Spark๋ž€?

Apache Spark ๋Š” ๋น…๋ฐ์ดํ„ฐ๋ฅผ ํšจ๊ณผ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ๋ถ„์„ ์—”์ง„์ด๋‹ค. Hadoop ์˜ MapReduce ๊ฐ€ ๋””์Šคํฌ ๊ธฐ๋ฐ˜์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐ˜๋ฉด, Spark๋Š” ์ธ๋ฉ”๋ชจ๋ฆฌ ๊ธฐ๋ฐ˜ ๋ณ‘๋ ฌ ์—ฐ์‚ฐ์„ ์ง„ํ–‰ํ•œ๋‹ค.

์ผ๋ฐ˜์ ์œผ๋กœ Spark๊ฐ€ Hadoop MapReduce๋ณด๋‹ค 100๋ฐฐ ์ •๋„ ๋น ๋ฅด๋‹ค.

๐Ÿ“Œ ํŠน์ง•

  • ๋ฐฐ์น˜ ํ”„๋กœ์„ธ์‹ฑ์„ ํ†ตํ•ด ์ผ์ • ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ์ŠคํŠธ๋ฆฌ๋ฐ ํ”„๋กœ์„ธ์‹ฑ์„ ํ†ตํ•ด ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค. Spark์˜ Structured Streaming ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋Š” ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ๊ณ„์†ํ•ด์„œ ํ–‰์ด ์ถ”๊ฐ€๋˜๋Š” ๋ฌดํ•œํ•œ ํ…Œ์ด๋ธ”๋กœ ๊ฐ„์ฃผํ•˜๊ธฐ ๋•Œ๋ฌธ์—, ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ž‘์€ ๋‹จ์œ„๋กœ ๋‚˜๋ˆ„์–ด ์ฆ‰๊ฐ์ ์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ๋ถ„์‚ฐ ํด๋Ÿฌ์Šคํ„ฐ ์ปดํ“จํŒ…์„ ํ†ตํ•ด ์„œ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๊ตํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค. Spark๋Š” ๋†’์ง€ ์•Š์€ ์ŠคํŽ™์˜ ๋…ธ๋“œ๋“ค์„ ๋„คํŠธ์›Œํฌ๋กœ ์—ฐ๊ฒฐํ•˜์—ฌ ํ•˜๋‚˜์˜ ๊ฑฐ๋Œ€ํ•œ ๋…ธ๋“œ์ฒ˜๋Ÿผ ์‚ฌ์šฉํ•œ๋‹ค. ์ด๋ฅผ scale-out ์ด๋ผ๊ณ  ํ•œ๋‹ค.
  • HDFS, S3 ๋“ฑ๊ณผ ๊ฐ™์€ ๋ฐ์ดํ„ฐ ์ €์žฅ์†Œ์™€ ํ†ตํ•ฉ๋  ์ˆ˜ ์žˆ๋‹ค. Spark ์ž์ฒด๋Š” ๊ณ„์‚ฐ๋งŒ ๋‹ด๋‹นํ•˜๋ฉฐ, ํŠน์ • ์ €์žฅ์†Œ์— ์ข…์†๋˜์ง€ ์•Š๋Š”๋‹ค. ์ฆ‰, Spark๋Š” ๊ณ„์‚ฐ ์—”์ง„๊ณผ ์ €์žฅ์†Œ๊ฐ€ ๋ถ„๋ฆฌ๋˜์–ด ์žˆ๋‹ค.
  • ํŒŒ์ด์ฌ, ์Šค์นผ๋ผ, R, SQL ๋“ฑ๊ณผ ๊ฐ™์€ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ์–ธ์–ด๋ฅผ ์ง€์›ํ•œ๋‹ค.
  • ๋ฐฐ์น˜ ํ”„๋กœ์„ธ์‹ฑ๊ณผ ์ŠคํŠธ๋ฆฌ๋ฐ ํ”„๋กœ์„ธ์‹ฑ์„ wrapper ๋งŒ ์ˆ˜์ •ํ•˜์—ฌ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. ๋ฐฐ์น˜ ๋ฐ์ดํ„ฐ ๋กœ์ง๊ณผ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๋กœ์ง์ด ๊ฑฐ์˜ ๋™์ผํ•˜๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

๐Ÿ“Œ ๊ตฌ์กฐ

image.png

ํ•˜๋‚˜์˜ ํด๋Ÿฌ์Šคํ„ฐ๋Š” ํ•˜๋‚˜์˜ master node ์™€ ์—ฌ๋Ÿฌ ๊ฐœ์˜ worker node ๋กœ ๊ตฌ์„ฑ๋œ๋‹ค.

์œ„ ๊ทธ๋ฆผ์—์„œ ์™ผ์ชฝ ๋…ธ๋“œ๊ฐ€ master node์ด๋‹ค.

cluster manager๋Š” ์ „์ฒด ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ๋ชจ๋‹ˆํ„ฐ๋งํ•˜๊ณ  ๋ฆฌ์†Œ์Šค๋ฅผ ์ ์ ˆํžˆ ๋ถ„๋ฐฐํ•œ๋‹ค.

master node๋Š” driver ํ”„๋กœ์„ธ์Šค๋ฅผ ์ˆ˜ํ–‰ํ•œ๋‹ค. ์ž‘์„ฑํ•œ ์ฝ”๋“œ๋ฅผ ๋ถ„์„ํ•˜์—ฌ plan์„ ์ˆ˜๋ฆฝํ•˜๋ฉฐ, worker node์˜ ์ƒํƒœ๋ฅผ ๋ชจ๋‹ˆํ„ฐ๋งํ•œ๋‹ค.

๋ฐ˜๋ฉด worker node๋Š” executor ํ”„๋กœ์„ธ์Šค๋ฅผ ์ˆ˜ํ–‰ํ•œ๋‹ค. master node๋กœ๋ถ€ํ„ฐ ํ• ๋‹น๋ฐ›์€ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๊ณ  ๋ฐ์ดํ„ฐ๋ฅผ ๋ฉ”๋ชจ๋ฆฌ์— ์ €์žฅํ•˜๋ฉฐ, ๋‹ค๋ฅธ worker node์™€ ๋ฐ์ดํ„ฐ๋ฅผ ๊ตํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์‹คํ–‰ ํ”„๋กœ์„ธ์Šค

  1. ์‚ฌ์šฉ์ž๊ฐ€ Spark ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์‹คํ–‰ํ•˜๋ฉด ๋จผ์ € driver ํ”„๋กœ์„ธ์Šค๊ฐ€ ์‹œ์ž‘๋œ๋‹ค. ๋“œ๋ผ์ด๋ฒ„ ๋‚ด๋ถ€์— SparkContext ๊ฐ์ฒด๊ฐ€ ์ƒ์„ฑ๋œ๋‹ค. SparkContext ๊ฐ์ฒด๋Š” Spark ๋ชจ๋“  ๊ธฐ๋Šฅ์— ๋Œ€ํ•œ entrypoint์ด๋‹ค.
  2. SparkContext ๊ฐ€ YARN , k8s ์™€ ๊ฐ™์€ ํด๋Ÿฌ์Šคํ„ฐ ๋งค๋‹ˆ์ €์™€ ํ†ต์‹ ํ•˜์—ฌ ํ•„์š”ํ•œ ์ž์›์„ ์š”์ฒญํ•œ๋‹ค. ํด๋Ÿฌ์Šคํ„ฐ ๋งค๋‹ˆ์ €๋Š” ์š”์ฒญ์„ ๋ฐ›๊ณ  ์—ฌ๋Ÿฌ worker node์— ์ž์›์„ ํ• ๋‹นํ•˜๊ณ  worker node ์œ„์—์„œ executor ํ”„๋กœ์„ธ์Šค๋ฅผ ์‹คํ–‰์‹œํ‚จ๋‹ค.
  3. executor๊ฐ€ ์‹คํ–‰๋˜๋ฉด driver์—๊ฒŒ ์ž์‹ ์˜ ์œ„์น˜๋ฅผ ๋“ฑ๋กํ•œ๋‹ค. ์ฆ‰, ๋“œ๋ผ์ด๋ฒ„๋Š” ์ด ์‹œ์ ์— ์–ด๋–ค executor๊ฐ€ ์–ด๋””์„œ ๋™์ž‘ํ•˜๋Š”์ง€ ์•Œ๊ฒŒ ๋œ๋‹ค. ์‚ฌ์šฉ์ž์˜ action ์ด ํ˜ธ์ถœ๋˜๋ฉด ๋“œ๋ผ์ด๋ฒ„๋Š” ์ด๋ฅผ job ์œผ๋กœ ๋ณ€ํ™˜ํ•˜๊ณ  ์—ฌ๋Ÿฌ ๊ฐœ์˜ stage ๋กœ ๋‚˜๋ˆˆ๋‹ค. ๊ฐ stage ๋Š” ์—ฌ๋Ÿฌ ๊ฐœ์˜ task ๋กœ ์ชผ๊ฐœ์ง„๋‹ค. ๋“œ๋ผ์ด๋ฒ„๋Š” ์ด task ๋ฅผ executor์— ์ง์ ‘ ์ „๋‹ฌํ•˜์—ฌ ์‹คํ–‰์„ ์ง€์‹œํ•œ๋‹ค.
  4. ๊ฐ๊ฐ์˜ executor๋Š” ํ• ๋‹น๋ฐ›์€ ํƒœ์Šคํฌ๋ฅผ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ํ•˜๊ณ , ๊ทธ ๊ฒฐ๊ณผ๋ฅผ ๋“œ๋ผ์ด๋ฒ„์— ๋ณด๊ณ ํ•œ๋‹ค.

๊ฐ๊ฐ์˜ executor๋Š” ์ž์‹ ์ด ๊ณ„์‚ฐํ•œ ํŒŒํ‹ฐ์…˜์„ ์ž์‹ ์˜ ๋ฉ”๋ชจ๋ฆฌ๋‚˜ ๋””์Šคํฌ์— ์บ์‹ฑํ•˜๋Š”๋ฐ, ์ด๋ฅผ ํ†ตํ•ด ๋™์ผํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ˜๋ณต์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ ์„ฑ๋Šฅ์ด ํ–ฅ์ƒ๋œ๋‹ค.

groupby, join ๊ณผ ๊ฐ™์ด ๋ฐ์ดํ„ฐ์˜ ์žฌ๋ถ„๋ฐฐ๊ฐ€ ํ•„์š”ํ•œ ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•  ๋•Œ ๊ฐ executor๋Š” ๋‹ค๋ฅธ executor์™€ ๋„คํŠธ์›Œํฌ๋ฅผ ํ†ตํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ ๋ฐ›๋Š”๋ฐ, ์ด ๊ณผ์ •์„ shuffling ์ด๋ผ๊ณ  ํ•œ๋‹ค. ๋‹ค๋งŒ shuffling ์€ ๋””์Šคํฌ I/O์™€ ๋„คํŠธ์›Œํฌ ํ†ต์‹  ๋ชจ๋‘ ์‚ฌ์šฉํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋น„์šฉ์ด ํฐ ์—ฐ์‚ฐ์ด๋ฉฐ, ์ด๋ฅผ ์ตœ์†Œํ™”ํ•˜๋Š” ๊ฒƒ์ด Spark ์„ฑ๋Šฅ ํŠœ๋‹์˜ ํ•ต์‹ฌ์ด๋‹ค.

groupby ๋‚˜ join ์€ ๊ด€๋ จ ์žˆ๋Š” ๋ฐ์ดํ„ฐ๋ผ๋ฆฌ ํ•œ ๊ณณ์— ๋ชจ์•„์•ผ ํ•˜๋Š” ์—ฐ์‚ฐ์ด๊ธฐ ๋•Œ๋ฌธ์— ๋ฐ์ดํ„ฐ ์žฌ๋ถ„๋ฐฐ๊ฐ€ ํ•„์š”ํ•œ ๊ฒƒ์ด๋‹ค.

ํด๋Ÿฌ์Šคํ„ฐ์˜ ์ข…๋ฅ˜

  • ์˜จํ”„๋ ˆ๋ฏธ์Šค ํด๋Ÿฌ์Šคํ„ฐ๋Š” ์ž์ฒด์ ์ธ ๋ฐ์ดํ„ฐ ์„ผํ„ฐ์˜ ๋ฌผ๋ฆฌ์  ์„œ๋ฒ„๋ฅผ ํ†ตํ•ด ์ง์ ‘ ๊ตฌ์ถ•ํ•˜๋Š” ํด๋Ÿฌ์Šคํ„ฐ์ด๋‹ค. ์ดˆ๊ธฐ ๋น„์šฉ์€ ๋†’์œผ๋‚˜ ์ดํ›„ ์ถ”๊ฐ€์ ์ธ ์ธํ”„๋ผ ๋น„์šฉ์€ ๊ฑฐ์˜ ์—†์œผ๋‚˜, ์ง์ ‘ ์œ ์ง€๋ณด์ˆ˜ํ•ด์•ผ ํ•œ๋‹ค๋Š” ๋‹จ์ ์ด ์žˆ๋‹ค.
  • ํด๋ผ์šฐ๋“œ ๊ธฐ๋ฐ˜ ํด๋Ÿฌ์Šคํ„ฐ๋Š” AWS EMR, Google Dataproc, Azure HDInsight์™€ ๊ฐ™์ด ํด๋ผ์šฐ๋“œ ๋ฒค๋”์—์„œ ์ œ๊ณตํ•˜๋Š” ์„œ๋น„์Šค๋ฅผ ํ†ตํ•ด ์‚ฌ์šฉํ•˜๋Š” ํด๋Ÿฌ์Šคํ„ฐ์ด๋‹ค. ํ™•์žฅ์„ฑ์ด ์ข‹๊ณ  ๊ด€๋ฆฌ ์ธก๋ฉด์—์„œ ํŽธ๋ฆฌํ•˜๋‚˜, ์ง€์†์ ์ธ ์šด์˜ ๋น„์šฉ์ด ๋ฐœ์ƒํ•œ๋‹ค.
  • ํ•˜์ด๋ธŒ๋ฆฌ๋“œ ํด๋Ÿฌ์Šคํ„ฐ๋Š” ์˜จํ”„๋ ˆ๋ฏธ์Šค์™€ ํด๋ผ์šฐ๋“œ ๊ธฐ๋ฐ˜ ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ํ˜ผ์šฉํ•˜์—ฌ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. ํ‰์†Œ์—๋Š” ์˜จํ”„๋ ˆ๋ฏธ์Šค ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜๋‹ค, ํŠธ๋ž˜ํ”ฝ ํ”ผํฌ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ํด๋ผ์šฐ๋“œ๋กœ๋ถ€ํ„ฐ ์ถ”๊ฐ€ ๋…ธ๋“œ๋ฅผ ์ž„์‹œ๋กœ ๋Œ€์—ฌํ•˜์—ฌ ํ™•์žฅ์„ฑ์„ ์ฑ™๊ธด๋‹ค.

ํด๋Ÿฌ์Šคํ„ฐ์˜ ์žฅ์ 

  • ๋ณ‘๋Ÿด ์ฒ˜๋ฆฌ ๋ฐฉ์‹์„ ํ†ตํ•ด ์ž‘์—… ์ˆ˜ํ–‰ ์‹œ๊ฐ„์„ ๋‹จ์ถ•์‹œํ‚ฌ ์ˆ˜ ์žˆ๋‹ค.
  • scale-out ๋ชจ๋ธ์„ ์ฑ„ํƒํ•˜๋ฏ€๋กœ ์ƒ๋Œ€์ ์œผ๋กœ ๋น„์šฉ์ด ์ €๋ ดํ•˜๋ฉฐ, ์ˆ˜ํ‰์ ์œผ๋กœ ๋…ธ๋“œ๋ฅผ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ์–ด ์œ ์—ฐํ•˜๊ฒŒ ๊ด€๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ํŠน์ • ๋…ธ๋“œ์— ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๋•Œ ์ž๋™์œผ๋กœ ๋‹ค๋ฅธ ๋…ธ๋“œ๊ฐ€ ๊ทธ ์ž‘์—…์„ ๋Œ€์‹  ์ˆ˜ํ–‰ํ•˜์—ฌ ์•ˆ์ •์„ฑ์„ ์œ ์ง€ํ•œ๋‹ค. ์ด๋ฅผ Fault Tolerance ๋ผ๊ณ  ํ•œ๋‹ค.

YARN ์•„ํ‚คํ…์ฒ˜

image.png

  • ResourceManager ๋Š” master node์—์„œ ์‹คํ–‰๋œ๋‹ค. ํด๋Ÿฌ์Šคํ„ฐ์˜ ๋ชจ๋“  ์ž์›์„ ๊ด€๋ฆฌํ•˜๋ฉฐ ์–ด๋–ค ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์— ์ž์›์„ ํ• ๋‹นํ• ์ง€ ๊ฒฐ์ •ํ•œ๋‹ค.
  • NodeManager ๋Š” slave node์—์„œ ์‹คํ–‰๋˜๋ฉฐ, ์ž์‹ ์ด ์†ํ•œ ๋…ธ๋“œ์˜ ์ž์› ์ƒํƒœ๋ฅผ ์ง€์†์ ์œผ๋กœ ResourceManager์—๊ฒŒ ๋ณด๊ณ ํ•˜๊ณ , ResourceManager์˜ ๋ช…๋ น์„ ๋ฐ›์•„ Container ๋ฅผ ์‹คํ–‰ํ•˜๊ณ  ๊ด€๋ฆฌํ•œ๋‹ค.
  • ApplicationMaster ๋Š” ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๋‹น ํ•˜๋‚˜ ์กด์žฌํ•˜๋Š” ์ปจํ…Œ์ด๋„ˆ๋กœ, ์ƒ๋ช…์ฃผ๊ธฐ๋ฅผ ๊ด€๋ฆฌํ•œ๋‹ค. ResourceManager๋ฅผ ํ†ตํ•ด ํ•„์š”ํ•œ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ํ• ๋‹น๋ฐ›๊ณ , ํ•ด๋‹น ์ปจํ…Œ์ด๋„ˆ์— task๊ฐ€ ์‹คํ–‰๋˜๋„๋ก NodeManager์— ์š”์ฒญํ•œ๋‹ค.
  • Container ๋Š” NodeManager๊ฐ€ ๊ด€๋ฆฌํ•˜๋Š” ์ž์› ํ• ๋‹น ๋‹จ์œ„๋กœ, ์‹ค์ œ task๊ฐ€ ์ปจํ…Œ์ด๋„ˆ ์•ˆ์—์„œ ์‹คํ–‰๋œ๋‹ค.

๊ตฌ์ฒด์ ์ธ ์‹คํ–‰ ํ”„๋กœ์„ธ์Šค๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

  1. Spark ๋˜๋Š” MapReduce job์„ ํด๋Ÿฌ์Šคํ„ฐ์— ์ œ์ถœํ•œ๋‹ค. ํด๋ผ์ด์–ธํŠธ๋Š” ResourceManager์—๊ฒŒ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์‹คํ–‰์„ ์š”์ฒญํ•œ๋‹ค.
  2. ์š”์ฒญ์„ ๋ฐ›์€ ResourceManager๋Š” ApplicationMaster ํ”„๋กœ์„ธ์Šค๋ฅผ ์ปจํ…Œ์ด๋„ˆ ์•ˆ์—์„œ ์‹œ์ž‘์‹œํ‚จ๋‹ค.
  3. ApplicationMaster๋Š” ๊ด€๋ฆฌํ•  ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์˜ ์š”๊ตฌ์‚ฌํ•ญ์„ ํŒŒ์•…ํ•œ ํ›„ ResourceManager์—๊ฒŒ ํ•„์š”ํ•œ ์ž์›์„ ์ง์ ‘ ์š”์ฒญํ•œ๋‹ค.
  4. ResourceManager๋Š” ์š”์ฒญ๋ฐ›์€ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์—ฌ๋Ÿฌ NodeManger์—๊ฒŒ ํ• ๋‹นํ•œ๋‹ค. ApplicationMaster์—๊ฒŒ ์š”์ฒญํ•œ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์–ด๋А ๋…ธ๋“œ์—์„œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š”์ง€์— ๋Œ€ํ•œ ์ •๋ณด๋ฅผ ์ „๋‹ฌํ•œ๋‹ค.
  5. ์ปจํ…Œ์ด๋„ˆ ์œ„์น˜ ์ •๋ณด๋ฅผ ๋ฐ›์€ ApplicationMaster๋Š” NodeManager์™€ ์ง์ ‘ ํ†ต์‹ ํ•˜์—ฌ ์ปจํ…Œ์ด๋„ˆ์— ์‹ค์ œ task๋ฅผ ์‹คํ–‰ํ•˜๋„๋ก ์ง€์‹œํ•œ๋‹ค.
  6. ๋ชจ๋“  task๊ฐ€ ์™„๋ฃŒ๋˜๋ฉด ResourceManager์—๊ฒŒ ๋“ฑ๋ก์„ ํ•ด์ œํ•˜๊ณ , ์ž์›์„ ํด๋Ÿฌ์Šคํ„ฐ์— ๋ฐ˜๋‚ฉํ•œ๋‹ค.

PySpark ์•„ํ‚คํ…์ฒ˜

image.png

PySpark ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์€ ํŒŒ์ด์ฌ๊ณผ ์ž๋ฐ”๊ฐ€ ์œ ๊ธฐ์ ์œผ๋กœ ๊ฒฐํ•ฉํ•˜์—ฌ ๋™์ž‘ํ•˜๋Š” ํ•˜์ด๋ธŒ๋ฆฌ๋“œ ๊ตฌ์กฐ์ด๋‹ค. ์ž‘์—… ์ง€์‹œ๋Š” ํŒŒ์ด์ฌ, ์‹คํ–‰์€ JVM์—์„œ ์ด๋ฃจ์–ด์ง„๋‹ค. ์ด๋“ค์„ ์ง์ ‘ ์†Œํ†ตํ•  ์ˆ˜ ์—†๋Š”๋ฐ, Py4J ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ ๋‘˜ ์‚ฌ์ด ๋‹ค๋ฆฌ ์—ญํ• ์„ ํ•œ๋‹ค.

๊ตฌ์ฒด์ ์ธ ์‹คํ–‰ ํ”„๋กœ์„ธ์Šค๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

  1. PySpark ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์‹คํ–‰ํ•˜๋ฉด ํŒŒ์ด์ฌ ์ธํ„ฐํ”„๋ฆฌํ„ฐ ํ”„๋กœ์„ธ์Šค๊ฐ€ ์‹œ์ž‘๋˜๊ณ , SparkSession ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
  2. ์ดํ›„ ํŒŒ์ด์ฌ ๋“œ๋ผ์ด๋ฒ„๋Š” ๋‚ด๋ถ€์ ์œผ๋กœ JVM์„ ์‹คํ–‰ํ•˜๊ณ  Py4J ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ํ†ตํ•ด JVM๊ณผ ์†Œ์ผ“ ํ†ต์‹ ์œผ๋กœ ์—ฐ๊ฒฐ๋œ๋‹ค.
  3. ์ดํ›„ ์ž‘์—…์€ ์ด์ „๊ณผ ๊ฐ™๋‹ค.

์ƒ์„ฑ๋œ ํƒœ์Šคํฌ๋ฅผ executor์— ์ „๋‹ฌํ•˜์—ฌ ์‹คํ–‰์„ ์ง€์‹œํ•  ๋•Œ, ์—ฐ์‚ฐ์˜ ์ข…๋ฅ˜์— ๋”ฐ๋ผ ์‹คํ–‰ ๋ฐฉ์‹์ด ๋‹ฌ๋ผ์ง„๋‹ค.

select, filter, groupBy ์™€ ๊ฐ™์ด ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ์ตœ์ ํ™”๋œ ๋‚ด์žฅ ํ•จ์ˆ˜๋“ค์€ ์ „๋ถ€ executor JVM์—์„œ ์Šค์นผ๋ผ ์ฝ”๋“œ๋กœ ์‹คํ–‰๋œ๋‹ค. ๋”ฐ๋ผ์„œ ํŒŒ์ด์ฌ๊ณผ JVM ๊ฐ„ ๋ฐ์ดํ„ฐ ์ด๋™์ด ์—†์–ด ๋น ๋ฅด๊ณ  ํšจ์œจ์ ์ด๋‹ค.

์‚ฌ์šฉ์ž ์ •์˜ ํ•จ์ˆ˜(UDF)๋‚˜ RDD์˜ map ์—ฐ์‚ฐ์ฒ˜๋Ÿผ ํŒŒ์ด์ฌ ์›Œ์ปค๊ฐ€ ํ•„์š”ํ•œ ๊ฒฝ์šฐ JVM์€ ๋ณ„๋„์˜ ํŒŒ์ด์ฌ ์›Œ์ปค ํ”„๋กœ์„ธ์Šค๋ฅผ ์‹คํ–‰ํ•œ๋‹ค. executor JVM์€ ์ฒ˜๋ฆฌํ•  ๋ฐ์ดํ„ฐ ํŒŒํ‹ฐ์…˜์„ ์ง๋ ฌํ™”ํ•œ ํ›„ ์†Œ์ผ“์„ ํ†ตํ•ด ํŒŒ์ด์ฌ ์›Œ์ปค๋กœ ์ „์†กํ•˜์—ฌ ์—ฐ์‚ฐ์„ ์ฒ˜๋ฆฌํ•œ ํ›„, ๋‹ค์‹œ ์ง๋ ฌํ™”ํ•˜์—ฌ JVM์œผ๋กœ ๋Œ๋ ค๋ณด๋‚ธ๋‹ค.

๐Ÿ“Œ ๊ตฌ์„ฑ ์š”์†Œ

image.png

Spark๋Š” ์—ฌ๋Ÿฌ ์ปดํฌ๋„ŒํŠธ๋“ค์ด ์ธต์„ ์ด๋ฃจ๊ณ  ์žˆ๋Š” ํ†ตํ•ฉ ๋ถ„์„ ์—”์ง„์ด๋‹ค. ๋ชจ๋“  ์ปดํฌ๋„ŒํŠธ๋“ค์€ Spark Core ๋ผ๋Š” ํ•ต์‹ฌ ์—”์ง„ ์œ„์—์„œ ๋™์ž‘ํ•œ๋‹ค. Spark Core ๋Š” ๊ธฐ๋ณธ์ ์ธ ์ž…์ถœ๋ ฅ, ๋ฉ”๋ชจ๋ฆฌ ๊ด€๋ฆฌ ์žฅ์•  ๋ณต๊ตฌ์™€ ๊ฐ™์ด ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ์— ํ•„์š”ํ•œ ์ €์ˆ˜์ค€ ๊ธฐ๋Šฅ์„ ๋‹ด๋‹นํ•˜๋ฉฐ, Spark์˜ ๊ธฐ๋ณธ ๋ฐ์ดํ„ฐ ๋ชจ๋ธ์ธ RDD(Resilient Distributed Dataset) API๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

Spark Core ์œ„์—์„œ ๋™์ž‘ํ•˜๋Š” ๋„ค ๊ฐ€์ง€ ๊ณ ์ˆ˜์ค€ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์•Œ์•„๋ณด์ž.

  1. Spark SQL ์€ (๋ฐ˜)์ •ํ˜• ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃจ๊ธฐ ์œ„ํ•œ ์ปดํฌ๋„ŒํŠธ์ด๋ฉฐ, SQL ์ฟผ๋ฆฌ ๋˜๋Š” DataFrame ์„ ํ†ตํ•ด ์‰ฝ๊ฒŒ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃฐ ์ˆ˜ ์žˆ๋„๋ก ํ•œ๋‹ค. ์ž‘์„ฑ๋œ SQL ์ฟผ๋ฆฌ ๋˜๋Š” DataFrame์„ Catalyst Optimizer ๋ผ๋Š” ์ฟผ๋ฆฌ ์ตœ์ ํ™” ์—”์ง„์„ ํ†ตํ•ด ํšจ์œจ์ ์ธ RDD ์—ฐ์‚ฐ plan์œผ๋กœ ๋ณ€ํ™˜ํ•œ ํ›„ ์ฒ˜๋ฆฌํ•œ๋‹ค.

Spark์˜ DataFrame์€ RDD์— ์Šคํ‚ค๋งˆ๋ฅผ ๋ถ€์—ฌํ•œ ๊ฒƒ์ด๋‹ค.

  1. Spark Streaming ์€ Kafka ๋“ฑ์œผ๋กœ๋ถ€ํ„ฐ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ์ปดํฌ๋„ŒํŠธ์ด๋‹ค. ๋‚ด๋ถ€์ ์œผ๋กœ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ์งง์€ ์‹œ๊ฐ„ ๋‹จ์œ„์˜ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜๋กœ ๋‚˜๋ˆˆ ํ›„ ์ฒ˜๋ฆฌํ•œ๋‹ค.
  2. MLlib ์€ ๋น…๋ฐ์ดํ„ฐ๋ฅผ ์œ„ํ•œ ๋จธ์‹ ๋Ÿฌ๋‹ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋กœ, ๋ชจ๋“  ์•Œ๊ณ ๋ฆฌ์ฆ˜์— ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰๋˜๋„๋ก ์„ค๊ณ„๋˜์—ˆ๋‹ค.
  3. GraphX ๋Š” ๊ทธ๋ž˜ํ”„ ํ˜•ํƒœ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ถ„์„ํ•˜๊ธฐ ์œ„ํ•œ ์ปดํฌ๋„ŒํŠธ๋กœ PageRank, Connected Components ์™€ ๊ฐ™์€ ๊ทธ๋ž˜ํ”„ ์•Œ๊ณ ๋ฆฌ์ฆ˜์„ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋Š” API๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

๐Ÿ“Œ RDD์™€ ํŠน์ง•

RDD(Resilient Distributed Dataset) ์€ ์—ฌ๋Ÿฌ ์„œ๋ฒ„์— ๋ถ„์‚ฐ๋˜์–ด ๋ณ‘๋ ฌ๋กœ ์ฒ˜๋ฆฌ๋  ์ˆ˜ ์žˆ๋Š” ๋ถˆ๋ณ€ ๋ฐ์ดํ„ฐ ์š”์†Œ์˜ ์ปฌ๋ ‰์…˜์ด๋‹ค. ๋ถˆ๋ณ€์„ฑ์ด RDD์˜ ๊ฐ€์žฅ ์ค‘์š”ํ•œ ํŠน์ง• ์ค‘ ํ•˜๋‚˜์ธ๋ฐ, ๊ธฐ์กด RDD๋ฅผ ํ†ตํ•ด ์ƒˆ๋กœ์šด RDD๋ฅผ ์ƒ์„ฑํ•˜๊ธฐ ์œ„ํ•ด map ์ด๋‚˜ filter ์™€ ๊ฐ™์€ ์—ฐ์‚ฐ์„ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.

RDD๋Š” ํ•˜๋‚˜์˜ ๊ฑฐ๋Œ€ ๋ฐ์ดํ„ฐ ๋ฉ์–ด๋ฆฌ๊ฐ€ ์•„๋‹ˆ๋ผ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์ž‘์€ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋‚˜๋‰˜์–ด ํด๋Ÿฌ์Šคํ„ฐ์˜ ์—ฌ๋Ÿฌ ์›Œ์ปค ๋…ธ๋“œ์— ๋ถ„์‚ฐ ์ €์žฅ๋œ๋‹ค. Spark๋Š” ๊ฐ๊ฐ์˜ ํŒŒํ‹ฐ์…˜์„ ํ•˜๋‚˜์˜ ๋…ผ๋ฆฌ์ ์ธ ๋ฐ์ดํ„ฐ์…‹์œผ๋กœ ๋‹ค๋ฃฌ๋‹ค.

Spark๋Š” RDD๊ฐ€ ์–ด๋–ค ๊ณผ์ •์„ ํ†ตํ•ด ์ƒ์„ฑ๋˜์—ˆ๋Š”์ง€์— ๋Œ€ํ•œ ์ •๋ณด(lineage)๋ฅผ ๊ธฐ์–ตํ•œ๋‹ค. ํŠน์ • ์›Œ์ปค ๋…ธ๋“œ๊ฐ€ ๋‹ค์šด๋˜์–ด ์ผ๋ถ€ ํŒŒํ‹ฐ์…˜์ด ์œ ์‹ค๋œ๋‹ค๋ฉด, ๊ธฐ์–ตํ•œ ์ •๋ณด๋ฅผ ํ†ตํ•ด ์‚ฌ๋ผ์ง„ ํŒŒํ‹ฐ์…˜์„ ๊ณ„์‚ฐํ•˜์—ฌ ๋ณต๊ตฌํ•œ๋‹ค.

Spark๋Š” transformation, action ๋‘ ๊ฐ€์ง€ ์—ฐ์‚ฐ์„ ๊ฐ€์ง€๊ณ  ์žˆ๋Š”๋ฐ, ๋ณ€ํ™˜ ์—ฐ์‚ฐ์€ ํ˜ธ์ถœ๋˜๋Š” ์ฆ‰์‹œ ์‹คํ–‰๋˜๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ, ์•ก์…˜ ์—ฐ์‚ฐ์ด ํ˜ธ์ถœ๋˜๋ฉด ์ •์˜๋œ ๋ชจ๋“  ๋ณ€ํ™˜ ์—ฐ์‚ฐ์ด ์‹ค์ œ๋กœ ์ˆ˜ํ–‰๋œ๋‹ค. ์ด๋ฅผ lazy evaluation ์ด๋ผ๊ณ  ํ•œ๋‹ค.

๐Ÿ“Œ pyspark์˜ ๊ธฐ๋ณธ์ ์ธ ๋ฉ”์„œ๋“œ

1
sc = pyspark.SparkContext('local[*]')

SparkContext ๋ฉ”์„œ๋“œ์˜ master ํŒŒ๋ผ๋ฏธํ„ฐ๋Š” Spark ์ž‘์—…์„ ์–ด๋””์„œ ์‹คํ–‰ํ• ์ง€ ์ง€์ •ํ•˜๋Š” ํด๋Ÿฌ์Šคํ„ฐ URL์ด๋‹ค. local[*] ๋กœ ์„ค์ •ํ•˜๋ฉด, ํด๋Ÿฌ์Šคํ„ฐ๊ฐ€ ์•„๋‹Œ ๋กœ์ปฌ ๋จธ์‹ ์—์„œ Spark๋ฅผ ์‹คํ–‰ํ•˜๊ฒ ๋‹ค๋Š” ์˜๋ฏธ์ด๋‹ค. ๋Œ€๊ด„ํ˜ธ ์•ˆ์—๋Š” ์‚ฌ์šฉํ•  ์ฝ”์–ด์˜ ์ˆ˜๋ฅผ ์ž…๋ ฅํ•˜๋ฉฐ, asterisk๋ฅผ ์ž…๋ ฅํ•˜๋ฉด ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ๋ชจ๋“  CPU ์ฝ”์–ด๋ฅผ ์ตœ๋Œ€ํ•œ ํ™œ์šฉํ•œ๋‹ค.

SparkContext ๋Š” ์‹ฑ๊ธ€ํ„ด ํŒจํ„ด์œผ๋กœ ์„ค๊ณ„๋˜์—ˆ๊ธฐ ๋•Œ๋ฌธ์— ํ•˜๋‚˜์˜ Spark ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๋‚ด์—๋Š” ์˜ค์ง ํ•˜๋‚˜์˜ SparkContext ๋งŒ ์กด์žฌํ•  ์ˆ˜ ์žˆ๋‹ค. ๋”ฐ๋ผ์„œ ์ƒ์„ฑ๋ค sc ๋ฅผ ๊ณ„์†ํ•ด์„œ ์žฌํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

์ด๋ฏธ SparkContext ๊ฐ€ ์‹คํ–‰ ์ค‘์ธ ์ƒํƒœ์—์„œ SparkContext ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•˜๋Š”๋ฐ, ์ด๋ฏธ ์ƒ์„ฑ๋œ context๋ฅผ ์•ˆ์ „ํ•˜๊ฒŒ ๊ฐ€์ ธ์˜ค๊ธฐ ์œ„ํ•ด getOrCreate ๋ฉ”์„œ๋“œ๋ฅผ ์ž์ฃผ ์‚ฌ์šฉํ•œ๋‹ค.

1
2
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

parallelize ๋ฉ”์„œ๋“œ๋Š” ๋ฆฌ์ŠคํŠธ๋‚˜ ํŠœํ”Œ๊ฐ™์€ ํŒŒ์ด์ฌ ์ปฌ๋ ‰์…˜์„ RDD๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค. parallelize(range(1000)) ๋Š” 0๋ถ€ํ„ฐ 999๊นŒ์ง€์˜ ์ˆซ์ž๊ฐ€ ์žˆ๋Š” RDD๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ์ด๋“ค์„ ์—ฌ๋Ÿฌ ํŒŒํ‹ฐ์…˜์— ๋‚˜๋ˆˆ๋‹ค. ๋‘ ๋ฒˆ์งธ ์ธ์ž๋กœ ํŒŒํ‹ฐ์…˜์˜ ๊ฐœ์ˆ˜์ธ numSlices ๋ฅผ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค.

takeSample ๋ฉ”์„œ๋“œ๋Š” RDD์—์„œ ๋ฌด์ž‘์œ„๋กœ ์ผ๋ถ€ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœํ•˜์—ฌ ๊ฐ€์ ธ์˜ค๋Š” action ์—ฐ์‚ฐ์ด๋‹ค. ์ฒซ ๋ฒˆ์งธ ์ธ์ž์ธ withReplacement ๋Š” ๋ณต์› ์ถ”์ถœ ์—ฌ๋ถ€์ด๋ฉฐ, ๋‘ ๋ฒˆ์งธ ์ธ์ง€์•ˆ num ์€ ์ถ”์ถœํ•  ๋ฐ์ดํ„ฐ์˜ ๊ฐœ์ˆ˜๋ฅผ ์ง€์ •ํ•œ๋‹ค.

1
2
3
4
from operator import add

rdd = sc.parallelize([("a", 2), ("b", 3), ("a", 3)])
sorted(rdd.reduceByKey(add).collect())

reduceByKey(add) ๋Š” RDD์˜ ๊ฐ ํ‚ค์— ๋Œ€ํ•ด ๊ฐ’์„ ๋”ํ•˜๋Š” ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. ์…”ํ”Œ๋ง ์ „ ๊ฐ ์›Œ์ปค ๋…ธ๋“œ๋Š” ์ž์‹ ์˜ ํŒŒํ‹ฐ์…˜์—์„œ ๋จผ์ € ์ง‘๊ณ„ ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. ์ด๋ ‡๊ฒŒ ๋ถ€๋ถ„์ ์œผ๋กœ ์ง‘๊ณ„๋œ ๊ฒฐ๊ณผ๋“ค์„ ๋™์ผํ•œ ํ‚ค๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์…”ํ”Œ๋งํ•œ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ๋„คํŠธ์›Œํฌ ๋ถ€ํ•˜๋ฅผ ์ตœ์†Œํ™”ํ•  ์ˆ˜ ์žˆ๋‹ค.

collect ๋Š” ๋ถ„์‚ฐ๋œ RDD์˜ ๋ชจ๋“  ์š”์†Œ๋ฅผ ์ˆ˜์ง‘ํ•˜์—ฌ ๋ฉ”๋ชจ๋ฆฌ๋กœ ๊ฐ€์ ธ์˜ค๋Š” action ์—ฐ์‚ฐ์ด๋‹ค.

๐Ÿ“Œ ์ฐธ๊ณ 

https://wjrmffldrhrl.github.io/spark-1/

This post is licensed under CC BY 4.0 by the author.