Data/spark

[Udemy] SparkSQL, DataFrames 및 Datasets

sennysideup 2024. 6. 11. 19:56
반응형

* 이 포스팅은 'Apache Spark 와 Python으로 빅데이터 다루기' 강의 복습용으로 작성하였습니다.

 

SparkSQL

  • SQL문으로 데이터를 핸들링할 수 있는 툴
  • 데이터프레임 API를 포함한다
  • spark context 대신 spark session을 생성하여 활용
    • spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
    • spark sql 인터페이스 노출
    • getorcreate : 이전에 생성한 spark session을 종료하기 위함
  • 테이블 형태로 노출하기 위해 alias 부여
schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView("people") # view 생성
  • spark session 생성 및 alias 부여한 이후에 sql 명령어 사용 가능 : show, select, filter, groupby 등
    • spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
  • SQL 스타일의 함수 예시 : collect, groupby
    • collect : 다량의 행 객체를 반환
    • groupby : 테이블 형태의 객체 반환

좌) collect 실행 결과, 우) groupby 실행 결과

 

DataFrame & Datasets

  • dataframe 특징
    • 행으로 구성
    • sql 쿼리 실행 가능
    • 스키마O : 구조화된 데이터
  • dataframe 장점
    • 열에 이름을 붙인 경우, sql 쿼리 실행 및 데이터 분석 작업 시 사용 가능
    • 파일 포맷을 읽는데 유용 : json, csv 등
    • SQL 데이터베이스와 유사하게 다룰 수 있음
  • dataset 특징 : 스칼라 같은 형식
    • 비구조화된 데이터까지 포함 가능
  • 최근에는 RDD 대신 데이터프레임을 사용하는 추세 : 그러나 작업에 따라서 다르다

 

Hands-on

  • 비구조화된 파일 읽기
# 콤마로 구분된 파일
def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), 
               age=int(fields[2]), numFriends=int(fields[3]))
               
lines = spark.sparkContext.textFile("fakefriends.csv") # RDD 생성
people = lines.map(mapper) # RDD를 구조화된 행으로 변경

# 스페이스 및 문장 부호 기준으로 나누는 경우
inputDF = spark.read.text("file:///sparkcourse/book.txt")

# split : 열 이름을 value로 삼음, 스페이스 및 문장 부호 기준으로 값 나누기
# explode : 각 단어의 새로운 행 생성 - flatmap과 동일한 역할
# alias 별칭 부여
words = inputDF.select(func.explode(func.split(inputDF.value, "\\W+")).
        alias("word"))
  • 구조화된 파일 읽기
# 첫 행이 header인 경우
people = spark.read.option("header", "true").option("inferSchema", "true")\
    .csv("file:///sparkcourse/fakefriends-header.csv")   
# 스키마 확인
people.printSchema()

# 첫 행에 header가 없을 경우
## structtype(structfield(열 이름, 타입, nullable))
schema = StructType([ \
                     StructField("stationID", StringType(), True), \
                     StructField("date", IntegerType(), True), \
                     StructField("measure_type", StringType(), True), \
                     StructField("temperature", FloatType(), True)])
df = spark.read.schema(schema).csv("file:///sparkcourse/1800.csv")
  • printschema 결과

  • 연산 결과 반올림하기 : friendsByAge.groupBy("age").agg(func.round(func.avg("friends"), 2))
  • 계산 결과 새 컬럼으로 추가하기
minTempsByStation.withColumn("temperature", \
func.round(func.col("min(temperature)") * 0.1 * (9.0 / 5.0) + 32.0, 2))\
.select("stationID", "temperature").sort("temperature")
  • 모든 행을 show하기 : sorted_result.show(sorted_result.count())
    • 모든 행을 다 출력하기 위해서는 dataframe.count()를 인자로 삽입