pyspark

Definition

This node writes and runs pyspark code. Enter the pyspark code in the "3.source" field of the property panel. Clicking the Source Editor button (the first button to the right of "3.source") opens a pop-up editor, where you can write code on a larger screen. Selecting the Jupyter shortcut button (the second button to the right of "3.source") opens a Jupyter Notebook, where you can work interactively.
Drag & drop the [pyspark] node from the [Flow Configuration] nodes on the left, then fill in the Property fields. Clicking the [More+] button in the Property panel shows all available Property fields.

Set

For the [setting], [scheduler], and [parameter] options, see [Workflow] > [Create] > [Basic Configuration].

property

[Node Description] Enter a name for the node being created

(screenshot: pyspark01)

  1. version : Spark version (fixed to Spark2)
  2. appName : application name
  3. source : source code to run
    • Click [Source Editor] to write the code in the pop-up editor
  4. argument : arguments passed to the script
  5. sparkOpts : properties (key, value) used for execution
  6. forceOK : whether to force an OK status on failure
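
The values entered in the `argument` field reach the script as command-line arguments. A minimal sketch of reading them with `sys.argv`, matching the pattern used in the example below (treating the first argument as the Spark master, defaulting to `local`, is only an illustration):

```python
import sys

def pick_master(argv):
    """Return the Spark master from the first argument, defaulting to 'local'."""
    # argv[0] is the script path; argv[1], if present, is the first argument value
    return argv[1] if len(argv) >= 2 else "local"

if __name__ == "__main__":
    print(pick_master(sys.argv))
```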

Example

์ž…๋ ฅ๋ฐ›์€ ์ˆซ์ž์˜ ํ‰๊ท ์„ ๊ตฌํ•ด output.txtํŒŒ์ผ์„ ์ƒ์„ฑํ•˜๋Š” ์˜ˆ์ œ์ž…๋‹ˆ๋‹ค.

  1. Enter the following in the [3.source] option of the property panel

    ```
    """
    >>> from pyspark.context import SparkContext
    >>> sc = SparkContext('local', 'test')
    >>> b = sc.parallelize([1, 2, 3, 4])
    >>> basicAvg(b)
    2.5
    """
    import sys
    sys.stdout = open('output.txt', 'w')

    from pyspark import SparkContext


    def basicAvg(nums):
        """Compute the avg"""
        sumCount = nums.map(lambda x: (x, 1)).fold(
            (0, 0), (lambda x, y: (x[0] + y[0], x[1] + y[1])))
        return sumCount[0] / float(sumCount[1])


    if __name__ == "__main__":
        master = "local"
        if len(sys.argv) == 2:
            master = sys.argv[1]
        sc = SparkContext(master, "Sum")
        nums = sc.parallelize([1, 2, 3, 4])
        avg = basicAvg(nums)
        print(avg)
    ```

    (screenshot: pyspark02)

  2. ์›Œํฌํ”Œ๋กœ์šฐ ์ €์žฅ/์‹คํ–‰ ์‹œ ์•„๋ž˜์™€ ๊ฐ™์ด output.txtํŒŒ์ผ ์ƒ์„ฑ ํ™•์ธ
    pyspark03
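
The map/fold averaging logic in the example can be checked without a Spark cluster. The sketch below reproduces the same (sum, count) accumulation in plain Python, with `functools.reduce` standing in for the RDD fold:

```python
from functools import reduce

def basic_avg(nums):
    """Average via the same (sum, count) fold used in the Spark example."""
    pairs = [(x, 1) for x in nums]                # nums.map(lambda x: (x, 1))
    total, count = reduce(
        lambda x, y: (x[0] + y[0], x[1] + y[1]),  # the fold's combine function
        pairs,
        (0, 0))                                   # the fold's zero value
    return total / float(count)

print(basic_avg([1, 2, 3, 4]))  # → 2.5
```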

Troubleshooting

  1. Running the pyspark node produces the message [There is no Jupyter Notebook on the cluster to run on]

    • Check that a jupyter-notebook container is configured on the cluster that runs the pyspark node
  2. ์›Œํฌํ”Œ๋กœ์šฐ [์ธ์Šคํ„ด์Šค ์ƒ์„ธ] > [ํ๋ฆ„] ํƒญ ํ•˜๋‹จ์˜ STD ๋กœ๊ทธ์— ์•„๋ž˜์™€ ๊ฐ™์ด ์—๋Ÿฌ๋ฉ”์‹œ์ง€ ๊ธฐ๋ก๋จ

    • java.io.IOException: Cannot run program "/usr/local/bin/python3.6": error=2, No such file or directory
    • Change the PYSPARK_PYTHON execution path in run_spark2.sh
      # export PYSPARK_PYTHON=/usr/local/bin/python3.6
      export PYSPARK_PYTHON=/opt/conda/bin/python3.6
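
Before editing run_spark2.sh, it can help to confirm which interpreter path actually exists on the node. A quick check over the path from the error message and the suggested replacement (adjust the candidate paths for your cluster):

```python
import os

def interpreter_status(path):
    """Return 'usable' if path is an executable file, else 'missing'."""
    return "usable" if os.access(path, os.X_OK) else "missing"

# Candidate paths: the one from the error message and the suggested replacement
for p in ["/usr/local/bin/python3.6", "/opt/conda/bin/python3.6"]:
    print(interpreter_status(p), ":", p)
```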