# 01 스파크 소개

## 1.1 스파크

### 1.1.1 빅데이터 등장

### 1.1.2 빅데이터 정의

### 1.1.3 빅데이터 솔루션

### 1.1.4 스파크

### 1.1.5 RDD(Resilient Distributed DataSet) 소개와 연산

#### 스파크 RDD

스파크가 사용하는 핵심 데이터 모델로 다수에 서버에 걸쳐 분산 방식으로 저장된 데이터 요소들의 집합

병렬처리가 가능하고 장애가 발생할 경우에도 스스로 복구될 수 있는 내성(tolerance)을 가지고 있다.

Spark revolves around the concept of a resilient distributed dataset(RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.

* 파티션 단위로 나눠 병렬로 처리를 수행. 파티션 : 병렬처리 프로세스 수.

<pre>
=> 셔플링: 작업이 진행되는 과정에서 재구성되거나 네트워크를 통해 다른 서버로 이동하는 과정.
</pre>

* 손상된 RDD를 원래 상태로 복원하기 위해 생성과정을 기억했다가 다시 복구해 주는 기능이 있음.

<pre>
=> resilient : 회복력 있는
</pre>

* RDD는 읽기 전용 모델로 생성됨. 장애 발생시 이전 RDD를 만들 때 수행한 작업을 똑같이 실행해 복구.
* 리니지 : RDD 생성 작업을 기록해 두는 것

#### RDD 생성방법

* 기존 프로그램 메모리에 생성된 데이터를 이용. 즉시 테스트 가능
* 로컬 혹은 하둡의 HDFS(Hadoop Distributed File System) 같은 외부 저장소에 저장된 데이터를 읽어 사용.

<pre>
로컬 파일시스템 file:///~
하둡 hdfs://~
</pre>

* 기존에 생성돼 있는 RDD로 또 다른 RDD를 생성하는 방법

In [1]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()

# 1. Collection 이용
rdd = sc.parallelize(["a", "b", "c", "d", "e"])

# 2. 파일로부터 생성
# rdd = sc.textFile("<path_to_file>")

# 3. 기존 RDD로 새로운 RDD 생성
rdd1 = rdd.map(lambda s: s.upper())

#### transformation 연산

어떤 RDD에 변형을 가해 새로운 RDD를 생성하는 연산.

기존 RDD는 바뀌지 않은 채 변형된 값을 가진 새로운 RDD가 생성.

연산의 수행 결과가 RDD이면 변환 메소드.

* sum, stddev : RDD를 구성하는 요소들이 모두 숫자 타입일 경우만 사용 가능.
* groupByKey : 키와 값 쌍으로 구성된 RDD에만 사용 가능.

#### action 연산

RDD가 아닌 타입의 값을 반환하면 액션에 해당하는 연산.

### 1.1.6 DAG

우지(oozie) : http://oozie.apache.org
        
여러 개의 하둡 맵리듀스 잡과 피그, 하이브 잡을 스케쥴링하고 연동할 수 있게 지원하는 도구

일련의 작업 흐름을 XML을 사용해 명시적으로 선언해 사용 가능.

DAG(directed acyclic graph)를 구성하며, 이를 이용해 일련의 작성을 수행하면서 데이터 처리를 수행.

스파크

드라이버 프로그램 : Job을 구동하는 프로그램.

SparkContext 객체를 만들고 Job을 실행하고 종료하는 역할을 수행.

드라이버는 RDD의 연산 정보를 DAG 스케쥴러에게 전달. 

스케쥴러는 실행 계획을 수립한 후 클러스터 매니저에게 전달.

넓은 의존성(Wide Dependencies) : 부모RDD를 구성하는 파티션이 여러개의 자식 RDD 파티션과 관련있는 경우. 셔플이 많이 발생.

좁은 의존성(Narrow Dependencies) : 부모RDD와 자식RDD가 1:1 관계

### 1.1.7 람다 아키텍쳐

http://goo.gl/Tkq6RJ

1. 새로운 데이터는 일괄 처리 계층과 속도 계층 모두에 전달

2. 일괄 처리 계층 : 일정 주기마다 일괄적으로 가공해서 배치 뷰를 생성. 뷰 : 결과 데이터

3. 속도 계층 : 데이터를 즉시 처리해 실시간 뷰를 생성

4. 서빙 계층 : 실시간 뷰와 배치 뷰의 결과를 적절히 조합해 사용자에게 데이터를 전달.

## 1.2 스파크 설치

### 1.2.1 스파크 실행 모드의 이해

클러스터 : 여러 대의 컴퓨터가 하나의 그룹을 형성해서 마치 하나의 컴퓨터인 것처럼 동작하는 것

실행모드 : 클러스터 환경에서 (1대의 단독서버로) 개발했던 동일한 애플리케이션을 실행할 수 있음.

### 1.2.2 사전 준비

### 1.2.3 스파크 설치

Choose a Spark release 항목에서 2.1.0 선택.

Pre-build for Hadoop 2.7 버전을 선택.

### 1.2.4 예제 실행

스파크 배포판 글자세기 예제

In [2]:
%%sh
cd ${SPARK_HOME}
./bin/run-example JavaWordCount README.md

package: 1
For: 3
Programs: 1
processing.: 1
Because: 1
The: 1
page](http://spark.apache.org/documentation.html).: 1
cluster.: 1
its: 1
[run: 1
than: 1
APIs: 1
have: 1
Try: 1
computation: 1
through: 1
several: 1
This: 2
graph: 1
Hive: 2
storage: 1
["Specifying: 1
To: 2
"yarn": 1
Once: 1
["Useful: 1
prefer: 1
SparkPi: 2
engine: 1
version: 1
file: 1
documentation,: 1
processing,: 1
the: 24
are: 1
systems.: 1
params: 1
not: 1
different: 1
refer: 2
Interactive: 2
R,: 1
given.: 1
if: 4
build: 4
when: 1
be: 2
Tests: 1
Apache: 1
thread: 1
programs,: 1
including: 4
./bin/run-example: 2
Spark.: 1
package.: 1
1000).count(): 1
Versions: 1
HDFS: 1
Data.: 1
>>>: 1
Maven: 1
programming: 1
Testing: 1
module,: 1
Streaming: 1
environment: 1
run:: 1
Developer: 1
clean: 1
1000:: 2
rich: 1
GraphX: 1
Please: 4
is: 6
guide](http://spark.apache.org/contributing.html): 1
run: 7
URL,: 1
threads.: 1
same: 1
MASTER=spark://host:7077: 1
on: 7
built: 1
against: 1
[Apache: 1
tests: 2
examples: 2
at: 2
optimized: 1


17/08/26 21:41:30 INFO SparkContext: Running Spark version 2.2.0
17/08/26 21:41:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/08/26 21:41:31 INFO SparkContext: Submitted application: JavaWordCount
17/08/26 21:41:31 INFO SecurityManager: Changing view acls to: donglyeolsin
17/08/26 21:41:31 INFO SecurityManager: Changing modify acls to: donglyeolsin
17/08/26 21:41:31 INFO SecurityManager: Changing view acls groups to: 
17/08/26 21:41:31 INFO SecurityManager: Changing modify acls groups to: 
17/08/26 21:41:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(donglyeolsin); groups with view permissions: Set(); users  with modify permissions: Set(donglyeolsin); groups with modify permissions: Set()
17/08/26 21:41:31 INFO Utils: Successfully started service 'sparkDriver' on port 65233.
17/08/26 21:41:31 INFO SparkEnv: Registering MapOutputTra

for가 3번 나오는지 grep으로 확인

In [3]:
%%sh
cd ${SPARK_HOME}
grep --color=always "For" README.md

[01;31m[KFor[m[K general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).
To run one of them, use `./bin/run-example <class> [params]`. [01;31m[KFor[m[K example:
package. [01;31m[KFor[m[K instance:


### 1.2.5 스파크 셸

run-example의 역할
<pre>
$ cat ${SPARK_HOME}/bin/run-example
... 중략 ...
exec "${SPARK_HOME}"/bin/spark-submit run-example "$@"
</pre>
'spark-submit'이라는 셸을 호출하고 있음.

spark-submit은 필요한 환경변수를 정의하고 다시 spark-class 셸을 실행.
<pre>
$ cat ${SPARK_HOME}/bin/spark-submit
... 중략 ...
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
</pre>

spark-class 셸은 우리가 전달한 실행 매개변수를 이용해 org.apache.spark.launcher.Main 클래스를 실행

명령문 확인을 위해 다음처럼 실습

<pre>
$ vi ~/.bash_profile
$ export SPARK_PRINT_LAUNCH_COMMAND=1
$ source ~/.bash_profile
$ ./bin/run-example JavaWordCount README.md
</pre>

결과
<pre>
Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /home/sdrlurker/apps/spark/conf/:/home/sdrlurker/apps/spark/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --jars
 /home/sdrlurker/apps/spark/examples/jars/spark-examples_2.11-2.1.0.jar,/home/sdrlurker/apps/spark/examples/jars/scopt_2.11-3.3.0.jar --class org.apache.spark.examples.JavaWordCount 
spark-internal README.md
... 후략 ...
</pre>

python용 스파크 셸 예시

In [4]:
import os
SPARK_HOME = os.getenv("SPARK_HOME")

file = sc.textFile("file://%s/README.md" % SPARK_HOME)
words = file.flatMap(lambda line : line.split(" "))
result = words.countByValue()
result["For"]

3

스파크 셸은 개발 단계 및 작업 내용에 따라 빠른 테스트나 프로토타이핑 또는 일회성 데이터를 처리하는 등에 유용하게 활용 가능.

### 1.2.6 실행 옵션

--master 옵션 : 스파크가 사용할 클러스터 마스터 정보

단일 서버에서 동작시킬 경우 "local"

여러 개의 스레드를 사용하려면 "local[스레드수]", local[*] 는 모든 스레드를 사용.

스파크 애플리케이션 설정정보 확인 방법

1. --verbose 옵션 활용

<pre>
$ ./spark-shell --master=local --verbose
</pre>

python 스파크 셸에서는 다음 명령어로 확인 가능.

In [8]:
sc.getConf().toDebugString().split()

['spark.app.id=local-1503751287905',
 'spark.app.name=pyspark-shell',
 'spark.driver.host=192.168.1.2',
 'spark.driver.port=65230',
 'spark.executor.id=driver',
 'spark.master=local[*]',
 'spark.rdd.compress=True',
 'spark.serializer.objectStreamReset=100',
 'spark.submit.deployMode=client']

2. 스파크 셸을 실행하고 다음 주소로 접속함.

http://localhost:4040/environment/

### 1.2.7 더 살펴보기

클러스터 모드에서는 여러 대의 컴퓨터에 분산되어 데이터가 저장됨.

어떤 기준으로 데이터를 분류해서 각 서버에 분배할 지 결정해야 함.

일부 서버에 장애가 발생하거나 네트워크에 문제가 발생하는 경우도 생각해야 함.

## 1.4 예제 프로젝트 설정

### 1.4.1 WordCount 예제 실행

코드 작성 -> 단위 테스트 -> 빌드 및 배포

1. Spark Context 생성

SparkContext는 애플리케이션과 스파크 클러스터와의 연결을 담당하는 객체. 

이를 통해 RDD나 accumulator 또는 broadcast 등 변수를 다룸.

2. RDD 생성

스파크에서 사용하는 기본 분산 데이터 모델. "the basic abstraction in Spark"

RDD를 생성하는 방법: 외부 데이터 소스로부터 생성. 기존 RDD에서 또 다른 RDD를 생성.

3. RDD 처리

예제에서는 process() 부분. 다양한 데이터 처리 함수로 프로그래머가 원하는 처리 수행 가능.

Java 8에서는 람다 문법 사용 가능.

4. 처리 결과 저장

테스트일 때는 "단순히 화면에 출력" 가능.

실제 서비스 시에는 "하둡 파일 시스템에 저장" 가능.

#### 테스트 코드 수행

src/test/java 폴더 아래에 com.wikibooks.spark.ch1.WordCountTest 파일을 열고

Run -> Run As -> Junit Test를 선택.

setup() 메소드에서 SparkContext를 생성.

testProcess() 메소드에서 RDD를 만들고 필요한 처리를 수행.

* input을 List로 받아 inputRDD로 만듬.

* Map 형태로 collectAsMap 메소드로 resultMap을 생성.

* 검증 작업 수행 (assertThat)

cleanup() 메소드에서 SparkContext를 종료.

http://www.nextree.co.kr/p11104/

@BeforeClass, @AfterClass annotation은 참고.

maven으로 빌드한 뒤 실행방법

<pre>
$ $SPARK_HOME/bin/spark-submit \
--class com.wikibooks.spark.ch1.WordCount \
beginning-spark-examples.jar \
local[*] \
file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/README.md \ file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/testresult
</pre>

--class : 메인함수를 가진 클래스를 지정하는 변수

Jar 파일 경로 : 이 클래스가 포함된 jar 파일 경로

클러스터 정보 : 프로그램 인자 1번째로 전달

입력 경로 : 프로그램 인자 2번째로 전달

출력 경로 : 프로그램 인자 3번째로 전달

test_result 경로에 _SUCCESS 파일과 part-로 시작하는 파일이 있으면 성공.

스칼라에는 JUnit뿐만 아니라 FlatSpec을 테스트 코드하는데 사용 가능.

FlatSpec은 BDD를 위한 것.