Source code for biobb_remote.ssh_session

""" Module to manage SSH sessions """
__author__ = "gelpi"
__date__ = "$08-March-2019 17:32:38$"

import sys
import os
import stat
import pickle
import paramiko
from io import StringIO
from paramiko import SSHClient, AutoAddPolicy, AuthenticationException, SSHException, RSAKey


[docs]class SSHSession: """ | biobb_remote ssh_session.SSHSession | Class wrapping ssh operations Args: ssh_data (SSHCredentials) (Optional): (None) SSHCredentials object. credentials_path (str) (Optional): (None) Path to packed credentials file to use. private_path (str) (Optional): (None) Path to private key file. passwd (str) (Optional): (None) Password to decrypt credentials. debug (bool) (Optional): (False) Prints (very) verbose debug information on ssh transactions. """ def __init__(self, ssh_data=None, credentials_path=None, private_path=None, passwd=None, debug=False): if ssh_data is None: self.ssh_data = SSHCredentials(credentials_path is None) if credentials_path: self.ssh_data.load_from_file(credentials_path, passwd) elif private_path: self.ssh_data.load_from_private_key_file(private_path, passwd) else: self.ssh_data = ssh_data self.ssh = SSHClient() self.ssh.set_missing_host_key_policy(AutoAddPolicy()) self.sftp = None if debug: paramiko.common.logging.basicConfig(level=paramiko.common.DEBUG) try: self.ssh.connect( self.ssh_data.host, username=self.ssh_data.userid, pkey=self.ssh_data.key, look_for_keys=self.ssh_data.look_for_keys ) except AuthenticationException as err: sys.exit(err) except SSHException as err: sys.exit(err)
[docs] def run_command(self, command): """ SSHSession.run_command Runs a shell command on remote, produces stdout, stderr tuple Args: command (str | list(str)): Command or list of commands to execute on remote. """ if isinstance(command, list): command = ' '.join(command) if self.ssh: stdin, stdout, stderr = self.ssh.exec_command(command) return ''.join(stdout), ''.join(stderr)
[docs] def run_sftp(self, oper, input_file_path, output_file_path='', reuse_session=True): """ SSHSession.run_sftp Opens a SFTP session on remote and execute some file operation Args: oper (str - Operation to perform): * **get** - gets a single file from input_file_path (remote) to output_file_path (local). * **put** - puts a single file from input_file_path (local) to output_file_path (remote). * **create** - creates a file in output_file_path (remote) from input_file_path string. * **file** - opens a remote file in input_file_path for read). Returns a file handle. * **listdir** - returns a list of files in remote input_file_path. input_file_path (str): Input file path or input string output_file_path (str): ('') Output file path. Not required in some ops. reuse_session (bool): (True) Re-use active SFTP session """ #Re-using active sftp session if not self.sftp or not reuse_session: self.sftp = self.ssh.open_sftp() try: if oper == 'get': self.sftp.get(input_file_path, output_file_path) elif oper == 'put': self.sftp.put(input_file_path, output_file_path) elif oper == 'create': with self.sftp.file(output_file_path, "w") as remote_fileh: remote_fileh.write(input_file_path) # elif oper == 'open': # return sftp.open(input_file_path) elif oper == 'file': with self.sftp.file(input_file_path, "r") as remote_file: return remote_file.read().decode() elif oper == "listdir": return self.sftp.listdir(input_file_path) # elif oper == 'rmdir': # return sftp.rmdir(input_file_path) elif oper == 'lstat': return self.sftp.lstat(input_file_path) else: print('Unknown sftp command', oper) return True #TODO check appropriate errors except IOError as err: sys.exit(err) return False
[docs] def is_active(self): """ SSHSession.is_active Tests whether the defined session is active """ return self.ssh and self.ssh.get_transport().is_active()
[docs] def close(self): """ SSHSession.close Closes active SSH session """ if self.ssh: self.ssh.close() self.ssh = None