diff options
Diffstat (limited to 'roles/openshift_certificate_expiry/library')
-rw-r--r-- | roles/openshift_certificate_expiry/library/openshift_cert_expiry.py | 197 |
1 files changed, 181 insertions, 16 deletions
diff --git a/roles/openshift_certificate_expiry/library/openshift_cert_expiry.py b/roles/openshift_certificate_expiry/library/openshift_cert_expiry.py index 85671b164..33930c0c1 100644 --- a/roles/openshift_certificate_expiry/library/openshift_cert_expiry.py +++ b/roles/openshift_certificate_expiry/library/openshift_cert_expiry.py @@ -5,13 +5,32 @@ """For details on this module see DOCUMENTATION (below)""" import datetime +import io import os import subprocess +import sys +import tempfile +# File pointers from io.open require unicode inputs when using their +# `write` method +import six from six.moves import configparser import yaml -import OpenSSL.crypto +try: + # You can comment this import out and include a 'pass' in this + # block if you're manually testing this module on a NON-ATOMIC + # HOST (or any host that just doesn't have PyOpenSSL + # available). That will force the `load_and_handle_cert` function + # to use the Fake OpenSSL classes. + import OpenSSL.crypto +except ImportError: + # Some platforms (such as RHEL Atomic) may not have the Python + # OpenSSL library installed. In this case we will use a manual + # work-around to parse each certificate. + # + # Check for 'OpenSSL.crypto' in `sys.modules` later. + pass DOCUMENTATION = ''' --- @@ -66,6 +85,128 @@ EXAMPLES = ''' ''' +class FakeOpenSSLCertificate(object): + """This provides a rough mock of what you get from +`OpenSSL.crypto.load_certificate()`. This is a work-around for +platforms missing the Python OpenSSL library. + """ + def __init__(self, cert_string): + """`cert_string` is a certificate in the form you get from running a +.crt through 'openssl x509 -in CERT.cert -text'""" + self.cert_string = cert_string + self.serial = None + self.subject = None + self.extensions = [] + self.not_after = None + self._parse_cert() + + def _parse_cert(self): + """Manually parse the certificate line by line""" + self.extensions = [] + + PARSING_ALT_NAMES = False + for line in self.cert_string.split('\n'): + l = line.strip() + if PARSING_ALT_NAMES: + # We're parsing a 'Subject Alternative Name' line + self.extensions.append( + FakeOpenSSLCertificateSANExtension(l)) + + PARSING_ALT_NAMES = False + continue + + # parse out the bits that we can + if l.startswith('Serial Number:'): + # Serial Number: 11 (0xb) + # => 11 + self.serial = int(l.split()[-2]) + + elif l.startswith('Not After :'): + # Not After : Feb 7 18:19:35 2019 GMT + # => strptime(str, '%b %d %H:%M:%S %Y %Z') + # => strftime('%Y%m%d%H%M%SZ') + # => 20190207181935Z + not_after_raw = l.partition(' : ')[-1] + # Last item: ('Not After', ' : ', 'Feb 7 18:19:35 2019 GMT') + not_after_parsed = datetime.datetime.strptime(not_after_raw, '%b %d %H:%M:%S %Y %Z') + self.not_after = not_after_parsed.strftime('%Y%m%d%H%M%SZ') + + elif l.startswith('X509v3 Subject Alternative Name:'): + PARSING_ALT_NAMES = True + continue + + elif l.startswith('Subject:'): + # O=system:nodes, CN=system:node:m01.example.com + self.subject = FakeOpenSSLCertificateSubjects(l.partition(': ')[-1]) + + def get_serial_number(self): + """Return the serial number of the cert""" + return self.serial + + def get_subject(self): + """Subjects must implement get_components() and return dicts or +tuples. An 'openssl x509 -in CERT.cert -text' with 'Subject': + + Subject: Subject: O=system:nodes, CN=system:node:m01.example.com + +might return: [('O=system', 'nodes'), ('CN=system', 'node:m01.example.com')] + """ + return self.subject + + def get_extension(self, i): + """Extensions must implement get_short_name() and return the string +'subjectAltName'""" + return self.extensions[i] + + def get_notAfter(self): + """Returns a date stamp as a string in the form +'20180922170439Z'. strptime the result with format param: +'%Y%m%d%H%M%SZ'.""" + return self.not_after + + +class FakeOpenSSLCertificateSANExtension(object): # pylint: disable=too-few-public-methods + """Mocks what happens when `get_extension` is called on a certificate +object""" + + def __init__(self, san_string): + """With `san_string` as you get from: + + $ openssl x509 -in certificate.crt -text + """ + self.san_string = san_string + self.short_name = 'subjectAltName' + + def get_short_name(self): + """Return the 'type' of this extension. It's always the same though +because we only care about subjectAltName's""" + return self.short_name + + def __str__(self): + """Return this extension and the value as a simple string""" + return self.san_string + + +# pylint: disable=too-few-public-methods +class FakeOpenSSLCertificateSubjects(object): + """Mocks what happens when `get_subject` is called on a certificate +object""" + + def __init__(self, subject_string): + """With `subject_string` as you get from: + + $ openssl x509 -in certificate.crt -text + """ + self.subjects = [] + for s in subject_string.split(', '): + name, _, value = s.partition('=') + self.subjects.append((name, value)) + + def get_components(self): + """Returns a list of tuples""" + return self.subjects + + # We only need this for one thing, we don't care if it doesn't have # that many public methods # @@ -100,7 +241,8 @@ will be returned return [p for p in path_list if os.path.exists(os.path.realpath(p))] -def load_and_handle_cert(cert_string, now, base64decode=False): +# pylint: disable=too-many-locals,too-many-branches +def load_and_handle_cert(cert_string, now, base64decode=False, ans_module=None): """Load a certificate, split off the good parts, and return some useful data @@ -109,6 +251,7 @@ Params: - `cert_string` (string) - a certificate loaded into a string object - `now` (datetime) - a datetime object of the time to calculate the certificate 'time_remaining' against - `base64decode` (bool) - run .decode('base64') on the input? +- `ans_module` (AnsibleModule) - The AnsibleModule object for this module (so we can raise errors) Returns: A 3-tuple of the form: (certificate_common_name, certificate_expiry_date, certificate_time_remaining) @@ -119,10 +262,33 @@ A 3-tuple of the form: (certificate_common_name, certificate_expiry_date, certif else: _cert_string = cert_string - cert_loaded = OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_PEM, _cert_string) - - cert_serial = cert_loaded.get_serial_number() + # Disable this. We 'redefine' the type because we are working + # around a missing library on the target host. + # + # pylint: disable=redefined-variable-type + if 'OpenSSL.crypto' in sys.modules: + # No work-around required + cert_loaded = OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, _cert_string) + else: + # Missing library, work-around required. We need to write the + # cert out to disk temporarily so we can run the 'openssl' + # command on it to decode it + _, path = tempfile.mkstemp() + with io.open(path, 'w') as fp: + fp.write(six.u(_cert_string)) + fp.flush() + + cmd = 'openssl x509 -in {} -text'.format(path) + try: + openssl_decoded = subprocess.Popen(cmd.split(), + stdout=subprocess.PIPE) + except OSError: + ans_module.fail_json(msg="Error: The 'OpenSSL' python library and CLI command were not found on the target host. Unable to parse any certificates. This host will not be included in generated reports.") + else: + openssl_decoded = openssl_decoded.communicate()[0] + os.remove(path) + cert_loaded = FakeOpenSSLCertificate(openssl_decoded) ###################################################################### # Read all possible names from the cert @@ -172,15 +338,14 @@ A 3-tuple of the form: (certificate_common_name, certificate_expiry_date, certif ###################################################################### # Grab the expiration date - cert_expiry = cert_loaded.get_notAfter() cert_expiry_date = datetime.datetime.strptime( - cert_expiry, + cert_loaded.get_notAfter(), # example get_notAfter() => 20180922170439Z '%Y%m%d%H%M%SZ') time_remaining = cert_expiry_date - now - return (cert_subject, cert_expiry_date, time_remaining, cert_serial) + return (cert_subject, cert_expiry_date, time_remaining, cert_loaded.get_serial_number()) def classify_cert(cert_meta, now, time_remaining, expire_window, cert_list): @@ -379,7 +544,7 @@ an OpenShift Container Platform cluster (cert_subject, cert_expiry_date, time_remaining, - cert_serial) = load_and_handle_cert(cert, now) + cert_serial) = load_and_handle_cert(cert, now, ans_module=module) expire_check_result = { 'cert_cn': cert_subject, @@ -428,7 +593,7 @@ an OpenShift Container Platform cluster (cert_subject, cert_expiry_date, time_remaining, - cert_serial) = load_and_handle_cert(c, now, base64decode=True) + cert_serial) = load_and_handle_cert(c, now, base64decode=True, ans_module=module) expire_check_result = { 'cert_cn': cert_subject, @@ -458,7 +623,7 @@ an OpenShift Container Platform cluster (cert_subject, cert_expiry_date, time_remaining, - cert_serial) = load_and_handle_cert(c, now, base64decode=True) + cert_serial) = load_and_handle_cert(c, now, base64decode=True, ans_module=module) expire_check_result = { 'cert_cn': cert_subject, @@ -512,7 +677,7 @@ an OpenShift Container Platform cluster (cert_subject, cert_expiry_date, time_remaining, - cert_serial) = load_and_handle_cert(c, now) + cert_serial) = load_and_handle_cert(c, now, ans_module=module) expire_check_result = { 'cert_cn': cert_subject, @@ -551,7 +716,7 @@ an OpenShift Container Platform cluster (cert_subject, cert_expiry_date, time_remaining, - cert_serial) = load_and_handle_cert(etcd_fp.read(), now) + cert_serial) = load_and_handle_cert(etcd_fp.read(), now, ans_module=module) expire_check_result = { 'cert_cn': cert_subject, @@ -597,7 +762,7 @@ an OpenShift Container Platform cluster (cert_subject, cert_expiry_date, time_remaining, - cert_serial) = load_and_handle_cert(router_c, now, base64decode=True) + cert_serial) = load_and_handle_cert(router_c, now, base64decode=True, ans_module=module) expire_check_result = { 'cert_cn': cert_subject, @@ -628,7 +793,7 @@ an OpenShift Container Platform cluster (cert_subject, cert_expiry_date, time_remaining, - cert_serial) = load_and_handle_cert(registry_c, now, base64decode=True) + cert_serial) = load_and_handle_cert(registry_c, now, base64decode=True, ans_module=module) expire_check_result = { 'cert_cn': cert_subject, |