[pyspark] 기본 문법
SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Test').getOrCreate()
spark.conf.sest('spark.sql.execution.arrow.pyspark.enabled', 'true') # 메모리 관련
read csv
sparkDF = spark.read.csv('hdfs://localhost:9000/housing.csv', encoding='cp949', header=True, inferSchema=True)
sparkDF = spark.read.option('header', 'true').option('encoding', 'cp949').csv('hdfs://localhost:9000/housing.csv', inferSchema=True)
sparkDF.show()
emp = spark.read.option('encoding','utf-8').option('header','true').csv("../data/Employee.csv",inferSchema=True)
inferSchema=True
- 변수들의 datatype까지 가져옴
True
로 설정하지 않으면 모두 String타입
emp.printSchema()
Hadoop -> Pandas
house = sparkDF.toPandas()
DataFrame 출력
상위 20개 데이터 출력
emp.show()
- df 형태는 모두
.show()
를 붙여야 df 형태로 출력됨!
특정 컬럼만 출력
emp.select('id').show()
emp.select(['id', 'gender']).show()
기타
emp.head()
emp['gender']
# .show() 사용 불가
Column<’gender’>
print(type(emp.select('id')),type(emp['id']))
<class ‘pyspark.sql.dataframe.DataFrame’> <class ‘pyspark.sql.column.Column’>
describe()
emp.describe().show()
컬럼
컬럼 추가
emp = emp.withColumn('jobtime2', emp['jobtime']*2)
- jobtime 컬럼의 값을 2배한 값을 갖는 jobtime2 컬럼 생성
컬럼명 변경
emp = emp.withColumnRenamed('jobtime2', 'jobtime3')
- jobtime2 컬럼명 -> jobtime3로 변경
컬럼 삭제
emp = emp.drop('jobtime3')
- jobtime3 컬럼 삭제
Filter Operations
&
, |
, ~
emp.filter('salary <= 30000').show()
emp.where('salary <= 30000').show()
emp.filter('salary <= 50000').select(['gender', 'jobcat']).show()
emp.filter(emp['salary'] <= 30000).show()
emp.filter((emp['salary'] <= 30000) & (emp['salary'] >= 25000)).show()
emp.filter(~(emp['salary'] <= 30000)).show()
groupBy, .agg()
emp.groupBy('gender').count().show() # 그룹별 카운트
emp.groupBy('gender').mean().show() # 그룹별 평균
emp.groupBy('gender').mean('salary').show()
emp.groupBy(['gender', 'jobcat']).max().show() # 그룹별 최댓값
emp.agg({'salary' : 'mean', 'salbegin' : 'min'}).show() # 컬럼 별 수치값
정렬
emp.orderBy('salary', ascending=False).show()
emp.orderBy('educ', 'salary', ascending=[False, True]).show()
데이터프레임 복사
emp_copy = emp.select('*')
pyspark built-in func.
# 표준화
from pyspark.sql.functions import arg, col, stddev
sal_mean = emp_copy.select(avg(col('salary'))).first()[0]
sal_std = emp_copy.select(std(col('salary'))).first()[0]
emp_copy.withColumn('salary_std', (col('salary') - sal_mean) / sal_std).show()
.first()[0]
: 값만 가져옴emp_copy.select(avg(col('salary')))
는 df 형태
데이터 저장 및 불러오기
csv
spark = SparkSession.builder.appName('Test').getOrCreate()
# read
sparkDF = spark.read.csv("hdfs://localhost:9000/Spark/spark_Employee.csv")
sparkDF = 스파크.read.option('encoding','cp949').option('header','true').csv("hdfs://localhost:9000/Spark/spark_Employee.csv")
# write
sparkDF.write.csv("hdfs://localhost:9000/Spark/hadoop_Employee.csv")
# 폴더 생성하며 지정한 형태로 저장
data.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("hdfs://localhost:9000/Text/csv")
pandas <-> spark
# spark -> pandas
pandasDF_spark = sparkDF.toPandas()
# pandas -> spark
sparkDF_pd = spark.createDataFrame(pandasDF)
parquet
spark = SparkSession.builder.appName('Test').getOrCreate()
# read
sparkPQ = spark.read.parquet("hdfs://localhost:9000/Spark/spark_Employee.parquet")
# write
sparkDF.write.parquet("hdfs://localhost:9000/Spark/spark_Employee.parquet")
cf) pandas
# read csv
pandasDF = pd.read_csv('Employee.csv')
# write csv
pandasDF.to_csv('pandas_Employee.csv')
# read parquet
pandasPQ = pd.read_parquet('pandas_Employee.parquet')
# write parquet
pandasDF.to_parquet('pandas_Employee.parquet')