Skip to content

Commit 3b886e4

Browse files
authored
Use statsd pipelines (#3504)
Avoid sending a udp packet per bit of information, instead use a pipeline per request. Add a MaybeStatsd utility class so we can skip sending the stats when not in a flask app context.
1 parent 1021363 commit 3b886e4

File tree

13 files changed

+98
-56
lines changed

13 files changed

+98
-56
lines changed

src/auslib/db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from sqlalchemy.exc import SQLAlchemyError
1717
from sqlalchemy.sql.expression import null
1818
from sqlalchemy.sql.functions import max as sql_max
19-
from statsd.defaults.env import statsd
2019

2120
from auslib.blobs.base import createBlob, merge_dicts
2221
from auslib.errors import PermissionDeniedError, ReadOnlyError, SignoffRequiredError
@@ -32,6 +31,7 @@
3231
matchSimpleExpression,
3332
matchVersion,
3433
)
34+
from auslib.util.statsd import statsd
3535
from auslib.util.timestamp import getMillisecondTimestamp
3636
from auslib.util.versions import get_version_class
3737

src/auslib/util/auth.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
import requests
33
from auth0.authentication import Users as auth0_Users
44
from repoze.lru import lru_cache
5-
from statsd.defaults.env import statsd
5+
6+
from auslib.util.statsd import statsd
67

78

89
class AuthError(Exception):
@@ -50,9 +51,9 @@ def get_jwks(auth_domain):
5051

5152

5253
@lru_cache(2048)
def get_additional_userinfo(auth_domain, access_token):
    """Fetch extended profile data for *access_token* from Auth0.

    Results are memoized (up to 2048 entries) keyed on the
    (auth_domain, access_token) pair, so the statsd timer below only
    fires on cache misses — repeat lookups never reach the network.

    NOTE(review): entries are cached for the process lifetime; assumes
    token expiry makes stale userinfo acceptable — confirm.
    """
    # Time only the actual Auth0 round-trip. The timer used to be a
    # decorator *under* lru_cache; as a context manager inside the body
    # it measures exactly the same work.
    with statsd.timer("auth0_userinfo"):
        userinfo = auth0_Users(auth_domain).userinfo(access_token)
    return userinfo
5657

5758

5859
def verified_userinfo(request, auth_domain, auth_audience):

src/auslib/util/autograph.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33

44
import requests
55
from requests_hawk import HawkAuth
6-
from statsd.defaults.env import statsd
76

87
from auslib.util.retry import retry_sync
8+
from auslib.util.statsd import statsd
99

1010
SIGNATURE_PREFIX = "Content-Signature:\x00"
1111

src/auslib/util/cache.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from copy import deepcopy
22

33
from repoze.lru import ExpiringLRUCache
4-
from statsd.defaults.env import statsd
4+
5+
from auslib.util.statsd import statsd
56

67
uncached_sentinel = object()
78

src/auslib/util/statsd.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from contextlib import contextmanager
2+
3+
from flask import g
4+
5+
6+
class MaybeStatsd:
7+
@contextmanager
8+
def timer(self, *args, **kwargs):
9+
if g:
10+
with g.statsd.timer(*args, **kwargs):
11+
yield
12+
else:
13+
yield
14+
15+
def incr(self, *args, **kwargs):
16+
if g:
17+
return g.statsd.incr(*args, **kwargs)
18+
19+
20+
statsd = MaybeStatsd()

src/auslib/web/admin/base.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ def create_app():
5555
create_dockerflow_endpoints(flask_app)
5656

5757
@flask_app.before_request
58-
def setup_timer():
58+
def setup_statsd():
59+
g.statsd = statsd.pipeline()
5960
g.request_timer = None
6061
if should_time_request():
6162
# do some massaging to get the metric name right
@@ -70,7 +71,7 @@ def setup_timer():
7071
.removeprefix("auslib_web_common_")
7172
)
7273
metric = f"endpoint_{metric}"
73-
g.request_timer = statsd.timer(metric)
74+
g.request_timer = g.statsd.timer(metric)
7475
g.request_timer.start()
7576

7677
@flask_app.before_request
@@ -203,6 +204,7 @@ def add_security_headers(response):
203204
def send_stats(response):
204205
if hasattr(g, "request_timer") and g.request_timer:
205206
g.request_timer.stop()
207+
g.statsd.send()
206208

207209
return response
208210

