-
Notifications
You must be signed in to change notification settings - Fork 218
MPP-3852: Use cryptography
for SNS signature validation, remove pyopenssl
and pem
#5235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
2fc8a31
4fd477a
efae204
2a0af6c
0f7c799
a16c748
52b31a1
c4bb960
f4c1477
a1c0c35
dd7bba9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,14 +8,16 @@ | |
from django.conf import settings | ||
from django.core.cache import caches | ||
from django.core.exceptions import SuspiciousOperation | ||
from django.utils.encoding import smart_bytes | ||
|
||
import pem | ||
from OpenSSL import crypto | ||
from cryptography import x509 | ||
from cryptography.exceptions import InvalidSignature | ||
from cryptography.hazmat.primitives import hashes | ||
from cryptography.hazmat.primitives.asymmetric import padding, rsa | ||
|
||
logger = logging.getLogger("events") | ||
|
||
NOTIFICATION_HASH_FORMAT = """Message | ||
NOTIFICATION_HASH_FORMAT = """\ | ||
Message | ||
{Message} | ||
MessageId | ||
{MessageId} | ||
|
@@ -29,7 +31,8 @@ | |
{Type} | ||
""" | ||
|
||
NOTIFICATION_WITHOUT_SUBJECT_HASH_FORMAT = """Message | ||
NOTIFICATION_WITHOUT_SUBJECT_HASH_FORMAT = """\ | ||
Message | ||
{Message} | ||
MessageId | ||
{MessageId} | ||
|
@@ -41,7 +44,8 @@ | |
{Type} | ||
""" | ||
|
||
SUBSCRIPTION_HASH_FORMAT = """Message | ||
SUBSCRIPTION_HASH_FORMAT = """\ | ||
Message | ||
{Message} | ||
MessageId | ||
{MessageId} | ||
|
@@ -63,6 +67,10 @@ | |
] | ||
|
||
|
||
class VerificationFailed(ValueError): | ||
pass | ||
|
||
|
||
def verify_from_sns(json_body): | ||
""" | ||
Check that the SNS message was signed by the cetificate. | ||
|
@@ -71,18 +79,29 @@ def verify_from_sns(json_body): | |
|
||
Only supports SignatureVersion 1. SignatureVersion 2 (SHA256) was added in | ||
September 2022, and requires opt-in. | ||
|
||
TODO MPP-3852: Stop using OpenSSL.crypto | ||
""" | ||
pemfile = _grab_keyfile(json_body["SigningCertURL"]) | ||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, pemfile) | ||
signature = base64.decodebytes(json_body["Signature"].encode("utf-8")) | ||
signing_cert_url = json_body["SigningCertURL"] | ||
pemfile = _grab_keyfile(signing_cert_url) | ||
cert = x509.load_pem_x509_certificate(pemfile) | ||
signature = base64.decodebytes(json_body["Signature"].encode()) | ||
|
||
hash_format = _get_hash_format(json_body) | ||
cert_pubkey = cert.public_key() | ||
if not isinstance(cert_pubkey, rsa.RSAPublicKey): | ||
raise VerificationFailed(f"SigningCertURL {signing_cert_url} is not an RSA key") | ||
|
||
try: | ||
cert_pubkey.verify( | ||
signature, | ||
hash_format.format(**json_body).encode(), | ||
padding.PKCS1v15(), | ||
hashes.SHA1(), # noqa: S303 # Use of insecure hash SHA1 | ||
) | ||
except InvalidSignature as e: | ||
raise VerificationFailed( | ||
f"Invalid signature with SigningCertURL {signing_cert_url}" | ||
) from e | ||
|
||
crypto.verify( | ||
cert, signature, hash_format.format(**json_body).encode("utf-8"), "sha1" | ||
) | ||
return json_body | ||
|
||
|
||
|
@@ -109,14 +128,18 @@ def _grab_keyfile(cert_url): | |
if not pemfile: | ||
response = urlopen(cert_url) # noqa: S310 (check for custom scheme) | ||
pemfile = response.read() | ||
|
||
# Extract the first certificate in the file and confirm it's a valid | ||
# PEM certificate | ||
certificates = pem.parse(smart_bytes(pemfile)) | ||
|
||
certs = x509.load_pem_x509_certificates(pemfile) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. quibble (non-blocking): this is 2x calls to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I took it one step futher and changed |
||
# A proper certificate file will contain 1 certificate | ||
if len(certificates) != 1: | ||
logger.error("Invalid Certificate File: URL %s", cert_url) | ||
raise ValueError("Invalid Certificate File") | ||
if len(certs) != 1: | ||
raise VerificationFailed( | ||
f"SigningCertURL {cert_url} has {len(certs)} certificates." | ||
) | ||
cert_pubkey = certs[0].public_key() | ||
if not isinstance(cert_pubkey, rsa.RSAPublicKey): | ||
raise VerificationFailed(f"SigningCertURL {cert_url} is not an RSA key") | ||
|
||
key_cache.set(cert_url, pemfile) | ||
return pemfile |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
praise: nice little clean-up.