// A. 원시 데이터 소스 -> DataFrame val df = (spark.read.format("json").load("/data/flight-data/json/2015-summary.json")) // (임시 뷰 등록) df.createOrReplaceTempView("dfTable") ``
// B. (Row 객체를 가지는) Seq 타입 -> DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = newStructType(Array( newStructField("some", StringType, true), newStructField("col", StringType, true), newStructField("names", LongType, false))) val myRows = Seq(Row("Hello", null, 1L)) // sc 객체의 parallelize() 로 RDD 생성 val myRDD = spark.sparkContext.parallelize(myRows) // createDataFrame()로 DataFrame 생성 val myDf = spark.createDataFrame(myRDD, myManualSchema) myDf.show()
# select() == SELECT query scala> df.select("DEST_COUNTRY_NAME").show(2) +-----------------+ |DEST_COUNTRY_NAME| +-----------------+ | United States| | United States| +-----------------+
scala> df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2) +-----------------+-------------------+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| +-----------------+-------------------+ | United States| Romania| | United States| Croatia| +-----------------+-------------------+
--- SQL SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2 SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM dfTable LIMIT 2
# 다양한 컬럼 참조 방법 # df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME") => 에러 scala> import org.apache.spark.sql.functions.{expr, col, column} scala> (df.select( | df.col("DEST_COUNTRY_NAME"), | col("DEST_COUNTRY_NAME"), | column("DEST_COUNTRY_NAME"), | 'DEST_COUNTRY_NAME, | $"DEST_COUNTRY_NAME", | expr("DEST_COUNTRY_NAME")) | .show(2)) +-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+ |DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME| +-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+ | United States| United States| United States| United States| United States| United States| | United States| United States| United States| United States| United States| United States| +-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
// DEST_COUNTRY_NAME -> dest 로 변경 df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns // res21: Array[String] = Array(dest, ORIGIN_COUNTRY_NAME, count)
// 이스케이핑 필요 없는 예시 - 새로운 컬럼명을 나타내는 문자열 import org.apache.spark.sql.functions.expr val dfWithLongColName = df.withColumn( "This Long Column-Name", expr("ORIGIN_COUNTRY_NAME")) // dfWithLongColName: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 2 more fields]
// 이스케이핑 필요한 예시 - 표현식으로 해당 컬럼을 참조 (dfWithLongColName.selectExpr( "`This Long Column-Name`", "`This Long Column-Name` as `new col`") .show(2)) // +---------------------+-------+ // |This Long Column-Name|new col| // +---------------------+-------+ // | Romania|Romania| // | Croatia|Croatia| // +---------------------+-------+
import org.apache.spark.sql.Row val schema = df.schema val newRows = Seq( Row("New Country", "Other Country", 5L), Row("New Country 2", "Other Country 3", 1L) ) val parallelizedRows = spark.sparkContext.parallelize(newRows) val newDF = spark.createDataFrame(parallelizedRows, schema)
// df + newDF => 로우가 추가된 새로운 객체 (df.union(newDF) .where("count = 1") .where($"ORIGIN_COUNTRY_NAME" =!= "United States") .show()) // get all of them and we'll see our new rows at the end
// schema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true)) // newRows: Seq[org.apache.spark.sql.Row] = List([New Country,Other Country,5], [New Country 2,Other Country 3,1]) // parallelizedRows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[74] at parallelize at <console>:29 // newDF: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field] // +-----------------+-------------------+-----+ // |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| // +-----------------+-------------------+-----+ // | United States| Croatia| 1| // | United States| Singapore| 1| // | United States| Gibraltar| 1| // | United States| Cyprus| 1| // | United States| Estonia| 1| // | United States| Lithuania| 1| // | United States| Bulgaria| 1| // | United States| Georgia| 1| // | United States| Bahrain| 1| // | United States| Papua New Guinea| 1| // | United States| Montenegro| 1| // | United States| Namibia| 1| // | New Country 2| Other Country 3| 1| // +-----------------+-------------------+-----+
DataFrame은 불변성 (immutability)
DataFrame을 변경하는 레코드 추가는 불가능
=> 원본 DataFrame을 새로운 DataFrame과 통합(union) (결합)
단, 통합하려는 두 DataFrame은 반드시 동일한 스키마와 컬럼 수를 가져야 함
union()
현재 스키마가 아닌 컬럼 위치 기반으로 동작 (자동 정렬 X)
로우가 추가된 DataFrame 을 참조하려면 새롭게 만들어진 DataFrame 사용해야하지만, 뷰나 테이블로 등록 시에는 동적으로 참조 가능
val collectDF = df.limit(10) collectDF.take(5) // take() 는 정수형 값을 인수로 사용 collectDF.show() // show() => 결과를 정돈된 형태로 출력 // +-----------------+-------------------+-----+ // |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| // +-----------------+-------------------+-----+ // | United States| Romania| 15| // | United States| Croatia| 1| // | United States| Ireland| 344| // | Egypt| United States| 15| // | United States| India| 62| // | United States| Singapore| 1| // | United States| Grenada| 62| // | Costa Rica| United States| 588| // | Senegal| United States| 40| // | Moldova| United States| 1| // +-----------------+-------------------+-----+
collectDF.show(5, false) // +-----------------+-------------------+-----+ // |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| // +-----------------+-------------------+-----+ // |United States |Romania |15 | // |United States |Croatia |1 | // |United States |Ireland |344 | // |Egypt |United States |15 | // |United States |India |62 | // +-----------------+-------------------+-----+
collectDF.collect()
스파크는 ‘드라이버’에서 클러스터 상태 정보 유지
로컬 환경에서 데이터 다룰 때는 ‘드라이버’로 데이터 수집
사용해본 데이터 수집 메서드 일부
collect() : 전체 DataFrame의 모든 데이터 수집
take() : 상위 N개 로우 반환
show() : 여러 로우를 보기 좋게 출력
toLocalIterator() : 전체 데이터셋에 대한 반복(iterate) 처리를 위해 ‘드라이버’로 로우를 모으는 방법
iterator(반복자) 로 모든 파티션의 데이터를 ‘드라이버’에 전달
데이터셋의 파티션을 차례대로 반복 처리 가능
드라이버로 모든 데이터 컬렉션을 수집하는 건
=> 매우 큰 비용 (CPU, 메모리, 네트워크)
차례대로 처리하므로 처리 비용 엄청남 (병렬 연산 X)
따라서 대규모 데이터셋에 collect() 나 매우 큰 파티션에 대해 toLocalIterator() 사용 시 => 드라이버 비정상적 종료