#!/usr/bin/env python3 ############################################################### ### Python package imports ### ############################################################### import copy import datetime import functools import json import logging import os import platform import pwd import re import shutil import signal import socket import subprocess import sys import tarfile from urllib import request ############################################################### ### Installer defaults ### ############################################################### LAST_N_RELEASES = 5 DEFAULT_CHEQD_HOME_DIR = "/home/cheqd" DEFAULT_INSTALL_PATH = "/usr/bin" DEFAULT_CHEQD_USER = "cheqd" DEFAULT_BINARY_NAME = "cheqd-noded" DEFAULT_COSMOVISOR_BINARY_NAME = "cosmovisor" MAINNET_CHAIN_ID = "cheqd-mainnet-1" TESTNET_CHAIN_ID = "cheqd-testnet-6" PRINT_PREFIX = "********* " # Set branch dynamically in CI workflow for testing if Python dev mode is enabled and DEFAULT_DEBUG_BRANCH is set # Otherwise, use the main branch DEFAULT_DEBUG_BRANCH = os.getenv("DEFAULT_DEBUG_BRANCH") if os.getenv("DEFAULT_DEBUG_BRANCH") is not None else "main" # RPC endpoints MAINNET_RPC_ENDPOINT_EU = "https://eu-rpc.cheqd.net:443" MAINNET_RPC_ENDPOINT_AP = "https://ap-rpc.cheqd.net:443" TESTNET_RPC_ENDPOINT_EU = "https://eu-rpc.cheqd.network:443" TESTNET_RPC_ENDPOINT_AP = "https://ap-rpc.cheqd.network:443" ############################################################### ### Cosmovisor configuration ### ############################################################### DEFAULT_LATEST_COSMOVISOR_VERSION = "v1.3.0" COSMOVISOR_BINARY_URL = "https://github.com/cosmos/cosmos-sdk/releases/download/cosmovisor%2F{}/cosmovisor-{}-linux-{}.tar.gz" DEFAULT_USE_COSMOVISOR = "yes" DEFAULT_BUMP_COSMOVISOR = "yes" DEFAULT_DAEMON_ALLOW_DOWNLOAD_BINARIES = "true" DEFAULT_DAEMON_RESTART_AFTER_UPGRADE = "true" DEFAULT_DAEMON_POLL_INTERVAL = "300s" DEFAULT_UNSAFE_SKIP_BACKUP = "true" DEFAULT_DAEMON_RESTART_DELAY = "30s" DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM="true" DAEMON_SHUTDOWN_GRACE="30s" ############################################################### ### Systemd configuration ### ############################################################### STANDALONE_SERVICE_TEMPLATE = f"https://raw.githubusercontent.com/cheqd/cheqd-node/{DEFAULT_DEBUG_BRANCH}/installer/templates/cheqd-noded.service" COSMOVISOR_SERVICE_TEMPLATE = f"https://raw.githubusercontent.com/cheqd/cheqd-node/{DEFAULT_DEBUG_BRANCH}/installer/templates/cheqd-cosmovisor.service" COSMOVISOR_CONFIG_TEMPLATE = f"https://raw.githubusercontent.com/cheqd/cheqd-node/{DEFAULT_DEBUG_BRANCH}/installer/templates/cosmovisor-config.toml" JOURNAL_TEMPLATE=f"https://raw.githubusercontent.com/cheqd/cheqd-node/{DEFAULT_DEBUG_BRANCH}/installer/templates/journald.conf" DEFAULT_STANDALONE_SERVICE_NAME = 'cheqd-noded' DEFAULT_COSMOVISOR_SERVICE_NAME = 'cheqd-cosmovisor' DEFAULT_STANDALONE_SERVICE_FILE_PATH = f"/lib/systemd/system/{DEFAULT_STANDALONE_SERVICE_NAME}.service" DEFAULT_COSMOVISOR_SERVICE_FILE_PATH = f"/lib/systemd/system/{DEFAULT_COSMOVISOR_SERVICE_NAME}.service" DEFAULT_COSMOVISOR_CONFIG_FILE_PATH = f"{DEFAULT_CHEQD_HOME_DIR}/.cheqdnode/cosmovisor/config.toml" DEFAULT_JOURNAL_CONFIG_FILE= "/etc/systemd/journald.conf" DEFAULT_LOGROTATE_FILE = "/etc/logrotate.d/cheqd-node" DEFAULT_RSYSLOG_FILE = "/etc/rsyslog.d/cheqd-node.conf" DEFAULT_LOGIN_SHELL_ENV_FILE_PATH = "/etc/profile.d/cheqd-node.sh" BASH_PROFILE_TEMPLATE = f"https://raw.githubusercontent.com/cheqd/cheqd-node/{DEFAULT_DEBUG_BRANCH}/installer/templates/bash-profile.txt" ############################################################### ### Network configuration files ### ############################################################### GENESIS_FILE = "https://raw.githubusercontent.com/cheqd/cheqd-node/%s/networks/{}/genesis.json" % ( DEFAULT_DEBUG_BRANCH) SEEDS_FILE = "https://raw.githubusercontent.com/cheqd/cheqd-node/%s/networks/{}/seeds.txt" % ( DEFAULT_DEBUG_BRANCH) ############################################################### ### Node snapshots ### ############################################################### TESTNET_SNAPSHOT = "https://snapshots-cdn.cheqd.net/testnet/{}/cheqd-testnet-6_{}.tar.lz4" MAINNET_SNAPSHOT = "https://snapshots-cdn.cheqd.net/mainnet/{}/cheqd-mainnet-1_{}.tar.lz4" MAX_SNAPSHOT_DAYS = 7 ############################################################### ### Default node environment variables ### ############################################################### DEFAULT_RPC_PORT = "26657" DEFAULT_P2P_PORT = "26656" CHEQD_NODED_HOME = "/home/cheqd/.cheqdnode" CHEQD_NODED_NODE = "tcp://localhost:26657" CHEQD_NODED_MONIKER = platform.node() CHEQD_NODED_CHAIN_ID = MAINNET_CHAIN_ID CHEQD_NODED_MINIMUM_GAS_PRICES = "5000ncheq" CHEQD_NODED_LOG_LEVEL = "error" CHEQD_NODED_LOG_FORMAT = "json" CHEQD_NODED_FASTSYNC_VERSION = "v0" CHEQD_NODED_P2P_MAX_PACKET_MSG_PAYLOAD_SIZE = 10240 ############################################################### ### Common, reusable functions ### ############################################################### # Set logging configuration if sys.flags.dev_mode: # If PYTHONDEVMODE = 1, show more detailed logging messages logging.basicConfig(format='[%(levelname)s]: %(message)s', level=logging.DEBUG) logging.raiseExceptions = True logging.propagate = True else: # Else show logging messages INFO level and above logging.basicConfig(format='[%(levelname)s]: %(message)s', level=logging.INFO) logging.raiseExceptions = True logging.propagate = True # Handle Ctrl+C / SIGINT halts requests def sigint_handler(): logging.info('Exiting installer') sys.exit(0) signal.signal(signal.SIGINT, sigint_handler) def is_valid_url(url) -> bool: # Helper function to check if the URL is valid try: status_code = request.urlopen(url).getcode() if status_code == 200: logging.debug(f"URL is valid: {url}") return True except request.HTTPError: logging.exception(f"URL is not valid: {url}") raise def search_and_replace(search_text, replace_text, file_path): # Common function to search and replace text in a file try: with open(file_path, "r") as file: for line in file: line = line.strip() if search_text in line: with open(file_path, "r") as file: data = file.read() data = data.replace(line, replace_text) with open(file_path, "w") as file: file.write(data) except Exception as e: logging.exception(f"Failed to search and replace text in {file_path}. Reason: {e}") raise def post_process(func): # Common function to post-process commands @functools.wraps(func) def wrapper(*args, **kwds): _allow_error = kwds.pop('allow_error', False) try: value = func(*args, **kwds) except subprocess.CalledProcessError as err: if err.returncode and _allow_error: return err logging.exception(err) return value return wrapper def default_answer(func): # Common function to add default answer to questions @functools.wraps(func) def wrapper(*args, **kwds): _default = kwds.get('default', "") if _default: args = list(args) args[-1] += f" [default: {_default}]:{os.linesep}" value = func(*args) return value if value != "" else _default return wrapper ############################################################### ### Release class: Get cheqd-node releases ### ############################################################### class Release: def __init__(self, release_map): self.version = release_map['tag_name'] self.url = release_map['html_url'] self.assets = release_map['assets'] self.is_prerelease = release_map['prerelease'] def get_release_url(self): # Construct the URL to download selected release from GitHub # This fetches the release tagged "latest", plus any other releases or pre-releases. # Release version numbers are in format "vX.Y.Z", but the release URL does not include the "v". # We also determine the OS and architecture, and construct the URL to download the release. try: os_arch = str.lower(platform.machine()) # Python returns "x86_64" for 64-bit OS, but the release URL uses "amd64" since that's the Go convention. if os_arch == 'x86_64': os_arch = 'amd64' else: os_arch = 'arm64' os_name = str.lower(platform.system()) for _url_item in self.assets: _url = _url_item["browser_download_url"] version_without_v_prefix = self.version.replace('v', '', 1) if os.path.basename(_url) == f"cheqd-noded-{version_without_v_prefix}-{os_name}-{os_arch}.tar.gz": if is_valid_url(_url): logging.debug(f"Release URL for binary download: {_url}") return _url else: logging.exception(f"Release URL is not valid: {_url}") else: logging.exception(f"No asset found to download for release: {self.version}") except Exception as e: logging.exception(f"Failed to get cheqd-node binaries from GitHub. Reason: {e}") def __str__(self): return f"Name: {self.version}" ############################################################### ### Installer class: Configure installation ### ############################################################### class Installer(): def __init__(self, interviewer): self.version = interviewer.release.version self.release = interviewer.release self.interviewer = interviewer self._snapshot_url = "" @property def snapshot_url(self): return self._snapshot_url @snapshot_url.setter def snapshot_url(self, value): self._snapshot_url = value @property def cheqd_home_dir(self): # Root directory for cheqd-noded # Default: /home/cheqd return self.interviewer.home_dir @property def cheqd_backup_dir(self): # Root directory for cheqd-noded # Default: /home/cheqd/backup return os.path.join(self.cheqd_home_dir, "backup") @property def cheqd_root_dir(self): # Root directory for cheqd-noded # Default: /home/cheqd/.cheqdnode return os.path.join(self.cheqd_home_dir, ".cheqdnode") @property def cheqd_config_dir(self): # cheqd-noded config directory # Default: /home/cheqd/.cheqdnode/config return os.path.join(self.cheqd_root_dir, "config") @property def cheqd_data_dir(self): # cheqd-noded data directory # Default: /home/cheqd/.cheqdnode/data return os.path.join(self.cheqd_root_dir, "data") @property def cheqd_user_bashrc_path(self): # Path where .bashrc file for cheqd user will be created # Default: /home/cheqd/.bashrc return os.path.join(self.cheqd_home_dir, ".bashrc") @property def cheqd_user_bash_profile_path(self): # Path where .bash_profile file for cheqd user will be created # Default: /home/cheqd/.bash_profile return os.path.join(self.cheqd_home_dir, ".bash_profile") @property def cosmovisor_root_dir(self): # cosmovisor root directory # Default: /home/cheqd/.cheqdnode/cosmovisor return os.path.join(self.cheqd_root_dir, "cosmovisor") @property def cosmovisor_binary_path(self): # Path where Cosmovisor binary will be installed # Default: /usr/bin/cosmovisor return os.path.join(DEFAULT_INSTALL_PATH, DEFAULT_COSMOVISOR_BINARY_NAME) @property def temporary_cosmovisor_binary_path(self): # Temporary path for Cosmovisor binary just after it's downloaded # This is NOT the final install path return os.path.join(os.path.realpath(os.path.curdir), DEFAULT_COSMOVISOR_BINARY_NAME) @property def standalone_node_binary_path(self): # Path where cheqd-noded binary will be installed # Default: /usr/bin/cheqd-noded # When installing with Cosmovisor, this will be a symlink to Cosmovisor directory return os.path.join(DEFAULT_INSTALL_PATH, DEFAULT_BINARY_NAME) @property def temporary_node_binary_path(self): # Temporary path for cheqd-node binary just after it's downloaded # This is NOT the final install path return os.path.join(os.path.realpath(os.path.curdir), DEFAULT_BINARY_NAME) @property def cosmovisor_current_bin_path(self): # cheqd-noded binary path if installed with cosmovisor # Default: /home/cheqd/.cheqdnode/cosmovisor/current/bin/cheqd-noded return os.path.join(self.cosmovisor_root_dir, f"current/bin/{DEFAULT_BINARY_NAME}") @property def cosmovisor_download_url(self): # Compute the download URL for cosmovisor binary based on the OS architecture and version number try: os_arch = platform.machine() if os_arch == 'x86_64': os_arch = 'amd64' else: os_arch = 'arm64' _url = COSMOVISOR_BINARY_URL.format(DEFAULT_LATEST_COSMOVISOR_VERSION, DEFAULT_LATEST_COSMOVISOR_VERSION, os_arch) if is_valid_url(_url): logging.debug(f"Cosmovisor download URL: {_url}") return _url else: logging.exception(f"Cosmovisor download URL is not valid: {_url}") except Exception as e: logging.exception(f"Failed to compute Cosmovisor download URL. Reason: {e}") @property def cosmovisor_service_cfg(self): # Modify cheqd-cosmovisor.service template file to replace values for environment variables # The template file is fetched from the GitHub repo # Some of these variables are explicitly asked during the installer process. Others are set to default values. try: # Fetch the template file from GitHub if is_valid_url(COSMOVISOR_SERVICE_TEMPLATE): with request.urlopen(COSMOVISOR_SERVICE_TEMPLATE) as response: # Replace the values for environment variables in the template file s = re.sub( r'({CHEQD_ROOT_DIR}|{DEFAULT_BINARY_NAME}|{COSMOVISOR_DAEMON_ALLOW_DOWNLOAD_BINARIES}|{COSMOVISOR_DAEMON_RESTART_AFTER_UPGRADE}|{DEFAULT_DAEMON_POLL_INTERVAL}|{DEFAULT_UNSAFE_SKIP_BACKUP}|{DEFAULT_DAEMON_RESTART_DELAY}|{DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM}|{DAEMON_SHUTDOWN_GRACE})', lambda m: {'{CHEQD_ROOT_DIR}': self.cheqd_root_dir, '{DEFAULT_BINARY_NAME}': DEFAULT_BINARY_NAME, '{COSMOVISOR_DAEMON_ALLOW_DOWNLOAD_BINARIES}': self.interviewer.daemon_allow_download_binaries, '{COSMOVISOR_DAEMON_RESTART_AFTER_UPGRADE}': self.interviewer.daemon_restart_after_upgrade, '{DEFAULT_DAEMON_POLL_INTERVAL}': DEFAULT_DAEMON_POLL_INTERVAL, '{DEFAULT_UNSAFE_SKIP_BACKUP}': DEFAULT_UNSAFE_SKIP_BACKUP, '{DEFAULT_DAEMON_RESTART_DELAY}': DEFAULT_DAEMON_RESTART_DELAY, '{DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM}': DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM, '{DAEMON_SHUTDOWN_GRACE}': DAEMON_SHUTDOWN_GRACE}[m.group()], response.read().decode("utf-8").strip() ) # If the service file is successfully created, return the string return s else: logging.exception(f"URL is not valid: {COSMOVISOR_SERVICE_TEMPLATE}") except Exception as e: logging.exception(f"Failed to set up service file from template. Reason: {e}") @property def cosmovisor_config_cfg(self): # Modify cosmovisor-config.toml template file to replace values for environment variables # Some of these variables are explicitly asked during the installer process. Others are set to default values. try: # Fetch the template file from GitHub if is_valid_url(COSMOVISOR_CONFIG_TEMPLATE): with request.urlopen(COSMOVISOR_CONFIG_TEMPLATE) as response: # Replace the values for environment variables in the template file s = re.sub( r'({CHEQD_ROOT_DIR}|{DEFAULT_BINARY_NAME}|{COSMOVISOR_DAEMON_ALLOW_DOWNLOAD_BINARIES}|{COSMOVISOR_DAEMON_RESTART_AFTER_UPGRADE}|{DEFAULT_DAEMON_POLL_INTERVAL}|{DEFAULT_UNSAFE_SKIP_BACKUP}|{DEFAULT_DAEMON_RESTART_DELAY}|{DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM}|{DAEMON_SHUTDOWN_GRACE})', lambda m: {'{CHEQD_ROOT_DIR}': self.cheqd_root_dir, '{DEFAULT_BINARY_NAME}': DEFAULT_BINARY_NAME, '{COSMOVISOR_DAEMON_ALLOW_DOWNLOAD_BINARIES}': self.interviewer.daemon_allow_download_binaries, '{COSMOVISOR_DAEMON_RESTART_AFTER_UPGRADE}': self.interviewer.daemon_restart_after_upgrade, '{DEFAULT_DAEMON_POLL_INTERVAL}': DEFAULT_DAEMON_POLL_INTERVAL, '{DEFAULT_UNSAFE_SKIP_BACKUP}': DEFAULT_UNSAFE_SKIP_BACKUP, '{DEFAULT_DAEMON_RESTART_DELAY}': DEFAULT_DAEMON_RESTART_DELAY, '{DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM}': DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM, '{DAEMON_SHUTDOWN_GRACE}': DAEMON_SHUTDOWN_GRACE}[m.group()], response.read().decode("utf-8").strip() ) # If the configuration file is successfully created, return the string return s else: logging.exception(f"URL is not valid: {COSMOVISOR_CONFIG_TEMPLATE}") except Exception as e: logging.exception(f"Failed to set up cosmovisor config file from template. Reason: {e}") @post_process def exec(self, cmd, use_stdout=True, suppress_err=False): # Helper function to safely execute shell commands logging.info(f"Executing command: {cmd}") kwargs = { "shell": True, "check": True, } if use_stdout: kwargs["stdout"] = subprocess.PIPE else: kwargs["capture_output"] = True if suppress_err: kwargs["stderr"] = subprocess.DEVNULL return subprocess.run(cmd, **kwargs) def remove_safe(self, path) -> bool: # Helper function to remove a file or directory safely try: if os.path.isdir(path) and os.path.exists(path): shutil.rmtree(path) logging.warning(f"Removed {path}") return True elif os.path.exists(path): os.remove(path) logging.warning(f"Removed {path}") return True else: logging.debug(f"{path} does not exist. Skipping removal...") return True except Exception as e: logging.exception(f"Failed to remove {path}. Reason: {e}") return False def install(self) -> bool: # Main function that controls calls to installation process functions try: # Download and extract cheqd-node binary if self.get_binary(): logging.info("Successfully downloaded and extracted cheqd-noded binary") else: logging.error("Failed to download and extract binary") return False # Create cheqd user if it doesn't exist if self.prepare_cheqd_user(): logging.info("User/group cheqd setup successfully") else: logging.error("Failed to setup user/group cheqd") return False # Carry out pre-installation steps # Mostly relevant if installing from scratch or re-installing if self.pre_install(): logging.info("Pre-installation steps completed successfully") else: logging.error("Failed to complete pre-installation steps") return False # Setup Cosmovisor binary if needed if self.interviewer.is_cosmovisor_needed or self.interviewer.is_cosmovisor_bump_needed: if self.install_cosmovisor(): logging.info("Successfully installed Cosmovisor") else: logging.error("Failed to setup Cosmovisor") return False # If Cosmovisor is not needed, treat it as a standalone installation else: if self.install_standalone(): logging.info("Successfully installed cheqd-noded as a standalone binary") else: logging.error("Failed to setup cheqd-noded as a standalone binary") return False # Setup cheqd-noded environment variables # These are independent of Cosmovisor environment variables # Set them regardless of whether Cosmovisor is used or not logging.info("Setting cheqd-noded environment variables...") self.set_cheqd_env_vars() logging.info("Finished setting cheqd-noded environment variables") # Configure cheqd-noded settings # This edits the config.toml and app.toml files if self.configure_node_settings(): logging.info("Successfully configured cheqd-noded settings") else: logging.error("Failed to configure cheqd-noded settings") return False # Configure state sync only for fresh installs if self.interviewer.is_from_scratch and getattr(self.interviewer, 'use_statesync', False): logging.info("Configuring state sync (default)") if not self.configure_statesync(): logging.error("Failed to configure state sync") return False # Ensure snapshot is not attempted self.interviewer.init_from_snapshot = False # Configure systemd service for cheqd-noded # Sets up either a standalone service or a Cosmovisor service # ONLY enables it without activating it if self.setup_node_systemd(): logging.info("Successfully configured systemd service for node operations") else: logging.error("Failed to configure systemd service for node operations") return False # Configure systemd services for logging if self.setup_journal_logging(): logging.info("Successfully configured systemd service for logging") else: logging.error("Failed to configure systemd service for logging") return False # Download and extract snapshot if needed if self.interviewer.init_from_snapshot: # Check if snapshot download was successful if self.download_snapshot(): logging.info("Successfully downloaded snapshot") else: logging.error("Failed to download snapshot") return False if self.extract_snapshot(): logging.info("Successfully extracted snapshot") else: logging.error("Failed to extract snapshot") return False else: logging.debug("Skipping snapshot download and extraction as it was not requested") # Return True if all steps were successful logging.info("Installation steps completed successfully") return True except Exception as e: logging.exception(f"Failed to install cheqd-noded. Reason: {e}") def get_binary(self) -> bool: # Download cheqd-noded binary and extract it # Also remove the downloaded archive file, if applicable try: logging.info("Downloading cheqd-noded binary...") binary_url = self.release.get_release_url() fname = os.path.basename(binary_url) # Download the binary from GitHub with request.urlopen(binary_url) as response, open(fname, "wb") as file: file.write(response.read()) # Extract the binary from the archive file # Using tarfile to extract is a safer option than just executing a command tar = tarfile.open(fname) tar.extractall() # Remove the archive file self.remove_safe(fname) # Make the binary executable # 0755 is equivalent to chmod +x os.chmod(DEFAULT_BINARY_NAME, 0o755) return True except Exception as e: logging.exception(f"Failed to download cheqd-noded binary. Reason: {e}") return False def prepare_cheqd_user(self) -> bool: # Set cheqd user/group along with required permissions try: # Create cheqd user and group if they don't exist if not self.does_user_exist(DEFAULT_CHEQD_USER): logging.info(f"Creating {DEFAULT_CHEQD_USER} group") self.exec(f"addgroup {DEFAULT_CHEQD_USER} --quiet --system") logging.info(f"Creating {DEFAULT_CHEQD_USER} user and adding to {DEFAULT_CHEQD_USER} group") self.exec( f"adduser --system {DEFAULT_CHEQD_USER} --home {self.cheqd_home_dir} --shell /bin/bash --ingroup {DEFAULT_CHEQD_USER} --quiet") else: logging.debug(f"User {DEFAULT_CHEQD_USER} already exists. Skipping creation...") # Create ~/.cheqdnode root directory if not os.path.exists(self.cheqd_root_dir): logging.info(f"Creating root directory: {self.cheqd_root_dir}") os.makedirs(self.cheqd_root_dir, exist_ok=True) else: logging.info(f"Root directory {self.cheqd_root_dir} already exists. Skipping creation...") # Set permissions for cheqd home directory to cheqd:cheqd logging.info(f"Setting permissions for {self.cheqd_home_dir} to {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER}") self.exec(f"chown -R {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER} {self.cheqd_home_dir}") # Return True if all steps were successful return True except Exception as e: logging.exception(f"Failed to create {DEFAULT_CHEQD_USER} user. Reason: {e}") return False def does_user_exist(self, username) -> bool: # Helper function to see if a given user exists on the system try: pwd.getpwnam(username) logging.debug(f"User {username} already exists") return True except KeyError: logging.debug(f"User {username} does not exist") return False def pre_install(self) -> bool: # Pre-installation steps # 1. Stop systemd services if running # 2. Backup existing data and configurations # 3. Remove existing cheqd-noded data and configurations # 4. Create logging directories try: # Stop existing systemd services first if running # Check if the service is running before stopping it self.stop_systemd_service(DEFAULT_STANDALONE_SERVICE_NAME) self.stop_systemd_service(DEFAULT_COSMOVISOR_SERVICE_NAME) # Create backup directory if it doesn't exist os.makedirs(self.cheqd_backup_dir, exist_ok=True) # Make a copy of validator key and state before removing user data # Use shutil.copytree() when copying directories # Use shutil.copy() when copying files instead of shutil.copyfile() since it preserves file metadata logging.info("Backing up user's config folder and selected validator secrets from data folder") if os.path.exists(self.cheqd_config_dir): # Backup ~/.cheqdnode/config/ folder shutil.copytree(self.cheqd_config_dir, os.path.join(self.cheqd_backup_dir, "config"), dirs_exist_ok=True) logging.info(f"Successfully backed up {self.cheqd_config_dir} directory") else: logging.debug("No config folder found to backup. Skipping...") # Backup ~/.cheqdnode/data/priv_validator_key.json # Without this file, a validator node will get jailed! if os.path.exists(os.path.join(self.cheqd_data_dir, "priv_validator_state.json")): shutil.copy(os.path.join(self.cheqd_data_dir, "priv_validator_state.json"), os.path.join(self.cheqd_backup_dir, "priv_validator_state.json")) logging.info(f"Successfully backed up {self.cheqd_data_dir}/priv_validator_state.json") else: logging.debug("No validator state file found to backup. Skipping...") # Backup ~/.cheqdnode/data/upgrade-info.json # This file is required for Cosmovisor to track and understand where upgrade is needed if os.path.exists(os.path.join(self.cheqd_data_dir, "upgrade-info.json")): shutil.copy(os.path.join(self.cheqd_data_dir, "upgrade-info.json"), os.path.join(self.cheqd_backup_dir, "upgrade-info.json")) logging.info(f"Successfully backed up {self.cheqd_data_dir}/upgrade-info.json") else: logging.debug("No upgrade-info.json file found to backup. Skipping...") # Change ownership of backup directory to cheqd user self.exec(f"chown -R {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER} {self.cheqd_backup_dir}") logging.debug(f"Successfully changed ownership of {self.cheqd_backup_dir}") if self.interviewer.is_from_scratch: # Remove cheqd-node data and binaries logging.warning("Removing user's data and configs") self.remove_safe(os.path.join(DEFAULT_INSTALL_PATH, DEFAULT_COSMOVISOR_BINARY_NAME)) self.remove_safe(os.path.join(DEFAULT_INSTALL_PATH, DEFAULT_BINARY_NAME)) self.remove_safe(DEFAULT_LOGIN_SHELL_ENV_FILE_PATH) self.remove_safe(self.cheqd_root_dir) else: logging.debug("No user data or configs to remove. Skipping...") # Create a bash file in /etc/profile.d/ to set environment variables # This file will be sourced by all users for their LOGIN shells if not os.path.exists(DEFAULT_LOGIN_SHELL_ENV_FILE_PATH): logging.info(f"Creating {DEFAULT_LOGIN_SHELL_ENV_FILE_PATH} file") # Create the file, overwrite if it already exists with open(DEFAULT_LOGIN_SHELL_ENV_FILE_PATH, "w") as file: # Add a shebang line file.write("#!/bin/bash\n\n") # Change ownership to root:root shutil.chown(DEFAULT_LOGIN_SHELL_ENV_FILE_PATH, "root", "root") else: logging.debug(f"{DEFAULT_LOGIN_SHELL_ENV_FILE_PATH} already exists. Skipping creation...") # Create a ~/.bash_profile file to execute ~/.bashrc # This file will be sourced by during the cheqd user's LOGIN shells if not os.path.exists(self.cheqd_user_bash_profile_path): logging.info(f"Creating {self.cheqd_user_bash_profile_path} file") # Download the template file from GitHub and write it to ~/.bash_profile if is_valid_url(BASH_PROFILE_TEMPLATE): with request.urlopen(BASH_PROFILE_TEMPLATE) as response, open(self.cheqd_user_bash_profile_path, "w") as file: # Add a shebang line file.write(f"#!/bin/bash{os.linesep}{os.linesep}") # Insert the contents of the template file file.write(response.read().decode("utf-8").strip()) # Add a newline at the end of the file file.write(os.linesep) # Change ownership to cheqd:cheqd shutil.chown(self.cheqd_user_bash_profile_path, DEFAULT_CHEQD_USER, DEFAULT_CHEQD_USER) else: logging.debug(f"{self.cheqd_user_bash_profile_path} already exists. Skipping creation...") # Create a ~/.bashrc file to set environment variables # This file will be sourced by the cheqd user's NON-LOGIN shells if not os.path.exists(self.cheqd_user_bashrc_path): logging.info(f"Creating {self.cheqd_user_bashrc_path} file") # Create the file, overwrite if it already exists with open(self.cheqd_user_bashrc_path, "w") as file: # Add a shebang line file.write("#!/bin/bash\n\n") # Change ownership to root:root shutil.chown(self.cheqd_user_bashrc_path, DEFAULT_CHEQD_USER, DEFAULT_CHEQD_USER) else: logging.debug(f"{self.cheqd_user_bashrc_path} already exists. Skipping creation...") # Change ownership of directories # Always execute these since they might be lost if directories are removed and recreated logging.info(f"Setting ownership of {self.cheqd_home_dir} to {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER}") self.exec(f"chown -R {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER} {self.cheqd_home_dir}") # Return True if all steps are successful return True except Exception as e: logging.exception(f"Could not complete pre-installation steps. Reason: {e}") return False def install_cosmovisor(self) -> bool: # Install binaries for cheqd-noded and Cosmovisor # Cosmovisor is only installed if requested by the user # cheqd-noded binary is installed in Cosmovisor bin path under this scenario try: logging.info("Setting up Cosmovisor...") # Download Cosmovisor binary and set environment variables if self.get_cosmovisor(): logging.info("Successfully downloaded Cosmovisor") # Set environment variables for Cosmovisor self.set_cosmovisor_env_vars() logging.info("Successfully set Cosmovisor environment variables") else: logging.error("Failed to download Cosmovisor") return False # Move Cosmovisor binary to installation directory if it doesn't exist or bump needed # This is executed is there is no Cosmovisor binary in the installation directory # or if the user has requested a bump for Cosmovisor # shutil.move() will overwrite the file if it already exists logging.info(f"Moving Cosmovisor {self.temporary_cosmovisor_binary_path} to {self.cosmovisor_binary_path}") shutil.move(self.temporary_cosmovisor_binary_path, self.cosmovisor_binary_path) # Set ownership of Cosmovisor binary to root:root shutil.chown(self.cosmovisor_binary_path, "root", "root") # Move cheqd-noded binary to /usr/bin logging.info(f"Copying cheqd-noded binary from {self.temporary_node_binary_path} to {self.standalone_node_binary_path}") shutil.copy(self.temporary_node_binary_path, self.standalone_node_binary_path) # Set ownership of cheqd-noded binary to root:root shutil.chown(self.standalone_node_binary_path, "root", "root") # Initialize Cosmovisor if it's not already initialized # This is done by checking whether the Cosmovisor root directory exists if not os.path.exists(self.cosmovisor_root_dir): self.exec(f"sudo -u {DEFAULT_CHEQD_USER} bash -c 'DAEMON_NAME={DEFAULT_BINARY_NAME} DAEMON_HOME={self.cheqd_root_dir} cosmovisor init {self.standalone_node_binary_path}'") else: logging.info("Cosmovisor directory already exists. Skipping initialization...") # Remove cheqd-noded binary from /usr/bin if it's not a symlink if not os.path.islink(self.standalone_node_binary_path): logging.warning(f"Removing {DEFAULT_BINARY_NAME} from {DEFAULT_INSTALL_PATH} because it is not a symlink") os.remove(self.standalone_node_binary_path) # Move cheqd-noded binary to Cosmovisor bin path # shutil.move() will overwrite the file if it already exists logging.info(f"Moving cheqd-noded binary from {self.temporary_node_binary_path} to {self.cosmovisor_current_bin_path}") shutil.move(self.temporary_node_binary_path, self.cosmovisor_current_bin_path) # Set ownership of cheqd-noded binary to cheqd:cheqd # This is ONLY done when the binary is moved to Cosmovisor bin path shutil.chown(self.cosmovisor_current_bin_path, DEFAULT_CHEQD_USER, DEFAULT_CHEQD_USER) # Create symlink to cheqd-noded binary in Cosmovisor bin path # Target comes first, then the location of the symlink logging.info(f"Creating symlink to {self.cosmovisor_current_bin_path}") os.symlink(self.cosmovisor_current_bin_path, self.standalone_node_binary_path) else: logging.info(f"{self.cosmovisor_current_bin_path} is already symlink. Skipping removal...") # Steps to execute only if this is an upgrade # The upgrade-info.json file is required for Cosmovisor to track upgrades if self.interviewer.is_upgrade \ and os.path.exists(os.path.join(self.cheqd_data_dir, "upgrade-info.json")) \ and not os.path.exists(os.path.join(self.cosmovisor_root_dir, "current/upgrade-info.json")): logging.info("Copying ~/.cheqdnode/data/upgrade-info.json file to ~/.cheqdnode/cosmovisor/current/") # shutil.copy() preserves the file metadata shutil.copy(os.path.join(self.cheqd_data_dir, "upgrade-info.json"), os.path.join(self.cosmovisor_root_dir, "current/upgrade-info.json"), follow_symlinks=True) else: logging.debug("Skipped copying upgrade-info.json file because it doesn't exist") # Change owner of Cosmovisor directory to cheqd:cheqd logging.info(f"Changing ownership of {self.cosmovisor_root_dir} to {DEFAULT_CHEQD_USER} user") self.exec(f"chown -R {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER} {self.cosmovisor_root_dir}") # Return True if all steps were successful return True except Exception as e: logging.exception(f"Failed to setup Cosmovisor. Reason: {e}") return False def get_cosmovisor(self) -> bool: # Download Cosmovisor binary and extract it # Also remove the downloaded archive file, if applicable try: logging.info("Downloading Cosmovisor binary...") binary_url = self.cosmovisor_download_url fname = os.path.basename(binary_url) # Download Cosmovisor binary from GitHub with request.urlopen(binary_url) as response, open(fname, "wb") as file: file.write(response.read()) # Check tar archive exists before extracting if fname.find(".tar.gz") != -1: # Extract Cosmovisor binary from the archive file # Using tarfile to extract is a safer option than just executing a command tar = tarfile.open(fname) tar.extractall() # Remove Cosmovisor artifacts... self.remove_safe("CHANGELOG.md") self.remove_safe("README.md") self.remove_safe("LICENSE") self.remove_safe(fname) # Make the binary executable # 0755 is equivalent to chmod +x os.chmod(DEFAULT_COSMOVISOR_BINARY_NAME, 0o755) # Return True if all steps were successful return True else: logging.error(f"Unable to extract Cosmovisor binary from archive file: {fname}") return False except Exception as e: logging.exception(f"Failed to download Cosmovisor binary. Reason: {e}") def install_standalone(self) -> bool: # Install cheqd-noded as a standalone binary # cheqd-noded binary is installed in /usr/bin under this scenario try: logging.info("Setting up standalone cheqd-noded binary...") # Remove symlink for cheqd-noded if it exists if os.path.islink(self.standalone_node_binary_path): logging.warning(f"Removing symlink {self.standalone_node_binary_path}") os.remove(self.standalone_node_binary_path) else: logging.info(f"{self.standalone_node_binary_path} is not a symlink. Skipping removal...") # Move cheqd-noded binary to /usr/bin # shutil.move() will overwrite the file if it already exists logging.info(f"Moving cheqd-noded binary from {self.temporary_node_binary_path} to {self.standalone_node_binary_path}") shutil.move(self.temporary_node_binary_path, self.standalone_node_binary_path) # Set ownership of cheqd-noded binary to root:root logging.info(f"Changing ownership of {self.standalone_node_binary_path} to root:root") shutil.chown(self.standalone_node_binary_path, "root", "root") # Remove Cosmovisor directory if it exists if os.path.exists(self.cosmovisor_root_dir): logging.warning(f"Removing Cosmovisor directory from {self.cosmovisor_root_dir} because it is not required for a standalone installation") self.remove_safe(self.cosmovisor_root_dir) else: logging.debug(f"{self.cosmovisor_root_dir} doesn't exist. Skipping removal...") # Return True if all steps were successful return True except Exception as e: logging.exception(f"Failed to setup Cosmovisor. Reason: {e}") return False def set_cosmovisor_env_vars(self): # Set environment variables for Cosmovisor try: self.set_environment_variable("DAEMON_NAME", DEFAULT_BINARY_NAME) self.set_environment_variable("DAEMON_HOME", self.cheqd_root_dir) self.set_environment_variable("DAEMON_ALLOW_DOWNLOAD_BINARIES", self.interviewer.daemon_allow_download_binaries) self.set_environment_variable("DAEMON_RESTART_AFTER_UPGRADE", self.interviewer.daemon_restart_after_upgrade) self.set_environment_variable("DAEMON_POLL_INTERVAL", DEFAULT_DAEMON_POLL_INTERVAL) self.set_environment_variable("UNSAFE_SKIP_BACKUP", DEFAULT_UNSAFE_SKIP_BACKUP) self.set_environment_variable("DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM", DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM) self.set_environment_variable("DAEMON_SHUTDOWN_GRACE", DAEMON_SHUTDOWN_GRACE) except Exception as e: logging.exception(f"Failed to set environment variables for Cosmovisor. Reason: {e}") raise def set_cheqd_env_vars(self): # Set environment variables for cheqd-noded binary # Applicable for both standalone and Cosmovisor installations # Only environment variables that are required required for transactions are set here try: logging.info("Starting to set cheqd-noded environment variables...") # If RPC port is set, set to user-specified value if self.interviewer.rpc_port: self.set_environment_variable("CHEQD_NODED_NODE", f"tcp://localhost:{self.interviewer.rpc_port}") else: # Otherwise, set to default value self.set_environment_variable("CHEQD_NODED_NODE", f"tcp://localhost:{DEFAULT_RPC_PORT}") # If chain ID is set, set to user-specified value if self.interviewer.chain == "testnet": self.set_environment_variable("CHEQD_NODED_CHAIN_ID", TESTNET_CHAIN_ID) elif self.interviewer.chain == "mainnet": self.set_environment_variable("CHEQD_NODED_CHAIN_ID", MAINNET_CHAIN_ID) # Proceed with Cosmovisor-specific environment variables if self.interviewer.is_cosmovisor_needed: self.set_environment_variable("DAEMON_HOME", self.cheqd_root_dir) self.set_environment_variable("DAEMON_NAME", DEFAULT_BINARY_NAME) self.set_environment_variable("DAEMON_ALLOW_DOWNLOAD_BINARIES", self.interviewer.daemon_allow_download_binaries) self.set_environment_variable("DAEMON_RESTART_AFTER_UPGRADE", self.interviewer.daemon_restart_after_upgrade) self.set_environment_variable("DAEMON_POLL_INTERVAL", DEFAULT_DAEMON_POLL_INTERVAL) self.set_environment_variable("UNSAFE_SKIP_BACKUP", DEFAULT_UNSAFE_SKIP_BACKUP) self.set_environment_variable("DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM", DAEMON_DOWNLOAD_MUST_HAVE_CHECKSUM) self.set_environment_variable("DAEMON_SHUTDOWN_GRACE", DAEMON_SHUTDOWN_GRACE) except Exception as e: logging.exception(f"Failed to set environment variables for cheqd-noded. Reason: {e}") raise def set_environment_variable(self, env_var_name, env_var_value): # Set an environment variable by exporting values to the following locations: # 1. /etc/profile.d/cheqd-noded.sh (for all users, LOGIN shell) # 2. ~/.bashrc (for current user, LOGIN shell) # 3. ~/.bash_profile (for current user, LOGIN shell) try: # Set environment variable in /etc/profile.d/cheqd-noded.sh if os.path.exists(DEFAULT_LOGIN_SHELL_ENV_FILE_PATH): with open(DEFAULT_LOGIN_SHELL_ENV_FILE_PATH, "r") as file: lines = file.readlines() # Track whether the environment variable has been updated updated = False # Read /etc/profile.d/cheqd-noded.sh line by line with open(DEFAULT_LOGIN_SHELL_ENV_FILE_PATH, "w") as file: for line in lines: # Update existing value (if it exists) if line.startswith(f"export {env_var_name}="): file.write(f"export {env_var_name}={env_var_value}\n") updated = True else: file.write(line) # Add new value (if it doesn't exist in the file already) if not updated: logging.debug(f"Adding {env_var_name}={env_var_value} to {DEFAULT_LOGIN_SHELL_ENV_FILE_PATH}") file.write(f'export {env_var_name}={env_var_value}\n') else: logging.debug(f"{DEFAULT_LOGIN_SHELL_ENV_FILE_PATH} doesn't exist. Skipped adding {env_var_name} to the file...") # Set environment variable in ~/.bashrc if os.path.exists(self.cheqd_user_bashrc_path): with open(self.cheqd_user_bashrc_path, "r") as file: lines = file.readlines() # Track whether the environment variable has been updated updated = False # Read ~/.bashrc line by line with open(self.cheqd_user_bashrc_path, "w") as file: for line in lines: # Update existing value (if it exists) if line.startswith(f"export {env_var_name}="): file.write(f"export {env_var_name}={env_var_value}\n") updated = True else: file.write(line) # Add new value (if it doesn't exist in the file already) if not updated: logging.debug(f"Adding {env_var_name}={env_var_value} to {self.cheqd_user_bashrc_path}") file.write(f'export {env_var_name}={env_var_value}\n') else: logging.debug(f"{self.cheqd_user_bashrc_path} doesn't exist. Skipped adding {env_var_name} to the file...") # Check if the cheqd user's default shell is fish try: cheqd_user_info = pwd.getpwnam(DEFAULT_CHEQD_USER) cheqd_user_shell = cheqd_user_info.pw_shell fish_detected = 'fish' in cheqd_user_shell if fish_detected: # Set environment variable for fish shell fish_config_dir = os.path.join(self.cheqd_home_dir, ".config", "fish") fish_config_file = os.path.join(fish_config_dir, "config.fish") # Create fish config directory if it doesn't exist os.makedirs(fish_config_dir, exist_ok=True) # Read existing config or create new one lines = [] if os.path.exists(fish_config_file): with open(fish_config_file, "r") as f: lines = f.readlines() # Check if interactive block exists interactive_block_start = -1 interactive_block_end = -1 for i, line in enumerate(lines): if line.strip() == "if status is-interactive": interactive_block_start = i elif line.strip() == "end" and interactive_block_start != -1: interactive_block_end = i break # Prepare the fish environment variable line fish_line = f" set -gx {env_var_name} {env_var_value}\n" # Update or add the environment variable updated = False new_lines = [] for i, line in enumerate(lines): if line.strip().startswith(f"set -gx {env_var_name} "): new_lines.append(fish_line) updated = True else: new_lines.append(line) # If not updated, add to interactive block or create one if not updated: if interactive_block_start != -1 and interactive_block_end != -1: # Insert into existing interactive block new_lines.insert(interactive_block_end, fish_line) else: # Create new interactive block new_lines.extend([ "if status is-interactive\n", " # Commands to run in interactive sessions can go here\n", fish_line, "end\n" ]) # Write the updated config with open(fish_config_file, "w") as f: f.writelines(new_lines) logging.debug(f"Set {env_var_name} for fish shell in {fish_config_file}") except (KeyError, ImportError) as e: logging.warning(f"Could not determine cheqd user shell or set fish config: {e}") except Exception as e: logging.exception(f"Failed to set environment variable {env_var_name}. Reason: {e}") raise def configure_node_settings(self) -> bool: # Configure cheqd-noded settings in app.toml and config.toml # Some of these need to be set based on user input for setup needed from scratch only # Others are needed regardless of whether the node is being setup from scratch or an upgrade path try: # Set file paths for common configuration files app_toml_path = os.path.join(self.cheqd_config_dir, "app.toml") config_toml_path = os.path.join(self.cheqd_config_dir, "config.toml") genesis_file_path = os.path.join(self.cheqd_config_dir, 'genesis.json') # Set URLs for files to be downloaded genesis_url = GENESIS_FILE.format(self.interviewer.chain) seeds_url = SEEDS_FILE.format(self.interviewer.chain) # These changes are required only when NEW node setup is needed if self.interviewer.is_from_scratch: # Don't execute an init in case a validator key already exists if not os.path.exists(os.path.join(self.cheqd_config_dir, 'priv_validator_key.json')): # Initialize the node logging.info(f"Initializing {self.cheqd_root_dir} directory") self.exec(f"sudo -u {DEFAULT_CHEQD_USER} bash -c 'cheqd-noded init {self.interviewer.moniker} --chain-id {TESTNET_CHAIN_ID if self.interviewer.chain == 'testnet' else MAINNET_CHAIN_ID}'") else: logging.debug(f"Validator key already exists in {self.cheqd_config_dir}. Skipping cheqd-noded init...") # Check if genesis file exists # If not, download it from the GitHub repo if is_valid_url(genesis_url) and not os.path.exists(genesis_file_path): logging.debug(f"Downloading genesis file for {self.interviewer.chain}") with request.urlopen(genesis_url) as response, open(genesis_file_path, "w") as file: file.write(response.read().decode("utf-8").strip()) else: logging.debug(f"Genesis file already exists in {genesis_file_path}") # Set seeds from the seeds file on GitHub if is_valid_url(seeds_url): logging.debug(f"Setting seeds from {seeds_url}") with request.urlopen(seeds_url) as response: seeds = response.read().decode("utf-8").strip() seeds_search_text = 'seeds = ""' seeds_replace_text = 'seeds = "{}"'.format(seeds) search_and_replace(seeds_search_text, seeds_replace_text, config_toml_path) else: logging.exception(f"Invalid URL for seeds file: {seeds_url}") return False # Set RPC port to listen to for all origins by default rpc_default_value = 'laddr = "tcp://127.0.0.1:{}"'.format(DEFAULT_RPC_PORT) new_rpc_default_value = 'laddr = "tcp://0.0.0.0:{}"'.format(DEFAULT_RPC_PORT) search_and_replace(rpc_default_value, new_rpc_default_value, config_toml_path) else: logging.debug("Skipping cheqd-noded init as setup is not needed") # Download genesis.json from GitHub repository if chain is set # For fresh installs, chain is always set # For upgrades, chain is only set if user chose to check genesis.json if self.interviewer.chain and is_valid_url(genesis_url): logging.info(f"Downloading genesis file for {self.interviewer.chain} from GitHub repository") with request.urlopen(genesis_url) as response, open(genesis_file_path, "w") as file: file.write(response.read().decode("utf-8").strip()) logging.info(f"Successfully downloaded and overwrote genesis.json for {self.interviewer.chain}") elif self.interviewer.chain and not is_valid_url(genesis_url): logging.error(f"Invalid URL for genesis file: {genesis_url}") return False else: logging.debug("Skipping genesis.json download (chain not set or user chose not to check)") ### This next section changes values in configuration files only if the user has provided input ### # Set external address if self.interviewer.external_address: external_address_search_text = 'external_address' external_address_replace_text = 'external_address = "{}:{}"'.format( self.interviewer.external_address, self.interviewer.p2p_port) logging.debug(f"Setting external address to {external_address_replace_text}") search_and_replace(external_address_search_text, external_address_replace_text, config_toml_path) else: logging.debug("External address not set by user. Skipping...") # Set P2P port if self.interviewer.p2p_port: p2p_laddr_search_text = 'laddr = "tcp://0.0.0.0:{}"'.format(DEFAULT_P2P_PORT) p2p_laddr_replace_text = 'laddr = "tcp://0.0.0.0:{}"'.format(self.interviewer.p2p_port) search_and_replace(p2p_laddr_search_text, p2p_laddr_replace_text, config_toml_path) else: logging.debug("P2P port not set by user. Skipping...") # Setting up the RPC port if self.interviewer.rpc_port: rpc_laddr_search_text = 'laddr = "tcp://0.0.0.0:{}"'.format(DEFAULT_RPC_PORT) rpc_laddr_replace_text = 'laddr = "tcp://0.0.0.0:{}"'.format(self.interviewer.rpc_port) search_and_replace(rpc_laddr_search_text, rpc_laddr_replace_text, config_toml_path) else: logging.debug("RPC port not set by user. Skipping...") # Setting up min gas-price if self.interviewer.gas_price: min_gas_price_search_text = 'minimum-gas-prices' min_gas_price_replace_text = 'minimum-gas-prices = "{}"'.format(self.interviewer.gas_price) search_and_replace(min_gas_price_search_text, min_gas_price_replace_text, app_toml_path) else: logging.debug("Minimum gas price not set by user. Skipping...") # Setting up persistent peers if self.interviewer.persistent_peers: persistent_peers_search_text = 'persistent_peers' persistent_peers_replace_text = 'persistent_peers = "{}"'.format(self.interviewer.persistent_peers) search_and_replace(persistent_peers_search_text, persistent_peers_replace_text, config_toml_path) else: logging.debug("Persistent peers not set by user. Skipping...") # Setting up log level if self.interviewer.log_level: log_level_search_text = 'log_level' log_level_replace_text = 'log_level = "{}"'.format(self.interviewer.log_level) search_and_replace(log_level_search_text, log_level_replace_text, config_toml_path) else: logging.debug("Log level not set by user. Skipping...") # Setting up log format if self.interviewer.log_format: log_format_search_text = 'log_format' log_format_replace_text = 'log_format = "{}"'.format(self.interviewer.log_format) search_and_replace(log_format_search_text, log_format_replace_text, config_toml_path) else: logging.debug("Log format not set by user. Skipping...") # Set ownership of configuration directory to cheqd:cheqd logging.info(f"Setting ownership of {self.cheqd_config_dir} to {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER}") self.exec(f"chown -R {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER} {self.cheqd_config_dir}") # Return True if all the above steps were successful return True except Exception as e: logging.exception(f"Failed to configure cheqd-noded settings. Reason: {e}") return False def _select_working_rpc_endpoint(self, chain: str) -> str: try: endpoints = [] if chain == "testnet": endpoints = [TESTNET_RPC_ENDPOINT_EU, TESTNET_RPC_ENDPOINT_AP] else: endpoints = [MAINNET_RPC_ENDPOINT_EU, MAINNET_RPC_ENDPOINT_AP] for endpoint in endpoints: try: req = request.Request(f"{endpoint}/status") with request.urlopen(req, timeout=10) as resp: if resp.getcode() == 200: return endpoint except Exception: continue except Exception as e: logging.exception(f"Could not select a working RPC endpoint. Reason: {e}") return "" def _get_latest_block_height(self, rpc_endpoint: str) -> int: try: req = request.Request(f"{rpc_endpoint}/status") with request.urlopen(req, timeout=10) as resp: status = json.loads(resp.read().decode("utf-8").strip()) # Tendermint /status -> result.sync_info.latest_block_height latest_height = int(status["result"]["sync_info"]["latest_block_height"]) return latest_height except Exception as e: logging.exception(f"Failed to fetch latest block height from {rpc_endpoint}. Reason: {e}") raise def _get_block_hash_at_height(self, rpc_endpoint: str, height: int) -> str: try: req = request.Request(f"{rpc_endpoint}/block?height={height}") with request.urlopen(req, timeout=10) as resp: block = json.loads(resp.read().decode("utf-8").strip()) # Tendermint /block -> result.block_id.hash return block["result"]["block_id"]["hash"] except Exception as e: logging.exception(f"Failed to fetch block hash at height {height} from {rpc_endpoint}. Reason: {e}") raise def _is_endpoint_healthy(self, endpoint: str) -> bool: try: req = request.Request(f"{endpoint}/status") with request.urlopen(req, timeout=10) as resp: return resp.getcode() == 200 except Exception: return False def configure_statesync(self) -> bool: # Configure statesync settings in config.toml using selected network RPCs try: config_toml_path = os.path.join(self.cheqd_config_dir, "config.toml") # Determine RPC servers for the chosen network if self.interviewer.chain == "testnet": candidates = [TESTNET_RPC_ENDPOINT_EU, TESTNET_RPC_ENDPOINT_AP] else: candidates = [MAINNET_RPC_ENDPOINT_EU, MAINNET_RPC_ENDPOINT_AP] healthy = [ep for ep in candidates if self._is_endpoint_healthy(ep)] if len(healthy) == 0: logging.error("No working RPC endpoint found for statesync configuration") return False if len(healthy) == 1: rpc_servers = f"{healthy[0]},{healthy[0]}" working_rpc = healthy[0] else: rpc_servers = f"{healthy[0]},{healthy[1]}" working_rpc = healthy[0] latest_height = self._get_latest_block_height(working_rpc) trust_height = max(latest_height - 2000, 1) trust_hash = self._get_block_hash_at_height(working_rpc, trust_height) # Safely edit only the [statesync] section for 'enable' with open(config_toml_path, "r") as f: lines = f.readlines() start = -1 end = len(lines) for i, line in enumerate(lines): if line.strip() == "[statesync]": start = i break if start == -1: logging.error("[statesync] section not found in config.toml") return False # Find end of [statesync] block (next top-level table) for j in range(start + 1, len(lines)): stripped = lines[j].lstrip() if stripped.startswith('['): end = j break block = lines[start:end] def upsert(key: str, value: str, quote: bool = False): nonlocal block key_prefix = f"{key} =" new_line = f"{key} = \"{value}\"\n" if quote else f"{key} = {value}\n" for idx, l in enumerate(block): if l.strip().startswith(key_prefix): block[idx] = new_line return # insert after header block.insert(1, new_line) upsert("enable", "true", quote=False) # Write back new_lines = lines[:start] + block + lines[end:] with open(config_toml_path, "w") as f: f.writelines(new_lines) # Use existing search_and_replace helper for other statesync fields (unique keys) search_and_replace('rpc_servers = ""', f'rpc_servers = "{rpc_servers}"', config_toml_path) search_and_replace('trust_height = 0', f'trust_height = {trust_height}', config_toml_path) search_and_replace('trust_hash = ""', f'trust_hash = "{trust_hash}"', config_toml_path) logging.info("Configured state sync settings in config.toml") return True except Exception as e: logging.exception(f"Failed to configure state sync. Reason: {e}") return False def setup_node_systemd(self) -> bool: # Setup cheqd-noded related systemd services # If user selected Cosmovisor install, then cheqd-cosmovisor.service will be setup # If user selected Standalone install, then cheqd-noded.service will be setup # WARNING: Services should already have been stopped in pre_install() but if it's removed from there, # then it should be added here try: # Remove cheqd-noded.service and cheqd-cosmovisor.service if they exist # Also run if setup is from scratch/first-time install if self.interviewer.rewrite_node_systemd: logging.warning("Removing existing node-related systemd configuration as requested") self.remove_systemd_service(DEFAULT_COSMOVISOR_SERVICE_NAME, DEFAULT_COSMOVISOR_SERVICE_FILE_PATH) self.remove_systemd_service(DEFAULT_STANDALONE_SERVICE_NAME, DEFAULT_STANDALONE_SERVICE_FILE_PATH) else: logging.debug("Node-related systemd configurations don't need to be removed. Skipping...") # Setup cheqd-cosmovisor.service if requested if self.interviewer.is_cosmovisor_needed: # Write cheqd-cosmovisor.service file # Replace placeholder values with actuals with open(DEFAULT_COSMOVISOR_SERVICE_FILE_PATH, "w") as fname: fname.write(self.cosmovisor_service_cfg) # Enable cheqd-cosmovisor.service self.enable_systemd_service(DEFAULT_COSMOVISOR_SERVICE_NAME) # Write cosmovisor-config.toml file if self.interviewer.is_cosmovisor_config_needed: with open(f"{self.cheqd_root_dir}/cosmovisor/config.toml", "w") as fname: fname.write(self.cosmovisor_config_cfg) return True # Otherwise, setup cheqd-noded.service for standalone install else: # Fetch the template file from GitHub if is_valid_url(STANDALONE_SERVICE_TEMPLATE): with request.urlopen(STANDALONE_SERVICE_TEMPLATE) as response, open(DEFAULT_STANDALONE_SERVICE_FILE_PATH, "w") as file: file.write(response.read().decode("utf-8").strip()) # Enable cheqd-noded.service self.enable_systemd_service(DEFAULT_STANDALONE_SERVICE_NAME) return True else: logging.error(f"Invalid URL provided for standalone service template: {STANDALONE_SERVICE_TEMPLATE}") return False except Exception as e: logging.exception(f"Failed to setup systemd service for cheqd-node. Reason: {e}") return False # Setup logging related systemd services def setup_journal_logging(self) -> bool: # Install cheqd-node configuration for journal if user wants to rewrite journal service file # Also run if setup is from scratch/first-time install try: if self.interviewer.rewrite_journal: # Remove existing journal configuration file if it exists if os.path.exists(DEFAULT_JOURNAL_CONFIG_FILE): logging.warning("Removing existing journal configuration as requested") self.remove_safe(DEFAULT_JOURNAL_CONFIG_FILE) else: logging.debug("Default configuration doesn't need to be removed. Skipping...") logging.info("Configuring journald configuration for logging") # Modify journald template file with values specific to the installation with open(DEFAULT_JOURNAL_CONFIG_FILE, "w"): if is_valid_url(JOURNAL_TEMPLATE): with request.urlopen(JOURNAL_TEMPLATE) as response, open(DEFAULT_JOURNAL_CONFIG_FILE, "w") as file: file.write(response.read().decode("utf-8").strip()) # Restarting journald service if self.restart_systemd_service("systemd-journald.service"): logging.info("Successfully configured journald service") else: logging.exception("Failed to configure journald service") return False if os.path.exists(DEFAULT_RSYSLOG_FILE): logging.warning("Removing existing rsyslog configuration as requested") self.remove_safe(DEFAULT_RSYSLOG_FILE) else: logging.debug("Default configuration doesn't need to be removed. Skipping...") if os.path.exists(DEFAULT_LOGROTATE_FILE): logging.warning("Removing existing logrotate configuration as requested") self.remove_safe(DEFAULT_LOGROTATE_FILE) else: logging.debug("Default configuration doesn't need to be removed. Skipping...") # Return True if both rsyslog and logrotate services are configured return True except Exception as e: logging.exception(f"Failed to setup logging systemd services. Reason: {e}") return False def download_snapshot(self) -> bool: # Download snapshot archive if requested by the user # This is a blocking operation that will take a while try: # Only proceed if a valid snapshot URL has been set if self.set_snapshot_url(): logging.info(f"Valid snapshot URL found: {self.snapshot_url}") fname = os.path.basename(self.snapshot_url) file_path = os.path.join(self.cheqd_root_dir, fname) else: logging.error(f"No valid snapshot URL found in last {MAX_SNAPSHOT_DAYS} days!") return False # Install dependencies needed to show progress bar if self.install_dependencies(): logging.info("Dependencies required for snapshot restore installed successfully") else: logging.error("Failed to install dependencies required for snapshot restore") return False # Fetch size of snapshot archive WITHOUT downloading it req = request.Request(self.snapshot_url, method='HEAD') response = request.urlopen(req) content_length = response.getheader("Content-Length") if content_length is not None: archive_size = content_length logging.debug(f"Snapshot archive size: {content_length} bytes") else: logging.error("Could not determine snapshot archive size") return False # Fetch the x-amz-meta-s3cmd-attrs header from the snapshot URL # This is automatically generated by S3Cmd and contains the MD5 checksum of the file # Sample response header format: "x-amz-meta-s3cmd-attrs: atime:/ctime:/gid:/gname:/md5:/mode:/mtime:/uid:/uname:" req = request.Request(self.snapshot_url, method='HEAD') response = request.urlopen(req) response_header = response.getheader("x-amz-meta-s3cmd-attrs") # Define a regular expression to match the MD5 checksum in above header # This is the part after "md5:" pattern = r'md5:([a-fA-F0-9]+)' # Search for the MD5 value in the header value using the regular expression match = re.search(pattern, response_header) # Extract the MD5 checksum from the header if a response header exists # AND a match was found in the header against the regular expression if response_header is not None and match: published_checksum = match.group(1) logging.debug(f"Published checksum: {published_checksum}") else: logging.error("Could not find MD5 checksum for snapshot archive. Aborting snapshot download...") return False # Free up some disk space by deleting contents of the data folder # Otherwise, there may not be enough space to download AND extract the snapshot # WARNING: Backup the priv_validator_state.json and upgrade-info.json before doing this! # Also check if a match was found for the MD5 checksum to prevent deleting when file integrity cannot be verified if os.path.exists(self.cheqd_backup_dir) and match: # Check that backup of validator keys, state, and upgrade info exists before proceeding logging.info(f"Backup directory exists: {self.cheqd_backup_dir}.") # Remove contents of data directory logging.warning(f"Contents of {self.cheqd_data_dir} will be deleted to make room for snapshot") self.remove_safe(self.cheqd_data_dir) # Recreate data directory os.makedirs(self.cheqd_data_dir, exist_ok=True) self.exec(f"chown -R {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER} {self.cheqd_data_dir}") else: logging.warning("Backup directory does not exist. Will not delete data directory.\n") logging.warning("Free disk space will be calculated without freeing up space.\n") # Check how much free disk space is available wherever the cheqd home directory is located # First, determine where the home directory is mounted fs_stats = os.statvfs(self.cheqd_home_dir) # Calculate the free space in bytes free_space = fs_stats.f_frsize * fs_stats.f_bavail # ONLY download the snapshot if there is enough free disk space # Also check if a match was found for the MD5 checksum to prevent downloading a corrupted file if (int(archive_size) < int(free_space)) and match: logging.info("Downloading snapshot and extracting archive. This can take a *really* long time...") # Use wget to download since it can show a progress bar while downloading natively # This is a blocking operation that will take a while # "wget -c" will resume a download if it gets interrupted self.exec(f"wget -c {self.snapshot_url} -P {self.cheqd_root_dir}") # Compare published checksum with downloaded file # If checksums match, return success if os.path.exists(file_path): # Calculate checksum of downloaded snapshot # This is a blocking operation that will take a while # Python's hashlib.md5() is not used because making it work with large files is a pain local_checksum = subprocess.check_output(["md5sum", file_path]).split()[0].decode() # Print local checksum for debugging logging.debug(f"Local checksum: {local_checksum}") else: logging.error(f"Snapshot download failed. File not found: {file_path}") return False # Compare checksums if published_checksum == local_checksum: logging.debug("Checksums match. Download is OK.") return True else: logging.error("Snapshot download was successful BUT checksums do not match.") logging.warning(f"Removing corrupted snapshot archive: {file_path}") # self.remove_safe(file_path) return False else: logging.error("Snapshot is larger than free disk space. Please free up disk space and try again.") return False except Exception as e: logging.exception(f"Failed to download snapshot. Reason: {e}") return False def set_snapshot_url(self) -> bool: # Get latest available snapshot URL from snapshots.cheqd.net for the given chain # This checks whether there are any snapshots in past MAX_SNAPSHOT_DAYS (default: 7 days) try: template = TESTNET_SNAPSHOT if self.interviewer.chain in TESTNET_CHAIN_ID else MAINNET_SNAPSHOT snapshot_date = datetime.date.today() counter = 0 valid_url_found = False # Iterate over past MAX_SNAPSHOT_DAYS days to find the latest snapshot while not valid_url_found and counter <= MAX_SNAPSHOT_DAYS: _url = template.format(snapshot_date.strftime( "%Y-%m-%d"), snapshot_date.strftime("%Y-%m-%d")) valid_url_found = is_valid_url(_url) counter += 1 snapshot_date -= datetime.timedelta(days=1) # Set snapshot URL if found if valid_url_found: self.snapshot_url = _url logging.debug(f"Snapshot URL: {self.snapshot_url}") return True else: logging.debug("Could not find a valid snapshot in last %d days", MAX_SNAPSHOT_DAYS) return False except Exception as e: logging.exception(f"Failed to get snapshot URL. Reason: {e}") return False def install_dependencies(self) -> bool: # Install dependencies required for snapshot extraction try: # Update apt lists before installing dependencies logging.info("Updating apt lists") self.exec("sudo apt-get update") # Use apt-get to install dependencies logging.info("Install pv to show progress of extraction") self.exec("sudo apt-get install -y pv lz4") return True except Exception as e: logging.exception(f"Failed to install dependencies. Reason: {e}") return False def extract_snapshot(self): # Extract snapshot archive to cheqd node data directory # This is a blocking operation that will take a while # Once extracted, restore files from backup folder try: # Set file path of snapshot archive file_path = os.path.join(self.cheqd_root_dir, os.path.basename(self.snapshot_url)) # Extract to cheqd node data directory EXCEPT for validator state # Snapshot archives are created using lz4 compression since it's more efficient than gzip if os.path.exists(file_path): logging.info("Extracting snapshot archive. This may take a while...") # Bash command is used since the Python libraries for lz4 are not installed out-of-the-box # Showing a progress bar or an estimate of time remaining is also not easy-to-achieve # "pv" is used to show a progress bar while extracting self.exec(f"sudo -u {DEFAULT_CHEQD_USER} bash -c 'pv {file_path} \ | tar --use-compress-program=lz4 -xf - -C {self.cheqd_root_dir} \ --exclude priv_validator_state.json'") # Delete snapshot archive file logging.info("Snapshot extraction was successful. Deleting snapshot archive.") self.remove_safe(file_path) else: logging.error("Snapshot archive file not found. Could not extract snapshot.") return False # Restore files from backup folder # Use shutil.copy() instead of shutil.copyfile() to preserve file metadata if os.path.exists(self.cheqd_backup_dir): logging.info("Backup directory found. Restoring files from backup...") # Restore priv_validator_state.json if os.path.exists(os.path.join(self.cheqd_backup_dir, "priv_validator_state.json")): shutil.copy(os.path.join(self.cheqd_backup_dir, "priv_validator_state.json"), os.path.join(self.cheqd_data_dir, "priv_validator_state.json")) logging.info(f"Restored priv_validator_state.json to {self.cheqd_data_dir}") else: logging.warning(f"priv_validator_state.json not found in {self.cheqd_backup_dir}! Please restore it manually to {self.cheqd_data_dir}.") # Restore upgrade-info.json if os.path.exists(os.path.join(self.cheqd_backup_dir, "upgrade-info.json")): shutil.copy(os.path.join(self.cheqd_backup_dir, "upgrade-info.json"), os.path.join(self.cheqd_data_dir, "upgrade-info.json")) logging.info(f"Restored upgrade-info.json to {self.cheqd_data_dir}") else: logging.warning(f"upgrade-info.json not found in {self.cheqd_backup_dir}! Please restore it manually to {self.cheqd_data_dir}.") # If Cosmovisor is needed, copy upgrade-info.json to ~/.cheqdnode/cosmovisor/current/ directory # Otherwise, Cosmovisor will throw an error if self.interviewer.is_cosmovisor_needed and os.path.exists(os.path.join(self.cheqd_data_dir, "upgrade-info.json")): shutil.copy(os.path.join(self.cheqd_data_dir, "upgrade-info.json"), os.path.join(self.cosmovisor_root_dir, "current/upgrade-info.json")) logging.info(f"Restored upgrade-info.json to {self.cosmovisor_root_dir}/current/") # Change ownership of Cosmovisor directory to cheqd user self.exec(f"chown -R {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER} {self.cosmovisor_root_dir}") else: logging.warning(f"upgrade-info.json not found in {self.cheqd_data_dir}! Please restore it manually to {self.cosmovisor_root_dir}/current/") else: logging.warning(f"Backup folder not found. Please manually restore required files to {self.cheqd_data_dir} and {self.cheqd_config_dir}") # Change ownership of cheqd node data directory to cheqd user self.exec(f"chown -R {DEFAULT_CHEQD_USER}:{DEFAULT_CHEQD_USER} {self.cheqd_data_dir}") # Return True if snapshot extraction was successful return True except Exception as e: logging.exception(f"Failed to extract snapshot. Reason: {e}") return False def check_systemd_service_active(self, service_name) -> bool: # Check if a given systemd service is active try: logging.debug(f"Checking whether {service_name} service is active") # Active services will return 0 active = os.system(f"systemctl is-active --quiet {service_name}") if active == 0: logging.debug(f"Service {service_name} is active") return True else: logging.debug(f"Service {service_name} is not active") return False except Exception as e: logging.exception(f"Failed to check whether {service_name} service is active. Reason: {e}") return False def check_systemd_service_enabled(self, service_name) -> bool: # Check if a given systemd service is enabled try: logging.debug(f"Checking whether {service_name} service is enabled") # Enabled services will return 0 enabled = os.system(f"systemctl is-enabled --quiet {service_name}") if enabled == 0: logging.debug(f"Service {service_name} is enabled") return True else: logging.debug(f"Service {service_name} is not enabled") return False except Exception as e: logging.exception(f"Failed to check whether {service_name} service is enabled. Reason: {e}") return False def reload_systemd(self) -> bool: # Reload systemd config try: logging.debug("Reload systemd config and reset failed services") # Reload systemd config reload = os.system('systemctl daemon-reload --quiet') # Reset failed services reset = os.system('systemctl reset-failed --quiet') if reload == 0 and reset == 0: logging.info("Reloaded systemd config and reset failed services") return True else: logging.error("Failed to reload systemd config and reset failed services") return False except Exception as e: logging.exception(f"Error daemon reloading: Reason: {e}") return False def disable_systemd_service(self, service_name) -> bool: # Disable a given systemd service try: if self.check_systemd_service_enabled(service_name): disabled = os.system(f"systemctl disable --quiet {service_name}") if disabled == 0: logging.info(f"{service_name} has been disabled") return True else: logging.error(f"{service_name} could not be disabled") return False else: logging.debug(f"{service_name} is already disabled") return True except Exception as e: logging.exception(f"Error disabling {service_name}: Reason: {e}") return False def enable_systemd_service(self, service_name) -> bool: # Enable a given systemd service try: if self.reload_systemd(): if not self.check_systemd_service_enabled(service_name): enabled = os.system(f"systemctl enable --quiet {service_name}") if enabled == 0: logging.info(f"{service_name} has been enabled") return True else: logging.error(f"{service_name} could not be enabled") return False else: logging.debug(f"{service_name} is already enabled") return True else: logging.error("Failed to reload systemd config and reset failed services") return False except Exception as e: logging.exception(f"Error disabling {service_name}: Reason: {e}") return False def stop_systemd_service(self, service_name) -> bool: # Stop and disable a given systemd service try: if self.check_systemd_service_active(service_name): stopped = os.system(f"systemctl stop --quiet {service_name}") if stopped == 0: logging.info(f"{service_name} has been stopped") return True else: logging.error(f"{service_name} could not be stopped") return False else: logging.debug(f"{service_name} is not active") return True except Exception as e: logging.exception(f"Error stopping {service_name}: Reason: {e}") return False def restart_systemd_service(self, service_name) -> bool: # Restart a given systemd service try: # If the service is not enabled, enable it before restarting if not self.check_systemd_service_enabled(service_name): self.enable_systemd_service(service_name) # Reload systemd services before restarting if self.reload_systemd(): restarted = os.system(f"systemctl restart --quiet {service_name}") if restarted == 0: logging.info(f"{service_name} has been restarted") return True else: logging.error(f"{service_name} could not be restarted") return False else: logging.error("Failed to reload systemd config and reset failed services") return False except Exception as e: logging.exception(f"Error restarting {service_name}: Reason: {e}") return False def remove_systemd_service(self, service_name, service_file) -> bool: # Remove a given systemd service try: if os.path.exists(service_file): logging.debug(f"Service file {service_file} exists") # Stop the service if it is active if self.stop_systemd_service(service_name): # Disable the service if self.disable_systemd_service(service_name): # Remove the service file self.remove_safe(service_file) logging.warning(f"{service_name} has been removed") return True else: logging.error(f"{service_name} could not be removed") return False else: logging.debug(f"Service file {service_file} does not exist. Skipping removal...") return True except Exception as e: logging.exception(f"Error removing {service_name}: Reason: {e}") return False ############################################################### ### Interviewer class: Ask user for settings ### ############################################################### class Interviewer: def __init__(self, home_dir=DEFAULT_CHEQD_HOME_DIR): self._home_dir = home_dir self._is_upgrade = False self._is_cosmovisor_needed = True self._is_cosmovisor_config_needed = True self._is_cosmovisor_bump_needed = True self._is_cosmovisor_installed = False self._systemd_service_file = "" self._init_from_snapshot = False self._use_statesync = True self._release = None self._chain = "" self._is_configuration_needed = False self._moniker = CHEQD_NODED_MONIKER self._external_address = "" self._rpc_port = "" self._p2p_port = "" self._gas_price = "" self._persistent_peers = "" self._log_level = "" self._log_format = "" self._daemon_allow_download_binaries = DEFAULT_DAEMON_ALLOW_DOWNLOAD_BINARIES self._daemon_restart_after_upgrade = DEFAULT_DAEMON_RESTART_AFTER_UPGRADE self._is_from_scratch = True self._rewrite_node_systemd = True self._rewrite_journal = True ### This section sets @property variables ### @property def cheqd_root_dir(self): return os.path.join(self.home_dir, ".cheqdnode") @property def cheqd_config_dir(self): return os.path.join(self.cheqd_root_dir, "config") @property def cheqd_data_dir(self): return os.path.join(self.cheqd_root_dir, "data") @property def release(self) -> Release: return self._release @property def home_dir(self) -> str: return self._home_dir @property def is_upgrade(self) -> bool: return self._is_upgrade @property def is_from_scratch(self) -> bool: return self._is_from_scratch @property def systemd_service_file(self) -> str: return self._systemd_service_file @property def rewrite_node_systemd(self) -> bool: return self._rewrite_node_systemd @property def rewrite_journal(self) -> bool: return self._rewrite_journal @property def is_cosmovisor_needed(self) -> bool: return self._is_cosmovisor_needed @property def is_cosmovisor_config_needed(self) -> bool: return self._is_cosmovisor_config_needed @property def is_cosmovisor_bump_needed(self) -> bool: return self._is_cosmovisor_bump_needed @property def is_cosmovisor_installed(self) -> bool: return self._is_cosmovisor_installed @property def init_from_snapshot(self) -> bool: return self._init_from_snapshot @property def use_statesync(self) -> bool: return self._use_statesync @property def chain(self) -> str: return self._chain @property def is_configuration_needed(self) -> bool: return self._is_configuration_needed @property def moniker(self) -> str: return self._moniker @property def external_address(self) -> str: return self._external_address @property def rpc_port(self) -> str: return self._rpc_port @property def p2p_port(self) -> str: return self._p2p_port @property def gas_price(self) -> str: return self._gas_price @property def persistent_peers(self) -> str: return self._persistent_peers @property def log_level(self) -> str: return self._log_level @property def log_format(self) -> str: return self._log_format @property def daemon_allow_download_binaries(self) -> str: return self._daemon_allow_download_binaries @property def daemon_restart_after_upgrade(self) -> str: return self._daemon_restart_after_upgrade ### This section sets @property variables ### @release.setter def release(self, release): self._release = release @home_dir.setter def home_dir(self, hd): self._home_dir = hd @is_upgrade.setter def is_upgrade(self, iu): self._is_upgrade = iu @is_from_scratch.setter def is_from_scratch(self, ifs): self._is_from_scratch = ifs @systemd_service_file.setter def systemd_service_file(self, ssf): self._systemd_service_file = ssf @rewrite_node_systemd.setter def rewrite_node_systemd(self, rns): self._rewrite_node_systemd = rns @rewrite_journal.setter def rewrite_journal(self, rr): self._rewrite_journal = rr @is_cosmovisor_needed.setter def is_cosmovisor_needed(self, icn): self._is_cosmovisor_needed = icn @is_cosmovisor_config_needed.setter def is_cosmovisor_config_needed(self, icn): self._is_cosmovisor_config_needed = icn @is_cosmovisor_bump_needed.setter def is_cosmovisor_bump_needed(self, icbn): self._is_cosmovisor_bump_needed = icbn @is_cosmovisor_installed.setter def is_cosmovisor_installed(self, ici): self._is_cosmovisor_installed = ici @init_from_snapshot.setter def init_from_snapshot(self, ifs): self._init_from_snapshot = ifs @chain.setter def chain(self, chain): self._chain = chain @use_statesync.setter def use_statesync(self, value: bool): self._use_statesync = value @is_configuration_needed.setter def is_configuration_needed(self, is_configuration_needed): self._is_configuration_needed = is_configuration_needed @moniker.setter def moniker(self, moniker): self._moniker = moniker @external_address.setter def external_address(self, external_address): self._external_address = external_address @rpc_port.setter def rpc_port(self, rpc_port): self._rpc_port = rpc_port @p2p_port.setter def p2p_port(self, p2p_port): self._p2p_port = p2p_port @gas_price.setter def gas_price(self, gas_price): self._gas_price = gas_price @persistent_peers.setter def persistent_peers(self, persistent_peers): self._persistent_peers = persistent_peers @log_level.setter def log_level(self, log_level): self._log_level = log_level @log_format.setter def log_format(self, log_format): self._log_format = log_format @daemon_allow_download_binaries.setter def daemon_allow_download_binaries(self, daemon_allow_download_binaries): self._daemon_allow_download_binaries = daemon_allow_download_binaries @daemon_restart_after_upgrade.setter def daemon_restart_after_upgrade(self, daemon_restart_after_upgrade): self._daemon_restart_after_upgrade = daemon_restart_after_upgrade ### This section contains helper functions for the interviewer ### # Set value to default answer for a question @default_answer def ask(self, question, **_): return str(input(question)).strip() @post_process def exec(self, cmd, use_stdout=True, suppress_err=False, check=True): logging.info(f"Executing command: {cmd}") kwargs = { "shell": True, "check": check, } if use_stdout: kwargs["stdout"] = subprocess.PIPE else: kwargs["capture_output"] = True if suppress_err: kwargs["stderr"] = subprocess.DEVNULL return subprocess.run(cmd, **kwargs) # Check if cheqd-noded is installed def is_node_installed(self) -> bool: try: return shutil.which("cheqd-noded") is not None except Exception as e: logging.exception(f"Could not check if cheqd-noded is already installed. Reason: {e}") # Check if Cosmovisor is installed def check_cosmovisor_installed(self) -> bool: try: if shutil.which("cosmovisor") is not None: self.is_cosmovisor_installed = True return True else: self.is_cosmovisor_installed = False return False except Exception as e: logging.exception(f"Could not check if Cosmovisor is already installed. Reason: {e}") # Check whether external address provided is valid IP address def check_ip_address(self, ip_address) -> bool: try: socket.inet_aton(ip_address) logging.debug(f"IP address {ip_address} is valid") return True except socket.error: logging.debug(f"IP address {ip_address} is invalid") return False # Check whether external address provided is valid DNS name def check_dns_name(self, dns_name) -> bool: try: socket.gethostbyname(dns_name) logging.debug(f"DNS name {dns_name} is valid") return True except socket.error: logging.debug(f"DNS name {dns_name} is invalid") return False # Check if a systemd config is installed for a given service file def is_systemd_config_installed(self, systemd_service_file) -> bool: try: return os.path.exists(systemd_service_file) except Exception as e: logging.exception(f"Could not check if {systemd_service_file} already exists. Reason: {e}") # Get list of last N releases for cheqd-node from GitHub def get_releases(self): try: req = request.Request( "https://api.github.com/repos/cheqd/cheqd-node/releases") req.add_header("Accept", "application/vnd.github.v3+json") with request.urlopen(req) as response: r_list = json.loads(response.read().decode("utf-8").strip()) return [Release(r) for r in r_list] except Exception as e: logging.exception(f"Could not get releases from GitHub. Reason: {e}") # The "latest" stable release may not be in last N releases, so we need to get it separately def get_latest_release(self): try: req = request.Request( "https://api.github.com/repos/cheqd/cheqd-node/releases/latest") req.add_header("Accept", "application/vnd.github.v3+json") with request.urlopen(req) as response: return Release(json.loads(response.read().decode("utf-8"))) except Exception as e: logging.exception(f"Could not get latest release from GitHub. Reason: {e}") # Compile a list of releases to be displayed to the user # The "latest" stable release is always displayed first def remove_release_from_list(self, r_list, elem): try: copy_r_list = copy.deepcopy(r_list) for i, release in enumerate(r_list): if release.version == elem.version: copy_r_list.pop(i) return copy_r_list except Exception as e: logging.exception(f"Could not assemble list of releases to show to the user. Reason: {e}") # Ask user to select a version of cheqd-node to install def ask_for_version(self): try: default = self.get_latest_release() all_releases = self.get_releases() all_releases = self.remove_release_from_list(all_releases, default) all_releases.insert(0, default) print(f"Latest stable cheqd-noded release version is {default}") print("List of cheqd-noded releases: ") # Print list of releases for i, release in enumerate(all_releases[0: LAST_N_RELEASES]): print(f"{i + 1}. {release.version}") release_num = int(self.ask( "Choose list option number above to select version of cheqd-node to install", default=1)) # Check that user selected a valid release number if 1 <= release_num <= LAST_N_RELEASES and isinstance(release_num, int): self.release = all_releases[release_num - 1] logging.debug(f"Release version selection: {self.release.version}") else: logging.error(f"Invalid release number picked from list of releases: {release_num}") logging.error(f"Please choose a number between 1 and {LAST_N_RELEASES}\n") self.ask_for_version() except Exception as e: logging.exception(f"Failed to selected version of cheqd-noded. Reason: {e}") # Set cheqd user's home directory def ask_for_home_directory(self) -> str: try: self.home_dir = self.ask( "Set path for cheqd user's home directory", default=DEFAULT_CHEQD_HOME_DIR) logging.debug(f"Setting home directory to {self.home_dir}") except Exception as e: logging.exception(f"Failed to set cheqd user's home directory. Reason: {e}") # Ask whether user wants configure node configuration parameters from scratch def ask_for_config(self): try: answer = self.ask( "Do you want to define node configuration parameters? (yes/no)", default="yes") if answer.lower().startswith("y"): self.is_configuration_needed = True elif answer.lower().startswith("n"): self.is_configuration_needed = False else: logging.error("Please choose either 'yes' or 'no'\n") self.ask_for_config() except Exception as e: logging.exception(f"Failed to define node configuration parameters. Reason: {e}") # Ask user which network to join def ask_for_chain(self): try: answer = int(self.ask( "Select cheqd network to join:\n" f"1. Mainnet ({MAINNET_CHAIN_ID})\n" f"2. Testnet ({TESTNET_CHAIN_ID})", default=1)) if answer == 1: self.chain = "mainnet" elif answer == 2: self.chain = "testnet" else: logging.error("Invalid network selected during installation. Please choose either 1 or 2.\n") self.ask_for_chain() # Set debug message logging.debug(f"Setting network to join as {self.chain}") except Exception as e: logging.exception(f"Failed to set network/chain to join. Reason: {e}") # Ask user whether to install with Cosmovisor def ask_for_cosmovisor(self): try: logging.info("Installing cheqd-node with Cosmovisor allows for automatic unattended upgrades for valid software upgrade proposals. See https://docs.cosmos.network/main/tooling/cosmovisor for more information.\n") answer = self.ask( "Install cheqd-noded using Cosmovisor? (yes/no)", default=DEFAULT_USE_COSMOVISOR) if answer.lower().startswith("y"): self.is_cosmovisor_needed = True elif answer.lower().startswith("n"): self.is_cosmovisor_needed = False self.is_cosmovisor_bump_needed = False else: logging.error("Invalid input provided during installation. Please choose either 'yes' or 'no'.\n") self.ask_for_cosmovisor() except Exception as e: logging.exception(f"Failed to set whether installation should be done with Cosmovisor. Reason: {e}") # Ask whether to initialize via state sync (default yes). If declined, snapshot remains available. def ask_for_statesync(self): try: logging.info("State sync rapidly bootstraps a node without downloading state DB snapshot and uses less storage. You can still choose snapshot (slower, much larger storage, but contains more historic data and blocks) if you decline state sync.\n") answer = self.ask( "Initialize chain via State Sync? (yes/no)", default="yes") if answer.lower().startswith("y"): self.use_statesync = True self.init_from_snapshot = False elif answer.lower().startswith("n"): self.use_statesync = False else: logging.error("Invalid input provided. Please choose either 'yes' or 'no'.\n") self.ask_for_statesync() except Exception as e: logging.exception(f"Failed to set state sync preference. Reason: {e}") # Ask user whether to bump Cosmovisor to latest version def ask_for_cosmovisor_bump(self): try: answer = self.ask( f"Do you want to install Cosmovisor version {DEFAULT_LATEST_COSMOVISOR_VERSION}? (yes/no)", default=DEFAULT_BUMP_COSMOVISOR) if answer.lower().startswith("y"): self.is_cosmovisor_bump_needed = True elif answer.lower().startswith("n"): self.is_cosmovisor_bump_needed = False else: logging.error("Invalid input provided during installation. Please choose either 'yes' or 'no'.\n") self.ask_for_cosmovisor_bump() except Exception as e: logging.exception(f"Failed to set whether Cosmovisor should be bumped to latest version. Reason: {e}") # Ask user whether to allow Cosmovisor to automatically download binaries for scheduled upgrades def ask_for_daemon_allow_download_binaries(self): try: answer = self.ask( "Do you want Cosmovisor to automatically download binaries for scheduled upgrades? (yes/no)", default="yes") if answer.lower().startswith("y"): self.daemon_allow_download_binaries = "true" elif answer.lower().startswith("n"): self.daemon_allow_download_binaries = "false" else: logging.error("Invalid input provided during installation. Please choose either 'yes' or 'no'.\n") self.ask_for_daemon_allow_download_binaries() except Exception as e: logging.exception( f"Failed to set whether Cosmovisor should automatically download binaries. Reason: {e}") # Ask whether Cosmovisor should restart daemon after upgrade def ask_for_daemon_restart_after_upgrade(self): try: answer = self.ask( "Do you want Cosmovisor to automatically restart after an upgrade? (yes/no)", default="yes") if answer.lower().startswith("y"): self.daemon_restart_after_upgrade = "true" elif answer.lower().startswith("n"): self.daemon_restart_after_upgrade = "false" else: logging.error("Invalid input provided during installation. Please choose either 'yes' or 'no'.\n") self.ask_for_daemon_restart_after_upgrade() except Exception as e: logging.exception(f"Failed to set whether Cosmovisor should automatically restart after an upgrade. Reason: {e}") ## Add parameters for cosmovisor here def ask_for_cosmovisor_config_rewrite(self): try: answer = self.ask("Do you want to overwrite existing configuration (or create a new one) for cosmovisor, with the values you provided? (yes/no)", default="yes") if answer.lower().startswith("y"): self.is_cosmovisor_config_needed = True elif answer.lower().startswith("n"): self.is_cosmovisor_config_needed = False else: logging.error("Please choose either 'yes' or 'no'\n") self.ask_for_cosmovisor_config_rewrite() except Exception as e: logging.exception(f"Failed to set whether overwrite existing cosmovisor configuration. Reason: {e}") # Ask user for node moniker def ask_for_moniker(self): try: logging.info("Moniker is a human-readable name for your cheqd-node.\nThis is NOT the same as your validator name, and is only used to uniquely identify your node for Tendermint P2P address book.\nIt can be edited later in your ~/.cheqdnode/config/config.toml file.\n") self.moniker = self.ask( "Provide a moniker for your cheqd-node", default=CHEQD_NODED_MONIKER) if self.moniker is not None and isinstance(self.moniker, str): logging.debug(f"Moniker set to {self.moniker}") else: logging.error("Invalid moniker provided during cheqd-noded setup.\n") self.ask_for_moniker() except Exception as e: logging.exception(f"Failed to set moniker. Reason: {e}") # Ask for node's external IP address or DNS name def ask_for_external_address(self): try: logging.info("External address is the publicly accessible IP address or DNS name of your cheqd-node.\nThis is used to advertise your node's P2P address to other nodes in the network.\n- If you are running your node behind a NAT, you should set this to your public IP address or DNS name\n- If you are running your node on a public IP address, you can leave this blank to automatically fetch your IP address via DNS resolver lookup.\n- Automatic fetching sends a `dig` request to whoami.cloudflare.com\n") answer = self.ask( f"What is the externally-reachable IP address or DNS name for your cheqd-node? [default: Fetch automatically via DNS resolver lookup]: {os.linesep}") # If user provided an answer, check if it's a valid IP address or DNS name if answer: if self.check_ip_address(answer) or self.check_dns_name(answer): self.external_address = answer else: logging.error("Invalid IP address or DNS name provided. Please enter a valid IP address or DNS name.\n") self.ask_for_external_address() # If user didn't provide an answer, fetch IP address via DNS resolver lookup else: self.external_address = str(self.exec( "dig +short txt ch whoami.cloudflare @1.1.1.1").stdout).strip("""b'"\\n""") logging.debug(f"External address set to {self.external_address}") except Exception as e: logging.exception(f"Failed to set external address. Reason: {e}") # Ask for node's P2P port def ask_for_p2p_port(self): try: self.p2p_port = int(self.ask("Specify your node's P2P port", default=DEFAULT_P2P_PORT)) if isinstance(self.p2p_port, int): logging.debug(f"P2P port set to {self.p2p_port}") else: logging.error("Invalid P2P port provided. Please enter a valid port number.\n") self.ask_for_p2p_port() except Exception as e: logging.exception(f"Failed to set P2P port. Reason: {e}") # Ask for node's RPC port def ask_for_rpc_port(self): try: self.rpc_port = int(self.ask("Specify your node's RPC port", default=DEFAULT_RPC_PORT)) if isinstance(self.rpc_port, int): logging.debug(f"RPC port set to {self.rpc_port}") else: logging.error("Invalid RPC port provided. Please enter a valid port number.\n") self.ask_for_rpc_port() except Exception as e: logging.exception(f"Failed to set RPC port. Reason: {e}") # (Optional) Ask for node's persistent peers def ask_for_persistent_peers(self): try: logging.info("Persistent peers are nodes that you want to always keep connected to. Values for persistent peers should be specified in format: @:,@:...\n") answer = self.ask( f"Specify persistent peers [default: none]: {os.linesep}") if answer is not None: self.persistent_peers = answer logging.debug(f"Persistent peers set to {self.persistent_peers}") else: self.persistent_peers = "" logging.debug("No persistent peers set.") except Exception as e: logging.exception(f"Failed to set persistent peers. Reason: {e}") # (Optional) Ask for minimum gas prices def ask_for_gas_price(self): try: logging.info( "Minimum gas prices is the price you are willing to accept as a validator to process a transaction.\nValues should be entered in format ncheq (e.g., 5000ncheq)\n") self.gas_price = self.ask("Specify minimum gas price", default=CHEQD_NODED_MINIMUM_GAS_PRICES) if self.gas_price.endswith("ncheq"): logging.debug(f"Minimum gas price set to {self.gas_price}") else: logging.error("Invalid minimum gas price provided. Valid format is ncheq.\n") self.ask_for_gas_price() except Exception as e: logging.exception(f"Failed to set minimum gas prices. Reason: {e}") # (Optional) Ask for node's log level def ask_for_log_level(self): try: self.log_level = self.ask( "Specify log level (trace|debug|info|warn|error|fatal|panic)", default=CHEQD_NODED_LOG_LEVEL) if self.log_level in ["trace", "debug", "info", "warn", "error", "fatal", "panic"]: logging.debug(f"Log level set to {self.log_level}") else: logging.error("Invalid log level provided. Please enter a valid log level.\n") self.ask_for_log_level() except Exception as e: logging.exception(f"Failed to set log level. Reason: {e}") # (Optional) Ask for node's log format def ask_for_log_format(self): try: self.log_format = self.ask("Specify log format (json|plain)", default=CHEQD_NODED_LOG_FORMAT) if self.log_format in ["json", "plain"]: logging.debug(f"Log format set to {self.log_format}") else: logging.error("Invalid log format provided. Please enter a valid log format.\n") self.ask_for_log_format() except Exception as e: logging.exception(f"Failed to set log format. Reason: {e}") # If an existing installation is detected, ask user if they want to upgrade def ask_for_upgrade(self): try: logging.warning("Existing cheqd-node binary detected.\n") logging.info("Choosing UPGRADE will preserve your existing configuration and data.\nChoosing FRESH INSTALL will remove ALL existing configuration and data.\n") logging.info("Please ensure you have a backup of your existing configuration and data before proceeding!\n") answer = int(self.ask( "Do you want to UPGRADE an existing installation or do a FRESH INSTALL?\n" "1. UPGRADE existing installation\n" "2. Overwrite existing configuration with a FRESH INSTALL", default=1)) if answer == 1: self.is_upgrade = True self.is_from_scratch = False elif answer == 2: self.is_upgrade = False self.is_from_scratch = False else: logging.error("Invalid option selected. Please choose either 1 or 2.\n") self.ask_for_upgrade() except Exception as e: logging.exception(f"Failed to set whether installation should be upgraded. Reason: {e}") # If an install from scratch is requested, warn the user and check if they want to proceed def ask_for_confirm_upgrade(self): try: answer = self.ask( "Are you SURE you want to overwrite existing with a FRESH INSTALL? (yes/no)", default="no") if answer.lower().startswith("y"): self.is_from_scratch = True elif answer.lower().startswith("n"): self.is_from_scratch = False else: logging.error("Please choose either 'yes' or 'no'\n") self.ask_for_confirm_upgrade() except Exception as e: logging.exception(f"Failed to set whether to install from scratch. Reason: {e}") # If an existing installation is detected, ask user if they want to overwrite existing systemd configuration def ask_for_rewrite_node_systemd(self): try: answer = self.ask( "Overwrite existing systemd configuration for node-related services? (yes/no)", default="yes") if answer.lower().startswith("y"): self.rewrite_node_systemd = True elif answer.lower().startswith("n"): self.rewrite_node_systemd = False else: logging.error("Please choose either 'yes' or 'no'\n") self.ask_for_rewrite_node_systemd() except Exception as e: logging.exception(f"Failed to set whether overwrite existing systemd configuration. Reason: {e}") # If an existing installation is detected, ask user if they want to overwrite existing rsyslog configuration def ask_for_rewrite_journal(self): try: answer = self.ask("Overwrite existing configuration for cheqd-node logging, including the journald configuration file? (yes/no)", default="yes") if answer.lower().startswith("y"): self.rewrite_journal = True elif answer.lower().startswith("n"): self.rewrite_journal = False else: logging.error("Please choose either 'yes' or 'no'\n") self.ask_for_rewrite_journal() except Exception as e: logging.exception(f"Failed to set whether overwrite existing jorunal configuration. Reason: {e}") # Ask user if they want to download a snapshot of the existing chain to speed up node synchronization. # This is only applicable if installing from scratch. # This question is asked last because it is the most time consuming. def ask_for_init_from_snapshot(self): try: logging.info("You have the option of downloading a snapshot to get a copy of the blockchain data to speed up node bootstrapping.\n") logging.warning("Snapshots can be 100 GBs so downloading can take a really long time!\n- Existing chain data folder will be removed to make space for the downloaded file! Please ensure you take any backups.\n- The download will happen in the background once all configuration settings have set in this installer.\n- Usually safe to use this option when doing a fresh installation.\n") answer = self.ask( "Do you want to download a snapshot of the existing chain to speed up node synchronization? (yes/no)", default="yes") if answer.lower().startswith("y"): self.init_from_snapshot = True elif answer.lower().startswith("n"): self.init_from_snapshot = False else: logging.error("Please choose either 'yes' or 'no'\n") self.ask_for_init_from_snapshot() except Exception as e: logging.exception(f"Failed to set whether init snapshot. Reason: {e}") # Ask user if they want to check their genesis.json during upgrade def ask_for_genesis_check(self): try: logging.info("During upgrades, you can optionally verify that you have the correct genesis.json file for your network.\n") answer = self.ask( "Do you want to check and update your genesis.json file? (yes/no)", default="yes") if answer.lower().startswith("y"): # Ask which network they're running on (similar to fresh install) network_answer = int(self.ask( "Which network are you running on?\n" f"1. Mainnet ({MAINNET_CHAIN_ID})\n" f"2. Testnet ({TESTNET_CHAIN_ID})", default=1)) if network_answer == 1: self.chain = "mainnet" elif network_answer == 2: self.chain = "testnet" else: logging.error("Invalid network selected. Please choose either 1 or 2.\n") self.ask_for_genesis_check() logging.debug(f"Network set to {self.chain} for genesis.json check") elif answer.lower().startswith("n"): # Keep chain empty to skip genesis.json download self.chain = "" logging.debug("Skipping genesis.json check") else: logging.error("Please choose either 'yes' or 'no'\n") self.ask_for_genesis_check() except Exception as e: logging.exception(f"Failed to set genesis.json check preference. Reason: {e}") if __name__ == '__main__': # Order of questions to ask the user if installing: # 1. Version of cheqd-noded to install # 2. Home directory for cheqd user # 3. Install new version of cheqd-noded # 4. Chain ID to join # 5. Install Cosmovisor if not installed, or bump Cosmovisor version # 6. (if applicable) Cosmovisor settings # 7. Node configuration settings # 8. Download snapshot to bootsrap node def install_steps(): try: interviewer.ask_for_version() interviewer.ask_for_home_directory() interviewer.ask_for_chain() if interviewer.is_cosmovisor_installed is False: interviewer.ask_for_cosmovisor() else: interviewer.ask_for_cosmovisor_bump() if interviewer.is_cosmovisor_needed is True: interviewer.ask_for_daemon_allow_download_binaries() interviewer.ask_for_daemon_restart_after_upgrade() interviewer.ask_for_cosmovisor_config_rewrite() interviewer.ask_for_config() if interviewer.is_configuration_needed is True: interviewer.ask_for_moniker() interviewer.ask_for_external_address() interviewer.ask_for_p2p_port() interviewer.ask_for_rpc_port() interviewer.ask_for_persistent_peers() interviewer.ask_for_gas_price() interviewer.ask_for_log_level() interviewer.ask_for_log_format() # Prefer state sync by default; if declined, offer snapshot option interviewer.ask_for_statesync() if interviewer.use_statesync is False: logging.info("You chose not to use state sync. Snapshot restore is slower and requires substantially more disk space.") interviewer.ask_for_init_from_snapshot() except Exception as e: logging.exception(f"Unable to complete user interview process for installation. Reason for exiting: {e}") # Order of questions to ask the user if installing: # 1. Version of cheqd-noded to install # 2. Home directory for cheqd user # 3. Install Cosmovisor if not installed, or bump Cosmovisor version # 4. (if applicable) Cosmovisor settings # 6. Rewrite node systemd config # 7. Rewrite journal config def upgrade_steps(): try: interviewer.ask_for_version() interviewer.ask_for_home_directory() if interviewer.is_cosmovisor_installed is False: interviewer.ask_for_cosmovisor() else: interviewer.ask_for_cosmovisor_bump() if interviewer.is_cosmovisor_needed is True: interviewer.ask_for_daemon_allow_download_binaries() interviewer.ask_for_daemon_restart_after_upgrade() interviewer.ask_for_cosmovisor_config_rewrite() if interviewer.is_systemd_config_installed(DEFAULT_COSMOVISOR_SERVICE_FILE_PATH) is True or interviewer.is_systemd_config_installed(DEFAULT_STANDALONE_SERVICE_FILE_PATH) is True: interviewer.ask_for_rewrite_node_systemd() if interviewer.is_systemd_config_installed(DEFAULT_JOURNAL_CONFIG_FILE) is True: interviewer.ask_for_rewrite_journal() interviewer.ask_for_genesis_check() except Exception as e: logging.exception(f"Unable to complete user interview process for upgrade. Reason for exiting: {e}") ### This section is where the Interviewer class is invoked ### try: interviewer = Interviewer() # Check if cheqd-noded is already installed installed = interviewer.is_node_installed() # Check if Cosmovisor is already installed cosmovisor_installed = interviewer.check_cosmovisor_installed() # If no cheqd-noded binary is found, install from scratch if installed is False: install_steps() else: # If cheqd-noded binary is found, ask user if they want to upgrade or install from scratch interviewer.ask_for_upgrade() # If user wants to upgrade, execute upgrade steps if interviewer.is_upgrade is True: upgrade_steps() else: # If user declines upgrade, ask if they want to install from scratch interviewer.ask_for_confirm_upgrade() if interviewer.is_from_scratch is True: install_steps() else: logging.error("Aborting installation to prevent overwriting existing node installation. Exiting...") sys.exit(1) except Exception as e: logging.exception(f"Unable to complete user interview process. Reason for exiting: {e}") raise ### This section where the Installer class is invoked ### try: installer = Installer(interviewer) if installer.install(): logging.info(f"Installation of cheqd-noded {installer.version} completed successfully!\n") logging.info("Please review the configuration files manually and use systemctl to start the node.\n") logging.info("Documentation: https://docs.cheqd.io/node\n") sys.exit(0) else: logging.error(f"Installation of cheqd-noded {installer.version} failed. Exiting...") logging.info("Documentation: https://docs.cheqd.io/node\n") sys.exit(1) except Exception as e: logging.exception(f"Unable to execute installation process. Reason for exiting: {e}") sys.exit(1)