반응형
* 이 포스팅은 'Apache Spark 와 Python으로 빅데이터 다루기' 강의 복습용으로 작성하였습니다.
SparkSQL
- SQL문으로 데이터를 핸들링할 수 있는 툴
- 데이터프레임 API를 포함한다
- spark context 대신 spark session을 생성하여 활용
- spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
- spark sql 인터페이스 노출
- getorcreate : 이전에 생성한 spark session을 종료하기 위함
- 테이블 형태로 노출하기 위해 alias 부여
schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView("people") # view 생성
- spark session 생성 및 alias 부여한 이후에 sql 명령어 사용 가능 : show, select, filter, groupby 등
- spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
- SQL 스타일의 함수 예시 : collect, groupby
- collect : 다량의 행 객체를 반환
- groupby : 테이블 형태의 객체 반환
DataFrame & Datasets
- dataframe 특징
- 행으로 구성
- sql 쿼리 실행 가능
- 스키마O : 구조화된 데이터
- dataframe 장점
- 열에 이름을 붙인 경우, sql 쿼리 실행 및 데이터 분석 작업 시 사용 가능
- 파일 포맷을 읽는데 유용 : json, csv 등
- SQL 데이터베이스와 유사하게 다룰 수 있음
- dataset 특징 : 스칼라 같은 형식
- 비구조화된 데이터까지 포함 가능
- 최근에는 RDD 대신 데이터프레임을 사용하는 추세 : 그러나 작업에 따라서 다르다
Hands-on
- 비구조화된 파일 읽기
# 콤마로 구분된 파일
def mapper(line):
fields = line.split(',')
return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")),
age=int(fields[2]), numFriends=int(fields[3]))
lines = spark.sparkContext.textFile("fakefriends.csv") # RDD 생성
people = lines.map(mapper) # RDD를 구조화된 행으로 변경
# 스페이스 및 문장 부호 기준으로 나누는 경우
inputDF = spark.read.text("file:///sparkcourse/book.txt")
# split : 열 이름을 value로 삼음, 스페이스 및 문장 부호 기준으로 값 나누기
# explode : 각 단어의 새로운 행 생성 - flatmap과 동일한 역할
# alias 별칭 부여
words = inputDF.select(func.explode(func.split(inputDF.value, "\\W+")).
alias("word"))
- 구조화된 파일 읽기
# 첫 행이 header인 경우
people = spark.read.option("header", "true").option("inferSchema", "true")\
.csv("file:///sparkcourse/fakefriends-header.csv")
# 스키마 확인
people.printSchema()
# 첫 행에 header가 없을 경우
## structtype(structfield(열 이름, 타입, nullable))
schema = StructType([ \
StructField("stationID", StringType(), True), \
StructField("date", IntegerType(), True), \
StructField("measure_type", StringType(), True), \
StructField("temperature", FloatType(), True)])
df = spark.read.schema(schema).csv("file:///sparkcourse/1800.csv")
- printschema 결과
- 연산 결과 반올림하기 : friendsByAge.groupBy("age").agg(func.round(func.avg("friends"), 2))
- 계산 결과 새 컬럼으로 추가하기
minTempsByStation.withColumn("temperature", \
func.round(func.col("min(temperature)") * 0.1 * (9.0 / 5.0) + 32.0, 2))\
.select("stationID", "temperature").sort("temperature")
- 모든 행을 show하기 : sorted_result.show(sorted_result.count())
- 모든 행을 다 출력하기 위해서는 dataframe.count()를 인자로 삽입
'Data > spark' 카테고리의 다른 글
[Udemy] Spark Streaming, 구조적 스트리밍 (1) | 2024.07.05 |
---|---|
[Udemy] Spark ML을 사용한 머신러닝 (0) | 2024.07.02 |
[Udemy] Spark 프로그램의 고급 예제 (1) | 2024.06.15 |
[Udemy] Spark 기본 사항 및 RDD 인터페이스 (0) | 2024.05.21 |
윈도우 pyspark setting 트러블 슈팅 (0) | 2024.05.06 |