# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

#
# Project:
#   glideinWMS
#
# File Version:
#
# Description:
#   This module implements the functions needed to advertize
#   and get resources from the Collector

import calendar
import copy
import os
import time

from glideinwms.lib import symCrypto  # pubCrypto was removed because unused
from glideinwms.lib import (
    classadSupport,
    condorExe,
    condorManager,
    condorMonitor,
    defaults,
    glideinWMSVersion,
    logSupport,
    token_util,
    x509Support,
)
from glideinwms.lib.util import hash_nc

############################################################
#
# Configuration
#
############################################################


class FrontendConfig:
    """Names and prefixes used by this module when building and parsing classads.

    All attributes are plain defaults set in ``__init__``; users of the module
    are expected to modify the module-level instance if needed.
    """

    def __init__(self):
        # set default values
        # user should modify if needed

        # The name of the attribute that identifies the glidein
        self.factory_id = "glidefactory"
        self.factory_global = "glidefactoryglobal"
        self.client_id = "glideclient"
        self.client_global = "glideclientglobal"
        self.factoryclient_id = "glidefactoryclient"

        # Default the glideinWMS version string
        self.glideinwms_version = "glideinWMS UNKNOWN"
        try:
            self.glideinwms_version = glideinWMSVersion.GlideinWMSDistro("checksum.frontend").version()
        except Exception:
            # Best effort: keep the "UNKNOWN" default, but do not swallow
            # KeyboardInterrupt/SystemExit as a bare except would
            logSupport.log.exception("Exception occurred while trying to retrieve the glideinwms version: ")

        # String to prefix for the attributes
        self.glidein_attr_prefix = ""

        # String to prefix for the parameters
        self.glidein_param_prefix = "GlideinParam"
        self.encrypted_param_prefix = "GlideinEncParam"

        # String to prefix for the monitors
        self.glidein_monitor_prefix = "GlideinMonitor"

        # String to prefix for the configured limits
        self.glidein_config_prefix = "GlideinConfig"

        # String to prefix for the performance metrics
        self.glidein_perfmetric_prefix = "GlideinPerfMetric"

        # String to prefix for the requests
        self.client_req_prefix = "Req"

        # The name of the signtype
        self.factory_signtype_id = "SupportedSignTypes"

        # Should we use TCP for condor_advertise?
        self.advertise_use_tcp = False
        # Should we use the new -multiple for condor_advertise?
        self.advertise_use_multi = False

        # Attributes stripped from query results by format_condor_dict()
        self.condor_reserved_names = (
            "MyType",
            "TargetType",
            "GlideinMyType",
            "MyAddress",
            "UpdatesHistory",
            "UpdatesTotal",
            "UpdatesLost",
            "UpdatesSequenced",
            "UpdateSequenceNumber",
            "DaemonStartTime",
        )


# global configuration of the module
frontendConfig = FrontendConfig()


#####################################################
# Exception thrown when multiple executions are used
# Helps handle partial failures


class MultiExeError(condorExe.ExeError):
    """Aggregate of several ExeError exceptions raised by multiple executions."""

    def __init__(self, arr):
        """
        arr is a list of ExeError exceptions
        """
        self.arr = arr

        # First approximation of implementation, can be improved
        # The separator is the two characters backslash + "n", so the combined message stays on one line
        message = "\\n".join("%s" % e for e in arr)

        condorExe.ExeError.__init__(self, message)


############################################################
#
# Global Variables
#
############################################################

# Advertize counter for glideclient
advertizeGCCounter = {}

# Advertize counter for glideclientglobal
# (the "Gounter" spelling is kept: other code in this module refers to this name)
advertizeGCGounter = {}

# Advertize counter for glideresource
advertizeGRCounter = {}

# Advertize counter for glidefrontendmonitor
advertizeGFMCounter = {}


############################################################
#
# User functions
#
############################################################


def _identity_clause(identity):
    """Return the AuthenticatedIdentity constraint suffix, or "" when checking is disabled.

    identity checking can be disabled, if really wanted, by passing None or "*".
    """
    if (identity is None) or (identity == "*"):
        return ""
    # filter based on AuthenticatedIdentity
    return ' && (AuthenticatedIdentity=?="%s")' % identity


def _load_classads(pool_name, constraint, require_integrity=True):
    """Query `pool_name` for "any" classads matching `constraint` and format the result.

    Args:
        pool_name: collector to query (passed to condorMonitor.CondorStatus)
        constraint (str): classad constraint expression
        require_integrity (bool): if True, call require_integrity(True) on the query
            before loading

    Returns:
        dict: the fetched data, post-processed by format_condor_dict()
    """
    status = condorMonitor.CondorStatus("any", pool_name=pool_name)
    if require_integrity:
        status.require_integrity(True)  # important, especially for proxy passing
    status.load(constraint)
    return format_condor_dict(status.fetchStored())


def findGlobals(pool_name, auth_identity, classad_type, additional_constraint=None):
    """
    Query the given pool to find the globals classad.
    Can be used to query glidefactoryglobal and glidefrontendglobal classads.

    Args:
        pool_name: collector to query
        auth_identity: required AuthenticatedIdentity; None or "*" disables the check
        classad_type: value GlideinMyType must have
        additional_constraint: optional extra constraint, and-ed (in parentheses) to the rest

    Returns:
        dict: see format_condor_dict()
    """
    status_constraint = '(GlideinMyType=?="%s")' % classad_type
    status_constraint += _identity_clause(auth_identity)

    if additional_constraint is not None:
        status_constraint = f"{status_constraint} && ({additional_constraint})"

    return _load_classads(pool_name, status_constraint)


def findMasterFrontendClassads(pool_name, frontend_name):
    """
    Query the given pool to find master frontend classads

    Selects glideclientglobal and glideclient classads of `frontend_name`
    whose FrontendHAMode is not "slave".

    Returns:
        dict: see format_condor_dict()
    """
    status_constraint = '(GlideinMyType=?="{}")||(GlideinMyType=?="{}")'.format("glideclientglobal", "glideclient")
    frontend_constraint = '(FrontendName=?="%s")&&(FrontendHAMode=!="slave")' % frontend_name

    return _load_classads(pool_name, f"({status_constraint})&&({frontend_constraint})")


# can throw condorMonitor.QueryError
def findGlideins(factory_pool, factory_identity, signtype, additional_constraint=None):
    """Query the factory pool for the factory (glidein entry) classads.

    Args:
        factory_pool: collector to query
        factory_identity: required AuthenticatedIdentity; None or "*" disables the check
        signtype: if not None, must be a member of the classad's supported sign types
        additional_constraint: optional extra constraint, and-ed (in parentheses) to the rest

    Returns:
        dict: see format_condor_dict()
    """
    status_constraint = '(GlideinMyType=?="%s")' % frontendConfig.factory_id
    status_constraint += _identity_clause(factory_identity)

    if signtype is not None:
        status_constraint += f' && stringListMember("{signtype}",{frontendConfig.factory_signtype_id})'

    # Note that Require and Allow x509_Proxy has been replaced by credential type and trust domain

    if additional_constraint is not None:
        status_constraint += " && (%s)" % additional_constraint

    return _load_classads(factory_pool, status_constraint)


