import datetime
import pathlib
import fabric
import invoke
import os
import paramiko
import re
import socket
import getpass
from urllib.parse import urlparse, parse_qs

from . import hpc_json

# Qt is optional: HAS_QT records whether QApplication could be imported.
try:
    from qtpy.QtWidgets import QApplication
    HAS_QT = True
except ImportError:
    HAS_QT = False


class Connection():
    """SSH session state plus helpers that classify authentication prompts.

    The static helpers below all work on the *normalized* prompt text (see
    ``_normalize_auth_prompt``) and use case-insensitive regular expressions.
    """

    def __init__(self):
        """Start with empty credentials, no connection and cleared auth flags."""
        self.user = ""
        self.pkey = ""
        self.host = ""
        self.con = None
        self.totp_handler = None
        # Bookkeeping for the keyboard-interactive authentication flow.
        self._interactive_prompt_seen = False
        self._interactive_cancelled = False
        self._interactive_password = None
        self._interactive_okta_prompt_seen = False

    @staticmethod
    def _normalize_auth_prompt(prompt_text):
        """Return ``prompt_text`` as a stripped string without ANSI CSI codes.

        Falsy input (``None``, ``""``, ...) yields an empty string.
        """
        prompt = str(prompt_text or "")
        # Remove terminal control sequences that can appear in
        # keyboard-interactive prompts.
        prompt = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", prompt)
        return prompt.strip()

    @staticmethod
    def _is_okta_push_prompt(prompt_text):
        """Return True if the prompt looks like an Okta push-code challenge.

        Requires both a "push code <digits>" phrase and one of the hints
        "press enter", "decline" or "okta".
        """
        prompt = Connection._normalize_auth_prompt(prompt_text)
        has_push_code = re.search(
            r"push\s+code\s+\d+", prompt, re.IGNORECASE
        ) is not None
        has_enter_hint = re.search(
            r"press\s+enter|decline|okta", prompt, re.IGNORECASE
        ) is not None
        return has_push_code and has_enter_hint

    @staticmethod
    def _extract_okta_push_code(prompt_text):
        """Return the digits of the push code in the prompt, or None.

        The more specific "Push code <digits> sent" form is tried first; the
        looser "push code <digits>" form is the fallback.
        """
        prompt = Connection._normalize_auth_prompt(prompt_text)
        match = re.search(r"Push code\s+(\d+)\s+sent", prompt, re.IGNORECASE)
        if match is None:
            match = re.search(r"push\s+code\s+(\d+)", prompt, re.IGNORECASE)
        if match is None:
            return None
        return match.group(1)

    @staticmethod
    def _is_password_prompt(prompt_text):
        """Return True if the prompt ends with a "password:" request.

        The word must sit at the start of the text or directly after
        whitespace or a closing parenthesis.
        """
        prompt = Connection._normalize_auth_prompt(prompt_text)
        return re.search(
            r"(^|\)|\s)password\s*:\s*$", prompt, re.IGNORECASE
        ) is not None

    @staticmethod
    def _is_okta_activation_prompt(prompt_text):
        """Return True if the prompt carries an activation URL to confirm.

        Requires both an http(s) URL containing "activate" and the phrase
        "press enter to continue".
        """
        prompt = Connection._normalize_auth_prompt(prompt_text)
        has_activate_url = re.search(
            r"https?://\S*activate\S*", prompt, re.IGNORECASE
        ) is not None
        has_enter_hint = re.search(
            r"press\s+enter\s+to\s+continue", prompt, re.IGNORECASE
        ) is not None
        return has_activate_url and has_enter_hint
