'Spark The Definitive Guide' 14장 - 분산형 공유 변수 (PART 3 끝)
2021년 3월 20일#spark
[Part 3] END
마침내 🎈약속의 파트 쓰리! 🎈 🙌🏻
빠르지는 않았지만 꾸준함에 의미를 두고 싶다.
스파크 너란 녀석.. 이젠 뭔지 조금은 알지도…?
( 『5252.. 이제부턴 ‘실전’이다ㅡ 』 )
_ _ _
CHAPTER 14 분산형 공유 변수
스파크의 저수준 API의 두 번째 유형, ‘분산형 공유 변수’
분산형 공유 변수 타입이 만들어지게 된 계기, 사용 방법 소개
- 클러스터 실행 시 특별한 속성을 가진 사용자 정의함수 (ex. RDD, DataFrame을 다루는 map 함수)에서 사용 가능
- 분산형 공유 변수의 2가지 타입 : 브로드캐스트 변수, 어큐뮬레이터
14.1 브로드캐스트 변수
브로드캐스트 변수는 모든 워커노드에 큰 값 저장 => 재 전송 없이 많은 스파크 액션에서 재사용 가능
브로드캐스트 변수
- 변하지 않는 값(불변성 값)을 클로저(closure) 함수의 변수로 캡슐화하지 않고, 클러스터에서 효율적으로 공유하는 방법 제공
(1) 태스크에서 드라이버 노드의 변수 사용 시 클로저 함수 내부에서 단순 참조
- => 비효율적. 특히 큰 변수 사용시 (ex. 룩업 데테이블, 머신러닝 모델)
- Why?
- 클로저 함수에서 변수 사용 시 => 워커 노드에서 여러 번 (태스크당 한번) 역직렬화 발생
- 여러 스파크 액션과 잡에서 동일 변수 사용시 => 잡 실행 때마다 워커로 큰 변수 재전송
- 그러면 어떻게 하나?
- => 브로드캐스트 변수 사용해라!
- => 비효율적. 특히 큰 변수 사용시 (ex. 룩업 데테이블, 머신러닝 모델)
(2) 브로드 캐스트 변수 사용 시
- 모든 태스크마다 직렬화 X => 클러스터의 모든 머신에 캐시하는 불변성 공유 변수
- 익스큐터 메모리에 맞는 조회용 테이블을 전달하고 함수에서 사용
How to use ?
spark.sparkContext.broadcast()
로 참조 (= 불변성 값)- 액션을 실행할때 클러스터 모든 노드에 지연 처리 방식으로 복제됨
value
메서드로 브로드캐스트된 값 참조 가능- 직렬화된 함수에서 브로드캐스트된 데이터를 직렬화 하지않아도 접근 가능
- 데이터를 보다 효율적으로 전송. 직렬화/역직렬화 부하 ↓
클로저에 담아 전달 vs 브로드캐스트 변수 사용
- 말해뭐해. 브로드캐스트 변수가 훨씬 더 효율적
- 데이터 총량, 익스큐터에 따라 다를 수는 있지만.. (작은 데이터를 작은 클러스터에서 돌릴 땐 별 차이 X)
- 브로드캐스트 변수에 작은 크기의 딕셔너리(dictionary) 타입 사용 시 부하 크게 발생 X (?)
- 훨씬 큰 크기 데이터 사용 시 전체 데이터 직렬화 시 발생 부하 커질 수도
- 말해뭐해. 브로드캐스트 변수가 훨씬 더 효율적
RDD 영역에서 브로드캐스트 변수 사용 (UDF, Dataset도 사용 가능. 동일 효과)
[14.1] '브로드캐스트 변수' 예제 펼치기
1 | // '단어의 값과 목록 + 수 KB~GB 크기를 가지는 다른 정보' 브로드캐스트 후 RDD로 변환하는 예제 |
14.2 어큐뮬레이터
어큐뮬레이터는 모든 태스크 데이터를 공유 결과에 추가 가능 (ex. 잡의 입력 레코드 파싱하면서 오류 발생 확인 카운터 구현 가능)
어큐뮬레이터
- 트랜스포메이션 내부의 다양한 값 갱신하는데 사용
- 내고장성 보장 + 효율적인 방식으로 드라이버에 값 전달 가능
어큐뮬레이터는 스파크 클러스터에서 로우 단위로 안전하게 값을 갱신할 수 있는 변경 가능한 변수 제공
- 디버깅용, 저수준 집계 생성용으로 사용 가능 (ex. 파티션별로 특정 변수 값 추적 용도)
- 결합성, 가환성을 가진 연산을 통해서만 더할 수 있는 변수 => 병렬 처리 과정에서 효율적 사용 가능
- 카운터(==맵리듀스의 카운터)나 합계 구하는 용도로 사용 가능
스파크는 기본적으로 수치형 어큐뮬레이터 지원. 사용자 정의 어큐뮬레이터 만들어서 사용도 OK
어큐뮬레이터의 값은 액션 처리 과정에서만 갱신됨
- 스파크는 각 태스크에서 어큐뮬레이터를 한 번만 갱신하도록 제어 (재시작한 태스크는 갱신X)
- 트랜스포메이션에서 태스크 or 잡 스테이지 재처리 시? => 각 태스크 갱신 작업이 두 번 이상 적용될 수도
어큐뮬레이터는 스파크의 지연 연산 모델에 영향 X
- 어큐뮬레이터가 RDD 처리 중 갱신되면? => RDD 연산이 실제로 수행된 지점에 딱 한번만 값 갱신 (해당 or 부모 RDD에 액션을 실행하는 시점)
- 지연 처리 형태의 트랜스포메이션 (ex.
map()
) 에서 어큐뮬레이터 갱신 작업 수행시? => 실제 실행 전까지는 갱신 X
어큐뮬레이터 이름은 선택적 지정 가능
- 생성 시 파리미터로 이름을 붙이거나
spark.sparkContext.register(어큐뮬레이터, 이름)
사용 - 이름이 지정된 (named) 어큐뮬레이터의 실행결과는 스파크 UI에 표시
- 이름이 지정되지 않은 어큐뮬레이터는 표시 X
- 생성 시 파리미터로 이름을 붙이거나
사용자 정의 어큐뮬레이터
- 어큐뮬레이터 직접 정의 시 =>
AccumulatorV2
클래스 상속하여 구현 - 파이썬 사용 시 AccumulatorParam 상속
- 어큐뮬레이터 직접 정의 시 =>
[14.2.1] '어큐뮬레이터' 예제 펼치기
1 | // Dataset API 사용 (RDD API X) |
Spark UI (이름이 지정된 어큐뮬레이터)
[14.2.2] 사용자 정의 어큐뮬레이터 구현 예제 펼치기
1 | // 사용자 정의 어큐뮬레이터 (AccumulatorV2 상속 클래스 구현) |
14.3 정리
- 분산형 공유 변수 (브로드캐스트 변수, 어큐뮬레이터)
- 분산형 공유 변수는 디버깅이나 최적화 작업에 유용한 도구
📒 단어장
- 내고장성 (Fault tolerance) : 시스템 일부에 문제가 발생하여도 정상 동작이 가능한
- 결합성 (Associative) : 둘 이상의 이항연산 중첩시, 연산 결과가 순서에 관계없이 동일
(a+b)+c = a+(b+c) = a+b+c
- 가환성 (Commutative) : 연산 결과가 순서에 관계없이 동일 (=> 2장 단어장)
a+b = b+a