MSCA certsrv

From UVOO Tech Wiki
Jump to navigation Jump to search
import argparse
import base64
import re
import time
import requests
from requests_ntlm import HttpNtlmAuth
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa


def generate_key_and_csr(common_name, san_list=None):
    """Create a fresh RSA key and a PEM-encoded CSR for *common_name*.

    The key is 2048-bit RSA with public exponent 65537. The CSR subject
    carries only the Common Name, and the request is signed with SHA-256.

    Args:
        common_name: Value placed in the subject's CN attribute.
        san_list: Optional iterable of DNS names. When non-empty, each name
            is added as a ``DNSName`` entry of a non-critical Subject
            Alternative Name extension.

    Returns:
        A ``(private_key, csr_pem)`` tuple: the generated key object and
        the CSR as a PEM string.
    """
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    subject_name = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, common_name)]
    )
    builder = x509.CertificateSigningRequestBuilder().subject_name(subject_name)

    # A missing or empty SAN list means the extension is omitted entirely.
    if san_list:
        dns_names = [x509.DNSName(name) for name in san_list]
        builder = builder.add_extension(
            x509.SubjectAlternativeName(dns_names),
            critical=False,
        )

    request = builder.sign(private_key, hashes.SHA256())
    pem_text = request.public_bytes(serialization.Encoding.PEM).decode('utf-8')
    return private_key, pem_text


def submit_csr(ca_url, csr_pem, template, auth):
    """
    Submit a CSR to the Microsoft CA Web Enrollment service.

    Posts the ``Mode=newreq`` form (``CertRequest`` / ``CertAttrib`` fields)
    to ``certfnsh.asp`` and extracts the request ID from the returned HTML.
    See https://stackoverflow.com/questions/31283476/submitting-base64-csr-to-a-microsoft-ca-via-curl

    Args:
        ca_url: Base URL of the Web Enrollment site, without a trailing
            slash (e.g. ``https://ca-server/certsrv``).
        csr_pem: The certificate signing request as a PEM string.
        template: Certificate template name, sent as
            ``CertificateTemplate:<template>``.
        auth: A ``requests`` auth object attached to the session.

    Returns:
        The request ID as a string of digits. This is returned both when
        the certificate was issued immediately and when the CA reports the
        request as pending, so the caller can poll for the certificate.

    Raises:
        requests.HTTPError: If the submission POST returns an error status.
        RuntimeError: If no request ID can be found in the response; the
            CA's disposition message is appended when one is present.
    """
    # NOTE(security): verify=False disables TLS certificate validation while
    # credentials are being sent. Kept to preserve existing behavior; prefer
    # pointing `verify` at the CA chain when the server certificate is trusted.
    with requests.Session() as session:  # ensure pooled connections are released
        session.auth = auth
        # Initialize session (get cookies)
        session.get(f"{ca_url}/certrqxt.asp", verify=False)

        form = {
            'Mode': 'newreq',
            'CertRequest': csr_pem,
            'CertAttrib': f'CertificateTemplate:{template}',
            'FriendlyType': '',
            'TargetStoreFlags': '0',
            'SaveCert': 'yes',
        }
        resp = session.post(f"{ca_url}/certfnsh.asp", data=form, verify=False)
        resp.raise_for_status()
        page = resp.text

    # Issued immediately: the page links to certnew.cer with the request ID.
    match = re.search(r"certnew\.cer\?ReqID=(\d+)&", page)

    # Pending approval: there is no download link yet, but the page still
    # states the request ID. Only accept it on an explicit "pending" page so
    # that a denied request is reported as an error below.
    if match is None and "Certificate Pending" in page:
        match = re.search(r"Your Request Id is (\d+)", page)

    if match is None:
        disposition = re.search(r'The disposition message is "([^"]+)', page)
        detail = f": {disposition.group(1)}" if disposition else ""
        raise RuntimeError(
            f"Failed to obtain Request ID from CA response{detail}"
        )
    return match.group(1)


