Flask Snippets

Job queue implemented using SQLite

By Thiago Arruda filed in Utilities

A very simple job queue implemented using SQLite.

It can be used in the same way as the queue from the snippet at http://flask.pocoo.org/snippets/73/.

import os, sqlite3
from cPickle import loads, dumps
from time import sleep
try:
    from thread import get_ident
except ImportError:
    from dummy_thread import get_ident


class SqliteQueue(object):
    """A persistent FIFO queue of pickled Python objects stored in SQLite.

    Items are pickled (protocol 2) and stored as BLOBs in a single ``queue``
    table.  The AUTOINCREMENT ``id`` column records insertion order, and
    every read orders by it.  Each thread gets its own connection, cached
    by thread ident.  ``popleft`` does its select-then-delete inside a
    ``BEGIN IMMEDIATE`` transaction, so two consumers cannot pop the same
    item.

    Security note: items are restored with ``pickle.loads``.  Never point
    this queue at a database file that untrusted parties can write to,
    because unpickling crafted data can execute arbitrary code.
    """

    _create = (
            'CREATE TABLE IF NOT EXISTS queue '
            '('
            '  id INTEGER PRIMARY KEY AUTOINCREMENT,'
            '  item BLOB'
            ')'
            )
    _count = 'SELECT COUNT(*) FROM queue'
    # ORDER BY makes iteration FIFO, consistent with popleft/peek; without
    # it SQLite does not guarantee any particular row order.
    _iterate = 'SELECT id, item FROM queue ORDER BY id'
    _append = 'INSERT INTO queue (item) VALUES (?)'
    # Takes SQLite's write (RESERVED) lock up front, so the select-then-delete
    # in popleft cannot race with another consumer.
    _write_lock = 'BEGIN IMMEDIATE'
    _popleft_get = (
            'SELECT id, item FROM queue '
            'ORDER BY id LIMIT 1'
            )
    _popleft_del = 'DELETE FROM queue WHERE id = ?'
    _peek = (
            'SELECT item FROM queue '
            'ORDER BY id LIMIT 1'
            )

    def __init__(self, path):
        """Open (creating if needed) the queue stored in the file *path*."""
        self.path = os.path.abspath(path)
        self._connection_cache = {}
        with self._get_conn() as conn:
            conn.execute(self._create)

    def __len__(self):
        """Return the number of items currently in the queue."""
        with self._get_conn() as conn:
            (count,) = conn.execute(self._count).fetchone()
        return count

    def __iter__(self):
        """Yield every queued item, oldest first, without removing any."""
        with self._get_conn() as conn:
            for _row_id, blob in conn.execute(self._iterate):
                # bytes() is str on Python 2 and a real bytes copy on 3;
                # str(blob) would yield "b'...'" on Python 3 and corrupt data.
                yield loads(bytes(blob))

    def _get_conn(self):
        """Return this thread's cached connection, opening it on first use."""
        ident = get_ident()
        conn = self._connection_cache.get(ident)
        if conn is None:
            # Wait up to 60s for other writers' locks before failing.
            conn = sqlite3.connect(self.path, timeout=60)
            self._connection_cache[ident] = conn
        return conn

    def append(self, obj):
        """Pickle *obj* and add it to the tail of the queue."""
        # sqlite3.Binary binds the pickle as a BLOB on both Python 2 (buffer)
        # and Python 3 (memoryview).
        blob = sqlite3.Binary(dumps(obj, 2))
        with self._get_conn() as conn:
            conn.execute(self._append, (blob,))

    def popleft(self, sleep_wait=True):
        """Remove and return the item at the head of the queue.

        :param sleep_wait: if true (the default), block until an item is
            available, polling with a back-off that grows from 0.1s to at
            most 2s between attempts; if false, return immediately.
        :returns: the unpickled item, or ``None`` if the queue was empty
            and *sleep_wait* is false.  A stored ``None`` value cannot be
            told apart from an empty queue.
        """
        wait = 0.1
        max_wait = 2
        tries = 0
        with self._get_conn() as conn:
            while True:
                conn.execute(self._write_lock)
                row = conn.execute(self._popleft_get).fetchone()
                if row is not None:
                    break
                conn.commit()  # release the write lock before waiting
                if not sleep_wait:
                    return None
                tries += 1
                sleep(wait)
                # Float division: on Python 2, tries/10 was integer division
                # and kept the wait flat at 0.1s for the first ten tries.
                wait = min(max_wait, tries / 10.0 + wait)
            row_id, blob = row
            conn.execute(self._popleft_del, (row_id,))
            # Leaving the ``with`` block commits the DELETE.  If unpickling
            # raises, the DELETE is rolled back and the item stays queued.
            return loads(bytes(blob))

    def peek(self):
        """Return the item at the head of the queue without removing it.

        Returns ``None`` when the queue is empty.
        """
        with self._get_conn() as conn:
            row = conn.execute(self._peek).fetchone()
        if row is None:
            return None
        return loads(bytes(row[0]))

Here is a timing comparison with a Redis-based queue (measured with `python -m timeit`):

# put on queue
$ python -mtimeit -s'from sqlite_queue import SqliteQueue;from random import random;q = SqliteQueue("/run/shm/queue")' 'q.append(random())'
1000 loops, best of 3: 280 usec per loop
$ python -mtimeit -s'from redis import Redis;from random import random; q = Redis()' 'q.rpush("q", random())'
1000 loops, best of 3: 201 usec per loop
# remove from queue
$ python -mtimeit -s'from sqlite_queue import SqliteQueue;from random import random;q = SqliteQueue("/run/shm/queue")' 'q.popleft()'       
1000 loops, best of 3: 325 usec per loop
$ python -mtimeit -s'from redis import Redis;from random import random; q = Redis()' 'q.lpop("q")'                                  
1000 loops, best of 3: 249 usec per loop

It is fast, simple, and requires no external process to manage queue access.

This snippet by Thiago Arruda can be used freely for anything you like. Consider it public domain.