#!/usr/bin/env python # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # # Container TRF. It shadows runGen.py (T. Maeno) when the user # runs prun with --containerImage the pilot will run this code instead # of runGen.py. There are several runGen options that are not used here # but are automatically added by the system. The script will just log # them without failures. # # Authors: # - Alessandra Forti, alessandra.forti@cern.ch, 2018 ####################################################################### import os import sys import re import ssl import time import glob import argparse import logging try: import urllib.request as urllib except ImportError: import urllib try: from urllib.request import urlopen from urllib.error import HTTPError except ImportError: from urllib2 import urlopen, HTTPError import ast import subprocess import shlex import shutil import tarfile import xml.dom.minidom import json VERSION = '1.1.15' def main(): """ Main function of run_container """ logging.info("runcontainer version: " + VERSION) logging.info("Start time: " + time.ctime()) get_user_sandbox() run_container() rename_ouput() logging.info("End time: " + time.ctime()) def agis_container_options(): if 'PILOT_HOME' not in os.environ: os.environ['PILOT_HOME'] = os.environ['PWD'] queuedata_json = os.path.join(os.environ['PILOT_HOME'],'queuedata.json') data = {'container_type':'singularity', 'container_options':None} if os.access(queuedata_json, os.R_OK): with open(queuedata_json, "r") as from_file: tmp_data = json.load(from_file) data = {'container_type':tmp_data['container_type'].split(':')[0], 'container_options':tmp_data['container_options']} else: logging.info('Cannot read the panda queue information from {}. '+ 'Using default runtime singularity with no extra options'.format(queuedata_json)) return data def check_usernamespaces(): user_ns = 0 for file in glob.iglob('/etc/sysctl.d/*.conf'): if not os.access(file, os.R_OK): logging.warn('File {} not readable'.format(file)) continue for line in open(file, 'r'): if re.search('user.max_user_namespaces', line): ns_keyval = line.split('=') if 1 < len(ns_keyval): sys_user_ns = ns_keyval[1].strip() logging.info('File: {0}, Line: {1}, Conf: {2}'.format(file, line.strip(), sys_user_ns)) if os.path.exists('/proc/sys/user/max_user_namespaces'): f = open('/proc/sys/user/max_user_namespaces','r') user_ns = f.readline().strip() logging.info('/proc/sys/user/max_user_namespaces exists. User NS value: {}.'.format(user_ns)) # Docker containers have the value in /proc >0 by default but don't have necessarily # the right permission to actually mount /proc inside a singularity container # if user_ns: # conf_user_ns = -1 # for file in glob.iglob('/etc/sysctl.d/*.conf'): # for line in open(file, 'r'): # if re.search('user.max_user_namespaces', line): # conf_user_ns = line.split('=')[1].strip() # if user_ns != conf_user_ns: # user_ns = 0 # logging.info('User NS /proc and /etc/sysctl.d differ') # logging.info('User NS might be misconfigured or we might be in a docker image with insufficient privileges') return int(user_ns) def runtime_version(): # Check runtime exists and is executable # python 3.3 # def cmd_exists(x): # return shutil.which(x) is not None # This part of checking the user namespaces and the runtime # might need to be isolated in its own function def cmd_exists(x): return any( os.access(os.path.join(path, x), os.X_OK) for path in os.environ["PATH"].split(os.pathsep) ) def cmd_path(x): for path in os.environ["PATH"].split(os.pathsep): bin_path=os.path.join(path, x) if os.access(bin_path, os.X_OK): break return bin_path # I hate the hard coded path but we are going down that route for other things runtime_data = agis_container_options() runtime = runtime_data['container_type'] bin_path = '/cvmfs/atlas.cern.ch/repo/containers/sw/{0}/x86_64-el7/{1}/bin'.format(runtime, args.runtime_version) user_ns = check_usernamespaces() if user_ns > 0: logging.info('User namespaces is enabled. Checking if the runtime {} is in CVMFS'.format(runtime)) if os.path.exists('{0}/{1}'.format(bin_path, runtime)): os.environ['PATH'] = '{0}:{1}'.format(bin_path, os.environ['PATH']) logging.info('{} is available in CVMFS. Adding to the path.'.format(runtime)) else: logging.info('CVMFS not available'.format(runtime)) else: logging.info('User namespaces is NOT enabled.') if cmd_exists(runtime): logging.info('{} is in the PATH.'.format(runtime)) else: logging.error('{} is not in the PATH and user NS is not enabled'.format(runtime)) sys.exit(cmd_exists(runtime)) logging.info('Using: {}'.format(subprocess.check_output(['which', runtime]).decode('utf-8').rstrip())) runtime_data['version'] = subprocess.check_output([runtime, '--version']).decode('utf-8').rstrip() logging.info('Singularity version: {0}'.format(runtime_data['version'])) return runtime_data def singularity_sandbox(data={},source_image=''): # We need to sandbox because singularity has changed behaviour in 3.1.1 # and produces a SIFG image when it exec from a docker registry os.environ['SINGULARITY_TMPDIR'] = '{0}/{1}'.format(os.environ['PILOT_HOME'], re.sub('\W+', '_', source_image)) target_image = '{0}/{1}'.format(os.environ['SINGULARITY_TMPDIR'], 'image') if not os.path.exists(target_image): logging.info('Local image {} doesn\'t exist yet, building.'.format(target_image)) # Need to check the VERSION to add --fix-perms and disable the cache for # releases > 3.5.2 sv = data['version'].rstrip('\n') fix_perms = '' if int(''.join(re.findall(r'(\d+)[.]{0,1}', sv)[:3])) >= 352: fix_perms = '--fix-perms' os.environ['SINGULARITY_DISABLE_CACHE'] = 'true' logging.info('{}. Fixing permissions and disabling cache'.format(sv)) else: os.environ['SINGULARITY_CACHEDIR'] = '{0}/{1}'.format(os.environ['SINGULARITY_TMPDIR'], 'cache') logging.info('Setting cache dir to: {}'.format(os.environ['SINGULARITY_CACHEDIR'])) if not os.path.exists(os.environ['SINGULARITY_TMPDIR']): os.mkdir(os.environ['SINGULARITY_TMPDIR'], 0o755) debug = '--debug' if args.debug else '-s' sing_cmd = "singularity {0} build {1} --sandbox {2} {3}".format(debug, fix_perms, target_image, source_image) logging.info("Singularity command: %s", sing_cmd) execute(shlex.split(sing_cmd)) if 'SINGULARITY_CACHEDIR' in os.environ and os.path.exists(os.environ['SINGULARITY_CACHEDIR']): logging.info("Deleting {0}.".format(os.environ['SINGULARITY_CACHEDIR'])) shutil.rmtree(os.environ['SINGULARITY_CACHEDIR']) else: logging.info('Local image {} already exists. Trying it....'.format(target_image)) return target_image def singularity_user_proxy(): # The user proxy should be copied directly in the pilot directory which is # bound to the container. However we still are dealing with the pilot proxy # which is located elsewhere in that case copy locally to a fixed name and # point the container env var to that. It seems underlay cannot bind files. # user_proxy is the name of the copied proxy to avoid copying it several # times? This certainly works for the pilot proxy that doesn't change, for # the user proxy it's a question mark. # # This also needs to be reviewed if we start to use podman # if 'X509_USER_PROXY' not in os.environ: logging.error('X509_USER_PROXY required but not defined') sys.exit(1163) user_proxy_file = '{0}/user_proxy'.format(os.environ['PWD']) if os.environ['X509_USER_PROXY'] != user_proxy_file: shutil.copy2(os.environ['X509_USER_PROXY'], user_proxy_file) os.environ['SINGULARITYENV_X509_USER_PROXY'] = '{0}/user_proxy'.format(args.ctr_datadir) ca_cert_dir = '/etc/grid-security/certificates' if os.path.isdir(ca_cert_dir) and 'X509_CERT_DIR' not in os.environ: os.environ['X509_CERT_DIR'] = ca_cert_dir if 'X509_CERT_DIR' not in os.environ: logging.error('X509_CERT_DIR undefined') sys.exit(1163) logging.debug('X509_CERT_DIR={} X509_USER_PROXY={}'.format(os.environ['X509_CERT_DIR'],os.environ['X509_USER_PROXY'])) # Some sites mount /etc/grid-security/certificates in the configuration when I try to mount as well it fails # Changing location of X509_CERT_DIR inside the container. os.environ['SINGULARITYENV_X509_CERT_DIR'] = '/var{}'.format(ca_cert_dir) return '-B {}:{}'.format(os.environ['X509_CERT_DIR'], os.environ['SINGULARITYENV_X509_CERT_DIR']) def singularity_envvars(): if 'ATHENA_CORE_NUMBER' in os.environ: os.environ['SINGULARITYENV_ATHENA_CORE_NUMBER'] = os.environ['ATHENA_CORE_NUMBER'] if args.debug: logging.info("====== runcontainer host environment ======") for param in sorted(os.environ.keys()): logging.info('{}={}'.format(param,os.environ[param])) def singularity_container(data={}): logging.debug("Container type: {}, {}, Container options: {}" \ .format(data['container_type'], data['version'], data['container_options'])) # Do we have to sandbox? Only if the image is from a docker registry # not if it is from /cvmfs in that case we want to use the image exec_image = '' if re.search('docker://', args.ctr_image): exec_image = singularity_sandbox(data,args.ctr_image) else: exec_image = args.ctr_image # Set environment variables singularity_envvars() # Options for the command line string have default values or are mandatory # Base singularity command debug = '--debug' if args.debug else '' singularity_base = 'singularity {} exec -C'.format(debug) # Replace input place holders command = args.ctr_cmd files_map = input() for key in sorted(files_map.keys(), reverse=True, key=lambda x: len(x)): if key in command: command = command.replace('%' + key, files_map[key]) logging.debug("Command to run in the container %s", args.ctr_cmd) # Write the command into a script. # Makes it easier to handle whatever character # is passed to the script file_name = '_runcontainer.sh' open(file_name, 'w').write(command + '\n') os.chmod(file_name, 0o700) logging.info("User command: %s", command) io_dir = '-B {}:{}'.format( os.environ['PWD'],args.ctr_datadir) cmd = args.ctr_datadir + '/' + file_name # If Cvmfs add that to bind_paths cvmfs = '-B /cvmfs:/cvmfs' if args.ctr_cvmfs else '' container_options = data['container_options'] if data and data['container_options'] != None else '' # set the singularity user proxy x509_cert_bind = singularity_user_proxy() if not args.ctr_x509 else '' # Compose the command logging.debug('Using image: {}'.format(exec_image)) singularity_cmd = '{} --pwd {} {} {} {} {} {} {}' \ .format(singularity_base, args.ctr_workdir, io_dir, cvmfs, x509_cert_bind, container_options, exec_image, cmd) logging.info('Singularity command: {}'.format(singularity_cmd)) execute(shlex.split(singularity_cmd)) def podman_container(data={}): logging.debug("Container type: {}, {}, Container options: {}" \ .format(data['container_type'], data['version'], data['container_options'])) logging.info('Podman container to be developed') def execute(cmd=[]): ############################################################## # Run subprocess print stdout catch stderr in a generic way ch = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1) while ch.poll() == None: if ch.stdout: for line in iter(ch.stdout.readline,b''): logging.info(line.strip()) if ch.returncode == None and ch.stderr: for line in iter(ch.stderr.readline,b''): logging.warn(line.strip()) if ch.returncode != 0: logging.error('Container execution failed with errors. Error code: {}'.format(ch.returncode)) sys.exit(ch.returncode) # Correctly reports errors, warns and info but it buffers... # #def execute(cmd=[]): # ch = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1) # ch_stdout, ch_stderr = ch.communicate() # # for line in ch_stdout.decode(encoding='utf-8').split('\n'): # logging.info(line.strip()) # # if ch.returncode == 0 and len(ch_stderr) != 0: # for line in ch_stderr.decode(encoding='utf-8').split('\n'): # logging.warn(line.strip()) # elif ch.returncode != 0: # logging.error('Container execution failed with errors. Error code: {}'.format(ch.returncode)) # for line in ch_stderr.decode(encoding='utf-8').split('\n'): # logging.error(line.strip()) # sys.exit(ch.returncode) def run_container(): logging.info("Start container time: " + time.ctime()) pq_data = runtime_version() ctr_type = pq_data['container_type'] # to review when more than one container # or when I'll parse queue data if ctr_type == 'singularity': singularity_container(pq_data) elif ctr_type == 'podman': podman_container(pq_data) logging.info("End container time: " + time.ctime()) def read_poolfilecatalog(): directTmpTurl = {} try: logging.info('===== PFC from pilot =====') tmpPcFile = open('PoolFileCatalog.xml') logging.debug(tmpPcFile.read()) tmpPcFile.close() # parse XML root = xml.dom.minidom.parse('PoolFileCatalog.xml') files = root.getElementsByTagName('File') for file in files: # get PFN node physical = file.getElementsByTagName('physical')[0] pfnNode = physical.getElementsByTagName('pfn')[0] # convert UTF8 to Raw pfn = str(pfnNode.getAttribute('name')) lfn = pfn.split('/')[-1] lfn = re.sub('^([^:]+:)', '', lfn) lfn = re.sub('\?site.*$', '', lfn) lfn = re.sub('\?select.*$', '', lfn) lfn = re.sub('\?GoogleAccessId.*$', '', lfn) # append directTmpTurl[lfn] = pfn except: type, value, traceBack = sys.exc_info() logging.error('ERROR : Failed to collect GUIDs : %s - %s', type, value) return directTmpTurl def input(): # If the PFC contained only input files we could just extract the info # from it and check if the file exists either on disk or on the storage # Instead we have to match the input files arguments (input_map and # input_files to extract the correct files. # Dictionary to merge --inDS --inMap options and treat them in the same way in_map = {} if args.input_map: logging.info('Input primary and secondary files {}'.format(args.input_map)) in_map = args.input_map elif args.input_files: logging.info('Input files {}'.format(args.input_files)) in_map['IN'] = args.input_files else: logging.info('No input files requested') out_map = {} # If the input is requested read the PFC if in_map: # PoolFileCatalog directPFNs = {} directPFNs = read_poolfilecatalog() logging.info('TURLs in PFC {}'.format(directPFNs)) for key, in_files in in_map.items(): input_string = '' for filename in in_files: # check first it is in the PFC in theory we have flattened if filename in directPFNs: # if it is a root URL add to the input_string if re.search('^\w+://', directPFNs[filename]): input_string += "%s," % directPFNs[filename] out_map[key] = input_string[:-1] logging.debug('input_string with URLs {}'.format(input_string[:-1])) # otherwise check the file exists on disk elif os.path.isfile(filename): filename = os.path.join(args.ctr_datadir, filename) input_string += "%s," % filename out_map[key] = input_string[:-1] logging.debug('input_string with files {}'.format(input_string[:-1])) # else we cannot find the file else: logging.info('Input file {} is missing: '.format(filename)) else: logging.info('Input file {} not in the PFC: '.format(filename)) # Write input files string to a text file if args.input_text: # Write input files if needed for a in args.input_text.split(','): file_key, text_file = a.split(':') if file_key in out_map.keys(): f = open(text_file, 'w') f.write(out_map[file_key]) f.close() else: logging.error("Key %s doesn't match any of the input keys %s will not create corresponding file %s", file_key, out_map.keys(), text_file) logging.debug("Input files map: %s", out_map) if not out_map: logging.error('All requested input files are missing {}'.format(in_map)) exit(1331) return out_map def rename_user_metadata_json(): current_dir = os.environ['PWD'] user_meta = '{}/userJobMetadata.json'.format(current_dir) job_report = '{}/jobReport.json'.format(current_dir) if os.access(user_meta, os.R_OK): os.rename(user_meta,job_report) logging.info('Found {}, renaming to {}'.format(user_meta,job_report)) def rename_ouput(): current_dir = os.environ['PWD'] rename_user_metadata_json() # Rename the output files. No need to move them to currentDir # because we are already there. PFC and jobreport.json at # a later stage all jobs I checked have them empty anyway for old_name, new_name in args.output_files.items(): # archive * if old_name.find('*') != -1: for root, dirs, files in os.walk(current_dir): for folder in dirs: out_folder = os.path.join(root, folder) try: os.chdir(out_folder) if glob.glob(old_name): tar_cmd = ('tar -zcvf ' + current_dir + '/' + new_name + '.tgz ' + old_name) logging.debug("rename_output tar command: " + tar_cmd) subprocess.check_output(tar_cmd, shell=True) break except OSError as err: logging.error("Cannot chdir. Error: " + format(err)) pass else: output_path = '' for root, dirs, files in os.walk(current_dir): if old_name in files: output_path = os.path.join(root, old_name) mv_cmd = 'mv ' + output_path + ' ' + new_name logging.debug("rename_output mv command: " + mv_cmd) try: subprocess.check_output(mv_cmd, shell=True) except OSError as err: logging.error("Cannot mv: " + format(err)) def get_user_sandbox(): if args.user_sandbox is None or args.source_url is None: return url = args.source_url + '/cache/' + args.user_sandbox logging.info("Getting a user sandbox from {0}".format(url)) isOK = False errStr = None for i in range(3): try: res = urlopen(url, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) isOK = True with open(args.user_sandbox, 'wb') as f: f.write(res.read()) break except HTTPError as e: errStr = 'HTTP code: {0} - Reason: {1}'.format(e.code, e.reason) # doesn't exist if e.code == 404: break except Exception as e: errStr = str(e) time.sleep(30) if not isOK: logging.error("Cannot download the user sandbox with {0}".format(errStr)) sys.exit(1) logging.info("Extracting the user sandbox") with tarfile.open(args.user_sandbox) as f: f.extractall() if __name__ == "__main__": arg_parser = argparse.ArgumentParser() # Required arguments required = arg_parser.add_argument_group('required arguments') # Command to execute required.add_argument('-p', dest='ctr_cmd', type=urllib.unquote, required=True, help='Command to execute in the container') # Container Image to use required.add_argument('--containerImage', dest='ctr_image', required=True, help='Image path in CVMFS or on docker') # Optional arguments # Container output dataset arg_parser.add_argument('-o', dest='output_files', type=ast.literal_eval, default="{}", help='Output files') # Container input dataset arg_parser.add_argument('-i', dest='input_files', type=ast.literal_eval, default="[]", help='Input files') # Container output dataset arg_parser.add_argument('--inMap', dest='input_map', type=ast.literal_eval, default="{}", help='Input files mapping') # Some users prefer reading the input string from file # might be the best also for containers arg_parser.add_argument('--writeInputToTxt', dest='input_text', default="", help='Write input to a text file') # Container data directory arg_parser.add_argument('--datadir', dest='ctr_datadir', default="/ctrdata", help='Change directory where input, output \ and log files should be stored. \ Default: /ctrdata') # Container workdir arg_parser.add_argument('--workdir', dest='ctr_workdir', default="/ctrdata", help='Change workdir inside the container. \ Default: /ctrdata') # Container cvmfs arg_parser.add_argument('--cvmfs', dest='ctr_cvmfs', action='store_true', default=False, help='Mount /cvmfs. Default false') # Container proxy arg_parser.add_argument('--noX509', dest='ctr_x509', action='store_true', default=False, help='Unset the X509_USER_PROXY \ and X509_CA_CERTDIR. Default: true') # Container environment vars arg_parser.add_argument('--env', dest='ctr_env', default="", help='Container environment variables') # Debug arg_parser.add_argument('--debug', dest='debug', action='store_true', default=False, help='Enable debug mode for logging messages') # Debug arg_parser.add_argument('--runtime-version', dest='runtime_version', default="testing", help='Enable select the runtime version in CVMFS. Default current') # User sandbox filename arg_parser.add_argument('-a', dest='user_sandbox', default=None, help='User sandbox filename') # URL of sandbox server arg_parser.add_argument('--sourceURL', dest='source_url', default=None, help='URL of user sandbox server') args, unknown = arg_parser.parse_known_args() # Setup the logging level format_str = '%(asctime)s | %(levelname)-8s | %(message)s' if args.debug: logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=format_str) else: logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=format_str) logging.basicConfig(stream=sys.stderr, level=logging.ERROR, format=format_str) if unknown: logging.info("Following arguments are unknown or unsupported %s" % unknown) main()