|
|
|
|
@ -50,9 +50,6 @@ PREFIX_RSA_SHA512 = bytearray([0x30,0x51,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01
|
|
|
|
|
class CertificateError(Exception): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
def decode_str(data): |
|
|
|
|
encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8' |
|
|
|
|
return bytes(data).decode(encoding) |
|
|
|
|
|
|
|
|
|
def decode_OID(s): |
|
|
|
|
s = map(ord, s) |
|
|
|
|
@ -69,33 +66,31 @@ def decode_OID(s):
|
|
|
|
|
return '.'.join(map(str,r)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def asn1_get_children(der, i): |
|
|
|
|
nodes = [] |
|
|
|
|
ii = asn1_node_first_child(der,i) |
|
|
|
|
nodes.append(ii) |
|
|
|
|
while ii[2]<i[2]: |
|
|
|
|
ii = asn1_node_next(der,ii) |
|
|
|
|
nodes.append(ii) |
|
|
|
|
return nodes |
|
|
|
|
|
|
|
|
|
def asn1_get_sequence(s): |
|
|
|
|
return map(lambda j: asn1_get_value(s, j), asn1_get_children(s, asn1_node_root(s))) |
|
|
|
|
|
|
|
|
|
def asn1_get_dict(der, i): |
|
|
|
|
p = {} |
|
|
|
|
for ii in asn1_get_children(der, i): |
|
|
|
|
for iii in asn1_get_children(der, ii): |
|
|
|
|
iiii = asn1_node_first_child(der, iii) |
|
|
|
|
oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER')) |
|
|
|
|
iiii = asn1_node_next(der, iiii) |
|
|
|
|
value = asn1_get_value(der, iiii) |
|
|
|
|
p[oid] = value |
|
|
|
|
return p |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class X509(tlslite.X509): |
|
|
|
|
"""Child class of tlslite.X509 that uses pyasn1 to parse cert |
|
|
|
|
information. Note: pyasn1 is a lot slower than tlslite, so we |
|
|
|
|
should try to do everything in tlslite. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
def get_children(self, der, i): |
|
|
|
|
nodes = [] |
|
|
|
|
ii = asn1_node_first_child(der,i) |
|
|
|
|
while True: |
|
|
|
|
nodes.append(ii) |
|
|
|
|
ii = asn1_node_next(der,ii) |
|
|
|
|
if ii[0] > i[2]: |
|
|
|
|
break |
|
|
|
|
return nodes |
|
|
|
|
|
|
|
|
|
def get_dict(self, der, i): |
|
|
|
|
p = {} |
|
|
|
|
for ii in self.get_children(der, i): |
|
|
|
|
for iii in self.get_children(der, ii): |
|
|
|
|
iiii = asn1_node_first_child(der, iii) |
|
|
|
|
oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER')) |
|
|
|
|
iiii = asn1_node_next(der, iiii) |
|
|
|
|
value = asn1_get_value(der, iiii) |
|
|
|
|
p[oid] = value |
|
|
|
|
return p |
|
|
|
|
|
|
|
|
|
def parseBinary(self, b): |
|
|
|
|
|
|
|
|
|
@ -123,7 +118,7 @@ class X509(tlslite.X509):
|
|
|
|
|
|
|
|
|
|
# issuer |
|
|
|
|
issuer = asn1_node_next(der, sig_algo) |
|
|
|
|
self.issuer = self.get_dict(der, issuer) |
|
|
|
|
self.issuer = asn1_get_dict(der, issuer) |
|
|
|
|
|
|
|
|
|
# validity |
|
|
|
|
validity = asn1_node_next(der, issuer) |
|
|
|
|
@ -134,26 +129,31 @@ class X509(tlslite.X509):
|
|
|
|
|
|
|
|
|
|
# subject |
|
|
|
|
subject = asn1_node_next(der, validity) |
|
|
|
|
self.subject = self.get_dict(der, subject) |
|
|
|
|
self.subject = asn1_get_dict(der, subject) |
|
|
|
|
subject_pki = asn1_node_next(der, subject) |
|
|
|
|
|
|
|
|
|
# optional fields: issuer_uid, subject_uid, extensions |
|
|
|
|
i = subject_pki |
|
|
|
|
# extensions |
|
|
|
|
self.CA = False |
|
|
|
|
while True: |
|
|
|
|
self.AKI = None |
|
|
|
|
self.SKI = None |
|
|
|
|
i = subject_pki |
|
|
|
|
while i[2] < cert[2]: |
|
|
|
|
i = asn1_node_next(der, i) |
|
|
|
|
if i[0] > cert[2]: |
|
|
|
|
break |
|
|
|
|
for ii in self.get_children(der, i): |
|
|
|
|
for iii in self.get_children(der, ii): |
|
|
|
|
iiii = asn1_node_first_child(der, iii) |
|
|
|
|
oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER')) |
|
|
|
|
iiii = asn1_node_next(der, iiii) |
|
|
|
|
value = asn1_get_value(der, iiii) |
|
|
|
|
if oid == '2.5.29.19': # basic constraints |
|
|
|
|
self.CA = value |
|
|
|
|
else: |
|
|
|
|
pass |
|
|
|
|
d = asn1_get_dict(der, i) |
|
|
|
|
for oid, value in d.items(): |
|
|
|
|
if oid == '2.5.29.19': |
|
|
|
|
# Basic Constraints |
|
|
|
|
self.CA = bool(value) |
|
|
|
|
elif oid == '2.5.29.14': |
|
|
|
|
# Subject Key Identifier |
|
|
|
|
r = asn1_node_root(value) |
|
|
|
|
value = asn1_get_value_of_type(value, r, 'OCTET STRING') |
|
|
|
|
self.SKI = value.encode('hex') |
|
|
|
|
elif oid == '2.5.29.35': |
|
|
|
|
# Authority Key Identifier |
|
|
|
|
self.AKI = asn1_get_sequence(value)[0].encode('hex') |
|
|
|
|
else: |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
# cert signature |
|
|
|
|
cert_sig_algo = asn1_node_next(der, cert) |
|
|
|
|
@ -162,13 +162,16 @@ class X509(tlslite.X509):
|
|
|
|
|
cert_sig = asn1_node_next(der, cert_sig_algo) |
|
|
|
|
self.signature = asn1_get_value(der, cert_sig)[1:] |
|
|
|
|
|
|
|
|
|
def get_keyID(self): |
|
|
|
|
# http://security.stackexchange.com/questions/72077/validating-an-ssl-certificate-chain-according-to-rfc-5280-am-i-understanding-th |
|
|
|
|
return self.SKI if self.SKI else repr(self.subject) |
|
|
|
|
|
|
|
|
|
def get_issuer_keyID(self): |
|
|
|
|
return self.AKI if self.AKI else repr(self.issuer) |
|
|
|
|
|
|
|
|
|
def get_common_name(self): |
|
|
|
|
return self.subject.get('2.5.4.3', 'unknown') |
|
|
|
|
|
|
|
|
|
def get_issuer(self): |
|
|
|
|
return self.issuer.get('2.5.4.3', 'unknown') |
|
|
|
|
|
|
|
|
|
def get_signature(self): |
|
|
|
|
return self.cert_sig_algo, self.signature, self.data |
|
|
|
|
|
|
|
|
|
@ -198,6 +201,7 @@ class X509CertChain(tlslite.X509CertChain):
|
|
|
|
|
@profiler |
|
|
|
|
def load_certificates(ca_path): |
|
|
|
|
ca_list = {} |
|
|
|
|
ca_keyID = {} |
|
|
|
|
with open(ca_path, 'r') as f: |
|
|
|
|
s = f.read() |
|
|
|
|
bList = tlslite.utils.pem.dePemList(s, "CERTIFICATE") |
|
|
|
|
@ -206,10 +210,12 @@ def load_certificates(ca_path):
|
|
|
|
|
try: |
|
|
|
|
x.parseBinary(b) |
|
|
|
|
x.check_date() |
|
|
|
|
except Exception as e: |
|
|
|
|
except BaseException as e: |
|
|
|
|
util.print_error("cert error:", e) |
|
|
|
|
continue |
|
|
|
|
ca_list[x.get_common_name()] = x |
|
|
|
|
|
|
|
|
|
fp = x.getFingerprint() |
|
|
|
|
ca_list[fp] = x |
|
|
|
|
ca_keyID[x.get_keyID()] = fp |
|
|
|
|
|
|
|
|
|
return ca_list |
|
|
|
|
return ca_list, ca_keyID |
|
|
|
|
|