Difference between revisions of "Venafi python"

From UVOO Tech Wiki
Jump to navigation Jump to search
(Created page with "``` #!/usr/bin/env python3 import argparse import base64 from getpass import getpass import json import os import requests import sys class EnvDefault(argparse.Action): d...")
 
Line 1: Line 1:
 +
https://docs.venafi.com/Docs/current/TopNav/Content/SDK/WebSDK/r-SDK-POST-Certificates-retrieve.php
 +
 
```
 
```
 
#!/usr/bin/env python3
 
#!/usr/bin/env python3

Revision as of 00:42, 18 April 2024

https://docs.venafi.com/Docs/current/TopNav/Content/SDK/WebSDK/r-SDK-POST-Certificates-retrieve.php

#!/usr/bin/env python3
import argparse
import base64
from getpass import getpass
import json
import os
import requests
import sys

class EnvDefault(argparse.Action):
    def __init__(self, envvar, required=True, default=None, **kwargs):
        if not default and envvar:
            if envvar in os.environ:
                default = os.environ[envvar]
        if required and default:
            required = False
        super(EnvDefault, self).__init__(default=default, required=required,
                                         **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, values)


def get_required_environment_variables(variables):
    for var in variables:
        if os.environ.get(var) is None:
            raise EnvironmentError(f"Environment variable {var} is not set.")
    return {var: os.environ[var] for var in variables}


def get_env_var(env_name):
    env_value = os.environ.get(env_name)
    if env_value is None:
        raise EnvironmentError(f"Environment variable '{env_name}' is not set.")
    return env_value


def get_cert_via_dn(token, cert_path, include_private_key=False, cert_password=False):
    url = f"https://{args.fqdn}/vedsdk/Certificates/Retrieve"

    # Escape backslashes in cert_path
    cert_path_escaped = cert_path.replace("\\", "\\\\")

    cert_prefix = "\\VED\\Policy\\Certificates\\"
    cert_prefix_escaped = cert_prefix.replace("\\", "\\\\")
    cert_dn = cert_prefix_escaped + cert_path_escaped

    if cert_password and include_private_key:
        payload = {
            "CertificateDN": cert_dn,
            "Format": "Base64",
            "IncludeChain": True,
            "IncludePrivateKey": include_private_key,
            "Password": cert_password,
            "RootFirstOrder": True
        }
    else:
        payload = {
            "CertificateDN": cert_dn,
            "Format": "Base64",
            "IncludeChain": True,
            "RootFirstOrder": True
        }

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    response = requests.post(url, json=payload, headers=headers)

    if response.status_code == 200:
        b64 = response.json()["CertificateData"].encode("utf-8")
        data = base64.b64decode(b64)
        return data.decode("utf-8")
        # pretty_json = json.dumps(data, indent=4)
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None


def get_access_token(username, password, client_id, scope):
    url = f"https://{args.fqdn}/vedauth/authorize/oauth"
    payload = {
        "username": username,
        "password": password,
        "client_id": client_id,
        "scope": scope
    }
    response = requests.post(url, json=payload)
    if response.status_code == 200:
        return response.json()["access_token"]
    else:
        response.raise_for_status()

def search_cert_by_alias(token, alias):
    url = f"https://{args.fqdn}/api/v1/certificate/search"
    headers = {"Authorization": f"Bearer {token}"}
    params = {"alias": alias}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        certificates = response.json()["certificates"]
        if certificates:
            return certificates[0]["id"]  # Assuming we take the first certificate ID found
    else:
        response.raise_for_status()

def download_cert_pfx(token, certificate_id, output_file):
    url = f"https://args.fqdn/api/v1/certificate/{certificate_id}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        # Assuming the certificate is returned in PKCS#12 format
        with open(output_file, "wb") as f:
            f.write(response.content)
    else:
        response.raise_for_status()


def main():
    global args
    parser = argparse.ArgumentParser(
        description='Get venafi certifcate.')
    parser.add_argument('-a', '--alias', required=True, type=str,
                        action=EnvDefault, envvar='ALIAS',
                        help='Venafi alias.')
    parser.add_argument('-c', '--client-id', required=True, type=str,
                        action=EnvDefault, envvar='CLIENT_ID',
                        # default=os.environ.get('CLIENT_ID'),
                        help='Venafi client_id - Example: certificate:manage.')
    parser.add_argument('-p', '--password', required=True, type=str,
                        action=EnvDefault, envvar='PASSWORD',
                        # default=os.environ.get('PASSWORD'),
                        help='Venafi alias. Often the CN of hostname portion of CN')
    parser.add_argument('-s', '--scope', required=True, type=str,
                        action=EnvDefault, envvar='SCOPE',
                        # default=os.environ.get('SCOPE'),
                        help='Venafi scope.')
    parser.add_argument('-u', '--username', required=True, type=str,
                        action=EnvDefault, envvar='USERNAME',
                        # default=os.environ.get('USERNAME'),
                        help='Venafi alias.')
    parser.add_argument('-P', '--path', required=True, type=str,
                        help='Venafi certificate path.')
    parser.add_argument('-F', '--fqdn', required=True, type=str,
                        action=EnvDefault, envvar='FQDN',
                        # default=os.environ.get('FQDN'),
                        help='Venafi API FQDN.')
    parser.add_argument('--include-private-key', action='store_const', const=True, default=False,
                        help='Include certifcate private key.')
    parser.add_argument('--cert-password', type=str, default=False,
                        action=EnvDefault, envvar='CERT_PASSWORD',
                        help='Include certifcate private key.')
    args = parser.parse_args()

    token = get_access_token(args.username, args.password, args.client_id, args.scope)
    print(f"Access token: {token}")
    print(args.include_private_key)
    rsp = get_cert_via_dn(token, args.path, args.include_private_key, args.cert_password)
    # rsp.raise_for_status()
    print(rsp)

if __name__ == "__main__":
        main()