'Spark The Definitive Guide' 7장 - 집계해라 애송이
벌써부터 슬슬 포스트 포멧 헷갈리기 시작하죠? 망했죠?
나중에 한번에 맞춰야지 생각해놓고 절대 수정안하죠? ㅎ..
_ _ _
CHAPTER 7 집계 연산
집계(aggregation)은 무언갈 함께 모으는 행위이며 빅데이터 분석의 초석이다.
스파크는 모든 데이터 타입 다루는 것 + 다음과 같은 그룹화 데이터 타입 생성 가능하고,
지정된 집계 함수에 따라 그룹화된 결과는 RelationalGroupedDataset 을 반환.
- select 구문에서 집계 수행, DataFrame 전체 데이터 요약 (가장 간단한 그룹화)
- ‘group by’ : 하나 이상의 키 지정. 다른 집계 함수 사용해서 값을 가진 컬럼 변환 가능
- ‘윈도우(window)’ : 하나 이상의 키 지정. 다른 집계 함수로 컬럼 변환 가능. + 단, 함수 입력으로 사용할 로우는 현재 로우와 연관성 있어야 함
- ‘그룹화 셋(grouping set)’ : 서로 다른 레벨 값 집계 (SQL, DataFrame의 롤업, 큐브)
- ‘롤업(rollup)’ : 하나 이상의 키 지정. 다른 집계 함수로 컬럼 변환 가능. + 계층적으로 요약된 값 추출
- ‘큐브(cube)’ : 하나 이상의 키 지정. 다른 집계 함수로 컬럼 변환 가능. + 모든 컬럼 조합에 대한 요약 값 계산
(=> 사실상 7장 요약)
📌 중요한 건, 어떤 결과를 만들지 정확히 파악해야 한다는 것.
(정확한 답 계산 = 높은 비용 요구 → 빅데이터의 경우 근사치가 효율적일 수 있음)
7.1 집계 함수
- 모든 집계는 특별한 경우를 제외하고는 함수 사용
- => 집계 함수 (org.apache.spark.sql.functions 패키지)
- 예외) DataFrame의 .stat 속성 이용 (6장 참고)
- 스칼라, 파이썬에서 임포트 할 수 있는 함수와 SQL에서 사용가능한 함수는 약간 다름 (매 릴리즈마다 조금씩 변함)
집계 함수
집계 함수 : 키나 그룹을 지정하고 + 하나 이상의 컬럼을 변환하는 방법을 지정 (여러 입력값이 주어지면 그룹 별로 결과 생성)
- 수치형 데이터 요약 (ex. 그룹의 평균값 구하기)
- 합산, 곱셈, 카운팅 등의 작업
- 복합 데이터 타입(배열, 리스트, 맵)을 사용한 집계 수행 가능
count(컬럼명)
: 전체 로우 수 카운트액션이 아닌 트랜스포메이션으로 동작
두가지 방식으로 사용 가능
count(특정 컬럼)
: null 값 포함 Xcount(*)
orcount(1)
: null 값 가진 로우 포함해서 카운트
[7.1.1] 예제 펼치기
1
2import org.apache.spark.sql.functions.count
df.select(count("StockCode")).show() // 5419091
SELECT COUNT(*) FROM dfTable
countDistinct(컬럼명)
: 고유 (distinct) 레코드 수 카운트[7.1.2] 예제 펼치기
1
2import org.apache.spark.sql.functions.countDistinct
df.select(countDistinct("StockCode")).show() // 40701
SELECT COUNT(DISTINCT *) FROM DFTABLE
approx_count_distinct(컬럼명, 최대추정오류율)
: 근사치 고유 레코드 수 카운트대규모 데이터셋 다룰 시 정확한 개수 무의미함 => 근사치로 효율
최대 추정 오류율 (maximum estimation error)
파라미터 설정[7.1.3] 예제 펼치기
1
2import org.apache.spark.sql.functions.approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show() // 33641
SELECT approx_count_distinct(StockCode, 0.1) FROM DFTABLE
first(컬럼명)
,last(컬럼명)
: 첫 번째 값, 마지막 값 추출DataFrame 값이 아닌 로우 기반 동작
[7.1.4] 예제 펼치기
1
2
3
4
5
6df.select(first("StockCode"), last("StockCode")).show()
// +-----------------------+----------------------+
// |first(StockCode, false)|last(StockCode, false)|
// +-----------------------+----------------------+
// | 85123A| 22138|
// +-----------------------+----------------------+1
SELECT first(StockCode), last(StockCode) FROM dfTable
min(컬럼명)
,max(컬럼명)
: 최솟값, 최댓값 추출[7.1.5] 예제 펼치기
1
2
3
4
5
6
7import org.apache.spark.sql.functions.{min, max}
df.select(min("Quantity"), max("Quantity")).show()
// +-------------+-------------+
// |min(Quantity)|max(Quantity)|
// +-------------+-------------+
// | -80995| 80995|
// +-------------+-------------+1
SELECT min(Quantity), max(Quantity) FROM dfTable
sum(컬럼명)
: 특정 컬럼의 모든 값 합산[7.1.6] 예제 펼치기
1
2
3
4
5
6
7import org.apache.spark.sql.functions.sum
df.select(sum("Quantity")).show() // 5176450
// +-------------+
// |sum(Quantity)|
// +-------------+
// | 5176450|
// +-------------+1
SELECT sum(Quantity) FROM dfTable
sumDistinct(컬럼명)
: 특정 컬럼의 고유 (distinct) 값 합산[7.1.7] 예제 펼치기
1
2
3
4
5
6
7import org.apache.spark.sql.functions.sumDistinct
df.select(sumDistinct("Quantity")).show() // 29310
// +----------------------+
// |sum(DISTINCT Quantity)|
// +----------------------+
// | 29310|
// +----------------------+1
SELECT SUM(Quantity) FROM dfTable -- 29310
avg(컬럼명)
: 평균 값==
sum()/count()
==expr("mean(컬럼명)")
+
distinct()
=> 고윳값 평균 구하기도 가능[7.1.8] 예제 펼치기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import org.apache.spark.sql.functions.{sum, count, avg, expr}
(df.select(
count("Quantity").alias("total_transactions"),
sum("Quantity").alias("total_purchases"),
avg("Quantity").alias("avg_purchases"),
expr("mean(Quantity)").alias("mean_purchases"))
.selectExpr(
"total_purchases/total_transactions",
"avg_purchases",
"mean_purchases").show())
// +--------------------------------------+----------------+----------------+
// |(total_purchases / total_transactions)| avg_purchases| mean_purchases|
// +--------------------------------------+----------------+----------------+
// | 9.55224954743324|9.55224954743324|9.55224954743324|
// +--------------------------------------+----------------+----------------+
분산과 표준편차
평균(
m
) 주변에 데이터가 분포된 정도를 측정- 분산 : 평균과의 차이를 제곱한 결과의 평균 (
v = avg((x-m)^2)
) - 표준편차 : 분산의 제곱근 (
σ = v^(1/2)
)
- 분산 : 평균과의 차이를 제곱한 결과의 평균 (
스파크는 표본표준편차(sample standard deviation), 모표준편차(population standard deviation) 방식 지원
- => 아예 다르므로 잘 구분해서 사용해야함
표본표준분산, 표본표준편차 방식 사용 시 =>
variance()
,stddev()
모표준분산, 모표준편차 방식 사용 시 =>
var_pop()
,stddev_pop()
[7.1.9] 예제 펼치기
1
2
3
4
5
6
7df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()
// +-----------------+------------------+--------------------+---------------------+
// |var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
// +-----------------+------------------+--------------------+---------------------+
// |47559.30364660923| 47559.39140929892| 218.08095663447835| 218.08115785023455|
// +-----------------+------------------+--------------------+---------------------+1
2
3SELECT var_pop(Quantity), var_samp(Quantity),
stddev_pop(Quantity), stddev_samp(Quantity)
FROM dfTable
비대칭도와 첨도
데이터의 변곡점(extreme point) 를 측정하는 방법
skewness(컬럼명)
: 비대칭도 (데이터 평균의 비대칭 정도) 측정kurtosis(컬럼명)
: 첨도 (데이터 끝 부분의 뾰족한 정도) 측정
확률변수(random variable)의 확률분포(probability distribution)로 데이터 모델링 시에 중요
수학적인 내용은 따로 알아서… 흠흠..
[7.1.10] 예제 펼치기
1
2
3
4
5
6
7import org.apache.spark.sql.functions.{skewness, kurtosis}
df.select(skewness("Quantity"), kurtosis("Quantity")).show()
// +--------------------+------------------+
// | skewness(Quantity)|kurtosis(Quantity)|
// +--------------------+------------------+
// |-0.26407557610528376|119768.05495530753|
// +--------------------+------------------+1
SELECT skewness(Quantity), kurtosis(Quantity) FROM dfTable
공분산과 상관관계
두 컬럼값 사이의 영향도 비교
cov(컬럼1, 컬럼2)
: 공분산(covariance) 계산- 데이터 입력값에 따라 다른 범위를 가짐
- var 함수처럼 표본공분산(sample covariance)이나 모공분산(population covariance) 방식으로도 계산 가능 =>
covar_samp()
,covar_pop()
corr(컬럼1, 컬럼2)
: 상관관계(correlation) 계산- 피어슨 상관계수 (Pearson correlation coefficient) 측정 (-1 <=
r
<= 1) - 모집단이나 표본에 대한 계산 개념 X
- 피어슨 상관계수 (Pearson correlation coefficient) 측정 (-1 <=
[7.1.11] 예제 펼치기
1
2
3
4
5
6
7
8import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
covar_pop("InvoiceNo", "Quantity")).show()
// +-------------------------+-------------------------------+------------------------------+
// |corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
// +-------------------------+-------------------------------+------------------------------+
// | 4.912186085640497E-4| 1052.7280543915997| 1052.7260778754955|
// +-------------------------+-------------------------------+------------------------------+1
2
3SELECT corr(InvoiceNo, Quantity), covar_samp(InvoiceNo, Quantity),
covar_pop(InvoiceNo, Quantity)
FROM dfTable
복합 데이터 타입의 집계
스파크는 수식을 통한 집계 외에도 복합 데이터 타입을 사용한 집계 가능 (ex. 특정 컬럼 값 => List, Set .. 등으로 수집)
수집된 데이터는 다양한 프로그래밍 방식으로 다루거나 활용 가능
[7.1.12] 예제 펼치기
1
2
3
4
5
6
7import org.apache.spark.sql.functions.{collect_set, collect_list}
df.agg(collect_set("Country"), collect_list("Country")).show()
// +--------------------+---------------------+
// |collect_set(Country)|collect_list(Country)|
// +--------------------+---------------------+
// |[Portugal, Italy,...| [United Kingdom, ...|
// +--------------------+---------------------+1
SELECT collect_set(Country), collect_set(Country) FROM dfTable
7.2 그룹화
- 데이터 그룹 기반의 집계 에 대한 내용
- ([7.1] 은 DataFrame 수준의 집계 내용)
- 카테고리형 데이터(categorical data) 사용
- => 단일 컬럼의 데이터를 그룹화, 해당 그룹의 다른 여러 컬럼을 사용해서 계산
- 그룹화 작업의 2 단계
- 1) 하나 이상의 컬럼 그룹화 (여러개 지정도 가능)
- 2) 집계 연산 수행
- 표현식을 이용한 그룹화
- 카운팅은 메서드, 함수 둘 다 사용 가능 🤔
- 메서드 보다
count()
함수 사용 추천 - select 구문의 표현식 지정보다
agg()
메서드 사용 추천
- 메서드 보다
agg()
: 여러 집계 처리 한번에 지정 & 집계에 표현식 사용 가능- 트랜스포메이션 완료 컬럼에
alias
사용 가능
- 트랜스포메이션 완료 컬럼에
- 카운팅은 메서드, 함수 둘 다 사용 가능 🤔
- 맵을 이용한 그룹화
- 맵(map) 타입 사용 : Key = 컬럼 / Value = 수행할 집계 함수의 문자열
- 수행할 집계함수를 한 줄로 작성 시 => 여러 컬럼명 재사용 가능
agg(Key -> Value, Key -> Value, ...)
7.3 윈도우 함수
- 윈도우 함수 도 집계에 사용 가능
- 윈도우 함수
- 데이터의 특정 ‘윈도우(window)’ 대상으로 고유의 집계 연산 수행
- 데이터의 ‘윈도우’ => 현재 데이터에 대한 참조(reference)를 사용해 정의
- 윈도우 명세(window specification) => 함수에 전달될 로우 결정
- 스파크가 지원하는 윈도우 함수
- 랭크 함수 (ranking function)
- 분석 함수 (analytic function)
- 집계 함수 (aggragate function)
- 윈도우 함수 vs group-by 함수
- 윈도우 함수 : 프레임에 입력되는 모든 로우에 대해 결과값 계산
- group-by 함수 : 모든 로우 레코드가 단일 그룹으로만 이동
- 프레임(frame) : 로우 그룹 기반의 테이블
- 각 로우는 하나 이상의 프레임에 할당 가능
- 프레임 정의 방법은 예제 참고
- ex. 하루를 나타내는 값의 롤링 평균(rolling average) 구하기
- 개별 로우가 7개의 다른 프레임으로 구성되어야 함
[7.3] 예제 펼치기
1 | // 1) 주문 일자(InvoiceDate) => 'date' 컬럼으로 변환 (날짜 정보만 포함) |
1 | -- SQL |
Window 메서드
partitionBy()
: 그룹을 어떻게 나눌지 결정 (지금까지 파티셔닝 스키마 개념이랑 관련 X)orderBy()
: 파티션 정렬 방식 정의rowsBetween(from, to)
: 입력된 로우의 참조 기반으로 프레임에 로우가 포함될 수 있는지 결정row_number vs rank vs dense_rank
row_number()
: 순서대로 넘버링 (1,2,3,4 …)rank()
: 순서대로 넘버링 + 같은 값일 경우 같은 숫자 (1,1,3,4 …)dense_rank()
: rank 와 동일하되, 빈값 없이 증가하게끔 넘버링 (1,1,2,3, …)
7.4 그룹화 셋
- 컬럼의 값을 이용해 여러 컬럽 집계 =>
group-by
표현식- 그러면 여러 그룹에 걸쳐 집계는? => 그룹화셋 사용
- 그룹화 셋 : 여러 집계를 결합하는 저수준 기능
GROUPING SETS
구문은 SQL에서만 사용 가능- DataFrame에서 동일 연산하려면? => 롤업, 큐브 메서드 사용
- 주의 사항
- 그룹화 셋, 롤업, 큐브 사용 시 null 제거 필수
- null에 따라 집계 수준이 달라짐 (=> null 미제거시 부정확한 결과)
[7.4.0] '그룹화 셋' 예제 펼치기
1 | // 그룹화 셋 사용 시 null 제거 필수 |
1 | -- SQL 예제 1) 재고 코드(StockCode)와 고객(CustomerId) 별 총 수량 구하기 |
1 | -- SQL 예제 2) 예제 1 + 재고 코드나 고객 상관없이 총 수량 합산 결과 추가 => group-by로 처리 불가 |
1 | # output |
- 롤업(rollup)
group-by
스타일의 다양한 연산을 수행할 수 있는 다차원 집계 기능rollup(그룹화 키)
=> 다양한 컬럼을 그룹화 키로 설정 가능- 롤업된 컬럼값이 모두 null 인 로우 = 해당 컬럼에 속한 레코드의 전체 합계
[7.4.1] '롤업' 예제 펼치기
1 | // 시간(신규 Date 컬럼), 공간(Country) 을 축으로 하는 롤업 |
- 큐브(cube)
- 롤업의 고차원적 사용 (호출 방식도 유사)
- 요소들을 계층적으로 다루는 대신 모든 차원에 대해 동일한 작업 수행
- ex. 전체 기간에 대한 날짜와 국가별 결과 구하기
cube(그룹화 키)
=> 요약 정보 테이블 만들기 가능
[7.4.2] '큐브' 예제 펼치기
1 | // 시간(신규 Date 컬럼), 공간(Country) 을 축으로 하는 큐브 |
- 그룹화 메타데이터
- 큐브, 롤업 사용 시 집계 수준에 따라 쉽게 필터링하고자 하면 => 집계 수준 조회 필요
grouping_id()
: 결과 데이터셋의 집계 수준을 명시하는 컬럼 제공
[7.4.3] grouping_id() 예제 펼치기
1 | import org.apache.spark.sql.functions.{grouping_id, sum, expr} |
- 피벗(pivot)
pivot()
: 로우 → 컬럼으로 변환 가능- => 컬럼의 모든 값을 단일 그룹화하여 계산 가능
- 그러나 데이터 탐색방식에 따라 피벗 수행 결과값이 감소할 수 있음
- 특정 컬럼 cardinality가 낮으면 피벗으로 다수 컬럼으로 변환 추천 => 스키마, 쿼리 대상 확인 가능
[7.4.4] '피벗' 예제 펼치기
1 | val pivoted = dfWithDate.groupBy("date").pivot("Country").sum() // 집계 수행 => 수치형 컬럼으로 나옴 |
7.5 사용자 정의 집계 함수
- 사용자 정의 집계 함수 (UDAF, user-defined aggregation function)
- 직접 제작한 함수나 비즈니스 규칙에 기반한 자체 집계 함수 정의 방법
- UDAF 사용 => 입력 데이터 그룹에 직접 개발한 연산 수행 가능
- 스파크는 입력 데이터의 모든 그룹 중간 결과를 단일 AggregationBuffer에 저장/관리
- UDAF는 현재 스칼라, 자바로만 사용 가능
- Spark 2.3 에서는 UDF/UDAF => 함수 등록 가능 ([6.12] 참고)
- 생성 방법
- 기본 클래스 UserDefinedAggregateFunction 상속 + 메서드 정의
UDAF 생성 시 정의해야할 메서드
- inputScheme :
UDAF 입력 파라미터의 스키마
를 StructType 로 정의- bufferSchema :
UDAF 중간 결과의 스키마
를 StructType 로 정의- dataType :
반환될 값의 DataType
정의- deterministic :
UDAF가 동일한 입력값에 대해 항상 동일한 결과를 반환하는지
Boolean 값으로 정의- initialize :
집계용 버퍼 값 초기화 로직
정의- update : 입력받은 로우 기반으로
내부 버퍼 업데이트 로직
정의- merge : 두 개의
집계용 버퍼 병합 로직
정의- evaluate :
집계 최종 결과 생성 로직
정의
[7.5] 예제 펼치기
1 | // UDAF 예제 - 'BoolAnd' Class : 입력된 모든 로우의 컬럼이 true인지 판단 |
7.6 정리
- 스파크에서 사용 가능한 여러 유형의 집계 연산
- 그룹화, 윈도우 함수, 롤업, 큐브
📒 단어장
- 비대칭도(skewness) : 실숫값 확률변수의 확률분포 비대칭성을 나타내는 지표 (=왜도)
- 첨도(kurtosis) : 확률분포의 뾰족한 정도를 나타내는 척도