Skip to content

Commit ae57a13

Browse files
author
Jon Wayne Parrott
authored
Overhaul logging background thread transport (#3407)
1 parent 17d8251 commit ae57a13

File tree

4 files changed

+401
-181
lines changed

4 files changed

+401
-181
lines changed

packages/google-cloud-logging/google/cloud/logging/handlers/handlers.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020

2121
DEFAULT_LOGGER_NAME = 'python'
2222

23-
EXCLUDED_LOGGER_DEFAULTS = ('google.cloud', 'oauth2client')
23+
EXCLUDED_LOGGER_DEFAULTS = (
24+
'google.cloud',
25+
'google.auth',
26+
'google_auth_httplib2',
27+
)
2428

2529

2630
class CloudLoggingHandler(logging.StreamHandler):

packages/google-cloud-logging/google/cloud/logging/handlers/transports/background_thread.py

Lines changed: 183 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -17,139 +17,231 @@
1717
Uses a background worker to log to Stackdriver Logging asynchronously.
1818
"""
1919

20+
from __future__ import print_function
21+
2022
import atexit
2123
import copy
24+
import logging
2225
import threading
2326

27+
from six.moves import range
28+
from six.moves import queue
29+
2430
from google.cloud.logging.handlers.transports.base import Transport
2531

26-
_WORKER_THREAD_NAME = 'google.cloud.logging.handlers.transport.Worker'
32+
_DEFAULT_GRACE_PERIOD = 5.0 # Seconds
33+
_DEFAULT_MAX_BATCH_SIZE = 10
34+
_WORKER_THREAD_NAME = 'google.cloud.logging.Worker'
35+
_WORKER_TERMINATOR = object()
36+
_LOGGER = logging.getLogger(__name__)
2737

2838

29-
class _Worker(object):
30-
"""A threaded worker that writes batches of log entries
39+
def _get_many(queue_, max_items=None):
40+
"""Get multiple items from a Queue.
3141
32-
Writes entries to the logger API.
42+
Gets at least one (blocking) and at most ``max_items`` items
43+
(non-blocking) from a given Queue. Does not mark the items as done.
3344
34-
This class reuses a single :class:`Batch` method to write successive
35-
entries.
45+
:type queue_: :class:`~queue.Queue`
46+
:param queue_: The Queue to get items from.
3647
37-
Currently, the only public methods are constructing it (which also starts
38-
it) and enqueuing :class:`Logger` (record, message) pairs.
48+
:type max_items: int
49+
:param max_items: The maximum number of items to get. If ``None``, then all
50+
available items in the queue are returned.
51+
52+
:rtype: Sequence
53+
:returns: A sequence of items retrieved from the queue.
3954
"""
55+
# Always return at least one item.
56+
items = [queue_.get()]
57+
while max_items is None or len(items) < max_items:
58+
try:
59+
items.append(queue_.get_nowait())
60+
except queue.Empty:
61+
break
62+
return items
4063

41-
def __init__(self, logger):
42-
self.started = False
43-
self.stopping = False
44-
self.stopped = False
4564

46-
# _entries_condition is used to signal from the main thread whether
47-
# there are any waiting queued logger entries to be written
48-
self._entries_condition = threading.Condition()
65+
class _Worker(object):
66+
"""A background thread that writes batches of log entries.
4967
50-
# _stop_condition is used to signal from the worker thread to the
51-
# main thread that it's finished its last entries
52-
self._stop_condition = threading.Condition()
68+
:type cloud_logger: :class:`~google.cloud.logging.logger.Logger`
69+
:param cloud_logger: The logger to send entries to.
5370
54-
# This object continually reuses the same :class:`Batch` object to
55-
# write multiple entries at the same time.
56-
self.logger = logger
57-
self.batch = self.logger.batch()
71+
:type grace_period: float
72+
:param grace_period: The amount of time to wait for pending logs to
73+
be submitted when the process is shutting down.
5874
75+
:type max_batch_size: int
76+
:param max_batch_size: The maximum number of items to send at a time
77+
in the background thread.
78+
"""
79+
80+
def __init__(self, cloud_logger, grace_period=_DEFAULT_GRACE_PERIOD,
81+
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
82+
self._cloud_logger = cloud_logger
83+
self._grace_period = grace_period
84+
self._max_batch_size = max_batch_size
85+
self._queue = queue.Queue(0)
86+
self._operational_lock = threading.Lock()
5987
self._thread = None
6088

61-
# Number in seconds of how long to wait for worker to send remaining
62-
self._stop_timeout = 5
89+
@property
90+
def is_alive(self):
91+
"""Returns True is the background thread is running."""
92+
return self._thread is not None and self._thread.is_alive()
6393

64-
self._start()
94+
def _safely_commit_batch(self, batch):
95+
total_logs = len(batch.entries)
6596

66-
def _run(self):
97+
try:
98+
if total_logs > 0:
99+
batch.commit()
100+
_LOGGER.debug('Submitted %d logs', total_logs)
101+
except Exception:
102+
_LOGGER.error(
103+
'Failed to submit %d logs.', total_logs, exc_info=True)
104+
105+
def _thread_main(self):
67106
"""The entry point for the worker thread.
68107
69-
Loops until ``stopping`` is set to :data:`True`, and commits batch
70-
entries written during :meth:`enqueue`.
108+
Pulls pending log entries off the queue and writes them in batches to
109+
the Cloud Logger.
71110
"""
72-
try:
73-
self._entries_condition.acquire()
74-
self.started = True
75-
while not self.stopping:
76-
if len(self.batch.entries) == 0:
77-
# branch coverage of this code extremely flaky
78-
self._entries_condition.wait() # pragma: NO COVER
79-
80-
if len(self.batch.entries) > 0:
81-
self.batch.commit()
82-
finally:
83-
self._entries_condition.release()
84-
85-
# main thread may be waiting for worker thread to finish writing its
86-
# final entries. here we signal that it's done.
87-
self._stop_condition.acquire()
88-
self._stop_condition.notify()
89-
self._stop_condition.release()
90-
91-
def _start(self):
92-
"""Called by this class's constructor
93-
94-
This method is responsible for starting the thread and registering
95-
the exit handlers.
111+
_LOGGER.debug('Background thread started.')
112+
113+
quit_ = False
114+
while True:
115+
batch = self._cloud_logger.batch()
116+
items = _get_many(self._queue, max_items=self._max_batch_size)
117+
118+
for item in items:
119+
if item is _WORKER_TERMINATOR:
120+
quit_ = True
121+
# Continue processing items, don't break, try to process
122+
# all items we got back before quitting.
123+
else:
124+
batch.log_struct(**item)
125+
126+
self._safely_commit_batch(batch)
127+
128+
for _ in range(len(items)):
129+
self._queue.task_done()
130+
131+
if quit_:
132+
break
133+
134+
_LOGGER.debug('Background thread exited gracefully.')
135+
136+
def start(self):
137+
"""Starts the background thread.
138+
139+
Additionally, this registers a handler for process exit to attempt
140+
to send any pending log entries before shutdown.
96141
"""
97-
try:
98-
self._entries_condition.acquire()
142+
with self._operational_lock:
143+
if self.is_alive:
144+
return
145+
99146
self._thread = threading.Thread(
100-
target=self._run, name=_WORKER_THREAD_NAME)
101-
self._thread.setDaemon(True)
147+
target=self._thread_main,
148+
name=_WORKER_THREAD_NAME)
149+
self._thread.daemon = True
102150
self._thread.start()
103-
finally:
104-
self._entries_condition.release()
105-
atexit.register(self._stop)
151+
atexit.register(self._main_thread_terminated)
152+
153+
def stop(self, grace_period=None):
154+
"""Signals the background thread to stop.
106155
107-
def _stop(self):
108-
"""Signals the worker thread to shut down
156+
This does not terminate the background thread. It simply queues the
157+
stop signal. If the main process exits before the background thread
158+
processes the stop signal, it will be terminated without finishing
159+
work. The ``grace_period`` parameter will give the background
160+
thread some time to finish processing before this function returns.
109161
110-
Also waits for ``stop_timeout`` seconds for the worker to finish.
162+
:type grace_period: float
163+
:param grace_period: If specified, this method will block up to this
164+
many seconds to allow the background thread to finish work before
165+
returning.
111166
112-
This method is called by the ``atexit`` handler registered by
113-
:meth:`start`.
167+
:rtype: bool
168+
:returns: True if the thread terminated. False if the thread is still
169+
running.
114170
"""
115-
if not self.started or self.stopping:
116-
return
171+
if not self.is_alive:
172+
return True
173+
174+
with self._operational_lock:
175+
self._queue.put_nowait(_WORKER_TERMINATOR)
176+
177+
if grace_period is not None:
178+
print('Waiting up to %d seconds.' % (grace_period,))
117179

118-
# lock the stop condition first so that the worker
119-
# thread can't notify it's finished before we wait
120-
self._stop_condition.acquire()
180+
self._thread.join(timeout=grace_period)
121181

122-
# now notify the worker thread to shutdown
123-
self._entries_condition.acquire()
124-
self.stopping = True
125-
self._entries_condition.notify()
126-
self._entries_condition.release()
182+
# Check this before disowning the thread, because after we disown
183+
# the thread is_alive will be False regardless of if the thread
184+
# exited or not.
185+
success = not self.is_alive
127186

128-
# now wait for it to signal it's finished
129-
self._stop_condition.wait(self._stop_timeout)
130-
self._stop_condition.release()
131-
self.stopped = True
187+
self._thread = None
188+
189+
return success
190+
191+
def _main_thread_terminated(self):
192+
"""Callback that attempts to send pending logs before termination."""
193+
if not self.is_alive:
194+
return
195+
196+
if not self._queue.empty():
197+
print(
198+
'Program shutting down, attempting to send %d queued log '
199+
'entries to Stackdriver Logging...' % (self._queue.qsize(),))
200+
201+
if self.stop(self._grace_period):
202+
print('Sent all pending logs.')
203+
else:
204+
print('Failed to send %d pending logs.' % (self._queue.qsize(),))
132205

133206
def enqueue(self, record, message):
134-
"""Queues up a log entry to be written by the background thread."""
135-
try:
136-
self._entries_condition.acquire()
137-
if self.stopping:
138-
return
139-
info = {'message': message, 'python_logger': record.name}
140-
self.batch.log_struct(info, severity=record.levelname)
141-
self._entries_condition.notify()
142-
finally:
143-
self._entries_condition.release()
207+
"""Queues a log entry to be written by the background thread.
208+
209+
:type record: :class:`logging.LogRecord`
210+
:param record: Python log record that the handler was called with.
211+
212+
:type message: str
213+
:param message: The message from the ``LogRecord`` after being
214+
formatted by the associated log formatters.
215+
"""
216+
self._queue.put_nowait({
217+
'info': {
218+
'message': message,
219+
'python_logger': record.name,
220+
},
221+
'severity': record.levelname,
222+
})
144223

145224

146225
class BackgroundThreadTransport(Transport):
147-
"""Aysnchronous transport that uses a background thread.
226+
"""Asynchronous transport that uses a background thread.
227+
228+
:type client: :class:`~google.cloud.logging.client.Client`
229+
:param client: The Logging client.
230+
231+
:type name: str
232+
:param name: the name of the logger.
233+
234+
:type grace_period: float
235+
:param grace_period: The amount of time to wait for pending logs to
236+
be submitted when the process is shutting down.
148237
149-
Writes logging entries as a batch process.
238+
:type batch_size: int
239+
:param batch_size: The maximum number of items to send at a time in the
240+
background thread.
150241
"""
151242

152-
def __init__(self, client, name):
243+
def __init__(self, client, name, grace_period=_DEFAULT_GRACE_PERIOD,
244+
batch_size=_DEFAULT_MAX_BATCH_SIZE):
153245
http = copy.deepcopy(client._http)
154246
self.client = client.__class__(
155247
client.project, client._credentials, http)

packages/google-cloud-logging/nox.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def unit_tests(session, python_version):
3939
'py.test', '--quiet',
4040
'--cov=google.cloud.logging', '--cov=tests.unit', '--cov-append',
4141
'--cov-config=.coveragerc', '--cov-report=', '--cov-fail-under=97',
42-
'tests/unit',
42+
'tests/unit', *session.posargs
4343
)
4444

4545

@@ -63,7 +63,7 @@ def system_tests(session, python_version):
6363
session.install('.')
6464

6565
# Run py.test against the system tests.
66-
session.run('py.test', '-vvv', 'tests/system.py')
66+
session.run('py.test', '-vvv', 'tests/system.py', *session.posargs)
6767

6868

6969
@nox.session

0 commit comments

Comments
 (0)