Snippets are unofficial and unmaintained.
This is an archived view of user-submitted snippets. Despite being hosted on the Flask site, they are not official. No Flask maintainer has curated or checked the snippets for security, correctness, or design.
For all your queuing needs there is Flask-Celery but if you just want a very basic queue functionality to get started you can build yourself something on top of redis very easily.
from redis import Redis
redis = Redis()
For all this to work you need to define the redis key that should be used for queuing in your app config:
app.config['REDIS_QUEUE_KEY'] = 'my_queue'
from flask import current_app
from pickle import loads, dumps
class DelayedResult(object):
def __init__(self, key):
self.key = key
self._rv = None
@property
def return_value(self):
if self._rv is None:
rv = redis.get(self.key)
if rv is not None:
self._rv = loads(rv)
return self._rv
def queuefunc(f):
def delay(*args, **kwargs):
qkey = current_app.config['REDIS_QUEUE_KEY']
key = '%s:result:%s' % (qkey, str(uuid4()))
s = dumps((f, key, args, kwargs))
redis.rpush(current_app.config['REDIS_QUEUE_KEY'], s)
return DelayedResult(key)
f.delay = delay
return f
The queue runner is a simple function that runs in a while loop and processes entries from a list key in redis. Whenever something is added on there it will pop one item off the list, deserialize it, run the function and put the result into redis for a few seconds (by default 500). If the return value is None
we don't store anything because in that case the assumption is that the return value is not interesting.
def queue_daemon(app, rv_ttl=500):
while 1:
msg = redis.blpop(app.config['REDIS_QUEUE_KEY'])
func, key, args, kwargs = loads(msg[1])
try:
rv = func(*args, **kwargs)
except Exception, e:
rv = e
if rv is not None:
redis.set(key, dumps(rv))
redis.expire(key, rv_ttl)
To run the daemon you can write a simple script like this:
#!/usr/bin/env python
from yourapp import app
from that_queue_module import queue_daemon
queue_daemon(app)
To define a function to be run through the queue you need to use the @queuefunc
decorator:
@queuefunc
def add(a, b):
return a + b
When you call it normally it will be executed synchronously and in the same process. If you however call add.delay(a, b)
it will send off the request to call this function to the queue and return you a QueueResult
object. This also will need an active request context. Here an example from a python shell:
>>> from yourapp import app, add
>>>
>>> ctx = app.test_request_context()
>>> ctx.push()
>>> rv = add.delay(1, 2)
>>> rv.return_value
3
rv.return_value
will be None
until the key is available. Usually you will want to poll for a result over HTTP or not at all, so the return_value
attribute is only really useful for testing.
If you want to poll this result the rv
object has a key
attribute which is the redis key corresponding to the result:
>>> rv.key
'my_queue:result:7d43370c-0f98-4e98-9d4b-1cdaf7362eb5'
Here is how you could poll for this via HTTP:
from flask import session, abort, jsonify
@app.route('/add')
def add_numbers():
a = request.args.get('a', type=int)
b = request.args.get('b', type=int)
if a is None or b is None:
abort(400)
rv = add.delay(a, b)
session['add_result_key'] = rv.key
return 'Waiting for result...'
@app.route('/add-result')
def add_numbers_result():
key = session.get('add_result_key')
if key is None:
return jsonify(ready=False)
rv = DelayedResult(key)
if rv.return_value is None:
return jsonify(ready=False)
redis.delete(key)
del session['add_result_key']
return jsonify(ready=True, result=rv.return_value)
First you let the user access /add
to do something, then you can use JavaScript to poll /add-result
. Note that once successfully polled the result is deleted from the redis server.
This snippet by Armin Ronacher can be used freely for anything you like. Consider it public domain.