def findGlideinClientMonitoring(factory_pool, factory_identity, my_name, additional_constraint=None):
    """Query the factory pool for the glidefactoryclient (client monitoring) classads.

    Args:
        factory_pool: collector to query
        factory_identity: required AuthenticatedIdentity; None or "*" disables the check
        my_name: if not None, only classads with this ReqClientName are selected
        additional_constraint: optional extra constraint, and-ed (in parentheses) to the rest

    Returns:
        dict: see format_condor_dict()
    """
    status_constraint = '(GlideinMyType=?="%s")' % frontendConfig.factoryclient_id
    status_constraint += _identity_clause(factory_identity)

    if my_name is not None:
        status_constraint += ' && (ReqClientName=?="%s")' % my_name

    if additional_constraint is not None:
        status_constraint += " && (%s)" % additional_constraint

    # NOTE(review): unlike the other find* queries, this one never requested
    # integrity; kept that way - confirm whether it is intentional
    return _load_classads(factory_pool, status_constraint, require_integrity=False)


def format_condor_dict(data):
    """
    Formats the data from the condor call.

    Each element of `data` is split in three dictionaries:
      "params"  - attributes starting with the parameter prefix (prefix removed)
      "monitor" - attributes starting with the monitor prefix (prefix removed)
      "attrs"   - everything else, minus the condor reserved names

    Note: reserved names found as top-level keys of `data` are deleted from
    `data` itself (in place); the per-element dictionaries are copied first.

    Args:
        data (dict): classad name -> dictionary of attributes

    Returns:
        dict: classad name -> {"params": {...}, "monitor": {...}, "attrs": {...}}
    """
    reserved_names = frontendConfig.condor_reserved_names
    for k in reserved_names:
        if k in data:
            del data[k]

    out = {}

    for k in list(data.keys()):
        kel = data[k].copy()

        el = {"params": {}, "monitor": {}}

        # first remove reserved names
        for attr in reserved_names:
            if attr in kel:
                del kel[attr]

        # then move the parameters and monitoring
        for prefix, eldata in (
            (frontendConfig.glidein_param_prefix, el["params"]),
            (frontendConfig.glidein_monitor_prefix, el["monitor"]),
        ):
            plen = len(prefix)
            # iterate over a snapshot of the keys: matching ones are removed from kel
            for attr in list(kel.keys()):
                if attr[:plen] == prefix:
                    eldata[attr[plen:]] = kel[attr]
                    del kel[attr]

        # what is left are glidein attributes
        el["attrs"] = kel

        out[k] = el

    return out


#############################################

# TODO: PM
# At some point we should change this class to watch for credential file
# updates and cache the contents/info between updates.
# This should further
# reduce calls to openssl and maintain consistency of credential info
# between cycles. If the file does not change the info in it remains same.
# This also means that the credential objects should be created much before
# and not for every iteration.


class Credential:
    """A credential of the frontend, with the settings configured for its file.

    The settings are looked up by `proxy_fname` in the per-file dictionaries
    of `elementDescript.merged_data`.
    """

    def __init__(self, proxy_id, proxy_fname, elementDescript):
        """Initialize the credential from the element description.

        Args:
            proxy_id: identifier of the credential; also the default security class
            proxy_fname: credential file name, key of all the per-file dictionaries
            elementDescript: object whose `merged_data` dictionary holds the
                Proxy*/Credential* dictionaries read below

        Raises:
            KeyError: if one of the expected dictionaries is missing from merged_data
        """
        self.req_idle = 0
        self.req_max_run = 0
        self.advertize = False

        # TODO: refcredential - all these attributes names should not start w/ proxy and the dict names should
        #  be CredentialSomething, not ProxySomething
        merged_data = elementDescript.merged_data
        proxy_security_classes = merged_data["ProxySecurityClasses"]
        proxy_trust_domains = merged_data["ProxyTrustDomains"]
        credential_generators = merged_data["CredentialGenerators"]
        proxy_types = merged_data["ProxyTypes"]
        proxy_keyfiles = merged_data["ProxyKeyFiles"]
        proxy_pilotfiles = merged_data["ProxyPilotFiles"]
        proxy_vm_ids = merged_data["ProxyVMIds"]
        proxy_vm_types = merged_data["ProxyVMTypes"]
        proxy_creation_scripts = merged_data["ProxyCreationScripts"]
        proxy_update_frequency = merged_data["ProxyUpdateFrequency"]
        proxy_vmid_fname = merged_data["ProxyVMIdFname"]
        proxy_vmtype_fname = merged_data["ProxyVMTypeFname"]
        proxy_remote_username = merged_data["ProxyRemoteUsernames"]
        proxy_project_id = merged_data["ProxyProjectIds"]

        self.proxy_id = proxy_id
        # self.filename (absfname) always contains component of credential
        # used to submit glidein and based on the type contains following:
        # grid_proxy: x509 proxy (also used by pilot to talk to User collector
        # key_pair: public/access key
        # cert_pair: public cert
        # auth_file: auth file used
        self.filename = proxy_fname
        self.type = proxy_types.get(proxy_fname, "Unknown")
        self.security_class = proxy_security_classes.get(proxy_fname, proxy_id)
        self.trust_domain = proxy_trust_domains.get(proxy_fname, "None")
        self.update_frequency = int(proxy_update_frequency.get(proxy_fname, -1))

        # Following items can be None
        self.generator = credential_generators.get(proxy_fname)
        self.vm_id_fname = proxy_vmid_fname.get(proxy_fname)
        self.vm_type_fname = proxy_vmtype_fname.get(proxy_fname)
        self.vm_id = proxy_vm_ids.get(proxy_fname)
        self.vm_type = proxy_vm_types.get(proxy_fname)
        self.creation_script = proxy_creation_scripts.get(proxy_fname)
        self.key_fname = proxy_keyfiles.get(proxy_fname)
        self.pilot_fname = proxy_pilotfiles.get(proxy_fname)
        self.remote_username = proxy_remote_username.get(proxy_fname)
        self.project_id = proxy_project_id.get(proxy_fname)

        # Will be initialized when getId() is called
        self._id = None

    def getId(self, recreate=False):
        """
        Generate the Credential id if we do not have one already
        Since the Id is dependent on the credential content for proxies
        recreate them if asked to do so

        Args:
            recreate (bool): if True, run create() and recompute the id even if one is cached

        Returns:
            the credential id, as computed by file_id() on getIdFilename()
        """
        if (not self._id) or recreate:
            # Create the credential id
            self.create()
            self._id = self.file_id(self.getIdFilename())
        return self._id

    def getIdFilename(self):
        """
        Get credential file (name, aka string) used to generate the credential id

        Returns:
            the first non-empty of: filename, key_fname, pilot_fname, generator;
            None if all of them are empty
        """
        # This checks seem hacky. Ideally checking against the credetnial type
        # to get the filename is right thing to do
        for candidate in (self.filename, self.key_fname, self.pilot_fname, self.generator):
            if candidate:
                return candidate
        return None

    def create(self):
        """
        Generate the credential

        Runs the creation script, if one is configured. A failing script is
        logged and marks the credential as not to be advertised; it does not raise.
        The credential id is recomputed in any case.
        """
        if self.creation_script:
            logSupport.log.debug("Creating credential using %s" % (self.creation_script))
            try:
                condorExe.iexe_cmd(self.creation_script)
            except Exception:
                # Deliberately best effort, but let KeyboardInterrupt/SystemExit through
                logSupport.log.exception("Creating credential using %s failed" % (self.creation_script))
                self.advertize = False

            # Recreating the credential can result in ID change
        self._id = self.file_id(self.getIdFilename())

    def createIfNotExist(self):
        """
        Generate the credential if it does not exists.

        Nothing is done if no filename is set or if the file is already there.
        """
        if self.filename and (not os.path.exists(self.filename)):
            logSupport.log.debug("Credential %s does not exist." % (self.filename))
            self.create()

    def getString(self, cred_file=None):
        """
        Based on the type of credentials read appropriate files and return
        the credentials to advertise as a string. The output should be
        encrypted by the caller as required.

        Args:
            cred_file: file to read; if empty, the file used to generate the id

        Returns:
            str: the file content, or "" if it could not be read (in which
            case the credential is also marked as not to be advertised)
        """
        cred_data = ""
        if not cred_file:
            # If not file specified, assume the file used to generate Id
            cred_file = self.getIdFilename()
        try:
            with open(cred_file) as data_fd:
                cred_data = data_fd.read()
        except Exception:
            # This credential should not be advertised
            # (also covers cred_file being None, which makes open() raise TypeError)
            self.advertize = False
            logSupport.log.exception("Failed to read credential %s: " % cred_file)
        return cred_data

    # PM: Why are the usage details part of Credential Class?
