Spark SQL¶
A tour of the Spark SQL library, the spark-csv package and Spark DataFrames.
Resources¶
- Spark tutorials: A growing collection of accessible tutorials on Spark, mostly in Scala but a few in Python.
from pyspark import SparkContext, SparkConf

# Run Spark locally, using as many worker threads as there are cores.
conf = (SparkConf()
        .setAppName('SparkSQL')
        .setMaster('local[*]'))
sc = SparkContext(conf=conf)

from pyspark.sql import SQLContext

# The SQLContext is the entry point for DataFrame and SQL functionality.
sqlc = SQLContext(sc)
DataFrame from pandas¶
import seaborn as sns

# Load the iris data as a pandas dataframe via seaborn.
pandas_df = sns.load_dataset('iris')
spark_df = sqlc.createDataFrame(pandas_df)
spark_df.show(n=3)
+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
| 5.1| 3.5| 1.4| 0.2| setosa|
| 4.9| 3.0| 1.4| 0.2| setosa|
| 4.7| 3.2| 1.3| 0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 3 rows
DataFrame from CSV files¶
Using manual parsing and a schema¶
%%bash
cat data/cars.csv
year,make,model,comment,blank
"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt
from pyspark.sql.types import *
def pad(alist):
    """Pad a row with empty strings so that every row has 5 fields."""
    tmp = alist[:]
    n = 5 - len(alist)
    for i in range(n):
        tmp.append('')
    return tmp

# Load the text file and convert each line to a list of fields.
lines = sc.textFile('data/cars.csv')
header = lines.first()  # extract header
lines = lines.filter(lambda line: line != header)
lines = lines.filter(lambda line: line)  # drop empty lines
parts = lines.map(lambda l: l.split(','))
parts = parts.map(lambda part: pad(part))
fields = [
    StructField('year', IntegerType(), True),
    StructField('make', StringType(), True),
    StructField('model', StringType(), True),
    StructField('comment', StringType(), True),
    StructField('blank', StringType(), True),
]
schema = StructType(fields)
# Apply the schema to the RDD.
df0 = sqlc.createDataFrame(parts, schema)
df0.show(n=3)
+----+-------+-----+--------------------+-----+
|year| make|model| comment|blank|
+----+-------+-----+--------------------+-----+
|null|"Tesla"| "S"| "No comment"| |
|null| Ford| E350|"Go get one now t...| |
|null| Chevy| Volt| | |
+----+-------+-----+--------------------+-----+
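Note that with this naive split the quotes are retained and the year column is null: the parsed values are still strings (some of them quoted), so they do not match the declared IntegerType. A minimal sketch of a fix; the clean helper below is illustrative, not part of the original notebook:

# Sketch: strip surrounding quotes and cast the year to int so that the
# values actually match the IntegerType field in the schema.
def clean(part):
    part = [p.strip('"') for p in part]
    part[0] = int(part[0]) if part[0] else None
    return part

df0_fixed = sqlc.createDataFrame(parts.map(clean), schema)
df0_fixed.show(n=3)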
Using the spark-csv package¶
df = (sqlc.read.format('com.databricks.spark.csv')
      .options(header='true', inferschema='true')
      .load('data/cars.csv'))
Using the dataframe¶
df.printSchema()
root
|-- year: integer (nullable = true)
|-- make: string (nullable = true)
|-- model: string (nullable = true)
|-- comment: string (nullable = true)
|-- blank: string (nullable = true)
df.show()
+----+-----+-----+--------------------+-----+
|year| make|model| comment|blank|
+----+-----+-----+--------------------+-----+
|2012|Tesla| S| No comment| |
|1997| Ford| E350|Go get one now th...| |
|2015|Chevy| Volt| null| null|
+----+-----+-----+--------------------+-----+
df.select(['year', 'make']).show()
+----+-----+
|year| make|
+----+-----+
|2012|Tesla|
|1997| Ford|
|2015|Chevy|
+----+-----+
To run SQL queries, we need to register the dataframe as a table¶
df.registerTempTable('cars')
q = sqlc.sql('select year, make from cars where year > 2000')
q.show()
+----+-----+
|year| make|
+----+-----+
|2012|Tesla|
|2015|Chevy|
+----+-----+
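For comparison, the same result can be obtained directly with DataFrame methods, without registering a table; a minimal sketch:

# Equivalent of the SQL query above, expressed with DataFrame methods.
df.filter(df.year > 2000).select('year', 'make').show()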
Spark dataframes can be converted to Pandas ones¶
Typically, we would only convert small dataframes such as the results of SQL queries. If we could load the original dataset into memory as a pandas dataframe, why would we be using Spark?
q_df = q.toPandas()
q_df
|   | year | make  |
|---|------|-------|
| 0 | 2012 | Tesla |
| 1 | 2015 | Chevy |
DataFrame from JSON files¶
It is easier to read in JSON than CSV files because JSON is self-describing, allowing Spark SQL to infer the appropriate schema without additional hints.
As an example, we will look at Durham police crime reports from the Durham Open Data website.
df = sqlc.read.json('data/durham-police-crime-reports.json')
How many records are there?
df.count()
101375
Since this is JSON, it is possible to have a nested schema.
df.printSchema()
root
|-- datasetid: string (nullable = true)
|-- fields: struct (nullable = true)
| |-- addtime: string (nullable = true)
| |-- big_zone: string (nullable = true)
| |-- chrgdesc: string (nullable = true)
| |-- csstatus: string (nullable = true)
| |-- csstatusdt: string (nullable = true)
| |-- date_fnd: string (nullable = true)
| |-- date_occu: string (nullable = true)
| |-- date_rept: string (nullable = true)
| |-- dist: string (nullable = true)
| |-- dow1: string (nullable = true)
| |-- dow2: string (nullable = true)
| |-- geo_point_2d: array (nullable = true)
| | |-- element: double (containsNull = true)
| |-- geo_shape: struct (nullable = true)
| | |-- coordinates: array (nullable = true)
| | | |-- element: double (containsNull = true)
| | |-- type: string (nullable = true)
| |-- hour_fnd: string (nullable = true)
| |-- hour_occu: string (nullable = true)
| |-- hour_rept: string (nullable = true)
| |-- inci_id: string (nullable = true)
| |-- monthstamp: string (nullable = true)
| |-- reportedas: string (nullable = true)
| |-- reviewdate: string (nullable = true)
| |-- strdate: string (nullable = true)
| |-- ucr_code: string (nullable = true)
| |-- ucr_type_o: string (nullable = true)
| |-- yearstamp: string (nullable = true)
|-- geometry: struct (nullable = true)
| |-- coordinates: array (nullable = true)
| | |-- element: double (containsNull = true)
| |-- type: string (nullable = true)
|-- record_timestamp: string (nullable = true)
|-- recordid: string (nullable = true)
Show the top few rows.
df.show(n=5)
+--------------------+--------------------+--------------------+--------------------+--------------------+
| datasetid| fields| geometry| record_timestamp| recordid|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|2c0251654c4b7a006...|
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|e5fe0e483fdb17fb7...|
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|d16c330ea4b3e2a90...|
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|1128e12a912b16cfe...|
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|ac79bc9c709d5dfa4...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows
Make a dataframe containing only the date and charge description.
df.select(['fields.strdate', 'fields.chrgdesc']).show(n=5)
+-----------+--------------------+
| strdate| chrgdesc|
+-----------+--------------------+
|Dec 2 2013|CALLS FOR SERVICE...|
|Dec 2 2013|VANDALISM TO PROP...|
|Dec 2 2013|BURGLARY - FORCIB...|
|Dec 2 2013|LARCENY - SHOPLIF...|
|Dec 2 2013|BURGLARY - FORCIB...|
+-----------+--------------------+
only showing top 5 rows
Show distinct charges - note that for an actual analysis, you would probably want to consolidate these into a smaller number of groups to account for typos, etc.; a sketch of one way to do this follows the output below.
df.select('fields.chrgdesc').distinct().show()
+--------------------+
| chrgdesc|
+--------------------+
|ALL OTHER OFFENSE...|
|DRUG EQUIPMENT/PA...|
| ASSIST OTHER AGENCY|
|TOWED/ABANDONED V...|
|DRUG EQUIPMENT/PA...|
|BURGLARY - FORCIB...|
|SEX OFFENSE - STA...|
|ROBBERY - INDIVIDUAL|
|WEAPON VIOLATIONS...|
|ALL OTHER OFFENSE...|
|DRUG/NARCOTIC VIO...|
|SEX OFFENSE - PEE...|
|DRUG/NARCOTIC VIO...|
|DRUG/NARCOTIC VIO...|
|AGGRAVATED ASSAUL...|
|ALL OTHER OFFENSE...|
|LIQUOR LAW - POSS...|
|EMBEZZLEMENT - WI...|
|WEAPON VIOLATIONS...|
| RUNAWAY|
+--------------------+
only showing top 20 rows
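As noted above, a real analysis would consolidate the detailed charge descriptions into broader groups. A minimal sketch using a UDF; the split-on-'-' heuristic is an illustrative assumption, not part of the original analysis:

# Sketch: collapse detailed charge descriptions into coarser groups by
# keeping only the text before the first '-' (an illustrative heuristic).
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def charge_group(desc):
    if desc is None:
        return 'UNKNOWN'
    # e.g. 'LARCENY - SHOPLIFTING' and 'LARCENY - ALL OTHER' both map to 'LARCENY'
    return desc.split('-')[0].strip()

charge_group_udf = udf(charge_group, StringType())

(df.withColumn('charge_group', charge_group_udf(col('fields.chrgdesc')))
   .groupby('charge_group')
   .count()
   .sort('count', ascending=False)
   .show(n=10))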
What charges are the most common?
df.groupby('fields.chrgdesc').count().sort('count', ascending=False).show()
+--------------------+-----+
| chrgdesc|count|
+--------------------+-----+
|BURGLARY - FORCIB...|11630|
|LARCENY - SHOPLIF...| 7633|
|LARCENY - FROM MO...| 7405|
|SIMPLE ASSAULT (P...| 5085|
| LARCENY - ALL OTHER| 4666|
|LARCENY - FROM BU...| 4514|
|VANDALISM TO AUTO...| 4112|
|DRUG/NARCOTIC VIO...| 3790|
|LARCENY - AUTOMOB...| 3441|
|VANDALISM TO PROP...| 3422|
|CALLS FOR SERVICE...| 3207|
| AGGRAVATED ASSAULT| 3183|
|BURGLARY - NON-FO...| 2339|
|ROBBERY - INDIVIDUAL| 2330|
|TOWED/ABANDONED V...| 2244|
|MOTOR VEHICLE THE...| 1970|
|DRIVING WHILE IMP...| 1912|
|FRAUD - FALSE PRE...| 1660|
| FOUND PROPERTY| 1643|
|ALL TRAFFIC (EXCE...| 1436|
+--------------------+-----+
only showing top 20 rows
Register as table to run full SQL queries¶
df.registerTempTable('crimes')
q = sqlc.sql('''
select fields.chrgdesc, count(fields.chrgdesc) as count
from crimes
where fields.monthstamp=3
group by fields.chrgdesc
''')
q.show()
+--------------------+-----+
| chrgdesc|count|
+--------------------+-----+
|ALL OTHER OFFENSE...| 1|
|TOWED/ABANDONED V...| 258|
| ASSIST OTHER AGENCY| 19|
|BURGLARY - FORCIB...| 929|
|SEX OFFENSE - STA...| 3|
|ROBBERY - INDIVIDUAL| 157|
|WEAPON VIOLATIONS...| 6|
|SEX OFFENSE - PEE...| 5|
|ALL OTHER OFFENSE...| 8|
|DRUG/NARCOTIC VIO...| 14|
|DRUG/NARCOTIC VIO...| 28|
|AGGRAVATED ASSAUL...| 1|
|LIQUOR LAW - POSS...| 2|
|ALL OTHER OFFENSE...| 3|
|EMBEZZLEMENT - WI...| 7|
|WEAPON VIOLATIONS...| 1|
| RUNAWAY| 87|
| MISSING PERSON| 16|
|SIMPLE ASSAULT-PH...| 3|
|ALL OTHER OFFENSE...| 22|
+--------------------+-----+
only showing top 20 rows
Convert to pandas¶
crimes_df = q.toPandas()
crimes_df.head()
|   | chrgdesc                                | count |
|---|-----------------------------------------|-------|
| 0 | ALL OTHER OFFENSES-BIGAMY/MARRIAGE LAWS | 1     |
| 1 | TOWED/ABANDONED VEHICLE                 | 258   |
| 2 | ASSIST OTHER AGENCY                     | 19    |
| 3 | BURGLARY - FORCIBLE ENTRY               | 929   |
| 4 | SEX OFFENSE - STATUTORY RAPE            | 3     |
DataFrame from SQLite3¶
The official docs suggest that this can be done directly via JDBC, but I cannot get it to work. As a workaround, you can convert to JSON before importing as a dataframe; a sketch of the JDBC route is shown after the example below. If anyone finds out how to load an SQLite3 database table directly into a Spark dataframe, please let me know.
from odo import odo
odo('sqlite:///../data/Chinook_Sqlite.sqlite::Album', 'Album.json')
df = sqlc.read.json('Album.json')
df.show(n=3)
+-------+--------+--------------------+
|AlbumId|ArtistId| Title|
+-------+--------+--------------------+
| 1| 1|For Those About T...|
| 2| 2| Balls to the Wall|
| 3| 2| Restless and Wild|
+-------+--------+--------------------+
only showing top 3 rows
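For reference, the JDBC route would look roughly like the sketch below, assuming the Xerial sqlite-jdbc driver jar is on the Spark classpath (e.g. passed via --jars); as noted above, I have not been able to make this work.

# Sketch of the (for me non-working) JDBC route. The path and driver class
# are illustrative and assume the sqlite-jdbc driver is on the classpath.
album = (sqlc.read.format('jdbc')
         .options(url='jdbc:sqlite:../data/Chinook_Sqlite.sqlite',
                  dbtable='Album',
                  driver='org.sqlite.JDBC')
         .load())
album.show(n=3)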
DataSets¶
In Scala and Java, Spark 1.6 introduced a new type called DataSet that combines the relational properties of a DataFrame with the functional methods of an RDD. This will be available in Python in a later version. However, because of the dynamic nature of Python, you can already call functional methods on a Spark DataFrame, giving most of the ease of use of the DataSet type.
ds = sqlc.read.text('../data/Ulysses.txt')
ds
DataFrame[value: string]
ds.show(n=3)
+--------------------+
| value|
+--------------------+
|The Project Guten...|
| |
|This eBook is for...|
+--------------------+
only showing top 3 rows
def remove_punctuation(s):
    """Strip all punctuation characters from a string."""
    import string
    return s.translate(dict.fromkeys(ord(c) for c in string.punctuation))

counts = (ds.map(lambda x: remove_punctuation(x[0]))
          .flatMap(lambda x: x.lower().strip().split())
          .filter(lambda x: x != '')
          .map(lambda x: (x, 1))
          .countByKey())
sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]
[('the', 15107),
('of', 8257),
('and', 7282),
('a', 6553),
('to', 5042),
('in', 4981),
('he', 4033),
('his', 3333),
('i', 2698),
('that', 2621)]
Optional Exercise
The crime data set includes both date and geospatial information. Consider creating an interactive map visualization of crimes in Durham by date using the bokeh package. See this example to get started. A GeoJSON version of the Durham Police Crime Reports can be downloaded.
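A possible starting point, not part of the original notebook: pull the coordinates into pandas and plot them with bokeh. The assumption that fields.geo_point_2d holds [latitude, longitude] should be checked against the data.

# Sketch: extract date and coordinates from the crime reports, convert to
# pandas, and plot the points with bokeh.
from pyspark.sql.functions import col
from bokeh.plotting import figure, output_file, show

crimes = sqlc.read.json('data/durham-police-crime-reports.json')
points = (crimes.select(col('fields.strdate').alias('date'),
                        col('fields.geo_point_2d').alias('point'))
                .dropna()
                .toPandas())
# Assumption: geo_point_2d is [latitude, longitude].
points['lat'] = points['point'].map(lambda p: p[0])
points['lon'] = points['point'].map(lambda p: p[1])

p = figure(title='Durham crime reports')
p.circle(points['lon'], points['lat'], size=2, alpha=0.3)
output_file('crimes.html')
show(p)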
Version information¶
%load_ext version_information
%version_information pyspark
| Software | Version |
|----------|---------|
| Python   | 3.5.1 64bit [GCC 4.2.1 (Apple Inc. build 5577)] |
| IPython  | 4.0.3 |
| OS       | Darwin 15.4.0 x86_64 i386 64bit |
| pyspark  | The 'pyspark' distribution was not found and is required by the application |

Wed Apr 20 11:54:43 2016 EDT