pyspark

Definition

This node writes and runs pyspark code. Enter the pyspark code in the "3.source" field of the property panel. Clicking the Source Editor button (the first button to the right of "3.source") pops up an editor where you can write code on a larger screen. Selecting the Jupyter site shortcut button (the second button to the right of "3.source") opens a Jupyter Notebook, where you can work interactively.
Drag & drop the [pyspark] node from the [Flow Configuration] nodes, then fill in the Property fields. Clicking the [More+] button on the Property panel shows every Property field that can be entered.

Set

For the [setting], [scheduler], and [parameter] options, see [Create Workflow] > [Settings].

Property


[Image: flow015]

  • Node_description : enter the name of the node being created
  1. version : enter the Spark version (fixed to Spark2)
  2. appName : enter the appName
  3. source : enter the source code to run
  • Select [Source Editor] to write code in the pop-up editor
  • Select [Go to Jupyter] to work in a Jupyter notebook
  4. argument : set the arguments passed to the script
  5. sparkOpts : enter the (key, value) properties used for execution; see the sample values after this list
  6. master : enter the Spark master (e.g. yarn)
  7. mode : enter the deploy mode (e.g. client)
  8. forceOK : whether to force an OK status when the node fails
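For reference, a hedged sample of filled-in property values (the sparkOpts keys spark.executor.memory and spark.executor.cores are ordinary Spark options chosen purely for illustration, and the idea that 4.argument reaches the script as sys.argv follows the Example code below):

  1. version   : Spark2
  2. appName   : avgExample
  3. source    : (pyspark code; see the Example section)
  4. argument  : yarn
  5. sparkOpts : spark.executor.memory=2g, spark.executor.cores=2
  6. master    : yarn
  7. mode      : client
  8. forceOK   : false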

Example

์ž…๋ ฅ๋ฐ›์€ ์ˆซ์ž์˜ ํ‰๊ท ์„ ๊ตฌํ•ด output.txtํŒŒ์ผ์„ ์ƒ์„ฑํ•˜๋Š” ์˜ˆ์ œ์ด๋‹ค.

  1. property ํŒจ๋„์˜ 3.source ์˜ต์…˜์— ์•„๋ž˜์™€ ๊ฐ™์ด ์ž…๋ ฅํ•œ๋‹ค.
"""
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.parallelize([1, 2, 3, 4])
>>> basicAvg(b)
2.5
"""
import sys
sys.stdout = open('output.txt','w')
from pyspark import SparkContext
def basicAvg(nums):
"""Compute the avg"""
sumCount = nums.map(lambda x: (x, 1)).fold(
(0, 0), (lambda x, y: (x[0] + y[0], x[1] + y[1])))
return sumCount[0] / float(sumCount[1])
if __name__ == "__main__":
master = "local"
if len(sys.argv) == 2:
master = sys.argv[1]
sc = SparkContext(master, "Sum")
nums = sc.parallelize([1, 2, 3, 4])
avg = basicAvg(nums)
print (avg)

[Image: flow071]

  1. ์›Œํฌํ”Œ๋กœ์šฐ ์ €์žฅ/์‹คํ–‰์‹œ ์•„๋ž˜์™€ ๊ฐ™์ด output.txtํŒŒ์ผ์ด ์ƒ์„ฑ๋œ๋‹ค.

[Image: flow072]
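As a quick sanity check after the run, the file can be read back; a minimal sketch, assuming output.txt sits in the node's working directory:

# Minimal sketch: verify the result written by the example job above
with open('output.txt') as f:
    print(f.read())  # expected output: 2.5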

Troubleshooting

  1. The message [There is no Jupyter Notebook on the cluster to run] appears when running a pyspark node
  • Check that a jupyter-notebook container is configured on the cluster that runs the pyspark node
  2. The following error message is recorded in the STD log at the bottom of the workflow [Instance Details] > [Flow] tab
  • java.io.IOException: Cannot run program "/usr/local/bin/python3.6": error=2, No such file or directory
  • Change the PYSPARK_PYTHON execution path in run_spark2.sh
# export PYSPARK_PYTHON=/usr/local/bin/python3.6
export PYSPARK_PYTHON=/opt/conda/bin/python3.6
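After changing the path, one way to confirm which interpreter the job actually runs under is to print it from the pyspark code itself; a minimal sketch (the expected value is whatever PYSPARK_PYTHON was set to above):

# Minimal sketch: print the interpreter path the driver is running under
import sys
print(sys.executable)  # should match the PYSPARK_PYTHON value in run_spark2.sh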