mirror of
https://github.com/python/cpython.git
synced 2024-11-25 02:44:06 +08:00
logging: added QueueListener and documentation.
This commit is contained in:
parent
d12a420acc
commit
0637d493e9
@ -2651,12 +2651,18 @@ The :class:`QueueHandler` class, located in the :mod:`logging.handlers` module,
|
||||
supports sending logging messages to a queue, such as those implemented in the
|
||||
:mod:`queue` or :mod:`multiprocessing` modules.
|
||||
|
||||
Along with the :class:`QueueListener` class, :class:`QueueHandler` can be used
|
||||
to let handlers do their work on a separate thread from the one which does the
|
||||
logging. This is important in Web applications and also other service
|
||||
applications where threads servicing clients need to respond as quickly as
|
||||
possible, while any potentially slow operations (such as sending an email via
|
||||
:class:`SMTPHandler`) are done on a separate thread.
|
||||
|
||||
.. class:: QueueHandler(queue)
|
||||
|
||||
Returns a new instance of the :class:`QueueHandler` class. The instance is
|
||||
initialized with the queue to send messages to. The queue can be any queue-
|
||||
like object; it's passed as-is to the :meth:`enqueue` method, which needs
|
||||
like object; it's used as-is by the :meth:`enqueue` method, which needs
|
||||
to know how to send messages to it.
|
||||
|
||||
|
||||
@ -2688,8 +2694,80 @@ supports sending logging messages to a queue, such as those implemented in the
|
||||
|
||||
The :class:`QueueHandler` class was not present in previous versions.
|
||||
|
||||
.. queue-listener:
|
||||
|
||||
QueueListener
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
The :class:`QueueListener` class, located in the :mod:`logging.handlers`
|
||||
module, supports receiving logging messages from a queue, such as those
|
||||
implemented in the :mod:`queue` or :mod:`multiprocessing` modules. The
|
||||
messages are received from a queue in an internal thread and passed, on
|
||||
the same thread, to one or more handlers for processing.
|
||||
|
||||
Along with the :class:`QueueHandler` class, :class:`QueueListener` can be used
|
||||
to let handlers do their work on a separate thread from the one which does the
|
||||
logging. This is important in Web applications and also other service
|
||||
applications where threads servicing clients need to respond as quickly as
|
||||
possible, while any potentially slow operations (such as sending an email via
|
||||
:class:`SMTPHandler`) are done on a separate thread.
|
||||
|
||||
.. class:: QueueListener(queue, *handlers)
|
||||
|
||||
Returns a new instance of the :class:`QueueListener` class. The instance is
|
||||
initialized with the queue to send messages to and a list of handlers which
|
||||
will handle entries placed on the queue. The queue can be any queue-
|
||||
like object; it's passed as-is to the :meth:`dequeue` method, which needs
|
||||
to know how to get messages from it.
|
||||
|
||||
.. method:: dequeue(block)
|
||||
|
||||
Dequeues a record and return it, optionally blocking.
|
||||
|
||||
The base implementation uses ``get()``. You may want to override this
|
||||
method if you want to use timeouts or work with custom queue
|
||||
implementations.
|
||||
|
||||
.. method:: prepare(record)
|
||||
|
||||
Prepare a record for handling.
|
||||
|
||||
This implementation just returns the passed-in record. You may want to
|
||||
override this method if you need to do any custom marshalling or
|
||||
manipulation of the record before passing it to the handlers.
|
||||
|
||||
.. method:: handle(record)
|
||||
|
||||
Handle a record.
|
||||
|
||||
This just loops through the handlers offering them the record
|
||||
to handle. The actual object passed to the handlers is that which
|
||||
is returned from :meth:`prepare`.
|
||||
|
||||
.. method:: start()
|
||||
|
||||
Starts the listener.
|
||||
|
||||
This starts up a background thread to monitor the queue for
|
||||
LogRecords to process.
|
||||
|
||||
.. method:: stop()
|
||||
|
||||
Stops the listener.
|
||||
|
||||
This asks the thread to terminate, and then waits for it to do so.
|
||||
Note that if you don't call this before your application exits, there
|
||||
may be some records still left on the queue, which won't be processed.
|
||||
|
||||
.. versionadded:: 3.2
|
||||
|
||||
The :class:`QueueListener` class was not present in previous versions.
|
||||
|
||||
.. _zeromq-handlers:
|
||||
|
||||
Subclassing QueueHandler
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
You can use a :class:`QueueHandler` subclass to send messages to other kinds
|
||||
of queues, for example a ZeroMQ "publish" socket. In the example below,the
|
||||
socket is created separately and passed to the handler (as its 'queue')::
|
||||
@ -2716,6 +2794,7 @@ data needed by the handler to create the socket::
|
||||
def __init__(self, uri, socktype=zmq.PUB, ctx=None):
|
||||
self.ctx = ctx or zmq.Context()
|
||||
socket = zmq.Socket(self.ctx, socktype)
|
||||
socket.bind(uri)
|
||||
QueueHandler.__init__(self, socket)
|
||||
|
||||
def enqueue(self, record):
|
||||
@ -2725,6 +2804,23 @@ data needed by the handler to create the socket::
|
||||
def close(self):
|
||||
self.queue.close()
|
||||
|
||||
Subclassing QueueListener
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
You can also subclass :class:`QueueListener` to get messages from other kinds
|
||||
of queues, for example a ZeroMQ "subscribe" socket. Here's an example::
|
||||
|
||||
class ZeroMQSocketListener(QueueListener):
|
||||
def __init__(self, uri, *handlers, **kwargs):
|
||||
self.ctx = kwargs.get('ctx') or zmq.Context()
|
||||
socket = zmq.Socket(self.ctx, zmq.SUB)
|
||||
socket.setsockopt(zmq.SUBSCRIBE, '') # subscribe to everything
|
||||
socket.connect(uri)
|
||||
|
||||
def dequeue(self):
|
||||
msg = self.queue.recv()
|
||||
return logging.makeLogRecord(json.loads(msg))
|
||||
|
||||
.. _formatter-objects:
|
||||
|
||||
Formatter Objects
|
||||
|
@ -1178,8 +1178,8 @@ class QueueHandler(logging.Handler):
|
||||
|
||||
def prepare(self, record):
|
||||
"""
|
||||
Prepares a record for queuing. The object returned by this
|
||||
method is enqueued.
|
||||
Prepares a record for queuing. The object returned by this method is
|
||||
enqueued.
|
||||
|
||||
The base implementation formats the record to merge the message
|
||||
and arguments, and removes unpickleable items from the record
|
||||
@ -1205,7 +1205,7 @@ class QueueHandler(logging.Handler):
|
||||
"""
|
||||
Emit a record.
|
||||
|
||||
Writes the LogRecord to the queue, preparing it first.
|
||||
Writes the LogRecord to the queue, preparing it for pickling first.
|
||||
"""
|
||||
try:
|
||||
self.enqueue(self.prepare(record))
|
||||
@ -1213,3 +1213,107 @@ class QueueHandler(logging.Handler):
|
||||
raise
|
||||
except:
|
||||
self.handleError(record)
|
||||
|
||||
class QueueListener(object):
|
||||
"""
|
||||
This class implements an internal threaded listener which watches for
|
||||
LogRecords being added to a queue, removes them and passes them to a
|
||||
list of handlers for processing.
|
||||
"""
|
||||
_sentinel = None
|
||||
|
||||
def __init__(self, queue, *handlers):
|
||||
"""
|
||||
Initialise an instance with the specified queue and
|
||||
handlers.
|
||||
"""
|
||||
self.queue = queue
|
||||
self.handlers = handlers
|
||||
self._stop = threading.Event()
|
||||
self._thread = None
|
||||
|
||||
def dequeue(self, block):
|
||||
"""
|
||||
Dequeue a record and return it, optionally blocking.
|
||||
|
||||
The base implementation uses get. You may want to override this method
|
||||
if you want to use timeouts or work with custom queue implementations.
|
||||
"""
|
||||
return self.queue.get(block)
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start the listener.
|
||||
|
||||
This starts up a background thread to monitor the queue for
|
||||
LogRecords to process.
|
||||
"""
|
||||
self._thread = t = threading.Thread(target=self._monitor)
|
||||
t.setDaemon(True)
|
||||
t.start()
|
||||
|
||||
def prepare(self , record):
|
||||
"""
|
||||
Prepare a record for handling.
|
||||
|
||||
This method just returns the passed-in record. You may want to
|
||||
override this method if you need to do any custom marshalling or
|
||||
manipulation of the record before passing it to the handlers.
|
||||
"""
|
||||
return record
|
||||
|
||||
def handle(self, record):
|
||||
"""
|
||||
Handle a record.
|
||||
|
||||
This just loops through the handlers offering them the record
|
||||
to handle.
|
||||
"""
|
||||
record = self.prepare(record)
|
||||
for handler in self.handlers:
|
||||
handler.handle(record)
|
||||
|
||||
def _monitor(self):
|
||||
"""
|
||||
Monitor the queue for records, and ask the handler
|
||||
to deal with them.
|
||||
|
||||
This method runs on a separate, internal thread.
|
||||
The thread will terminate if it sees a sentinel object in the queue.
|
||||
"""
|
||||
q = self.queue
|
||||
has_task_done = hasattr(q, 'task_done')
|
||||
while not self._stop.isSet():
|
||||
try:
|
||||
record = self.dequeue(True)
|
||||
if record is self._sentinel:
|
||||
break
|
||||
self.handle(record)
|
||||
if has_task_done:
|
||||
q.task_done()
|
||||
except queue.Empty:
|
||||
pass
|
||||
# There might still be records in the queue.
|
||||
while True:
|
||||
try:
|
||||
record = self.dequeue(False)
|
||||
if record is self._sentinel:
|
||||
break
|
||||
self.handle(record)
|
||||
if has_task_done:
|
||||
q.task_done()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stop the listener.
|
||||
|
||||
This asks the thread to terminate, and then waits for it to do so.
|
||||
Note that if you don't call this before your application exits, there
|
||||
may be some records still left on the queue, which won't be processed.
|
||||
"""
|
||||
self._stop.set()
|
||||
self.queue.put_nowait(self._sentinel)
|
||||
self._thread.join()
|
||||
self._thread = None
|
||||
|
Loading…
Reference in New Issue
Block a user