import datetime
import pathlib
import fabric
import invoke
import os
import paramiko
import re
import socket
import getpass
from urllib.parse import urlparse, parse_qs
from . import hpc_json
# HAS_QT records whether qtpy's QApplication could be imported; the module
# is usable without Qt.
try:
    from qtpy.QtWidgets import QApplication
except ImportError:
    HAS_QT = False
else:
    HAS_QT = True
class Connection():
    """SSH connection wrapper around ``fabric.Connection``.

    ``open()`` validates and pre-loads a private key file, then connects with
    Fabric. Paramiko's keyboard-interactive prompts (password, Okta push code,
    Okta device activation, ...) are forwarded to ``totp_handler`` so that a
    GUI can answer them instead of Paramiko's stdin-based default handler.
    """

    def __init__(self):
        self.user = ""
        self.pkey = ""  # path to the private key file
        self.host = ""
        self.con = None  # fabric.Connection, created by open()
        self.totp_handler = None  # callable(prompt_text) -> str or None
        # State recorded by _keyboard_interactive_handler while open() runs.
        self._interactive_prompt_seen = False
        self._interactive_cancelled = False
        self._interactive_password = None
        self._interactive_okta_prompt_seen = False

    @staticmethod
    def _normalize_auth_prompt(prompt_text):
        """Return *prompt_text* as a stripped string without ANSI CSI sequences."""
        prompt = str(prompt_text or "")
        # Remove terminal control sequences that can appear in keyboard-interactive prompts.
        prompt = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", prompt)
        return prompt.strip()

    @staticmethod
    def _is_okta_push_prompt(prompt_text):
        """True if the prompt mentions "push code <digits>" plus an Enter/decline/Okta hint."""
        prompt = Connection._normalize_auth_prompt(prompt_text)
        has_push_code = re.search(r"push\s+code\s+\d+", prompt, re.IGNORECASE) is not None
        has_enter_hint = re.search(r"press\s+enter|decline|okta", prompt, re.IGNORECASE) is not None
        return has_push_code and has_enter_hint

    @staticmethod
    def _extract_okta_push_code(prompt_text):
        """Return the digits following "push code" in the prompt, or None."""
        prompt = Connection._normalize_auth_prompt(prompt_text)
        # Prefer the "Push code N sent" phrasing, then fall back to any "push code N".
        match = re.search(r"Push code\s+(\d+)\s+sent", prompt, re.IGNORECASE)
        if match is None:
            match = re.search(r"push\s+code\s+(\d+)", prompt, re.IGNORECASE)
        if match is None:
            return None
        return match.group(1)

    @staticmethod
    def _is_password_prompt(prompt_text):
        """True if the prompt ends with "password:" (optionally preceded by ")" or whitespace)."""
        prompt = Connection._normalize_auth_prompt(prompt_text)
        return re.search(r"(^|\)|\s)password\s*:\s*$", prompt, re.IGNORECASE) is not None

    @staticmethod
    def _is_okta_activation_prompt(prompt_text):
        """True if the prompt contains an http(s) "activate" URL and "press enter to continue"."""
        prompt = Connection._normalize_auth_prompt(prompt_text)
        has_activate_url = re.search(r"https?://\S*activate\S*", prompt, re.IGNORECASE) is not None
        has_enter_hint = re.search(r"press\s+enter\s+to\s+continue", prompt, re.IGNORECASE) is not None
        return has_activate_url and has_enter_hint

    @staticmethod
    def _extract_okta_activation_url(prompt_text):
        """Return the first http(s) URL in the prompt without trailing punctuation, or None."""
        prompt = Connection._normalize_auth_prompt(prompt_text)
        match = re.search(r"(https?://\S+)", prompt, re.IGNORECASE)
        if match is None:
            return None
        return match.group(1).rstrip('.,;:')

    @staticmethod
    def _extract_okta_activation_code(prompt_text):
        """Return the ``user_code`` value from the prompt text or its URL query, or None."""
        prompt = Connection._normalize_auth_prompt(prompt_text)
        match = re.search(r"user_code\s*=\s*([A-Za-z0-9_-]+)", prompt, re.IGNORECASE)
        if match:
            return match.group(1)
        url = Connection._extract_okta_activation_url(prompt)
        if not url:
            return None
        try:
            parsed = urlparse(url)
            query = parse_qs(parsed.query)
            code_values = query.get("user_code", [])
            if code_values:
                return code_values[0]
        except Exception:
            return None
        return None

    def _keyboard_interactive_handler(self, title, instructions, prompt_list):
        """Answer keyboard-interactive (2FA) prompts through ``self.totp_handler``.

        Args:
            title, instructions: server-supplied context shown above each prompt.
            prompt_list: sequence of ``(prompt_text, show_input)`` tuples; any
                non-tuple entry is converted with ``str``.

        Returns:
            One response per prompt, or ``[]`` if no handler is set or the user
            cancels (handler returns None). Cancellation sets
            ``_interactive_cancelled``. A non-empty answer to a password
            prompt is remembered in ``_interactive_password`` for a retry.
        """
        if self.totp_handler is None:
            return []
        context_parts = []
        if title:
            context_parts.append(str(title).strip())
        if instructions:
            context_parts.append(str(instructions).strip())
        # The context text is the same for every prompt of this round.
        context = "\n\n".join(part for part in context_parts if part)

        responses = []
        for prompt_tuple in prompt_list:
            self._interactive_prompt_seen = True
            prompt_text = prompt_tuple[0] if isinstance(prompt_tuple, tuple) else str(prompt_tuple)
            if self._is_okta_push_prompt(prompt_text):
                self._interactive_okta_prompt_seen = True
            # Always ask the user for keyboard-interactive responses instead of auto-skipping prompts.
            challenge = str(prompt_text).strip() or "Authentication response:"
            full_prompt = f"{context}\n\n{challenge}" if context else challenge
            response = self.totp_handler(full_prompt)
            if response is None:
                self._interactive_cancelled = True
                return []
            if self._is_password_prompt(prompt_text) and response:
                self._interactive_password = response
            responses.append(response)
        return responses

    def _read_key_header(self):
        """Check that ``self.pkey`` looks like a usable private key; return its first line.

        Raises:
            FileNotFoundError: the key file does not exist.
            paramiko.ssh_exception.SSHException: the file is a public key or a
                PuTTY ``.ppk`` key.
        """
        if not os.path.exists(self.pkey):
            raise FileNotFoundError(f"Private key file not found: {self.pkey}")
        # Only the first line is needed to detect the key format.
        with open(self.pkey, 'r', encoding='utf-8', errors='ignore') as f:
            first_line = f.readline().strip()
        # Public key selected instead of the private one (common mistake!)
        if first_line.startswith(("ssh-rsa", "ssh-ed25519", "ecdsa-", "ssh-dss")):
            raise paramiko.ssh_exception.SSHException(
                "ERROR: This is a PUBLIC key file, not a PRIVATE key!\n\n"
                "You need to select the PRIVATE key file (without .pub extension).\n\n"
                "Public keys look like:\n"
                " ssh-rsa AAAAB3NzaC1yc2EAAA...\n\n"
                "Private keys look like:\n"
                " -----BEGIN OPENSSH PRIVATE KEY-----\n"
                " or\n"
                " -----BEGIN RSA PRIVATE KEY-----\n\n"
                "The public key goes on the SERVER in ~/.ssh/authorized_keys\n"
                "The private key stays on YOUR COMPUTER and is used to connect.\n\n"
                "Look for a file with the SAME name but WITHOUT the .pub extension."
            )
        if first_line.startswith("PuTTY-User-Key-File"):
            raise paramiko.ssh_exception.SSHException(
                "PuTTY .ppk key format detected. Please convert to OpenSSH format using:\n\n"
                "Option 1 - PuTTYgen (GUI):\n"
                "1. Open PuTTYgen\n"
                "2. Load your .ppk file\n"
                "3. Go to Conversions → Export OpenSSH key\n"
                "4. Save as 'id_rsa' (or another name)\n"
                "5. Use that file in the settings\n\n"
                "Option 2 - Command line:\n"
                "puttygen yourkey.ppk -O private-openssh -o id_rsa"
            )
        return first_line

    @staticmethod
    def _private_key_types():
        """Return ``(key_class, label)`` pairs for the key classes this Paramiko provides.

        Looked up with ``getattr`` because Paramiko 4.0 removed ``DSSKey``;
        a hard reference would raise AttributeError there.
        """
        key_types = []
        for attr, label in (("RSAKey", "RSA"), ("Ed25519Key", "Ed25519"),
                            ("ECDSAKey", "ECDSA"), ("DSSKey", "DSS")):
            key_class = getattr(paramiko, attr, None)
            if key_class is not None:
                key_types.append((key_class, label))
        return key_types

    def _load_private_key(self, totp_handler, connect_kwargs):
        """Try each supported key class on ``self.pkey``.

        On success the loaded key object replaces ``key_filename`` in
        ``connect_kwargs`` (mutated in place). The passphrase is requested at
        most once and reused for later key classes: every class reads the same
        file, and encrypted OpenSSH keys report "passphrase required" before
        the key type is checked, which previously caused one prompt per class.

        Returns:
            ``(key_obj, last_error)``: the loaded key or None, and the last
            exception seen while trying key classes (or None).

        Raises:
            paramiko.ssh_exception.SSHException: the key is encrypted and no
                ``totp_handler`` was supplied.
            paramiko.ssh_exception.AuthenticationException: the passphrase
                prompt was left empty or cancelled.
        """
        last_error = None
        passphrase = None
        for key_class, key_name in self._private_key_types():
            try:
                key_obj = key_class.from_private_key_file(self.pkey)
            except paramiko.ssh_exception.PasswordRequiredException:
                if not totp_handler:
                    raise paramiko.ssh_exception.SSHException(f"{key_name} key requires a passphrase but no handler provided")
                if not passphrase:
                    passphrase = totp_handler(f"Enter passphrase for {key_name} private key:")
                if not passphrase:
                    raise paramiko.ssh_exception.AuthenticationException("Passphrase required but not provided")
                try:
                    key_obj = key_class.from_private_key_file(self.pkey, password=passphrase)
                except Exception as e:
                    # Wrong passphrase or other error, try next key type
                    last_error = e
                    continue
            except Exception as e:
                # Not this key type, try next
                last_error = e
                continue
            connect_kwargs["pkey"] = key_obj
            del connect_kwargs["key_filename"]
            return key_obj, last_error
        return None, last_error

    def _transport_is_authenticated(self):
        """True if the current Fabric client's Paramiko transport is authenticated."""
        client = self.con.client if self.con else None
        transport = client.get_transport() if client else None
        return bool(transport and transport.is_authenticated())

    def open(self, user, host, pkey, totp_handler=None):
        """Connect to ``user@host`` using the private key file ``pkey``.

        ``totp_handler(prompt_text) -> str | None`` answers passphrase and
        keyboard-interactive prompts; returning None cancels. If the first
        attempt fails with an auth/SSH error, a second attempt is made on a
        fresh connection without the key, reusing any password typed at a
        keyboard-interactive prompt. A failure is treated as success when the
        Paramiko transport nevertheless reports itself authenticated.

        Returns:
            The result of ``fabric.Connection.open()``, or None when success is
            inferred from an authenticated transport.

        Raises:
            FileNotFoundError: ``pkey`` does not exist.
            paramiko.ssh_exception.SSHException: public/PuTTY key selected,
                encrypted key without handler, or connection failure.
            paramiko.ssh_exception.AuthenticationException: passphrase not
                provided or authentication failed.
        """
        self.user = user
        self.pkey = pkey
        self.host = host
        self.totp_handler = totp_handler
        self._interactive_password = None
        self._interactive_okta_prompt_seen = False
        self.close()

        first_line = self._read_key_header()

        connect_kwargs = {
            "key_filename": self.pkey,
            "look_for_keys": False,  # Don't search default SSH locations
            "allow_agent": False,  # Don't use SSH agent
        }
        key_obj, last_error = self._load_private_key(totp_handler, connect_kwargs)

        # NOTE(review): "key_filename" is only removed once a key has loaded, so
        # this condition is never true. An unloadable key therefore falls
        # through to Fabric's own key_filename handling (and the
        # keyboard-interactive fallback below) instead of raising here. Kept
        # as-is to preserve that behavior; confirm which one is intended.
        if key_obj is None and "key_filename" not in connect_kwargs:
            error_msg = f"Could not load private key from {self.pkey}.\n"
            error_msg += f"First line of file: {first_line[:50]}...\n\n"
            error_msg += "Supported formats:\n"
            error_msg += "• OpenSSH format (starts with '-----BEGIN OPENSSH PRIVATE KEY-----')\n"
            error_msg += "• Traditional PEM format (starts with '-----BEGIN RSA PRIVATE KEY-----')\n\n"
            if last_error:
                error_msg += f"Last error: {str(last_error)}"
            raise paramiko.ssh_exception.SSHException(error_msg)

        def build_connection(connection_kwargs):
            """Create a new Fabric connection object in ``self.con``."""
            self.con = fabric.Connection(
                user=self.user,
                host=self.host,
                connect_kwargs=connection_kwargs
            )
            # Set host key policy to auto-accept (prevents unknown host errors).
            # NOTE(review): this accepts unknown host keys without verification.
            self.con.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        def open_with_interactive_patch():
            """Open ``self.con``, routing keyboard-interactive prompts to our handler."""
            self._interactive_prompt_seen = False
            self._interactive_cancelled = False
            if totp_handler is None:
                return self.con.open()
            # Paramiko's default keyboard-interactive handler
            # (auth_interactive_dumb) is stdin-based; temporarily replace it
            # process-wide and restore it in the finally clause.
            original_auth_interactive_dumb = paramiko.Transport.auth_interactive_dumb

            def patched_auth_interactive_dumb(transport_self, username, handler=None, submethods=''):
                # Call our handler instead of the default stdin-based one
                return transport_self.auth_interactive(username, self._keyboard_interactive_handler, submethods)

            try:
                paramiko.Transport.auth_interactive_dumb = patched_auth_interactive_dumb
                try:
                    return self.con.open()
                except (paramiko.ssh_exception.AuthenticationException,
                        paramiko.ssh_exception.SSHException):
                    # Some Paramiko/Fabric paths can still raise an auth-related
                    # exception even though the underlying transport has already
                    # completed authentication successfully. Treat that as success.
                    transport = (
                        self.con.client.get_transport()
                        if self.con.client else None
                    )
                    if transport and transport.is_authenticated():
                        return None
                    # Key-based auth failed or Paramiko exhausted its own auth
                    # methods (e.g. "No authentication methods available").
                    # The transport is still alive, so invoke keyboard-interactive
                    # directly and let the server issue its own password /
                    # push-code challenges.
                    if (transport and transport.is_active()
                            and not transport.is_authenticated()):
                        # A single round is deliberate: the GUI callback already
                        # waits for Okta push approval before returning, and a
                        # second round on this transport would trigger new
                        # challenges (e.g. an Okta device-activation URL). The
                        # caller's fresh-connection retry handles failures.
                        last_auth_error = None
                        self._interactive_okta_prompt_seen = False
                        try:
                            transport.auth_interactive(
                                self.user, self._keyboard_interactive_handler
                            )
                        except paramiko.ssh_exception.AuthenticationException as auth_err:
                            last_auth_error = auth_err
                        if transport.is_authenticated():
                            return None
                        if last_auth_error is not None:
                            raise last_auth_error
                    raise
            finally:
                paramiko.Transport.auth_interactive_dumb = original_auth_interactive_dumb

        build_connection(connect_kwargs)
        try:
            return open_with_interactive_patch()
        except (paramiko.ssh_exception.AuthenticationException,
                paramiko.ssh_exception.SSHException):
            # If authentication ultimately succeeded on the current transport,
            # do not force a retry from scratch.
            if self._transport_is_authenticated():
                return None
        except Exception:
            # Fabric can raise its own auth exception types (e.g. with the
            # message "Authentication failed.") even when the underlying
            # Paramiko transport is already authenticated.
            if self._transport_is_authenticated():
                return None
            raise

        # Retry once from a fresh transport. If a password was entered at a
        # keyboard-interactive prompt, pass it explicitly so Paramiko performs
        # password auth and then continues with keyboard-interactive (Okta push).
        retry_connect_kwargs = {
            "look_for_keys": False,
            "allow_agent": False,
        }
        if self._interactive_password:
            retry_connect_kwargs["password"] = self._interactive_password
        # Release the failed connection's socket/transport before replacing it.
        try:
            self.con.close()
        except Exception:
            pass  # best effort: the stale connection is discarded either way
        build_connection(retry_connect_kwargs)
        try:
            return open_with_interactive_patch()
        except Exception:
            # Same safeguard as for the first attempt.
            if self._transport_is_authenticated():
                return None
            raise

    def close(self):
        """Close the Fabric connection; does nothing if open() was never called."""
        if self.con is not None:
            self.con.close()

    def run(self, cmd):
        """Run *cmd* remotely with output captured (hide=True) and return the result."""
        # Keep remote command output inside returned result; GUI decides what to display.
        return self.con.run(cmd, hide=True)

    def put(self, src, dst):
        """Upload local file *src* to remote path *dst*."""
        return self.con.put(src, dst)
