""" Module to generate and manage SSL credentials"""
__author__ = "gelpi"
__date__ = "$08-March-2019 17:32:38$"
import sys
import os
import stat
import pickle
import paramiko
from io import StringIO
from paramiko import SSHClient, AutoAddPolicy, AuthenticationException, RSAKey
[docs]class SSHCredentials:
"""
| biobb_remote SSHCredentials
| Class to generate and manage SSL key-pairs for remote execution.
Args:
host (str) (Optional): Target host name.
userid (str) (Optional): Target user id.
generate_key (bool) (Optional): (False) Generate a pub/private key pair.
look_for_keys (bool) (Optional): (True) Look for keys in user's .ssh directory if no key provided.
"""
def __init__(self, host='', userid='', generate_key=False, look_for_keys=True):
self.host = host
self.userid = userid
self.key = None
self.user_ssh = None
self.sftp = None
self.look_for_keys = look_for_keys
self.remote_auth_keys = []
if generate_key:
self.generate_key()
[docs] def load_from_file(self, credentials_path, passwd=None):
""" SSHCredentials.load_from_file
Recovers SSHCredentials object from disk file.
Args:
credentials_path (str): Path to packed credentials file.
passwd (str): (None) Use to decrypt private key.
"""
try:
file = open(credentials_path, 'rb')
data = pickle.load(file)
except IOError as err:
sys.exit(err)
self.host = data['host']
self.userid = data['userid']
self.look_for_keys = data['look_for_keys']
self.key = RSAKey.from_private_key(StringIO(data['data'].getvalue()), passwd)
[docs] def load_from_private_key_file(self, private_path, passwd=None):
""" SSHCredentials.load_from_private_key_file
Loads private key from an standard file.
Args:
private_path (str): Path to private key file.
passwd (str): (None) Password to decrypt private key.
"""
try:
self.key = RSAKey.from_private_key_file(private_path, passwd)
except IOError as err:
sys.exit(err)
[docs] def generate_key(self, nbits=2048):
""" SSHCredentials.generate_key
Generates RSA keys pair
Args:
nbits (int): \(2048\) Number of bits of the generated key.
"""
self.key = RSAKey.generate(nbits)
[docs] def get_public_key(self, suffix='@biobb'):
""" SSHCredentials.get_public_key
Returns a readable public key suitable to add to authorized keys.
Args:
suffix (str): ('@biobb') Suffix added to the key for identify it.
"""
if self.key:
return '{} {} {}{}\n'.format(
self.key.get_name(), self.key.get_base64(), self.userid, suffix
)
else:
return None
[docs] def get_private_key(self, passwd=None):
""" SSHCredentials.get_private_key
Returns a readable private key.
Args:
passwd (str): (None) Use passwd to encrypt key.
"""
if self.key:
private = StringIO()
self.key.write_private_key(private, passwd)
return private.getvalue()
else:
return None
[docs] def save(self, output_path, public_key_path=None, private_key_path=None, passwd=None):
""" SSHCredentials.save
Save packed credentials on external file for re-usage.
Args:
output_path (str): Path to file
public_key_path (str): (None) Path to a standard public key file.
private_key_path (str): (None) Path to a standard private key file.
passwd (str): (None) Password to be saved.
"""
with open(output_path, 'wb') as keys_file:
private = StringIO()
if self.key:
self.key.write_private_key(private)
pickle.dump(
{
'userid': self.userid,
'host': self.host,
'data': private,
'look_for_keys': self.look_for_keys
}, keys_file)
if public_key_path:
with open(public_key_path, 'w') as pubkey_file:
pubkey_file.write(self.get_public_key())
if private_key_path:
with open(private_key_path, 'w') as privkey_file:
privkey_file.write(self.get_private_key(passwd))
os.chmod(private_key_path, stat.S_IREAD + stat.S_IWRITE)
[docs] def check_host_auth(self):
""" SSHCredentials.check_host_auth
Checks for public_key in remote .ssh/authorized_keys file.
Requires users' SSH access to host.
"""
if not self.remote_auth_keys:
self._get_remote_auth_keys()
return self.get_public_key() in self.remote_auth_keys
[docs] def install_host_auth(self, file_bck='bck'):
""" SSHCredentials.install_host_auth
Installs public_key on remote .ssh/authorized_keys file.
Requires users' SSH access to host.
Args:
file_bck (str): ('bck') Extension to add to backed-up authorized_keys file.
"""
if not self.check_host_auth():
if file_bck:
self._put_remote_auth_keys(file_bck)
print("Previous authorized keys backed up at", ".ssh/authorized_keys." + file_bck)
self.remote_auth_keys = self.remote_auth_keys + [self.get_public_key()]
self._put_remote_auth_keys()
print('Biobb Public key installed on host')
else:
print('Biobb Public key already authorized')
[docs] def remove_host_auth(self, file_bck='biobb'):
""" SSHCredentials.remove_host_auth
Removes public_key from remote .ssh/authorized_keys file.
Requires users' SSH access to host.
Args:
file_bck (str): ('biobb') Extension to add to backed-up authorized_keys.
"""
if self.check_host_auth():
if file_bck:
self._put_remote_auth_keys(file_bck)
print("Previous authorized keys backed up at", ".ssh/authorized_keys." + file_bck)
self.remote_auth_keys = [pkey for pkey in self.remote_auth_keys if pkey != self.get_public_key()]
self._put_remote_auth_keys()
print("Biobb Public key removed from host")
else:
print("Biobb Public key not found in remote")
# =================================================================================================================
def _set_user_ssh_session(self, debug=False):
""" Private.
Opens a ssh session using user's keys
Args:
debug (bool): (False) Retrieve debug information from SSH
"""
self.user_ssh = SSHClient()
self.user_ssh.set_missing_host_key_policy(AutoAddPolicy())
if debug:
paramiko.common.logging.basicConfig(level=paramiko.common.DEBUG)
try:
self.user_ssh.connect(
self.host,
username=self.userid,
)
except AuthenticationException as err:
sys.exit(err)
self.sftp = self.user_ssh.open_sftp()
def _get_remote_auth_keys(self):
""" Private.
Obtains authorized keys on remote
"""
if not self.sftp:
self._set_user_ssh_session()
try:
with self.sftp.file('.ssh/authorized_keys', mode='r') as ak_file:
self.remote_auth_keys = ak_file.readlines()
except IOError:
self.remote_auth_keys = []
def _put_remote_auth_keys(self, file_ext=''):
""" Private
Adds public_key to remote authorized_keys
Args:
file_ext (str): ('No default') file extension for backup of the original file.
"""
if not self.remote_auth_keys:
return True
if not self.sftp:
self._set_user_ssh_session()
auth_file = '.ssh/authorized_keys'
if file_ext:
auth_file += '.' + file_ext
with self.sftp.file(auth_file, 'w') as bck_file:
bck_file.writelines(self.remote_auth_keys)
return False