def retrieve_cert(ca_url, req_id, auth, timeout=60):
    """
    Retrieve the issued certificate in DER form from the CA.

    Polls ``certnew.cer`` for the given request ID until the response is
    served as ``application/pkix-cert`` or the timeout elapses.
    See https://certsrv.readthedocs.io/en/latest/_modules/certsrv.html

    Args:
        ca_url: Base URL of the Web Enrollment site, without a trailing
            slash.
        req_id: Request ID returned when the CSR was submitted.
        auth: A ``requests`` auth object attached to the session.
        timeout: Maximum number of seconds to keep polling. At least one
            attempt is always made, even when this is zero or negative.

    Returns:
        The raw response body (DER-encoded certificate bytes).

    Raises:
        TimeoutError: If no certificate response arrives within ``timeout``.
    """
    poll_interval = 2
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout

    # NOTE(security): verify=False disables TLS certificate validation; kept
    # to preserve existing behavior.
    with requests.Session() as session:  # ensure pooled connections are released
        session.auth = auth
        while True:
            resp = session.get(
                f"{ca_url}/certnew.cer",
                params={'ReqID': req_id, 'Enc': 'bin'},
                verify=False,
            )
            # Compare only the media type, ignoring case and any parameters
            # such as "; charset=...".
            content_type = resp.headers.get('Content-Type') or ''
            media_type = content_type.split(';', 1)[0].strip().lower()
            if media_type == 'application/pkix-cert':
                return resp.content

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))

    raise TimeoutError("Certificate not issued within timeout period")


def main():
    """Command-line entry point: generate a key and CSR, enroll, save results.

    Steps, in order:
      1. Parse command-line arguments (prompting for the password when
         ``--password`` is not given).
      2. Generate a private key and CSR for ``--cn`` / ``--san``.
      3. Write the private key (unencrypted PEM) to ``--output-key``.
      4. Submit the CSR to the CA and print the request ID.
      5. Retrieve the issued certificate, convert DER to PEM and write it to
         ``--output-cert``.

    The key is written before the CSR is submitted so that it is already on
    disk by the time the CA has the request.
    """
    # Local imports keep this block self-contained without touching the
    # file-level import list.
    import getpass
    import os

    parser = argparse.ArgumentParser(
        description='Generate a key, CSR and submit to Microsoft CA Web Enrollment'
    )
    parser.add_argument('--ca-url', required=True,
                        help='Base URL of CA Web Enrollment (e.g. https://ca-server/certsrv)')
    parser.add_argument('--template', required=True,
                        help='Certificate template name')
    parser.add_argument('--username', required=True,
                        help='User in DOMAIN\\user format for NTLM auth')
    # Optional: a password given on the command line is visible in the
    # process list and shell history, so allow an interactive prompt instead.
    parser.add_argument('--password', default=None,
                        help='Password for NTLM auth (prompted for if omitted)')
    parser.add_argument('--cn', required=True,
                        help='Common Name for certificate subject')
    parser.add_argument('--san', nargs='*', default=None,
                        help='Optional Subject Alternative Names (DNS)')
    parser.add_argument('--output-key', default='private_key.pem',
                        help='Output path for private key (PEM)')
    parser.add_argument('--output-cert', default='certificate.pem',
                        help='Output path for certificate (PEM)')
    args = parser.parse_args()

    password = args.password
    if password is None:
        password = getpass.getpass(f'Password for {args.username}: ')

    ca_url = args.ca_url.rstrip('/')
    auth = HttpNtlmAuth(args.username, password)

    # Generate key and CSR
    key, csr_pem = generate_key_and_csr(args.cn, args.san)

    # Save private key
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    # The key is stored unencrypted, so create the file as owner-only (0o600)
    # instead of relying on the process umask. A file that already exists
    # keeps its current permissions.
    key_fd = os.open(
        args.output_key,
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        0o600,
    )
    with os.fdopen(key_fd, 'wb') as f:
        f.write(key_pem)
    print(f'Private key saved to {args.output_key}')

    # Submit CSR
    req_id = submit_csr(ca_url, csr_pem, args.template, auth)
    print(f'Submitted CSR, Request ID = {req_id}')

    # Retrieve issued certificate
    cert_der = retrieve_cert(ca_url, req_id, auth)

    # Convert DER to PEM and save
    cert = x509.load_der_x509_certificate(cert_der)
    with open(args.output_cert, 'wb') as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
    print(f'Certificate saved to {args.output_cert}')


# Run the command-line workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()