# This is overloading the purpose of Credential Class def add_usage_details(self, req_idle=0, req_max_run=0): self.req_idle = req_idle self.req_max_run = req_max_run def get_usage_details(self): return (self.req_idle, self.req_max_run) def file_id(self, filename, ignoredn=False): if ("grid_proxy" in self.type) and not ignoredn: dn = x509Support.extract_DN(filename) hash_str = filename + dn else: hash_str = filename logSupport.log.debug(f"Using hash_str={hash_str} ({hash_nc(hash_str, 8)})") return hash_nc(hash_str, 8) def time_left(self): """ Returns the time left if a grid proxy If missing, returns 0 If not a grid proxy or other unidentified error, return -1 """ if not os.path.exists(self.filename): return 0 if ("grid_proxy" in self.type) or ("cert_pair" in self.type): time_list = condorExe.iexe_cmd("openssl x509 -in %s -noout -enddate" % self.filename) if "notAfter=" in time_list[0]: time_str = time_list[0].split("=")[1].strip() timeleft = calendar.timegm(time.strptime(time_str, "%b %d %H:%M:%S %Y %Z")) - int(time.time()) return timeleft else: return -1 def renew(self): """ Renews credential if time_left() 0: glidein_params_to_encrypt["NumberOfCredentials"] = "%s" % nr_credentials request_name = "Global" if factory_pool in self.global_params: request_name, security_name = self.global_params[factory_pool] glidein_params_to_encrypt["SecurityName"] = security_name classad_name = f"{request_name}@{self.descript_obj.my_name}" fd.write('MyType = "%s"\n' % frontendConfig.client_global) fd.write('GlideinMyType = "%s"\n' % frontendConfig.client_global) fd.write('GlideinWMSVersion = "%s"\n' % frontendConfig.glideinwms_version) fd.write('Name = "%s"\n' % classad_name) fd.write('FrontendName = "%s"\n' % self.descript_obj.frontend_name) fd.write('FrontendHAMode = "%s"\n' % self.ha_mode) fd.write('GroupName = "%s"\n' % self.descript_obj.group_name) fd.write('ClientName = "%s"\n' % self.descript_obj.my_name) for i in range(nr_credentials): cred_el = self.x509_proxies_data[i] if 
cred_el.advertize == False: continue # we already determined it cannot be used for ld_el in cred_el.loaded_data: ld_fname, ld_data = ld_el glidein_params_to_encrypt[cred_el.file_id(ld_fname)] = ld_data if hasattr(cred_el, "security_class"): # Convert the sec class to a string so the Factory can interpret the value correctly glidein_params_to_encrypt["SecurityClass" + cred_el.file_id(ld_fname)] = str( cred_el.security_class ) key_obj = None if factory_pool in self.global_key: key_obj = self.global_key[factory_pool] if key_obj is not None: fd.write("\n".join(key_obj.get_key_attrs()) + "\n") for attr in list(glidein_params_to_encrypt.keys()): el = key_obj.encrypt_hex(glidein_params_to_encrypt[attr]).decode(defaults.BINARY_ENCODING_CRYPTO) escaped_el = el.replace('"', '\\"').replace("\n", "\\n") fd.write(f'{frontendConfig.encrypted_param_prefix}{attr} = "{escaped_el}"\n') # Update Sequence number information if classad_name in advertizeGCGounter: advertizeGCGounter[classad_name] += 1 else: advertizeGCGounter[classad_name] = 0 fd.write("UpdateSequenceNumber = %s\n" % advertizeGCGounter[classad_name]) # add a final empty line... useful when appending fd.write("\n") return [tmpname] def do_advertize(self, file_id_cache=None, adname=None, create_files_only=False, reset_unique_id=True): """ Do the advertizing of the requests Returns a dictionary of files that still need to be advertised. The key is the factory pool, while the element is a list of file names Expects that the credentials have already been loaded. 
# NOTE(review): the definitions below that take `self` are methods of the
# advertising class; its `class` statement is not legible in this part of the
# file, so they are laid out at column 0 here. Re-indent them under that class
# when merging.


def do_advertize(self, file_id_cache=None, adname=None, create_files_only=False, reset_unique_id=True):
    """
    Do the advertizing of the requests
    Returns a dictionary of files that still need to be advertised.
    The key is the factory pool, while the element is a list of file names
    Expects that the credentials have already been loaded.

    Args:
        file_id_cache: cache of credential file ids; a new CredentialCache is used if None
        adname: classad file name, passed to do_advertize_one()
        create_files_only (bool): if True, only create the files, do not advertise them
        reset_unique_id (bool): if True, restart the unique id counter from 1
    """
    if file_id_cache is None:
        file_id_cache = CredentialCache()

    unpublished_files = {}
    if reset_unique_id:
        self.unique_id = 1
    # iterate over a snapshot: do_advertize_one() removes the pool from factory_queue
    for factory_pool in list(self.factory_queue.keys()):
        self.unique_id += 1  # make sure ads for different factories don't end in the same file
        unpublished_files[factory_pool] = self.do_advertize_one(
            factory_pool, file_id_cache, adname, create_files_only, False
        )
    return unpublished_files


