{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Spark SQL\n", "\n", "Spark SQL and DataFrames give performance comparable to native compiled Scala code. This is not true in general if you use the low-level RDD API.\n", "\n", "![img](https://miro.medium.com/max/1524/1*Kolwjg356xdqPv-8JmQGRQ.png)\n", "\n", "Source: https://miro.medium.com/max/1524/1*Kolwjg356xdqPv-8JmQGRQ.png\n", "\n", "The improvement in efficiency is largely due to the Catalyst optimizer, which applies rule-based and cost-based optimization. Optimization occurs in 4 phases:\n", "\n", "1. Analysis\n", " - Generate an abstract syntax tree\n", " - Resolve names using the internal catalog\n", "2. Logical optimization\n", " - Construct plans using rule-based optimization\n", " - Assign costs to each plan using cost-based optimization\n", "3. Physical planning\n", " - Generate a physical plan using operations available in the Spark execution engine\n", "4. Code generation\n", " - Generate Java bytecode to run on each machine\n", " - Generate compact RDD code for final execution\n", "\n", "![img](https://miro.medium.com/max/1400/1*_DdwvGk23tB3p1aLQB3Q4A.png)\n", "\n", "Source: https://miro.medium.com/max/1400/1*_DdwvGk23tB3p1aLQB3Q4A.png" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "spark = (\n", " SparkSession.builder \n", " .master(\"local\") \n", " .appName(\"BIOS-823\") \n", " .config(\"spark.executor.cores\", 4) \n", " .getOrCreate() \n", ")" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import pandas as pd" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "df = spark.createDataFrame(pd.DataFrame(dict(a=range(6), b=list('aabbcc'))))" ] }, { "cell_type": "code", "execution_count": 5, 
"metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+\n", "| a| b|\n", "+---+---+\n", "| 0| a|\n", "| 1| a|\n", "| 2| b|\n", "| 3| b|\n", "| 4| c|\n", "| 5| c|\n", "+---+---+\n", "\n" ] } ], "source": [ "df.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Register the DataFrame as a temporary view so that it can be used in SQL queries." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "df.createOrReplaceTempView('df_table')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To create a permanent table instead, use `saveAsTable`:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "df.write.saveAsTable('df_table')\n", "```" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Database(name='default', description='Default Hive database', locationUri='file:/Users/cliburnchan/_teach/bios-823-2020/notebooks/spark-warehouse')]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.catalog.listDatabases()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Table(name='df_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.catalog.listTables()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To convert a table back into a DataFrame, use `spark.sql` or `spark.table`:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+\n", "| a| b|\n", "+---+---+\n", "| 0| a|\n", "| 1| a|\n", "| 2| b|\n", "| 3| b|\n", "| 4| c|\n", "| 5| c|\n", "+---+---+\n", "\n" ] } ], "source": [ "spark.sql('SELECT * FROM df_table').show()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", 
"output_type": "stream", "text": [ "+---+---+\n", "| a| b|\n", "+---+---+\n", "| 0| a|\n", "| 1| a|\n", "| 2| b|\n", "| 3| b|\n", "| 4| c|\n", "| 5| c|\n", "+---+---+\n", "\n" ] } ], "source": [ "spark.table('df_table').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Standard SQL Queries\n", "\n", "All Spark SQL queries return a DataFrame." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+\n", "| a| b|\n", "+---+---+\n", "| 0| a|\n", "| 1| a|\n", "| 2| b|\n", "| 3| b|\n", "| 4| c|\n", "| 5| c|\n", "+---+---+\n", "\n" ] } ], "source": [ "spark.sql('''\n", "SELECT a, b FROM df_table\n", "''').show()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+\n", "| a| b|\n", "+---+---+\n", "| 2| b|\n", "| 3| b|\n", "+---+---+\n", "\n" ] } ], "source": [ "spark.sql('''\n", "SELECT * \n", "FROM df_table\n", "WHERE b='b'\n", "''').show()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+------------------+\n", "| b|avg| std|\n", "+---+---+------------------+\n", "| a|0.5|0.7071067811865476|\n", "| b|2.5|0.7071067811865476|\n", "| c|4.5|0.7071067811865476|\n", "+---+---+------------------+\n", "\n" ] } ], "source": [ "spark.sql('''\n", "SELECT b, mean(a) AS avg, std(a) AS std\n", "FROM df_table\n", "GROUP BY b\n", "ORDER BY b\n", "''').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Functions" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------------+\n", "| function|\n", "+-----------------+\n", "| cardinality|\n", "| case|\n", "| cast|\n", "| cbrt|\n", "| ceil|\n", "| ceiling|\n", "| char|\n", "| char_length|\n", "| 
character_length|\n", "| chr|\n", "| coalesce|\n", "| collect_list|\n", "| collect_set|\n", "| concat|\n", "| concat_ws|\n", "| conv|\n", "| corr|\n", "| cos|\n", "| cosh|\n", "| cot|\n", "| count|\n", "| count_if|\n", "| count_min_sketch|\n", "| covar_pop|\n", "| covar_samp|\n", "| crc32|\n", "| cube|\n", "| cume_dist|\n", "| current_database|\n", "| current_date|\n", "|current_timestamp|\n", "+-----------------+\n", "\n" ] } ], "source": [ "spark.sql('''\n", "SHOW FUNCTIONS \"c*\"\n", "''').show(50)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import functions" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['cbrt',\n", " 'ceil',\n", " 'coalesce',\n", " 'col',\n", " 'collect_list',\n", " 'collect_set',\n", " 'column',\n", " 'concat',\n", " 'concat_ws',\n", " 'conv',\n", " 'corr',\n", " 'cos',\n", " 'cosh',\n", " 'count',\n", " 'countDistinct',\n", " 'covar_pop',\n", " 'covar_samp',\n", " 'crc32',\n", " 'create_map',\n", " 'cume_dist',\n", " 'current_date',\n", " 'current_timestamp']" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sorted([f for f in dir(functions) if f.startswith('c')])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Complex Types" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Structs" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "df1 = spark.sql('''\n", "SELECT a, b, (a, b) AS struct FROM df_table\n", "''')" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+------+\n", "| a| b|struct|\n", "+---+---+------+\n", "| 0| a|[0, a]|\n", "| 1| a|[1, a]|\n", "| 2| b|[2, b]|\n", "| 3| b|[3, b]|\n", "| 4| c|[4, c]|\n", "| 5| c|[5, c]|\n", "+---+---+------+\n", "\n" ] } ], "source": [ "df1.show()" ] }, { 
"cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "df1.createOrReplaceTempView('df1_table')" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+\n", "| a| b|\n", "+---+---+\n", "| 0| a|\n", "| 1| a|\n", "| 2| b|\n", "| 3| b|\n", "| 4| c|\n", "| 5| c|\n", "+---+---+\n", "\n" ] } ], "source": [ "spark.sql('''\n", "SELECT struct.a, struct.b FROM df1_table\n", "''').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Lists" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "df2 = spark.sql('''\n", "SELECT b, collect_list(a) as list\n", "FROM df_table\n", "GROUP BY b\n", "ORDER BY b\n", "''')" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "df2.createOrReplaceTempView('df2_table')" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+\n", "| b| list|\n", "+---+------+\n", "| a|[0, 1]|\n", "| b|[2, 3]|\n", "| c|[4, 5]|\n", "+---+------+\n", "\n" ] } ], "source": [ "df2.show()" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+\n", "| b| a|\n", "+---+---+\n", "| a| 0|\n", "| a| 1|\n", "| b| 2|\n", "| b| 3|\n", "| c| 4|\n", "| c| 5|\n", "+---+---+\n", "\n" ] } ], "source": [ "spark.sql('''\n", "SELECT b, explode(list) as a\n", "FROM df2_table\n", "''').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Maps" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import create_map" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "from itertools import chain" ] }, { "cell_type": "code", "execution_count": 27, 
"metadata": {}, "outputs": [], "source": [ "df3 = df.select('a', 'b', create_map('b', 'a').alias('map'))" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "df3.createOrReplaceTempView('df3_table')" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+--------+\n", "| a| b| map|\n", "+---+---+--------+\n", "| 0| a|[a -> 0]|\n", "| 1| a|[a -> 1]|\n", "| 2| b|[b -> 2]|\n", "| 3| b|[b -> 3]|\n", "| 4| c|[c -> 4]|\n", "| 5| c|[c -> 5]|\n", "+---+---+--------+\n", "\n" ] } ], "source": [ "df3.show()" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+------+\n", "| a| b|map[c]|\n", "+---+---+------+\n", "| 4| c| 4|\n", "| 5| c| 5|\n", "+---+---+------+\n", "\n" ] } ], "source": [ "spark.sql('''\n", "SELECT a, b, map['c'] FROM df3_table\n", "''').na.drop().show()" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+---+\n", "| a| b| c|\n", "+---+---+---+\n", "| 4| c| 4|\n", "| 5| c| 5|\n", "+---+---+---+\n", "\n" ] } ], "source": [ "df3.select('a', 'b', 'map.c').filter('c is not null').show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3.8.5 64-bit", "language": "python", "name": "python38564bit02a66c47ce504b05b2ef5646cfed96c2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 2 }