Paramiko
Jump to navigation
Jump to search
Example
# PowerShell commands are encoded by default (for many reasons). If you want
# them logged, turn on logging:
# https://github.com/ansible/ansible/issues/50107#issuecomment-448442954
#
# pywinrm: https://github.com/diyan/pywinrm
#
# PowerShell one-liner referenced by the original notes (run on the Windows
# target; presumably sets up WinRM remoting -- verify before use):
# Invoke-Expression ((New-Object System.Net.Webclient).DownloadString('https://raw.githubusercontent.com/ansible/ansible/devel/examples/scripts/ConfigureRemotingForAnsible.ps1'))
import ipaddress
import logging
import socket
import sys

# Each backend is only needed for its own kind of host, so a missing package
# must not prevent the module from loading; RcmdClient raises ImportError at
# the point where the backend is actually required.
try:
    import paramiko
except ImportError:
    paramiko = None

try:
    import winrm
except ImportError:
    winrm = None

ssh_port = 22
winrm_port_http = 5985
winrm_port_https = 5986


def get_os(host):
    """Guess the OS family of *host* from the remote-shell port it exposes.

    Returns "windows" when a WinRM port answers, "linux" when only SSH
    answers, and "unknown" when none of the probed ports is reachable.
    """
    _, proto = get_remoteshell_port(host)
    if proto == 'winrm':
        return "windows"
    if proto == 'ssh':
        return "linux"
    return "unknown"


def get_remoteshell_port(host):
    """Return ``(port, protocol)`` for the first reachable remote-shell port.

    Ports are probed in the order WinRM/HTTPS, WinRM/HTTP, SSH. When nothing
    answers, ``("unknown", "unknown")`` is returned.
    """
    candidates = (
        (winrm_port_https, 'winrm'),
        (winrm_port_http, 'winrm'),
        (ssh_port, 'ssh'),
    )
    for port, proto in candidates:
        if test_tcp_port_open(host, port) == 0:
            return port, proto
    return "unknown", 'unknown'


def test_is_valid_host_or_ipaddr(host):
    """Return 0 if *host* is an IP literal or a resolvable name, else 1.

    A message is printed when the host is rejected.
    """
    try:
        ipaddress.ip_address(host)
        return 0
    except ValueError:
        # Not an IP literal; fall through to name resolution.
        pass
    try:
        socket.gethostbyname(host)
        return 0
    except (OSError, UnicodeError, TypeError):
        # OSError covers resolver failures; UnicodeError/TypeError cover
        # names that cannot even be encoded or passed to the resolver.
        print(f"E: {host} is not valid ip address or can't be resolved!")
        return 1


def test_tcp_port_open(host, port):
    """Return 0 if a TCP connection to ``host:port`` succeeds within 1 s, else 1."""
    try:
        # create_connection resolves the host for both IPv4 and IPv6, and the
        # ``with`` block closes the socket on every path.
        with socket.create_connection((host, int(port)), timeout=1):
            return 0
    except (OSError, ValueError, TypeError, OverflowError):
        # Unreachable/refused/timed out, or *port* is not a usable number.
        return 1


# These helpers are network probes, not unit tests; the ``test_`` prefix would
# otherwise make pytest collect them when they are imported into a test module.
test_is_valid_host_or_ipaddr.__test__ = False
test_tcp_port_open.__test__ = False