class Settings():
    """Accessors over the job/cluster settings dictionary.

    ``settings_json`` holds a ``'cluster_settings'`` sub-dictionary (SSH and
    directory values) plus top-level job keys such as ``'solver'``,
    ``'type'``, ``'gpu'``, ``'core'``, ``'node'`` and ``'queue'``.
    """

    def __init__(self, settings_json):
        self.settings = settings_json

    def _cluster(self, key):
        """Return ``settings['cluster_settings'][key]``."""
        return self.settings['cluster_settings'][key]

    def username(self):
        return self._cluster('username')

    def hostname(self):
        return self._cluster('hostname')

    def private_key_file(self):
        return self._cluster('private_key_file')

    def utilities_dir(self):
        return self._cluster('utilities_dir')

    def working_dir(self):
        return self._cluster('working_dir')

    def filename(self):
        return self.settings['filename']

    def run_option(self):
        """Map the GUI solver label to the ``cst_job_submit -s`` value (default 'Active_solver')."""
        solver = self.settings['solver']
        for label, option in (
            ('Single Solver', 'Active_solver'),
            ('Parameter Sweep', 'Parameter_sweep'),
            ('Optimizer', 'Optimizer'),
            ('Schematic Tasks', 'DS_run'),
        ):
            if solver == label:
                return option
        return 'Active_solver'

    def type(self):
        """Map the GUI run-type label to its submit value (default 'Single node')."""
        run_type = self.settings['type']
        for label, value in (
            ('Single node', 'Single node'),
            ('MPI', 'MPI_computing'),
            ('DC', 'Distributed_computing'),
        ):
            if run_type == label:
                return value
        return 'Single node'

    def gpus(self):
        return self.settings['gpu']

    def cores(self):
        return self.settings['core']

    def nodes(self):
        return self.settings['node']

    def queue(self):
        return self.settings['queue']

    def walltime(self):
        """Requested walltime, or 'Auto' when the key is absent."""
        return self.settings.get('walltime', 'Auto')

    def memory(self):
        """Requested memory, or 'Auto' when the key is absent."""
        return self.settings.get('memory', 'Auto')
