Queue.Queue supporting the interface of TokenBucketQueue.
The token buckets rate limit has been exceeded.
This is a collection of token buckets, each task type having its own token bucket. If the task type doesn’t have a rate limit, it will have a plain Queue object instead of a TokenBucketQueue.
The put() operation forwards the task to its appropriate bucket, while the get() operation iterates over the buckets and retrieves the first available item.
Say we have three types of tasks in the registry: celery.ping, feed.refresh and video.compress, the TaskBucket will consist of the following items:
{"celery.ping": TokenBucketQueue(fill_rate=300),
"feed.refresh": Queue(),
"video.compress": TokenBucketQueue(fill_rate=2)}
The get operation will iterate over these until one of the buckets is able to return an item. The underlying datastructure is a dict, so the order is ignored here.
| Parameters: |
|
|---|
Add a bucket for a task type.
Will read the tasks rate limit and create a TokenBucketQueue if it has one. If the task doesn’t have a rate limit a regular Queue will be used.
Retrive the task from the first available bucket.
Available as in, there is an item in the queue and you can consume tokens from it.
Get the bucket for a particular task type.
Initialize with buckets for all the task types in the registry.
Put a TaskRequest into the appropiate bucket.
Put a TaskRequest into the appropiate bucket.
Get the total size of all the queues.
Refresh rate limits for all task types in the registry.
Queue with rate limited get operations.
This uses the token bucket algorithm to rate limit the queue on get operations. See http://en.wikipedia.org/wiki/Token_Bucket Most of this code was stolen from an entry in the ASPN Python Cookbook: http://code.activestate.com/recipes/511490/
| Parameters: |
|---|
The rate in tokens/second that the bucket will be refilled.
Maximum number of tokens in the bucket. Default is 1.
Timestamp of the last time a token was taken out of the bucket.
The token buckets rate limit has been exceeded.
Consume tokens from the bucket. Returns True if there were sufficient tokens otherwise False.
Returns the expected time in seconds when a new token should be available.
Remove and return an item from the queue.
| Raises: |
|
|---|
Also see Queue.Queue.get().
Remove and return an item from the queue without blocking.
| Raises: |
|
|---|
Also see Queue.Queue.get_nowait().
Put an item into the queue.
Also see Queue.Queue.put().
Put an item into the queue without blocking.
| Raises Queue.Full: | |
|---|---|
| If a free slot is not immediately available. | |
Also see Queue.Queue.put_nowait()
Returns the size of the queue.
See Queue.Queue.qsize().
Wait until a token can be retrieved from the bucket and return the next item.
chain.from_iterable(iterable) –> chain object
Alternate chain() contructor taking a single iterable argument that evaluates lazily.