{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 01 스파크 소개\n", "\n", "## 1.1 스파크\n", "\n", "### 1.1.1 빅데이터 등장" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.2 빅데이터 정의" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.3 빅데이터 솔루션" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.4 스파크" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.5 RDD(Resilient Distributed DataSet) 소개와 연산\n", "\n", "#### 스파크 RDD\n", "\n", "스파크가 사용하는 핵심 데이터 모델로 다수에 서버에 걸쳐 분산 방식으로 저장된 데이터 요소들의 집합\n", "\n", "병렬처리가 가능하고 장애가 발생할 경우에도 스스로 복구될 수 있는 내성(tolerance)을 가지고 있다.\n", "\n", "Spark revolves around the concept of a resilient distributed dataset(RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.\n", "\n", "* 파티션 단위로 나눠 병렬로 처리를 수행. 파티션 : 병렬처리 프로세스 수.\n", "\n", "
\n",
    "=> 셔플링: 작업이 진행되는 과정에서 재구성되거나 네트워크를 통해 다른 서버로 이동하는 과정.\n",
    "
\n", "\n", "* 손상된 RDD를 원래 상태로 복원하기 위해 생성과정을 기억했다가 다시 복구해 주는 기능이 있음.\n", "\n", "
\n",
    "=> resilient : 회복력 있는\n",
    "
\n", "\n", "* RDD는 읽기 전용 모델로 생성됨. 장애 발생시 이전 RDD를 만들 때 수행한 작업을 똑같이 실행해 복구.\n", "* 리니지 : RDD 생성 작업을 기록해 두는 것" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### RDD 생성방법\n", "\n", "* 기존 프로그램 메모리에 생성된 데이터를 이용. 즉시 테스트 가능\n", "* 로컬 혹은 하둡의 HDFS(Hadoop Distributed File System) 같은 외부 저장소에 저장된 데이터를 읽어 사용.\n", "\n", "
\n",
    "로컬 파일시스템 file:///~\n",
    "하둡 hdfs://~\n",
    "
\n", "\n", "* 기존에 생성돼 있는 RDD로 또 다른 RDD를 생성하는 방법" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import findspark\n", "findspark.init()\n", "import pyspark\n", "sc = pyspark.SparkContext()\n", "\n", "# 1. Collection 이용\n", "rdd = sc.parallelize([\"a\", \"b\", \"c\", \"d\", \"e\"])\n", "\n", "# 2. 파일로부터 생성\n", "# rdd = sc.textFile(\"\")\n", "\n", "# 3. 기존 RDD로 새로운 RDD 생성\n", "rdd1 = rdd.map(lambda s: s.upper())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### transformation 연산\n", "\n", "어떤 RDD에 변형을 가해 새로운 RDD를 생성하는 연산.\n", "\n", "기존 RDD는 바뀌지 않은 채 변형된 값을 가진 새로운 RDD가 생성.\n", "\n", "연산의 수행 결과가 RDD이면 변환 메소드.\n", "\n", "* sum, stddev : RDD를 구성하는 요소들이 모두 숫자 타입일 경우만 사용 가능.\n", "* groupByKey : 키와 값 쌍으로 구성된 RDD에만 사용 가능.\n", "\n", "#### action 연산\n", "\n", "RDD가 아닌 타입의 값을 반환하면 액션에 해당하는 연산." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.6 DAG\n", "\n", "우지(oozie) : http://oozie.apache.org\n", " \n", "여러 개의 하둡 맵리듀스 잡과 피그, 하이브 잡을 스케쥴링하고 연동할 수 있게 지원하는 도구\n", "\n", "일련의 작업 흐름을 XML을 사용해 명시적으로 선언해 사용 가능.\n", "\n", "DAG(directed acyclic graph)를 구성하며, 이를 이용해 일련의 작성을 수행하면서 데이터 처리를 수행.\n", "\n", "스파크\n", "\n", "드라이버 프로그램 : Job을 구동하는 프로그램.\n", "\n", "SparkContext 객체를 만들고 Job을 실행하고 종료하는 역할을 수행.\n", "\n", "드라이버는 RDD의 연산 정보를 DAG 스케쥴러에게 전달. \n", "\n", "스케쥴러는 실행 계획을 수립한 후 클러스터 매니저에게 전달.\n", "\n", "넓은 의존성(Wide Dependencies) : 부모RDD를 구성하는 파티션이 여러개의 자식 RDD 파티션과 관련있는 경우. 셔플이 많이 발생.\n", "\n", "좁은 의존성(Narrow Dependencies) : 부모RDD와 자식RDD가 1:1 관계" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.7 람다 아키텍쳐\n", "\n", "http://goo.gl/Tkq6RJ\n", "\n", "1. 새로운 데이터는 일괄 처리 계층과 속도 계층 모두에 전달\n", "\n", "2. 일괄 처리 계층 : 일정 주기마다 일괄적으로 가공해서 배치 뷰를 생성. 뷰 : 결과 데이터\n", "\n", "3. 속도 계층 : 데이터를 즉시 처리해 실시간 뷰를 생성\n", "\n", "4. 서빙 계층 : 실시간 뷰와 배치 뷰의 결과를 적절히 조합해 사용자에게 데이터를 전달." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.2 스파크 설치\n", "\n", "### 1.2.1 스파크 실행 모드의 이해\n", "\n", "클러스터 : 여러 대의 컴퓨터가 하나의 그룹을 형성해서 마치 하나의 컴퓨터인 것처럼 동작하는 것\n", "\n", "실행모드 : 클러스터 환경에서 (1대의 단독서버로) 개발했던 동일한 애플리케이션을 실행할 수 있음.\n", "\n", "### 1.2.2 사전 준비\n", "\n", "### 1.2.3 스파크 설치\n", "\n", "Choose a Spark release 항목에서 2.1.0 선택.\n", "\n", "Pre-build for Hadoop 2.7 버전을 선택." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2.4 예제 실행\n", "\n", "스파크 배포판 글자세기 예제" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "package: 1\n", "For: 3\n", "Programs: 1\n", "processing.: 1\n", "Because: 1\n", "The: 1\n", "page](http://spark.apache.org/documentation.html).: 1\n", "cluster.: 1\n", "its: 1\n", "[run: 1\n", "than: 1\n", "APIs: 1\n", "have: 1\n", "Try: 1\n", "computation: 1\n", "through: 1\n", "several: 1\n", "This: 2\n", "graph: 1\n", "Hive: 2\n", "storage: 1\n", "[\"Specifying: 1\n", "To: 2\n", "\"yarn\": 1\n", "Once: 1\n", "[\"Useful: 1\n", "prefer: 1\n", "SparkPi: 2\n", "engine: 1\n", "version: 1\n", "file: 1\n", "documentation,: 1\n", "processing,: 1\n", "the: 24\n", "are: 1\n", "systems.: 1\n", "params: 1\n", "not: 1\n", "different: 1\n", "refer: 2\n", "Interactive: 2\n", "R,: 1\n", "given.: 1\n", "if: 4\n", "build: 4\n", "when: 1\n", "be: 2\n", "Tests: 1\n", "Apache: 1\n", "thread: 1\n", "programs,: 1\n", "including: 4\n", "./bin/run-example: 2\n", "Spark.: 1\n", "package.: 1\n", "1000).count(): 1\n", "Versions: 1\n", "HDFS: 1\n", "Data.: 1\n", ">>>: 1\n", "Maven: 1\n", "programming: 1\n", "Testing: 1\n", "module,: 1\n", "Streaming: 1\n", "environment: 1\n", "run:: 1\n", "Developer: 1\n", "clean: 1\n", "1000:: 2\n", "rich: 1\n", "GraphX: 1\n", "Please: 4\n", "is: 6\n", "guide](http://spark.apache.org/contributing.html): 1\n", "run: 7\n", "URL,: 1\n", "threads.: 1\n", "same: 1\n", "MASTER=spark://host:7077: 1\n", "on: 7\n", "built: 1\n", "against: 1\n", "[Apache: 1\n", "tests: 2\n", "examples: 2\n", "at: 2\n", "optimized: 1\n", "3\"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).: 1\n", "usage: 1\n", "development: 1\n", "Maven,: 1\n", "graphs: 1\n", "talk: 1\n", "Shell: 2\n", "class: 2\n", "abbreviated: 1\n", "using: 5\n", "directory.: 1\n", "README: 1\n", "computing: 1\n", "overview: 1\n", "`examples`: 2\n", "example:: 1\n", "##: 9\n", "N: 1\n", "set: 2\n", "use: 3\n", "Hadoop-supported: 1\n", "running: 1\n", "find: 1\n", "contains: 1\n", "project: 1\n", "Pi: 1\n", "need: 1\n", "or: 3\n", "Big: 1\n", "high-level: 1\n", "Java,: 1\n", "uses: 1\n", ": 1\n", "Hadoop,: 2\n", "available: 1\n", "requires: 1\n", "(You: 1\n", "more: 1\n", "see: 3\n", "Documentation: 1\n", "of: 5\n", "tools: 1\n", "using:: 1\n", "cluster: 2\n", "must: 1\n", "supports: 2\n", "built,: 1\n", "tests](http://spark.apache.org/developer-tools.html#individual-tests).: 1\n", "system: 1\n", "build/mvn: 1\n", "Hadoop: 3\n", "this: 1\n", "Version\"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version): 1\n", "particular: 2\n", "Python: 2\n", "Spark: 16\n", "general: 3\n", "YARN,: 1\n", "pre-built: 1\n", "[Configuration: 1\n", "locally: 2\n", "library: 1\n", "A: 1\n", "locally.: 1\n", "sc.parallelize(1: 1\n", "only: 1\n", "Configuration: 1\n", "following: 2\n", "basic: 1\n", "#: 1\n", "changed: 1\n", "More: 1\n", "which: 2\n", "learning,: 1\n", "first: 1\n", "./bin/pyspark: 1\n", "also: 4\n", "info: 1\n", "should: 2\n", "for: 12\n", "[params]`.: 1\n", "documentation: 3\n", "[project: 1\n", "mesos://: 1\n", "Maven](http://maven.apache.org/).: 1\n", "setup: 1\n", ": 1\n", "latest: 1\n", "your: 1\n", "MASTER: 1\n", "example: 3\n", "[\"Parallel: 1\n", "scala>: 1\n", "DataFrames,: 1\n", "provides: 1\n", "configure: 1\n", "distributions.: 1\n", "can: 7\n", "About: 1\n", "instructions.: 1\n", "do: 2\n", "easiest: 1\n", "no: 1\n", "project.: 1\n", "how: 3\n", "`./bin/run-example: 1\n", "started: 1\n", "Note: 1\n", "by: 1\n", "individual: 1\n", "spark://: 1\n", "It: 2\n", "tips,: 1\n", "Scala: 2\n", "Alternatively,: 1\n", "an: 4\n", "variable: 1\n", "submit: 1\n", "-T: 1\n", "machine: 1\n", "thread,: 1\n", "them,: 1\n", "detailed: 2\n", "stream: 1\n", "And: 1\n", "distribution: 1\n", "review: 1\n", "return: 2\n", "Thriftserver: 1\n", "developing: 1\n", "./bin/spark-shell: 1\n", "\"local\": 1\n", "start: 1\n", "You: 4\n", "Spark](#building-spark).: 1\n", "one: 3\n", "help: 1\n", "with: 4\n", "print: 1\n", "Spark\"](http://spark.apache.org/docs/latest/building-spark.html).: 1\n", "data: 1\n", "Contributing: 1\n", "in: 6\n", "-DskipTests: 1\n", "downloaded: 1\n", "versions: 1\n", "online: 1\n", "Guide](http://spark.apache.org/docs/latest/configuration.html): 1\n", "builds: 1\n", "comes: 1\n", "Tools\"](http://spark.apache.org/developer-tools.html).: 1\n", "[building: 1\n", "Python,: 2\n", "Many: 1\n", "building: 2\n", "Running: 1\n", "from: 1\n", "way: 1\n", "Online: 1\n", "site,: 1\n", "other: 1\n", "Example: 1\n", "[Contribution: 1\n", "analysis.: 1\n", "sc.parallelize(range(1000)).count(): 1\n", "you: 4\n", "runs.: 1\n", "Building: 1\n", "higher-level: 1\n", "protocols: 1\n", "guidance: 2\n", "a: 8\n", "guide,: 1\n", "name: 1\n", "fast: 1\n", "SQL: 2\n", "that: 2\n", "will: 1\n", "IDE,: 1\n", "to: 17\n", "get: 1\n", ": 71\n", "information: 1\n", "core: 1\n", "web: 1\n", "\"local[N]\": 1\n", "programs: 2\n", "option: 1\n", "MLlib: 1\n", "[\"Building: 1\n", "contributing: 1\n", "shell:: 2\n", "instance:: 1\n", "Scala,: 1\n", "and: 9\n", "command,: 2\n", "package.): 1\n", "./dev/run-tests: 1\n", "sample: 1\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "17/08/26 21:41:30 INFO SparkContext: Running Spark version 2.2.0\n", "17/08/26 21:41:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n", "17/08/26 21:41:31 INFO SparkContext: Submitted application: JavaWordCount\n", "17/08/26 21:41:31 INFO SecurityManager: Changing view acls to: donglyeolsin\n", "17/08/26 21:41:31 INFO SecurityManager: Changing modify acls to: donglyeolsin\n", "17/08/26 21:41:31 INFO SecurityManager: Changing view acls groups to: \n", "17/08/26 21:41:31 INFO SecurityManager: Changing modify acls groups to: \n", "17/08/26 21:41:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(donglyeolsin); groups with view permissions: Set(); users with modify permissions: Set(donglyeolsin); groups with modify permissions: Set()\n", "17/08/26 21:41:31 INFO Utils: Successfully started service 'sparkDriver' on port 65233.\n", "17/08/26 21:41:31 INFO SparkEnv: Registering MapOutputTracker\n", "17/08/26 21:41:31 INFO SparkEnv: Registering BlockManagerMaster\n", "17/08/26 21:41:31 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information\n", "17/08/26 21:41:31 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up\n", "17/08/26 21:41:31 INFO DiskBlockManager: Created local directory at /private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/blockmgr-ea10087e-bf83-4c7a-81fd-415b57b656b7\n", "17/08/26 21:41:32 INFO MemoryStore: MemoryStore started with capacity 366.3 MB\n", "17/08/26 21:41:32 INFO SparkEnv: Registering OutputCommitCoordinator\n", "17/08/26 21:41:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n", "17/08/26 21:41:32 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.\n", "17/08/26 21:41:32 INFO Utils: Successfully started service 'SparkUI' on port 4042.\n", "17/08/26 21:41:32 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.2:4042\n", "17/08/26 21:41:32 INFO SparkContext: Added JAR file:/Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/examples/jars/scopt_2.11-3.3.0.jar at spark://192.168.1.2:65233/jars/scopt_2.11-3.3.0.jar with timestamp 1503751292886\n", "17/08/26 21:41:32 INFO SparkContext: Added JAR file:/Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar at spark://192.168.1.2:65233/jars/spark-examples_2.11-2.2.0.jar with timestamp 1503751292887\n", "17/08/26 21:41:33 INFO Executor: Starting executor ID driver on host localhost\n", "17/08/26 21:41:33 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 65250.\n", "17/08/26 21:41:33 INFO NettyBlockTransferService: Server created on 192.168.1.2:65250\n", "17/08/26 21:41:33 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy\n", "17/08/26 21:41:33 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.2, 65250, None)\n", "17/08/26 21:41:33 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.2:65250 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.2, 65250, None)\n", "17/08/26 21:41:33 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.2, 65250, None)\n", "17/08/26 21:41:33 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.2, 65250, None)\n", "17/08/26 21:41:33 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/spark-warehouse/').\n", "17/08/26 21:41:33 INFO SharedState: Warehouse path is 'file:/Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/spark-warehouse/'.\n", "17/08/26 21:41:35 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint\n", "17/08/26 21:41:38 INFO FileSourceStrategy: Pruning directories with: \n", "17/08/26 21:41:38 INFO FileSourceStrategy: Post-Scan Filters: \n", "17/08/26 21:41:38 INFO FileSourceStrategy: Output Data Schema: struct\n", "17/08/26 21:41:38 INFO FileSourceScanExec: Pushed Filters: \n", "17/08/26 21:41:38 INFO CodeGenerator: Code generated in 422.378204 ms\n", "17/08/26 21:41:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 277.3 KB, free 366.0 MB)\n", "17/08/26 21:41:39 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.4 KB, free 366.0 MB)\n", "17/08/26 21:41:39 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.2:65250 (size: 23.4 KB, free: 366.3 MB)\n", "17/08/26 21:41:39 INFO SparkContext: Created broadcast 0 from javaRDD at JavaWordCount.java:45\n", "17/08/26 21:41:39 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.\n", "17/08/26 21:41:39 INFO SparkContext: Starting job: collect at JavaWordCount.java:53\n", "17/08/26 21:41:39 INFO DAGScheduler: Registering RDD 5 (mapToPair at JavaWordCount.java:49)\n", "17/08/26 21:41:39 INFO DAGScheduler: Got job 0 (collect at JavaWordCount.java:53) with 1 output partitions\n", "17/08/26 21:41:39 INFO DAGScheduler: Final stage: ResultStage 1 (collect at JavaWordCount.java:53)\n", "17/08/26 21:41:39 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)\n", "17/08/26 21:41:39 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)\n", "17/08/26 21:41:39 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[5] at mapToPair at JavaWordCount.java:49), which has no missing parents\n", "17/08/26 21:41:39 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 12.5 KB, free 366.0 MB)\n", "17/08/26 21:41:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.5 KB, free 366.0 MB)\n", "17/08/26 21:41:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.2:65250 (size: 6.5 KB, free: 366.3 MB)\n", "17/08/26 21:41:39 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006\n", "17/08/26 21:41:39 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[5] at mapToPair at JavaWordCount.java:49) (first 15 tasks are for partitions Vector(0))\n", "17/08/26 21:41:39 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks\n", "17/08/26 21:41:39 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5288 bytes)\n", "17/08/26 21:41:39 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)\n", "17/08/26 21:41:39 INFO Executor: Fetching spark://192.168.1.2:65233/jars/scopt_2.11-3.3.0.jar with timestamp 1503751292886\n", "17/08/26 21:41:39 INFO TransportClientFactory: Successfully created connection to /192.168.1.2:65233 after 48 ms (0 ms spent in bootstraps)\n", "17/08/26 21:41:39 INFO Utils: Fetching spark://192.168.1.2:65233/jars/scopt_2.11-3.3.0.jar to /private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901/userFiles-2d31a0f7-d378-4ac8-8fa0-17eb63be7fc4/fetchFileTemp2991879884693235060.tmp\n", "17/08/26 21:41:39 INFO Executor: Adding file:/private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901/userFiles-2d31a0f7-d378-4ac8-8fa0-17eb63be7fc4/scopt_2.11-3.3.0.jar to class loader\n", "17/08/26 21:41:39 INFO Executor: Fetching spark://192.168.1.2:65233/jars/spark-examples_2.11-2.2.0.jar with timestamp 1503751292887\n", "17/08/26 21:41:39 INFO Utils: Fetching spark://192.168.1.2:65233/jars/spark-examples_2.11-2.2.0.jar to /private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901/userFiles-2d31a0f7-d378-4ac8-8fa0-17eb63be7fc4/fetchFileTemp8238211247047707553.tmp\n", "17/08/26 21:41:39 INFO Executor: Adding file:/private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901/userFiles-2d31a0f7-d378-4ac8-8fa0-17eb63be7fc4/spark-examples_2.11-2.2.0.jar to class loader\n", "17/08/26 21:41:40 INFO CodeGenerator: Code generated in 20.518002 ms\n", "17/08/26 21:41:40 INFO FileScanRDD: Reading File path: file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/README.md, range: 0-3809, partition values: [empty row]\n", "17/08/26 21:41:40 INFO CodeGenerator: Code generated in 16.808836 ms\n", "17/08/26 21:41:40 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1795 bytes result sent to driver\n", "17/08/26 21:41:40 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 995 ms on localhost (executor driver) (1/1)\n", "17/08/26 21:41:40 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool \n", "17/08/26 21:41:40 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at JavaWordCount.java:49) finished in 1.024 s\n", "17/08/26 21:41:40 INFO DAGScheduler: looking for newly runnable stages\n", "17/08/26 21:41:40 INFO DAGScheduler: running: Set()\n", "17/08/26 21:41:40 INFO DAGScheduler: waiting: Set(ResultStage 1)\n", "17/08/26 21:41:40 INFO DAGScheduler: failed: Set()\n", "17/08/26 21:41:40 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[6] at reduceByKey at JavaWordCount.java:51), which has no missing parents\n", "17/08/26 21:41:40 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.7 KB, free 366.0 MB)\n", "17/08/26 21:41:40 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.1 KB, free 366.0 MB)\n", "17/08/26 21:41:40 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.2:65250 (size: 2.1 KB, free: 366.3 MB)\n", "17/08/26 21:41:40 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006\n", "17/08/26 21:41:40 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[6] at reduceByKey at JavaWordCount.java:51) (first 15 tasks are for partitions Vector(0))\n", "17/08/26 21:41:40 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks\n", "17/08/26 21:41:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 4621 bytes)\n", "17/08/26 21:41:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)\n", "17/08/26 21:41:40 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks\n", "17/08/26 21:41:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms\n", "17/08/26 21:41:40 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 7903 bytes result sent to driver\n", "17/08/26 21:41:40 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 112 ms on localhost (executor driver) (1/1)\n", "17/08/26 21:41:40 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool \n", "17/08/26 21:41:40 INFO DAGScheduler: ResultStage 1 (collect at JavaWordCount.java:53) finished in 0.114 s\n", "17/08/26 21:41:40 INFO DAGScheduler: Job 0 finished: collect at JavaWordCount.java:53, took 1.373619 s\n", "17/08/26 21:41:40 INFO SparkUI: Stopped Spark web UI at http://192.168.1.2:4042\n", "17/08/26 21:41:40 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!\n", "17/08/26 21:41:40 INFO MemoryStore: MemoryStore cleared\n", "17/08/26 21:41:40 INFO BlockManager: BlockManager stopped\n", "17/08/26 21:41:40 INFO BlockManagerMaster: BlockManagerMaster stopped\n", "17/08/26 21:41:40 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!\n", "17/08/26 21:41:40 INFO SparkContext: Successfully stopped SparkContext\n", "17/08/26 21:41:40 INFO ShutdownHookManager: Shutdown hook called\n", "17/08/26 21:41:40 INFO ShutdownHookManager: Deleting directory /private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901\n" ] } ], "source": [ "%%sh\n", "cd ${SPARK_HOME}\n", "./bin/run-example JavaWordCount README.md" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "for가 3번 나오는지 grep으로 확인" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[01;31m\u001b[KFor\u001b[m\u001b[K general development tips, including info on developing Spark using an IDE, see [\"Useful Developer Tools\"](http://spark.apache.org/developer-tools.html).\n", "To run one of them, use `./bin/run-example [params]`. \u001b[01;31m\u001b[KFor\u001b[m\u001b[K example:\n", "package. \u001b[01;31m\u001b[KFor\u001b[m\u001b[K instance:\n" ] } ], "source": [ "%%sh\n", "cd ${SPARK_HOME}\n", "grep --color=always \"For\" README.md" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2.5 스파크 셸\n", "\n", "run-example의 역할\n", "
\n",
    "$ cat ${SPARK_HOME}/bin/run-example\n",
    "... 중략 ...\n",
    "exec \"${SPARK_HOME}\"/bin/spark-submit run-example \"$@\"\n",
    "
\n", "'spark-submit'이라는 셸을 호출하고 있음.\n", "\n", "spark-submit은 필요한 환경변수를 정의하고 다시 spark-class 셸을 실행.\n", "
\n",
    "$ cat ${SPARK_HOME}/bin/spark-submit\n",
    "... 중략 ...\n",
    "exec \"$SPARK_HOME\"/bin/spark-class org.apache.spark.deploy.SparkSubmit \"$@\"\n",
    "
\n", "\n", "spark-class 셸은 우리가 전달한 실행 매개변수를 이용해 org.apache.spark.launcher.Main 클래스를 실행\n", "\n", "명령문 확인을 위해 다음처럼 실습\n", "\n", "
\n",
    "$ vi ~/.bash_profile\n",
    "$ export SPARK_PRINT_LAUNCH_COMMAND=1\n",
    "$ source ~/.bash_profile\n",
    "$ ./bin/run-example JavaWordCount README.md\n",
    "
\n", "\n", "결과\n", "
\n",
    "Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /home/sdrlurker/apps/spark/conf/:/home/sdrlurker/apps/spark/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --jars\n",
    " /home/sdrlurker/apps/spark/examples/jars/spark-examples_2.11-2.1.0.jar,/home/sdrlurker/apps/spark/examples/jars/scopt_2.11-3.3.0.jar --class org.apache.spark.examples.JavaWordCount \n",
    "spark-internal README.md\n",
    "... 후략 ...\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "python용 스파크 셸 예시" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "3" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import os\n", "SPARK_HOME = os.getenv(\"SPARK_HOME\")\n", "\n", "file = sc.textFile(\"file://%s/README.md\" % SPARK_HOME)\n", "words = file.flatMap(lambda line : line.split(\" \"))\n", "result = words.countByValue()\n", "result[\"For\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "스파크 셸은 개발 단계 및 작업 내용에 따라 빠른 테스트나 프로토타이핑 또는 일회성 데이터를 처리하는 등에 유용하게 활용 가능." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2.6 실행 옵션\n", "\n", "--master 옵션 : 스파크가 사용할 클러스터 마스터 정보\n", "\n", "단일 서버에서 동작시킬 경우 \"local\"\n", "\n", "여러 개의 스레드를 사용하려면 \"local[스레드수]\", local[*] 는 모든 스레드를 사용.\n", "\n", "스파크 애플리케이션 설정정보 확인 방법\n", "\n", "1. --verbose 옵션 활용\n", "\n", "
\n",
    "$ ./spark-shell --master=local --verbose\n",
    "
\n", "\n", "python 스파크 셸에서는 다음 명령어로 확인 가능." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "['spark.app.id=local-1503751287905',\n", " 'spark.app.name=pyspark-shell',\n", " 'spark.driver.host=192.168.1.2',\n", " 'spark.driver.port=65230',\n", " 'spark.executor.id=driver',\n", " 'spark.master=local[*]',\n", " 'spark.rdd.compress=True',\n", " 'spark.serializer.objectStreamReset=100',\n", " 'spark.submit.deployMode=client']" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sc.getConf().toDebugString().split()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "2. 스파크 셸을 실행하고 다음 주소로 접속함.\n", "\n", "http://localhost:4040/environment/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2.7 더 살펴보기\n", "\n", "클러스터 모드에서는 여러 대의 컴퓨터에 분산되어 데이터가 저장됨.\n", "\n", "어떤 기준으로 데이터를 분류해서 각 서버에 분배할 지 결정해야 함.\n", "\n", "일부 서버에 장애가 발생하거나 네트워크에 문제가 발생하는 경우도 생각해야 함." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.4 예제 프로젝트 설정\n", "\n", "### 1.4.1 WordCount 예제 실행\n", "\n", "코드 작성 -> 단위 테스트 -> 빌드 및 배포\n", "\n", "1. Spark Context 생성\n", "\n", "SparkContext는 애플리케이션과 스파크 클러스터와의 연결을 담당하는 객체. \n", "\n", "이를 통해 RDD나 accumulator 또는 broadcast 등 변수를 다룸.\n", "\n", "2. RDD 생성\n", "\n", "스파크에서 사용하는 기본 분산 데이터 모델. \"the basic abstraction in Spark\"\n", "\n", "RDD를 생성하는 방법: 외부 데이터 소스로부터 생성. 기존 RDD에서 또 다른 RDD를 생성.\n", "\n", "3. RDD 처리\n", "\n", "예제에서는 process() 부분. 다양한 데이터 처리 함수로 프로그래머가 원하는 처리 수행 가능.\n", "\n", "Java 8에서는 람다 문법 사용 가능.\n", "\n", "4. 처리 결과 저장\n", "\n", "테스트일 때는 \"단순히 화면에 출력\" 가능.\n", "\n", "실제 서비스 시에는 \"하둡 파일 시스템에 저장\" 가능.\n", "\n", "#### 테스트 코드 수행\n", "\n", "src/test/java 폴더 아래에 com.wikibooks.spark.ch1.WordCountTest 파일을 열고\n", "\n", "Run -> Run As -> Junit Test를 선택.\n", "\n", "setup() 메소드에서 SparkContext를 생성.\n", "\n", "testProcess() 메소드에서 RDD를 만들고 필요한 처리를 수행.\n", "\n", "* input을 List로 받아 inputRDD로 만듬.\n", "\n", "* Map 형태로 collectAsMap 메소드로 resultMap을 생성.\n", "\n", "* 검증 작업 수행 (assertThat)\n", "\n", "cleanup() 메소드에서 SparkContext를 종료.\n", "\n", "http://www.nextree.co.kr/p11104/\n", "\n", "@BeforeClass, @AfterClass annotation은 참고." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "maven으로 빌드한 뒤 실행방법\n", "\n", "
\n",
    "$ $SPARK_HOME/bin/spark-submit \\\n",
    "--class com.wikibooks.spark.ch1.WordCount \\\n",
    "beginning-spark-examples.jar \\\n",
    "local[*] \\\n",
    "file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/README.md \\ file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/testresult\n",
    "
\n", "\n", "--class : 메인함수를 가진 클래스를 지정하는 변수\n", "\n", "Jar 파일 경로 : 이 클래스가 포함된 jar 파일 경로\n", "\n", "클러스터 정보 : 프로그램 인자 1번째로 전달\n", "\n", "입력 경로 : 프로그램 인자 2번째로 전달\n", "\n", "출력 경로 : 프로그램 인자 3번째로 전달\n", "\n", "test_result 경로에 _SUCCESS 파일과 part-로 시작하는 파일이 있으면 성공." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "스칼라에는 JUnit뿐만 아니라 FlatSpec을 테스트 코드하는데 사용 가능.\n", "\n", "FlatSpec은 BDD를 위한 것." ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python [Root]", "language": "python", "name": "Python [Root]" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.2" } }, "nbformat": 4, "nbformat_minor": 0 }