def do_advertize_one(
    self, factory_pool, file_id_cache=None, adname=None, create_files_only=False, reset_unique_id=True
):
    """
    Do the advertizing of requests for one factory
    Returns the list of files that still need to be advertised.
    Expects that the credentials have already been loaded.

    Args:
        factory_pool: key of self.factory_queue; unknown pools are ignored
        file_id_cache: cache of credential file ids; a new CredentialCache is used if None
        adname: classad file name; a new one is generated if None
        create_files_only (bool): if True, return the created files without advertising them
        reset_unique_id (bool): if True, restart the unique id counter from 1

    Returns:
        list: the created file names if create_files_only, else [] (advertise
        failures are logged, not raised)
    """
    # the different indentation is due to code refactoring
    # this way the diff was minimized
    if factory_pool not in self.factory_queue:
        # nothing to be done, prevent failure
        return []

    if file_id_cache is None:
        file_id_cache = CredentialCache()

    if reset_unique_id:
        self.unique_id = 1
    if adname is None:
        self.adname = classadSupport.generate_classad_filename(prefix="gfi_ad_gc")
    else:
        self.adname = adname

    # this should be done in parallel, but keep it serial for now
    filename_arr = []
    if frontendConfig.advertise_use_multi == True:
        filename_arr.append(self.adname)

    for el in self.factory_queue[factory_pool]:
        params_obj, key_obj = el
        try:
            filename_arr_el = self.createAdvertizeWorkFile(
                factory_pool, params_obj, key_obj, file_id_cache=file_id_cache
            )
            for f in filename_arr_el:
                if f not in filename_arr:
                    filename_arr.append(f)
        except NoCredentialException:
            filename_arr = []  # don't try to advertise
            logSupport.log.warning(
                "No security credentials match for factory pool %s, not advertising request;"
                " if this is not intentional, check for typos frontend's credential "
                "trust_domain and type, vs factory's pool trust_domain and auth_method" % factory_pool
            )
        except condorExe.ExeError:
            filename_arr = []  # don't try to advertise
            logSupport.log.exception(
                "Error creating request files for factory pool %s, unable to advertise: " % factory_pool
            )
            logSupport.log.error(
                "Error creating request files for factory pool %s, unable to advertise" % factory_pool
            )

    del self.factory_queue[factory_pool]  # clean queue for this factory

    if create_files_only:
        return filename_arr

    # Else, advertize all the files (if multi, should only be one)
    for filename in filename_arr:
        try:
            advertizeWorkFromFile(
                factory_pool, filename, remove_file=True, is_multi=frontendConfig.advertise_use_multi
            )
        except condorExe.ExeError:
            logSupport.log.exception("Advertising request failed for factory pool %s: " % factory_pool)

    return []  # No files left to be advertized


def vm_attribute_from_file(self, filename, prefix):
    """
    Expected syntax: VM_ID=<value> or VM_TYPE=<value>

    Note: This method does not check if the string that follows VM_ID
          is meaningful AMI or the string that follows VM_TYPE is one
          of AWS instance types.

    Args:
        filename: file to scan for "key=value" lines
        prefix (str): key to look for (compared case-insensitively)

    Returns:
        str: the single non-empty value found for `prefix`

    Raises:
        NoCredentialException: if the file cannot be read, or it contains
            no value or more than one value for `prefix`
    """
    values = []
    wanted_key = prefix.upper()
    try:
        # the context manager closes the file also when reading fails
        with open(filename) as vmfile:
            for line in vmfile:
                sep_idx = line.find("=")
                if sep_idx > 0:
                    key = (line[:sep_idx]).strip()
                    if key.upper() == wanted_key:
                        value = (line[sep_idx + 1 :]).strip()
                        if value != "":
                            values.append(value)
    except Exception:
        logSupport.log.exception("Failed to read the file %s" % (filename))
        raise NoCredentialException

    if len(values) > 1:
        logSupport.log.error(f"Found multiple lines that contain {prefix} in {filename}")
        raise NoCredentialException
    elif len(values) == 0:
        logSupport.log.error(f"File {filename} does not contain {prefix}")
        raise NoCredentialException

    logSupport.log.debug(f"Found {prefix} = {values[0]} from file {filename}")

    return values[0]