class RcmdClient:
    """Winrm and ssh wrapper class that currently uses pywinrm and paramiko.

    Attributes set by the constructor: ``host``, ``username``, ``userpass``,
    ``transport``, ``server_cert_validation``, ``rshport``, ``rshproto`` and
    ``client`` (the backend object, or ``None`` when no session was created).
    """

    def __init__(self, host, username, userpass, transport='ntlm',
                 server_cert_validation='validate', rshport=None,
                 rshproto=None):
        """Open a remote-shell session to *host*.

        Args:
            host: Host name or IP address of the target.
            username: Login name.
            userpass: Password; also fed to ``sudo`` by :meth:`execute`.
            transport: pywinrm transport name (WinRM hosts only).
            server_cert_validation: pywinrm certificate validation mode
                (WinRM hosts only).
            rshport: Remote-shell port. When falsy the port is auto-detected
                with ``get_remoteshell_port``.
            rshproto: Protocol label stored alongside an explicit *rshport*.

        Raises:
            ValueError: *host* is neither an IP literal nor resolvable.
            ImportError: the backend package for the selected port is missing.
        """
        self.host = host
        self.username = username
        self.userpass = userpass
        self.transport = transport
        self.server_cert_validation = server_cert_validation
        # Always defined, so close() is safe even if no session gets created.
        self.client = None

        # Validate before probing: each probe can block for its full timeout.
        if test_is_valid_host_or_ipaddr(host) != 0:
            raise ValueError(
                f"host {host!r} is not resolvable or is not a valid IP address"
            )

        if rshport:
            self.rshport = rshport
            self.rshproto = rshproto
        else:
            self.rshport, self.rshproto = get_remoteshell_port(host)

        if self.rshport == ssh_port:
            if paramiko is None:
                raise ImportError("paramiko is required for SSH hosts")
            client = paramiko.SSHClient()
            client.load_system_host_keys()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification. Kept for compatibility; reconsider for hosts
            # reached over untrusted networks.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            try:
                client.connect(host, port=self.rshport, username=username,
                               password=userpass)
            except Exception:
                # Do not leak a half-open client when the login fails.
                client.close()
                raise
            self.client = client
        elif self.rshport in (winrm_port_https, winrm_port_http):
            if winrm is None:
                raise ImportError("pywinrm is required for WinRM hosts")
            # NOTE(review): only the bare host is passed, so pywinrm chooses
            # scheme and port itself; it may not use the port detected above
            # (e.g. HTTPS/5986) -- confirm against the pywinrm documentation.
            self.client = winrm.Session(
                host,
                auth=(username, userpass),
                transport=transport,
                server_cert_validation=server_cert_validation,
            )
        else:
            print('ERROR: Unsupported rsh port or host down!')

    def close(self):
        """Release the backend session, if any. Safe to call repeatedly."""
        client = getattr(self, 'client', None)
        if client is not None:
            # Only call close() when the backend offers it; the pywinrm
            # Session object appears not to -- TODO confirm.
            closer = getattr(client, 'close', None)
            if callable(closer):
                closer()
        self.client = None

    def execute(self, cmd, log=True, logfile='./ssh.log'):
        """Run *cmd* on the remote host.

        Args:
            cmd: Shell command (SSH) or PowerShell script (WinRM).
            log: WinRM only -- append the command to *logfile* when true.
            logfile: WinRM only -- path of the log file.

        Returns:
            A dict with keys ``exit_status``, ``cmd``, ``out`` and ``err``,
            or ``None`` when the client has no supported session.
        """
        if self.rshport == ssh_port:
            return self._execute_ssh(cmd)
        if self.rshport in (winrm_port_https, winrm_port_http):
            return self._execute_winrm(cmd, log, logfile)
        return None

    def _execute_ssh(self, cmd):
        """Run *cmd* over SSH, answering the sudo password prompt if needed."""
        words = cmd.strip().split()
        use_password = False
        # ``words`` is empty for a blank command; nothing to wrap in that case.
        if words and words[0] == 'sudo' and self.username != "root":
            # -S makes sudo read the password from stdin; -p ' ' blanks the
            # prompt so it does not pollute the captured output. The caller's
            # own "sudo ..." is kept as the wrapped command.
            cmd = f"sudo -S -p ' ' {cmd}"
            use_password = self.userpass is not None and len(self.userpass) > 0
        stdin, stdout, stderr = self.client.exec_command(cmd, get_pty=True)
        if use_password:
            stdin.write(self.userpass + "\n")
            stdin.flush()
        # Drain the streams before asking for the exit status: waiting for the
        # status first can hang when the output exceeds the channel window.
        out = stdout.read().decode()
        err = stderr.read().decode()
        return {
            'exit_status': stdout.channel.recv_exit_status(),
            'cmd': cmd,
            'out': out,
            'err': err,
        }

    def _execute_winrm(self, cmd, log, logfile):
        """Run *cmd* as PowerShell over WinRM, optionally logging it."""
        # Use "nohup" before cmds to keep shell scripts running after return.
        if log:
            # basicConfig only has an effect the first time the root logger is
            # configured. The date format spells out %F/%T, which are not
            # available in every platform's strftime.
            logging.basicConfig(
                handlers=[logging.FileHandler(filename=logfile,
                                              encoding='utf-8', mode='a+')],
                format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
                datefmt="%Y-%m-%d %A %H:%M:%S",
                level=logging.INFO,
            )
        rsp = self.client.run_ps(cmd)
        if log:
            logging.info(cmd)
        return {
            'exit_status': rsp.status_code,
            'cmd': cmd,
            'out': rsp.std_out.decode(),
            'err': rsp.std_err.decode(),
        }


# TRASH: unused experiment kept for reference.
def sshcmd_alt(host, cmd):
    """Experimental alternative SSH runner.

    NOTE(review): ``config``, ``Session`` and ``LibsshSessionException`` are
    not imported in the visible part of this file, so calling this function
    raises NameError as-is. The code looks adapted from a libssh binding
    example -- confirm the intended package before reviving it.
    """
    USERNAME = config('LINUX_USERNAME')
    USERPASS = config('LINUX_USERPASS')
    TIMEOUT = 30
    PORT = 22
    ssh = Session()
    try:
        ssh.connect(
            host=host,
            user=USERNAME,
            password=USERPASS,
            timeout=TIMEOUT,
            port=PORT,
        )
    except LibsshSessionException as ssh_exc:
        print(f'Failed to connect to {host}:{PORT} over SSH: {ssh_exc!s}')
        # Nothing below can work without a connection.
        return
    print(f'{ssh.is_connected}')
    ssh_channel = ssh.new_channel()
    cmd_resp = ssh_channel.write(b'ls')
    print(f'stdout:\n{cmd_resp.stdout}\n')
    print(f'stderr:\n{cmd_resp.stderr}\n')
    print(f'return code: {cmd_resp.returncode}\n')
    ssh_channel.close()
    chan_shell = ssh.invoke_shell()
    # presumably *cmd* must be bytes here, as in the b'ls' call above -- verify
    chan_shell.sendall(cmd)
    data = chan_shell.read_bulk_response(timeout=2, retry=10)
    chan_shell.close()
    print(data)
    ssh.close()