MSCA certsrv
Jump to navigation
Jump to search
import argparse import base64 import re import time import requests from requests_ntlm import HttpNtlmAuth from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import rsa def generate_key_and_csr(common_name, san_list=None): # Generate a private key key = rsa.generate_private_key( public_exponent=65537, key_size=2048, ) # Build subject subject = x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, common_name), ]) # Build CSR csr_builder = x509.CertificateSigningRequestBuilder().subject_name(subject) if san_list: san = x509.SubjectAlternativeName([ x509.DNSName(dns) for dns in san_list ]) csr_builder = csr_builder.add_extension(san, critical=False) csr = csr_builder.sign(key, hashes.SHA256()) csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode('utf-8') return key, csr_pem def submit_csr(ca_url, csr_pem, template, auth): """ Submit a CSR to the Microsoft CA Web Enrollment service. Uses Mode=newreq, CertRequest and CertAttrib form fields. ([stackoverflow.com](https://stackoverflow.com/questions/31283476/submitting-base64-csr-to-a-microsoft-ca-via-curl)) """ session = requests.Session() session.auth = auth # Initialize session (get cookies) session.get(f"{ca_url}/certrqxt.asp", verify=False) data = { 'Mode': 'newreq', 'CertRequest': csr_pem, 'CertAttrib': f'CertificateTemplate:{template}', 'FriendlyType': '', 'TargetStoreFlags': '0', 'SaveCert': 'yes' } resp = session.post(f"{ca_url}/certfnsh.asp", data=data, verify=False) resp.raise_for_status() # Extract the Request ID from the response match = re.search(r"certnew\.cer\?ReqID=(\d+)&", resp.text) if not match: raise RuntimeError("Failed to obtain Request ID from CA response") return match.group(1) def retrieve_cert(ca_url, req_id, auth, timeout=60): """ Retrieve issued certificate in DER from CA. ([certsrv.readthedocs.io](https://certsrv.readthedocs.io/en/latest/_modules/certsrv.html?utm_source=chatgpt.com)) """ session = requests.Session() session.auth = auth end_time = time.time() + timeout while time.time() < end_time: resp = session.get( f"{ca_url}/certnew.cer", params={'ReqID': req_id, 'Enc': 'bin'}, verify=False ) if resp.headers.get('Content-Type') == 'application/pkix-cert': return resp.content time.sleep(2) raise TimeoutError("Certificate not issued within timeout period") def main(): parser = argparse.ArgumentParser( description='Generate a key, CSR and submit to Microsoft CA Web Enrollment' ) parser.add_argument('--ca-url', required=True, help='Base URL of CA Web Enrollment (e.g. https://ca-server/certsrv)') parser.add_argument('--template', required=True, help='Certificate template name') parser.add_argument('--username', required=True, help='User in DOMAIN\\user format for NTLM auth') parser.add_argument('--password', required=True, help='Password for NTLM auth') parser.add_argument('--cn', required=True, help='Common Name for certificate subject') parser.add_argument('--san', nargs='*', default=None, help='Optional Subject Alternative Names (DNS)') parser.add_argument('--output-key', default='private_key.pem', help='Output path for private key (PEM)') parser.add_argument('--output-cert', default='certificate.pem', help='Output path for certificate (PEM)') args = parser.parse_args() auth = HttpNtlmAuth(args.username, args.password) # Generate key and CSR key, csr_pem = generate_key_and_csr(args.cn, args.san) # Save private key with open(args.output_key, 'wb') as f: f.write( key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption() ) ) print(f'Private key saved to {args.output_key}') # Submit CSR req_id = submit_csr(args.ca_url.rstrip('/'), csr_pem, args.template, auth) print(f'Submitted CSR, Request ID = {req_id}') # Retrieve issued certificate cert_der = retrieve_cert(args.ca_url.rstrip('/'), req_id, auth) # Convert DER to PEM and save cert = x509.load_der_x509_certificate(cert_der) with open(args.output_cert, 'wb') as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) print(f'Certificate saved to {args.output_cert}') if __name__ == '__main__': main()