def createAdvertizeWorkFile(self, factory_pool, params_obj, key_obj=None, file_id_cache=None):
    """
    Create the advertize file
    Expects the object variables
      adname, unique_id and x509_proxies_data
    to be set.

    One classad is written (in append mode) for each credential returned by
    the credentials plugin that is usable for the request; credentials that
    are flagged as not advertisable, have an expired scitoken or do not match
    the factory auth method / trust domain are skipped.

    Args:
        factory_pool: not used in this method's body (kept for the callers)
        params_obj: the request; provides request_name, glidein_name, the
            parameters, the monitors and the parameters to encrypt
        key_obj: if not None, used to encrypt the parameters and to write the key attributes
        file_id_cache: cache of credential file ids; a new CredentialCache is used if None

    Returns:
        list: names of the files written, one entry per advertised credential

    Raises:
        NoCredentialException: if there are no credentials at all, or the plugin returns none
        KeyError: if params_obj.request_name is not in self.factory_constraint
        Exception: anything raised while preparing/writing a classad is logged and
            re-raised, after closing and removing the file if it was already opened
    """
    descript_obj = self.descript_obj

    logSupport.log.debug("In create Advertize work")

    # NOTE(review): this lookup raises KeyError for unknown requests, so the
    # "request_name in self.factory_constraint" test further down is always true
    factory_trust, factory_auth = self.factory_constraint[params_obj.request_name]

    total_nr_credentials = len(self.x509_proxies_data)

    cred_filename_arr = []

    if total_nr_credentials == 0:
        raise NoCredentialException

    # get_credentials will augment the needed credentials with the requests
    # A little weird, but that's how it works right now
    # The credential objects are also persistent, so this will be a subset of self.x509_proxies_data
    credentials_with_requests = descript_obj.x509_proxies_plugin.get_credentials(
        params_obj=params_obj, credential_type=factory_auth, trust_domain=factory_trust
    )
    nr_credentials = len(credentials_with_requests)
    if nr_credentials == 0:
        raise NoCredentialException

    if file_id_cache is None:
        # create a local cache, if no global provided
        file_id_cache = CredentialCache()

    for i in range(nr_credentials):
        fd = None
        glidein_monitors_this_cred = {}
        try:
            encrypted_params = {}  # none by default
            glidein_params_to_encrypt = params_obj.glidein_params_to_encrypt
            if glidein_params_to_encrypt is None:
                glidein_params_to_encrypt = {}
            else:
                # work on a copy: credential specific entries are added below
                glidein_params_to_encrypt = copy.deepcopy(glidein_params_to_encrypt)
            classad_name = f"{params_obj.request_name}@{descript_obj.my_name}"

            req_idle = 0
            req_max_run = 0

            # credential_el (Credential())
            credential_el = credentials_with_requests[i]
            logSupport.log.debug(f"Checking Credential file {credential_el.filename} ...")
            if credential_el.advertize == False:
                # We already determined it cannot be used
                # if hasattr(credential_el,'filename'):
                #    filestr=credential_el.filename
                # logSupport.log.warning("Credential file %s had some earlier problem in loading so not advertizing, skipping..."%(filestr))
                continue

            if credential_el.supports_auth_method("scitoken"):
                try:
                    # try first for credential generator
                    token_expired = token_util.token_str_expired(credential_el.generated_data)
                except AttributeError:
                    # then try file stored credential
                    token_expired = token_util.token_file_expired(credential_el.filename)
                if token_expired:
                    logSupport.log.warning(
                        f"Credential file {credential_el.filename} has expired scitoken, skipping"
                    )
                    continue
                glidein_params_to_encrypt["ScitokenId"] = file_id_cache.file_id(
                    credential_el, credential_el.filename
                )

            if params_obj.request_name in self.factory_constraint:
                if (factory_auth != "Any") and (not credential_el.supports_auth_method(factory_auth)):
                    logSupport.log.warning(
                        "Credential %s does not match auth method %s (for %s), skipping..."
                        % (credential_el.type, factory_auth, params_obj.request_name)
                    )
                    continue
                if (credential_el.trust_domain != factory_trust) and (factory_trust != "Any"):
                    logSupport.log.warning(
                        "Credential %s does not match %s (for %s) domain, skipping..."
                        % (credential_el.trust_domain, factory_trust, params_obj.request_name)
                    )
                    continue
            # Convert the sec class to a string so the Factory can interpret the value correctly
            glidein_params_to_encrypt["SecurityClass"] = str(credential_el.security_class)
            classad_name = credential_el.file_id(credential_el.filename, ignoredn=True) + "_" + classad_name
            if "username_password" in credential_el.type:
                glidein_params_to_encrypt["Username"] = file_id_cache.file_id(credential_el, credential_el.filename)
                glidein_params_to_encrypt["Password"] = file_id_cache.file_id(
                    credential_el, credential_el.key_fname
                )
            if "grid_proxy" in credential_el.type:
                glidein_params_to_encrypt["SubmitProxy"] = file_id_cache.file_id(
                    credential_el, credential_el.filename
                )
            if "cert_pair" in credential_el.type:
                glidein_params_to_encrypt["PublicCert"] = file_id_cache.file_id(
                    credential_el, credential_el.filename
                )
                glidein_params_to_encrypt["PrivateCert"] = file_id_cache.file_id(
                    credential_el, credential_el.key_fname
                )
            if "key_pair" in credential_el.type:
                glidein_params_to_encrypt["PublicKey"] = file_id_cache.file_id(
                    credential_el, credential_el.filename
                )
                glidein_params_to_encrypt["PrivateKey"] = file_id_cache.file_id(
                    credential_el, credential_el.key_fname
                )
            if "auth_file" in credential_el.type:
                glidein_params_to_encrypt["AuthFile"] = file_id_cache.file_id(credential_el, credential_el.filename)
            if "vm_id" in credential_el.type:
                if credential_el.vm_id_fname:
                    glidein_params_to_encrypt["VMId"] = self.vm_attribute_from_file(
                        credential_el.vm_id_fname, "VM_ID"
                    )
                else:
                    glidein_params_to_encrypt["VMId"] = str(credential_el.vm_id)
            if "vm_type" in credential_el.type:
                if credential_el.vm_type_fname:
                    glidein_params_to_encrypt["VMType"] = self.vm_attribute_from_file(
                        credential_el.vm_type_fname, "VM_TYPE"
                    )
                else:
                    glidein_params_to_encrypt["VMType"] = str(credential_el.vm_type)
                # removing this, was here by mistake? glidein_params_to_encrypt['VMType']=str(credential_el.vm_type)

            # Process additional information of the credential
            if credential_el.pilot_fname:
                glidein_params_to_encrypt["GlideinProxy"] = file_id_cache.file_id(
                    credential_el, credential_el.pilot_fname
                )

            if credential_el.remote_username:  # MM: or "username" in credential_el.type
                glidein_params_to_encrypt["RemoteUsername"] = str(credential_el.remote_username)
            if credential_el.project_id:
                glidein_params_to_encrypt["ProjectId"] = str(credential_el.project_id)

            (req_idle, req_max_run) = credential_el.get_usage_details()
            logSupport.log.debug(
                "Advertizing credential %s with (%d idle, %d max run) for request %s"
                % (credential_el.filename, req_idle, req_max_run, params_obj.request_name)
            )

            glidein_monitors_this_cred = params_obj.glidein_monitors_per_cred.get(credential_el.getId(), {})

            if frontendConfig.advertise_use_multi is True:
                # all the classads are appended to the same file
                fname = self.adname
                cred_filename_arr.append(fname)
            else:
                # one file per classad
                fname = self.adname + "_" + str(self.unique_id)
                self.unique_id += 1
                cred_filename_arr.append(fname)
            logSupport.log.debug(f"Writing {fname}")
            fd = open(fname, "a")

            fd.write('MyType = "%s"\n' % frontendConfig.client_id)
            fd.write('GlideinMyType = "%s"\n' % frontendConfig.client_id)
            fd.write('GlideinWMSVersion = "%s"\n' % frontendConfig.glideinwms_version)
            fd.write('Name = "%s"\n' % classad_name)
            fd.write("\n".join(descript_obj.get_id_attrs()) + "\n")
            fd.write('ReqName = "%s"\n' % params_obj.request_name)
            fd.write('ReqGlidein = "%s"\n' % params_obj.glidein_name)

            fd.write("\n".join(descript_obj.get_web_attrs()) + "\n")

            if params_obj.security_name is not None:
                glidein_params_to_encrypt["SecurityName"] = params_obj.security_name

            if key_obj is not None:
                fd.write("\n".join(key_obj.get_key_attrs()) + "\n")
                for attr in glidein_params_to_encrypt:
                    encrypted_params[attr] = key_obj.encrypt_hex(glidein_params_to_encrypt[attr]).decode(
                        defaults.BINARY_ENCODING_CRYPTO
                    )

            fd.write("ReqIdleGlideins = %i\n" % req_idle)
            fd.write("ReqMaxGlideins = %i\n" % req_max_run)
            fd.write('ReqRemoveExcess = "%s"\n' % params_obj.remove_excess_str)
            fd.write("ReqRemoveExcessMargin = %i\n" % params_obj.remove_excess_margin)
            fd.write('ReqIdleLifetime = "%s"\n' % params_obj.idle_lifetime)
            fd.write('WebMonitoringURL = "%s"\n' % descript_obj.monitoring_web_url)

            # write out both the params
            classad_info_tuples = (
                (frontendConfig.glidein_param_prefix, params_obj.glidein_params),
                (frontendConfig.encrypted_param_prefix, encrypted_params),
                (frontendConfig.glidein_config_prefix, self.glidein_config_limits),
            )
            for prefix, data in classad_info_tuples:
                for attr in list(data.keys()):
                    writeTypedClassadAttrToFile(fd, f"{prefix}{attr}", data[attr])

            for attr_name in params_obj.glidein_monitors:
                prefix = frontendConfig.glidein_monitor_prefix
                # attr_value = params_obj.glidein_monitors[attr_name]
                if (attr_name == "RunningHere") and glidein_monitors_this_cred:
                    # This double check is for backward compatibility
                    attr_value = glidein_monitors_this_cred.get("GlideinsRunning", 0)
                elif (attr_name == "Running") and glidein_monitors_this_cred:
                    # This double check is for backward compatibility
                    attr_value = glidein_monitors_this_cred.get("ScaledRunning", 0)
                else:
                    attr_value = glidein_monitors_this_cred.get(attr_name, params_obj.glidein_monitors[attr_name])
                writeTypedClassadAttrToFile(fd, f"{prefix}{attr_name}", attr_value)

            # Update Sequence number information
            if classad_name in advertizeGCCounter:
                advertizeGCCounter[classad_name] += 1
            else:
                advertizeGCCounter[classad_name] = 0
            fd.write("UpdateSequenceNumber = %s\n" % advertizeGCCounter[classad_name])

            # add a final empty line... useful when appending
            fd.write("\n")
            fd.close()
        except BaseException:
            # Cleanup-and-reraise handler: must run for every kind of exception
            logSupport.log.exception("Exception writing advertisement file: ")
            # remove file in case of problems
            if fd is not None:
                fd.close()
                os.remove(fname)
            raise
    return cred_filename_arr


