What is PySpark?
It is the Python API for Apache Spark: the bridge between Python code and Spark’s distributed computing engine.
This means you can write your data processing tasks in familiar Python syntax, while Spark handles the heavy lifting of parallelizing and executing them across multiple machines.
PySpark can handle large datasets with ease, thanks to its distributed architecture: it splits the work across machines and executes it efficiently.
PySpark offers tools like SparkSQL for SQL-like queries and MLlib for machine learning.
PySpark syntax is similar to Pandas, and its APIs are designed to be intuitive and to build on existing Python knowledge.
Why Choose PySpark?
Speed: Spark’s in-memory processing and distributed architecture make it significantly faster than traditional data processing frameworks like Hadoop MapReduce, especially for iterative computations.
Ease of Use: PySpark integrates seamlessly with Python, a language known for its simplicity and readability. This makes it easier for data scientists and analysts to learn and use.
Versatility: PySpark supports a wide range of data processing tasks, including batch processing, stream processing, machine learning, and graph processing.
Scalability: Spark can scale from small datasets on a single machine to massive datasets distributed across hundreds or thousands of machines.
Rich Ecosystem: PySpark benefits from the rich ecosystem of Python libraries, including popular tools like Pandas, NumPy, and Scikit-learn, allowing you to combine them with Spark’s distributed computing power.
Key Features
RDDs (Resilient Distributed Datasets): The fundamental data structure in Spark. RDDs are immutable, distributed collections of data that can be processed in parallel.
DataFrames: A higher-level abstraction over RDDs, similar to tables in a relational database. DataFrames provide a more structured way to work with data and offer optimized performance.
Spark SQL: Allows you to use SQL queries to process data in Spark. This makes it easier for those familiar with SQL to work with large datasets.
MLlib (Machine Learning Library): Provides a wide range of machine learning algorithms for tasks like classification, regression, clustering, and collaborative filtering.
Spark Streaming: Enables real-time processing of streaming data. This is useful for applications like fraud detection, log analysis, and social media monitoring.
GraphX: A library for graph processing, allowing you to perform operations on large-scale graphs.
What can you do with PySpark?
- Data Ingestion & Cleaning
Read and process massive datasets from various sources, including text files, databases, and streaming feeds. Clean and prepare your data for analysis with ease.
- Exploratory Data Analysis
Analyze large datasets using familiar Python functions and libraries. Uncover trends, patterns, and relationships hidden within your data.
- Machine Learning
Build and train machine learning models on vast datasets using PySpark’s MLlib library. Leverage powerful algorithms for classification, regression, clustering, and more.
- Real-Time Analytics
Gain insights from real-time data streams with PySpark’s Structured Streaming capabilities. React to events as they happen and make data-driven decisions in real time.
Environment settings
# Import libraries
import jdk                  # install-jdk package: Spark requires a Java runtime
import findspark
findspark.init()            # locate the local Spark installation
findspark.find()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
import warnings
warnings.filterwarnings('ignore')
# Create session
spark = SparkSession.builder.getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/03 10:59:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Basic Operations
We will walk through some basic manipulation operations with PySpark, such as reading datasets, inspecting schemas, and selecting columns.
Read datasets
# Read json
df = spark.read.json('../Datasets/people.json')
Preview datasets
# Print dataframe
df.show()
+----+-------+
| age| name|
+----+-------+
|NULL|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
View data types
# Print schema
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
Select columns or fields
# Show columns
df.columns
Descriptive statistics of the dataset
# Get descriptive stats
df.describe().show()
+-------+------------------+-------+
|summary| age| name|
+-------+------------------+-------+
| count| 2| 3|
| mean| 24.5| NULL|
| stddev|7.7781745930520225| NULL|
| min| 19| Andy|
| max| 30|Michael|
+-------+------------------+-------+
Changing data types
# Import data types
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType)
# Create schema with new data types
data_schema = [StructField('age', IntegerType(), True),
               StructField('name', StringType(), True)]
