'Spark The Definitive Guide' 2장 - 스파크 찍어먹기
2021년 1월 24일#spark
나처럼 시력 검사&타자 연습 하느라 진빼지말고 Chapter2는 그냥 예제 코드를 쓰도록 하자...
_ _ _
CHAPTER 2 스파크 간단히 살펴보기
DataFrame, SQL 을 사용해 클러스터, 스파크 애플리케이션, 구조적 API 를 살펴보고
스파크의 핵심용어와 개념, 사용법을 익힌다.
2.1 스파크의 기본 아키텍처
스파크 애플리케이션을 이해하기 위한 핵심사항
- 스파크는 사용가능한 자원을 파악하기 위해 클러스터 매니저 사용
- 드라이버 프로세스는 주어직 작업을 완료하기위해, 드라이버 프로그램의 명령을 익스큐터에서 실행할 책임이 있음
- 스파크는 클러스터의 데이터 처리 작업을 관리 / 조율
- 컴퓨터 클러스터는 여러 컴퓨터의 자원을 모아 하나의 컴퓨터 처럼 사용
- 클러스터에서 작업을 조율할 수 있는 프레임워크 => 스파크
- 스파크가 연산에 사용할 클러스터를 관리하는 클러스터 매니저
- 스파크 standalone 클러스터 매니저, 하둡 YARN, Mesos
- 역할
- 사용자 : 스파크 애플리케이션 제출 (submit)
- -> 클러스터 매니저 : 애플리케이션 실행에 필요한 자원 할당
- -> 할당받은 자원으로 작업 처리
- 스파크 애플리케이션 =
driver
프로세스 + 다수의executor
프로세스driver
프로세스- 클러스터 노드 중 하나에서 실행. main() 함수 실행
- 심장과 같은 존재로, 애플리케이션 생명 주기 동안 관련 정보 모두 유지
executor
프로세스- driver 가 할당한 작업 수행 & 진행 상황을 driver에게 보고
- 대부분 스파크 코드를 실행하는 역할로, 스파크 언어 API를 통해 다양한 언어로 실행 가능
2.2 스파크의 다양한 언어 API
- 스파크는 모든 언어에 맞는 몇몇 ‘핵심 개념’ 제공
- 핵심개념 -> (클러스터 머신에서 실행되는) 스파크 코드 로 변환
- 구조적 API만으로 작성된 코드는 언어에 무관하게 유사 성능
- 언어별 요약 정보
- Scala : 스파크가 스칼라 기반. 스파크의 기본 언어
- Java :
자바 지원안해주면 난리칠거니까지원은 함 - Python : 스칼라가 지원하는 거의 모든 구조 지원
- SQL : ANSI SQL:2003 표준 중 일부 지원
- R : 스파크 코어의 sparkR, R 커뮤니티 기반의 sparklyr
- SparkSession 객체
- 사용자가 스파크 코드를 실행하기위해 진입점으로 사용 가능
- Python, R 사용 시에도 사용자 대신 익스큐터의 JVM에서 실행할 수 있는 코드로 변환
2.3 스파크 API
- 다양한 언어로 사용할 수 있는 이유?
- 스파크가 기본적으로 제공하는 2가지 API 때문
- 저수준의 비구조적(unstructured) API
- 고수준의 구조적(structured) API
- 스파크가 기본적으로 제공하는 2가지 API 때문
2.4 스파크 시작하기
- Q. 스파크 애플리케이션을 개발하려면
- A. 사용자 명령과 데이터를 스파크 애플리케이션에 전송하는 방법을 알아야
- SparkSession 생성 실습. 자 드가자~
2.5 SparkSession
- SparkSession : 스파크 애플리케이션을 제어하는 드라이버 프로세스
- 사용자가 정의한 처리명령 -> 클러스터에 실행
- 스파크 애플리케이션에 1:1 대응
1 | scala console |
2.6 DataFrame
- DataFrame : 가장 대표적인 구조적 API
- 테이블 데이터를 row, column 으로 단순하게 표현
- scheme : column 과 column type 을 정의한 목록
- DataFrame 은 수천 대의 컴퓨터에 분산 가능
- vs 스프레드 시트
- 비슷하다고 볼 수 있지만 스프레드 시트는 단일 컴퓨터 저장
- vs Python (Pandas)의 DataFrame, R의 DataFrame
- 마찬가지로 대부분 단일 컴퓨터에 존재
- => 스파크 DataFrame으로 쉽게 변환 가능
- 테이블 데이터를 row, column 으로 단순하게 표현
- 스파크의 핵심 추상화 개념 (분산 데이터 모음)
- Dataset, DataFrame, SQL 테이블, RDD
- DataFrame의 파티션
- 익스큐터가 병렬로 작업을 수행할 수 있도록 데이터를 분할하는 청크 단위
- 실행 중 데이터가 클러스터에서 물리적으로 분산되는 방식을 나타냄
- 파티션 1 익스큐터 1000 => 병렬성 1
- 파티션 1000 익스큐터 1 => 병렬성 1
- 물리적 파티션에 데이터 변환용 함수 지정 시 스파크가 실제 처리 방법 결정 (파티션 수동 처리 필요 X)
2.7 트랜스포메이션
- 스파크의 핵심 데이터 구조 => 불변성 (immutable)
- DataFrame을 변경하려면?
- 원하는 변경 방법을 스파크에게 알려줘야함 => 트랜스포메이션
- 트랜스포메이션 : 스파크에서 비즈니스 로직을 표현하는 핵심 개념
- 유형
- 좁은 의존성 (narrow dependency)
- 입력 파티션 : 출력 파티션 = 1 : 1
- 넓은 의존성 (wide dependency)
- 입력 파티션 : 출력 파티션 = 1 : N
- 좁은 의존성 (narrow dependency)
- 유형
- 지연 연산 (lazy evaluation) : 연산 그래프를 처리하기 직전까지 기다리는 동작 방식
- 스파크는 연산 명령 즉시 데이터를 수정 X. 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성
- 마지막까지 대기하다 DataFrame 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일 => 전체 데이터 흐름 최적화
- ex. DataFrame 의 predicate pushdown
2.8 액션
- 트랜스포메이션은 논리적 실행 계획
- 트랜스포메이션을 선언해도 액션을 호출하지 않으면 수행 X
- 액션 (action) : 실제 연산을 수행
- 유형
- 콘솔에서 데이터를 보는 액션
- 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
- 출력 데이터소스에 저장하는 액션
- 유형
- 액션 지정 시 스파크 잡 시작
- 스파크 잡 (job)
- 필터 (좁은 트랜스포메이션) 수행
- -> 파티션 별로 레코드 수를 카운트 (넓은 트랜스포메이션)
- -> 각 언어에 적합한 네이티브 객체에 결과 모음
- 스파크 UI로 잡 모니터링 가능
- 스파크 잡은 개별 액션에 의해 트리거되는 다수의 트랜스포메이션으로 이루어져 있다
- 스파크 잡 (job)
2.9 스파크 UI
- 드라이버 노드의 4040 포트
- 스파크 잡의 상태, 환경 설정, 클러스터 상태 등의 정보 확인 가능
2.10 종합 예제
- 미국 교통통계국의 항공운항 데이터 중 일부로 실습
- 샘플 데이터 : 반정형(semi-structured), csv 포맷
- (=> 부록 A의 도커 이미지 사용 시 이미 포함)
- 스파크는 다양한 데이터소스 지원
- SparkSession의 DataFrameReader 클래스 사용해서 읽음
- 예제는 스키마 추론 (Schema inference) 기능 추가
- 스파크는 각 컬럼의 데이터 타입 추론을 위해 적은 양의 데이터를 읽음
- DataFrame 은 불특적 다수의 로우와 컬럼
- 지연 연산 형태의 트렌스포메이션이므로 row 수 알 수 X
예제 1
예제 1 펼치기
1 |
|
take(n)
: Actionsort()
: Transformation (넓은)
- DataFrame 을 변경하지 않고 새로운 DataFrame을 생성해 반환
explain()
- DataFrame의 계보(lineage) 나 스파크 쿼리 실행 계획 출력
- 실행 계획? : 디버깅과 스파크의 실행과정을 이해하는데 도움을 주는 도구
- 위에서 아래방향으로 읽는다
- 최종 결과는 가장 위, 데이터소스는 가장 아래
- DataFrame의 계보
- 트랜스포메이션의 논리적 실행 계획 -> DataFrame의 계보 정의
- -> 계보를 통해 스파크가 입력데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있음
- 함수형 프로그래밍의 핵심 (Pure Function, 같은 입력 -> 같은 출력)
- 사용자는 물리적 데이터를 직접 다루지 않고, 물리적 실행 특성을 제어
- 예시 => 파티션 수 변경
spark.conf.set("spark.sql.shuffle.partitions", "5")
- 스파크 UI (4040 포트) 에서 스파크 잡 물리적, 논리적 실행 특성 확인 가능
- 예시 => 파티션 수 변경
예제 2 (SQL)
예제 2-1 펼치기
1 | # 1) SQL 사용 |
- 스파크는 언어에 무관하게 같은 방식으로 트랜스포메이션 실행
- SQL, DataFrame(R, Python, Scalar, Java) 에서 비즈니스 로직 표현
- 스파크에서 코드 실행 전에 로직을 기본 실행계획(
explain
) 으로 컴파일
- 스파크 SQL 사용시 모든 DataFrame => 테이블, 뷰 (임시 테이블) 로 등록
- 위에서 설명했듯 같은 실행 계획으로 컴파일하므로 성능차이 X
예제 2-2 펼치기
1 | # '최대 비행 횟수' 구하기 |
1 | # '상위 5개의 도착 국가' 구하기 |
- 실행계획은 트랜스포메이션의 지향성 비순환 그래프 (Directed Acyclic Graph, DAG)
- 액션이 호출되면 결과를 만들어낸다
- DAG의 각 단계는 불변성을 가진 신규 DataFrame을 생성
- 예제의 전체 코드 수행 단계 (7단계) 는 p.86 [그림 2-10] 참조
- 실제 실행 계획 (
explain
이 출력하는) 은 물리적인 실행 시점에서 수행하는 최적화로 인해 다를 수 있음 - 직접 explain 해보면 책의 explain 과도 다르게 출력됨
- 실제 실행 계획 (
2.11 정리
- 트랜스포메이션, 액션, DataFrame 실행 계획 최적화 방법
- 트랜스포메이션의 지향성 비순환 그래프(DAG) 를 지연 실행하여 최적화
- 예제를 통한 데이터가 파티션으로 구성되는 방법, 복잡한 트랜스포메이션 작업 실행 단계 확인
📒 단어장
- 셔플 (Shuffle) : 스파크카 클러스터에서 파티션을 교환
- 스파크는 셔플의 결과를 디스크에 저장
- 가환성 (Commutative) : 두 대상의 연산 결과가 순서와 관계없이 동일 (-> 교환 법칙)