def set_glidein_config_limits(self, limits_data):
    """
    Set various limits and curbs configured in the frontend config
    into the glideresource classad

    The value is stored as-is and later written by createAdvertizeWorkFile()
    using the config prefix.
    """
    self.glidein_config_limits = limits_data


def writeTypedClassadAttrToFile(fd, attr_name, attr_value):
    """
    Given the FD, type check the value and write the info the classad file

    Numeric values (int, float and, being an int subclass, bool) are written
    unquoted; everything else is converted to a string, escaped and quoted.

    Args:
        fd: writable text file object
        attr_name (str): classad attribute name
        attr_value: value to write
    """
    if isinstance(attr_value, (int, float)):
        # don't quote numeric values
        fd.write(f"{attr_name} = {attr_value}\n")
    else:
        # escape double quotes and turn newlines into the two characters "\n"
        escaped_value = str(attr_value).replace('"', '\\"').replace("\n", "\\n")
        fd.write(f'{attr_name} = "{escaped_value}"\n')


def _invalidate_client_ads(factory_pool, my_name, ha_mode, glidein_my_type, filename_prefix):
    """Write an invalidation query for the client's `glidein_my_type` classads and send it.

    The temporary query file is always removed once it has been opened,
    whether writing and advertising succeed or not.
    """
    tmpnam = classadSupport.generate_classad_filename(prefix=filename_prefix)
    fd = open(tmpnam, "w")  # opened outside the try: if this fails there is no file to remove
    try:
        with fd:
            fd.write('MyType = "Query"\n')
            fd.write('TargetType = "%s"\n' % glidein_my_type)
            fd.write(
                'Requirements = (ClientName == "%s") && (GlideinMyType == "%s") && (FrontendHAMode == "%s")\n'
                % (my_name, glidein_my_type, ha_mode)
            )

        exe_condor_advertise(tmpnam, "INVALIDATE_MASTER_ADS", factory_pool)
    finally:
        os.remove(tmpnam)


# Remove ClassAd from Collector
def deadvertizeAllWork(factory_pool, my_name, ha_mode="master"):
    """
    Removes all work requests for the client in the factory.

    Args:
        factory_pool: collector the invalidation is sent to
        my_name: ClientName the classads must have
        ha_mode: FrontendHAMode the classads must have
    """
    _invalidate_client_ads(factory_pool, my_name, ha_mode, frontendConfig.client_id, "gfi_de_gc")


def deadvertizeAllGlobals(factory_pool, my_name, ha_mode="master"):
    """
    Removes all globals classads for the client in the factory.

    Args:
        factory_pool: collector the invalidation is sent to
        my_name: ClientName the classads must have
        ha_mode: FrontendHAMode the classads must have
    """
    _invalidate_client_ads(factory_pool, my_name, ha_mode, frontendConfig.client_global, "gfi_de_gcg")
""" global frontendConfig tmpnam = classadSupport.generate_classad_filename(prefix="gfi_de_gcg") fd = open(tmpnam, "w") try: try: fd.write('MyType = "Query"\n') fd.write('TargetType = "%s"\n' % frontendConfig.client_global) fd.write( 'Requirements = (ClientName == "%s") && (GlideinMyType == "%s") && (FrontendHAMode == "%s")\n' % (my_name, frontendConfig.client_global, ha_mode) ) finally: fd.close() exe_condor_advertise(tmpnam, "INVALIDATE_MASTER_ADS", factory_pool) finally: os.remove(tmpnam) ############################################################################### # Code to advertise glideresource classads to the User Pool ############################################################################### class ResourceClassad(classadSupport.Classad): """ This class describes the resource classad. Frontend advertises the resource classad to the user pool as an UPDATE_AD_GENERIC type classad """ def __init__(self, factory_ref, frontend_ref): """ Class Constructor @type factory_ref: string @param factory_ref: Name of the resource in the glidefactory classad @type frontend_ref: string @param type: Name of the resource in the glideclient classad """ global advertizeGRCounter classadSupport.Classad.__init__(self, "glideresource", "UPDATE_AD_GENERIC", "INVALIDATE_ADS_GENERIC") self.adParams["GlideinWMSVersion"] = frontendConfig.glideinwms_version self.adParams["GlideFactoryName"] = "%s" % factory_ref self.adParams["GlideClientName"] = "%s" % frontend_ref self.adParams["Name"] = f"{factory_ref}@{frontend_ref}" self.adParams["GLIDEIN_In_Downtime"] = "False" if self.adParams["Name"] in advertizeGRCounter: advertizeGRCounter[self.adParams["Name"]] += 1 else: advertizeGRCounter[self.adParams["Name"]] = 0 self.adParams["UpdateSequenceNumber"] = advertizeGRCounter[self.adParams["Name"]] def setFrontendDetails(self, frontend_name, group_name, ha_mode): """ Add the detailed description of the frontend. 
###############################################################################
# Code to advertise glideresource classads to the User Pool
###############################################################################


