|
27 | 27 | from nehta_signature import NehtaXMLSignature
|
28 | 28 | from datetime import datetime, timezone
|
29 | 29 | import logging
|
| 30 | +from sys import platform |
| 31 | +if platform == "win32": |
| 32 | + from ssl import enum_certificates |
| 33 | +import OpenSSL.crypto |
30 | 34 |
|
31 | 35 | # Required to make urllib use PyOpenSSL, which supports PKCS#12 handling
|
32 | 36 | urllib3.contrib.pyopenssl.inject_into_urllib3()
|
@@ -67,6 +71,34 @@ def hpio_from_certificate(certificate):
|
67 | 71 | return hpio, orgname
|
68 | 72 |
|
69 | 73 |
|
| 74 | +def setup_certificate_verification(context, logger): |
| 75 | + """Add default verification paths, and on Windows, the system CA stores to a pyOpenSSL context""" |
| 76 | + context.set_default_verify_paths() |
| 77 | + if platform == "win32": |
| 78 | + # The standard library ssl module does this on Windows, but PyOpenSSL does not (yet) |
| 79 | + store = context.get_cert_store() |
| 80 | + # "ROOT" is the "Trusted Root Certification Authorities", "CA" is the "Intermediate Certification Authorities" |
| 81 | + # The Python SSL module loads both, so do the same |
| 82 | + for store_name in ("ROOT", "CA"): |
| 83 | + for cert_bytes, encoding_type, trust in enum_certificates(store_name): |
| 84 | + if encoding_type == "x509_asn": |
| 85 | + # Must be trusted for all purposes (True) or for server authentication |
| 86 | + if trust == True or "1.3.6.1.5.5.7.3.1" in trust: |
| 87 | + try: |
| 88 | + # Load as an OpenSSL X509 object rather than a cryptography object which is then converted to OpenSSL.X509 |
| 89 | + store.add_cert( |
| 90 | + OpenSSL.crypto.load_certificate( |
| 91 | + OpenSSL.crypto.FILETYPE_ASN1, cert_bytes |
| 92 | + ) |
| 93 | + ) |
| 94 | + except OpenSSL.crypto.Error as e: |
| 95 | + logger.warn("Unable to load system certificate: {e!s}") |
| 96 | + else: |
| 97 | + logger.warn( |
| 98 | + "Unrecognised certificate type ({type}) when importing from system certificate store." |
| 99 | + ) |
| 100 | + |
| 101 | + |
70 | 102 | class WsaAnonymisePlugin(zeep.plugins.Plugin):
|
71 | 103 | """Plugin to force the WS-Addressing To: header to the anonymous value
|
72 | 104 |
|
@@ -157,7 +189,7 @@ def __init__(self, cert_file, cert_password, config):
|
157 | 189 | # The standard library's ssl module does not support loading from memory; the only real
|
158 | 190 | # justification is that this is less secure - see https://github.com/python/cpython/issues/129216
|
159 | 191 | ctx = create_urllib3_context()
|
160 |
| - ctx.set_default_verify_paths() |
| 192 | + setup_certificate_verification(ctx._ctx, self.log) |
161 | 193 | ctx._ctx.use_certificate(cert_cg)
|
162 | 194 | ctx._ctx.use_privatekey(pkcs_cg.key)
|
163 | 195 |
|
|
0 commit comments