def _memory_to_mb(memory_text):
if memory_text is None:
return None
text = str(memory_text).strip()
if not text or text.lower() in ['auto', 'none']:
return None
match = re.fullmatch(r'([1-9][0-9]*)([mMgGtT]|[mM][bB]|[gG][bB]|[tT][bB])?', text)
if not match:
return None
value = int(match.group(1))
unit = match.group(2)
unit = 'M' if unit is None else unit.upper()
if unit in ['M', 'MB']:
return str(value)
if unit in ['G', 'GB']:
return str(value * 1024)
if unit in ['T', 'TB']:
return str(value * 1024 * 1024)
return None
def open_connection(con, settings, mbox, totp_callback=None):
    """Open the SSH connection and verify the remote environment.

    Steps: (1) ``con.open`` with 2FA prompts routed to ``totp_callback``,
    (2) run ``hostname`` as a smoke test, then (3) check the utilities
    directory and (4) the working directory exist remotely. Problems are
    reported to ``mbox`` via ``appendPlainText`` / ``appendHtml``.

    Returns:
        0 on success, -1 on a reported failure (including incomplete SSH
        settings), or a non-zero exit code returned by a probe command.
    """
    import html  # escapes untrusted text inserted into appendHtml() messages

    user = settings.username()
    host = settings.hostname()
    pkey = settings.private_key_file()
    wdir = settings.working_dir()
    udir = settings.utilities_dir()
    if (not user) or (not host) or (not pkey) or (not wdir) or (not udir):
        msg = "SSH setup has not been finished"
        mbox.appendPlainText(msg)
        return -1

    def totp_handler(prompt_text):
        """Ask the GUI callback for a 2FA/passphrase answer (None if no callback)."""
        if totp_callback:
            # Process events to keep GUI responsive
            if HAS_QT:
                QApplication.processEvents()
            return totp_callback(prompt_text)
        return None

    def _has_authenticated_transport(connection_wrapper):
        try:
            fabric_con = connection_wrapper.con
            client = fabric_con.client if fabric_con else None
            transport = client.get_transport() if client else None
            return bool(transport and transport.is_authenticated())
        except Exception:
            return False

    def _has_usable_connection(connection_wrapper):
        try:
            ret = connection_wrapper.run('true')
            return ret.return_code == 0
        except Exception:
            return False

    def _is_effectively_authenticated(connection_wrapper):
        # Auth exceptions can be spurious; trust the transport or a working command.
        return (
            _has_authenticated_transport(connection_wrapper)
            or _has_usable_connection(connection_wrapper)
        )

    def _should_ignore_probe_exception(connection_wrapper, err):
        if isinstance(err, paramiko.ssh_exception.AuthenticationException):
            return True
        return _has_authenticated_transport(connection_wrapper)

    def _check_remote_dir(path):
        """Run ``ls -d`` on *path*; return None to continue, else the status to return."""
        try:
            ret = con.run('ls -d ' + '"' + path + '"')
            if ret.stderr:
                mbox.appendPlainText(ret.stderr)
            # Stderr alone (e.g. shell start-up warnings) is not a failure;
            # only a non-zero exit code stops the checks, like the hostname probe.
            if ret.return_code != 0:
                return ret.return_code
        except invoke.exceptions.UnexpectedExit as err:
            mbox.appendHtml('UnexpectedExit' + '<br>' + html.escape(err.result.stderr or ''))
            return -1
        except Exception as err:
            if not _should_ignore_probe_exception(con, err):
                mbox.appendPlainText(str(err))
                return -1
        return None

    # Keep GUI responsive
    if HAS_QT:
        QApplication.processEvents()
    mbox.appendPlainText("Attempting SSH connection...")
    if HAS_QT:
        QApplication.processEvents()

    # ssh: 1. open connection
    try:
        con.open(user, host, pkey, totp_handler=totp_handler if totp_callback else None)
        # Keep GUI responsive after connection
        if HAS_QT:
            QApplication.processEvents()
    except paramiko.ssh_exception.AuthenticationException as err:
        if not _is_effectively_authenticated(con):
            msg = (
                'AuthenticationException'
                '<br>'
                'Possible causes:<br>'
                "• Public key not in server's ~/.ssh/authorized_keys<br>"
                '• Username is incorrect<br>'
                '• Wrong passphrase (if prompted)<br>'
                '• Key permissions on server (authorized_keys should be 600)<br>'
                '<br>Error details: '
            )
            msg += html.escape(str(err))
            mbox.appendHtml(msg)
            return -1
    except paramiko.ssh_exception.SSHException as err:
        if not _is_effectively_authenticated(con):
            msg = (
                'SSHException'
                '<br>'
                'Possible causes:<br>'
                '• Private key file format is invalid<br>'
                '• Key is encrypted and passphrase not provided<br>'
                '• Wrong key type selected<br>'
                '• File permissions issue (key should be readable)<br>'
                '<br>Error details:<br>'
            )
            # Escape first, then turn the message's line breaks into HTML breaks.
            msg += html.escape(str(err)).replace('\n', '<br>')
            mbox.appendHtml(msg)
            return -1
    except FileNotFoundError as err:
        # Connection.open() raises FileNotFoundError(message) for a missing key,
        # which leaves err.filename as None; fall back to the configured key path.
        missing = err.filename if err.filename is not None else pkey
        msg = 'FileNotFoundError' + '<br>'
        msg += "No such file or directory: '" + html.escape(str(missing)) + "'"
        mbox.appendHtml(msg)
        return -1
    except socket.gaierror:
        mbox.appendHtml('GetAddressInfoException' + '<br>'
                        + 'Check the hostname, network connection, etc.')
        return -1
    except Exception as err:
        if not _is_effectively_authenticated(con):
            mbox.appendPlainText(str(err))
            return -1

    # ssh: 2. run hostname connection test
    try:
        ret = con.run('hostname')
        if ret.stderr:
            mbox.appendPlainText(ret.stderr)
        if ret.return_code != 0:
            return ret.return_code
    except Exception as err:
        if not _should_ignore_probe_exception(con, err):
            mbox.appendPlainText(str(err))
            return -1

    # ssh: 3./4. cluster utilities directory, then working directory
    for remote_dir in (udir, wdir):
        status = _check_remote_dir(remote_dir)
        if status is not None:
            return status
    return 0
