SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Test').getOrCreate()
spark.conf.sest('spark.sql.execution.arrow.pyspark.enabled', 'true')    # 메모리 관련


read csv

sparkDF = spark.read.csv('hdfs://localhost:9000/housing.csv', encoding='cp949', header=True, inferSchema=True)
sparkDF = spark.read.option('header', 'true').option('encoding', 'cp949').csv('hdfs://localhost:9000/housing.csv', inferSchema=True)

sparkDF.show()


image


emp = spark.read.option('encoding','utf-8').option('header','true').csv("../data/Employee.csv",inferSchema=True)

inferSchema=True

  • 변수들의 datatype까지 가져옴
  • True로 설정하지 않으면 모두 String타입
emp.printSchema()

image


Hadoop -> Pandas

house = sparkDF.toPandas()


DataFrame 출력

상위 20개 데이터 출력

emp.show()
  • df 형태는 모두 .show()를 붙여야 df 형태로 출력됨!


특정 컬럼만 출력

emp.select('id').show()
emp.select(['id', 'gender']).show()

image


image


기타

emp.head()

image


emp['gender']
# .show() 사용 불가

Column<’gender’>


print(type(emp.select('id')),type(emp['id']))

<class ‘pyspark.sql.dataframe.DataFrame’> <class ‘pyspark.sql.column.Column’>


describe()

emp.describe().show()


컬럼

컬럼 추가

emp = emp.withColumn('jobtime2', emp['jobtime']*2)
  • jobtime 컬럼의 값을 2배한 값을 갖는 jobtime2 컬럼 생성


컬럼명 변경

emp = emp.withColumnRenamed('jobtime2', 'jobtime3')
  • jobtime2 컬럼명 -> jobtime3로 변경


컬럼 삭제

emp = emp.drop('jobtime3')
  • jobtime3 컬럼 삭제


Filter Operations

&, |, ~

emp.filter('salary <= 30000').show()
emp.where('salary <= 30000').show()

emp.filter('salary <= 50000').select(['gender', 'jobcat']).show()

emp.filter(emp['salary'] <= 30000).show()

emp.filter((emp['salary'] <= 30000) & (emp['salary'] >= 25000)).show()

emp.filter(~(emp['salary'] <= 30000)).show()


groupBy, .agg()

emp.groupBy('gender').count().show()                       # 그룹별 카운트
 
emp.groupBy('gender').mean().show()                        # 그룹별 평균

emp.groupBy('gender').mean('salary').show()

emp.groupBy(['gender', 'jobcat']).max().show()             # 그룹별 최댓값

emp.agg({'salary' : 'mean', 'salbegin' : 'min'}).show()    # 컬럼 별 수치값

image


image


image


image


image


정렬

emp.orderBy('salary', ascending=False).show()

emp.orderBy('educ', 'salary', ascending=[False, True]).show()


데이터프레임 복사

emp_copy = emp.select('*')


pyspark built-in func.

# 표준화
from pyspark.sql.functions import arg, col, stddev
sal_mean = emp_copy.select(avg(col('salary'))).first()[0]    
sal_std = emp_copy.select(std(col('salary'))).first()[0]
emp_copy.withColumn('salary_std', (col('salary') - sal_mean) / sal_std).show()    
  • .first()[0] : 값만 가져옴
    • emp_copy.select(avg(col('salary'))) 는 df 형태


데이터 저장 및 불러오기

csv

image


spark = SparkSession.builder.appName('Test').getOrCreate()

# read
sparkDF = spark.read.csv("hdfs://localhost:9000/Spark/spark_Employee.csv")
sparkDF = 스파크.read.option('encoding','cp949').option('header','true').csv("hdfs://localhost:9000/Spark/spark_Employee.csv")

# write
sparkDF.write.csv("hdfs://localhost:9000/Spark/hadoop_Employee.csv")

# 폴더 생성하며 지정한 형태로 저장
data.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("hdfs://localhost:9000/Text/csv")

pandas <-> spark

# spark -> pandas
pandasDF_spark = sparkDF.toPandas()

# pandas -> spark
sparkDF_pd  = spark.createDataFrame(pandasDF)

parquet

spark = SparkSession.builder.appName('Test').getOrCreate()

# read
sparkPQ = spark.read.parquet("hdfs://localhost:9000/Spark/spark_Employee.parquet")

# write
sparkDF.write.parquet("hdfs://localhost:9000/Spark/spark_Employee.parquet")

cf) pandas

# read csv
pandasDF = pd.read_csv('Employee.csv')
# write csv
pandasDF.to_csv('pandas_Employee.csv')
# read parquet
pandasPQ  = pd.read_parquet('pandas_Employee.parquet')
# write parquet
pandasDF.to_parquet('pandas_Employee.parquet')