{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 01 스파크 소개\n", "\n", "## 1.1 스파크\n", "\n", "### 1.1.1 빅데이터 등장" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.2 빅데이터 정의" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.3 빅데이터 솔루션" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.4 스파크" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.5 RDD(Resilient Distributed DataSet) 소개와 연산\n", "\n", "#### 스파크 RDD\n", "\n", "스파크가 사용하는 핵심 데이터 모델로 다수에 서버에 걸쳐 분산 방식으로 저장된 데이터 요소들의 집합\n", "\n", "병렬처리가 가능하고 장애가 발생할 경우에도 스스로 복구될 수 있는 내성(tolerance)을 가지고 있다.\n", "\n", "Spark revolves around the concept of a resilient distributed dataset(RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.\n", "\n", "* 파티션 단위로 나눠 병렬로 처리를 수행. 파티션 : 병렬처리 프로세스 수.\n", "\n", "
\n", "=> 셔플링: 작업이 진행되는 과정에서 재구성되거나 네트워크를 통해 다른 서버로 이동하는 과정.\n", "\n", "\n", "* 손상된 RDD를 원래 상태로 복원하기 위해 생성과정을 기억했다가 다시 복구해 주는 기능이 있음.\n", "\n", "
\n", "=> resilient : 회복력 있는\n", "\n", "\n", "* RDD는 읽기 전용 모델로 생성됨. 장애 발생시 이전 RDD를 만들 때 수행한 작업을 똑같이 실행해 복구.\n", "* 리니지 : RDD 생성 작업을 기록해 두는 것" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### RDD 생성방법\n", "\n", "* 기존 프로그램 메모리에 생성된 데이터를 이용. 즉시 테스트 가능\n", "* 로컬 혹은 하둡의 HDFS(Hadoop Distributed File System) 같은 외부 저장소에 저장된 데이터를 읽어 사용.\n", "\n", "
\n", "로컬 파일시스템 file:///~\n", "하둡 hdfs://~\n", "\n", "\n", "* 기존에 생성돼 있는 RDD로 또 다른 RDD를 생성하는 방법" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import findspark\n", "findspark.init()\n", "import pyspark\n", "sc = pyspark.SparkContext()\n", "\n", "# 1. Collection 이용\n", "rdd = sc.parallelize([\"a\", \"b\", \"c\", \"d\", \"e\"])\n", "\n", "# 2. 파일로부터 생성\n", "# rdd = sc.textFile(\"
\n", "$ cat ${SPARK_HOME}/bin/run-example\n", "... 중략 ...\n", "exec \"${SPARK_HOME}\"/bin/spark-submit run-example \"$@\"\n", "\n", "'spark-submit'이라는 셸을 호출하고 있음.\n", "\n", "spark-submit은 필요한 환경변수를 정의하고 다시 spark-class 셸을 실행.\n", "
\n", "$ cat ${SPARK_HOME}/bin/spark-submit\n", "... 중략 ...\n", "exec \"$SPARK_HOME\"/bin/spark-class org.apache.spark.deploy.SparkSubmit \"$@\"\n", "\n", "\n", "spark-class 셸은 우리가 전달한 실행 매개변수를 이용해 org.apache.spark.launcher.Main 클래스를 실행\n", "\n", "명령문 확인을 위해 다음처럼 실습\n", "\n", "
\n", "$ vi ~/.bash_profile\n", "$ export SPARK_PRINT_LAUNCH_COMMAND=1\n", "$ source ~/.bash_profile\n", "$ ./bin/run-example JavaWordCount README.md\n", "\n", "\n", "결과\n", "
\n", "Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /home/sdrlurker/apps/spark/conf/:/home/sdrlurker/apps/spark/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --jars\n", " /home/sdrlurker/apps/spark/examples/jars/spark-examples_2.11-2.1.0.jar,/home/sdrlurker/apps/spark/examples/jars/scopt_2.11-3.3.0.jar --class org.apache.spark.examples.JavaWordCount \n", "spark-internal README.md\n", "... 후략 ...\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "python용 스파크 셸 예시" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "3" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import os\n", "SPARK_HOME = os.getenv(\"SPARK_HOME\")\n", "\n", "file = sc.textFile(\"file://%s/README.md\" % SPARK_HOME)\n", "words = file.flatMap(lambda line : line.split(\" \"))\n", "result = words.countByValue()\n", "result[\"For\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "스파크 셸은 개발 단계 및 작업 내용에 따라 빠른 테스트나 프로토타이핑 또는 일회성 데이터를 처리하는 등에 유용하게 활용 가능." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2.6 실행 옵션\n", "\n", "--master 옵션 : 스파크가 사용할 클러스터 마스터 정보\n", "\n", "단일 서버에서 동작시킬 경우 \"local\"\n", "\n", "여러 개의 스레드를 사용하려면 \"local[스레드수]\", local[*] 는 모든 스레드를 사용.\n", "\n", "스파크 애플리케이션 설정정보 확인 방법\n", "\n", "1. --verbose 옵션 활용\n", "\n", "
\n", "$ ./spark-shell --master=local --verbose\n", "\n", "\n", "python 스파크 셸에서는 다음 명령어로 확인 가능." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "['spark.app.id=local-1503751287905',\n", " 'spark.app.name=pyspark-shell',\n", " 'spark.driver.host=192.168.1.2',\n", " 'spark.driver.port=65230',\n", " 'spark.executor.id=driver',\n", " 'spark.master=local[*]',\n", " 'spark.rdd.compress=True',\n", " 'spark.serializer.objectStreamReset=100',\n", " 'spark.submit.deployMode=client']" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sc.getConf().toDebugString().split()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "2. 스파크 셸을 실행하고 다음 주소로 접속함.\n", "\n", "http://localhost:4040/environment/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2.7 더 살펴보기\n", "\n", "클러스터 모드에서는 여러 대의 컴퓨터에 분산되어 데이터가 저장됨.\n", "\n", "어떤 기준으로 데이터를 분류해서 각 서버에 분배할 지 결정해야 함.\n", "\n", "일부 서버에 장애가 발생하거나 네트워크에 문제가 발생하는 경우도 생각해야 함." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.4 예제 프로젝트 설정\n", "\n", "### 1.4.1 WordCount 예제 실행\n", "\n", "코드 작성 -> 단위 테스트 -> 빌드 및 배포\n", "\n", "1. Spark Context 생성\n", "\n", "SparkContext는 애플리케이션과 스파크 클러스터와의 연결을 담당하는 객체. \n", "\n", "이를 통해 RDD나 accumulator 또는 broadcast 등 변수를 다룸.\n", "\n", "2. RDD 생성\n", "\n", "스파크에서 사용하는 기본 분산 데이터 모델. \"the basic abstraction in Spark\"\n", "\n", "RDD를 생성하는 방법: 외부 데이터 소스로부터 생성. 기존 RDD에서 또 다른 RDD를 생성.\n", "\n", "3. RDD 처리\n", "\n", "예제에서는 process() 부분. 다양한 데이터 처리 함수로 프로그래머가 원하는 처리 수행 가능.\n", "\n", "Java 8에서는 람다 문법 사용 가능.\n", "\n", "4. 처리 결과 저장\n", "\n", "테스트일 때는 \"단순히 화면에 출력\" 가능.\n", "\n", "실제 서비스 시에는 \"하둡 파일 시스템에 저장\" 가능.\n", "\n", "#### 테스트 코드 수행\n", "\n", "src/test/java 폴더 아래에 com.wikibooks.spark.ch1.WordCountTest 파일을 열고\n", "\n", "Run -> Run As -> Junit Test를 선택.\n", "\n", "setup() 메소드에서 SparkContext를 생성.\n", "\n", "testProcess() 메소드에서 RDD를 만들고 필요한 처리를 수행.\n", "\n", "* input을 List로 받아 inputRDD로 만듬.\n", "\n", "* Map 형태로 collectAsMap 메소드로 resultMap을 생성.\n", "\n", "* 검증 작업 수행 (assertThat)\n", "\n", "cleanup() 메소드에서 SparkContext를 종료.\n", "\n", "http://www.nextree.co.kr/p11104/\n", "\n", "@BeforeClass, @AfterClass annotation은 참고." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "maven으로 빌드한 뒤 실행방법\n", "\n", "
\n", "$ $SPARK_HOME/bin/spark-submit \\\n", "--class com.wikibooks.spark.ch1.WordCount \\\n", "beginning-spark-examples.jar \\\n", "local[*] \\\n", "file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/README.md \\ file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/testresult\n", "\n", "\n", "--class : 메인함수를 가진 클래스를 지정하는 변수\n", "\n", "Jar 파일 경로 : 이 클래스가 포함된 jar 파일 경로\n", "\n", "클러스터 정보 : 프로그램 인자 1번째로 전달\n", "\n", "입력 경로 : 프로그램 인자 2번째로 전달\n", "\n", "출력 경로 : 프로그램 인자 3번째로 전달\n", "\n", "test_result 경로에 _SUCCESS 파일과 part-로 시작하는 파일이 있으면 성공." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "스칼라에는 JUnit뿐만 아니라 FlatSpec을 테스트 코드하는데 사용 가능.\n", "\n", "FlatSpec은 BDD를 위한 것." ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python [Root]", "language": "python", "name": "Python [Root]" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.2" } }, "nbformat": 4, "nbformat_minor": 0 }