diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 00b9e75..acde084 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -18,6 +18,7 @@ workflow: - if: $CI_EXTERNAL_PULL_REQUEST_IID - if: $CI_COMMIT_TAG - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH + - if: $CI_PIPELINE_SOURCE == "web" stages: - test @@ -28,6 +29,12 @@ caramel:test: image: ${PYTHON_IMAGE} before_script: - pip3 install . + script: + - python3 -m unittest discover + +caramel:test:deprecation: + extends: caramel:test + allow_failure: true script: - python3 -W error::DeprecationWarning -m unittest discover @@ -35,7 +42,9 @@ caramel:systest: stage: test image: ${BUILD_IMAGE} before_script: + - dnf -y install openssl - pip3 install . + - test -e /etc/machine-id || echo f26871a049f84136bb011f4744cde2dd > /etc/machine-id script: - make systest diff --git a/caramel/config.py b/caramel/config.py index f55b355..2271323 100644 --- a/caramel/config.py +++ b/caramel/config.py @@ -1,7 +1,7 @@ #! /usr/bin/env python # vim: expandtab shiftwidth=4 softtabstop=4 tabstop=17 filetype=python : """caramel.config is a helper library that standardizes and collects the logic - in one place used by the caramel CLI tools/scripts""" +in one place used by the caramel CLI tools/scripts""" import argparse import logging @@ -221,7 +221,7 @@ def get_log_level(argument_level, logger=None, env=None): logger = logging.getLogger() current_level = logger.level - argument_verbosity = logging.ERROR - argument_level * 10 # level steps are 10 + argument_verbosity = logging.ERROR - argument_level * 10 # level steps are 10 verbosity = min(argument_verbosity, env_level, current_level) log_level = ( verbosity if logging.DEBUG <= verbosity <= logging.ERROR else logging.ERROR diff --git a/caramel/models.py b/caramel/models.py index 92b3e4a..3458b2f 100644 --- a/caramel/models.py +++ b/caramel/models.py @@ -1,15 +1,16 @@ #! /usr/bin/env python # vim: expandtab shiftwidth=4 softtabstop=4 tabstop=17 filetype=python : +from cryptography import x509 as _x509 +from cryptography.hazmat.primitives import hashes as _hashes +from cryptography.hazmat.primitives import serialization as _serialization + + import sqlalchemy as _sa -from sqlalchemy.ext.declarative import ( - declared_attr, - as_declarative -) +from sqlalchemy.ext.declarative import declared_attr, as_declarative import sqlalchemy.orm as _orm from zope.sqlalchemy import register -import OpenSSL.crypto as _crypto from pyramid.decorator import reify as _reify import datetime as _datetime import dateutil.parser @@ -19,30 +20,27 @@ X509_V3 = 0x2 # RFC 2459, 4.1.2.1 # Bitlength to Hash Strength lookup table. -HASH = {1024: "sha1", 2048: "sha256", 4096: "sha512"} +HASH = { + 1024: _hashes.SHA1, + 2048: _hashes.SHA256, + 4096: _hashes.SHA512, +} # These parts of the subject _must_ match our CA key -CA_SUBJ_MATCH = (b"C", b"ST", b"L", b"O") - - -def _crypto_patch(): - """hijack _crypto internal lib and violate the default text encoding. - https://github.com/pyca/pyopenssl/pull/115 has a pull&fix for it - https://github.com/pyca/pyopenssl/issues/129 is an open issue - about it.""" - _crypto._lib.ASN1_STRING_set_default_mask_asc(b"utf8only") - +CA_SUBJ_MATCH = ("C", "ST", "L", "O") -_crypto_patch() - -class SigningCert(object): +class SigningCert: """Data class to wrap signing key + cert, to help refactoring""" def __init__(self, cert, key=None): if key: - self.key = _crypto.load_privatekey(_crypto.FILETYPE_PEM, key) - self.cert = _crypto.load_certificate(_crypto.FILETYPE_PEM, cert) + if isinstance(key, str): + key = key.encode("utf8") + self.key = _serialization.load_pem_private_key(key, password=None) + if isinstance(cert, str): + cert = cert.encode("utf8") + self.cert = _x509.load_pem_x509_certificate(cert) @classmethod def from_files(cls, certfile, keyfile=None): @@ -57,24 +55,18 @@ def from_files(cls, certfile, keyfile=None): @_reify def not_before(self): - ts = self.cert.get_notBefore() - if not ts: - return None - return dateutil.parser.parse(ts) + return self.cert.not_valid_before_utc @_reify def pem(self): - return _crypto.dump_certificate(_crypto.FILETYPE_PEM, self.cert) + ret = self.cert.public_bytes(serialization.Encoding.PEM) + return ret # Returns the parts we _care_ about in the subject, from a ca def get_ca_prefix(self, subj_match=CA_SUBJ_MATCH): - subject = self.cert.get_subject() - components = dict(subject.get_components()) - matches = tuple( - (n.decode("utf8"), components[n].decode("utf8")) - for n in subj_match - if n in components - ) + # newer API uses strings instead of bytes, thus we decode them here + name_map = {a.rfc4514_attribute_name: a.value for a in self.cert.subject} + matches = tuple((n, name_map[n]) for n in subj_match if n in name_map) return matches @@ -89,10 +81,12 @@ def _fkcolumn(referent, *args, **kwargs): @as_declarative() -class Base(object): +class Base: + __allow_unmapped__ = True + @declared_attr # type: ignore def __tablename__(cls) -> str: # pylint: disable=no-self-argument - return cls.__name__.lower() # pylint: disable=no-member + return cls.__name__.lower() # pylint: disable=no-member id = _sa.Column(_sa.Integer, primary_key=True) @@ -147,36 +141,37 @@ def __init__(self, sha256sum, reqtext): # XXX: assert sha256(reqtext).hexdigest() == sha256sum ? self.sha256sum = sha256sum self.pem = reqtext - # FIXME: Below 4 lines (try/except) are duped in the req() function. - try: - self.req.verify(self.req.get_pubkey()) - except _crypto.Error: - raise ValueError("invalid PEM reqtext") + + req = _x509.load_pem_x509_csr(self.pem) + if not req.is_signature_valid: + raise ValueError("Invalid Request (invalid signature)") + # Check for and reject reqtext with trailing content - pem = _crypto.dump_certificate_request(_crypto.FILETYPE_PEM, self.req) + pem = req.public_bytes(_serialization.Encoding.PEM) if pem != self.pem: raise ValueError("invalid PEM reqtext") - self.orgunit = self.subject.OU - self.commonname = self.subject.CN + + def subj_item(subject, key): + for item in subject: + if item.rfc4514_attribute_name == key: + return item.value + return "" + + self.orgunit = subj_item(req.subject, "OU") + self.commonname = subj_item(req.subject, "CN") self.rejected = False @_reify def req(self): - req = _crypto.load_certificate_request(_crypto.FILETYPE_PEM, self.pem) - try: - req.verify(req.get_pubkey()) - except _crypto.Error: - raise ValueError("Invalid Request") + req = _x509.load_pem_x509_csr(self.pem) + if not req.is_signature_valid: + raise ValueError("Invalid Request (invalid signature)") return req - @_reify - def subject(self): - return self.req.get_subject() - @_reify def subject_components(self): - components = self.subject.get_components() - return tuple((n.decode("utf8"), v.decode("utf8")) for n, v in components) + res = tuple((a.rfc4514_attribute_name, a.value) for a in self.req.subject) + return res @classmethod def valid(cls): @@ -205,14 +200,14 @@ def refreshable(cls): # Options subqueryload is to prevent thousands of small queries and # instead batch load the certificates at once - all_signed = _sa.select([Certificate.csr_id]) + all_signed = _sa.select(Certificate.csr_id) return ( cls.query().filter_by(rejected=False).filter(CSR.id.in_(all_signed)).all() ) @classmethod def unsigned(cls): - all_signed = _sa.select([Certificate.csr_id]) + all_signed = _sa.select(Certificate.csr_id) return ( cls.query() .filter_by(rejected=False) @@ -243,9 +238,15 @@ def __repr__(self): ).format(self) +def utcnow(): + """Return a non timezone-aware datetime in UTC.""" + ts = _datetime.datetime.now(_datetime.timezone.utc) + return ts.replace(tzinfo=None) + + class AccessLog(Base): # XXX: name could be better - when = _sa.Column(_sa.DateTime, default=_datetime.datetime.utcnow) + when = _sa.Column(_sa.DateTime, default=utcnow) # XXX: name could be better, could perhaps be limited length, # might not want this nullable addr = _sa.Column(_sa.Text) @@ -264,7 +265,7 @@ def __repr__(self): return "<{0.__class__.__name__} id={0.id}>".format(self) -class Extension(object): +class Extension: """Convenience class to make validating Extensions a bit easier""" critical = False @@ -273,9 +274,9 @@ class Extension(object): text = None def __init__(self, ext): - self.name = ext.get_short_name() - self.critical = bool(ext.get_critical()) - self.data = ext.get_data() + self.name = ext.oid._name + self.critical = ext.critical + self.data = ext.value self.text = str(ext) @@ -291,36 +292,34 @@ def __init__(self, CSR, pem, *args, **kws): self.csr_id = CSR.id req = CSR.req - cert_pkey = self.cert.get_pubkey() + cert_pkey = self.cert.public_key() # We can't compare pubkeys directly, so we just verify the signature. - if not req.verify(cert_pkey): + if not req.is_signature_valid: raise ValueError("Public key of cert cannot verify request") - self.not_before = dateutil.parser.parse(self.cert.get_notBefore()) - self.not_after = dateutil.parser.parse(self.cert.get_notAfter()) + self.not_before = self.cert.not_valid_before_utc + self.not_after = self.cert.not_valid_after_utc @_reify def cert(self): - cert = _crypto.load_certificate(_crypto.FILETYPE_PEM, self.pem) + cert = _x509.load_pem_x509_certificate(self.pem) extensions = {} - for index in range(0, cert.get_extension_count()): - ext = cert.get_extension(index) + for ext in cert.extensions: my_ext = Extension(ext) extensions[my_ext.name] = my_ext - - if cert.get_version() != X509_V3: + if cert.version.value != X509_V3: raise ValueError("Not a x509.v3 certificate") - ext = extensions.get(b"basicConstraints") + ext = extensions.get("basicConstraints") if not ext: raise ValueError("Missing Basic Constraints") if not ext.critical: raise ValueError("Extended Key Usage not critical") - ext = extensions.get(b"extendedKeyUsage") + ext = extensions.get("extendedKeyUsage") if not ext: raise ValueError("Missing Extended Key Usage extension") if not ext.critical: @@ -342,54 +341,56 @@ def sign(cls, CSR, ca, lifetime=_datetime.timedelta(30 * 3), backdate=False): timekeeping bug in some firmware. """ assert isinstance(ca, SigningCert) + + now = _datetime.datetime.now(tz=_datetime.timezone.utc) + notAfter = int(lifetime.total_seconds()) # TODO: Verify that the data in DB matches csr_add rules in views.py - cert = _crypto.X509() - cert.set_subject(CSR.req.get_subject()) - cert.set_serial_number(int(uuid.uuid1())) - cert.set_issuer(ca.cert.get_subject()) - cert.set_pubkey(CSR.req.get_pubkey()) - cert.set_version(X509_V3) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(notAfter) + cert = _x509.CertificateBuilder() + cert = cert.subject_name(CSR.req.subject) + cert = cert.serial_number(int(uuid.uuid1())) + cert = cert.issuer_name(ca.cert.subject) + cert = cert.public_key(CSR.req.public_key()) + cert = cert.not_valid_before(now) + cert = cert.not_valid_after(now + _datetime.timedelta(seconds=notAfter)) if backdate and ca.not_before: - now = _datetime.datetime.now(tz=_datetime.timezone.utc) - delta = ca.not_before - now - cert.gmtime_adj_notBefore(int(delta.total_seconds())) - - subjectAltName = bytes("DNS:" + CSR.commonname, "utf-8") - extensions = [ - _crypto.X509Extension( - b"basicConstraints", critical=True, value=b"CA:FALSE" - ), - _crypto.X509Extension( - b"extendedKeyUsage", - critical=True, - value=b"clientAuth,serverAuth", - ), - _crypto.X509Extension( - b"subjectAltName", critical=False, value=subjectAltName - ), - _crypto.X509Extension( - b"subjectKeyIdentifier", - critical=False, - value=b"hash", - subject=cert, + cert = cert.not_valid_before(ca.not_before) + + cert = cert.add_extension( + _x509.BasicConstraints(ca=False, path_length=None), critical=True + ) + cert = cert.add_extension( + _x509.ExtendedKeyUsage( + [ + _x509.ExtendedKeyUsageOID.CLIENT_AUTH, + _x509.ExtendedKeyUsageOID.SERVER_AUTH, + ] ), - ] - cert.add_extensions(extensions) + critical=True, + ) + cert = cert.add_extension( + _x509.SubjectAlternativeName([_x509.DNSName(CSR.commonname)]), + critical=False, + ) + cert = cert.add_extension( + _x509.SubjectKeyIdentifier.from_public_key(CSR.req.public_key()), + critical=False, + ) # subjectKeyIdentifier has to be present before adding auth ident - extensions = [ - _crypto.X509Extension( - b"authorityKeyIdentifier", - critical=False, - value=b"issuer:always,keyid:always", - issuer=ca.cert, - ) - ] - cert.add_extensions(extensions) - bits = cert.get_pubkey().bits() - cert.sign(ca.key, HASH[bits]) - pem = _crypto.dump_certificate(_crypto.FILETYPE_PEM, cert) + issuer_identifier = _x509.SubjectKeyIdentifier.from_public_key( + ca.cert.public_key() + ) + + cert = cert.add_extension( + _x509.AuthorityKeyIdentifier( + key_identifier=issuer_identifier.key_identifier, + authority_cert_serial_number=ca.cert.serial_number, + authority_cert_issuer=[_x509.DirectoryName(ca.cert.subject)], + ), + critical=False, + ) + bits = CSR.req.public_key().key_size + cert = cert.sign(ca.key, HASH[bits]()) + pem = cert.public_bytes(_serialization.Encoding.PEM) return cls(CSR=CSR, pem=pem) diff --git a/caramel/scripts/generate_ca.py b/caramel/scripts/generate_ca.py index 9623780..5411eae 100644 --- a/caramel/scripts/generate_ca.py +++ b/caramel/scripts/generate_ca.py @@ -1,17 +1,26 @@ #! /usr/bin/env python3 # vim: expandtab shiftwidth=4 softtabstop=4 tabstop=17 filetype=python : +"""Generate a ca cert + key""" import datetime import argparse import uuid import os -import OpenSSL.crypto as _crypto +import sys + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID + from caramel.config import ( setup_logging, get_appsettings, ) from caramel import config +REQ_VERSION = 0x00 VERSION = 0x2 CA_BITS = 4096 # Subject attribs, in order. @@ -20,151 +29,164 @@ CA_EXTENSIONS = [ # Key usage for a CA cert. - _crypto.X509Extension( - b"basicConstraints", critical=True, value=b"CA:true, pathlen:0" - ), + x509.BasicConstraints(ca=True, path_length=0), # no cRLSign as we do not use CRLs in caramel. - _crypto.X509Extension(b"keyUsage", critical=True, value=b"keyCertSign"), + x509.KeyUsage( + digital_signature=False, + content_commitment=False, + key_encipherment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=True, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), ] -def _crypto_patch(): - """hijack _crypto internal lib and violate the default text encoding. - https://github.com/pyca/pyopenssl/pull/115 has a pull&fix for it - https://github.com/pyca/pyopenssl/issues/129 is an open issue - about it.""" - _crypto._lib.ASN1_STRING_set_default_mask_asc(b"utf8only") - - -_crypto_patch() - - -# Hack hack. :-) -def CA_LIFE(): +def ca_life(): + """Return a ca lifetime in seconds""" d = datetime.date.today() t = datetime.date(d.year + CA_YEARS, d.month, d.day) return int((t - d).total_seconds()) -# adapted from models.py -def components(subject): - comps = subject.get_components() - return dict((n.decode("utf8"), v.decode("utf8")) for n, v in comps) - +def matching_template(req, cacert): + """Compare required parts of subjects between two certs. -def matching_template(x509, cacert): - """Takes a subject as a dict, and returns if all required fields - match. Otherwise raises exception""" + Raises an exception on mismatch. + """ def later_check(subject): - """Check that the last two fields in subject are OU, CN""" - pair = subject[-1] - if pair[0].decode("utf8") != "CN": + """Check that the last two fields in subject are OU, CN.""" + name_parts = list(subject) + if name_parts[-1].rfc4514_attribute_name != "CN": raise ValueError("CN needs to be last in subject") - - pair = subject[-2] - if pair[0].decode("utf8") != "OU": + if name_parts[-2].rfc4514_attribute_name != "OU": raise ValueError("OU needs to be second to last") - casubject = cacert.get_subject().get_components() - subject = x509.get_subject().get_components() - later_check(casubject) - later_check(subject) - - casubject = casubject[:-2] - subject = subject[:-2] + later_check(req.subject) + later_check(cacert.subject) - for (ca, sub) in zip(casubject, subject): + # The lists do not contain the same data, so we cannot check + # casubject == casubject_2 + # but we can run the same check twice and it ought to match + casubject = list(cacert.subject)[:-2] + subject = list(req.subject)[:-2] + for ca, sub in zip(casubject, subject): if ca != sub: - raise ValueError("Subject needs to match CA cert:" "{}".format(casubject)) - - -def sign_req(req, cacert, cakey): - # Validate Subject contents. Not necessary for CA gen, but kept anyhow - matching_template(req, cacert) + raise ValueError(f"Subject needs to match CA cert: {casubject}") - # Validate signature - req.verify(req.get_pubkey()) - request_subject = components(req.get_subject()) - cert = _crypto.X509() - subject = cert.get_subject() - cert.set_serial_number(int(uuid.uuid1())) - cert.set_version(VERSION) +def sign_ca_req_inner(nreq, cakey, serial): + """Inner signing, which does the work.""" + builder = x509.CertificateBuilder() + builder = builder.serial_number(serial) + # cert.set_version(VERSION) - for attrib in ATTRIBS_TO_KEEP: - if request_subject.get(attrib): - setattr(subject, attrib, request_subject[attrib]) - - issuer_subject = cert.get_subject() - cert.set_issuer(issuer_subject) - cert.set_pubkey(req.get_pubkey()) + # We want the attributes in a specific order, thus we first split the + # subject, and then iterate over our chosen order, and then re-assmeble it. + name_attrs = [] + name_map = {attrib.rfc4514_attribute_name: attrib for attrib in nreq.subject} + name_attrs = [name_map[a] for a in ATTRIBS_TO_KEEP if a in name_map] + builder = builder.subject_name(x509.Name(name_attrs)) + builder = builder.issuer_name(nreq.subject) + builder = builder.public_key(nreq.public_key()) # Validity times - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(CA_LIFE()) - - cert.add_extensions(CA_EXTENSIONS) - - cacert = cert - - extension = _crypto.X509Extension( - b"subjectKeyIdentifier", critical=False, value=b"hash", subject=cert - ) - cert.add_extensions([extension]) - - # We need subjectKeyIdentifier to be added before we can add - # authorityKeyIdentifier. - extension = _crypto.X509Extension( - b"authorityKeyIdentifier", + now = datetime.datetime.now(datetime.timezone.utc) + builder = builder.not_valid_before(now) + builder = builder.not_valid_after(now + datetime.timedelta(seconds=ca_life())) + for extension in CA_EXTENSIONS: + builder = builder.add_extension(extension, critical=True) + + sub_identifier = x509.SubjectKeyIdentifier.from_public_key(nreq.public_key()) + builder = builder.add_extension(sub_identifier, critical=False) + + builder = builder.add_extension( + x509.AuthorityKeyIdentifier( + key_identifier=sub_identifier.key_identifier, + authority_cert_serial_number=serial, + authority_cert_issuer=[x509.DirectoryName(nreq.subject)], + ), critical=False, - value=b"issuer:always,keyid:always", - issuer=cacert, ) - cert.add_extensions([extension]) - - cert.sign(cakey, "sha512") + cert = builder.sign(cakey, hashes.SHA512()) return cert -def create_ca_req(subject): - key = _crypto.PKey() - key.generate_key(_crypto.TYPE_RSA, CA_BITS) +def sign_ca_req(nreq, cakey): + """Sign a CA request. Different from normal request due to extensions.""" + # Validate Subject contents. Not necessary for CA gen, but kept anyhow + matching_template(nreq, nreq) + assert nreq.is_signature_valid is True + serial = int(uuid.uuid1()) + cert2 = sign_ca_req_inner(nreq, cakey, serial) + return cert2 - req = _crypto.X509Req() - req.set_version(VERSION) - req.set_pubkey(key) - x509subject = req.get_subject() - for k, v in subject: - setattr(x509subject, k, v) +NAME_TO_OID = { + "C": NameOID.COUNTRY_NAME, + "ST": NameOID.STATE_OR_PROVINCE_NAME, + "L": NameOID.LOCALITY_NAME, + "O": NameOID.ORGANIZATION_NAME, + "OU": NameOID.ORGANIZATIONAL_UNIT_NAME, + "CN": NameOID.COMMON_NAME, +} - req.add_extensions(CA_EXTENSIONS) - req.sign(key, "sha512") +def subject_to_name(subject): + """Take a subject set of tuples and turn it into a list of name attributes.""" + res = [x509.NameAttribute(NAME_TO_OID[k], v) for k, v in subject] + return res + + +def create_ca_req(subject): + """Create a CA request (csr) from the subject.""" + key = rsa.generate_private_key( + public_exponent=65537, + key_size=CA_BITS, + ) + attrs = subject_to_name(subject) + req = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(attrs)) + for extension in CA_EXTENSIONS: + req = req.add_extension(extension, critical=True) + req = req.sign(key, hashes.SHA512()) return key, req def create_ca(subject): + """Create a CA and self sign it.""" key, req = create_ca_req(subject) - cert = sign_req(req, req, key) + cert = sign_ca_req(req, key) return key, req, cert -def write_files(key, keyname, cert, certname): +def write_files(key, keyname, cert, certname, req): + """Write the files to disk.""" + def writefile(data, name): - with open(name, "w") as f: + with open(name, "w", encoding="utf8") as f: stream = data.decode("utf8") f.write(stream) - _key = _crypto.dump_privatekey(_crypto.FILETYPE_PEM, key) + _key = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) writefile(_key, keyname) - _cert = _crypto.dump_certificate(_crypto.FILETYPE_PEM, cert) + _cert = cert.public_bytes(serialization.Encoding.PEM) writefile(_cert, certname) + _req = req.public_bytes(serialization.Encoding.PEM) + writefile(_req, f"{certname}.csr") + def cmdline(): + """Commandline parser.""" parser = argparse.ArgumentParser() config.add_inifile_argument(parser) @@ -175,7 +197,8 @@ def cmdline(): return args -def build_ca(keyname, certname): +def prompt_subject(): + """Ask questions and return the subject.""" print("Enter CA settings, leave blank to not include") subject = {} subject["C"] = input("C [countryName (Code, 2 letters)]: ").upper() @@ -187,19 +210,18 @@ def build_ca(keyname, certname): subject["O"] = input("O [Organization]: ") subject["OU"] = input("OU [organizationalUnitName]: ") or "Caramel" subject["CN"] = "Caramel Signing Certificate" - print("CN will be '{}'".format(subject["CN"])) + print(f"""CN will be '{subject["CN"]}'""") template = [] for field in ATTRIBS_TO_KEEP: if field in subject and subject[field]: template.append((field, subject[field])) template = tuple(template) - - key, req, cert = create_ca(template) - write_files(key=key, keyname=keyname, cert=cert, certname=certname) + return template def main(): + """Main entrypoint.""" args = cmdline() config_path = args.inifile @@ -212,17 +234,23 @@ def main(): ca_cert_path, ca_key_path = config.get_ca_cert_key_path(args, settings) except ValueError as error: print(error) - exit() + sys.exit() for f in ca_cert_path, ca_key_path: if os.path.exists(f): - print("File already exists: {}. Refusing to corrupt.".format(f)) - exit() + print(f"File already exists: {f}. Refusing to corrupt.") + sys.exit() else: dname = os.path.dirname(f) os.makedirs(dname, exist_ok=True) - print("Will write key to {}".format(ca_key_path)) - print("Will write cert to {}".format(ca_cert_path)) + print(f"Will write key to {ca_key_path}") + print(f"Will write cert to {ca_cert_path}") + + template = prompt_subject() + key, req, cert = create_ca(template) + write_files(key=key, keyname=ca_key_path, cert=cert, certname=ca_cert_path, req=req) + - build_ca(keyname=ca_key_path, certname=ca_cert_path) +if __name__ == "__main__": + main() diff --git a/caramel/views.py b/caramel/views.py index ff42e99..4670b05 100644 --- a/caramel/views.py +++ b/caramel/views.py @@ -1,7 +1,6 @@ #! /usr/bin/env python # vim: expandtab shiftwidth=4 softtabstop=4 tabstop=17 filetype=python : from hashlib import sha256 -from datetime import datetime from pyramid.response import Response from pyramid.view import view_config @@ -21,6 +20,7 @@ CSR, AccessLog, SigningCert, + utcnow, ) @@ -28,7 +28,7 @@ # 2 kbyte should be enough for up to 4 kbit keys. # XXX: This should probably be handled outside of app (i.e. by the # server), or at least be configurable. -_MAXLEN = 2 * 2 ** 10 +_MAXLEN = 2 * 2**10 def raise_for_length(req, limit=_MAXLEN): @@ -38,7 +38,7 @@ def raise_for_length(req, limit=_MAXLEN): if length is None: raise HTTPLengthRequired if length > limit: - raise HTTPRequestEntityTooLarge("Max size: {0} kB".format(limit / 2 ** 10)) + raise HTTPRequestEntityTooLarge("Max size: {0} kB".format(limit / 2**10)) def raise_for_subject(components, required_prefix): @@ -105,7 +105,7 @@ def cert_fetch(request): raise HTTPForbidden cert = csr.certificates.first() if cert: - if datetime.utcnow() < cert.not_after: + if utcnow() < cert.not_after: # XXX: appropriate content-type is ... ? return Response( cert.pem, content_type="application/octet-stream", charset="UTF-8" diff --git a/setup.py b/setup.py index a28b598..e7d6b56 100644 --- a/setup.py +++ b/setup.py @@ -10,6 +10,9 @@ "cryptography>=0.5.dev1", "pyOpenSSL>=0.14", "python-dateutil", + # Transient dependency from pyramid->webob, + # should be fixed in a later release of webob + "legacy-cgi; python_version >= '3.13'" ] deplinks = [] diff --git a/tests/fixtures.py b/tests/fixtures.py index d9489cb..89c8975 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -130,30 +130,77 @@ class CertificateData(object): pem=dedent( """\ -----BEGIN CERTIFICATE----- - MIIEVDCCAjygAwIBAgIBZTANBgkqhkiG9w0BAQsFADAmMSQwIgYDVQQDDBtDYXJh - bWVsIFNpZ25pbmcgQ2VydGlmaWNhdGUwHhcNMTQwODA4MTM0NDA0WhcNMTQxMDA4 - MTM0NDA0WjAaMRgwFgYDVQQDDA9mb28uZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3 - DQEBAQUAA4IBDwAwggEKAoIBAQC+UOYhTExyWQL/mPzxvkd4xzju9Gcoxu5WZxxP - +FjvsCKXkdMdzyTGYuaOulsRqyNmc8N73KJa9rlajIxdtR7duBwzX6Ddx2MmOLVL - Q95X0jzdYR9iv6NX+bdYnoFUuzFt6N6Tf6OFS7IGiCeYqN7JTzTwlseJ7O4ozdsk - vNlw7Cybb5RjqJQ/TRDSDWp1Fuq7FXanM+9Eaok0xqGty1TdMiEsCK8t7w3F1gd3 - bt48hld7cfe+OqdPxFLLtXv6wVM8EzcFYmwBGx7avXVS8aOsN6Oc3FZBfW0Fi0XD - S6YO7WSzsRqciXolv/zPIvc1g6z5RkIPlgUaCtAdNBWkDG0hAgMBAAGjgZgwgZUw - DAYDVR0TAQH/BAIwADAWBgNVHSUBAf8EDDAKBggrBgEFBQcDAjAdBgNVHQ4EFgQU - 79OVse0vdmb3VW6T9P7haHDOrhEwTgYDVR0jBEcwRYAUt9UtYpijdgeh1fObZHE4 - EBJPROGhKqQoMCYxJDAiBgNVBAMMG0NhcmFtZWwgU2lnbmluZyBDZXJ0aWZpY2F0 - ZYIBADANBgkqhkiG9w0BAQsFAAOCAgEAN8oppVaZlklLmvD1OfG0jf7SwWa03x6L - Ej7xh0EbCeTZXNCmJNjiPpYG57Lf7OlXit9u1aJUQ5VXpUDmv1njb0jnB3EJBWf/ - Y2KZbRYgXCpMUXxmfLrntULHAn/M3RLNi1ovwisiBqAU5/SZLcRq26Uwb8ygT0K7 - OA80JqjVzv53nKdUmDR0mopiWiU8c5BTi0R5+Q9vGov+DBcmrgatIHrtiXG8exGK - m/DHZMAA93Vv/nh6GUe9uWI4mDhhLRSrdfkHoTPDqrFHIdH4OC+ljlPDRNhpkWAt - FIzejEk5pqyqnAW4HHWM13vtOpddnXea+1y6PCX/9InAP5Tl1HMcq7BOgQa+EyPN - NdzIRtdRnhtncMJUTCJpm2QEH09aPM3411tFpG3nZ6eTvo5f6oZS+epBVwNVqFXm - xTbIEnBFnVBhQh2mXgJJsac2Oy8KGyBnVvGn6FKB6slrBjcIWKg+CDyiXMjwKhk5 - bjUvUdML4uWoGqdJfl+S+f+8m6S6F276p/uLI8Kb0NaE+z+LFRp2wrrC7DW65R+W - eMdFWSu/IZZkoEaTrOeaqmLtQQt/4s66KKNOddftt+uLEMjce7foYQHo+qx9RI/1 - n4rH2rFn8rcQwSaRyE9NcOpirCur43MR42+LAfZq5s9j7CuQJTw6G0wvCHGqRIQ9 - X1qbQOxg6ig= + MIIFnzCCA4egAwIBAgIQEgZIJGyVEfCi8HCFwkLpGzANBgkqhkiG9w0BAQsFADCB + jjELMAkGA1UEBhMCU0UxFzAVBgNVBAgMDsOWc3RlcmfDtnRsYW5kMRQwEgYDVQQH + DAtOb3Jya8O2cGluZzESMBAGA1UECgwJTXVwcGFyIEFCMRYwFAYDVQQLDA1NdXBw + YXIgVGVrbmlrMSQwIgYDVQQDDBtDYXJhbWVsIFNpZ25pbmcgQ2VydGlmaWNhdGUw + HhcNMjUwNzI5MTU1OTU1WhcNMjUxMDI3MTU1OTU1WjBIMRUwEwYDVQQKDAxFeGFt + cGxlIGluYy4xFTATBgNVBAsMDEV4YW1wbGUgRGVwdDEYMBYGA1UEAwwPZm9vLmV4 + YW1wbGUuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4HScCLsW + /pStt8UrmtdkcuacXniRAfmnEnxTW+cVkyoT5zfY3tG9Q4ooiVVopP48oelS6pAT + Xt/eb11uMDxzdur2F1GLN5vczKsmgNklgdByzFizN7rssa6GqnSF4H0tBHYCYtEA + MWexTFaV7bpjPeoSrLy4gN0ajiyayNv7d9A/eQs1G3RvBiFv3RdRHKfCthgaSO/7 + jNuleauJCSh1Ueenvbz2l5t1QAZG5IbSOVLBbYBOjW9AOth7033Wi1O7QrbwwW/A + LdKQNS1Yx12nL9YhG17f3km++nMhOqHFt77fJxby4+XfFpQbSLn5r+O1rGXats8C + Vr/eqy5RehMqHQIDAQABo4IBPDCCATgwDAYDVR0TAQH/BAIwADAgBgNVHSUBAf8E + FjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwGgYDVR0RBBMwEYIPZm9vLmV4YW1wbGUu + Y29tMB0GA1UdDgQWBBQaGLQo9EpIlO+5hXQHgW+kCisKwTCBygYDVR0jBIHCMIG/ + gBQVt+gEYV1LNbaCPsFX9ZyatUeIQaGBlKSBkTCBjjELMAkGA1UEBhMCU0UxFzAV + BgNVBAgMDsOWc3RlcmfDtnRsYW5kMRQwEgYDVQQHDAtOb3Jya8O2cGluZzESMBAG + A1UECgwJTXVwcGFyIEFCMRYwFAYDVQQLDA1NdXBwYXIgVGVrbmlrMSQwIgYDVQQD + DBtDYXJhbWVsIFNpZ25pbmcgQ2VydGlmaWNhdGWCEBH5dvhslRHwovBwhcJC6Rsw + DQYJKoZIhvcNAQELBQADggIBABpDPBJPE0bFglXxNAChf8BK50R3MBnunzdHiHct + d8zo17O3c4Pl+qzQ1W28HeHpkMn5pLYfKZp+XzlwRKU7+6CKs+EhNaexDIh53ddM + dK53fPf8i+tQ0qUGGI5cxmGRnxFM3KamFbkixyU8rZDKSr7dSgK6K7gXkd19vCmU + X9nNbCUhiV70i8ltnWXRuW1DOjKrLYF1z3fZeKmOTcyxH5pzwIBoAOwH0v83A4+p + ljgJfcw4Ja9MJH99EFbD22/jcg6Bmx4zMG6IHT5W3t7AbcWdJ9uT1utFJkjNb28a + j0Tv6QHgXSiulDeCD38wiAqw2+RTtVKpwfQkmMi3ipxU7NJONDkBbTrJWb/ww0Yp + 5a/uBL3rJ/WXQUqTCDyLI2UbmOq5Agmzm6qPWKBQU4wHAOViQxZPBpYl8cLGXamJ + JdMeZDJPrqKvyyGznO6icH9apFwkqHAjQwf2cP2DiLmh0dx9//1mn3/cKr8qmkie + 4Msf/QOoatsM+dof5dlX1uYd1aPtD6kei7rgPPvIe2H5MSLwvlXR7hbID2dKvYEC + VF/b7sC8ZFrwQolBGc0ASlWzkUMEDt9EspbYb/U2giAxqyHoyjpG6s4dOJebOvw+ + YXOUANWf7hycBIZ7NNsr3ALq1fCIqjcn6chlV9AV1e70d7AUoxiiryeHWA9edk4t + d5W1 + -----END CERTIFICATE----- + """ + ).encode("utf8"), + ) + empty_subject = CertificateFixture( + not_before=now - 2 * year, + not_after=now + 2 * year, + pem=dedent( + """\ + -----BEGIN CERTIFICATE----- + MIIFcTCCA1mgAwIBAgIQGXU+KGyYEfCYQnCFwkLpGzANBgkqhkiG9w0BAQsFADCB + jjELMAkGA1UEBhMCU0UxFzAVBgNVBAgMDsOWc3RlcmfDtnRsYW5kMRQwEgYDVQQH + DAtOb3Jya8O2cGluZzESMBAGA1UECgwJTXVwcGFyIEFCMRYwFAYDVQQLDA1NdXBw + YXIgVGVrbmlrMSQwIgYDVQQDDBtDYXJhbWVsIFNpZ25pbmcgQ2VydGlmaWNhdGUw + HhcNMjUwNzI5MTYyMTM2WhcNMjUxMDI3MTYyMTM2WjAaMRgwFgYDVQQDDA9mb28u + ZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCelfEP + IcJGLKc1x507gms1efETJyz2OISoJ+qmbLrxrlHWEb4Nt+UVEUswp4C3FLzfCEns + 8Hut0ywmg3rpN+3Wj9fnHjH+5SdSbqlURm0klghTIq/xRakWtwqU0LiUkVQU40/h + FK+VndEDa7zmXR9wqdZ1wijvi9rEVpknXWTfXbbKqKLwOizUe8tZ01h3StYF/sYy + HqiYwmyE0VHTX+xM50wp+M+zH1afJvtUuGhCZIGjc6UwQ0wAthAW2s3tIszW2wUk + 4Ses1udxslTqiuX8KHhDf0vwK2umpwjLTHenp7uNbaZNmWr4Y/ZVMBo5bczMXb8o + 1py6kwmOK/9ZVMPJAgMBAAGjggE8MIIBODAMBgNVHRMBAf8EAjAAMCAGA1UdJQEB + /wQWMBQGCCsGAQUFBwMCBggrBgEFBQcDATAaBgNVHREEEzARgg9mb28uZXhhbXBs + ZS5jb20wHQYDVR0OBBYEFJ9ZRYBgP8X8FHM931dHnTh1cg/yMIHKBgNVHSMEgcIw + gb+AFLVqhVvcehx58WPUL03pQL+7FuAHoYGUpIGRMIGOMQswCQYDVQQGEwJTRTEX + MBUGA1UECAwOw5ZzdGVyZ8O2dGxhbmQxFDASBgNVBAcMC05vcnJrw7ZwaW5nMRIw + EAYDVQQKDAlNdXBwYXIgQUIxFjAUBgNVBAsMDU11cHBhciBUZWtuaWsxJDAiBgNV + BAMMG0NhcmFtZWwgU2lnbmluZyBDZXJ0aWZpY2F0ZYIQGVqWaGyYEfCYQnCFwkLp + GzANBgkqhkiG9w0BAQsFAAOCAgEAlaaw+/TpfseqaqO83nIAKHCApwSNa7Ztpvai + UpJLX3VSyAW5InFRnppQXLwmc7Dt4XoKyJAUb/CxiZdJGvXrAdZlXCeIGTkTEnhf + GJxsSJqZ7cPlNk0dk0RHDUgss3jXflsmzxzZS8LFm5O1DUE41psQ+NpphzqbNRqf + V79nXLg9srO8x95iXNxGOtjKFzDf+/mv6VNx/vE/B/x7J78VO+Dixf+s2q9ZpUEg + X7NOeGIYBKEzdyOWTcihqwFXpwqDZWhYw349kR/LWELuso1AfENp8MbEWygCU/am + V1sGKYxt/AgK3RajH+tKFnEHqfgIC6VIL5x8z3hT8mECGRnxawXK1LcE3vhjX4LQ + fsXf83O9rIQQsnqobshRXMXLg0XICQlaJ+g4JhWrgRh6FGleRy8Z84hcfsmfSHnn + WS4TezOe5kF30eNOJbSjXuCN+HWV5vUjgvnnKfnnbtW5QoW/MWqC4p51Pq2ZfAsC + Wtv9jEnSMl2bdy9whdzK3GwxW3G0NsMnDrF4TOv+77U5zJw23GcEUfcrzquZmcJn + 5HTXiigjDz/kOcaEYChPpkhYhJweCMNVKLHDq9PDiIF/oYPs5tHAmr4CS7hu04CD + LMyYsPkkWjRkyD+uN1IwW9+MwEZomdqB4W5HkKYA2QsG6bxHoE6F0/CpJ6+7G5gB + sMNVaTs= -----END CERTIFICATE----- """ ).encode("utf8"), @@ -165,35 +212,40 @@ class CertificateData(object): pem=dedent( """\ -----BEGIN CERTIFICATE----- - MIIEVTCCAj2gAwIBAgIBaDANBgkqhkiG9w0BAQsFADAmMSQwIgYDVQQDDBtDYXJh - bWVsIFNpZ25pbmcgQ2VydGlmaWNhdGUwHhcNMTQwODA4MTM1OTUxWhcNMTQxMDA4 - MTM1OTUxWjAbMRkwFwYDVQQDDBBzcGFtLmV4YW1wbGUuY29tMIIBIjANBgkqhkiG - 9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnrzJ2qNhyIiCZvikFvPr0iEfzf27kTfHN+13 - 7bGuTAzmXFFrajz9K+leYcZR4sCPPRI8QjEUSYcngP1LKUrgzUSldjPrQGLsmx+8 - gLdi2JN1kPu6uMT97uB1RDKQpIMHGuV4mJKJku3sh6DJvQMjuMv8xOUXHtCw9jjc - 6CLI9zeEZfM1RXsmRVxLx8HuwlF8ZNRjPGn5lEGIxTORpF1Mef5eTnGIg2kxwj8F - 5aJP4ei9XS6gbYSFZekruT9Ivh41x4FYmx4bcGQfuIu+cG+XI3xGoNrm/IKMguMQ - vwjBj2XnLHzskR5mz9YSLd9vi1nHl7b5u9GDtpmliNOvehaxjQIDAQABo4GYMIGV - MAwGA1UdEwEB/wQCMAAwFgYDVR0lAQH/BAwwCgYIKwYBBQUHAwIwHQYDVR0OBBYE - FH/tkDyXntmyru3UFyYTO6UTOwjuME4GA1UdIwRHMEWAFLfVLWKYo3YHodXzm2Rx - OBAST0ThoSqkKDAmMSQwIgYDVQQDDBtDYXJhbWVsIFNpZ25pbmcgQ2VydGlmaWNh - dGWCAQAwDQYJKoZIhvcNAQELBQADggIBAD4r7SEyD6A61ORVjHHpw+5nZuaIf3Xs - nZakE5I+wm/ZsVmWO3iOjAKdL2HartlDqGAvX5/w/sEoOkrpr48R7Uolm4ojYD2B - Etxb4p9PDHmgT9Sc8nE2hOh8FdIUiGVLRJOuCmZqgk+EBIwB0zJQjREXHVZaN4dS - wNNQGz46Vd2b6e4iRCJdgfOd2tuJi/WtvQORJkJ+HeTwLcN9LYY6d1bgups970DP - Ve3QzxowNaRdX4SNtTmWs2BqcpYJV/Bx+q5r5CxhuDs5Jyvfmh9K7ux8z8Rv9THm - UPJSXahBKLIYI8BYotXB9vpHvgWd15Opa3g+MV1Ego7KDKvdirTKnIcvafAi47vo - kpAussllmvGr3wjDrGLlvbyBbbg44MpTzZrWspAjm9t0XFwkDDZGjY9l1zzqolDa - KXbDda3LApBjWOr2QxsiwuOcemeekyiYusx5O4NqWJhGRVluvLpo5FWvuGZfoTCG - X9QYUVrbsvhMMZR0VQDkJ7zfqbA8HvZ72qRpkC3MkOKDo4Ve9PffRjK+bamZM38I - rvKws1E0w1WI1cK8ypefZhF8BiC/KUFMBw3bQAUQqdubv4eN81IX0PjnpfH+7wwA - 7CRL6mRM5+q4ICfesXzUXpPYjgD2UexIjSJNoYjiu3aKpjwvrZ88t8hPR46Uac7E - 5yvm61Agw/Tg + MIIFkjCCA3qgAwIBAgIBaDANBgkqhkiG9w0BAQsFADCBjjELMAkGA1UEBhMCU0Ux + FzAVBgNVBAgMDsOWc3RlcmfDtnRsYW5kMRQwEgYDVQQHDAtOb3Jya8O2cGluZzES + MBAGA1UECgwJTXVwcGFyIEFCMRYwFAYDVQQLDA1NdXBwYXIgVGVrbmlrMSQwIgYD + VQQDDBtDYXJhbWVsIFNpZ25pbmcgQ2VydGlmaWNhdGUwHhcNMTUwNzIyMTYzNzIz + WhcNMTYwNzIyMTYzNzIzWjBJMRUwEwYDVQQKDAxFeGFtcGxlIGluYy4xFTATBgNV + BAsMDEV4YW1wbGUgRGVwdDEZMBcGA1UEAwwQc3BhbS5leGFtcGxlLmNvbTCCASIw + DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKM64wXWescJGZO9PEEVmFd0CDps + 4uLeSj8CQtPYvSVE9A7PM5UsRST3OwYBHQBF22Bs6A6s0hw45ppgwY/hNZnoaydr + xHkpRzb0fdYJyi1kVZbaRKXmvmqcyp6CdQjHvqFQdcRROryzE9j4Qd13nHHE6tfR + uyBCpMIYzbEmf1YvuM9zsH4Hq5anfthgYGUBDhpPjlvB1qESzx9uXHG5PH7fEhsV + igDxH/BdfvEmuzrAGVNsT2a/g+rtn7hlQABO39+2pQxPdt+Is8QFGUx6Q1XbLC9b + bOaeM1yeBFGciQQ39yfj+p7L46CsEWNoOGOOLojrqmEwC9SVi8Mqp0OqY/UCAwEA + AaOCAT0wggE5MAwGA1UdEwEB/wQCMAAwIAYDVR0lAQH/BBYwFAYIKwYBBQUHAwIG + CCsGAQUFBwMBMBsGA1UdEQQUMBKCEHNwYW0uZXhhbXBsZS5jb20wHQYDVR0OBBYE + FDGw41ISbvpwXVdNdjdrEkEHJE2QMIHKBgNVHSMEgcIwgb+AFMNkmMv/riTDGeFW + IGT4JpGTuUDQoYGUpIGRMIGOMQswCQYDVQQGEwJTRTEXMBUGA1UECAwOw5ZzdGVy + Z8O2dGxhbmQxFDASBgNVBAcMC05vcnJrw7ZwaW5nMRIwEAYDVQQKDAlNdXBwYXIg + QUIxFjAUBgNVBAsMDU11cHBhciBUZWtuaWsxJDAiBgNVBAMMG0NhcmFtZWwgU2ln + bmluZyBDZXJ0aWZpY2F0ZYIQTdq+PmyaEfCxpXCFwkLpGzANBgkqhkiG9w0BAQsF + AAOCAgEAtiHDuxyAJIYfx0vF5ezgNmnKU41OGyyiTEwvlNskKgX4YCGnkKYkoaep + PDfjqjJfJsP9v+VuadJdaFuEyeWB4ODt0hELR0SDClJ+a6m9OPqc235xW8MIdosR + LKll6PXAVcuV48UO1msj36wQfRuW6qG2eciVluPiE8KNyevND2HqmjGvozxZTKdV + 8uF1yFwwn14m8i+T72cl+asGV0htSSFAg4LMmBsg3s8/ypZQlvX4dIuzmPMJGZy4 + En8xODVa3NiHMF8u9pVW1MthvfVyhPFnzHU2XEOsDudUmu7TSUQa3kdNX1UwKwHI + RkUgxPpWk3xxycsr2aC6hl5aGszLrFgg9N6gjNyVdiVUkS/N6jX5st6+W9F3O++s + n6wjWbIz2Vl4fg6bVvGY6ZT5jnckdvDHauk3zMMZ+055VHb2MNl3LNXa3KCWIDby + BTcVYn9EN98t1qZwX35bT2vp316GRnkrz3we980z2mN3kK/wBp6kge8ZRaafrvxC + wCEJrE/SQFOSYHuXr35S/4ct0LivAoGg7lHx2tzaEHhfbvNOKlxZrpj3Zea0h5M9 + CNYHJzx3xAY8s6bX2xePI6WhoDuSjN8Rr2hYBviL5fBwioXjqedeY0kD2qFc248l + bgpyY2Dsl+fnSnCBI2F205R6s3Y453L6ejAPf07EGVndckrL6g4= -----END CERTIFICATE----- """ ).encode("utf8"), ) - ca_cert = CertificateFixture( not_before=initial.not_before, not_after=initial.not_after, @@ -277,20 +329,21 @@ class CSRData(object): pem=dedent( """\ -----BEGIN CERTIFICATE REQUEST----- - MIICjTCCAXUCAQIwSDEVMBMGA1UECgwMRXhhbXBsZSBpbmMuMRUwEwYDVQQLDAxF + MIICtjCCAZ4CAQAwSDEVMBMGA1UECgwMRXhhbXBsZSBpbmMuMRUwEwYDVQQLDAxF eGFtcGxlIERlcHQxGDAWBgNVBAMMD2Zvby5leGFtcGxlLmNvbTCCASIwDQYJKoZI - hvcNAQEBBQADggEPADCCAQoCggEBAL5Q5iFMTHJZAv+Y/PG+R3jHOO70ZyjG7lZn - HE/4WO+wIpeR0x3PJMZi5o66WxGrI2Zzw3vcolr2uVqMjF21Ht24HDNfoN3HYyY4 - tUtD3lfSPN1hH2K/o1f5t1iegVS7MW3o3pN/o4VLsgaIJ5io3slPNPCWx4ns7ijN - 2yS82XDsLJtvlGOolD9NENINanUW6rsVdqcz70RqiTTGoa3LVN0yISwIry3vDcXW - B3du3jyGV3tx9746p0/EUsu1e/rBUzwTNwVibAEbHtq9dVLxo6w3o5zcVkF9bQWL - RcNLpg7tZLOxGpyJeiW//M8i9zWDrPlGQg+WBRoK0B00FaQMbSECAwEAAaAAMA0G - CSqGSIb3DQEBCwUAA4IBAQBxkfl/Nq0y2u8Deq17OlB8WZfJnigEtFtMFstWNxNp - Z3yxSFbaOYpB/+S0qOTjFbx1vQd8sSKTEqSgn2MkLhNsqtakWhejC+rrzVA02K6d - J7uCylX8XVRJPjmt14E2LNLxGx1adV8St0tPrbzXMzr0ygpGaIITvd+ZzXr4CuGQ - vao+T3EooEVQeFmWaoU6URUsqlp1itYzN1O+tvHv9pmCZ5UyTJlHQSc+cKJsUgE4 - O+jruZshnFL0KQybIokYGLLcb6NixdsCTSw+rfztuLUEEMP1ozNCgk8TX8mXWduM - XIP49FHFe6IjLuj0ofRXiJPmS+4ToqRbNIBRoz7kSLov + hvcNAQEBBQADggEPADCCAQoCggEBAOB0nAi7Fv6UrbfFK5rXZHLmnF54kQH5pxJ8 + U1vnFZMqE+c32N7RvUOKKIlVaKT+PKHpUuqQE17f3m9dbjA8c3bq9hdRizeb3Myr + JoDZJYHQcsxYsze67LGuhqp0heB9LQR2AmLRADFnsUxWle26Yz3qEqy8uIDdGo4s + msjb+3fQP3kLNRt0bwYhb90XURynwrYYGkjv+4zbpXmriQkodVHnp7289pebdUAG + RuSG0jlSwW2ATo1vQDrYe9N91otTu0K28MFvwC3SkDUtWMddpy/WIRte395Jvvpz + ITqhxbe+3ycW8uPl3xaUG0i5+a/jtaxl2rbPAla/3qsuUXoTKh0CAwEAAaApMCcG + CSqGSIb3DQEJDjEaMBgwCQYDVR0TBAIwADALBgNVHQ8EBAMCBeAwDQYJKoZIhvcN + AQELBQADggEBAJjxx/jMg3p7jtNk3wOC67YBaI3+2PhK83cKaSNBvCkXQog5mW4N + nWiA+fhm5itYqRc++X7uNOPQFDgvkKuP3keNJI54yiToLduSp2DCHP4H80JUN1pj + VSs6pzJLyjcQ6ZxuXIiLvPcdVLkGk8VkTULuuKiKfErXbld9JuHUFl0oVZaStoJp + jdbKvm5S9oGiiSQD2p3ucn4SJrJHEBQUyp6kR05P2PBAeA3kwIvkg9smc7J+lfjd + Nk8XFpL2cMZKxwBZnaokfl4RwBr4TbeOillRcZwI3E6FmIuXQM38X8xdBdboeJX2 + IM4D5LBcVTZ82rveE0hLkmZJaIVM8bb82mg= -----END CERTIFICATE REQUEST----- """ ).encode( @@ -310,26 +363,58 @@ class CSRData(object): ], ) + empty_subject = CSRFixture( + orgunit="Example Dept", + commonname="foo.example.com", + pem=dedent( + """\ + -----BEGIN CERTIFICATE REQUEST----- + MIICiDCCAXACAQAwGjEYMBYGA1UEAwwPZm9vLmV4YW1wbGUuY29tMIIBIjANBgkq + hkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnpXxDyHCRiynNcedO4JrNXnxEycs9jiE + qCfqpmy68a5R1hG+DbflFRFLMKeAtxS83whJ7PB7rdMsJoN66Tft1o/X5x4x/uUn + Um6pVEZtJJYIUyKv8UWpFrcKlNC4lJFUFONP4RSvlZ3RA2u85l0fcKnWdcIo74va + xFaZJ11k3122yqii8Dos1HvLWdNYd0rWBf7GMh6omMJshNFR01/sTOdMKfjPsx9W + nyb7VLhoQmSBo3OlMENMALYQFtrN7SLM1tsFJOEnrNbncbJU6orl/Ch4Q39L8Ctr + pqcIy0x3p6e7jW2mTZlq+GP2VTAaOW3MzF2/KNacupMJjiv/WVTDyQIDAQABoCkw + JwYJKoZIhvcNAQkOMRowGDAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DANBgkqhkiG + 9w0BAQsFAAOCAQEAlXvZZ1lkZT/6OCWtCSwVF3o43vzjCL2DzQOiVP7g+kOwAcLJ + qgIT/lboJgR7ND3G0PCfU0/ddh4FdrJSqQyKORx9nO+wUinyq3voTbzkG6KD0FTt + u59Pp+TbjRjjpSmWA16frsqoqI01NFRQWPulpTUPrL6JNNjsz9Xzfd7kiOMDi7VW + d9RUNTv1qmXfB/NwCiWnOzrErC/wpPeU3yRJFq0VQbKwUiY4qbkFcO43SmZ5hLF/ + KfF0Wna+m9IKRE8lykKetIDF6kdwfiq6zR2E3pnsVXOFN51ZL98kxE1VWcVnyj41 + npc1a3rcB7HOhRCC7550gxePeKNfZSsAfrGviQ== + -----END CERTIFICATE REQUEST----- + """ + ).encode( + "utf8" + ), # py3 dedent can't handle bytes + subject_components=(("CN", "foo.example.com"),), + certificates=[ + CertificateData.empty_subject, + ], + ) + with_expired_cert = CSRFixture( orgunit="Example Dept", commonname="spam.example.com", pem=dedent( """\ -----BEGIN CERTIFICATE REQUEST----- - MIICjjCCAXYCAQIwSTEVMBMGA1UECgwMRXhhbXBsZSBpbmMuMRUwEwYDVQQLDAxF + MIICtzCCAZ8CAQAwSTEVMBMGA1UECgwMRXhhbXBsZSBpbmMuMRUwEwYDVQQLDAxF eGFtcGxlIERlcHQxGTAXBgNVBAMMEHNwYW0uZXhhbXBsZS5jb20wggEiMA0GCSqG - SIb3DQEBAQUAA4IBDwAwggEKAoIBAQCevMnao2HIiIJm+KQW8+vSIR/N/buRN8c3 - 7Xftsa5MDOZcUWtqPP0r6V5hxlHiwI89EjxCMRRJhyeA/UspSuDNRKV2M+tAYuyb - H7yAt2LYk3WQ+7q4xP3u4HVEMpCkgwca5XiYkomS7eyHoMm9AyO4y/zE5Rce0LD2 - ONzoIsj3N4Rl8zVFeyZFXEvHwe7CUXxk1GM8afmUQYjFM5GkXUx5/l5OcYiDaTHC - PwXlok/h6L1dLqBthIVl6Su5P0i+HjXHgVibHhtwZB+4i75wb5cjfEag2ub8goyC - 4xC/CMGPZecsfOyRHmbP1hIt32+LWceXtvm70YO2maWI0696FrGNAgMBAAGgADAN - BgkqhkiG9w0BAQsFAAOCAQEAf5SnEDewYNy4bu1bwga2alawbr+BQpysl/h53/aJ - woJakG3E3+jSyAQ6Bu/j+YY+hTMbNX/sOyWMexS6w6HryQ6i/xvFZqhgN5ap/I6M - k7V13j1LyxcTtlD773ikWq7F/+H76FgTAoE4oUmNvptej/C5eW5A1N150vMeecY/ - 0nHIV+eXaZTCbg7UKr1xmR3tdu3DNnFC/BfaVv+Ul7s974k4g53ejLCvnCjMGVS3 - oQN1HuxKCScUJ9Vtr0dnBLGAf62vAqv5yZYhl9Qnt5EJ9OtspWm0e8FwTjNmoA/z - qnvUxskzM2ItxVV9oa9YDTid0GbJvF67QJQyVIO0Vz4uwg== + SIb3DQEBAQUAA4IBDwAwggEKAoIBAQCjOuMF1nrHCRmTvTxBFZhXdAg6bOLi3ko/ + AkLT2L0lRPQOzzOVLEUk9zsGAR0ARdtgbOgOrNIcOOaaYMGP4TWZ6Gsna8R5KUc2 + 9H3WCcotZFWW2kSl5r5qnMqegnUIx76hUHXEUTq8sxPY+EHdd5xxxOrX0bsgQqTC + GM2xJn9WL7jPc7B+B6uWp37YYGBlAQ4aT45bwdahEs8fblxxuTx+3xIbFYoA8R/w + XX7xJrs6wBlTbE9mv4Pq7Z+4ZUAATt/ftqUMT3bfiLPEBRlMekNV2ywvW2zmnjNc + ngRRnIkEN/cn4/qey+OgrBFjaDhjji6I66phMAvUlYvDKqdDqmP1AgMBAAGgKTAn + BgkqhkiG9w0BCQ4xGjAYMAkGA1UdEwQCMAAwCwYDVR0PBAQDAgXgMA0GCSqGSIb3 + DQEBCwUAA4IBAQCWAHBKO4FKfaSUzR3grc/z448OPCTHMWcuLMCTORYyQcaDq6s6 + K4rmNMWVQuqj75Lfn/Hi7Okyyou0yJnJsPUhi1d4QqUwdoracta+bVj67SN/plXk + vKxje8BOB4hZUox1rHXw8kdN/oJD3POLMBTUdx9+aV6KOSXEZEs9iCS7PG5kSH/S + U+h4u3BQOfEppHDzMNgdUTy/7EfMJGNz2deg88oq0/kIluYkXLsP8tH8yClbPSF2 + yezKVLjFgaVt8wqrUbtxIXY9KTHpqaqpLFN0x6KtzJWE9glntfinGx54pEZin4+R + EfSSIOxmQwfw0179P+UcZoDpn0t7lQpZnadu -----END CERTIFICATE REQUEST----- """ ).encode( @@ -351,20 +436,21 @@ class CSRData(object): pem=dedent( """\ -----BEGIN CERTIFICATE REQUEST----- - MIICjTCCAXUCAQIwSDEVMBMGA1UECgwMRXhhbXBsZSBpbmMuMRUwEwYDVQQLDAxF + MIICtjCCAZ4CAQAwSDEVMBMGA1UECgwMRXhhbXBsZSBpbmMuMRUwEwYDVQQLDAxF eGFtcGxlIERlcHQxGDAWBgNVBAMMD2Jhci5leGFtcGxlLmNvbTCCASIwDQYJKoZI - hvcNAQEBBQADggEPADCCAQoCggEBAN7fng4vFo0P0+K1L64rADgXBDrwsa39p3tV - 7GwY/LZ9crxUgwFKfVLM8rX4KAySiOXix8JF44jansXTOkcm8OjnOKVNIJX/5Pf3 - bRDSXcjodFIhPVzUynj8E5Z8rEB2ES9gwYDKIYVNnJa2nGmQVe7IgA5O7lNM6gse - TqYlN3bmB9Dy/dY+ZVyts1p6aSOYMdAcJ7ojCco9HuYFav2hd7k2h4b4lCKvi7p2 - sx6DoKmnblmlvMlP9UdSq7wORkAUnMn5rlzdj5LnWsSB+JLBrQaSlsIfVeL0+5oB - 6zrZI5hJPQiaLcet0v6a7M7UkOyJKiGJVCKYnO122D4Cu4lB2d0CAwEAAaAAMA0G - CSqGSIb3DQEBCwUAA4IBAQAGJwl7vUlDg2L3KgnsNA9A4rMKfFn5fzdf6Z0X3HSY - Zvi8XKkVAKd7aRUSg6jcJbOm0HNmBR+5SWzXUU62KQWuUCwz+dCyCbcEO/frG0IB - HdOs/L4AJgHlIaxwisCLh5VQpj5w0ahhzVfLGWcCK1nbqjUTLcEFhvZviUPUugAD - f7QdWNDBuZTrwXGTLsnhC5XQfsk0maXcNji79ziK4sMm5TU2599JmuWL2NTbqaCQ - MN1FxWzEy4yXgzW8uv+lX6yyTtkfrC7e3LFiAuoUlBeD5GmsVd30Xz5iGnuQv3d0 - fekjT5Np8XIS2ERJmx4CIjs5VpE1FMNOMoJ35kQpkQaQ + hvcNAQEBBQADggEPADCCAQoCggEBAKt5pZo0gKpU+U4UR8Kx+g3b7PQPbdkjhSDp + WGkMPuVtWH/2epS+TNBMY12S2kpX1Ufi8ue6vbiki4qnHCsZP/oIY4gvopV7e8ku + EwfORWVfqpeN8TrI4adQudvgJpr9Fn613hBifer2fDiBV1BvvN8lR+cUi+XfSS7W + CjIVtThd3TqMHzFoNgepI3UiPlunQh/1aWDTw+L/poCa0GlyRZL2z84abvKzahbR + d8iPPuMROHgnst+7pRm1RYflzf+3Hqp2WOS492n62eqwS8xjnhonPO6qVJkbmSms + V5LsFBCo23E9DQwtByiR7YNemAPFeeVR4TAMv/R6GgnWOeqqLV8CAwEAAaApMCcG + CSqGSIb3DQEJDjEaMBgwCQYDVR0TBAIwADALBgNVHQ8EBAMCBeAwDQYJKoZIhvcN + AQELBQADggEBACQykUfVcAoA160JKhAtQ3H2VhNy0/LxZ22Oqx/XdVW/WNxuKr8C + bR/HoqauckKHtF4dOkMuTDkFpSZkiHB1N6kNfeRetZZiavo6idUC5d9ml2B//+FV + NCDeRIHGzihNu2GJUQk7lzWcBhzUBuFjS4Ic3dCQEgjb2mLN8LcU/coGd0LtAjt3 + U8Vy2U+HhsiHf3iwxwVlAZTDVgbS+hSEyAMHf1Xh9sLZw230C1RGd44TWqrkS3Dp + y/cJGrUtTWx9DBiX6nhQfbLR/nmW7rXYYRodhzj+aVwYW6MfO5Bjr6AfI4e/pYTY + G/9DhJMrIomYuhg9kwnwnNFGDUjw6eqW9zc= -----END CERTIFICATE REQUEST----- """ ).encode("utf8"), diff --git a/tests/gen_testdata.py b/tests/gen_testdata.py new file mode 100644 index 0000000..f42a551 --- /dev/null +++ b/tests/gen_testdata.py @@ -0,0 +1,174 @@ +"""Generation script for test-data in our fixtures.""" + +import datetime + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID + +from caramel.scripts.generate_ca import create_ca, subject_to_name + +import caramel +import fixtures + + +# Subject attribs, in order. +ATTRIBS_TO_KEEP = ("C", "ST", "L", "O", "OU", "CN") +HASH = {1024: hashes.SHA1, 2048: hashes.SHA256, 4096: hashes.SHA512} +NAME_TO_OID = { + "C": NameOID.COUNTRY_NAME, + "ST": NameOID.STATE_OR_PROVINCE_NAME, + "L": NameOID.LOCALITY_NAME, + "O": NameOID.ORGANIZATION_NAME, + "OU": NameOID.ORGANIZATIONAL_UNIT_NAME, + "CN": NameOID.COMMON_NAME, +} + + +EXTENSIONS = [ + x509.BasicConstraints(ca=False, path_length=None), + x509.KeyUsage( + digital_signature=True, + content_commitment=True, + key_encipherment=True, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), +] + + +def create_req(subject): + """Create a normal request for the subject. + + Extensions differ from a CA request. + """ + attrs = subject_to_name(subject) + key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + + req = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(attrs)) + for ext in EXTENSIONS: + req = req.add_extension(ext, critical=False) + req = req.sign(key, hashes.SHA256()) + return key, req + + +def create_expired(req, ca_cert, ca_key): + """Create an expired cert for the request.""" + before = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta( + seconds=86400 * 10 * 366 + ) + after = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta( + seconds=86400 * 9 * 366 + ) + cb = x509.CertificateBuilder() + cb = cb.serial_number(104) + cb = cb.subject_name(req.subject) + cb = cb.issuer_name(ca_cert.subject) + cb = cb.public_key(req.public_key()) + cb = cb.not_valid_before(before) + cb = cb.not_valid_after(after) + cb = cb.add_extension( + x509.BasicConstraints(ca=False, path_length=None), critical=True + ) + cb = cb.add_extension( + x509.ExtendedKeyUsage( + [ + x509.ExtendedKeyUsageOID.CLIENT_AUTH, + x509.ExtendedKeyUsageOID.SERVER_AUTH, + ] + ), + critical=True, + ) + cb = cb.add_extension( + x509.SubjectAlternativeName([x509.DNSName("spam.example.com")]), + critical=False, + ) + cb = cb.add_extension( + x509.SubjectKeyIdentifier.from_public_key(req.public_key()), critical=False + ) + issuer_identifier = x509.SubjectKeyIdentifier.from_public_key(ca_cert.public_key()) + cb = cb.add_extension( + x509.AuthorityKeyIdentifier( + key_identifier=issuer_identifier.key_identifier, + authority_cert_serial_number=ca_cert.serial_number, + authority_cert_issuer=[x509.DirectoryName(ca_cert.subject)], + ), + critical=False, + ) + + bits = req.public_key().key_size + out = cb.sign(ca_key, HASH[bits]()) + return out + + +def write_files(key, keyname, cert, certname): + """Write certain files we need.""" + + def writefile(data, name): + with open(name, "w", encoding="utf8") as f: + stream = data.decode("utf8") + f.write(stream) + + keydata = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + writefile(keydata, keyname) + + certdata = cert.public_bytes(serialization.Encoding.PEM) + writefile(certdata, certname) + + +CA_KEY_FILE = "fixtures.ca.key" +CA_CRT_FILE = "fixtures.ca.crt" + + +def main(): + """Generate some fixtures.""" + ca_template = fixtures.CertificateData.ca_cert.subject + key, req, cert = create_ca(ca_template) + write_files(key=key, keyname=CA_KEY_FILE, cert=cert, certname=CA_CRT_FILE) + ca = caramel.models.SigningCert.from_files(CA_CRT_FILE, CA_KEY_FILE) + del key, req, cert + + # Initial certs data + _tkey, treq = create_req(fixtures.CSRData.initial.subject_components) + csr_pem = treq.public_bytes(serialization.Encoding.PEM) + csr = caramel.models.CSR("fakeshasum", csr_pem) + cert = caramel.models.Certificate.sign(csr, ca) + print("initial\n", csr.pem.decode("utf8"), cert.pem.decode("utf8")) + + # Empty subject cert + _tkey, treq = create_req(fixtures.CSRData.empty_subject.subject_components) + csr_pem = treq.public_bytes(serialization.Encoding.PEM) + csr = caramel.models.CSR("fakeshasum", csr_pem) + cert = caramel.models.Certificate.sign(csr, ca) + print("empty_subject\n", csr.pem.decode("utf8"), cert.pem.decode("utf8")) + + # Good crt + _tkey, treq = create_req(fixtures.CSRData.good.subject_components) + csr_pem = treq.public_bytes(serialization.Encoding.PEM) + csr = caramel.models.CSR("fakeshasum", csr_pem) + cert = caramel.models.Certificate.sign(csr, ca) + print("good\n", csr.pem.decode("utf8"), cert.pem.decode("utf8")) + + # Expired crt + _tkey, treq = create_req(fixtures.CSRData.with_expired_cert.subject_components) + csr_pem = treq.public_bytes(serialization.Encoding.PEM) + cert = create_expired(treq, ca.cert, ca.key) + crt_pem = cert.public_bytes(serialization.Encoding.PEM) + print("expired\n", csr_pem.decode("utf8"), crt_pem.decode("utf8")) + + +if __name__ == "__main__": + main() diff --git a/tests/test_models.py b/tests/test_models.py index 681a11b..e2948f2 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -67,9 +67,7 @@ def tearDown(self): pass def test_blank_file(self): - import OpenSSL - - with self.assertRaises(OpenSSL.crypto.Error): + with self.assertRaises(ValueError): SigningCert("") def test_valid_cert(self): @@ -81,7 +79,7 @@ def test_outdated_cert_should_work(self): ca.get_ca_prefix() def test_empty_returns_from_empty_subject(self): - ca = SigningCert(fixtures.CertificateData.initial.pem) + ca = SigningCert(fixtures.CertificateData.empty_subject.pem) result = ca.get_ca_prefix() self.assertEqual((), result) @@ -98,7 +96,7 @@ def test_valid_returns_from_default_subject(self): def test_only_CN_returns_from_CN_selector(self): CN_TUPLE = (("CN", "Caramel Signing Certificate"),) ca = SigningCert(fixtures.CertificateData.ca_cert.pem) - result = ca.get_ca_prefix((b"CN",)) + result = ca.get_ca_prefix(("CN",)) self.assertEqual(CN_TUPLE, result) def test_only_wanted_returns_from_selector(self): @@ -107,7 +105,7 @@ def test_only_wanted_returns_from_selector(self): ("L", "Norrköping"), ("OU", "Muppar Teknik"), ) - SELECTOR = (b"ST", b"L", b"OU") + SELECTOR = ("ST", "L", "OU") ca = SigningCert(fixtures.CertificateData.ca_cert.pem) result = ca.get_ca_prefix(SELECTOR) self.assertEqual(SELECTED, result) diff --git a/tests/test_views.py b/tests/test_views.py index 59188cb..761b55e 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -17,6 +17,7 @@ from caramel.models import ( CSR, AccessLog, + utcnow, ) from caramel import views @@ -137,7 +138,7 @@ def test_missing(self): def test_exists_valid(self): sha256sum = fixtures.CSRData.initial.sha256sum csr = CSR.by_sha256sum(sha256sum) - now = datetime.datetime.utcnow() + now = utcnow() self.req.matchdict["sha256"] = sha256sum resp = views.cert_fetch(self.req) # Verify response contents @@ -153,7 +154,7 @@ def test_exists_valid(self): def test_exists_expired(self): csr = fixtures.CSRData.with_expired_cert() csr.save() - now = datetime.datetime.utcnow() + now = utcnow() self.req.matchdict["sha256"] = csr.sha256sum resp = views.cert_fetch(self.req) # Verify response contents @@ -168,7 +169,7 @@ def test_exists_expired(self): def test_not_signed(self): csr = fixtures.CSRData.good() csr.save() - now = datetime.datetime.utcnow() + now = utcnow() self.req.matchdict["sha256"] = csr.sha256sum resp = views.cert_fetch(self.req) # Verify response contents