6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
 # CHANGELOG
 
+## 97.1.0
+
+* Added `ExpansionCooldownEventletWorker` custom gunicorn worker class
+* Allow disabling `ContextRecyclingEventletWorker`'s behaviour via the `recycle_eventlet_thread_contexts` gunicorn config variable
+* Added `NotifyEventletWorker` custom gunicorn worker class for enabling an app to make use of all our custom gunicorn worker class features together
+
 ## 97.0.2
 
 * Fix external link redirect
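For orientation, here is a minimal sketch of how an app might opt into all of this at once. The `worker_class` dotted path follows this diff's file location, the tuning values are invented for illustration, and note that the worker code in this diff reads these settings from environment variables, however your deployment chooses to set them:

```python
# gunicorn.conf.py -- a minimal sketch, not a recommended configuration
import os

worker_class = "notifications_utils.gunicorn.eventlet.NotifyEventletWorker"
worker_connections = 256  # absolute ceiling on each worker's GreenPool size

# settings read by the new worker classes at start-up (illustrative values)
os.environ.setdefault("RECYCLE_EVENTLET_THREAD_CONTEXTS", "1")
os.environ.setdefault("INITIAL_WORKER_CONNECTIONS", "4")
os.environ.setdefault("WORKER_CONNECTIONS_EXPANSION_COOLDOWN_SECONDS", "2.5")
os.environ.setdefault("WORKER_CONNECTIONS_EXPANSION_MIN_WAIT_SECONDS", "0.1")
```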
105 changes: 96 additions & 9 deletions notifications_utils/gunicorn/eventlet.py
@@ -1,6 +1,9 @@
 import contextvars
+import os
+import time
 from collections import deque
 
+import eventlet
 import greenlet
 from gunicorn.workers import geventlet
 
@@ -15,20 +18,104 @@ class ContextRecyclingEventletWorker(geventlet.EventletWorker):
"""

def __init__(self, *args, **kwargs):
self.context_pool = deque() # a stack of unused thread contexts
super().__init__(*args, **kwargs)
self.context_pool = deque() # a stack of unused thread contexts
self.recycle_eventlet_thread_contexts = (os.getenv("RECYCLE_EVENTLET_THREAD_CONTEXTS") or "0") != "0"

def handle(self, *args, **kwargs):
g = greenlet.getcurrent()
if self.context_pool:
# reuse an existing thread context from the pool
g.gr_context = self.context_pool.pop()
if self.recycle_eventlet_thread_contexts:
g = greenlet.getcurrent()
if self.context_pool:
# reuse an existing thread context from the pool
g.gr_context = self.context_pool.pop()

ret = super().handle(*args, **kwargs)

# stash potentially-populated thread context in context_pool
self.context_pool.append(g.gr_context)
# replace reference to now-stashed context with an empty one
g.gr_context = contextvars.Context()
if self.recycle_eventlet_thread_contexts:
# stash potentially-populated thread context in context_pool
self.context_pool.append(g.gr_context)
# replace reference to now-stashed context with an empty one
g.gr_context = contextvars.Context()

return ret


+class ExpansionCooldownEventletWorker(geventlet.EventletWorker):
+    """
+    Starting with an initial GreenPool size of `initial_worker_connections`, will gradually
+    expand the GreenPool size on demand, though never expanding it more than once within
+    `worker_connections_expansion_cooldown_seconds` and never beyond an absolute maximum
+    of `worker_connections`.
+
+    If `worker_connections_expansion_min_wait_seconds` is set to a nonzero value, *any*
+    pool expansion will only happen after we have waited this amount of time for an
+    existing thread slot to be vacated - the intention being to bias new connections
+    towards being accepted by processes that already have vacant thread slots.
+
+    Also won't accept a connection until our GreenPool has capacity to handle it.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.worker_connections_expansion_cooldown_seconds = float(os.getenv("WORKER_CONNECTIONS_EXPANSION_COOLDOWN_SECONDS") or "0")
+        self.worker_connections_expansion_min_wait_seconds = float(os.getenv("WORKER_CONNECTIONS_EXPANSION_MIN_WAIT_SECONDS") or "0")
+        self.initial_worker_connections = int(os.getenv("INITIAL_WORKER_CONNECTIONS") or "1")
+
+    # based on gunicorn@1299ea9e967a61ae2edebe191082fd169b864c64's _eventlet_serve
+    # routine, with sections added before and after the sock.accept() call
+    def eventlet_serve(self, sock, handle, concurrency):
+        pool = eventlet.greenpool.GreenPool(self.initial_worker_connections)
+        server_gt = eventlet.greenthread.getcurrent()
+
+        while True:
+            try:
+                time_till_cooldown_expiry = max(
+                    self.worker_connections_expansion_min_wait_seconds,
+                    self.worker_connections_expansion_cooldown_seconds - (time.monotonic() - self.last_expanded),
+                )
+                if pool.size >= concurrency or time_till_cooldown_expiry > 0:
+                    # if we've already reached the maximum number of connections, wait indefinitely
+                    # for a GreenThread slot to become available, else wait time_till_cooldown_expiry
+                    timeout = None if pool.size >= concurrency else time_till_cooldown_expiry
+
+                    if pool.sem.acquire(timeout=timeout):
+                        # an existing GreenThread slot has become available - "release" this
+                        # semaphore so it can be re-acquired in pool.spawn
+                        pool.sem.release()
+                    # else it's been at least worker_connections_expansion_cooldown_seconds since
+                    # we last expanded the pool, so we may proceed and accept a connection
+
+                conn, addr = sock.accept()
+
+                # (re)check whether we (still) need to expand the pool to handle this connection
+                if pool.sem.counter == 0:
+                    # we do
+                    pool.resize(pool.size + 1)
+                    self.last_expanded = time.monotonic()
+                    log_context = {
+                        "pool_size": pool.size,
+                        "process_": os.getpid(),
+                    }
+                    self.log.info("Expanded GreenPool size to %(pool_size)s", log_context, extra=log_context)
+
+                gt = pool.spawn(handle, conn, addr)
+                gt.link(geventlet._eventlet_stop, server_gt, conn)
+                conn, addr, gt = None, None, None
+            except eventlet.StopServe:
+                sock.close()
+                pool.waitall()
+                return
+
+    def run(self, *args, **kwargs):
+        self.last_expanded = time.monotonic()
+        super().run(*args, **kwargs)
+
+    def patch(self, *args, **kwargs):
+        # _eventlet_serve being a module-level function, we don't really have any choice but
+        # to monkey-patch it (or totally replace the run() method)
+        geventlet._eventlet_serve = self.eventlet_serve
+        super().patch(*args, **kwargs)
+
+
+class NotifyEventletWorker(ExpansionCooldownEventletWorker, ContextRecyclingEventletWorker):
+    pass
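To see what `ContextRecyclingEventletWorker` is actually recycling, here is a standalone sketch of the same trick using plain `greenlet` and `contextvars` outside gunicorn entirely. The `cache` ContextVar and `request` function are invented stand-ins for whatever per-context state an app accumulates, and the expected output assumes a contextvars-aware greenlet (>= 1.0), where each greenlet gets its own `gr_context`:

```python
import contextvars
from collections import deque

import greenlet

cache = contextvars.ContextVar("cache", default=None)
context_pool = deque()  # a stack of unused thread contexts, as in the worker

def request(name):
    g = greenlet.getcurrent()
    if context_pool:
        g.gr_context = context_pool.pop()  # adopt a previously-used context

    if cache.get() is None:
        print(f"{name}: cold, populating cache")
        cache.set({"some": "expensive state"})
    else:
        print(f"{name}: warm, cache carried over")

    context_pool.append(g.gr_context)      # stash for the next greenlet
    g.gr_context = contextvars.Context()   # detach the stashed context

greenlet.greenlet(request).switch("first")   # first: cold, populating cache
greenlet.greenlet(request).switch("second")  # second: warm, cache carried over
```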
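The subtlest part of `eventlet_serve` is how the cooldown and the min-wait settings interact; a dry run of the `time_till_cooldown_expiry` arithmetic with invented numbers:

```python
cooldown_seconds = 2.0  # worker_connections_expansion_cooldown_seconds
min_wait_seconds = 0.1  # worker_connections_expansion_min_wait_seconds

# 0.5s after the last expansion, the cooldown dominates: wait up to 1.5s
# for an existing slot to free up before considering another expansion
print(max(min_wait_seconds, cooldown_seconds - 0.5))   # 1.5

# 10s after the last expansion the cooldown has expired, but a nonzero
# min_wait still forces a short wait for an existing slot, biasing the
# connection towards processes that already have capacity
print(max(min_wait_seconds, cooldown_seconds - 10.0))  # 0.1
```

Only when that wait times out without a slot becoming free does the worker fall through, accept the connection, and, if `pool.sem.counter == 0`, grow the pool by one.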
2 changes: 1 addition & 1 deletion notifications_utils/version.py
@@ -5,4 +5,4 @@
 # - `make version-minor` for new features
 # - `make version-patch` for bug fixes
 
-__version__ = "97.0.2"  # 3190104c3cf42f56fa8d4554eb22790a
+__version__ = "97.1.0"  # dce0eb33be4608c3b