Spark Streaming

The Spark Streaming library takes a stream of data and breaks it up into micro-batches that are then processed, giving the illusion of a continually updated stream of results.
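
To make the idea concrete, here is a toy sketch of micro-batching in plain Python (this is not Spark and not how the library is implemented); the micro_batches and slow_source helpers are purely illustrative, and data/Ulysses.txt is the text used throughout these notes.

# Toy illustration of micro-batching: records that arrive during a fixed
# interval are grouped into a batch and processed together.
import time
from collections import Counter

def micro_batches(source, interval=2.0):
    """Group records from `source` into batches spanning roughly `interval` seconds."""
    batch, deadline = [], time.time() + interval
    for record in source:
        batch.append(record)
        if time.time() >= deadline:
            yield batch
            batch, deadline = [], time.time() + interval
    if batch:
        yield batch

def slow_source():
    """Simulate a stream by yielding one line of Ulysses per second."""
    for line in open('data/Ulysses.txt'):
        time.sleep(1)
        yield line

counts = Counter()
for batch in micro_batches(slow_source(), interval=2.0):
    for line in batch:
        counts.update(line.split())
    print(counts.most_common(5))      # running result, refreshed after every batch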

Streaming using sockets

We will first illustrate the idea of streaming data over TCP/IP using the socket module from the Python standard library. The consumer and producer should be run in separate terminals.

Terminal 1

python consumer.py localhost 10000

Terminal 2

python client.py localhost 10000

Consumer keeps a running word count

%%file consumer.py
import sys
import socket
from collections import Counter

HOST = sys.argv[1]
PORT = int(sys.argv[2])

s = socket.socket()
s.bind((HOST, PORT))
s.listen(4)
connection, address = s.accept()

c = Counter()

while True:
    data = connection.recv(64)    # read up to 64 bytes at a time
    if not data:                  # an empty read means the producer disconnected
        break
    words = data.split()
    if words:
        c.update(words)
        print(c.most_common(5))

connection.close()
Overwriting consumer.py
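
Note that recv(64) returns at most 64 bytes at a time, so a word can be split across two chunks and counted as two fragments; that is acceptable for a demonstration, but a real consumer would buffer input and split on newlines.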

Producer sends data to server for processing

%%file client.py
import socket
import time
import sys

HOST = sys.argv[1]
PORT = int(sys.argv[2])
s = socket.socket()
s.connect((HOST, PORT))
while True:                              # replay the file indefinitely
    for line in open('data/Ulysses.txt'):
        s.sendall(str.encode(line))
        time.sleep(1)                    # throttle to one line per second
Overwriting client.py

Using Spark Streaming

Now we’ll replace the consumer with a Spark application. The streaming versions below work with micro-batches lasting 2 seconds; first, for reference, here is an ordinary (non-streaming) word count over the whole of Ulysses.

from pyspark import SparkContext

sc = SparkContext('local[*]')
lines = sc.textFile('data/Ulysses.txt')

counts = (lines.flatMap(lambda line: line.split())
          .map(lambda word: (word, 1))
          .reduceByKey(lambda x, y: x + y))

counts.takeOrdered(5, key=lambda x: -x[1])
[('the', 13600), ('of', 8127), ('and', 6542), ('a', 5842), ('to', 4787)]

Monitor a directory for new or renamed files

%%file file_consumer.py

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext('local[*]')
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 2)             # micro-batches of 2 seconds
lines = ssc.textFileStream(sys.argv[1])   # monitor this directory for new files

counts = (lines.flatMap(lambda line: line.split())
          .map(lambda word: (word, 1))
          .reduceByKey(lambda x, y: x + y))

counts.pprint()

ssc.start()
ssc.awaitTermination()
Writing file_consumer.py

Usage

Run in terminal

~/anaconda3/share/spark-1.6.0/bin/spark-submit file_consumer.py <folder>

When you copy, move, or save a file into <folder>, the word counts for that file are printed in the next micro-batch.
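
One way to exercise this, not part of the example above, is a small feeder script that periodically drops chunks of Ulysses into the monitored folder as new files; the name file_producer.py and the chunk size are illustrative. Each chunk is written under a temporary hidden name and then renamed, so the finished file appears in the folder atomically.

# file_producer.py (illustrative): feed the monitored folder with new files.
import os
import sys
import time

folder = sys.argv[1]                     # the same <folder> passed to file_consumer.py

with open('data/Ulysses.txt') as f:
    lines = f.readlines()

chunk_size = 100
for i in range(0, len(lines), chunk_size):
    tmp = os.path.join(folder, '.part_%06d' % i)
    dst = os.path.join(folder, 'ulysses_%06d.txt' % i)
    with open(tmp, 'w') as out:          # write under a temporary (hidden) name ...
        out.writelines(lines[i:i + chunk_size])
    os.rename(tmp, dst)                  # ... then rename so the file appears atomically
    time.sleep(2)                        # roughly one new file per micro-batch

Run it in another terminal with the same folder:

python file_producer.py <folder>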

Monitor a TCP/IP socket

%%file socket_consumer.py

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext('local[*]')
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 2)                                  # micro-batches of 2 seconds
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))    # connect to host:port as a client

counts = (lines.flatMap(lambda line: line.split())
          .map(lambda word: (word, 1))
          .reduceByKey(lambda x, y: x + y))

counts.pprint()

ssc.start()
ssc.awaitTermination()
Overwriting socket_consumer.py

Usage

Run in terminal

~/anaconda3/share/spark-1.6.0/bin/spark-submit socket_consumer.py localhost 10000

In a different terminal

nc -lk 10000

Any text pasted in the nc terminal will have its words counted.
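
Instead of pasting text into nc, a producer script can play the same role. Note that socketTextStream connects to host:port as a client, so the data source must listen on the port (which is what nc -lk does) rather than connect out the way client.py did. A minimal sketch, with the name socket_producer.py being illustrative:

# socket_producer.py (illustrative): listen on a port and stream Ulysses,
# one line per second, to whichever client connects (here, Spark).
import sys
import time
import socket

HOST = sys.argv[1]
PORT = int(sys.argv[2])

s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((HOST, PORT))
s.listen(1)
connection, address = s.accept()         # wait for socket_consumer.py to connect

for line in open('data/Ulysses.txt'):
    connection.sendall(str.encode(line))
    time.sleep(1)                        # throttle to one line per second

connection.close()

Start the producer first, then run the spark-submit command above:

python socket_producer.py localhost 10000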

Keeping state
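
updateStateByKey keeps a per-key running value across micro-batches: on each batch, Spark calls the update function with the list of new values seen for a key in that batch and the key’s previous state (None the first time), and whatever the function returns becomes the new state. Stateful transformations require a checkpoint directory, which is why the script below calls ssc.checkpoint.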

%%file stateful_socket_consumer.py

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def updateFunc(new, last):
    """Add this batch's counts (new) to the running total (last) for a key."""
    if last is None:        # first time this key has been seen
        last = 0
    return sum(new) + last

sc = SparkContext('local[*]')
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 2)     # micro-batches of 2 seconds
ssc.checkpoint("checkpoint")      # stateful transformations require a checkpoint directory

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

counts = (lines.flatMap(lambda line: line.split())
          .map(lambda word: (word, 1))
          .updateStateByKey(updateFunc)
          .transform(lambda x: x.sortByKey()))

counts.pprint()

ssc.start()
ssc.awaitTermination()
Overwriting stateful_socket_consumer.py

Usage

Same as above, but the Spark program now maintains a running word count across batches rather than reporting each batch separately.
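For example, if 'the' occurs three times in the first batch and twice in the second, the update function is called as updateFunc([1, 1, 1], None), giving 3, and then as updateFunc([1, 1], 3), giving 5.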