src/auslib/web/public/base.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from os import path
55

66
import connexion
7-
from flask import Response, make_response, request, send_from_directory
7+
from flask import Response, g, make_response, request, send_from_directory
88
from sentry_sdk import capture_exception
99
from specsynthase.specbuilder import SpecBuilder
1010
from statsd.defaults.env import statsd
@@ -108,6 +108,15 @@ def robots():
108108
def contributejson():
109109
return send_from_directory(flask_app.static_folder, "contribute.json")
110110

111+
@flask_app.before_request
def create_statsd_pipeline():
    """Open a statsd pipeline for this request so metrics are batched
    into a single send instead of one UDP packet per stat."""
    g.statsd = statsd.pipeline()

@flask_app.after_request
def send_statsd_pipeline(response):
    """Flush the request's batched stats and pass the response through.

    Guarded with getattr: Flask runs after_request hooks even when an
    earlier before_request hook short-circuited the request (e.g. by
    raising), in which case create_statsd_pipeline never ran and
    ``g.statsd`` is unset — an unguarded access would raise
    AttributeError and mask the original failure.
    """
    pipeline = getattr(g, "statsd", None)
    if pipeline is not None:
        pipeline.send()
    return response
119+
111120
@flask_app.before_request
112121
def set_cache_control():
113122
# By default, we want a cache that can be shared across requests from
@@ -141,7 +150,7 @@ def log_request(response):
141150
prefix = request.path.split("/")[1]
142151
if prefix not in ("update", "json", "api"):
143152
prefix = "unknown"
144-
statsd.incr(f"response.{prefix}.{response.status_code}")
153+
g.statsd.incr(f"response.{prefix}.{response.status_code}")
145154

146155
return response
147156

src/auslib/web/public/client.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
import sys
44

55
from flask import current_app as app
6-
from flask import make_response, request
7-
from statsd.defaults.env import statsd
6+
from flask import g, make_response, request
87

98
from auslib.AUS import FORCE_FALLBACK_MAPPING, FORCE_MAIN_MAPPING
109
from auslib.blobs.base import XMLBlob, createBlob
@@ -138,7 +137,7 @@ def extract_query_version(request_url):
138137

139138
@with_transaction
140139
def get_update_blob(transaction, **url):
141-
with statsd.timer("client.parse_query"):
140+
with g.statsd.timer("client.parse_query"):
142141
url["queryVersion"] = extract_query_version(request.url)
143142
# Underlying code depends on osVersion being set. Since this route only
144143
# exists to support ancient queries, and all newer versions have osVersion
@@ -152,7 +151,7 @@ def get_update_blob(transaction, **url):
152151
query = getQueryFromURL(url)
153152
LOG.debug("Got query: %s", query)
154153

155-
with statsd.timer("client.evaluate_rules"):
154+
with g.statsd.timer("client.evaluate_rules"):
156155
release, update_type, eval_metadata = AUS.evaluateRules(query, transaction=transaction)
157156

158157
response_blobs = []
@@ -162,12 +161,12 @@ def get_update_blob(transaction, **url):
162161
response_products = release.getResponseProducts()
163162
response_blob_names = release.getResponseBlobs()
164163
if response_products:
165-
with statsd.timer("client.process_response_products"):
164+
with g.statsd.timer("client.process_response_products"):
166165
# if we have a SuperBlob of gmp, we process the response products and
167166
# concatenate their inner XMLs
168167
response_blobs.extend(evaluate_response_products(response_products, query, transaction))
169168
elif response_blob_names:
170-
with statsd.timer("client.process_response_blobs"):
169+
with g.statsd.timer("client.process_response_blobs"):
171170
# if we have a SuperBlob of systemaddons, we process the response products and
172171
# concatenate their inner XMLs
173172
response_blobs.extend(evaluate_response_blobs(response_blob_names, update_type, query, transaction))
@@ -179,7 +178,7 @@ def get_update_blob(transaction, **url):
179178
squash_response = True
180179
LOG.debug("Busted nightly detected, will squash xml response")
181180

182-
with statsd.timer("client.make_response"):
181+
with g.statsd.timer("client.make_response"):
183182
return construct_response(release, query, update_type, response_blobs, squash_response, eval_metadata)
184183

185184

src/auslib/web/public/json.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from flask import Response
44
from flask import current_app as app
5-
from statsd.defaults.env import statsd
5+
from flask import g
66