def submit(con, settings, mbox):
    """Upload the CST project and submit it with ``cst_job_submit``.

    A new remote directory ``<working_dir>/<project>_<YYYYmmddHHMMSS>`` is
    created, the project file and any existing companion files
    (``.cst``, ``.inp``, ``_tosca.par``, ``_topology_init.onf``) are uploaded
    there, and the batch job is submitted with options from *settings*.

    Returns:
        The exit code of ``cst_job_submit`` (0 on success), a non-zero
        ``mkdir`` exit code, or -1 when a command or upload fails.
    """
    wdir = settings.working_dir()
    udir = settings.utilities_dir()
    proj = settings.filename()

    # create calculation directory: <wdir>/<project>_<timestamp>/<project>.cst
    cst_project_name = pathlib.Path(proj).stem
    uid = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    dst_file_path = pathlib.PurePosixPath(wdir).joinpath(
        cst_project_name + '_' + uid, cst_project_name + ".cst")
    dst_file = str(dst_file_path)
    dst_dir = str(dst_file_path.parent)

    # ssh: run make destination directory
    try:
        ret = con.run('mkdir ' + '"' + dst_dir + '"')
        if ret.stderr:
            mbox.appendPlainText(ret.stderr)
        # Stderr alone (e.g. shell start-up warnings) must not abort the
        # submission; only a failing mkdir does.
        if ret.return_code != 0:
            return ret.return_code
    except Exception as err:
        msg = '\nFailed to create remote destination directory:\n'
        msg += f' path: {dst_dir}\n'
        msg += f' error: {err}'
        mbox.appendPlainText(msg)
        return -1

    # ssh: put CST project file (and companion files that exist locally)
    try:
        root = os.path.splitext(proj)[0]
        for ext in ('.cst', '.inp', '_tosca.par', '_topology_init.onf'):
            src_file = root + ext
            if os.path.isfile(src_file):
                con.put(src_file, dst_dir)
    except Exception as err:
        mbox.appendPlainText(f'Failed while uploading project files: {err}')
        return -1

    # build the cst_job_submit command line
    cst_job_submit = '"' + str(pathlib.PurePosixPath(udir).joinpath('cst_job_submit')) + '"'
    cst_job_submit += ' -b'
    cst_job_submit += ' -s ' + settings.run_option()
    if settings.gpus() > 0:
        cst_job_submit += ' -g ' + str(settings.gpus())
    if settings.cores() > 0:
        cst_job_submit += ' --num-cores ' + str(settings.cores())
    memory = _memory_to_mb(settings.memory())
    if memory is not None:
        cst_job_submit += ' --request-memory=' + '"' + memory + '"'
    walltime = str(settings.walltime()).strip()
    if walltime and walltime.lower() not in ['auto', 'none']:
        cst_job_submit += ' -w ' + '"' + walltime + '"'
    if settings.type() != 'Single node':
        cst_job_submit += ' -a ' + settings.type()
        cst_job_submit += ' -n ' + str(settings.nodes())
    # Quoted (as in get_available_gpu_num) so an empty or spaced queue name
    # cannot shift the following arguments.
    cst_job_submit += ' -q ' + '"' + settings.queue() + '"'
    cst_job_submit += ' -m ' + '"' + dst_file + '"'

    # ssh: run cst_job_submit
    try:
        ret = con.run(cst_job_submit)
        if ret.stdout:
            mbox.appendPlainText('\n' + ret.stdout)
        if ret.stderr:
            mbox.appendPlainText(ret.stderr)
        return ret.return_code
    except invoke.exceptions.UnexpectedExit as err:
        stderr_text = err.result.stderr if err.result and err.result.stderr else ''
        stdout_text = err.result.stdout if err.result and err.result.stdout else ''
        exit_code = err.result.return_code if err.result else -1
        mbox.appendPlainText('Remote command failed while submitting the job:')
        mbox.appendPlainText(f' command: {cst_job_submit}')
        mbox.appendPlainText(f' exit code: {exit_code}')
        if stdout_text.strip():
            mbox.appendPlainText(' stdout:')
            mbox.appendPlainText(stdout_text)
        if stderr_text.strip():
            mbox.appendPlainText(' stderr:')
            mbox.appendPlainText(stderr_text)
        if not stdout_text.strip() and not stderr_text.strip():
            mbox.appendPlainText(f' exception: {err}')
        return exit_code
    except Exception as err:
        mbox.appendPlainText(f'Submit execution error: {err}')
        return -1
