Token Bucket (rate limiting) - celery.worker.buckets

class celery.worker.buckets.FastQueue(maxsize=0)

Queue.Queue supporting the interface of TokenBucketQueue.

can_consume(tokens=1)
clear()
expected_time(tokens=1)
items
wait(block=True)

exception celery.worker.buckets.RateLimitExceeded

The token bucket’s rate limit has been exceeded.

class celery.worker.buckets.TaskBucket(task_registry)

This is a collection of token buckets, each task type having its own token bucket. If the task type doesn’t have a rate limit, it will have a plain Queue object instead of a TokenBucketQueue.

The put() operation forwards the task to its appropriate bucket, while the get() operation iterates over the buckets and retrieves the first available item.

Say the registry contains three task types: celery.ping, feed.refresh and video.compress. The TaskBucket will then consist of the following items:

{"celery.ping": TokenBucketQueue(fill_rate=300),
 "feed.refresh": Queue(),
 "video.compress": TokenBucketQueue(fill_rate=2)}

The get operation will iterate over these until one of the buckets is able to return an item. The underlying data structure is a dict, so no particular bucket order is guaranteed.
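
The dispatch described above can be sketched with plain queues. This is an illustration only, not the Celery implementation: MiniTaskBucket and its attributes are hypothetical names, rate limiting is left out, and the code assumes the Python 3 queue module (called Queue in Python 2).

```python
import queue


class MiniTaskBucket:
    """Hypothetical stand-in: one queue per task name, no rate limiting."""

    def __init__(self, task_names):
        self.buckets = {name: queue.Queue() for name in task_names}

    def put(self, task_name, item):
        # Forward the item to the bucket that belongs to its task type.
        self.buckets[task_name].put_nowait(item)

    def get_nowait(self):
        # Try each bucket in turn and return the first available item.
        for bucket in self.buckets.values():
            try:
                return bucket.get_nowait()
            except queue.Empty:
                continue
        # Every bucket was empty.
        raise queue.Empty()


bucket = MiniTaskBucket(["celery.ping", "feed.refresh", "video.compress"])
bucket.put("video.compress", "compress-job")
bucket.put("feed.refresh", "refresh-job")
first = bucket.get_nowait()
second = bucket.get_nowait()
```

Both items come back regardless of which bucket they were put into; callers should not rely on the order in which the buckets are visited.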

Parameters:
  • task_registry – The task registry used to get the task type class for a given task name.

add_bucket_for_type(task_name)

Add a bucket for a task type.

Will read the task’s rate limit and create a TokenBucketQueue if it has one. If the task doesn’t have a rate limit, a regular Queue will be used.

clear()
empty()
get(block=True, timeout=None)

Retrieve the task from the first available bucket.

Here “available” means that there is an item in the queue and that tokens can be consumed from it.

get_bucket_for_type(task_name)

Get the bucket for a particular task type.

get_nowait()
init_with_registry()

Initialize with buckets for all the task types in the registry.

items
put(request)

Put a TaskRequest into the appropriate bucket.

put_nowait(request)

Put a TaskRequest into the appropriate bucket.

qsize()

Get the total size of all the queues.

refresh()

Refresh rate limits for all task types in the registry.

update_bucket_for_type(task_name)

class celery.worker.buckets.TokenBucketQueue(fill_rate, queue=None, capacity=1)

Queue with rate limited get operations.

This uses the token bucket algorithm to rate limit the queue on get operations (see http://en.wikipedia.org/wiki/Token_Bucket). Most of this code was stolen from an entry in the ASPN Python Cookbook: http://code.activestate.com/recipes/511490/

Parameters:
fill_rate

The rate in tokens/second that the bucket will be refilled.

capacity

Maximum number of tokens in the bucket. Default is 1.

timestamp

Timestamp of the last time a token was taken out of the bucket.

exception RateLimitExceeded

The token bucket’s rate limit has been exceeded.

TokenBucketQueue.can_consume(tokens=1)

Consume tokens from the bucket. Returns True if there were sufficient tokens, otherwise False.
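
A minimal sketch of the token bucket idea behind can_consume() follows. It is not the Celery class: SimpleTokenBucket, its clock parameter and the _refill helper are hypothetical, and here timestamp records the last refill rather than the last time a token was taken.

```python
import time


class SimpleTokenBucket:
    """Hypothetical minimal token bucket; not the Celery class."""

    def __init__(self, fill_rate, capacity=1, clock=time.monotonic):
        self.fill_rate = float(fill_rate)   # tokens added per second
        self.capacity = float(capacity)     # maximum tokens held
        self._tokens = float(capacity)      # start with a full bucket
        self._clock = clock                 # injectable for testing
        self.timestamp = clock()            # time of the last refill

    def _refill(self):
        # Add the tokens earned since the last refill, capped at capacity.
        now = self._clock()
        gained = self.fill_rate * (now - self.timestamp)
        self._tokens = min(self.capacity, self._tokens + gained)
        self.timestamp = now

    def can_consume(self, tokens=1):
        # Take the tokens if enough are available; otherwise take none.
        self._refill()
        if tokens <= self._tokens:
            self._tokens -= tokens
            return True
        return False


# A fake clock keeps the example deterministic.
now = [0.0]
bucket = SimpleTokenBucket(fill_rate=2, capacity=1, clock=lambda: now[0])
first = bucket.can_consume()    # the bucket starts full
second = bucket.can_consume()   # no time has passed, so no token is left
now[0] += 0.5                   # 0.5 s at 2 tokens/s earns one token
third = bucket.can_consume()
```

With a capacity of 1, bursts are not possible: at most one token is ever stored, so consumers are held to the fill rate.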

TokenBucketQueue.clear()
TokenBucketQueue.empty()
TokenBucketQueue.expected_time(tokens=1)

Returns the expected time in seconds until a new token should be available.
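
The arithmetic can be sketched as the number of missing tokens divided by the fill rate. The helper below is hypothetical and takes the current token count as an argument, which is an assumption made to keep it self-contained.

```python
def expected_wait(available_tokens, fill_rate, tokens=1):
    """Seconds until `tokens` tokens are available (hypothetical helper)."""
    # Nothing is missing if the bucket already holds enough tokens.
    missing = max(tokens - available_tokens, 0.0)
    return missing / fill_rate


wait_empty = expected_wait(available_tokens=0.0, fill_rate=2.0)
wait_full = expected_wait(available_tokens=1.0, fill_rate=2.0)
```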

TokenBucketQueue.get(block=True)

Remove and return an item from the queue.

Raises:
  • RateLimitExceeded – If a token could not be consumed from the token bucket (consuming from the queue too fast).
  • Queue.Empty – If an item is not immediately available.

Also see Queue.Queue.get().
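
One way a caller might react to the two exceptions is sketched below. Everything here is a stand-in written for the example: SketchRateLimitedQueue, the local RateLimitExceeded class and drain() are hypothetical, and the code assumes the Python 3 queue module.

```python
import queue
import time


class RateLimitExceeded(Exception):
    """Local stand-in for the exception of the same name."""


class SketchRateLimitedQueue:
    """Hypothetical queue whose get_nowait() costs one token."""

    def __init__(self, fill_rate, capacity=1):
        self.fill_rate = float(fill_rate)
        self.capacity = float(capacity)
        self._tokens = float(capacity)
        self._last = time.monotonic()
        self._queue = queue.Queue()

    def put(self, item):
        self._queue.put(item)

    def _available(self):
        # Refill according to elapsed time, capped at capacity.
        now = time.monotonic()
        earned = self.fill_rate * (now - self._last)
        self._tokens = min(self.capacity, self._tokens + earned)
        self._last = now
        return self._tokens

    def get_nowait(self):
        if self._queue.empty():
            raise queue.Empty()
        if self._available() < 1:
            raise RateLimitExceeded()
        self._tokens -= 1
        return self._queue.get_nowait()

    def expected_time(self):
        return max(1 - self._available(), 0.0) / self.fill_rate


def drain(q):
    """Collect every item, sleeping whenever the rate limit is hit."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except RateLimitExceeded:
            time.sleep(q.expected_time())   # wait for the next token
        except queue.Empty:
            return items


q = SketchRateLimitedQueue(fill_rate=100)
for n in range(3):
    q.put(n)
result = drain(q)
```

Sleeping for the expected time instead of polling in a tight loop keeps the consumer idle until a token is likely to be available.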

TokenBucketQueue.get_nowait()

Remove and return an item from the queue without blocking.

Raises:
  • RateLimitExceeded – If a token could not be consumed from the token bucket (consuming from the queue too fast).
  • Queue.Empty – If an item is not immediately available.

Also see Queue.Queue.get_nowait().

TokenBucketQueue.items
TokenBucketQueue.put(item, block=True)

Put an item into the queue.

Also see Queue.Queue.put().

TokenBucketQueue.put_nowait(item)

Put an item into the queue without blocking.

Raises:
  • Queue.Full – If a free slot is not immediately available.

Also see Queue.Queue.put_nowait().

TokenBucketQueue.qsize()

Returns the size of the queue.

See Queue.Queue.qsize().

TokenBucketQueue.wait(block=False)

Wait until a token can be retrieved from the bucket and return the next item.

celery.worker.buckets.chain_from_iterable()

chain.from_iterable(iterable) --> chain object

Alternate chain() constructor taking a single iterable argument that evaluates lazily.
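
The signature above matches itertools.chain.from_iterable from the standard library, so the behaviour can be illustrated with that function. The pending list is made-up sample data.

```python
from itertools import chain

# Three hypothetical per-bucket item lists, one of them empty.
pending = [["a1", "a2"], [], ["c1"]]

# The inner lists are only walked as the result is consumed.
flattened = list(chain.from_iterable(pending))
```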
