{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Spark DataFrames\n", "\n", "Use Spakr DataFrames rather than RDDs whenever possible. In general, Spark DataFrames are more performant, and the performance is consistent across differnet languagge APIs. Unlike RDDs which are executed on the fly, Spakr DataFrames are compiled using the Catalyst optimiser and an optimal execution path executed by the engine. Since all langugaes compile to the same execution code, there is no difference across languages (unless you use user-defined funcitons UDF)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start spark" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
150application_1522938745830_0107pysparkidleLinkLink
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "%%spark" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "u'2.2.0.2.6.3.0-235'" ] } ], "source": [ "spark.version" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import native spark functions" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import pyspark.sql.functions as F" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import variables to specify schema" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import StructType, StructField, StringType, IntegerType" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDDs and DataFrames" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "data = [('ann', 'spring', 'math', 98),\n", " ('ann', 'fall', 'bio', 50),\n", " ('bob', 'spring', 'stats', 100),\n", " ('bob', 'fall', 'stats', 92),\n", " ('bob', 'summer', 'stats', 100),\n", " ('charles', 'spring', 'stats', 88),\n", " ('charles', 'fall', 'bio', 100) \n", " ]" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "rdd = sc.parallelize(data)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('ann', 'spring', 'math', 98), ('ann', 'fall', 'bio', 50), ('bob', 'spring', 'stats', 100)]" ] } ], "source": [ "rdd.take(3)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "df = spark.createDataFrame(rdd, ['name', 'semester', 'subject', 'score'])" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", 
"output_type": "stream", "text": [ "+-------+--------+-------+-----+\n", "| name|semester|subject|score|\n", "+-------+--------+-------+-----+\n", "| ann| spring| math| 98|\n", "| ann| fall| bio| 50|\n", "| bob| spring| stats| 100|\n", "| bob| fall| stats| 92|\n", "| bob| summer| stats| 100|\n", "|charles| spring| stats| 88|\n", "|charles| fall| bio| 100|\n", "+-------+--------+-------+-----+" ] } ], "source": [ "df.show()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+--------+-------+-----+\n", "|name|semester|subject|score|\n", "+----+--------+-------+-----+\n", "| ann| spring| math| 98|\n", "| ann| fall| bio| 50|\n", "| bob| spring| stats| 100|\n", "+----+--------+-------+-----+\n", "only showing top 3 rows" ] } ], "source": [ "df.show(3)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(name=u'ann', semester=u'spring', subject=u'math', score=98), Row(name=u'ann', semester=u'fall', subject=u'bio', score=50), Row(name=u'bob', semester=u'spring', subject=u'stats', score=100)]" ] } ], "source": [ "df.rdd.take(3)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[summary: string, name: string, semester: string, subject: string, score: string]" ] } ], "source": [ "df.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Converstion to and from pandas\n", "\n", "Make sure your data set can fit into memory before converting to `pandas`." 
] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " name semester subject score\n", "0 ann spring math 98\n", "1 ann fall bio 50\n", "2 bob spring stats 100\n", "3 bob fall stats 92\n", "4 bob summer stats 100\n", "5 charles spring stats 88\n", "6 charles fall bio 100" ] } ], "source": [ "pdf = df.toPandas()\n", "pdf" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+\n", "| name|semester|subject|score|\n", "+-------+--------+-------+-----+\n", "| ann| spring| math| 98|\n", "| ann| fall| bio| 50|\n", "| bob| spring| stats| 100|\n", "| bob| fall| stats| 92|\n", "| bob| summer| stats| 100|\n", "|charles| spring| stats| 88|\n", "|charles| fall| bio| 100|\n", "+-------+--------+-------+-----+" ] } ], "source": [ "spark.createDataFrame(pdf).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Schemas" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = true)\n", " |-- semester: string (nullable = true)\n", " |-- subject: string (nullable = true)\n", " |-- score: long (nullable = true)" ] } ], "source": [ "df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data manipulation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Selecting columns" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+-------+-----+\n", "| name|subject|score|\n", "+-------+-------+-----+\n", "| ann| math| 98|\n", "| ann| bio| 50|\n", "| bob| stats| 100|\n", "| bob| stats| 92|\n", "| bob| stats| 100|\n", "|charles| stats| 88|\n", "|charles| bio| 100|\n", "+-------+-------+-----+" ] } ], "source": [ "df.select(['name', 
'subject', 'score']).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Filtering rows" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+\n", "| name|semester|subject|score|\n", "+-------+--------+-------+-----+\n", "| ann| spring| math| 98|\n", "| bob| spring| stats| 100|\n", "| bob| fall| stats| 92|\n", "| bob| summer| stats| 100|\n", "|charles| fall| bio| 100|\n", "+-------+--------+-------+-----+" ] } ], "source": [ "df.filter(df['score'] > 90).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Mutating values" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using select" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+---------+\n", "| name|semester|subject|score|adj_score|\n", "+-------+--------+-------+-----+---------+\n", "| ann| spring| math| 98| 88|\n", "| ann| fall| bio| 50| 40|\n", "| bob| spring| stats| 100| 90|\n", "| bob| fall| stats| 92| 82|\n", "| bob| summer| stats| 100| 90|\n", "|charles| spring| stats| 88| 78|\n", "|charles| fall| bio| 100| 90|\n", "+-------+--------+-------+-----+---------+" ] } ], "source": [ "df.select(df['name'], df['semester'], df['subject'], df['score'], \n", " (df['score'] - 10).alias('adj_score')).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using `withColumn`" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+----------+\n", "| name|semester|subject|score|half_score|\n", "+-------+--------+-------+-----+----------+\n", "| ann| spring| math| 98| 49.0|\n", "| ann| fall| bio| 50| 25.0|\n", "| bob| spring| stats| 100| 50.0|\n", "| bob| fall| stats| 92| 46.0|\n", "| bob| summer| stats| 100| 
50.0|\n", "|charles| spring| stats| 88| 44.0|\n", "|charles| fall| bio| 100| 50.0|\n", "+-------+--------+-------+-----+----------+" ] } ], "source": [ "df.withColumn('half_score', df['score']/2).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Sorting" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+\n", "| name|semester|subject|score|\n", "+-------+--------+-------+-----+\n", "| ann| fall| bio| 50|\n", "|charles| spring| stats| 88|\n", "| bob| fall| stats| 92|\n", "| ann| spring| math| 98|\n", "| bob| summer| stats| 100|\n", "|charles| fall| bio| 100|\n", "| bob| spring| stats| 100|\n", "+-------+--------+-------+-----+" ] } ], "source": [ "df.sort(df['score']).show()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+\n", "| name|semester|subject|score|\n", "+-------+--------+-------+-----+\n", "| bob| summer| stats| 100|\n", "| bob| spring| stats| 100|\n", "|charles| fall| bio| 100|\n", "| ann| spring| math| 98|\n", "| bob| fall| stats| 92|\n", "|charles| spring| stats| 88|\n", "| ann| fall| bio| 50|\n", "+-------+--------+-------+-----+" ] } ], "source": [ "df.sort(df['score'].desc()).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alternative syntax" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+\n", "| name|semester|subject|score|\n", "+-------+--------+-------+-----+\n", "| bob| spring| stats| 100|\n", "|charles| fall| bio| 100|\n", "| bob| summer| stats| 100|\n", "| ann| spring| math| 98|\n", "| bob| fall| stats| 92|\n", "|charles| spring| stats| 88|\n", "| ann| fall| bio| 50|\n", "+-------+--------+-------+-----+" ] } ], "source": [ 
"df.sort(df.score.desc()).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Summarizing" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------------+\n", "| avg(score)|\n", "+-----------------+\n", "|89.71428571428571|\n", "+-----------------+" ] } ], "source": [ "df.agg(\n", " {'score': 'mean'}\n", ").show()" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------------+---+---+\n", "| avg|min|max|\n", "+-----------------+---+---+\n", "|89.71428571428571| 50|100|\n", "+-----------------+---+---+" ] } ], "source": [ "df.agg(\n", " F.mean(df.score).alias('avg'),\n", " F.min(df.score).alias('min'),\n", " F.max(df.score).alias('max')\n", ").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Split-Apply-Combine" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+-----------------+--------------+\n", "| name| avg(score)|count(subject)|\n", "+-------+-----------------+--------------+\n", "|charles| 94.0| 2|\n", "| ann| 74.0| 2|\n", "| bob|97.33333333333333| 3|\n", "+-------+-----------------+--------------+" ] } ], "source": [ "df.groupby('name').agg({'score': 'mean', 'subject': 'count'}).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Join" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "meta = [('ann', 'female', 23),\n", " ('bob', 'male', 19),\n", " ('charles', 'male', 22),\n", " ('daivd', 'male', 23)\n", " ]" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [], "source": [ "schema = StructType([\n", " StructField('name', StringType(), True),\n", " StructField('sex', StringType(), True),\n", " StructField('age', IntegerType(), True)\n", "])" ] }, { 
"cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "df_meta = spark.createDataFrame(meta, schema)" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = true)\n", " |-- sex: string (nullable = true)\n", " |-- age: integer (nullable = true)" ] } ], "source": [ "df_meta.printSchema()" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------+---+\n", "| name| sex|age|\n", "+-------+------+---+\n", "| ann|female| 23|\n", "| bob| male| 19|\n", "|charles| male| 22|\n", "| daivd| male| 23|\n", "+-------+------+---+" ] } ], "source": [ "df_meta.show()" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+------+---+\n", "| name|semester|subject|score| sex|age|\n", "+-------+--------+-------+-----+------+---+\n", "|charles| spring| stats| 88| male| 22|\n", "|charles| fall| bio| 100| male| 22|\n", "| ann| spring| math| 98|female| 23|\n", "| ann| fall| bio| 50|female| 23|\n", "| bob| spring| stats| 100| male| 19|\n", "| bob| fall| stats| 92| male| 19|\n", "| bob| summer| stats| 100| male| 19|\n", "+-------+--------+-------+-----+------+---+" ] } ], "source": [ "df.join(df_meta, on='name', how='inner').show()" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[name: string, semester: string, subject: string, score: bigint, sex: string, age: int]" ] } ], "source": [ "df_full = df.join(df_meta, on='name', how='rightouter')\n", "df_full.drop()" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+----------+------------------+\n", "| 
sex|avg(score)| avg(age)|\n", "+------+----------+------------------+\n", "|female| 74.0| 23.0|\n", "| male| 96.0|20.666666666666668|\n", "+------+----------+------------------+" ] } ], "source": [ "df_full.groupby('sex').mean().show()" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+----+----+----+-----+\n", "| sex|null| bio|math|stats|\n", "+------+----+----+----+-----+\n", "|female|null|23.0|23.0| null|\n", "| male|23.0|22.0|null|19.75|\n", "+------+----+----+----+-----+" ] } ], "source": [ "df_full.groupby('sex').pivot('subject').agg(F.mean('age')).show()" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+----+----+-----+\n", "| sex| bio|math|stats|\n", "+------+----+----+-----+\n", "|female|23.0|23.0| null|\n", "| male|22.0|null|19.75|\n", "+------+----+----+-----+" ] } ], "source": [ "(\n", " df_full.\n", " dropna().\n", " groupby('sex').\n", " pivot('subject').\n", " agg(F.mean('age')).\n", " show()\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Using SQL" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [], "source": [ "df_full.createOrReplaceTempView('table')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Select columns" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+---+\n", "| name|age|\n", "+-------+---+\n", "|charles| 22|\n", "|charles| 22|\n", "| ann| 23|\n", "| ann| 23|\n", "| daivd| 23|\n", "| bob| 19|\n", "| bob| 19|\n", "| bob| 19|\n", "+-------+---+" ] } ], "source": [ "spark.sql('select name, age from table').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Filter rows" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "name": "stdout", 
"output_type": "stream", "text": [ "+-------+---+\n", "| name|age|\n", "+-------+---+\n", "|charles| 22|\n", "|charles| 22|\n", "| ann| 23|\n", "| ann| 23|\n", "| daivd| 23|\n", "+-------+---+" ] } ], "source": [ "spark.sql('select name, age from table where age > 20').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Mutate" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+-------+\n", "| name|adj_age|\n", "+-------+-------+\n", "|charles| 24|\n", "|charles| 24|\n", "| ann| 25|\n", "| ann| 25|\n", "| daivd| 25|\n", "| bob| 21|\n", "| bob| 21|\n", "| bob| 21|\n", "+-------+-------+" ] } ], "source": [ "spark.sql('select name, age + 2 as adj_age from table').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Sort" ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+---+\n", "| name|age|\n", "+-------+---+\n", "| daivd| 23|\n", "| ann| 23|\n", "| ann| 23|\n", "|charles| 22|\n", "|charles| 22|\n", "| bob| 19|\n", "| bob| 19|\n", "| bob| 19|\n", "+-------+---+" ] } ], "source": [ "spark.sql('select name, age from table order by age desc').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Summary" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|avg(age)|\n", "+--------+\n", "| 21.25|\n", "+--------+" ] } ], "source": [ "spark.sql('select mean(age) from table').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Split-apply-combine\n" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [], "source": [ "q = '''\n", "select sex, mean(score), min(age)\n", "from table\n", "group by sex\n", "'''" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [ 
{ "name": "stdout", "output_type": "stream", "text": [ "+------+----------+--------+\n", "| sex|avg(score)|min(age)|\n", "+------+----------+--------+\n", "|female| 74.0| 23|\n", "| male| 96.0| 19|\n", "+------+----------+--------+" ] } ], "source": [ "spark.sql(q).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Using SQL magic" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "b8c294159b3a47d1b9ff59718c005628", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "a9687be1069f4684b557bc66ddf47cf6", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Output()" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%%sql\n", "\n", "select sex, mean(score), min(age)\n", "from table\n", "group by sex" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Capture output locally (i.e. 
not sent to livy and cluster)" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [], "source": [ "%%sql -q -o df1\n", "\n", "select sex, score, age from table" ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [ { "data": { "image/png": "iVBORw0KGgoAAAANSUhEUgAAAXoAAAD8CAYAAAB5Pm/hAAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAALEgAACxIB0t1+/AAAADl0RVh0U29mdHdhcmUAbWF0cGxvdGxpYiB2ZXJzaW9uIDIuMi4yLCBodHRwOi8vbWF0cGxvdGxpYi5vcmcvhp/UCwAAEPRJREFUeJzt3W2MXFd9x/HvH9spmxRkO15HsWPXobJcKFFwWIUU2vQhBScRIgttECmoWxLVbygPrWTVEai8gIoEVypIlUAuDzUC3KTBcYLaYiwD5UWbiHUcsnkyDiE4Xrv2QmIemlVx3H9fzDWslxnv7tyZndmT70ca3Ttn79n7z8nd347PvXdvZCaSpHK9qNcFSJK6y6CXpMIZ9JJUOINekgpn0EtS4Qx6SSqcQS9JhTPoJalwBr0kFW5xrwsAWLFiRa5bt67XZUjSgrJ///4fZObgTNv1RdCvW7eO0dHRXpchSQtKRHx/Nts5dSNJhTPoJalwBr0kFc6gl6TCGfSSVLgZr7qJiM8AbwROZOYrq7blwB3AOuAp4K2Z+WxEBPBx4HrgOeDPMvOB7pQOH9g9xs77n+Z0JosiuOk1a/jw8GXd2p2kFnYfGGfbnoMcPTnJqqUDbNm0geGNq3tdliqz+UT/T8C109q2Avsycz2wr3oPcB2wvnptBj7RmTJ/2Qd2j/H5+w5zunpC1ulMPn/fYT6we6xbu5TUxO4D49y6a4zxk5MkMH5yklt3jbH7wHivS1NlxqDPzG8Cz0xrvgHYUa3vAIantH8uG+4DlkbExZ0qdqqd9z89p3ZJ3bFtz0EmT50+q23y1Gm27TnYo4o0Xbtz9Bdl5jGAarmyal8NTE3aI1XbL4mIzRExGhGjExMTcy7gdItn3bZql9QdR09Ozqld86/TJ2OjSVvT5M3M7Zk5lJlDg4Mz3sH7SxZFs121bpfUHauWDsypXfOv3aA/fmZKplqeqNqPAGumbHcJcLT98lq76TVr5tQuqTu2bNrAwJJFZ7UNLFnElk0belSRpms36O8FRqr1EeCeKe1/Gg1XAT86M8XTaR8evox3XLX255/gF0XwjqvWetWNNM+GN67mI2+5jNVLBwhg9dIBPvKWy7zqpo9EzjCnHRE7gd8DVgDHgQ8Cu4E7gbXAYeDGzHymurzyH2hcpfMc8M7MnPGvlQ0NDaV/1EyS5iYi9mfm0EzbzXgdfWbe1OJL1zTZNoF3zVyeJL1wzfd9B33xZ4ol6YXizH0HZy5JPXPfAdC1sPdPIEjSPOrFfQcGvSTNo17cd2DQS9I86sV9Bwa9JM2jXtx34MlYSZpHZ064etWNJBVseOPqeb2hzKkbSSqcQS9JhTPoJalwBr0kFc6gl6TCGfSSVDiDXpIKZ9BLUuEMekkqnEEvSYUz6CWpcAa9JBXOoJekwhn0klQ4g16SCmfQS1LhFvSDR3YfGJ/Xp7RI0kK0YIN+94Fxbt01xuSp0wCMn5zk1l1jAIa9JE2xYKdutu05+POQP2Py1Gm27TnYo4okqT8t2KA/enJyTu2S9EK1YIN+1dKBObVL0gvVgg36LZs2MLBk0VltA0sWsWXThh5VJEn9acGejD1zwtWrbiTp3GoFfUS8F/hzIIB/zMyPRcRy4A5gHfAU8NbMfLZmnU0Nb1xtsEvSDNqeuomIV9II+SuBy4E3RsR6YCuwLz
PXA/uq95KkHqkzR/9y4L7MfC4znwf+A3gzcAOwo9pmBzBcr0RJUh11gv5h4OqIuDAizgeuB9YAF2XmMYBqubJ+mZKkdrU9R5+Zj0XE7cBe4KfAt4HnZ9s/IjYDmwHWrl3bbhmSpBnUurwyMz+dmVdk5tXAM8Ah4HhEXAxQLU+06Ls9M4cyc2hwcLBOGZKkc6gV9BGxslquBd4C7ATuBUaqTUaAe+rsQ5JUT93r6L8UERcCp4B3ZeazEXEbcGdE3AIcBm6sW6QkqX21gj4zf6dJ2w+Ba+p8X0lS5yzYO2Ml9Q+fDdHfDHpJtfhsiP63YP+omaT+4LMh+p9BL6kWnw3R/wx6SbX4bIj+Z9BLqsVnQ/Q/T8ZKqsVnQ/Q/g15SbT4bor85dSNJhTPoJalwBr0kFc6gl6TCGfSSVDiDXpIKZ9BLUuEMekkqnEEvSYUz6CWpcAa9JBXOoJekwhn0klQ4g16SCmfQS1LhDHpJKpxBL0mFM+glqXAGvSQVzqCXpMIZ9JJUOINekgpn0EtS4WoFfUT8ZUQ8EhEPR8TOiHhxRFwaEfdHxKGIuCMizutUsZKkuWs76CNiNfAeYCgzXwksAt4G3A78fWauB54FbulEoZKk9tSdulkMDETEYuB84BjwB8Bd1dd3AMM19yFJqqHtoM/MceDvgMM0Av5HwH7gZGY+X212BFjdrH9EbI6I0YgYnZiYaLcMSdIM6kzdLANuAC4FVgEXANc12TSb9c/M7Zk5lJlDg4OD7ZYhSZpBnambPwS+l5kTmXkK2AW8FlhaTeUAXAIcrVmjJKmGOkF/GLgqIs6PiACuAR4Fvg78cbXNCHBPvRIlSXXUmaO/n8ZJ1weAsep7bQf+GviriHgCuBD4dAfqlCS1afHMm7SWmR8EPjit+UngyjrfV5LUOd4ZK0mFM+glqXAGvSQVzqCXpMIZ9JJUOINekgpn0EtS4Qx6SSqcQS9JhTPoJalwBr0kFc6gl6TCGfSSVDiDXpIKZ9BLUuEMekkqnEEvSYUz6CWpcAa9JBXOoJekwhn0klQ4g16SCmfQS1LhDHpJKpxBL0mFM+glqXAGvSQVzqCXpMIZ9JJUOINekgrXdtBHxIaIeHDK68cR8b6IWB4ReyPiULVc1smCJUlz03bQZ+bBzHxVZr4KeDXwHHA3sBXYl5nrgX3Ve0lSj3Rq6uYa4LuZ+X3gBmBH1b4DGO7QPiRJbehU0L8N2FmtX5SZxwCq5coO7UOS1IbaQR8R5wFvAv5ljv02R8RoRIxOTEzULUOS1EInPtFfBzyQmcer98cj4mKAanmiWafM3J6ZQ5k5NDg42IEyJEnNdCLob+IX0zYA9wIj1foIcE8H9iFJalOtoI+I84HXA7umNN8GvD4iDlVfu63OPiRJ9Syu0zkznwMunNb2QxpX4UiS+oB3xkpS4Qx6SSqcQS9JhTPoJalwBr0kFc6gl6TCGfSSVDiDXpIKZ9BLUuEMekkqnEEvSYUz6CWpcAa9JBXOoJekwhn0klQ4g16SCmfQS1LhDHpJKpxBL0mFM+glqXAGvSQVzqCXpMIZ9JJUOINekgpn0EtS4Qx6SSqcQS9JhTPoJalwBr0kFc6gl6TCGfSSVLhaQR8RSyPiroh4PCIei4jfiojlEbE3Ig5Vy2WdKlaSNHd1P9F/HPhKZv4GcDnwGLAV2JeZ64F91XtJUo+0HfQR8VLgauDTAJn5s8w8CdwA7Kg22wEM1y1SktS+Op/oXwZMAJ+NiAMR8amIuAC4KDOPAVTLlc06R8TmiBiNiNGJiYkaZUiSzqVO0C8GrgA+kZkbgf9hDtM0mbk9M4cyc2hwcLBGGZKkc6kT9EeAI5l5f/X+LhrBfzwiLgaolifqlShJqqPtoM/M/waejogNVdM1wKPAvcBI1TYC3FOrQklSLYtr9n838IWIOA94EngnjV8ed0bELcBh4Maa+5Ak1VAr6DPzQWCoyZeuqfN9JUmd452xklQ4g16SCmfQS1LhDHpJKp
xBL0mFM+glqXAGvSQVzqCXpMIZ9JJUOINekgpn0EtS4Qx6SSqcQS9JhTPoJalwBr0kFc6gl6TCGfSSVDiDXpIKZ9BLUuEMekkqnEEvSYUz6CWpcAa9JBXOoJekwhn0klQ4g16SCmfQS1LhDHpJKpxBL0mFM+glqXCL63SOiKeAnwCngeczcygilgN3AOuAp4C3Zuaz9cqUJLWrE5/ofz8zX5WZQ9X7rcC+zFwP7KveS5Iquw+M87rbvsalW/+V1932NXYfGO/q/roxdXMDsKNa3wEMd2EfkrQg7T4wzq27xhg/OUkC4ycnuXXXWFfDvm7QJ/DViNgfEZurtosy8xhAtVxZcx+SVIxtew4yeer0WW2Tp06zbc/Bru2z1hw98LrMPBoRK4G9EfH4bDtWvxg2A6xdu7ZmGZK0MBw9OTmn9k6o9Yk+M49WyxPA3cCVwPGIuBigWp5o0Xd7Zg5l5tDg4GCdMiRpwVi1dGBO7Z3QdtBHxAUR8ZIz68AbgIeBe4GRarMR4J66RUpSKbZs2sDAkkVntQ0sWcSWTRu6ts86UzcXAXdHxJnv88XM/EpEfAu4MyJuAQ4DN9YvU5LKMLxxNdCYqz96cpJVSwfYsmnDz9u7ITKza998toaGhnJ0dLTXZUjSghIR+6dc2t6Sd8ZKUuEMekkqnEEvSYUz6CWpcAa9JBWuL666iYgJ4Ps1vsUK4AcdKqeTrGtu+rGufqwJrGuuSq3r1zJzxjtO+yLo64qI0dlcYjTfrGtu+rGufqwJrGuuXuh1OXUjSYUz6CWpcKUE/fZeF9CCdc1NP9bVjzWBdc3VC7quIuboJUmtlfKJXpLUQl8HfUR8JiJORMTDU9ouj4j/ioixiPhyRLy0Rd9rI+JgRDwRER19bm3Nup6qtnkwIjr2l9wiYk1EfD0iHouIRyLivVX78ojYGxGHquWyFv1Hqm0ORcRIs216VNfpaqwejIh756GuG6v3/xcRLa+G6Nbx1YG65vv42hYRj0fEQxFxd0QsbdF/vsdrtnXN93h9qKrpwYj4akSsatG/sz+Pmdm3L+Bq4Arg4Slt3wJ+t1q/GfhQk36LgO8CLwPOA74NvKLXdVVfewpY0YWxuhi4olp/CfAd4BXAR4GtVftW4PYmfZcDT1bLZdX6sl7XVX3tp106tlrV9XJgA/ANYKhF364dX3Xq6tHx9QZgcdV+e4vjqxfjNWNdPRqvl07Z5j3AJ5v07fjPY19/os/MbwLPTGveAHyzWt8L/FGTrlcCT2Tmk5n5M+CfaTy0vNd1dU1mHsvMB6r1nwCPAauZ3cPaNwF7M/OZzHyWRv3X9kFdXdOqrsx8LDNnenhn146vmnV1zTnq+mpmPl9tdh9wSZPuvRiv2dTVNeeo68dTNruAxnO3p+v4z2NfB30LDwNvqtZvBNY02WY18PSU90eqtl7XBc0fqN5REbEO2Ajcz+we1j4v49VGXQAvjojRiLgvIrryy2BaXbPRi/Garfk+vqa6Gfj3Jl16PV6t6oIejFdE/G1EPA28HfibJl06Pl4LMehvBt4VEftp/JPoZ022iSZt3b68aDZ1QeOB6lcA11XbX93JIiLiV4EvAe+b9unhnN2atHV0vNqsC2BtNu4c/BPgYxHx631QVz+PV0+Or4h4P/A88IVm3Zq0zct4zVAX9GC8MvP9mbmmqukvmnVr0lZrvBZc0Gfm45n5hsx8NbCTxtzfdEc4+xP1JcDRPqiLbP5A9Y6IiCU0DqovZOauqnk2D2vv6njVqGvqeD1JY356Y5frmo1ejNes9OD4ojpZ+Ebg7VlNMk/Tk/GaRV09Ga8pvkjzKd7Oj1enT0J0+gWs4+yTniur5YuAzwE3N+mzmMYJjEv5xcmf3+yDui4AXjJl/T+BaztUT1T7/di09m2cfdLzo036Lge+R+PEz7JqfXkf1LUM+JVqfQVwiM6dxGta15Svf4PWJ2O7dnzVrKsXx9e1wKPA4Dn6zvt4zbKuXozX+inr7wbuat
K34z+Ptf+Duvmi8cn4GHCKxm+5W4D30jiD/R3gNn5x09cq4N+m9L2+2ua7wPv7oS4aVx18u3o90sm6gN+m8c+7h4AHq9f1wIXAviok9505YIAh4FNT+t8MPFG93tkPdQGvBcaq8RoDbpmHut5c/T/9X+A4sGc+j686dfXo+HqCxnzymbZP9sl4zVhXj8brSzTO5z0EfJnGCdqu/zx6Z6wkFW7BzdFLkubGoJekwhn0klQ4g16SCmfQS1LhDHpJKpxBL0mFM+glqXD/D+3XFd09qBZqAAAAAElFTkSuQmCC\n", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%%local\n", "\n", "%matplotlib inline\n", "import matplotlib.pyplot as plt\n", "\n", "plt.scatter(x='age', y='score', data=df1)\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## User Definted Functions (UDF) verus `pyspark.sql.functions`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Version 1**: Using a User Defined Funciton (UDF)\n", "\n", "Note: Using a Python UDF is not efficient." ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, "outputs": [], "source": [ "@F.udf\n", "def score_to_grade(g):\n", " if g > 90:\n", " return 'A'\n", " elif g > 80:\n", " return 'B'\n", " else:\n", " return 'C'" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+-----+\n", "| name|semester|subject|score|grade|\n", "+-------+--------+-------+-----+-----+\n", "| ann| spring| math| 98| A|\n", "| ann| fall| bio| 50| C|\n", "| bob| spring| stats| 100| A|\n", "| bob| fall| stats| 92| A|\n", "| bob| summer| stats| 100| A|\n", "|charles| spring| stats| 88| B|\n", "|charles| fall| bio| 100| A|\n", "+-------+--------+-------+-----+-----+" ] } ], "source": [ "df.withColumn('grade', score_to_grade(df['score'])).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Version 2**: Using built-in fucntions.\n", "\n", "See [list of functions](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions) available.\n", "\n", "More performant version" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [], "source": [ "score_to_grade_fast = (\n", " F.\n", " when(F.col('score') > 90, 'A').\n", " when(F.col('score') > 80, 'B').\n", " otherwise('C')\n", ")" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", 
"text": [ "+-------+--------+-------+-----+----------+\n", "| name|semester|subject|score|grade_fast|\n", "+-------+--------+-------+-----+----------+\n", "| ann| spring| math| 98| A|\n", "| ann| fall| bio| 50| C|\n", "| bob| spring| stats| 100| A|\n", "| bob| fall| stats| 92| A|\n", "| bob| summer| stats| 100| A|\n", "|charles| spring| stats| 88| B|\n", "|charles| fall| bio| 100| A|\n", "+-------+--------+-------+-----+----------+" ] } ], "source": [ "df.withColumn('grade_fast', score_to_grade_fast).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Vectorized UDFs\n", "\n", "The current version of `pyspark 2.3` has support for vectorized UDFs, which can make Python functions using `numpy` or `pandas` functionality much faster. Unfortunately, the Docker version of `pyspark 2.2` does not support vectorized UDFs.\n", "\n", "If you have access to `pysark 2.3` see [Introducing Pandas UDF for PySpark: How to run your native Python code with PySpark, fast](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html) and the linked [benchmarking notebook](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1281142885375883/2174302049319883/7729323681064935/latest.html)." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## I/O options" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### CSV" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [], "source": [ "df_full.write.mode('overwrite').option('header', 'true').csv('foo.csv')" ] }, { "cell_type": "code", "execution_count": 52, "metadata": {}, "outputs": [], "source": [ "foo = spark.read.option('header', 'true').csv('foo.csv')" ] }, { "cell_type": "code", "execution_count": 53, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+------+---+\n", "| name|semester|subject|score| sex|age|\n", "+-------+--------+-------+-----+------+---+\n", "| bob| spring| stats| 100| male| 19|\n", "| bob| fall| stats| 92| male| 19|\n", "| bob| summer| stats| 100| male| 19|\n", "|charles| spring| stats| 88| male| 22|\n", "|charles| fall| bio| 100| male| 22|\n", "| ann| spring| math| 98|female| 23|\n", "| ann| fall| bio| 50|female| 23|\n", "| daivd| null| null| null| male| 23|\n", "+-------+--------+-------+-----+------+---+" ] } ], "source": [ "foo.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### JSON" ] }, { "cell_type": "code", "execution_count": 54, "metadata": {}, "outputs": [], "source": [ "df_full.write.mode('overwrite').json('foo.json')" ] }, { "cell_type": "code", "execution_count": 55, "metadata": {}, "outputs": [], "source": [ "foo = spark.read.json('foo.json')" ] }, { "cell_type": "code", "execution_count": 56, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+-----+--------+------+-------+\n", "|age| name|score|semester| sex|subject|\n", "+---+-------+-----+--------+------+-------+\n", "| 19| bob| 100| spring| male| stats|\n", "| 19| bob| 92| fall| male| stats|\n", "| 19| bob| 100| summer| male| stats|\n", "| 22|charles| 88| spring| male| stats|\n", "| 22|charles| 100| fall| male| bio|\n", "| 23| ann| 
98| spring|female| math|\n", "| 23| ann| 50| fall|female| bio|\n", "| 23| daivd| null| null| male| null|\n", "+---+-------+-----+--------+------+-------+" ] } ], "source": [ "foo.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parquet\n", "\n", "This is an efficient columnar store." ] }, { "cell_type": "code", "execution_count": 57, "metadata": {}, "outputs": [], "source": [ "df_full.write.mode('overwrite').parquet('foo.parquet')" ] }, { "cell_type": "code", "execution_count": 58, "metadata": {}, "outputs": [], "source": [ "foo = spark.read.parquet('foo.parquet')" ] }, { "cell_type": "code", "execution_count": 59, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+------+---+\n", "| name|semester|subject|score| sex|age|\n", "+-------+--------+-------+-----+------+---+\n", "| bob| spring| stats| 100| male| 19|\n", "| bob| fall| stats| 92| male| 19|\n", "| bob| summer| stats| 100| male| 19|\n", "|charles| spring| stats| 88| male| 22|\n", "|charles| fall| bio| 100| male| 22|\n", "| ann| spring| math| 98|female| 23|\n", "| ann| fall| bio| 50|female| 23|\n", "| daivd| null| null| null| male| 23|\n", "+-------+--------+-------+-----+------+---+" ] } ], "source": [ "foo.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Random numbers" ] }, { "cell_type": "code", "execution_count": 60, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+------+---+-------------------+\n", "| name|semester|subject|score| sex|age| uniform|\n", "+-------+--------+-------+-----+------+---+-------------------+\n", "| bob| spring| stats| 100| male| 19| 0.5029534413816527|\n", "| bob| fall| stats| 92| male| 19| 0.9867496419260051|\n", "| bob| summer| stats| 100| male| 19| 0.8209632961670508|\n", "|charles| spring| stats| 88| male| 22| 0.839067819363327|\n", "|charles| fall| bio| 100| male| 22| 0.3737326850860542|\n", "| 
ann| spring| math| 98|female| 23|0.45650488329285255|\n", "| ann| fall| bio| 50|female| 23| 0.1729393778824495|\n", "| daivd| null| null| null| male| 23| 0.5760115975887162|\n", "+-------+--------+-------+-----+------+---+-------------------+" ] } ], "source": [ "foo.withColumn('uniform', F.rand(seed=123)).show()" ] }, { "cell_type": "code", "execution_count": 61, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+------+---+--------------------+\n", "| name|semester|subject|score| sex|age| normal|\n", "+-------+--------+-------+-----+------+---+--------------------+\n", "| bob| spring| stats| 100| male| 19|0.001988081602007817|\n", "| bob| fall| stats| 92| male| 19| 0.32765099517752727|\n", "| bob| summer| stats| 100| male| 19| 0.35989602440312274|\n", "|charles| spring| stats| 88| male| 22| 0.3801966195174709|\n", "|charles| fall| bio| 100| male| 22| -2.1726586720908876|\n", "| ann| spring| math| 98|female| 23| -0.7484125450184252|\n", "| ann| fall| bio| 50|female| 23| -1.229237021920563|\n", "| daivd| null| null| null| male| 23| 0.2856848655347919|\n", "+-------+--------+-------+-----+------+---+--------------------+" ] } ], "source": [ "foo.withColumn('normal', F.randn(seed=123)).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Indexing with row numbers\n", "\n", "Note that `monotonically_increasing_id` works over partitions, so while numbers are guaranteed to be unique and increasing, they may not be consecutive." 
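, "\n", "\n", "If consecutive indices are required, one common workaround (a sketch, not part of the original run) is to apply `row_number` over a window ordered by the monotonic id. Beware that an unpartitioned window pulls every row into a single partition, so reserve this for small-to-medium DataFrames." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import Window\n", "\n", "# Sketch: consecutive 0-based row numbers via a window ordered by the monotonic id.\n", "# Caution: an unpartitioned window moves all rows to one partition.\n", "w = Window.orderBy(F.monotonically_increasing_id())\n", "foo.withColumn('index', F.row_number().over(w) - 1).show()"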
] }, { "cell_type": "code", "execution_count": 62, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+-------+-----+------+---+-----+\n", "| name|semester|subject|score| sex|age|index|\n", "+-------+--------+-------+-----+------+---+-----+\n", "| bob| spring| stats| 100| male| 19| 0|\n", "| bob| fall| stats| 92| male| 19| 1|\n", "| bob| summer| stats| 100| male| 19| 2|\n", "|charles| spring| stats| 88| male| 22| 3|\n", "|charles| fall| bio| 100| male| 22| 4|\n", "| ann| spring| math| 98|female| 23| 5|\n", "| ann| fall| bio| 50|female| 23| 6|\n", "| daivd| null| null| null| male| 23| 7|\n", "+-------+--------+-------+-----+------+---+-----+" ] } ], "source": [ "foo.withColumn('index', F.monotonically_increasing_id()).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example: Word counting in a DataFrame" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There are 2 text files in the `/data/texts` directory. We will read both in at once." 
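, "\n", "\n", "As a sketch (not executed in the original run), `F.input_file_name()` tags each row with the file it came from, which is handy when a directory holds several texts." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: record the source file of every line read from the directory.\n", "tagged = spark.read.text('/data/texts').withColumn('file', F.input_file_name())\n", "tagged.select('file').distinct().show(truncate=False)"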
] }, { "cell_type": "code", "execution_count": 63, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "hdfs://vcm-2167.oit.duke.edu:8020/data/texts/Portrait.txt\n", "hdfs://vcm-2167.oit.duke.edu:8020/data/texts/Ulysses.txt" ] } ], "source": [ "hadoop = sc._jvm.org.apache.hadoop\n", "\n", "fs = hadoop.fs.FileSystem\n", "conf = hadoop.conf.Configuration() \n", "path = hadoop.fs.Path('/data/texts')\n", "\n", "for f in fs.get(conf).listStatus(path):\n", " print f.getPath()" ] }, { "cell_type": "code", "execution_count": 64, "metadata": {}, "outputs": [], "source": [ "text = spark.read.text('/data/texts')" ] }, { "cell_type": "code", "execution_count": 65, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+\n", "| value|\n", "+--------------------+\n", "| |\n", "|The Project Guten...|\n", "| |\n", "|This eBook is for...|\n", "|no restrictions w...|\n", "|it under the term...|\n", "|eBook or online a...|\n", "| |\n", "| |\n", "| Title: Ulysses|\n", "+--------------------+\n", "only showing top 10 rows" ] } ], "source": [ "text.show(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Remove blank lines" ] }, { "cell_type": "code", "execution_count": 66, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+\n", "| value|\n", "+--------------------+\n", "|The Project Guten...|\n", "|This eBook is for...|\n", "|no restrictions w...|\n", "|it under the term...|\n", "|eBook or online a...|\n", "| Title: Ulysses|\n", "| Author: James Joyce|\n", "|Release Date: Aug...|\n", "|Last Updated: Aug...|\n", "| Language: English|\n", "+--------------------+\n", "only showing top 10 rows" ] } ], "source": [ "text = text.filter(text['value'] != '')\n", "text.show(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Using built-in functions to process a column of strings\n", "\n", "Note: This is more efficient than 
using a Python UDF." ] }, { "cell_type": "code", "execution_count": 67, "metadata": {}, "outputs": [], "source": [ "from string import punctuation\n", "\n", "def process(col):\n", " col = F.lower(col) # convert to lowercase\n", " col = F.translate(col, punctuation, '') # remove punctuation\n", " col = F.trim(col) # remove leading and trailing blank space\n", " col = F.split(col, '\\s') # split on blank space\n", " col = F.explode(col) # give each element of the split array its own row\n", " return col" ] }, { "cell_type": "code", "execution_count": 68, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+\n", "| value|\n", "+---------+\n", "| the|\n", "| project|\n", "|gutenberg|\n", "| ebook|\n", "| of|\n", "| ulysses|\n", "| by|\n", "| james|\n", "| joyce|\n", "| this|\n", "| ebook|\n", "| is|\n", "| for|\n", "| the|\n", "| use|\n", "| of|\n", "| anyone|\n", "| anywhere|\n", "| at|\n", "| no|\n", "+---------+\n", "only showing top 20 rows" ] } ], "source": [ "words = text.withColumn('value', process(text.value))\n", "words.show(20)" ] }, { "cell_type": "code", "execution_count": 69, "metadata": {}, "outputs": [], "source": [ "counts = words.groupby('value').count()" ] }, { "cell_type": "code", "execution_count": 70, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[value: string, count: bigint]" ] } ], "source": [ "counts.cache()" ] }, { "cell_type": "code", "execution_count": 71, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+-----+\n", "| value|count|\n", "+-----------+-----+\n", "| online| 8|\n", "| those| 403|\n", "| still| 272|\n", "| tripping| 5|\n", "| art| 66|\n", "| few| 110|\n", "| some| 416|\n", "| waters| 47|\n", "| tortured| 5|\n", "| slaver| 1|\n", "| inner| 21|\n", "| guts| 14|\n", "| hope| 79|\n", "| —billy| 1|\n", "| squealing| 3|\n", "| deftly| 10|\n", "| ceylon| 4|\n", "|ineluctably| 4|\n", "| filing| 3|\n", "| foxy| 
5|\n", "+-----------+-----+\n", "only showing top 20 rows" ] } ], "source": [ "counts.show(20)" ] }, { "cell_type": "code", "execution_count": 72, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+-----+\n", "|value|count|\n", "+-----+-----+\n", "| the|21063|\n", "| of|11510|\n", "| and|10611|\n", "| a| 8500|\n", "| to| 7033|\n", "| in| 6555|\n", "| he| 5835|\n", "| his| 5069|\n", "| that| 3537|\n", "| it| 3204|\n", "| was| 3192|\n", "| with| 3170|\n", "| i| 2982|\n", "| on| 2662|\n", "| you| 2650|\n", "| for| 2500|\n", "| him| 2225|\n", "| her| 2098|\n", "| is| 1892|\n", "| said| 1821|\n", "+-----+-----+\n", "only showing top 20 rows" ] } ], "source": [ "counts.sort(counts['count'].desc()).show(20)" ] }, { "cell_type": "code", "execution_count": 73, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------------+-----+\n", "| value|count|\n", "+-----------------+-----+\n", "| incited| 1|\n", "| emancipation| 1|\n", "| overcloud| 1|\n", "| differed| 1|\n", "| fruitlessly| 1|\n", "| outboro| 1|\n", "| —duck| 1|\n", "| stonecutting| 1|\n", "| lectureroom| 1|\n", "| rainfragrant| 1|\n", "| lapide| 1|\n", "| end—what| 1|\n", "| shielding| 1|\n", "| breezy| 1|\n", "| chemistry| 1|\n", "| —tarentum| 1|\n", "| hockey| 1|\n", "| cable| 1|\n", "| plashing| 1|\n", "|peacocktwittering| 1|\n", "+-----------------+-----+\n", "only showing top 20 rows" ] } ], "source": [ "counts.sort(counts['count']).show(20)" ] }, { "cell_type": "code", "execution_count": 74, "metadata": {}, "outputs": [], "source": [ "spark.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 2 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python2" } }, "nbformat": 4, "nbformat_minor": 2 }