def get_queues(con, settings, mbox):
    """Return the raw output of ``cst_job_submit --get-available-queues``.

    Text the command writes to stderr is echoed to ``mbox``. Returns ``""``
    when the query cannot be run, exits non-zero, or raises an
    authentication error. Authentication errors are not reported, because
    they occur spuriously after a successful login.
    """
    udir = settings.utilities_dir()
    cst_job_submit = pathlib.PurePosixPath(udir).joinpath('cst_job_submit')
    cst_job_submit = '"' + str(cst_job_submit) + '"'
    cst_job_submit += ' --get-available-queues'
    try:
        ret = con.run(cst_job_submit)
    except paramiko.ssh_exception.AuthenticationException:
        # Post-auth query calls can still raise auth exceptions in this
        # environment even after the main SSH session is marked successful.
        # Keep the UI clean and return an empty queue list.
        return ""
    except Exception as err:
        mbox.appendPlainText(str(err))
        return ""
    if ret.stderr:
        mbox.appendPlainText(ret.stderr)
    # Callers expect a string: stderr noise must not replace the queue list
    # with the integer exit code.
    if ret.return_code != 0:
        return ""
    return ret.stdout
def get_scheduler(con, settings, mbox):
    """Return the raw output of ``cst_job_submit --get-queuesys-name``.

    Text the command writes to stderr is echoed to ``mbox``. Returns ``""``
    when the query cannot be run, exits non-zero, or raises an
    authentication error. Authentication errors are not reported, because
    they occur spuriously after a successful login.
    """
    udir = settings.utilities_dir()
    cst_job_submit = pathlib.PurePosixPath(udir).joinpath('cst_job_submit')
    cst_job_submit = '"' + str(cst_job_submit) + '"'
    cst_job_submit += ' --get-queuesys-name'
    try:
        ret = con.run(cst_job_submit)
    except paramiko.ssh_exception.AuthenticationException:
        # Post-auth query calls can raise spurious auth exceptions; avoid noisy false negatives.
        return ""
    except Exception as err:
        mbox.appendPlainText(str(err))
        return ""
    if ret.stderr:
        mbox.appendPlainText(ret.stderr)
    # Callers expect a string: stderr noise must not replace the scheduler
    # name with the integer exit code.
    if ret.return_code != 0:
        return ""
    return ret.stdout
