1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# pylint: disable=missing-docstring,invalid-name,redefined-outer-name
import pytest
from OpenSSL import crypto
# Parameter list for valid_cert fixture
VALID_CERTIFICATE_PARAMS = [
{
'short_name': 'client',
'cn': 'client.example.com',
'serial': 4,
'uses': b'clientAuth',
'dns': [],
'ip': [],
},
{
'short_name': 'server',
'cn': 'server.example.com',
'serial': 5,
'uses': b'serverAuth',
'dns': ['kubernetes', 'openshift'],
'ip': ['10.0.0.1', '192.168.0.1']
},
{
'short_name': 'combined',
'cn': 'combined.example.com',
# Verify that HUGE serials parse correctly.
# Frobs PARSING_HEX_SERIAL in _parse_cert
# See https://bugzilla.redhat.com/show_bug.cgi?id=1464240
'serial': 14449739080294792594019643629255165375,
'uses': b'clientAuth, serverAuth',
'dns': ['etcd'],
'ip': ['10.0.0.2', '192.168.0.2']
}
]
# Extract the short_name from VALID_CERTIFICATE_PARAMS to provide
# friendly naming for the valid_cert fixture
VALID_CERTIFICATE_IDS = [param['short_name'] for param in VALID_CERTIFICATE_PARAMS]
@pytest.fixture(scope='session')
def ca(tmpdir_factory):
ca_dir = tmpdir_factory.mktemp('ca')
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
cert.set_version(3)
cert.set_serial_number(1)
cert.get_subject().commonName = 'test-signer'
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(24 * 60 * 60)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.add_extensions([
crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE, pathlen:0'),
crypto.X509Extension(b'keyUsage', True,
b'digitalSignature, keyEncipherment, keyCertSign, cRLSign'),
crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', subject=cert)
])
cert.add_extensions([
crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid:always', issuer=cert)
])
cert.sign(key, 'sha256')
return {
'dir': ca_dir,
'key': key,
'cert': cert,
}
@pytest.fixture(scope='session',
ids=VALID_CERTIFICATE_IDS,
params=VALID_CERTIFICATE_PARAMS)
def valid_cert(request, ca):
common_name = request.param['cn']
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
cert.set_serial_number(request.param['serial'])
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(24 * 60 * 60)
cert.set_issuer(ca['cert'].get_subject())
cert.set_pubkey(key)
cert.set_version(3)
cert.get_subject().commonName = common_name
cert.add_extensions([
crypto.X509Extension(b'basicConstraints', True, b'CA:FALSE'),
crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment'),
crypto.X509Extension(b'extendedKeyUsage', False, request.param['uses']),
])
if request.param['dns'] or request.param['ip']:
san_list = ['DNS:{}'.format(common_name)]
san_list.extend(['DNS:{}'.format(x) for x in request.param['dns']])
san_list.extend(['IP:{}'.format(x) for x in request.param['ip']])
cert.add_extensions([
crypto.X509Extension(b'subjectAltName', False, ', '.join(san_list).encode('utf8'))
])
cert.sign(ca['key'], 'sha256')
cert_contents = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
cert_file = ca['dir'].join('{}.crt'.format(common_name))
cert_file.write_binary(cert_contents)
return {
'common_name': common_name,
'serial': request.param['serial'],
'dns': request.param['dns'],
'ip': request.param['ip'],
'uses': request.param['uses'],
'cert_file': cert_file,
'cert': cert
}
|