spark

Definition

์ž‘์„ฑํ•œ ์ฝ”๋“œ๋ฅผ Spark์—์„œ ์‹คํ–‰ํ•˜๋Š” ๋…ธ๋“œ์ด๋‹ค. [Flow๊ตฌ์„ฑ]๋…ธ๋“œ ์ค‘ [spark]๋…ธ๋“œ๋ฅผ drag & drop ํ•œ ํ›„ Property ํ•ญ๋ชฉ์„ ์ž…๋ ฅํ•œ๋‹ค. Property ํŒจ๋„์˜ [๋”๋ณด๊ธฐ+] ๋ฒ„ํŠผ์„ ๋ˆ„๋ฅด๋ฉด ์ž…๋ ฅ๊ฐ€๋Šฅํ•œ ์ „์ฒด Property ํ•ญ๋ชฉ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

Set

For the [setting], [scheduler], and [parameter] settings, refer to [워크플로우 생성] (Create Workflow) > [설정] (Settings).

property

[Node Description] : enter the name of the node being created

  1. prepare : deletes files and/or creates folders before the node runs, so that the node's output can be saved to HDFS (useful when the workflow is run repeatedly)

    • delete : path of the folder or file to delete before the node runs
    • mkdir : path of the folder to create before the node runs
  2. file : path of the library files used by the node

  3. archive : sets the archive path

  4. retry

    • max : number of retries when execution fails
    • period : interval between retries (in minutes)
  5. property : properties (key, value) to use for the run

  6. master : Spark master URL to use (enter yarn when using a cluster created in DHP)

  7. mode : Spark deploy mode, client or cluster (enter client when using a cluster created in DHP)

    deploy-mode   Description
    client        Runs the driver program locally, on the machine that submits the job
    cluster       Runs the driver program on one of the worker machines

  8. name : Spark application name

  9. jar : path of the file to run, such as a Python (.py) script or a JAR

  10. sparkOpts : options (key, value) to pass to Spark at run time

    key                 value        Description
    --driver-cores      1            Number of cores used by the Spark driver
    --driver-memory     1024m        Amount of memory used by the Spark driver
    --executor-cores    1            Number of cores used by each executor
    --executor-memory   1g           Amount of memory used by each executor
    --num-executors     1            Number of executors to launch
    --queue             default      Name of the YARN queue to submit the job to
    --conf              PROP=VALUE   Sets an arbitrary Spark configuration property

[Note] For more details, see:
https://spark.apache.org/docs/latest/running-on-yarn.html
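Each sparkOpts row is a separate (key, value) pair, so a value that contains spaces, such as a list of JVM flags, stays a single argument. The sketch below assumes the node forwards the rows to spark-submit in order as `key value` tokens, so `--conf` may appear several times; `spark_opts_to_args` is a hypothetical helper, not part of the product, and `shlex.join` is only used to show how the tokens would be quoted on a shell command line.

```python
import shlex


def spark_opts_to_args(opts):
    """Hypothetical sketch: flatten sparkOpts (key, value) rows into
    spark-submit argument tokens, keeping their order so that repeated
    keys such as --conf are all preserved."""
    args = []
    for key, value in opts:
        if not key.startswith("--"):
            raise ValueError(f"option key must start with '--': {key!r}")
        args += [key, value]
    return args


if __name__ == "__main__":
    rows = [
        ("--executor-memory", "1g"),
        ("--num-executors", "1"),
        ("--queue", "default"),
        ("--conf", "spark.eventLog.enabled=true"),
        # A value with a space remains one token; shlex.join quotes it
        ("--conf", "spark.executor.extraJavaOptions=-XX:+UseG1GC -verbose:gc"),
    ]
    print(shlex.join(spark_opts_to_args(rows)))
```

When the tokens are passed as a list (for example to `subprocess.run`), no quoting is needed; quoting only matters if the command is written out as a single shell string.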

  11. argument : arguments to pass to the Python code when it runs
  12. jobXml : path of the job XML file. Properties for the job can be written in a separate XML file and passed this way.
  13. forceOK : marks the node as successful and ends it even if data processing fails
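For the jobXml field, a common layout is the Hadoop-style configuration file, in which each property is a `<property>` element with `<name>` and `<value>` children. Assuming the node accepts that format (this page does not say so explicitly), the sketch below writes such a file from key/value pairs using only the Python standard library; `write_job_xml` is a hypothetical helper and `mapreduce.job.queuename` is just an illustrative property.

```python
import xml.etree.ElementTree as ET


def write_job_xml(props, path):
    """Hypothetical sketch: write (name, value) pairs as a Hadoop-style
    <configuration> XML file that could be referenced from jobXml."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    ET.ElementTree(root).write(path, encoding="utf-8", xml_declaration=True)


if __name__ == "__main__":
    import os
    import tempfile

    path = os.path.join(tempfile.mkdtemp(), "job.xml")
    write_job_xml({"mapreduce.job.queuename": "default"}, path)
    with open(path, encoding="utf-8") as f:
        print(f.read())
```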

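The retry settings (max and period) can be read as: run the node once and, if it fails, retry up to max more times, waiting period minutes between attempts. The sketch below models that reading in plain Python; whether the product counts max as retries or as total attempts is an assumption, and `run_with_retry` is a hypothetical name, not part of the product.

```python
import time


def run_with_retry(task, max_retries, period_minutes, sleep=time.sleep):
    """Hypothetical sketch of retry max / period semantics.

    Runs task(); on an exception, retries up to max_retries more times,
    waiting period_minutes between attempts. Re-raises the last error
    if every attempt fails.
    """
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= max_retries:
                raise                       # retries exhausted
            attempt += 1
            sleep(period_minutes * 60)      # period is given in minutes


if __name__ == "__main__":
    calls = []

    def flaky():
        calls.append(1)
        if len(calls) < 3:
            raise RuntimeError("transient failure")
        return "ok"

    waits = []
    # Record the waits instead of actually sleeping
    print(run_with_retry(flaky, max_retries=3, period_minutes=10, sleep=waits.append))  # ok
    print(waits)  # [600, 600]
```

Injecting `sleep` keeps the sketch testable; a real scheduler would wait on its own timer rather than block a thread.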
Example

This example runs basicavg.py, a script that computes the average of a list of numbers, on Spark.

"""
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.parallelize([1, 2, 3, 4])
>>> basicAvg(b)
2.5
"""
import sys
from pyspark import SparkContext
def basicAvg(nums):
"""Compute the avg"""
sumCount = nums.map(lambda x: (x, 1)).fold(
(0, 0), (lambda x, y: (x[0] + y[0], x[1] + y[1])))
return sumCount[0] / float(sumCount[1])
if __name__ == "__main__":
master = "local"
if len(sys.argv) == 2:
master = sys.argv[1]
sc = SparkContext(master, "Sum")
nums = sc.parallelize([1, 2, 3, 4])
avg = basicAvg(nums)
print avg
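To see why `basicAvg` uses `fold` with the zero value `(0, 0)`, the plain-Python sketch below mimics it without a cluster: each partition is folded separately starting from `(0, 0)`, and the per-partition results are then folded together, again from `(0, 0)`. The split into two partitions is an arbitrary assumption for illustration, and `fold_like_spark` is a hypothetical name.

```python
from functools import reduce


def fold_like_spark(partitions, zero, op):
    """Mimic RDD.fold: fold each partition from `zero`, then fold the
    per-partition results, again starting from `zero`."""
    partials = [reduce(op, part, zero) for part in partitions]
    return reduce(op, partials, zero)


def add_pairs(x, y):
    # Element-wise sum of (sum, count) pairs
    return (x[0] + y[0], x[1] + y[1])


if __name__ == "__main__":
    nums = [1, 2, 3, 4]
    pairs = [(x, 1) for x in nums]          # same as nums.map(lambda x: (x, 1))
    partitions = [pairs[:2], pairs[2:]]     # assume two partitions
    total, count = fold_like_spark(partitions, (0, 0), add_pairs)
    print(total, count, total / float(count))  # 10 4 2.5
```

Because `(0, 0)` is neutral for `add_pairs`, the result does not depend on how many partitions there are or on empty partitions.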
  1. ์›Œํฌํ”Œ๋กœ์šฐ ์‹คํ–‰ ํด๋Ÿฌ์Šคํ„ฐ์— basicavg.py ํŒŒ์ผ์„ ์ ์žฌํ•œ๋‹ค.

  2. [Flow๊ตฌ์„ฑ] > [spark] ๋…ธ๋“œ๋ฅผ drag & drop ํ•œ ํ›„ setting ํŒจ๋„์—์„œ ์‹คํ–‰ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ์„ ํƒํ•œ๋‹ค.

  3. property ํŒจ๋„์—์„œ ์•„๋ž˜์™€ ๊ฐ™์ด ์ž…๋ ฅํ•œ๋‹ค.

    • For 2. file, use the HDFS browser to select the path where basicavg.py is stored.

    • Enter the remaining fields as follows:

      property     value
      6. master    yarn
      7. mode      client
      9. jar       basicavg.py

    • 10. sparkOpts : enter options as described under property above. This example uses the --driver-memory and --conf options.

      key                value
      --driver-memory    2G
      --conf             spark.executor.memory=1024m
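For reference, the values entered above correspond roughly to a spark-submit call. This is a sketch assuming the node hands 6. master, 7. mode, 8. name, 10. sparkOpts, 9. jar and the argument field to spark-submit in the usual order (options first, then the application file, then its arguments); `build_submit_command` is hypothetical, the application name `basicavg` is made up because the example leaves 8. name empty, and the --conf row is left out for brevity.

```python
import shlex


def build_submit_command(master, mode, jar, name=None, spark_opts=(), args=()):
    """Hypothetical sketch of the spark-submit call implied by the
    node's master / mode / name / sparkOpts / jar / argument fields."""
    cmd = ["spark-submit", "--master", master, "--deploy-mode", mode]
    if name:
        cmd += ["--name", name]
    for key, value in spark_opts:
        cmd += [key, value]
    cmd.append(jar)           # application file (.py or .jar)
    cmd += list(args)         # arguments passed to the application
    return cmd


if __name__ == "__main__":
    cmd = build_submit_command(
        master="yarn",
        mode="client",
        jar="basicavg.py",
        name="basicavg",                        # hypothetical; 8. name is not set in the example
        spark_opts=[("--driver-memory", "2G")],
    )
    print(shlex.join(cmd))
    # spark-submit --master yarn --deploy-mode client --name basicavg --driver-memory 2G basicavg.py
```

No application arguments are passed here, since basicavg.py would treat a single argument as the Spark master.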