def get_available_gpu_num(con, settings, queue, mbox):
    """Ask ``cst_job_submit`` how many GPUs are available in *queue*.

    Text the command writes to stderr is echoed to ``mbox``.

    Returns:
        The GPU count as an int, or ``None`` when *queue* is empty, the query
        fails or exits non-zero, the output is empty, or the output is not an
        integer (the latter is reported to ``mbox``).
    """
    if not queue:
        return None
    udir = settings.utilities_dir()
    cst_job_submit = pathlib.PurePosixPath(udir).joinpath('cst_job_submit')
    cst_job_submit = '"' + str(cst_job_submit) + '"'
    cst_job_submit += ' --get-available-gpu-num '
    cst_job_submit += ' -q ' + '"' + queue + '"'
    try:
        ret = con.run(cst_job_submit)
    except paramiko.ssh_exception.AuthenticationException:
        # Spurious post-login auth errors: stay quiet, as the other queries do.
        return None
    except Exception as err:
        mbox.appendPlainText(str(err))
        return None
    if ret.stderr:
        mbox.appendPlainText(ret.stderr)
    # Decide on the exit code, not on the mere presence of stderr output,
    # so warnings do not discard a valid count.
    if ret.return_code != 0:
        return None
    output = ret.stdout.strip()
    if not output:
        return None
    try:
        return int(output)
    except ValueError:
        mbox.appendPlainText('Unexpected GPU query output: ' + output)
        return None
def _extract_walltime_value(output_text):
if not output_text:
return None
text = str(output_text).strip()
# Match HH:MM (e.g. 167:00)
match = re.search(r'(?