Source code for nti.transactions.queue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Support for transactionally working with queues.
"""
from __future__ import print_function, absolute_import, division
__docformat__ = "restructuredtext en"
logger = __import__('logging').getLogger(__name__)
import transaction
try:
from queue import Full as QFull
except ImportError: # pragma: no cover
# Py2
# The gevent.queue.Full class is just an alias
# for the stdlib class, on both Py2 and Py3
from Queue import Full as QFull
from nti.transactions.manager import ObjectDataManager
__all__ = [
'put_nowait',
]
class _QueuePutDataManager(ObjectDataManager):
"""
A data manager that checks if the queue is full before putting.
Overrides :meth:`tpc_vote` for efficiency.
"""
def __init__(self, queue, method, args=()):
super(_QueuePutDataManager, self).__init__(target=queue, call=method, args=args)
# NOTE: See the `sortKey` method. The use of the queue as the target
# is critical to ensure that the FIFO property holds when multiple objects
# are added to a queue during a transaction
def tpc_vote(self, tx):
if self.target.full():
# TODO: Should this be a transient exception?
# So retry logic kicks in?
raise QFull()
[docs]def put_nowait(queue, obj):
"""
Transactionally puts `obj` in `queue`. The `obj` will only be visible
in the queue after the current transaction successfully commits.
If the queue cannot accept the object because it is full, the transaction
will be aborted.
See :class:`gevent.queue.Queue` and :class:`Queue.Full` and :mod:`gevent.queue`.
"""
transaction.get().join(
_QueuePutDataManager(queue,
queue.put_nowait,
args=(obj,)))