Quick View
Spark의 기본에 대해서 정리해 보자.
RDD
- RDD는 분산 자료구조이다 ≈ 일단 DB 테이블로 생각하자
- Mutable 특성 ≈ 테이블인데 append만 되는 read-only table이라고 생각하자
- Lazy execution 특성
- RDD "Actions"은 operator이며 결과로 새로운 RDD들을 만들어 낸다.
- RDD actions 체인을 거쳐서 최종 RDD를 설계한 다음,
- "Transformations"이 실제로 RDD의 actions들을 실행시킨다.
- DAG (Directed Acyclic Graph)
- RDD의 actions / transformation은 결국 DAG를 설계하는 것이다.
- 작업 단위
- 크기 순: Job > Stage > Task
- Stage: shuffle이 발생하기 전까지 하나의 executor에 의해서 실행되는 구간
- Task: map(), filter()와 같은 단위 연산
SparkSQL
- RDD에 DB table처럼 schema를 부여하여 SQL 질의를 가능하게 할 수 있다. 이러한 RDD를 "DataFrames"라고 부른다.
Spark Streaming
- Data Source (Kafka/S3/Kinesis/…) → Spark Streaming → Data Destination (Cassandra/HDFS/…)
- DStream (Discretized Stream)
- Stream block을 5초 동안 모아서 하나의 RDD로 만들어 낸 것이다. 결국 Spark Stream도 RDD 단위로 처리하는 셈
Spark Installation
Installing PySpark on Windows
설치 가이드를 따라 진행하다가 막히는 부분들을 여기저기 찾으며 정리한 것이다. Anaconda는 미리 설치되어 있다고 가정한다. Spark는 HDFS 파일시스템 위에서 동작하기 때문에 windows Hadoop을 설치해야 한다. 아래 미리 Windows용으로 빌드된 Hadoop을 설치한다.
예를 들어, 압축을 풀고 "C:\app\hadoop"으로 이동시킨 후 환경변수를 설정한다. 현재 자바 최신 버전인 JDK 11은 아직 지원되지 않고 문제가 발생하므로 JDK 8 최신 버전을 설정한다.
JAVA_HOME=C:\app\Java\8\jdk
HADOOP_HOME=C:\app\hadoop
ANACONDA_HOME=C:\app\anaconda
PYTHONPATH=%ANACONDA_HOME%
PYSPARK_PYTHON=%PYTHONPATH%\python.exe
PYTHONIOENCODING=UTF-8
PATH=%PATH%;%ANACONDA_HOME%;%ANACONDA_HOME%\Library\bin;
%HADOOP_HOME%\bin;
Anaconda prompt를 열고 환경변수가 잘 설정되었는지 확인하고 pyspark를 설치한다. 현재 최신 PySpark 버전이 2.4.0이지만 dataframe 생성시 exception이 발생하여 2.3.2로 설치한다.
echo %JAVA_HOME%
echo %HADOOP_HOME%
...
conda install pyspark=2.3.2
IDE
Jupyter Notebook
Anaconda prompt에서 아래 명령어를 실행시키면 데몬이 시작되면서 기본 browser가 자동으로 열린다.
(base) C:\Users\brad>jupyter notebook [I 22:47:18.529 NotebookApp] JupyterLab extension loaded from C:\app\anaconda\lib\site-packages\jupyterlab [I 22:47:18.530 NotebookApp] JupyterLab application directory is C:\app\anaconda\share\jupyter\lab [I 22:47:18.532 NotebookApp] Serving notebooks from local directory: C:\Users\brad [I 22:47:18.532 NotebookApp] The Jupyter Notebook is running at: [I 22:47:18.532 NotebookApp] http://localhost:8888/?token=6d3d60c01b9715eb8d3a94fd862f22eefab07192fe7aa1a0
New > Python 3로 새로운 edit 탭을 시작한다.
Syntax highlight와 괄호 닫기 자동 입력을 지원해준다. Shift+Enter로 작성한 상자 안에 코드를 수행해볼 수 있다. 작성 중간 중간에 File > Save를 해준다. 현재 directory 안에 .ipynb 화일로 저장된다.
PyCharm
편리한 편집 기능을 제공하는 PyCharm이다. 단축키와 스타일이 IntelliJ와 유사하다.
Troubleshooting
Numpy update 후에 PyCharm에서 다음과 같은 오류가 발생한다
"DLL Load Failed: The specified procedure could not be found."
Windows 환경 변수 PATH에 다음을 추가하고 PyCharm을 restart해보자.
%ANACONDA_HOME%\Library\bin
Anaconda Installation
Windows 10에 Anaconda를 설치하는 방법을 정리한다. 개인적으로 경로 상에 공백 문자가 없는 것을 선호하기 때문에 C:\app\anaconda 디렉토리 아래에 설치해 보도록 하겠다.
Downloading Anacond
아래 URL로 가서 최신 Anaconda 설치화일을 다운로드한다.
https://www.anaconda.com/download/
현재 최신 버전은, https://repo.continuum.io/archive/Anaconda3-2018.12-Windows-x86_64.exe 이다.
설치 창에서 경로를 "C:\app\anaconda"로 수정하고 설치를 완료한다.
Setting Environment Variables
Windows 10의 시스템 환경변수에 다음과 같이 추가한다.
ANACONDA_HOME=C:\app\anaconda
PYTHONPATH=%ANACONDA_HOME%
PYTHONIOENCODING=UTF-8
PATH=%PATH%;%ANACONDA_HOME%;%ANACONDA_HOME%\Library\bin;
Adding Mirror Site
Anaconda 설치 후 새로 생긴 Anaconda Prompt를 실행한다.
기본적으로 Anaconda는 아래 채널을 갖고 있는데 속도가 느려서 중국의 미러 채널을 추가해 보도록하겠다.
default_channels:
https://repo.anaconda.com/pkgs/main
https://repo.anaconda.com/pkgs/free
https://repo.anaconda.com/pkgs/r
https://repo.anaconda.com/pkgs/pro
https://repo.anaconda.com/pkgs/msys2
다음 URL을 참조하여 채널을 추가한다.
https://mirrors.tuna.tsinghua.edu.cn/help/anaconda/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --set show_channel_urls yes
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/
테스트 삼아 numpy를 설치해본다.
conda install numpy
Updating Conda Package
설치되어 있는 Anaconda package들을 살펴보자.
conda list
Package들을 update해본다.
conda update --all
캐쉬된 잔여물들을 삭제한다.
conda clean --all
Troubleshooting
Unknown Encoding
만일 conda update 중 아래와 같은 메시지를 만나게 된다면 Windows 10에서 기본 문자셋을 UTF-8로 설정했기 때문일 것이다. 환경변수 "PYTHONIOENCODING=UTF8"으로 설정했음에도 불구하고 6500이라는 숫자로 변경되었다.
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
Fatal Python error: init_sys_streams: can't initialize sys standard streams
LookupError: unknown encoding: 6500
Anaconda Prompt를 닫고 "C:\app\anaconda\Scripts\activate.bat" 끝에 아래 라인을 강제로 추가한다.
set PYTHONIOENCODING=UTF-8
다시 Anaconda Prompt를 열고 encoding을 확인한다.
(base) C:\Users\brad>echo %PYTHONIOENCODING%
UTF-8
(base) C:\Users\brad>conda update conda
Solving environment: /
Spark SQL
긴 설명은 생략하고 직관적으로 이해할 수 있도록 Python 샘플 코드를 통해서 레퍼런스처럼 참조할 수 있도록 정리한다. Ctrl+F로 검색하기 쉽도록 외래어는 가능하면 알파벳으로 표시할 것이다.
Spark SQL Session
Spark에서 DataFrame을 생성하고 SQL을 수행하기 위해서 pyspark.sql.SparkSession
인스턴스를 만든다.
from pyspark.sql import SparkSession spark = SparkSession.builder.master("local").appName("MyApp").getOrCreate()
File I/O
대부분의 경우 대량의 입력 데이터를 수기로 입력할 수는 없고 다른 data source로부터 받아야 한다. 가장 기본이 되는 방식으로 하나의 File을 읽어서 DataFrame으로 변환해 보자.
테스트를 위하여 다음 링크에서 붓꽃 데이터를 다운로해서 저장한다.
https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
아래 예제는 Pyspark v2.4.0 기준이고, 이하 모든 예제는 사용 버전에 따라 코드가 달라질 수 있다.
# spark = SparkSession.builder.getOrCreate() irisDf = spark.read.csv('data/iris.data', sep=",", header="false", inferSchema="true")
- sep: 구분자. 기본은 콤마(,)
- header: csv에 헤더가 있는지 여부
- inferSchema: 컬럼 내용을 보고 data type을 유추해서 결정해 준다. Default인 "false"로 하면 string type으로 인식한다.
DataFrames
방금 CSV file을 읽어서 dataframe 한 개로 전환하였다. Dataframe의 정확한 정의는 따로 있겠지만 이해하기 쉽도록 schema를 가진 RDD라고 생각하자. DB table처럼 schema가 있기 때문에 SQL로 조작이 가능해진다. 이제 dataframe에 대한 기초 조작법을 정리해 보자.
Schema
>>> df.columns ['_c0', '_c1', '_c2', '_c3', '_c4'] >>> df.dtypes [('_c0', 'double'), ('_c1', 'double'), ('_c2', 'double'), ('_c3', 'double'), ('_c4', 'string')] >>> df.printSchema() root |-- _c0: double (nullable = true) |-- _c1: double (nullable = true) |-- _c2: double (nullable = true) |-- _c3: double (nullable = true) |-- _c4: string (nullable = true)
DataFrame을 화면에 출력하는 기능이다. 앞에 chain이 얼마나 길든 상관없이 최종 결과 DataFrame을 화면에 출력한다.
# Print a DataFrame in table format. All the belows print the same table. irisDf.show(3) irisDf.select('*').show(3) irisDf.limit(3).show(3) # Display +---+---+---+---+-----------+ |_c0|_c1|_c2|_c3| _c4| +---+---+---+---+-----------+ |5.1|3.5|1.4|0.2|Iris-setosa| |4.9|3.0|1.4|0.2|Iris-setosa| |4.7|3.2|1.3|0.2|Iris-setosa| +---+---+---+---+-----------+
Query
First N
전체 row 또는 앞 n 개 row를 질의할 수 있는 몇가지 방법들을 제공한다. 테스트 결과 아래 구문들은 모두 동일한 Row 객체 리스트를 반환한다.
# Whatever methods below return the same list of Row object. irisDf.head(3) irisDf.take(3) irisDf.collect()[:3] irisDf.limit(3).collect() irisDf.select('*').limit(3).collect() # Result [Row(_c0=5.1, _c1=3.5, _c2=1.4, _c3=0.2, _c4='Iris-setosa'), Row(_c0=4.9, _c1=3.0, _c2=1.4, _c3=0.2, _c4='Iris-setosa'), Row(_c0=4.7, _c1=3.2, _c2=1.3, _c3=0.2, _c4='Iris-setosa')]
- take(n): 앞의 n개 row를 fetch
- show(n): 앞의 n개 row를 print
API Queries
SQL과 유사한 API로 질의한다.
>>> df.select('_c4').filter('_c0 > 5.0 and _c2 < 1.5').show(3) +-----------+ | _c4| +-----------+ |Iris-setosa| |Iris-setosa| |Iris-setosa| +-----------+ >>> df.groupby('_c4').agg({'_c1':'max', '_c2':'count'}).show() +---------------+--------+----------+ | _c4|max(_c1)|count(_c2)| +---------------+--------+----------+ | Iris-virginica| 3.8| 50| | Iris-setosa| 4.4| 50| |Iris-versicolor| 3.4| 50| +---------------+--------+----------+
- filter(): SQL의 WHERE에 해당한다.
- groupby(): SQL과 달리 SELECT가 없고 group by column을 명시하지 않는다.
- agg(): 'column명':'항수명' 형태로 aggregation columns를 표현한다.
- show(): 앞의 fiter, groupby, agg 등은 transformations여서 실제 수행은 하지 않고 설계만 하는 것이고, action인 show()를 불러야 실제로 조회가 시작된다.
Columns
지금까지는 windows command 창에서 수행하였는데 코딩이 수월한 PyCharm으로 옮기겠다.
Column Functions
Column를 전문적으로 다루는 module이 따로 있다. Alias, aggregation 등 편리한 기능들이 있다.
from pyspark.sql import functions as F df.groupby('_c4').agg(F.max('_c1').alias('_c1_max'), F.count('_c2').alias('_c2_count')).show() +---------------+-------+---------+ | _c4|_c1_max|_c2_count| +---------------+-------+---------+ | Iris-virginica| 3.8| 50| | Iris-setosa| 4.4| 50| |Iris-versicolor| 3.4| 50| +---------------+-------+---------+
Add Column
df.withColumn('_c5', F.col('_c1')*2).show(3) +---+---+---+---+-----------+---+ |_c0|_c1|_c2|_c3| _c4|_c5| +---+---+---+---+-----------+---+ |5.1|3.5|1.4|0.2|Iris-setosa|7.0| |4.9|3.0|1.4|0.2|Iris-setosa|6.0| |4.7|3.2|1.3|0.2|Iris-setosa|6.4| +---+---+---+---+-----------+---+ df.show(3) +---+---+---+---+-----------+ |_c0|_c1|_c2|_c3| _c4| +---+---+---+---+-----------+ |5.1|3.5|1.4|0.2|Iris-setosa| |4.9|3.0|1.4|0.2|Iris-setosa| |4.7|3.2|1.3|0.2|Iris-setosa| +---+---+---+---+-----------+ df2 = df.withColumn('_c5', F.col('_c1')*2) df2.show(3) +---+---+---+---+-----------+---+ |_c0|_c1|_c2|_c3| _c4|_c5| +---+---+---+---+-----------+---+ |5.1|3.5|1.4|0.2|Iris-setosa|7.0| |4.9|3.0|1.4|0.2|Iris-setosa|6.0| |4.7|3.2|1.3|0.2|Iris-setosa|6.4| +---+---+---+---+-----------+---+
- withColumn(): 새로운 column을 마지막에 추가
- df.show(3): Spark RDD나 DataFrame은 mutable 자료구조라 in-place update가 안 된다. 다른 변수에 assign해서 변경을 유지해 나간다.
Update Column
df.withColumn('_c4', df['_c4'].substr(1,4)).show(3) +---+---+---+---+----+ |_c0|_c1|_c2|_c3| _c4| +---+---+---+---+----+ |5.1|3.5|1.4|0.2|Iris| |4.9|3.0|1.4|0.2|Iris| |4.7|3.2|1.3|0.2|Iris| +---+---+---+---+----+
Type Casting
df.select(F.col('_c0').cast('integer')).show(3) +---+ |_c0| +---+ | 5| | 4| | 4| +---+
Join
DataFrame class의 member method인 join()은 INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER, CROSS 등의 join 기능을 제공한다.
# Create python lists. dept = [(1,'hr'), (2,'dev'), (3,'special_force')] emp = [(11, 'john', 1), (12, 'james', 2), (13, 'jennis', 1), (14, 'brad', 2), (15, 'doctor-x', 4)] # Create RDDs. deptRdd = sc.parallelize(dept) empRdd = sc.parallelize(emp) # Create DataFrames. deptDf = spark.createDataFrame(dept, ['dept_id', 'dept_name']) empDf = spark.createDataFrame(emp, ['emp_id', 'emp_name', 'dept_id']) # Join two DataFrames. joinDf = deptDf.join(empDf, on='dept_id') joinDf.show() +-------+---------+------+--------+ |dept_id|dept_name|emp_id|emp_name| +-------+---------+------+--------+ | 1| hr| 11| john| | 1| hr| 13| jennis| | 2| dev| 12| james| | 2| dev| 14| brad| +-------+---------+------+--------+ deptDf.join(empDf, deptDf.dept_id == empDf.dept_id, 'left_outer').show() +-------+-------------+------+--------+ |dept_id| dept_name|emp_id|emp_name| +-------+-------------+------+--------+ | 1| hr| 11| john| | 1| hr| 13| jennis| | 3|special_force| null| null| | 2| dev| 12| james| | 2| dev| 14| brad| +-------+-------------+------+--------+ deptDf.join(empDf, deptDf['dept_id'] == empDf['dept_id'], 'full_outer').show() +-------+-------------+------+--------+ |dept_id| dept_name|emp_id|emp_name| +-------+-------------+------+--------+ | 1| hr| 11| john| | 1| hr| 13| jennis| | 3|special_force| null| null| | 2| dev| 12| james| | 2| dev| 14| brad| | 4| null| 15|doctor-x| +-------+-------------+------+--------+
Spark RDD
Python sample code를 예로 해서 RDD를 어떻게 다루는지 기본 조작 방법들을 정리해 보자.
RDD
RDD는 Resilient Distributed Dataset의 약자인데 이름에 유추할 수 있듯이 fault-tolerant하고 분산되어 병렬 처리되는 자료구조이다. 우리는 보통 S3, HDFS, DB에 있는 source data를 읽어서 RDD 형태로 로드하게 된다.
Spark SQL이 테이블 형태의 스키마를 가지고 SQL로 조작하기 위한 자료구조라면, RDD는 map-reduce로 조작하기 위한 자료구조이다. 양방향으로 치환이 가능하다. 따라서 주로 python tuple 형태 (k, v)로 data를 만들어 놓고, map/reduce 함수로 조작하게 된다.
하나의 RDD는 논리적인 하부구조인 하나 이상의 partition으로 이루어진다.
RDD에 대한 기본 조작을 python 예제로 살펴보자. 관련 method가 많으므로 자주 사용되는 몇가지만 살펴본다.
Cluster Structure
Driver program은 우리들이 작성한 Spark code이고 PC client에서 실행할 수도 있고 Spark cluster node 안에서 실행시킬 수도 있다.
지금부터의 클러스터 내용은 정확하지 않을 수 있는데 나중에 수정하도록 하겠다. Driver가 data를 load해서 cluster의 각 node에 분산시켰다고 하자. Driver는 cluster node 중에 지정된 app master에게 RDD 조작 명령을 내린다. App master는 RDD를 보관하고 있는 각 executor node에게 다시 명령을 전달한다.
Spark Context
Spark cluster에 접속하기 위해서 SparkContext를 생성한다.
from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName('MyApp').setMaster('local[4]') sc = SparkContext(conf=conf)
Creating RDD
아래에서 생성한 RDD는 이후 example에서도 계속 사용된다.
From Driver Program
# Small printer import pprint def pp(o): pprint.pprint(o) pprint.pprint(type(o)) # parallelize: Get a Python data and distribute on Spark cluster nodes. personList = [('David',101),('Bob',102),('Charles',103),('Brad',104)] personRdd = sc.parallelize(personList) pp(personRdd) ParallelCollectionRDD[13] at parallelize at PythonRDD.scala:194 <class 'pyspark.rdd.RDD'>
From External Data Source
filePath = 'data/hollins.dat.gz' # textFile: Open a file and return a RDD. fileRdd = sc.textFile(filePath) pp(fileRdd.count()) 29888 <class 'int'>
Listing Elements
All
# collect: Return a python list from a RDD. pp(personRdd.collect()) [('David', 101), ('Bob', 102), ('Charles', 103), ('Brad', 104)] <class 'list'>
First N
# first: Return the 1st element of list. pp(personRdd.first()) ('David', 101) <class 'tuple'> # take: Return the first n elements of list. pp(personRdd.take(2)) [('David', 101), ('Bob', 102)] <class 'list'> # top: Return the largest n elements of list. pp(personRdd.top(2)) [('David', 101), ('Charles', 103)] <class 'list'>
Key-Value
# lookup: like get() in most cache systems pp(personRdd.lookup('Charles')) [103] <class 'list'>
Transformation Operations
map/groupby/join 등과 같이 RDD를 변형시키는 연산들이며 연산 결과가 다시 RDD이므로 연산들을 chainning할 수 있다. Lazy approach라서 transformation은 여러 번을 적용하더라도 실제로 연산이 수행되지는 않는다. Action 연산을 적용할 때 한꺼번에 수행된다. Transformation/Action 연산 리스트는 아래 spark guide page에 나열되어 있다.
https://spark.apache.org/docs/latest/rdd-programming-guide.html
Action Operations
reduce/count/collect와 같이 RDD를 python scalar나 list 등으로 최종 결과를 얻어낼 때 사용한다. 그 앞에 적용한 transformation 연산들을 있다면 실제로 순서대로 수행하고 마지막에 action을 수행한다.
Top comments (0)