class ResourceClassad(classadSupport.Classad):
    """
    This class describes the resource classad. Frontend advertises the
    resource classad to the user pool as an UPDATE_AD_GENERIC type classad
    """

    # Attribute names filled by setGlideClientMonitorInfo, in the order
    # the values are expected in its monitorInfo argument
    _CLIENT_MONITOR_ATTRS = (
        "GlideClientMonitorJobsIdle",
        "GlideClientMonitorJobsIdleMatching",
        "GlideClientMonitorJobsIdleEffective",
        "GlideClientMonitorJobsIdleOld",
        "GlideClientMonitorJobsIdleUnique",
        "GlideClientMonitorJobsRunning",
        "GlideClientMonitorJobsRunningHere",
        "GlideClientMonitorJobsRunningMax",
        "GlideClientMonitorGlideinsTotal",
        "GlideClientMonitorGlideinsIdle",
        "GlideClientMonitorGlideinsRunning",
        "GlideClientMonitorGlideinsFailed",
        "GlideClientMonitorGlideinsTotalCores",
        "GlideClientMonitorGlideinsIdleCores",
        "GlideClientMonitorGlideinsRunningCores",
        "GlideClientMonitorGlideinsRequestIdle",
        "GlideClientMonitorGlideinsRequestMaxRun",
    )

    def __init__(self, factory_ref, frontend_ref):
        """
        Class Constructor

        @type factory_ref: string
        @param factory_ref: Name of the resource in the glidefactory classad
        @type frontend_ref: string
        @param frontend_ref: Name of the resource in the glideclient classad
        """
        super().__init__("glideresource", "UPDATE_AD_GENERIC", "INVALIDATE_ADS_GENERIC")

        name = f"{factory_ref}@{frontend_ref}"
        self.adParams["GlideinWMSVersion"] = frontendConfig.glideinwms_version
        self.adParams["GlideFactoryName"] = f"{factory_ref}"
        self.adParams["GlideClientName"] = f"{frontend_ref}"
        self.adParams["Name"] = name
        self.adParams["GLIDEIN_In_Downtime"] = "False"

        # Per-name counter kept at module level: 0 for the first classad
        # built with a name, one more for each following one
        advertizeGRCounter[name] = advertizeGRCounter.get(name, -1) + 1
        self.adParams["UpdateSequenceNumber"] = advertizeGRCounter[name]

    def setFrontendDetails(self, frontend_name, group_name, ha_mode):
        """
        Add the detailed description of the frontend.

        @type frontend_name: string
        @param frontend_name: Published as GlideFrontendName
        @type group_name: string
        @param group_name: Published as GlideGroupName
        @type ha_mode: string
        @param ha_mode: Published as GlideFrontendHAMode
        """
        self.adParams["GlideFrontendName"] = f"{frontend_name}"
        self.adParams["GlideGroupName"] = f"{group_name}"
        self.adParams["GlideFrontendHAMode"] = f"{ha_mode}"

    def setMatchExprs(self, match_expr, job_query_expr, factory_query_expr, start_expr):
        """
        Sets the matching expressions for the resource classad
        Thus, it would be possible to find out why a job
        is not matching.

        @type match_expr: string
        @param match_expr: A representation of the frontend MatchExpr
        @type job_query_expr: string
        @param job_query_expr: Representation of the job query_expr
        @type factory_query_expr: string
        @param factory_query_expr: Representation of the factory query_expr
        @type start_expr: string
        @param start_expr: Representation of the match start expr (on the glidein)
        """
        # Each attribute name spells out which expression it carries:
        # the glidein-side Condor expr is the start expr, the internal Python
        # expr is the frontend match expr, the factory Condor constraint is
        # the factory query expr
        self.adParams["GlideClientMatchingGlideinCondorExpr"] = f"{start_expr}"
        self.adParams["GlideClientConstraintJobCondorExpr"] = f"{job_query_expr}"
        self.adParams["GlideClientMatchingInternalPythonExpr"] = f"{match_expr}"
        self.adParams["GlideClientConstraintFactoryCondorExpr"] = f"{factory_query_expr}"

    def setInDownTime(self, downtime):
        """
        Set the downtime flag for the resource in the classad

        @type downtime: bool
        @param downtime: True if the entry is in down time.
        """
        self.adParams["GLIDEIN_In_Downtime"] = str(downtime)

    def setGlideClientMonitorInfo(self, monitorInfo):
        """
        Set the GlideClientMonitor* for the resource in the classad

        @type monitorInfo: list
        @param monitorInfo: GlideClientMonitor information, one value per
            attribute, in the order of _CLIENT_MONITOR_ATTRS
        @raise RuntimeError: if monitorInfo does not have the expected number of values
        """
        if len(monitorInfo) != len(self._CLIENT_MONITOR_ATTRS):
            raise RuntimeError(
                "Glide client monitoring structure changed. "
                "Resource ad may have incorrect GlideClientMonitor values"
            )
        for attr_name, value in zip(self._CLIENT_MONITOR_ATTRS, monitorInfo):
            self.adParams[attr_name] = value

    def setEntryInfo(self, info):
        """
        Set the useful entry specific info for the resource in the classad

        Attributes listed in eliminate_attrs are not published. Attributes
        starting with frontendConfig.glidein_config_prefix are published with
        that prefix replaced by GlideFactoryConfig, all the others unchanged.

        @type info: dict
        @param info: Useful info from the glidefactory classad
        """
        eliminate_attrs = {
            "CurrentTime",
            "PubKeyValue",
            "PubKeyType",
            "AuthenticatedIdentity",
            "GlideinName",
            "FactoryName",
            "EntryName",
            "GlideinWMSVersion",
            "PubKeyObj",
            "LastHeardFrom",
            "PubKeyID",
            "SupportedSignTypes",
            "GLIDEIN_In_Downtime",
        }
        config_prefix = frontendConfig.glidein_config_prefix
        # Walk the dictionary itself so the attributes are added in a reproducible order
        for attr, value in info.items():
            if attr in eliminate_attrs:
                continue
            ad_key = attr
            if attr.startswith(config_prefix):
                # Convert GlideinConfig -> GlideFactoryConfig
                ad_key = attr.replace(config_prefix, "GlideFactoryConfig", 1)
            self.adParams[ad_key] = value

    def setEntryMonitorInfo(self, info):
        """
        Set the useful entry specific monitoring info for the resource in the classad
        Monitoring info from the glidefactory classad (e.g. CompletedJobs )

        Only the keys starting with CompletedJobs are published, prefixed
        with GlideFactoryMonitor.

        @type info: dict
        @param info: Useful monitoring info from the glidefactory classad
        """
        # Monitoring Prefixes are considering format_condor_dict that strips "GlideinMonitor"
        for key, value in info.items():
            if key.startswith("CompletedJobs"):
                self.adParams["GlideFactoryMonitor" + key] = value

    def setGlideFactoryMonitorInfo(self, info):
        """
        Set the GlideinFactoryMonitor* for the resource in the classad

        @type info: dict
        @param info: Useful information from the glidefactoryclient classad
        """
        # Required keys do not start with TotalClientMonitor but only
        # start with Total or Status or Requested. Append GlideFactoryMonitor
        # to these keys and put them in the classad
        for key, value in info.items():
            if key.startswith("TotalClientMonitor"):
                continue
            ad_key = key
            if key.startswith(("Total", "Status", "Requested")):
                ad_key = "GlideFactoryMonitor" + key
            self.adParams[ad_key] = value

    def setGlideClientConfigLimits(self, info):
        """
        Set the GlideClientConfig* for the resource in the classad

        @type info: dict
        @param info: Useful config information
        """
        for key, value in info.items():
            self.adParams[f"GlideClientConfig{key}"] = value

    def setCurbsAndLimits(self, limits_triggered):
        """
        Set descriptive messages about which limits and curbs
        have been triggered in deciding number of glideins to request

        Keys starting with Curb are published as GlideClientCurb<key>,
        all the others as GlideClientLimit<key>.

        @type limits_triggered: dictionary
        @param limits_triggered: limits and curbs that have been triggered
        """
        for key, message in limits_triggered.items():
            prefix = "GlideClientCurb" if key.startswith("Curb") else "GlideClientLimit"
            self.adParams[prefix + key] = message