77
from auslib.AUS import FORCE_FALLBACK_MAPPING, FORCE_MAIN_MAPPING
88
from auslib.web.public.helpers import AUS, get_aus_metadata_headers, get_content_signature_headers, with_transaction
@@ -12,13 +12,13 @@
1212
def get_update(transaction, **parameters):
1313
force = parameters.get("force")
1414
parameters["force"] = {FORCE_MAIN_MAPPING.query_value: FORCE_MAIN_MAPPING, FORCE_FALLBACK_MAPPING.query_value: FORCE_FALLBACK_MAPPING}.get(force)
15-
with statsd.timer("json.evaluate_rules"):
15+
with g.statsd.timer("json.evaluate_rules"):
1616
release, _, eval_metadata = AUS.evaluateRules(parameters, transaction=transaction)
1717

1818
if not release:
1919
return Response(status=404)
2020

21-
with statsd.timer("json.make_response"):
21+
with g.statsd.timer("json.make_response"):
2222
headers = get_aus_metadata_headers(eval_metadata)
2323
response = json.dumps(release.getResponse(parameters, app.config["ALLOWLISTED_DOMAINS"]))
2424
headers.update(get_content_signature_headers(response, ""))

tests/admin/views/test_releases_v2.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2195,8 +2195,9 @@ def test_statsd(api, method, endpoint, metric):
21952195
Timings are handled centrally, so it's not worth the trouble to test
21962196
every single one. Requests don't need to succeed for these to pass;
21972197
they just need to be routable."""
2198-
with mock.patch("auslib.web.admin.base.statsd.timer") as mocked_timer:
2198+
with mock.patch("auslib.web.admin.base.statsd.pipeline") as mocked_pipeline:
21992199
api.open(endpoint, method=method)
2200+
mocked_timer = mocked_pipeline().timer
22002201
assert mocked_timer.call_count == 1
22012202
mocked_timer.assert_has_calls([mock.call(metric)])
22022203

@@ -2209,9 +2210,8 @@ def test_statsd_gcs_creation(api, firefox_62_0_build1):
22092210
# 20 locales to upload for + top level
22102211
# a creation uploads twice
22112212
# 21 * 2 = 42
2212-
# plus 1 for the overall request
2213-
assert mocked_timer.call_count == 43
2214-
mocked_timer.assert_has_calls([mock.call("async_gcs_upload"), mock.call("endpoint_releases_v2_ensure")], any_order=True)
2213+
assert mocked_timer.call_count == 42
2214+
assert mocked_timer.call_args_list == [mock.call("async_gcs_upload")] * 42
22152215

22162216

22172217
@pytest.mark.usefixtures("releases_db", "mock_verified_userinfo")
@@ -2224,9 +2224,9 @@ def test_statsd_gcs_update(api):
22242224
assert old_data_versions["platforms"]["Darwin_x86_64-gcc3-u-i386-x86_64"]["locales"]["de"]
22252225
ret = api.post("/v2/releases/Firefox-60.0b3-build1", json={"blob": blob, "old_data_versions": old_data_versions})
22262226
assert ret.status_code == 200, ret.data
2227-
# one call for top level modification, one for the changed locale, one for overall request
2228-
assert mocked_timer.call_count == 3
2229-
mocked_timer.assert_has_calls([mock.call("async_gcs_upload"), mock.call("endpoint_releases_v2_update")], any_order=True)
2227+
# one call for top level modification, one for the changed locale
2228+
assert mocked_timer.call_count == 2
2229+
assert mocked_timer.call_args_list == [mock.call("async_gcs_upload")] * 2
22302230

22312231

22322232
@pytest.mark.usefixtures("releases_db", "mock_verified_userinfo")
@@ -2235,6 +2235,6 @@ def test_statsd_gcs_delete(api):
22352235
ret = api.delete("/v2/releases/Firefox-65.0-build1")
22362236
assert ret.status_code == 200, ret.data
22372237

2238-
# one call for top level, one call for each locale, one for overall request
2239-
assert mocked_timer.call_count == 22
2240-
mocked_timer.assert_has_calls([mock.call("async_gcs_upload"), mock.call("endpoint_releases_v2_delete")], any_order=True)
2238+
# one call for top level, one call for each locale
2239+
assert mocked_timer.call_count == 21
2240+
assert mocked_timer.call_args_list == [mock.call("async_gcs_upload")] * 21

0 commit comments

Comments
 (0)