Redis

REmote DIctionary Server (Redis) is an in-memory key-value database.

Concepts

Redis is conceptually a very simple database. From a programmer's perspective, it is as if you can magically persist simple values, dictionaries, sets, lists, and priority queues so that they are usable from other programs, possibly residing on other computers. The API is simple to use, and because Redis keeps its data in memory it is extremely fast.

More advanced concepts

  • Pipelines

  • Expiring values

  • Publish-subscribe model

Connect to database

[1]:
import redis

Providing access information

It is common to keep access credentials for services such as a database or cloud platform in a local configuration file - here we use YAML.

Note: This file MUST be listed in .gitignore - otherwise anyone with access to your repository knows your password!

[2]:
%%file redis_auth_config.yaml
# This would normally live on disk and not be in a notebook!

host: 'localhost'
port: 6379
password:
Overwriting redis_auth_config.yaml
[3]:
import yaml

with open('redis_auth_config.yaml') as f:
    auth = yaml.load(f, Loader=yaml.FullLoader)
auth
[3]:
{'host': 'localhost', 'port': 6379, 'password': None}
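PyYAML's yaml.safe_load is a safer alternative to yaml.load, since it only constructs plain Python types and so cannot be tricked into building arbitrary objects. A minimal sketch (it writes a throwaway sample config so the snippet is self-contained):

```python
import yaml

# Write a throwaway sample config so this snippet is self-contained
# (normally the file already exists on disk)
with open('redis_auth_config.yaml', 'w') as f:
    f.write("host: 'localhost'\nport: 6379\npassword:\n")

# safe_load only builds plain Python types (dict, list, str, int, ...),
# so a malicious config file cannot trigger arbitrary object construction
with open('redis_auth_config.yaml') as f:
    auth = yaml.safe_load(f)

print(auth)
```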
[4]:
r = redis.Redis(
    host = auth['host'],
    port = auth['port'],
    password = auth['password']
)
[5]:
r.ping()
[5]:
True

Clear database

[6]:
r.flushdb()
[6]:
True

Simple data types

Set and get a single value

[7]:
r.set('a', 'adenosine')
[7]:
True
[8]:
r.get('a')
[8]:
b'adenosine'

Set and get multiple values

[9]:
r.mset(dict(c='cytosine', t='thymidine', g='guanosine'))
[9]:
True
[10]:
r.mget(list('tcga'))
[10]:
[b'thymidine', b'cytosine', b'guanosine', b'adenosine']

Deletion

[11]:
r.delete('a')
[11]:
1
[12]:
r.keys()
[12]:
[b't', b'c', b'g']
[13]:
r.delete('c', 't', 'g')
[13]:
3
[14]:
r.keys()
[14]:
[]

Transactions

Transactions are achieved by creating and executing a pipeline. This is useful not just for atomicity, but also to reduce communication costs, since the queued commands are sent to the server in a single round trip.

[15]:
pipe = r.pipeline()
(
    pipe.set('a', 0)
    .incr('a')
    .incr('a')
    .incr('a')
    .execute()
)
[15]:
[True, 1, 2, 3]
[16]:
r.get('a')
[16]:
b'3'

Expiring values

Use setex to set a value with an expiry time in seconds. You can find the time to expiry with ttl (time-to-live) and convert a volatile key to a permanent one with persist.

[17]:
import time
[18]:
r.setex('foo', 3, 'bar')
print('get', r.get('foo'))
time.sleep(1)
print('ttl', r.ttl('foo'))
time.sleep(1)
print('ttl', r.ttl('foo'))
time.sleep(1)
print('ttl', r.ttl('foo'))
time.sleep(1)
print('get', r.get('foo'))
get b'bar'
ttl 2
ttl 1
ttl -2
get None

Alternative

[19]:
r.set('foo', 'bar')
r.expire('foo', 3)
print(r.get('foo'))
time.sleep(3)
print(r.get('foo'))
b'bar'
None

Complex data types

[20]:
import warnings

warnings.simplefilter('ignore', DeprecationWarning)
[21]:
r.hmset('nuc', dict(a='adenosine', c='cytosine', t='thymidine', g='guanosine'))
[21]:
True
[22]:
r.hget('nuc', 'a')
[22]:
b'adenosine'
[23]:
r.hmget('nuc', list('ctg'))
[23]:
[b'cytosine', b'thymidine', b'guanosine']
[24]:
r.hkeys('nuc')
[24]:
[b'a', b'c', b't', b'g']
[25]:
r.hvals('nuc')
[25]:
[b'adenosine', b'cytosine', b'thymidine', b'guanosine']
[26]:
r.rpush('xs', 1, 2, 3)
[26]:
3
[27]:
r.lpush('xs', 4, 5, 6)
[27]:
6
[28]:
r.llen('xs')
[28]:
6
[29]:
r.lrange('xs', 0, r.llen('xs'))
[29]:
[b'6', b'5', b'4', b'1', b'2', b'3']
[30]:
r.lrange('xs', 0, -1)
[30]:
[b'6', b'5', b'4', b'1', b'2', b'3']

Using list as a queue

[31]:
r.lpush('q', 1, 2, 3)
[31]:
3
[32]:
while r.llen('q'):
    print(r.rpop('q'))
b'1'
b'2'
b'3'

Using list as stack

[33]:
r.lpush('q', 1, 2, 3)
[33]:
3
[34]:
while r.llen('q'):
    print(r.lpop('q'))
b'3'
b'2'
b'1'

Transferring values across lists

[35]:
r.lpush('l1', 1,2,3)
[35]:
3
[36]:
while r.llen('l1'):
    r.rpoplpush('l1', 'l2')
r.llen('l1'), r.llen('l2')
[36]:
(0, 3)
[37]:
for key in r.scan_iter('l2'):
    print(key)
b'l2'
[38]:
r.lpush('l1', 1,2,3)
[38]:
3

Sets

[39]:
r.sadd('s1', 1,2,3)
[39]:
3
[40]:
r.sadd('s1', 2,3,4)
[40]:
1
[41]:
r.smembers('s1')
[41]:
{b'1', b'2', b'3', b'4'}
[42]:
r.sadd('s2', 4,5,6)
[42]:
3
[43]:
r.sdiff(['s1', 's2'])
[43]:
{b'1', b'2', b'3'}
[44]:
r.sinter(['s1', 's2'])
[44]:
{b'4'}
[45]:
r.sunion(['s1', 's2'])
[45]:
{b'1', b'2', b'3', b'4', b'5', b'6'}

Sorted sets

This is equivalent to a priority queue.

[46]:
r.zadd('jobs',
       dict(job1=3,
            job2=7,
            job3=1,
            job4=2,
            job5=6)
      )
[46]:
5
[47]:
r.zincrby('jobs', 2, 'job5')
[47]:
8.0
[48]:
r.zrange('jobs', 0, -1, withscores=True)
[48]:
[(b'job3', 1.0),
 (b'job4', 2.0),
 (b'job1', 3.0),
 (b'job2', 7.0),
 (b'job5', 8.0)]
[49]:
r.zrevrange('jobs', 0, -1, withscores=True)
[49]:
[(b'job5', 8.0),
 (b'job2', 7.0),
 (b'job1', 3.0),
 (b'job4', 2.0),
 (b'job3', 1.0)]

Union and intersection store

These create new sorted sets from the union and intersection respectively; by default, the scores of members appearing in both input sets are summed.

[50]:
s1 = 'time flies like an arrow'
s2 = 'fruit flies like a banana'
[51]:
from collections import Counter
[52]:
c1 = Counter(s1.split())
[53]:
c2 = Counter(s2.split())
[54]:
r.zadd('c1', c1)
[54]:
5
[55]:
r.zadd('c2', c2)
[55]:
5
[56]:
r.zrange('c1', 0, -1, withscores=True)
[56]:
[(b'an', 1.0),
 (b'arrow', 1.0),
 (b'flies', 1.0),
 (b'like', 1.0),
 (b'time', 1.0)]
[57]:
r.zrange('c2', 0, -1, withscores=True)
[57]:
[(b'a', 1.0),
 (b'banana', 1.0),
 (b'flies', 1.0),
 (b'fruit', 1.0),
 (b'like', 1.0)]
[58]:
r.zunionstore('c3', ['c1', 'c2'])
[58]:
8
[59]:
r.zrange('c3', 0, -1, withscores=True)
[59]:
[(b'a', 1.0),
 (b'an', 1.0),
 (b'arrow', 1.0),
 (b'banana', 1.0),
 (b'fruit', 1.0),
 (b'time', 1.0),
 (b'flies', 2.0),
 (b'like', 2.0)]
[60]:
r.zinterstore('c4', ['c1', 'c2'])
[60]:
2
[61]:
r.zrange('c4', 0, -1, withscores=True)
[61]:
[(b'flies', 2.0), (b'like', 2.0)]

Publisher/Subscriber

Figure: the Redis publish/subscribe model.

Source: https://making.pusher.com/redis-pubsub-under-the-hood/

[62]:
help(r.pubsub)
Help on method pubsub in module redis.client:

pubsub(**kwargs) method of redis.client.Redis instance
    Return a Publish/Subscribe object. With this object, you can
    subscribe to channels and listen for messages that get published to
    them.

[63]:
p = r.pubsub()

Channels

[64]:
p.subscribe('python', 'perl', 'sql')
[65]:
m = p.get_message()
while m:
    print(m)
    m = p.get_message()
{'type': 'subscribe', 'pattern': None, 'channel': b'python', 'data': 1}
{'type': 'subscribe', 'pattern': None, 'channel': b'perl', 'data': 2}
{'type': 'subscribe', 'pattern': None, 'channel': b'sql', 'data': 3}
[66]:
p.channels
[66]:
{b'python': None, b'perl': None, b'sql': None}
[67]:
p2 = r.pubsub()
[68]:
p2.psubscribe('p*')
[69]:
p2.patterns
[69]:
{b'p*': None}

Messages

From the redis-py documentation:

Every message read from a PubSub instance will be a dictionary with the following keys.

  • type: One of the following: ‘subscribe’, ‘unsubscribe’, ‘psubscribe’, ‘punsubscribe’, ‘message’, ‘pmessage’

  • channel: The channel [un]subscribed to or the channel a message was published to

  • pattern: The pattern that matched a published message’s channel. Will be None in all cases except for ‘pmessage’ types.

  • data: The message data. With [un]subscribe messages, this value will be the number of channels and patterns the connection is currently subscribed to. With [p]message messages, this value will be the actual published message.

[70]:
r.publish('python', 'use blank spaces')
r.publish('python', 'no semi-colons')
r.publish('perl', 'use spaceship operator')
r.publish('sql', 'select this')
r.publish('haskell', 'functional is cool')
[70]:
0
[71]:
m = p.get_message()
while m:
    print(m)
    m = p.get_message()
{'type': 'message', 'pattern': None, 'channel': b'python', 'data': b'use blank spaces'}
{'type': 'message', 'pattern': None, 'channel': b'python', 'data': b'no semi-colons'}
{'type': 'message', 'pattern': None, 'channel': b'perl', 'data': b'use spaceship operator'}
{'type': 'message', 'pattern': None, 'channel': b'sql', 'data': b'select this'}
[72]:
p.unsubscribe('python')
[73]:
p.channels
[73]:
{b'python': None, b'perl': None, b'sql': None}
[74]:
r.publish('python', 'use blank spaces 2')
r.publish('python', 'no semi-colons 2')
r.publish('perl', 'use spaceship operator 2')
r.publish('sql', 'select this 2')
r.publish('haskell', 'functional is cool 2')
[74]:
0
[75]:
m = p.get_message()
while m:
    print(m)
    m = p.get_message()
{'type': 'unsubscribe', 'pattern': None, 'channel': b'python', 'data': 2}
{'type': 'message', 'pattern': None, 'channel': b'perl', 'data': b'use spaceship operator 2'}
{'type': 'message', 'pattern': None, 'channel': b'sql', 'data': b'select this 2'}
[76]:
m = p2.get_message()
while m:
    print(m)
    m = p2.get_message()
{'type': 'psubscribe', 'pattern': None, 'channel': b'p*', 'data': 1}
{'type': 'pmessage', 'pattern': b'p*', 'channel': b'python', 'data': b'use blank spaces'}
{'type': 'pmessage', 'pattern': b'p*', 'channel': b'python', 'data': b'no semi-colons'}
{'type': 'pmessage', 'pattern': b'p*', 'channel': b'perl', 'data': b'use spaceship operator'}
{'type': 'pmessage', 'pattern': b'p*', 'channel': b'python', 'data': b'use blank spaces 2'}
{'type': 'pmessage', 'pattern': b'p*', 'channel': b'python', 'data': b'no semi-colons 2'}
{'type': 'pmessage', 'pattern': b'p*', 'channel': b'perl', 'data': b'use spaceship operator 2'}

Multiple databases

[77]:
r2 = redis.Redis(db=1)
r2.flushdb()
[77]:
True
[78]:
for c in ['c1', 'c2', 'c3', 'c4']:
    r.move(c, 1)
[79]:
for key in r2.scan_iter('c?'):
    print(r2.zrange(key, 0, -1, withscores=True))
[(b'flies', 2.0), (b'like', 2.0)]
[(b'a', 1.0), (b'an', 1.0), (b'arrow', 1.0), (b'banana', 1.0), (b'fruit', 1.0), (b'time', 1.0), (b'flies', 2.0), (b'like', 2.0)]
[(b'a', 1.0), (b'banana', 1.0), (b'flies', 1.0), (b'fruit', 1.0), (b'like', 1.0)]
[(b'an', 1.0), (b'arrow', 1.0), (b'flies', 1.0), (b'like', 1.0), (b'time', 1.0)]

Clean up

There is no need to close the connection when we use the Redis() object. Each command checks out a connection from a pool and releases it when done, as seen in execute_command:

def execute_command(self, *args, **options):
    "Execute a command and return a parsed response"
    pool = self.connection_pool
    command_name = args[0]
    connection = pool.get_connection(command_name, **options)
    try:
        connection.send_command(*args)
        return self.parse_response(connection, command_name, **options)
    except (ConnectionError, TimeoutError) as e:
        connection.disconnect()
        if not connection.retry_on_timeout and isinstance(e, TimeoutError):
            raise
        connection.send_command(*args)
        return self.parse_response(connection, command_name, **options)
    finally:
        pool.release(connection)

Benchmark redis

[80]:
%%bash

redis-benchmark -q -n 10000 -c 50
PING_INLINE: 106382.98 requests per second
PING_BULK: 136986.30 requests per second
SET: 144927.55 requests per second
GET: 123456.79 requests per second
INCR: 101010.10 requests per second
LPUSH: 138888.89 requests per second
RPUSH: 147058.81 requests per second
LPOP: 142857.14 requests per second
RPOP: 135135.14 requests per second
SADD: 109890.11 requests per second
HSET: 108695.65 requests per second
SPOP: 142857.14 requests per second
LPUSH (needed to benchmark LRANGE): 144927.55 requests per second
LRANGE_100 (first 100 elements): 27397.26 requests per second
LRANGE_300 (first 300 elements): 11641.44 requests per second
LRANGE_500 (first 450 elements): 8635.58 requests per second
LRANGE_600 (first 600 elements): 6250.00 requests per second
MSET (10 keys): 101010.10 requests per second