# Save new schema
final_struct = StructType(fields=data_schema)
# Read json with new schema
df = spark.read.json('../Datasets/people.json', schema=final_struct)
# Show new schema
df.printSchema()
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
Selecting columns
# Show first rows
df.head(5)
[Row(age=None, name='Michael'),
Row(age=30, name='Andy'),
Row(age=19, name='Justin')]
# Select a column
df.select('age').show()
+----+
| age|
+----+
|NULL|
| 30|
| 19|
+----+
# Select several columns
df.select(['age','name']).show()
+----+-------+
| age| name|
+----+-------+
|NULL|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Creating new columns
# Create new column from an existing one
df.withColumn('age2', df['age']+5).show()
+----+-------+----+
| age| name|age2|
+----+-------+----+
|NULL|Michael|NULL|
| 30| Andy| 35|
| 19| Justin| 24|
+----+-------+----+
Rename columns
# Rename an existing column
df.withColumnRenamed('age','new_age').show()
+-------+-------+
|new_age| name|
+-------+-------+
| NULL|Michael|
| 30| Andy|
| 19| Justin|
+-------+-------+
Using SQL
# Create SQL view
df.createOrReplaceTempView('people')
# Execute a SQL query against the view
spark.sql("SELECT * FROM people WHERE age is not null").show()
+---+------+
|age| name|
+---+------+
| 30| Andy|
| 19|Justin|
+---+------+
Filtering datasets
# Read csv with header and inferred schema
app = spark.read.csv('../Datasets/appl_stock.csv', header=True, inferSchema=True)
# Show first rows
app.head(5)
[Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
Row(Date=datetime.date(2010, 1, 5), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
Row(Date=datetime.date(2010, 1, 6), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004),
Row(Date=datetime.date(2010, 1, 7), Open=211.75, High=212.000006, Low=209.050005, Close=210.58, Volume=119282800, Adj Close=27.28265),
Row(Date=datetime.date(2010, 1, 8), Open=210.299994, High=212.000006, Low=209.06000500000002, Close=211.98000499999998, Volume=111902700, Adj Close=27.464034)]
Reading datasets with header and inferred schema
# Read csv with header and inferred schema
sales = spark.read.csv('../Datasets/sales_info.csv', header=True, inferSchema=True)
# Print dataframe
sales.show()
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
| GOOG| Sam|200.0|
| GOOG|Charlie|120.0|
| GOOG| Frank|340.0|
| MSFT| Tina|600.0|
| MSFT| Amy|124.0|
| MSFT|Vanessa|243.0|
| FB| Carl|870.0|
| FB| Sarah|350.0|
| APPL| John|250.0|
| APPL| Linda|130.0|
| APPL| Mike|750.0|
| APPL| Chris|350.0|
+-------+-------+-----+
# Print schema
sales.printSchema()
root
|-- Company: string (nullable = true)
|-- Person: string (nullable = true)
|-- Sales: double (nullable = true)
Grouping basics
# Average of numeric columns per company
sales.groupBy('Company').mean().show()
+-------+-----------------+
|Company| avg(Sales)|
+-------+-----------------+
| APPL| 370.0|
| GOOG| 220.0|
| FB| 610.0|
| MSFT|322.3333333333333|
+-------+-----------------+
# Max Sales per company
sales.groupBy('Company').agg({'Sales':'max'}).show()
+-------+----------+
|Company|max(Sales)|
+-------+----------+
| APPL| 750.0|
| GOOG| 340.0|
| FB| 870.0|
| MSFT| 600.0|
+-------+----------+
Getting unique values, standard deviation and rounding numbers
# Import aggregate functions
from pyspark.sql.functions import countDistinct, avg, stddev, format_number
# Count distinct Sales values
sales.select(countDistinct('Sales')).show()
+---------------------+
|count(DISTINCT Sales)|
+---------------------+
| 11|
+---------------------+
# Average Sales with an alias
sales.select(avg('Sales').alias('avg_sales')).show()
+-----------------+
| avg_sales|
+-----------------+
|360.5833333333333|
+-----------------+
# Standard deviation of Sales, rounded to 2 decimals
sales_std = sales.select(stddev('Sales').alias('std_dev'))
sales_std.select(format_number('std_dev',2).alias('std')).show()
+------+
| std|
+------+
|250.09|
+------+
Ordering data
# Order ascending by Sales
sales.orderBy('Sales').show()
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
| GOOG|Charlie|120.0|
| MSFT| Amy|124.0|
| APPL| Linda|130.0|
| GOOG| Sam|200.0|
| MSFT|Vanessa|243.0|
| APPL| John|250.0|
| GOOG| Frank|340.0|
| FB| Sarah|350.0|
| APPL| Chris|350.0|
| MSFT| Tina|600.0|
| APPL| Mike|750.0|
| FB| Carl|870.0|
+-------+-------+-----+
# Order descending
sales.orderBy(sales['Sales'].desc()).show()
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
| FB| Carl|870.0|
| APPL| Mike|750.0|
| MSFT| Tina|600.0|
| FB| Sarah|350.0|
| APPL| Chris|350.0|
| GOOG| Frank|340.0|
| APPL| John|250.0|
| MSFT|Vanessa|243.0|
| GOOG| Sam|200.0|
| APPL| Linda|130.0|
| MSFT| Amy|124.0|
| GOOG|Charlie|120.0|
+-------+-------+-----+
Dealing with null values
# Read csv containing null values
df1 = spark.read.csv('../Datasets/ContainsNull.csv', header=True, inferSchema=True)
# Print dataframe
df1.show()
+----+-----+-----+
| Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
Dropping rows with null values
# Drop rows containing any null value
df1.na.drop().show()
+----+-----+-----+
| Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+
# Keep rows with at least 2 non-null values
df1.na.drop(thresh=2).show()
+----+-----+-----+
| Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
# Drop rows where Sales is null
df1.na.drop(how='all', subset=['Sales']).show()
+----+-----+-----+
| Id| Name|Sales|
+----+-----+-----+
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
Substituting null values
# Fill nulls in string columns with a placeholder
df1.na.fill('Nan').show()
+----+-----+-----+
| Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| Nan| NULL|
|emp3| Nan|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
# Fill nulls in numeric columns with 0
df1.na.fill(0).show()
+----+-----+-----+
| Id| Name|Sales|
+----+-----+-----+
|emp1| John| 0.0|
|emp2| NULL| 0.0|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
# Import mean function
from pyspark.sql.functions import mean
# Compute the mean of Sales (collect returns a list of Rows)
mean_val = df1.select(mean(df1['Sales'])).collect()
# Extract the scalar value from the Row
mean_sales = mean_val[0][0]
df1.na.fill(mean_sales, ['Sales']).show()
+----+-----+-----+
| Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| NULL|400.5|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
Working with datetimes
# Import datetime functions
from pyspark.sql.functions import dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format
# Extract the day of the month from the Date column
app.select(dayofmonth(app['Date'])).head(5)
[Row(dayofmonth(Date)=4),
Row(dayofmonth(Date)=5),
Row(dayofmonth(Date)=6),
Row(dayofmonth(Date)=7),
Row(dayofmonth(Date)=8)]
# Add a year column extracted from Date
app.withColumn('year', year(app['Date'])).show()
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
| Date| Open| High| Low| Close| Volume| Adj Close|year|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04| 213.429998| 214.499996|212.38000099999996| 214.009998|123432400| 27.727039|2010|
|2010-01-05| 214.599998| 215.589994| 213.249994| 214.379993|150476200|27.774976000000002|2010|
|2010-01-06| 214.379993| 215.23| 210.750004| 210.969995|138040000|27.333178000000004|2010|
|2010-01-07| 211.75| 212.000006| 209.050005| 210.58|119282800| 27.28265|2010|
|2010-01-08| 210.299994| 212.000006|209.06000500000002|211.98000499999998|111902700| 27.464034|2010|
|2010-01-11|212.79999700000002| 213.000002| 208.450005|210.11000299999998|115557400| 27.221758|2010|
|2010-01-12|209.18999499999998|209.76999500000002| 206.419998| 207.720001|148614900| 26.91211|2010|
|2010-01-13| 207.870005|210.92999500000002| 204.099998| 210.650002|151473000| 27.29172|2010|
|2010-01-14|210.11000299999998|210.45999700000002| 209.020004| 209.43|108223500| 27.133657|2010|
|2010-01-15|210.92999500000002|211.59999700000003| 205.869999| 205.93|148516900|26.680197999999997|2010|
|2010-01-19| 208.330002|215.18999900000003| 207.240004| 215.039995|182501900|27.860484999999997|2010|
|2010-01-20| 214.910006| 215.549994| 209.500002| 211.73|153038200| 27.431644|2010|
|2010-01-21| 212.079994|213.30999599999998| 207.210003| 208.069996|152038600| 26.957455|2010|
|2010-01-22|206.78000600000001| 207.499996| 197.16| 197.75|220441900| 25.620401|2010|
|2010-01-25|202.51000200000001| 204.699999| 200.190002| 203.070002|266424900|26.309658000000002|2010|
|2010-01-26|205.95000100000001| 213.710005| 202.580004| 205.940001|466777500| 26.681494|2010|
|2010-01-27| 206.849995| 210.58| 199.530001| 207.880005|430642100|26.932840000000002|2010|
|2010-01-28| 204.930004| 205.500004| 198.699995| 199.289995|293375600|25.819922000000002|2010|
|2010-01-29| 201.079996| 202.199995| 190.250002| 192.060003|311488100| 24.883208|2010|
|2010-02-01|192.36999699999998| 196.0|191.29999899999999| 194.729998|187469100| 25.229131|2010|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
only showing top 20 rows
# Save the dataframe with the year column
app_y = app.withColumn('year', year(app['Date']))
# Average of numeric columns per year
app_y.groupBy('year').mean().show()
+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+
|year| avg(Open)| avg(High)| avg(Low)| avg(Close)| avg(Volume)| avg(Adj Close)|avg(year)|
+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+
|2015|120.17575393253965|121.24452385714291| 118.8630954325397|120.03999980555547| 5.18378869047619E7|115.96740080555561| 2015.0|
|2013| 473.1281355634922| 477.6389272301587|468.24710264682557| 472.6348802857143| 1.016087E8| 62.61798788492063| 2013.0|
|2014| 295.1426195357143|297.56103184523823| 292.9949599801587| 295.4023416507935| 6.315273055555555E7| 87.63583323809523| 2014.0|
|2012| 576.652720788| 581.8254008040001| 569.9211606079999| 576.0497195640002| 1.319642044E8| 74.81383696800002| 2012.0|
|2016|104.50777772619044| 105.4271825436508|103.69027771825397|104.60400786904763| 3.84153623015873E7|103.15032854761901| 2016.0|
|2010| 259.9576190992064|262.36880881349214|256.84761791269847| 259.8424600000002|1.4982631666666666E8|33.665072424603196| 2010.0|
|2011|364.06142773412705| 367.4235704880951|360.29769878174613|364.00432532142867|1.2307474166666667E8| 47.16023692063492| 2011.0|
+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+
# Save the grouped averages
new = app_y.groupBy('year').mean()
# Round the average open price to 2 decimals
new.select(['year',format_number('avg(Open)',2).alias('avg_open')]).show()
+----+--------+
|year|avg_open|
+----+--------+
|2015| 120.18|
|2013| 473.13|
|2014| 295.14|
|2012| 576.65|
|2016| 104.51|
|2010| 259.96|
|2011| 364.06|
+----+--------+
# Order the yearly averages by year
years = new.select(['year',format_number('avg(Open)',2).alias('avg_open')]).orderBy('year')
Converting to a pandas DataFrame
# Convert pyspark dataframe to pandas dataframe
averages = years.toPandas()
# Convert column to numeric
averages['avg_open'] = pd.to_numeric(averages['avg_open'])
# Show pandas dataframe
averages
   year  avg_open
0  2010    259.96
1  2011    364.06
2  2012    576.65
3  2013    473.13
4  2014    295.14
5  2015    120.18
6  2016    104.51
Visualizing results
# Create chart
averages.plot.barh(
    x='year',
    y='avg_open',
    legend=None,
    color='#800020',
    figsize=(14, 7),
)
# Add title
plt.title('Average amounts per year', fontsize=20)
# Show chart
plt.show()
# Stop the SparkSession
spark.stop()
Conclusions
PySpark is a powerful and versatile tool for working with big data. Its combination of Spark’s distributed computing capabilities and Python’s ease of use makes it an excellent choice for data scientists and analysts looking to tackle large-scale data processing tasks.
Whether you’re performing complex data transformations, building machine learning models, or analyzing real-time streams, PySpark can help you unlock the potential of your data. So, dive in, explore its features, and unleash the power of big data!