@staticmethod def _extract_okta_activation_url(prompt_text): prompt = Connection._normalize_auth_prompt(prompt_text) match = re.search(r"(https?://\S+)", prompt, re.IGNORECASE) if match is None: return None return match.group(1).rstrip('.,;:') @staticmethod def _extract_okta_activation_code(prompt_text): prompt = Connection._normalize_auth_prompt(prompt_text) match = re.search(r"user_code\s*=\s*([A-Za-z0-9_-]+)", prompt, re.IGNORECASE) if match: return match.group(1) url = Connection._extract_okta_activation_url(prompt) if not url: return None try: parsed = urlparse(url) query = parse_qs(parsed.query) code_values = query.get("user_code", []) if code_values: return code_values[0] except Exception: return None return None def _keyboard_interactive_handler(self, title, instructions, prompt_list): """Handle keyboard-interactive authentication for 2FA prompts.""" if self.totp_handler is None: return [] context_lines = [] if title: context_lines.append(str(title).strip()) if instructions: context_lines.append(str(instructions).strip()) responses = [] for prompt_tuple in prompt_list: self._interactive_prompt_seen = True # prompt_tuple is (prompt_text, show_input) prompt_text = prompt_tuple[0] if isinstance(prompt_tuple, tuple) else str(prompt_tuple) if self._is_okta_push_prompt(prompt_text): self._interactive_okta_prompt_seen = True # Always ask the user for keyboard-interactive responses instead of auto-skipping prompts. 
challenge = str(prompt_text).strip() or "Authentication response:" full_prompt = "\n\n".join([line for line in context_lines if line]) if full_prompt: full_prompt += "\n\n" full_prompt += challenge response = self.totp_handler(full_prompt) if response is None: self._interactive_cancelled = True return [] if self._is_password_prompt(prompt_text) and response: self._interactive_password = response responses.append(response) return responses def open(self, user, host, pkey, totp_handler=None): self.user = user self.pkey = pkey self.host = host self.totp_handler = totp_handler self._interactive_password = None self._interactive_okta_prompt_seen = False if self.con: self.con.close() # Check if file exists if not os.path.exists(self.pkey): raise FileNotFoundError(f"Private key file not found: {self.pkey}") # Read first line to detect key format with open(self.pkey, 'r', encoding='utf-8', errors='ignore') as f: first_line = f.readline().strip() # Check for public key (common mistake!) if first_line.startswith(("ssh-rsa", "ssh-ed25519", "ecdsa-", "ssh-dss")): raise paramiko.ssh_exception.SSHException( "ERROR: This is a PUBLIC key file, not a PRIVATE key!\n\n" "You need to select the PRIVATE key file (without .pub extension).\n\n" "Public keys look like:\n" " ssh-rsa AAAAB3NzaC1yc2EAAA...\n\n" "Private keys look like:\n" " -----BEGIN OPENSSH PRIVATE KEY-----\n" " or\n" " -----BEGIN RSA PRIVATE KEY-----\n\n" "The public key goes on the SERVER in ~/.ssh/authorized_keys\n" "The private key stays on YOUR COMPUTER and is used to connect.\n\n" "Look for a file with the SAME name but WITHOUT the .pub extension." ) # Check for PuTTY format if first_line.startswith("PuTTY-User-Key-File"): raise paramiko.ssh_exception.SSHException( "PuTTY .ppk key format detected. Please convert to OpenSSH format using:\n\n" "Option 1 - PuTTYgen (GUI):\n" "1. Open PuTTYgen\n" "2. Load your .ppk file\n" "3. Go to Conversions → Export OpenSSH key\n" "4. Save as 'id_rsa' (or another name)\n" "5. 
Use that file in the settings\n\n" "Option 2 - Command line:\n" "puttygen yourkey.ppk -O private-openssh -o id_rsa" ) # Prepare connection kwargs connect_kwargs = { "key_filename": self.pkey, "look_for_keys": False, # Don't search default SSH locations "allow_agent": False, # Don't use SSH agent } # Try to pre-load the key to check if it needs a passphrase key_obj = None passphrase = None last_error = None # Detect key type and check for passphrase requirement key_types = [ (paramiko.RSAKey, "RSA"), (paramiko.Ed25519Key, "Ed25519"), (paramiko.ECDSAKey, "ECDSA"), (paramiko.DSSKey, "DSS") ] for key_class, key_name in key_types: try: key_obj = key_class.from_private_key_file(self.pkey) # Successfully loaded without passphrase connect_kwargs["pkey"] = key_obj del connect_kwargs["key_filename"] break except paramiko.ssh_exception.PasswordRequiredException: # Key needs passphrase - prompt user if totp_handler: passphrase = totp_handler(f"Enter passphrase for {key_name} private key:") if passphrase: try: key_obj = key_class.from_private_key_file(self.pkey, password=passphrase) connect_kwargs["pkey"] = key_obj del connect_kwargs["key_filename"] break except Exception as e: last_error = e # Wrong passphrase or other error, try next key type continue else: raise paramiko.ssh_exception.AuthenticationException("Passphrase required but not provided") else: raise paramiko.ssh_exception.SSHException(f"{key_name} key requires a passphrase but no handler provided") except Exception as e: last_error = e # Not this key type, try next continue # If no key could be loaded, raise the last error with helpful context if key_obj is None and "key_filename" not in connect_kwargs: error_msg = f"Could not load private key from {self.pkey}.\n" error_msg += f"First line of file: {first_line[:50]}...\n\n" error_msg += "Supported formats:\n" error_msg += "• OpenSSH format (starts with '-----BEGIN OPENSSH PRIVATE KEY-----')\n" error_msg += "• Traditional PEM format (starts with '-----BEGIN RSA 
PRIVATE KEY-----')\n\n" if last_error: error_msg += f"Last error: {str(last_error)}" raise paramiko.ssh_exception.SSHException(error_msg) # Create Fabric connection def build_connection(connection_kwargs): self.con = fabric.Connection( user=self.user, host=self.host, connect_kwargs=connection_kwargs ) # Set host key policy to auto-accept (prevents unknown host errors) self.con.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) def open_with_interactive_patch(): self._interactive_prompt_seen = False self._interactive_cancelled = False if totp_handler is None: return self.con.open() # Store original auth_interactive_dumb original_auth_interactive_dumb = paramiko.Transport.auth_interactive_dumb def patched_auth_interactive_dumb(transport_self, username, handler=None, submethods=''): # Call our handler instead of the default stdin-based one return transport_self.auth_interactive(username, self._keyboard_interactive_handler, submethods) try: # Patch Paramiko's default keyboard-interactive handler paramiko.Transport.auth_interactive_dumb = patched_auth_interactive_dumb try: return self.con.open() except (paramiko.ssh_exception.AuthenticationException, paramiko.ssh_exception.SSHException): # Some Paramiko/Fabric paths can still raise an auth-related # exception even though the underlying transport has already # completed authentication successfully. Treat that as success. transport = ( self.con.client.get_transport() if self.con.client else None ) if transport and transport.is_authenticated(): return # Key-based auth failed or Paramiko exhausted its own auth # methods (e.g. "No authentication methods available"). # The TCP+SSH handshake is already complete and the transport # is still alive; invoke keyboard-interactive directly so the # server can issue its own password / push-code challenges. if (transport and transport.is_active() and not transport.is_authenticated()): # Attempt keyboard-interactive auth on the existing transport. 
# A single round is correct: the GUI's totp_callback already # waits for Okta push approval before returning, so retrying # on the same transport would trigger a new auth round and # cause the server to issue unexpected challenges (e.g. an # Okta device-activation URL). If this attempt fails, the # caller's outer retry (fresh connection) handles the retry. last_auth_error = None self._interactive_okta_prompt_seen = False try: transport.auth_interactive( self.user, self._keyboard_interactive_handler ) except paramiko.ssh_exception.AuthenticationException as auth_err: last_auth_error = auth_err if transport.is_authenticated(): return if last_auth_error is not None: raise last_auth_error raise finally: # Restore original paramiko.Transport.auth_interactive_dumb = original_auth_interactive_dumb build_connection(connect_kwargs) try: ret = open_with_interactive_patch() except (paramiko.ssh_exception.AuthenticationException, paramiko.ssh_exception.SSHException): # If authentication ultimately succeeded on the current transport, # do not force a retry from scratch. transport = ( self.con.client.get_transport() if self.con and self.con.client else None ) if transport and transport.is_authenticated(): return # Retry once from a fresh transport. # If password was entered in a keyboard-interactive prompt, reuse it # explicitly so Paramiko can perform password auth and then continue # with keyboard-interactive for Okta push. retry_connect_kwargs = { "look_for_keys": False, "allow_agent": False, } if self._interactive_password: retry_connect_kwargs["password"] = self._interactive_password build_connection(retry_connect_kwargs) ret = open_with_interactive_patch() except Exception: # Fabric can raise its own auth exception types (e.g. with the # message "Authentication failed.") even when the underlying # Paramiko transport is already authenticated. 
transport = ( self.con.client.get_transport() if self.con and self.con.client else None ) if transport and transport.is_authenticated(): return raise return ret def close(self): self.con.close() def run(self, cmd): # Keep remote command output inside returned result; GUI decides what to display. return self.con.run(cmd, hide=True) def put(self, src, dst): return self.con.put(src, dst) class Settings(): def __init__(self, settings_json): self.settings = settings_json def username(self): return self.settings['cluster_settings']['username'] def hostname(self): return self.settings['cluster_settings']['hostname'] def private_key_file(self): return self.settings['cluster_settings']['private_key_file'] def utilities_dir(self): return self.settings['cluster_settings']['utilities_dir'] def working_dir(self): return self.settings['cluster_settings']['working_dir'] def filename(self): return self.settings['filename'] def run_option(self): if self.settings['solver'] == 'Single Solver': return 'Active_solver' if self.settings['solver'] == 'Parameter Sweep': return 'Parameter_sweep' if self.settings['solver'] == 'Optimizer': return 'Optimizer' if self.settings['solver'] == 'Schematic Tasks': return 'DS_run' return 'Active_solver' def type(self): if self.settings['type'] == 'Single node': return 'Single node' if self.settings['type'] == 'MPI': return 'MPI_computing' if self.settings['type'] == 'DC': return 'Distributed_computing' return 'Single node' def gpus(self): return self.settings['gpu'] def cores(self): return self.settings['core'] def nodes(self): return self.settings['node'] def queue(self): return self.settings['queue'] def walltime(self): return self.settings.get('walltime', 'Auto') def memory(self): return self.settings.get('memory', 'Auto') def _memory_to_mb(memory_text): if memory_text is None: return None text = str(memory_text).strip() if not text or text.lower() in ['auto', 'none']: return None match = 
re.fullmatch(r'([1-9][0-9]*)([mMgGtT]|[mM][bB]|[gG][bB]|[tT][bB])?', text) if not match: return None value = int(match.group(1)) unit = match.group(2) unit = 'M' if unit is None else unit.upper() if unit in ['M', 'MB']: return str(value) if unit in ['G', 'GB']: return str(value * 1024) if unit in ['T', 'TB']: return str(value * 1024 * 1024) return None def open_connection(con, settings, mbox, totp_callback=None): user = settings.username() host = settings.hostname() pkey = settings.private_key_file() wdir = settings.working_dir() udir = settings.utilities_dir() if (not user) or (not host) or (not pkey) or (not wdir) or (not udir): msg = "SSH setup has not been finished" mbox.appendPlainText(msg) return -1 # Define 2FA handler that prompts through GUI def totp_handler(prompt_text): if totp_callback: # Process events to keep GUI responsive if HAS_QT: QApplication.processEvents() return totp_callback(prompt_text) return None def _has_authenticated_transport(connection_wrapper): try: fabric_con = connection_wrapper.con client = fabric_con.client if fabric_con else None transport = client.get_transport() if client else None return bool(transport and transport.is_authenticated()) except Exception: return False def _has_usable_connection(connection_wrapper): try: ret = connection_wrapper.run('true') return ret.return_code == 0 except Exception: return False def _is_effectively_authenticated(connection_wrapper): return ( _has_authenticated_transport(connection_wrapper) or _has_usable_connection(connection_wrapper) ) def _should_ignore_probe_exception(connection_wrapper, err): if isinstance(err, paramiko.ssh_exception.AuthenticationException): return True return _has_authenticated_transport(connection_wrapper) # Keep GUI responsive if HAS_QT: QApplication.processEvents() mbox.appendPlainText("Attempting SSH connection...") if HAS_QT: QApplication.processEvents() # ssh: 1. 
open connection try: con.open(user, host, pkey, totp_handler=totp_handler if totp_callback else None) # Keep GUI responsive after connection if HAS_QT: QApplication.processEvents() except paramiko.ssh_exception.AuthenticationException as err: if not _is_effectively_authenticated(con): msg = '' msg += 'AuthenticationException' msg += '
' msg += 'Possible causes:
' msg += '• Public key not in server\'s ~/.ssh/authorized_keys
' msg += '• Username is incorrect
' msg += '• Wrong passphrase (if prompted)
' msg += '• Key permissions on server (authorized_keys should be 600)
' msg += '
Error details: ' + str(err) mbox.appendHtml(msg) return -1 except paramiko.ssh_exception.SSHException as err: if not _is_effectively_authenticated(con): err_str = str(err) msg = '' msg += 'SSHException' msg += '
' msg += 'Possible causes:
' msg += '• Private key file format is invalid
' msg += '• Key is encrypted and passphrase not provided
' msg += '• Wrong key type selected
' msg += '• File permissions issue (key should be readable)
' msg += '
Error details:
' # Handle multi-line error messages msg += err_str.replace('\n', '
') mbox.appendHtml(msg) return -1 except FileNotFoundError as err: msg = '' msg += 'FileNotFoundError' msg += '
' msg += 'No such file or directory: ' + '\'' + err.filename + '\'' mbox.appendHtml(msg) return -1 except socket.gaierror as err: msg = '' msg += 'GetAddressInfoException' msg += '
' msg += 'Check the hostname, network connection, etc.' mbox.appendHtml(msg) return -1 except Exception as err: if not _is_effectively_authenticated(con): mbox.appendPlainText(str(err)) return -1 # ssh: 2. run hostname connection test try: ret = con.run('hostname') if ret.stderr: mbox.appendPlainText(ret.stderr) if not ret.return_code == 0: return ret.return_code except Exception as err: if not _should_ignore_probe_exception(con, err): mbox.appendPlainText(str(err)) return -1 # ssh: 3. run cluster utilities directory check try: ret = con.run('ls -d ' + '"' + udir + '"') if ret.stderr: mbox.appendPlainText(ret.stderr) return ret.return_code except invoke.exceptions.UnexpectedExit as err: msg = '' msg += 'UnexpectedExit' msg += '
' msg += err.result.stderr mbox.appendHtml(msg) return -1 except Exception as err: if not _should_ignore_probe_exception(con, err): mbox.appendPlainText(str(err)) return -1 # ssh: 4. run working directory check try: ret = con.run('ls -d ' + '"' + wdir + '"') if ret.stderr: mbox.appendPlainText(ret.stderr) return ret.return_code except invoke.exceptions.UnexpectedExit as err: msg = '' msg += 'UnexpectedExit' msg += '
' msg += err.result.stderr mbox.appendHtml(msg) return -1 except Exception as err: if not _should_ignore_probe_exception(con, err): mbox.appendPlainText(str(err)) return -1 return 0 def submit(con, settings, mbox): wdir = settings.working_dir() udir = settings.utilities_dir() proj = settings.filename() # create calculation directory cst_project_name = pathlib.Path(proj).stem uid = datetime.datetime.now().strftime('%Y%m%d%H%M%S') dst_file_path = pathlib.PurePosixPath(wdir).joinpath(cst_project_name + '_' + uid, cst_project_name + ".cst") dst_file = str(dst_file_path) dst_dir_path = dst_file_path.parent dst_dir = str(dst_dir_path) # ssh: run make destination directory try: ret = con.run('mkdir ' + '"' + dst_dir + '"') if ret.stderr: mbox.appendPlainText(ret.stderr) return ret.return_code except Exception as err: msg = '\nFailed to create remote destination directory:\n' msg += f' path: {dst_dir}\n' msg += f' error: {err}' mbox.appendPlainText(msg) return -1 # ssh: put CST project file (and more) to remote compute node try: root = os.path.splitext(proj)[0] extensions = ['.cst', '.inp', '_tosca.par', '_topology_init.onf'] for ext in extensions: src_file = root + ext if os.path.isfile(src_file): con.put(src_file, dst_dir) except Exception as err: mbox.appendPlainText(f'Failed while uploading project files: {err}') return -1 # run CST project cst_job_submit = pathlib.PurePosixPath(udir).joinpath('cst_job_submit') cst_job_submit = '"' + str(cst_job_submit) + '"' cst_job_submit += ' -b' cst_job_submit += ' -s ' + settings.run_option() if settings.gpus() > 0: cst_job_submit += ' -g ' + str(settings.gpus()) if settings.cores() > 0: cst_job_submit += ' --num-cores ' + str(settings.cores()) memory = _memory_to_mb(settings.memory()) if memory is not None: cst_job_submit += ' --request-memory=' + '"' + memory + '"' walltime = str(settings.walltime()).strip() if walltime and walltime.lower() not in ['auto', 'none']: walltime_arg = ' -w ' + '"' + walltime + '"' cst_job_submit += 
walltime_arg if (settings.type() != 'Single node'): cst_job_submit += ' -a ' + settings.type() cst_job_submit += ' -n ' + str(settings.nodes()) cst_job_submit += ' -q ' + settings.queue() cst_job_submit += ' -m ' + '"' + dst_file + '"' # ssh: run cst_job_submit try: ret = con.run(cst_job_submit) if ret.stdout: mbox.appendPlainText('\n' + ret.stdout) if ret.stderr: mbox.appendPlainText(ret.stderr) return ret.return_code except invoke.exceptions.UnexpectedExit as err: stderr_text = err.result.stderr if err.result and err.result.stderr else '' stdout_text = err.result.stdout if err.result and err.result.stdout else '' exit_code = err.result.return_code if err.result else -1 combined_text_lower = (stdout_text + '\n' + stderr_text).lower() stderr_lower = stderr_text.lower() unknown_option = ('unknown option' in stderr_lower) or ('unrecognized option' in stderr_lower) mbox.appendPlainText('Remote command failed while submitting the job:') mbox.appendPlainText(f' command: {cst_job_submit}') mbox.appendPlainText(f' exit code: {exit_code}') if stdout_text and stdout_text.strip(): mbox.appendPlainText(' stdout:') mbox.appendPlainText(stdout_text) if stderr_text and stderr_text.strip(): mbox.appendPlainText(' stderr:') mbox.appendPlainText(stderr_text) if not (stdout_text and stdout_text.strip()) and not (stderr_text and stderr_text.strip()): mbox.appendPlainText(f' exception: {err}') return exit_code except Exception as err: mbox.appendPlainText(f'Submit execution error: {err}') return -1 return 0 def get_queues(con, settings, mbox): udir = settings.utilities_dir() cst_job_submit = pathlib.PurePosixPath(udir).joinpath('cst_job_submit') cst_job_submit = '"' + str(cst_job_submit) + '"' cst_job_submit += ' --get-available-queues' try: ret = con.run(cst_job_submit) if ret.stderr: mbox.appendPlainText(ret.stderr) return ret.return_code except paramiko.ssh_exception.AuthenticationException: # Post-auth query calls can still raise auth exceptions in this # environment even after 
the main SSH session is marked successful. # Keep the UI clean and return an empty queue list. return "" except Exception as err: mbox.appendPlainText(str(err)) return "" return ret.stdout def get_scheduler(con, settings, mbox): udir = settings.utilities_dir() cst_job_submit = pathlib.PurePosixPath(udir).joinpath('cst_job_submit') cst_job_submit = '"' + str(cst_job_submit) + '"' cst_job_submit += ' --get-queuesys-name' try: ret = con.run(cst_job_submit) if ret.stderr: mbox.appendPlainText(ret.stderr) return ret.return_code except paramiko.ssh_exception.AuthenticationException: # Same rationale as get_queues(): avoid noisy false negatives. return "" except Exception as err: mbox.appendPlainText(str(err)) return "" return ret.stdout def get_available_gpu_num(con, settings, queue, mbox): if not queue: return None udir = settings.utilities_dir() cst_job_submit = pathlib.PurePosixPath(udir).joinpath('cst_job_submit') cst_job_submit = '"' + str(cst_job_submit) + '"' cst_job_submit += ' --get-available-gpu-num ' cst_job_submit += ' -q ' + '"' + queue + '"' try: ret = con.run(cst_job_submit) if ret.stderr: mbox.appendPlainText(ret.stderr) return None except paramiko.ssh_exception.AuthenticationException: return None except Exception as err: mbox.appendPlainText(str(err)) return None output = ret.stdout.strip() if not output: return None try: return int(output) except ValueError: mbox.appendPlainText('Unexpected GPU query output: ' + output) return None def _extract_walltime_value(output_text): if not output_text: return None text = str(output_text).strip() # Match HH:MM (e.g. 167:00) match = re.search(r'(?