class ResourceClassadAdvertiser(classadSupport.ClassadAdvertiser):
    """
    Class to handle the advertisement of resource classads to the user pool
    """

    def __init__(self, pool=None, multi_support=False):
        """
        Constructor

        @type pool: string
        @param pool: Collector address
        @type multi_support: bool
        @param multi_support: True if the installation support advertising multiple classads with one condor_advertise command. Defaults to False.
        """
        # TCP usage is not a parameter: it always follows the module configuration
        super().__init__(pool=pool, multi_support=multi_support, tcp_support=frontendConfig.advertise_use_tcp)

        self.adType = "glideresource"
        self.adAdvertiseCmd = "UPDATE_AD_GENERIC"
        self.adInvalidateCmd = "INVALIDATE_ADS_GENERIC"
        self.advertiseFilePrefix = "gfi_ar"


class FrontendMonitorClassad(classadSupport.Classad):
    """
    This class describes the frontend monitor classad. Frontend advertises the
    monitor classad to the user pool as an UPDATE_AD_GENERIC type classad
    """

    def __init__(self, frontend_ref):
        """
        Class Constructor

        @type frontend_ref: string
        @param frontend_ref: Name of the resource in the glideclient classad
        """
        super().__init__("glidefrontendmonitor", "UPDATE_AD_GENERIC", "INVALIDATE_ADS_GENERIC")

        name = f"{frontend_ref}"
        self.adParams["GlideinWMSVersion"] = frontendConfig.glideinwms_version
        self.adParams["Name"] = name

        # Per-name counter kept at module level: 0 for the first classad
        # built with a name, one more for each following one
        advertizeGFMCounter[name] = advertizeGFMCounter.get(name, -1) + 1
        self.adParams["UpdateSequenceNumber"] = advertizeGFMCounter[name]

    def setFrontendDetails(self, frontend_name, groups, ha_mode):
        """
        Add the detailed description of the frontend.

        @type frontend_name: string
        @param frontend_name: Published as GlideFrontendName
        @type groups: string
        @param groups: Published as GlideFrontendGroups (any other type is converted to its string form)
        @type ha_mode: string
        @param ha_mode: Published as GlideFrontendHAMode
        """
        # f-strings instead of "%s" % value: the latter fails when value is a tuple
        self.adParams["GlideFrontendName"] = f"{frontend_name}"
        self.adParams["GlideFrontendGroups"] = f"{groups}"
        self.adParams["GlideFrontendHAMode"] = f"{ha_mode}"

    def setIdleJobCount(self, idle_jobs):
        """
        Set the idle jobs info in the classad

        Each key is converted to a string and title-cased to build the
        attribute name GlideFrontend_IdleJobs_<Key>.

        @type idle_jobs: dict
        @param idle_jobs: Dictionary of idle jobs keyed on idle duration.
            For example - Total for all idle jobs, 3600 for jobs idle more than 1 Hour
        """
        for key, count in idle_jobs.items():
            self.adParams[f"GlideFrontend_IdleJobs_{str(key).title()}"] = count

    def setPerfMetrics(self, perf_metrics):
        """
        Set the performance metrics info for frontend or group in the classad

        One attribute per event found in perf_metrics.metric, named
        <glidein_perfmetric_prefix>_<perf_metrics.name>_<event> and holding
        the value returned by perf_metrics.event_lifetime(event).

        @type perf_metrics: servicePerformance.PerfMetric
        @param perf_metrics: PerfMetric object for frontend or group
        """
        for event in perf_metrics.metric:
            attr_name = f"{frontendConfig.glidein_perfmetric_prefix}_{perf_metrics.name}_{event}"
            self.adParams[attr_name] = perf_metrics.event_lifetime(event)
class FrontendMonitorClassadAdvertiser(classadSupport.ClassadAdvertiser):
    """
    Advertiser in charge of sending the frontend monitor classads to the user pool
    """

    def __init__(self, pool=None, multi_support=False):
        """
        Constructor

        @type pool: string
        @param pool: Collector address
        @type multi_support: bool
        @param multi_support: True if the installation support advertising multiple classads with one condor_advertise command. Defaults to False.
        """
        # TCP usage is taken from the module configuration, not from the caller
        use_tcp = frontendConfig.advertise_use_tcp
        super().__init__(pool=pool, multi_support=multi_support, tcp_support=use_tcp)

        self.advertiseFilePrefix = "gfi_afm"
        self.adInvalidateCmd = "INVALIDATE_ADS_GENERIC"
        self.adAdvertiseCmd = "UPDATE_AD_GENERIC"
        self.adType = "glidefrontendmonitor"


############################################################
#
# I N T E R N A L - Do not use
#
############################################################


def exe_condor_advertise(fname, command, pool, is_multi=False):
    """
    Log the request at debug level and hand it to condorManager.condorAdvertise

    @type fname: string
    @param fname: Name of the file holding the classad(s)
    @type command: string
    @param command: Advertise command (e.g. INVALIDATE_MASTER_ADS)
    @type pool: string
    @param pool: Pool the command is sent to
    @type is_multi: bool
    @param is_multi: Forwarded unchanged to condorAdvertise. Defaults to False.
    @return: Whatever condorManager.condorAdvertise returns
    """
    logSupport.log.debug(f"CONDOR ADVERTISE {fname} {command} {pool} {is_multi}")
    use_tcp = frontendConfig.advertise_use_tcp
    outcome = condorManager.condorAdvertise(fname, command, use_tcp, is_multi, pool)
    return outcome


class NoCredentialException(Exception):
    """
    Exception type without any added behavior
    NOTE(review): the name suggests it signals a missing credential; it is raised outside this part of the file - confirm
    """