Spark Streaming
The Spark Streaming library takes a stream of data and breaks it up into micro-batches that are then processed, giving the illusion of a continually updated stream of results.
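Before bringing in Spark, the micro-batch idea can be sketched in plain Python. This toy example is not part of the notes and every name in it is made up for illustration: words trickle in over time, are grouped into fixed-length batches, and a running count is updated each time a batch closes.
import time
from collections import Counter

def slow_words():
    # Simulate a source where words arrive gradually over time
    for word in "the quick brown fox jumps over the lazy dog".split() * 10:
        time.sleep(0.3)
        yield word

def micro_batches(source, interval=2.0):
    # Collect items from source into batches that close every `interval` seconds
    batch, deadline = [], time.time() + interval
    for item in source:
        batch.append(item)
        if time.time() >= deadline:
            yield batch
            batch, deadline = [], time.time() + interval
    if batch:
        yield batch

counts = Counter()
for batch in micro_batches(slow_words()):
    counts.update(batch)                 # running totals over all batches so far
    print(counts.most_common(5))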
Streaming using sockets
We will first illustrate the idea of streaming data over TCP/IP with the Python standard library socket module. The consumer and producer should be run in separate terminals.
Terminal 1
python consumer.py localhost 10000
Terminal 2
python client.py localhost 10000
Consumer keeps a running word count
%%file consumer.py
import sys
import socket
from collections import Counter

HOST = sys.argv[1]
PORT = int(sys.argv[2])

# Listen for a producer connection on the given host and port
s = socket.socket()
s.bind((HOST, PORT))
s.listen(4)
connection, address = s.accept()

# Maintain a running word count over everything received so far
c = Counter()
while True:
    line = connection.recv(64)
    words = line.split()
    if words:
        c.update(words)
        print(c.most_common(5))
Overwriting consumer.py
Producer sends data to server for processing
%%file client.py
import socket
import time
import sys

HOST = sys.argv[1]
PORT = int(sys.argv[2])

# Connect to the consumer and send it one line of text per second
s = socket.socket()
s.connect((HOST, PORT))
while True:
    for line in open('data/Ulysses.txt'):
        s.sendall(str.encode(line))
        time.sleep(1)
Overwriting client.py
Using Spark Streaming
Now we'll replace the consumer with a Spark application. As a baseline, the cell below runs the word count as an ordinary batch Spark job over the whole file; the streaming versions that follow apply the same logic to micro-batches lasting 2 seconds.
from pyspark import SparkContext

sc = SparkContext('local[*]')

# Batch word count over the whole file, for comparison with the streaming versions below
lines = sc.textFile('data/Ulysses.txt')
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda x, y: x + y))
counts.takeOrdered(5, key=lambda x: -x[1])
[('the', 13600), ('of', 8127), ('and', 6542), ('a', 5842), ('to', 4787)]
Monitor a directory for new or renamed files
%%file file_consumer.py
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext('local[*]')
sc.setLogLevel("WARN")

# Micro-batches of 2 seconds
ssc = StreamingContext(sc, 2)

# Monitor the folder given on the command line for new files
lines = ssc.textFileStream(sys.argv[1])
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda x, y: x + y))
counts.pprint()

ssc.start()
ssc.awaitTermination()
Writing file_consumer.py
Usage
Run in terminal
~/anaconda3/share/spark-1.6.0/bin/spark-submit file_consumer.py <folder>
When you copy, move or save a file to <folder>, the word counts for that file will be updated.
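One simple way to exercise this is with a small producer that drops chunks of Ulysses into the monitored folder at intervals. The sketch below is only illustrative (file_producer.py and its chunking scheme are made up, not part of the notes); each chunk is written to a hidden temporary file first and then renamed so it appears in <folder> all at once.
# file_producer.py (hypothetical helper script)
# Usage: python file_producer.py <folder>
import os
import sys
import time

folder = sys.argv[1]
with open('data/Ulysses.txt') as f:
    lines = f.readlines()

chunk = 1000
for i in range(0, len(lines), chunk):
    tmp = os.path.join(folder, '.part-%06d.txt' % i)   # hidden while being written
    dst = os.path.join(folder, 'part-%06d.txt' % i)
    with open(tmp, 'w') as out:
        out.writelines(lines[i:i + chunk])
    os.rename(tmp, dst)                                 # rename so the file appears atomically
    time.sleep(5)
Each renamed file should then show up in the pprint output of file_consumer.py within a batch or two.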
Monitor a TCP/IP socket
%%file socket_consumer.py
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext('local[*]')
sc.setLogLevel("WARN")

# Micro-batches of 2 seconds
ssc = StreamingContext(sc, 2)

# Connect to the host and port given on the command line
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda x, y: x + y))
counts.pprint()

ssc.start()
ssc.awaitTermination()
Overwriting socket_consumer.py
Usage
Run in terminal
~/anaconda3/share/spark-1.6.0/bin/spark-submit socket_consumer.py localhost 10000
In a different terminal
nc -lk 10000
Any text pasted into the nc terminal will have its words counted.
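If nc is not available, a small Python stand-in can play the same role. The script below is only a sketch (text_server.py is a made-up name, not part of the notes): it listens on the given port and, once the Spark job connects, feeds it Ulysses one line per second. Start it before running spark-submit.
# text_server.py (hypothetical stand-in for `nc -lk 10000`)
# Usage: python text_server.py localhost 10000
import socket
import sys
import time

HOST = sys.argv[1]
PORT = int(sys.argv[2])

s = socket.socket()
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()              # wait for socket_consumer.py to connect
for line in open('data/Ulysses.txt'):
    conn.sendall(str.encode(line))   # send one line per second
    time.sleep(1)
conn.close()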
Keeping state
%%file stateful_socket_consumer.py
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def updateFunc(new, last):
    # new is the list of counts for a key in the current batch;
    # last is the running total from previous batches (None for a new key)
    if last is None:
        last = 0
    return sum(new) + last

sc = SparkContext('local[*]')
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 2)
# updateStateByKey requires a checkpoint directory for fault tolerance
ssc.checkpoint("checkpoint")

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .updateStateByKey(updateFunc)
               .transform(lambda x: x.sortByKey()))
counts.pprint()

ssc.start()
ssc.awaitTermination()
Overwriting stateful_socket_consumer.py
Usage
Same as above, but the Spark program will now maintain an updated running count.
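To see what updateStateByKey passes to updateFunc, it can help to call the function by hand: for each key in a batch, Spark supplies the list of new values for that key together with the previous state, which is None the first time the key appears. A quick check, reusing updateFunc from above with made-up inputs:
def updateFunc(new, last):
    if last is None:
        last = 0
    return sum(new) + last

print(updateFunc([1, 1, 1], None))   # first batch: a word seen 3 times -> 3
print(updateFunc([1, 1], 3))         # later batch: 2 more occurrences -> 5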