PySpark-Note-1
 
Introduction
This note introduces some useful functions in PySpark and practices them on the SF crime dataset. A KMeans clustering model is then applied to show how to use an ML model in PySpark.
List of Useful Functions
- Create Spark Session - SparkSession 
 Create a Spark session, the main entry point to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read parquet files.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
```

- Explanation: 
 - .master("local"): sets the Spark master URL to connect to. "local" means run Spark locally, just like a local server.
 - .appName("Word Count"): sets the application name.
 - .config("spark.some.config.option", "some-value"): sets a key-value configuration pair for the application.
 - .getOrCreate(): gets an existing SparkSession or, if there is no existing one, creates a new one.
 
- SQLContext 
 As of Spark 2.0, this is replaced by SparkSession; the class is kept for backward compatibility.
 A SQLContext can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read parquet files (a legacy usage sketch follows).
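 For reference, a minimal sketch of the legacy SQLContext usage, assuming an existing SparkSession named spark (new code should use SparkSession directly):

```python
from pyspark.sql import SQLContext

# Legacy entry point, kept only for backward compatibility (deprecated in newer Spark versions)
sqlContext = SQLContext(spark.sparkContext)
df_legacy = sqlContext.read.csv("path-to-dataset/dataset.csv", header=True)
```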
 
- Load data 
 After we create a SparkSession, we can use the following code to read data from a CSV or JSON file.
 Returned object: a PySpark DataFrame
- csv

```python
# Set delimiter = ";" between columns.
# Skip the file header if header=True;
# otherwise, the header is loaded as a data record.
spark.read.options(header=True, delimiter=";") \
    .csv("path-to-dataset/dataset.csv")
# or
spark.read.options(header=True, delimiter=";") \
    .format('csv').load("path-to-dataset/dataset.csv")
```

- json

```python
spark.read.json("path-to-dataset/dataset.json")
# or
spark.read.format('json').load("path-to-dataset/dataset.json")
```
 
- Useful functions of pyspark.sql.DataFrame with SQL 
- Create DataFrame and SQL table - spark.createDataFrame(data, schema=None): 
 create a PySpark dataframe (a SparkSession method)
 data: a list of tuples, each tuple contains the data of one row
 schema: a list of column names of the dataframe

```python
spark = SparkSession.builder \
    .master("local").appName("Word Count") \
    .config("spark.some.config.option", "some-value").getOrCreate()
l1 = [('Alice', 1)]
new_row1 = spark.createDataFrame(l1, ["name", "age"])
```
- df.createGlobalTempView(view_name) 
 Creates a global temporary view with this DataFrame.
- df.createOrReplaceTempView(view_name): 
 Creates or replaces a local temporary view with this DataFrame.
- sqlContext.registerDataFrameAsTable(df, table_name): 
 Registers the given DataFrame as a temporary table in the catalog (a SQLContext method, kept for backward compatibility). A short example of registering a view and querying it with SQL follows.
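 A minimal sketch of how these views work together with SQL; the DataFrame and the view names used here (people, people_global) are made up for illustration:

```python
df = spark.createDataFrame([('Alice', 1), ('Bob', 2)], ["name", "age"])

# Register a local temporary view, then query it with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 1").show()

# A global temporary view lives in the system database `global_temp`
df.createGlobalTempView("people_global")
spark.sql("SELECT * FROM global_temp.people_global").show()
```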
 
 
- Show Columns, Rows, Statistic description - df.columns: a list of column names 
- df.summary(): computes specified statistics for numeric and string columns: count, mean, stddev, min, max and percentiles 
- df.describe(): computes basic statistics for numeric and string columns; similar to summary() 
- df.printSchema(): prints the schema, i.e. the description of each column, such as its data type (see the example below) 
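 For example, a small sketch using a toy DataFrame (the data is made up for illustration):

```python
df = spark.createDataFrame([('Alice', 1), ('Bob', 2)], ["name", "age"])

print(df.columns)     # ['name', 'age']
df.describe().show()  # count / mean / stddev / min / max per column
df.summary().show()   # describe() plus the 25%/50%/75% percentiles
df.printSchema()      # column names, data types, nullability
```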
 
 
- Query and Selection - df.head(10): return the top 10 rows as a list of Row objects, not a DataFrame 
- df.tail(10): return the last 10 rows as a list of Row objects, not a DataFrame 
- df.where(condition) or df.filter(condition): select the rows which satisfy the condition. Returns a DataFrame

```python
# Select rows where age < 100 and city name = "SF"
# Using a SQL expression as the condition
df.where("age < 100 and city in ('SF')")
# Using the Spark column API (note the parentheses: & binds tighter than <)
from pyspark.sql.functions import col
df.where((col("age") < 100) & (col("city").isin(["SF"])))
```
- df.column.isin(['a','b']) 
 check if the value in a column is in the given list
- df.column.between(lower_bound, upper_bound) 
 check if the value is within a range
- df.select(["column-name"]): 
 select columns based on a list of given column names
- df.take(N): 
 return a list of the top N rows in Row() format
- df.collect(): 
 convert the PySpark dataframe to a list of rows in Row() format (see the combined sketch below)
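 A short sketch combining these selection helpers on a toy DataFrame (column names and data are made up for illustration):

```python
from pyspark.sql.functions import col

df = spark.createDataFrame([('Alice', 1), ('Bob', 200)], ["name", "age"])

subset = df.select(["name", "age"]) \
    .where(col("name").isin(["Alice", "Bob"]) & col("age").between(0, 100))

print(subset.take(1))    # a list with at most 1 Row object
print(subset.collect())  # a list of all matching Row objects (careful with large data)
```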
 
 
- Handle data - df.withColumn('column_name', col):
 append a column named "column_name" to the dataframe
 - col: a Column expression for the new column; a UDF (user defined function) can be used here as well.

```python
df = df.withColumn("age", df.age + 2)
```

- df.withColumnRenamed("old name", "new name"): 
 rename a column in the dataframe
- df.drop("column-name"), df.dropna(subset=["column-name"]): 
 drop a column / drop the rows with null values in the selected columns
- df.dropDuplicates(["column-name"]): 
 drop rows with duplicated values in the selected columns
- df.fillna(value) or df.na.fill(value): fill null values with the given value (a combined sketch of these cleaning helpers follows this list) 
- pyspark.sql.Row(age=10, name='A'): 
 return a Row with fields age=10, name='A'
- df.orderBy(["column-name"], ascending=False), df.sort(): 
 orderBy is an alias of sort; they sort the dataframe along the given column names
- df.groupby().agg() / df.groupby().count() 
 apply an aggregation function, such as count(), to each group

```python
# Group rows by column "A", then count the rows in each group
from pyspark.sql import functions as F
df.groupby(["A"]).agg(F.count(df.A))
```
 
- Append New Rows - df.union(other): 
 Return a new DataFrame containing the union of rows in this and another DataFrame, matched by position.
- df.unionByName(other) 
 The difference between this function and union() is that it resolves columns by name (not by position); see the sketch after this example.

```python
new_row = spark.createDataFrame([("A", 1)], ["name", "count"])
df = df.union(new_row)
```
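 To see the by-name resolution, a small sketch with two made-up DataFrames whose columns are in different orders:

```python
df1 = spark.createDataFrame([("A", 1)], ["name", "count"])
df2 = spark.createDataFrame([(2, "B")], ["count", "name"])

# union() would match these columns by position and mix up the data;
# unionByName() lines the columns up by name instead.
df1.unionByName(df2).show()
```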
 
- SQL - df = spark.sql("select * from table")
 
- Display Data - df.show(n)
 
- Data Types Conversion with Pandas - df.toPandas() 
 convert a PySpark dataframe to a pandas dataframe
- rdd.toDF() 
 convert an RDD of Row objects to a PySpark dataframe
- Convert Pandas DataFrame to PySpark DataFrame using Schema (a round-trip example follows the code below)

```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

mySchema = StructType([StructField("col name1", IntegerType(), True),
                       StructField("col name2", StringType(), True),
                       StructField("col name3", IntegerType(), True)])
# df here is a pandas DataFrame
spark_df = spark.createDataFrame(df, mySchema)
```
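 A round-trip sketch between pandas and Spark (assumes pandas is available alongside PySpark):

```python
import pandas as pd

pdf = pd.DataFrame({"name": ["Alice", "Bob"], "age": [1, 2]})

sdf = spark.createDataFrame(pdf)  # pandas -> Spark, schema inferred
back = sdf.toPandas()             # Spark -> pandas
rows_df = sdf.rdd.toDF()          # an RDD of Row objects -> Spark DataFrame
```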
 
- Using Resilient Distributed Datasets (RDD)
 RDD represents an immutable, partitioned collection of elements that can be operated on in parallel.
 Please refer to the official website about RDD
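 For flavor, a tiny word-count sketch with the RDD API (the input lines are made up):

```python
sc = spark.sparkContext

rdd = sc.parallelize(["spark makes big data simple", "big data with spark"])
counts = (rdd.flatMap(lambda line: line.split())  # split each line into words
             .map(lambda word: (word, 1))         # pair each word with 1
             .reduceByKey(lambda a, b: a + b))    # sum the counts per word
print(counts.collect())
```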
 
Practice with Example: SF Crime data
- Requirements: - Find a platform for distributed computing, like Databricks
- Install PySpark
 
- Download SF Crime Data for Demo

```python
import requests

r = requests.get("https://data.sfgov.org/api/views/tmnf-yvry/rows.csv?accessType=DOWNLOAD")
# r.content is bytes, so open the file in binary mode;
# the with block closes the file automatically.
with open("sf_crime.csv", "wb") as f:
    f.write(r.content)
```
- Load Data with PySpark 

```python
from pyspark.sql import SparkSession
```
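 A minimal sketch of the load step, continuing from the import above. The app name is arbitrary, the file name matches the download step, and df_opt1 is the DataFrame name that the later cells reference:

```python
spark = SparkSession.builder \
    .master("local") \
    .appName("sf_crime_demo") \
    .getOrCreate()

# header=True so the first line of the CSV becomes the column names
df_opt1 = spark.read.options(header=True).csv("sf_crime.csv")
```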
- Visualize Data

```python
# Create a view called "sf_crime" for the dataframe, so that we can use SQL
# to query the data later
df_opt1.createOrReplaceTempView('sf_crime')
# if using Databricks, we can use the display function to see the data
display(df_opt1)
# or
df_opt1.show()
# or show the schema of the dataframe
df_opt1.printSchema()
```
- Clean Data 
 Change the string columns to date type, integer type and other suitable data types, i.e. convert the data types and replace the data (a sketch follows below).
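 A minimal sketch of this cleaning step. The column names and the date format are assumptions based on the public SF crime CSV ('Date' as MM/dd/yyyy strings, 'X'/'Y' as string coordinates); check df_opt1.printSchema() and adjust accordingly:

```python
from pyspark.sql.functions import col, to_date

# Convert string columns to more suitable data types
df_opt1 = (df_opt1
           .withColumn("Date", to_date(col("Date"), "MM/dd/yyyy"))  # string -> date
           .withColumn("X", col("X").cast("double"))                # longitude
           .withColumn("Y", col("Y").cast("double")))               # latitude
```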
- Query and Select the data I'm interested in
 I want to analyze the count of crimes in each category, and there are two ways to do this: with SQL or with the DataFrame API (a sketch of both follows below).
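 A sketch of both approaches, assuming the category column is named 'Category' as in the public SF crime CSV:

```python
# 1) Display the count of crimes per category using SQL (via the sf_crime view)
spark.sql("""
    SELECT Category, COUNT(*) AS crime_count
    FROM sf_crime
    GROUP BY Category
    ORDER BY crime_count DESC
""").show()

# 2) The same query with the DataFrame API
from pyspark.sql import functions as F
df_opt1.groupby("Category").agg(F.count("*").alias("crime_count")) \
       .orderBy("crime_count", ascending=False).show()
```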
Result
 
Advanced Topic: Machine Learning Model
Using KMeans clustering to find the 5 centers where crimes occur most frequently
- Select position data X, Y and then use the interquartile range (IQR) method to find position outliers
 Since KMeans clustering is sensitive to outliers, as it uses the mean to find the center of each cluster, we need to remove outliers first.
- Quantile-based method to remove outliers:
An outlier is defined as a data point that falls outside the range [Q1 - 1.5*IQR, Q3 + 1.5*IQR],
where Q1 and Q3 are the first and third quartiles of the dataset and IQR = Q3 - Q1 is the interquartile range.
- API in PySpark to find quantiles:
df.approxQuantile(col, probabilities, relativeError):
col: the column to compute quantiles for
probabilities: a list of quantile probabilities we want to find. Here I want to find Q1 = 0.25 and Q3 = 0.75
return: a list of values that correspond to the quantile probabilities in the list
The code below selects the positions where crimes occur and computes the quantile bounds (see the sketch that follows).
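 A reconstructed sketch of this step: select the X/Y position columns and build the bounds dictionary that the outlier filter below expects (the structure bounds[col]['lower'/'upper'] is taken from that filter):

```python
from pyspark.sql.functions import col

# Select the positions (longitude X, latitude Y) where crimes occur
crime_cluster_df = df_opt1.select(col("X").cast("double"), col("Y").cast("double")).dropna()

# Compute Q1/Q3 for each coordinate and derive the IQR-based bounds
bounds = {}
for c in ["X", "Y"]:
    q1, q3 = crime_cluster_df.approxQuantile(c, [0.25, 0.75], 0.0)
    iqr = q3 - q1
    bounds[c] = {"lower": q1 - 1.5 * iqr, "upper": q3 + 1.5 * iqr}
```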
- Remove Outliers based on the upper and lower quantile bounds

```python
from pyspark.sql.functions import col

crime_cluster_df = crime_cluster_df.select(["X", "Y"]) \
    .where(col("X").between(bounds['X']['lower'], bounds['X']['upper'])) \
    .where(col("Y").between(bounds['Y']['lower'], bounds['Y']['upper']))
```
 
- Assemble columns into one feature column before training 
 In PySpark, we need to put all feature columns into one single feature column before we train the model.
 VectorAssembler provides a way to assemble those features into one column; a sketch of assembling multiple columns into a single feature column for KMeans clustering follows.
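 A minimal sketch of the assembly step, using the outlier-filtered crime_cluster_df from above:

```python
from pyspark.ml.feature import VectorAssembler

# Assemble the X and Y columns into a single vector column named "features"
assembler = VectorAssembler(inputCols=["X", "Y"], outputCol="features")
feature_df = assembler.transform(crime_cluster_df)
```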
- KMeans Clustering to learn the data
 In PySpark ML, the feature column is expected to be named "features" by default; otherwise, set featuresCol="column-name" to select which feature column of the dataframe the model should learn from (a training sketch follows).
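 A minimal training sketch on the assembled feature_df from the previous step (the seed is arbitrary):

```python
from pyspark.ml.clustering import KMeans

# Fit KMeans with 5 clusters on the "features" column
kmeans = KMeans(k=5, seed=1, featuresCol="features")
model = kmeans.fit(feature_df)

# The 5 cluster centers, i.e. the locations where crimes concentrate
for center in model.clusterCenters():
    print(center)

# Assign each crime position to a cluster
predictions = model.transform(feature_df)
```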
Result
 
 
Summary
This tutorial introduces:
- A list of useful PySpark functions and their basic usage
- The SF Crime dataset as a demo of how to use PySpark to manipulate and visualize data
- How to use a machine learning model, KMeans clustering, in PySpark to learn from data. Note: the ML APIs in PySpark are different from sklearn's, so pay attention to the differences.
 