MSCA certsrv
Jump to navigation
Jump to search
import argparse
import base64
import re
import time
import requests
from requests_ntlm import HttpNtlmAuth
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
def generate_key_and_csr(common_name, san_list=None):
# Generate a private key
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Build subject
subject = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, common_name),
])
# Build CSR
csr_builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
if san_list:
san = x509.SubjectAlternativeName([
x509.DNSName(dns) for dns in san_list
])
csr_builder = csr_builder.add_extension(san, critical=False)
csr = csr_builder.sign(key, hashes.SHA256())
csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode('utf-8')
return key, csr_pem
def submit_csr(ca_url, csr_pem, template, auth):
"""
Submit a CSR to the Microsoft CA Web Enrollment service.
Uses Mode=newreq, CertRequest and CertAttrib form fields. ([stackoverflow.com](https://stackoverflow.com/questions/31283476/submitting-base64-csr-to-a-microsoft-ca-via-curl))
"""
session = requests.Session()
session.auth = auth
# Initialize session (get cookies)
session.get(f"{ca_url}/certrqxt.asp", verify=False)
data = {
'Mode': 'newreq',
'CertRequest': csr_pem,
'CertAttrib': f'CertificateTemplate:{template}',
'FriendlyType': '',
'TargetStoreFlags': '0',
'SaveCert': 'yes'
}
resp = session.post(f"{ca_url}/certfnsh.asp", data=data, verify=False)
resp.raise_for_status()
# Extract the Request ID from the response
match = re.search(r"certnew\.cer\?ReqID=(\d+)&", resp.text)
if not match:
raise RuntimeError("Failed to obtain Request ID from CA response")
return match.group(1)
def retrieve_cert(ca_url, req_id, auth, timeout=60):
"""
Retrieve issued certificate in DER from CA. ([certsrv.readthedocs.io](https://certsrv.readthedocs.io/en/latest/_modules/certsrv.html?utm_source=chatgpt.com))
"""
session = requests.Session()
session.auth = auth
end_time = time.time() + timeout
while time.time() < end_time:
resp = session.get(
f"{ca_url}/certnew.cer",
params={'ReqID': req_id, 'Enc': 'bin'},
verify=False
)
if resp.headers.get('Content-Type') == 'application/pkix-cert':
return resp.content
time.sleep(2)
raise TimeoutError("Certificate not issued within timeout period")
def main():
parser = argparse.ArgumentParser(
description='Generate a key, CSR and submit to Microsoft CA Web Enrollment'
)
parser.add_argument('--ca-url', required=True,
help='Base URL of CA Web Enrollment (e.g. https://ca-server/certsrv)')
parser.add_argument('--template', required=True,
help='Certificate template name')
parser.add_argument('--username', required=True,
help='User in DOMAIN\\user format for NTLM auth')
parser.add_argument('--password', required=True,
help='Password for NTLM auth')
parser.add_argument('--cn', required=True,
help='Common Name for certificate subject')
parser.add_argument('--san', nargs='*', default=None,
help='Optional Subject Alternative Names (DNS)')
parser.add_argument('--output-key', default='private_key.pem',
help='Output path for private key (PEM)')
parser.add_argument('--output-cert', default='certificate.pem',
help='Output path for certificate (PEM)')
args = parser.parse_args()
auth = HttpNtlmAuth(args.username, args.password)
# Generate key and CSR
key, csr_pem = generate_key_and_csr(args.cn, args.san)
# Save private key
with open(args.output_key, 'wb') as f:
f.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
)
print(f'Private key saved to {args.output_key}')
# Submit CSR
req_id = submit_csr(args.ca_url.rstrip('/'), csr_pem, args.template, auth)
print(f'Submitted CSR, Request ID = {req_id}')
# Retrieve issued certificate
cert_der = retrieve_cert(args.ca_url.rstrip('/'), req_id, auth)
# Convert DER to PEM and save
cert = x509.load_der_x509_certificate(cert_der)
with open(args.output_cert, 'wb') as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
print(f'Certificate saved to {args.output_cert}')
if __name__ == '__main__':
main()