#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

"""
Upgrade catalog file format description:

Format version 1.0

Global section description:
  STACKNAME  - name of the stack, for example HDP
  OLDVERSION - stack version to upgrade from, used by the fromStack script argument
  NEWVERSION - stack version to upgrade to, used by the toStack script argument

Sub-section options:
  config-types - contains global per-config-type settings
    merged-copy       - merge the latest server properties with the properties defined in the
                        "properties" section; without this option the server properties are
                        replaced by the properties defined in the "properties" section
    required-services - properties from the JSON catalog are processed only if the listed services
                        are present on the cluster; a property-level definition always overrides
                        the catalog-level definition

Sub-section properties - contains the property definitions
Sub-section property-mapping (optional) - contains a mapping of property names for cases where a
                                          property was renamed in NEWVERSION

Example:

{
  "version": "1.0",
  "stacks": [
    {
      "name": "STACKNAME",
      "old-version": "OLDVERSION",
      "target-version": "NEWVERSION",
      "options": {
        "config-types": {
          "CONFIGTYPE1": {
            "merged-copy": "yes",
            "required-services": ["HDFS"]
          }
        }
      },
      "properties": {
        "CONFIGTYPE1": {
          "some_property": "some property value",
          "some_second_property": {
            "remove": "yes"
          },
          "template_property": {
            "value": "{TEMPLATE_TAG}",
            "template": "yes",
            "required-services": ["HDFS", "YARN"]
          },
          "test_property": {
            "value": "new value",
            "override": "no",                      (optional, override an already existing property, yes/no. Default: yes)
            "value-required": "old value",         (optional, the property is set only if the required value is currently present)
            "can-create": "no",                    (optional, "no" processes the property only if it already exists on the server,
                                                    i.e. controls the ability to create a new property. Default: yes)
            "required-services": ["HDFS", "YARN"], (optional, process the property only if the listed services exist)
            "resolve-dependency": "no"             (optional, use the Stack Advisor to get dependent property changes. Default: no)
          }
        }
      },
      "property-mapping": {
        "old-property-name": "new-property-name", (short form, equal to "old-property-name": { "map-to": "new-property-name" })
        "old-property1-name": {                   (usually the key is the name of the property to be mapped; if the same property
                                                   is mapped more than once, set the key to a unique name and use "map-from" instead)
          "map-from": "old property name",        (optional, the property name which should be mapped)
          "map-to": "new_property1_name",         (optional, the new property name. If not set, the old property name is used)
          "from-catalog": "test",                 (optional, requires "to-catalog". Source config type of old-property1-name)
          "to-catalog": "test",                   (optional, requires "from-catalog". Target config type of new_property1_name)
          "default": "default value",             (optional, if set and the old property does not exist, the new one is created
                                                   with this default value)
          "template": "yes",                      (optional, template parsing for the default option)
          "coerce-to": "pre-defined type",        (optional, convert the value from one type to another. Types supported:
                                                   yaml-array - converts the string item1,item2 to ['item1', 'item2'])
          "replace-from": "something",            (optional, "replace-from" and "replace-to" must both be present; replaces the
                                                   'from' value with the 'to' value)
          "replace-to": "something",
          "required-services": ["YARN"]           (optional, process the entry only if the listed services exist on the cluster)
        }
      }
    }
  ]
}

More examples available in ambari-server/src/main/resources/upgrade/catalog/
"""

import getpass
import optparse
from pprint import pprint
import re
import sys
import os.path
import logging
import time
import base64

from urllib2 import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, Request, build_opener, URLError, HTTPError

try:
  # try to import new simplejson version, which should be faster than outdated python 2.6 version
  import ambari_simplejson as json
except ImportError:
  import json


# ==============================
#  Error classes definition
# ==============================
class FatalException(Exception):
  def __init__(self, code, reason):
    self.code = code
    self.reason = reason

  def __str__(self):
    return repr("Fatal exception: %s, exit code %s" % (self.reason, self.code))

  def _get_message(self):
    return str(self)


class ReadOnlyPropertyException(Exception):
  def __str__(self):
    return "Property is read-only"

  def _get_message(self):
    return self.__str__()


class NotSupportedCatalogVersion(Exception):
  def __init__(self, catalog_version):
    self._version = catalog_version

  def __str__(self):
    return "Version %s of loaded catalog not supported" % self._version

  def _get_message(self):
    return self.__str__()

  message = property(__str__)


class CatalogNotFoundException(Exception):
  pass


class TemplateProcessingException(Exception):
  pass


class CatalogExistException(Exception):
  pass


class PropertyNotFoundException(Exception):
  pass


class StackNotFoundException(Exception):
  pass


class MalformedPropertyDefinitionException(Exception):
  pass


# ==============================
#  Constant class definition
# ==============================
class Const(object):
  def __new__(cls, *args, **kwargs):
    raise Exception("Class couldn't be created")


class Options(Const):
  # action commands
  API_PROTOCOL = "http"
  API_PORT = "8080"

  GET_MR_MAPPING_ACTION = "save-mr-mapping"
  VERIFY_ACTION = "verify"
  DELETE_MR_ACTION = "delete-mr"
  ADD_YARN_MR2_ACTION = "add-yarn-mr2"
  MODIFY_CONFIG_ACTION = "update-configs"
  BACKUP_CONFIG_ACTION = "backup-configs"
  INSTALL_YARN_MR2_ACTION = "install-yarn-mr2"

  MR_MAPPING_FILE = "mr_mapping"
  CAPACITY_SCHEDULER_TAG = "capacity-scheduler"
  REPLACE_JH_HOST_NAME_TAG = "REPLACE_JH_HOST"
  REPLACE_RM_HOST_NAME_TAG = "REPLACE_RM_HOST"
  REPLACE_WITH_TAG = "REPLACE_WITH_"
  PHOENIX_QUERY_SERVER = "PHOENIX_QUERY_SERVER"
  ZK_OPTIONS = "zoo.cfg"
  KAFKA_BROKER_CONF = "kafka-broker"
  RANGER_ADMIN = "admin-properties"
  KAFKA_PORT = "port"
  RANGER_EXTERNAL_URL = "policymgr_external_url"
  ZK_CLIENTPORT = "clientPort"
  DELETE_OLD_TAG = "DELETE_OLD"

  ZOOKEEPER_SERVER = "ZOOKEEPER_SERVER"
  KAFKA_BROKER = "KAFKA_BROKER"
  NAMENODE = "NAMENODE"

  MR_MAPPING = None
  logger = None

  server_config_factory = None
  """:type : ServerConfigFactory"""
  stack_advisor = None
  """:type : StackAdvisor"""
  ambari_server = None
  """:type : AmbariServer"""

  # Api constants
  ROOT_URL = None
  CLUSTER_URL = None
  COMPONENTS_FORMAT = None
  TEZ_VIEW_URL = None

  #
Curl options CURL_PRINT_ONLY = None CURL_WRITE_ONLY = None ARGS = None OPTIONS = None HOST = None CLUSTER_NAME = None # for verify action REPORT_FILE = None SERVICES = [] API_TOKENS = { "user": None, "pass": None } HEADERS = { 'X-Requested-By': 'upgradeHelper' } @classmethod def initialize(cls): cls.ROOT_URL = '%s://%s:%s/api/v1' % (cls.API_PROTOCOL, cls.HOST, cls.API_PORT) cls.CLUSTER_URL = cls.ROOT_URL + "/clusters/%s" % cls.CLUSTER_NAME cls.COMPONENTS_URL = cls.CLUSTER_URL + "/components?fields=ServiceComponentInfo/total_count" cls.COMPONENTS_FORMAT = cls.CLUSTER_URL + "/components/{0}" cls.TEZ_VIEW_URL = cls.ROOT_URL + "/views/TEZ" cls.STACKS_URL = cls.ROOT_URL + "/stacks" cls.STACKS_VERSIONS_URL = cls.STACKS_URL + "/{0}/versions" cls.STACK_ADVISOR_URL = cls.STACKS_VERSIONS_URL + "/{1}/recommendations" cls.AMBARI_SERVER_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_SERVER" cls.AMBARI_AGENTS_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_AGENT" if cls.CLUSTER_NAME is not None and cls.HOST is not None: cls.SERVICES = set(map(lambda x: x.upper(), get_cluster_services())) cls.ambari_server = AmbariServer() if not cls.isPropertyAttributesSupported(): cls.logger.warning("Property attributes not supported by current Ambari version") @classmethod def isPropertyAttributesSupported(cls): if cls.ambari_server.server_version[0] * 10 + cls.ambari_server.server_version[1] >= 17: return True return False @classmethod def initialize_logger(cls, filename=None): cls.logger = logging.getLogger('UpgradeHelper') cls.logger.setLevel(logging.DEBUG) if filename is not None: handler = logging.FileHandler(filename) handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s')) cls.logger.addHandler(handler) cls.logger.info("") cls.logger.info("Start new logging section") handler = logging.StreamHandler(sys.stdout) handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) cls.logger.addHandler(handler) class CatConst(Const): VERSION_TAG = "version" STACK_VERSION_OLD = "old-version" STACK_VERSION_TARGET = "target-version" STACK_STAGS_TAG = "stacks" STACK_NAME = "name" CONFIG_OPTIONS = "options" CONFIG_TYPES = "config-types" STACK_PROPERTIES = "properties" STACK_PROPERTIES_ATTRIBUTES = "properties_attributes" PROPERTY_VALUE_TAG = "value" VERSIONS_TAG = "versions" PROPERTY_REMOVE_TAG = "remove" PROPERTY_MAP_TO = "map-to" PROPERTY_MAP_FROM = "map-from" PROPERTY_FROM_CATALOG = "from-catalog" PROPERTY_TO_CATALOG = "to-catalog" PROPERTY_DEFAULT = "default" MERGED_COPY_TAG = "merged-copy" REQUIRED_SERVICES = "required-services" COERCE_TO_PROPERTY_TAG = "coerce-to" RESOLVE_DEPENDENCY_TAG = "resolve-dependency" COERCE_YAML_OPTION_TAG = "yaml-array" REPLACE_FROM_TAG = "replace-from" REPLACE_TO_TAG = "replace-to" OVERRIDE_TAG = "override" ITEMS_TAG = "items" TYPE_TAG = "type" TRUE_TAG = "yes" VALUE_REQUIRED_TAG = "value-required" PROPERTY_CAN_CREATE_TAG = "can-create" STACK_PROPERTIES_MAPPING_LIST_TAG = "property-mapping" VALUE_TEMPLATE_TAG = "template" SEARCH_PATTERN = "(\{[^\{\}]+\})" # {XXXXX} ACTION_COMMIT = "commit" ACTION_RELOAD = "reload" ACTION_RENAME_PROPERTY = "rename-property" TEMPLATE_HANDLER = "template_handler" # ============================== # Catalog classes definition # ============================== class AmbariServer(object): def __init__(self): Options.logger.info("Resolving Ambari server configuration ...") self._get_server_info() self._get_agents_info() self._get_components() def _get_components(self): info = curl(Options.COMPONENTS_URL, parse=True) 
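    # Rough sketch (an assumption, not captured from a live server) of the payload the parsing
    # below expects from COMPONENTS_URL ("/components?fields=ServiceComponentInfo/total_count"):
    #
    #   {
    #     "items": [
    #       {"ServiceComponentInfo": {"component_name": "NAMENODE", "total_count": "1"}},
    #       {"ServiceComponentInfo": {"component_name": "ZOOKEEPER_SERVER", "total_count": "0"}}
    #     ]
    #   }
    #
    # Only entries whose total_count is greater than zero end up in self._components.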
self._components = [] if CatConst.ITEMS_TAG in info: for item in info[CatConst.ITEMS_TAG]: if "ServiceComponentInfo" in item and "total_count" in item["ServiceComponentInfo"] and \ int(item["ServiceComponentInfo"]["total_count"]) > 0 and "component_name" in item["ServiceComponentInfo"]: self._components.append(item["ServiceComponentInfo"]["component_name"]) def _get_server_info(self): info = curl(Options.AMBARI_SERVER_URL, parse=True) self._server_version = [0, 0, 0] if "RootServiceComponents" in info: server_props = info["RootServiceComponents"] ver = server_props["component_version"] if "component_version" in server_props else None try: self._server_version = list(map(lambda x: int(x), ver.split("."))) except ValueError: pass def _get_agents_info(self): info = curl(Options.AMBARI_AGENTS_URL, parse=True) self._agents = [] if "hostComponents" in info: agent_props = info["hostComponents"] self._agents = list(map(lambda x: x["RootServiceHostComponents"]["host_name"], agent_props)) @property def components(self): return self._components @property def server_version(self): return self._server_version @property def agent_hosts(self): return self._agents class StackAdvisorFactory(object): def __init__(self): self._stack_info = self._load_stack_info() def _load_stack_versions(self, stack): versions = curl(Options.STACKS_VERSIONS_URL.format(stack), parse=True) if CatConst.ITEMS_TAG in versions: versions = list(map(lambda x: x["Versions"]["stack_version"], versions[CatConst.ITEMS_TAG])) return versions def _load_stack_info(self): stacks = curl(Options.STACKS_URL, parse=True) if CatConst.ITEMS_TAG in stacks: stacks = list(map(lambda x: x["Stacks"]["stack_name"], stacks["items"])) else: stacks = {} stacks_dict = {} for stack in stacks: stacks_dict[stack] = self._load_stack_versions(stack) return stacks_dict def get_instance(self, stack, version): sversion = Options.ambari_server.server_version if sversion[0] * 10 + sversion[1] < 21: Options.logger.warning("Ambari server version \"%s.%s.%s\" doesn't support property dependencies suggestion" % (sversion[0], sversion[1], sversion[2])) return BaseStackAdvisor(stack, version) if stack in self._stack_info and version in self._stack_info[stack]: return StackAdvisor(stack, version) else: raise StackNotFoundException("Stack %s-%s not exist on the server" % (stack, version)) class StackAdvisorRequestProperty(object): def __init__(self, catalog, property_name): self._catalog = catalog self._property_name = property_name @property def catalog(self): return self._catalog @property def name(self): return self._property_name def get_json(self): return { "type": self.catalog, "name": self.name } class BaseStackAdvisor(object): def __init__(self, stack, version): self._req_url = Options.STACK_ADVISOR_URL.format(stack, version) def get_suggestion(self, cfg_factory, changed_properties): return {} class StackAdvisor(BaseStackAdvisor): def __init__(self, stack, version): super(StackAdvisor, self).__init__(stack, version) def _transform_properties(self, cfg_factory): """ Transform properties list to blueprint output format :type cfg_factory: ServerConfigFactory :rtype dict """ props = cfg_factory.get_json() for cfg in props: props[cfg] = { "properties": props[cfg] } return props def _from_blueprint_properties_transform(self, props): """ Transform SA response to dict """ for p in props: rprop = {} if "properties" in props[p] and props[p]["properties"] is not None: rprop = props[p]["properties"] if "property_attributes" in props[p]: for property_attribute in 
props[p]["property_attributes"]: if "delete" in props[p]["property_attributes"][property_attribute] and \ props[p]["property_attributes"][property_attribute]["delete"] == "true": rprop[property_attribute] = None props[p] = rprop return props def _generate_req_properties(self, properties): rlist = [] for item in properties: if isinstance(item, StackAdvisorRequestProperty): rlist.append(item.get_json()) return rlist def get_suggestion(self, cfg_factory, changed_properties): """ :type cfg_factory: ServerConfigFactory :type catalog_name str :type changed_properties: list :rtype dict """ request = { "recommend": "configuration-dependencies", "hosts": Options.ambari_server.agent_hosts, "services": list(Options.SERVICES), "changed_configurations": self._generate_req_properties(changed_properties), "recommendations": { "blueprint": { "host_groups": [], "configurations": self._transform_properties(cfg_factory), "blueprint_cluster_binding": {} } } } response = curl(self._req_url, request_type="POST", data=request, parse=True) if "resources" in response and isinstance(response["resources"], list) and len(response["resources"]) > 0: response = response["resources"][0] if "recommendations" in response and "blueprint" in response["recommendations"] and \ "configurations" in response["recommendations"]["blueprint"]: return self._from_blueprint_properties_transform(response["recommendations"]["blueprint"]["configurations"]) return {} class UpgradeCatalogFactory(object): # versions of catalog which is currently supported _supported_catalog_versions = ["1.0"] # private variables _json_catalog = None def __init__(self, path): self._load(path) def _load(self, path): f = None try: f = open(path, 'r') json_string = f.read() self._json_catalog = json.loads(json_string) self._parse_upgrade_catalog() except IOError as e: raise FatalException(e.errno, "Couldn't open upgrade catalog file %s: %s" % (path, e.strerror)) except NotSupportedCatalogVersion as e: raise FatalException(1, e.message) except ValueError as e: raise FatalException(1, "Malformed upgrade catalog: %s" % e.message) finally: try: if f is not None: f.close() except IOError as e: pass def _parse_upgrade_catalog(self): catalog_version = None if CatConst.VERSION_TAG in self._json_catalog: catalog_version = self._json_catalog[CatConst.VERSION_TAG] if catalog_version is None or catalog_version not in self._supported_catalog_versions: raise NotSupportedCatalogVersion(str(catalog_version)) def get_catalog(self, from_version=None, to_version=None): search_version = { CatConst.STACK_VERSION_OLD: from_version, CatConst.STACK_VERSION_TARGET: to_version } for stack in self._json_catalog[CatConst.STACK_STAGS_TAG]: version = { CatConst.STACK_VERSION_OLD: stack[CatConst.STACK_VERSION_OLD], CatConst.STACK_VERSION_TARGET: stack[CatConst.STACK_VERSION_TARGET] } if version == search_version: return UpgradeCatalog(catalog=stack, version=version) return None class UpgradeCatalog(object): # private variables _json_catalog = None _properties_catalog = None _properties_map_catalog = None _version = None _search_pattern = None _catalog_options = None def __init__(self, catalog=None, version=None): self._handlers = {} self._json_catalog = catalog self._version = version self._search_pattern = re.compile(CatConst.SEARCH_PATTERN) if CatConst.STACK_PROPERTIES in catalog: self._properties_catalog = self._format_catalog_properties(catalog[CatConst.STACK_PROPERTIES]) if CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG in catalog: self._properties_map_catalog = 
PropertyMapping(catalog[CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG]) else: self._properties_map_catalog = PropertyMapping() if catalog is not None and CatConst.CONFIG_OPTIONS in catalog \ and CatConst.CONFIG_TYPES in catalog[CatConst.CONFIG_OPTIONS]: self._catalog_options = catalog[CatConst.CONFIG_OPTIONS] def add_handler(self, name, handler): if name not in self._handlers: self._handlers[name] = handler def _format_catalog_properties(self, properties): """ Transform properties from short form to normal one: "property": "text" => "property": { "value": "text" } :param properties: dict :return: dict """ for config_item in properties: cfg_item = properties[config_item] """ case when "properties": { "yarn-site": { ..... } } is set like "properties": { "yarn-site": "" } """ if not isinstance(cfg_item, dict): raise MalformedPropertyDefinitionException("The property catalog '%s' definition error" % config_item) properties[config_item] = dict(zip( cfg_item.keys(), map(lambda x: x if isinstance(x, dict) or isinstance(x, list) else {CatConst.PROPERTY_VALUE_TAG: x}, cfg_item.values()) )) return properties @property def version(self): return "%s-%s" % (self._version[CatConst.STACK_VERSION_OLD], self._version[CatConst.STACK_VERSION_TARGET]) @property def target_version(self): return self._version[CatConst.STACK_VERSION_TARGET] @property def source_version(self): return self._version[CatConst.STACK_VERSION_OLD] def get_parsed_version(self): """ Get numeric representation of the version for comparation purposes Example: 1.3-2.1 will be represented as { from: 13, to: 21 } :return: Numeric version """ v_from = self._version[CatConst.STACK_VERSION_OLD].split(".") v_to = self._version[CatConst.STACK_VERSION_TARGET].split(".") try: v_from = int(v_from[0]) * 10 + int(v_from[1]) v_to = int(v_to[0]) * 10 + int(v_to[1]) except ValueError: v_from = 0 v_to = 0 version = { "from": v_from, "to": v_to } return version @property def name(self): if CatConst.STACK_NAME in self._json_catalog: return self._json_catalog[CatConst.STACK_NAME] return "" @property def mapping(self): return self._properties_map_catalog @property def items(self): return self._properties_catalog @property def options(self): if CatConst.CONFIG_TYPES in self._catalog_options: return self._catalog_options[CatConst.CONFIG_TYPES] return {} @property def action_handlers(self): return self._handlers @property def tag_search_pattern(self): return self._search_pattern def __handle_remove_tag(self, name, catalog_item_name, catalog_property_item, properties): """ :type name str :type catalog_item_name str :type catalog_property_item dict :type properties dict """ if CatConst.PROPERTY_REMOVE_TAG in catalog_property_item and \ catalog_property_item[CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG and \ catalog_item_name in properties: del properties[catalog_item_name] def __handle_template_tag_sub(self, catalog_item_name, catalog_property_item): """ :type catalog_item_name str :type catalog_property_item dict """ if CatConst.TEMPLATE_HANDLER in self._handlers and self._handlers is not None and \ CatConst.VALUE_TEMPLATE_TAG in catalog_property_item and catalog_property_item[ CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG: try: parsed_value = self._handlers[CatConst.TEMPLATE_HANDLER]( self, self._search_pattern.findall(catalog_property_item[CatConst.PROPERTY_VALUE_TAG]), catalog_property_item[CatConst.PROPERTY_VALUE_TAG] ) catalog_property_item[CatConst.PROPERTY_VALUE_TAG] = parsed_value except TemplateProcessingException: pass def __handle_add_new(self, 
name, catalog_item_name, catalog_property_item, properties): """ :type name str :type catalog_item_name str :type catalog_property_item dict :type properties dict """ catalog_property_item = dict(catalog_property_item) can_add_new = not (CatConst.PROPERTY_CAN_CREATE_TAG in catalog_property_item and catalog_property_item[CatConst.PROPERTY_CAN_CREATE_TAG].upper() == "NO") if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name not in properties and can_add_new: self.__handle_template_tag_sub(catalog_item_name, catalog_property_item) properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG] def __handle_change_existing(self, name, catalog_item_name, catalog_property_item, properties): """ :type name str :type catalog_item_name str :type catalog_property_item dict :type properties dict """ catalog_property_item = dict(catalog_property_item) can_override = True if CatConst.OVERRIDE_TAG in catalog_property_item and catalog_property_item[CatConst.OVERRIDE_TAG] != CatConst.TRUE_TAG: can_override = False if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name in properties and can_override: self.__handle_template_tag_sub(catalog_item_name, catalog_property_item) properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG] return properties def __handle_dependency_tag(self, name, catalog_item_name, catalog_property_item, properties): """ :type name str :type catalog_item_name str :type catalog_property_item dict :type properties dict """ if CatConst.RESOLVE_DEPENDENCY_TAG in catalog_property_item and \ catalog_property_item[CatConst.RESOLVE_DEPENDENCY_TAG] == CatConst.TRUE_TAG: sa_suggestions = Options.stack_advisor.get_suggestion(Options.server_config_factory, [StackAdvisorRequestProperty(name, catalog_item_name)]) for sa_catalog in sa_suggestions: # create new config group if not existed if sa_catalog not in Options.server_config_factory.items(): Options.server_config_factory.create_config(sa_catalog) catalog_properties = Options.server_config_factory.get_config(sa_catalog).properties for sa_property in sa_suggestions[sa_catalog]: if sa_suggestions[sa_catalog][sa_property] is None and sa_property in catalog_properties: print "rem %s:%s" % (sa_catalog, sa_property) del catalog_properties[sa_property] elif sa_suggestions[sa_catalog][sa_property] is not None: catalog_properties[sa_property] = sa_suggestions[sa_catalog][sa_property] def __can_handler_execute(self, catalog_options, catalog_property_item, property_item, properties): """ :type catalog_options dict :type catalog_property_item str :type property_item dict :type properties dict """ can_process = True # process required services tag required_list = None if CatConst.REQUIRED_SERVICES in catalog_options and catalog_options[CatConst.REQUIRED_SERVICES] is not None and \ isinstance(catalog_options[CatConst.REQUIRED_SERVICES], list): required_list = catalog_options[CatConst.REQUIRED_SERVICES] if CatConst.REQUIRED_SERVICES in property_item and property_item[CatConst.REQUIRED_SERVICES] is not None and\ isinstance(property_item[CatConst.REQUIRED_SERVICES], list): required_list = property_item[CatConst.REQUIRED_SERVICES] if required_list is not None: can_process = can_process and is_services_exists(required_list) if CatConst.VALUE_REQUIRED_TAG in property_item and property_item[CatConst.VALUE_REQUIRED_TAG] is not None and\ CatConst.PROPERTY_VALUE_TAG in property_item and catalog_property_item in properties: can_process = properties[catalog_property_item] == 
property_item[CatConst.VALUE_REQUIRED_TAG] return can_process def process_simple_transformations(self, name, properties): """ :type properties dict :type name str """ tag_handlers = [ self.__handle_add_new, self.__handle_change_existing, self.__handle_dependency_tag, self.__handle_remove_tag ] # catalog has no update entries for this config group if name not in self._properties_catalog: return 0 catalog_item = self._properties_catalog[name] for catalog_property_item in catalog_item.keys(): catalog_options = self.options[name] if name in self.options else {} if self.__can_handler_execute(catalog_options, catalog_property_item, catalog_item[catalog_property_item], properties): for handler in tag_handlers: handler(name, catalog_property_item, catalog_item[catalog_property_item], properties) class PropertyMapping(object): _mapping_list = {} def __init__(self, map_list=None): if map_list is not None: self._mapping_list = self._convert_list(map_list) def _convert_list(self, map_list): return dict(zip( map_list.keys(), map(lambda x: x if isinstance(x, dict) else {CatConst.PROPERTY_MAP_TO: x}, map_list.values()) )) def get(self, old_property_name): """ Get property mapping dict :old_property_name str :return dict """ if old_property_name in self._mapping_list: return self._mapping_list[old_property_name] raise PropertyNotFoundException("Property %s from property mapping section not found" % old_property_name) def list(self): return self._mapping_list.keys() def get_mapped_name(self, old_property_name): if CatConst.PROPERTY_MAP_TO not in self.get(old_property_name): raise MalformedPropertyDefinitionException("%s option is not set for %s property" % (CatConst.PROPERTY_MAP_TO, old_property_name)) return self.get(old_property_name)[CatConst.PROPERTY_MAP_TO] def exists(self, old_property_name): return old_property_name in self._mapping_list class ServerConfigFactory(object): def __init__(self): self.__observers = [] self._server_catalogs = {} self._load_configs() def subscribe(self, name, config_item): self.__observers.append((name, config_item)) def _load_configs(self): Options.logger.info('Getting latest cluster configuration from the server...') new_configs = get_config_resp_all() for config_item in new_configs: if config_item in self._server_catalogs: self.notify_observer(config_item, CatConst.ACTION_RELOAD, new_configs[config_item]) else: self._server_catalogs[config_item] = ServerConfig(self, config_item, new_configs[config_item]) def notify_observers(self, action, arg=None): for name, config_item in self.__observers: if config_item is not None and name in self._server_catalogs: config_item.notify(action, arg) def notify_observer(self, _name, action, arg=None): for name, config_item in self.__observers: if config_item is not None and name == _name and name in self._server_catalogs: config_item.notify(action, arg) def __str__(self): catalogs = {} for cfg in self._server_catalogs: catalogs[cfg] = str(self._server_catalogs[cfg]) return json.dumps(catalogs) def get_json(self): catalogs = {} for cfg in self._server_catalogs: catalogs[cfg] = self._server_catalogs[cfg].properties return catalogs def get_config(self, name): """ Get configuration item object :type name str :rtype: ServerConfig """ if name in self._server_catalogs: return self._server_catalogs[name] raise CatalogNotFoundException("Server catalog item \"%s\" not found" % name) def create_config(self, name): if name not in self._server_catalogs: self._server_catalogs[name] = ServerConfig(self, name, {CatConst.STACK_PROPERTIES: {}}) else: 
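      # Re-creating a config group that this factory already tracks is treated as a programming
      # error rather than a silent no-op; callers are expected to guard with
      # "name not in factory.items()" first (as modify_config_item and __handle_dependency_tag do).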
raise CatalogExistException("Config group \"%s\" already existed" % name) def items(self): return self._server_catalogs.keys() def reload(self): self._load_configs() def process_mapping_transformations(self, catalog): """ :type catalog UpgradeCatalog """ for map_item in catalog.mapping.list(): self._process_single_map_transformation(catalog, map_item, catalog.mapping.get(map_item)) def _process_default_template_map_replacement(self, catalog, item): """ :type catalog: UpgradeCatalog :type item: dict """ if CatConst.VALUE_TEMPLATE_TAG in item and CatConst.TEMPLATE_HANDLER in catalog.action_handlers and\ CatConst.PROPERTY_DEFAULT in item and item[CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG: try: parsed_value = catalog.action_handlers[CatConst.TEMPLATE_HANDLER]( catalog, catalog.tag_search_pattern.findall(item[CatConst.PROPERTY_DEFAULT]), item[CatConst.PROPERTY_DEFAULT] ) item[CatConst.PROPERTY_DEFAULT] = parsed_value except TemplateProcessingException: pass def _process_property_value_transformation(self, catalog, property_map_definition, old_value): """ :type catalog: UpgradeCatalog :type property_map_definition: dict :type old_value: str :return: str """ tmp = old_value if CatConst.REPLACE_FROM_TAG in property_map_definition and CatConst.REPLACE_TO_TAG in property_map_definition and\ property_map_definition[CatConst.REPLACE_TO_TAG] is not None and property_map_definition[CatConst.REPLACE_FROM_TAG] is not None: tmp = tmp.replace(property_map_definition[CatConst.REPLACE_FROM_TAG], property_map_definition[CatConst.REPLACE_TO_TAG]) if CatConst.COERCE_TO_PROPERTY_TAG in property_map_definition: if property_map_definition[CatConst.COERCE_TO_PROPERTY_TAG] == CatConst.COERCE_YAML_OPTION_TAG: # for example c6401,c6402 into ['c6401','c6402'] data = list(map(lambda x: "'%s'" % x.strip(), tmp.split(','))) tmp = "[%s]" % ','.join(data) return tmp def _process_single_map_transformation(self, catalog, map_item_name, map_property_item): """ :type catalog UpgradeCatalog :type map_item_name str :type map_property_item dict """ old_property_name = map_item_name # map-from item name could be re-defined via PROPERTY_MAP_FROM property to avoid duplicate entries if CatConst.PROPERTY_MAP_FROM in map_property_item and map_property_item[CatConst.PROPERTY_MAP_FROM] is not None: old_property_name = map_property_item[CatConst.PROPERTY_MAP_FROM] new_property_name = old_property_name if CatConst.PROPERTY_MAP_TO in map_property_item: new_property_name = map_property_item[CatConst.PROPERTY_MAP_TO] # process first required section required_services = map_property_item[CatConst.REQUIRED_SERVICES] if CatConst.REQUIRED_SERVICES in map_property_item else None # process required-services tag if required_services is not None and not is_services_exists(required_services): return 0 # process template tag self._process_default_template_map_replacement(catalog, map_property_item) source_cfg_group = map_property_item[CatConst.PROPERTY_FROM_CATALOG] if CatConst.PROPERTY_FROM_CATALOG in map_property_item and\ map_property_item[CatConst.PROPERTY_FROM_CATALOG] != "" else None target_cfg_group = map_property_item[CatConst.PROPERTY_TO_CATALOG] if CatConst.PROPERTY_TO_CATALOG in map_property_item and \ map_property_item[CatConst.PROPERTY_TO_CATALOG] != ""else None default_value = map_property_item[CatConst.PROPERTY_DEFAULT] if CatConst.PROPERTY_DEFAULT in map_property_item and \ map_property_item[CatConst.PROPERTY_DEFAULT] != "" else None if source_cfg_group is None and target_cfg_group is None: # global scope mapping renaming 
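      # Hypothetical catalog entries (for illustration only) that take this branch, i.e. mappings
      # with no "from-catalog"/"to-catalog", which are broadcast to every config group:
      #
      #   "property-mapping": {
      #     "old.name": "new.name",          <- short form
      #     "unique-key": {                  <- long form, "map-from" avoids a duplicate key
      #       "map-from": "old.name2",
      #       "map-to": "new.name2",
      #       "replace-from": "8020",
      #       "replace-to": "8030"
      #     }
      #   }
      #
      # The list passed below is unpacked by ServerConfig._rename_property as
      # [old_name, new_name, transform_func, catalog, map_item].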
self.notify_observers(CatConst.ACTION_RENAME_PROPERTY, [old_property_name, new_property_name, self._process_property_value_transformation, catalog, map_property_item ]) elif source_cfg_group is not None and target_cfg_group is not None: # group-to-group moving if source_cfg_group in self._server_catalogs and target_cfg_group in self._server_catalogs: old_cfg_group = self.get_config(source_cfg_group).properties new_cfg_group = self.get_config(target_cfg_group).properties if old_property_name in old_cfg_group: new_cfg_group[new_property_name] = self._process_property_value_transformation(catalog, map_property_item, old_cfg_group[old_property_name]) if new_property_name != old_property_name: del old_cfg_group[old_property_name] elif old_property_name not in old_cfg_group and default_value is not None: new_cfg_group[new_property_name] = default_value def commit(self): self.notify_observers(CatConst.ACTION_COMMIT) class ServerConfig(object): def __init__(self, factory, name, initial_configs): """ Initialize configuration item :factory ServerConfigFactory """ factory.subscribe(name, self) self._configs = initial_configs self._hash = self._calculate_hash() self._name = name def _calculate_hash(self): return hash(str(self._configs)) def notify(self, action, arg=None): if action == CatConst.ACTION_RELOAD: self._configs = arg self._hash = self._calculate_hash() elif action == CatConst.ACTION_COMMIT: self._commit() elif action == CatConst.ACTION_RENAME_PROPERTY and isinstance(arg, list) and len(arg) == 5: self._rename_property(*arg) def _rename_property(self, old_name, new_name, transform_func, catalog, map_item): """ :type old_name: str :type new_name: str :type transform_func: function :type catalog: UpgradeCatalog :type map_item: dict :return: """ if old_name in self.properties: old_property_value = self.properties[old_name] if transform_func is not None: self.properties[new_name] = transform_func(catalog, map_item, old_property_value) else: self.properties[new_name] = old_property_value if old_name != new_name: del self.properties[old_name] def is_attributes_exists(self): return CatConst.STACK_PROPERTIES_ATTRIBUTES in self._configs def __str__(self): return json.dumps(self.properties) @property def properties(self): return self._configs[CatConst.STACK_PROPERTIES] @properties.setter def properties(self, value): self._configs[CatConst.STACK_PROPERTIES] = value @property def attributes(self): return self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES] @attributes.setter def attributes(self, value): self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES] = value def _commit(self): if self._hash != self._calculate_hash(): Options.logger.info("Committing changes for \"%s\" configuration group ..." 
% self._name) if self.is_attributes_exists(): update_config(self.properties, self._name, self.attributes) else: update_config(self.properties, self._name) def clear(self): self.properties = {} self.attributes = {} def merge(self, catalog_item): """ :type catalog_item UpgradeCatalog """ # handle "merged-copy" tag config_options = catalog_item.options[self._name] if self._name in catalog_item.options else {} clear_properties = not (CatConst.MERGED_COPY_TAG in config_options and config_options[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG) if clear_properties: self.clear() Options.logger.info("Processing configuration group: %s", self._name) catalog_item.process_simple_transformations(self._name, self.properties) def write_mapping(hostmapping): if os.path.isfile(Options.MR_MAPPING_FILE): os.remove(Options.MR_MAPPING_FILE) json.dump(hostmapping, open(Options.MR_MAPPING_FILE, 'w')) def read_mapping(): if os.path.isfile(Options.MR_MAPPING_FILE): if Options.MR_MAPPING is not None: return Options.MR_MAPPING else: Options.MR_MAPPING = json.load(open(Options.MR_MAPPING_FILE)) return Options.MR_MAPPING else: raise FatalException(-1, "MAPREDUCE host mapping file, mr_mapping, is not available or badly formatted. Execute " "action save-mr-mapping. Ensure the file is present in the directory where you are " "executing this command.") def get_mr1_mapping(): components = ["MAPREDUCE_CLIENT", "JOBTRACKER", "TASKTRACKER", "HISTORYSERVER"] GET_URL_FORMAT = Options.CLUSTER_URL + '/services/MAPREDUCE/components/%s' hostmapping = {} for component in components: hostlist = [] structured_resp = curl(GET_URL_FORMAT % component, parse=True, validate=True) if 'host_components' in structured_resp: for hostcomponent in structured_resp['host_components']: if 'HostRoles' in hostcomponent: if 'host_name' in hostcomponent['HostRoles']: hostlist.append(hostcomponent['HostRoles']['host_name']) hostmapping[component] = hostlist write_mapping(hostmapping) pprint("File mr_mapping contains the host mapping for mapreduce components. This file is critical for later " "steps.") def get_YN_input(prompt, default): yes = set(['yes', 'ye', 'y']) no = set(['no', 'n']) return get_choice_string_input(prompt, default, yes, no) def get_choice_string_input(prompt, default, firstChoice, secondChoice): choice = raw_input(prompt).lower() if choice in firstChoice: return True elif choice in secondChoice: return False elif choice is "": # Just enter pressed return default else: print "input not recognized, please try again: " return get_choice_string_input(prompt, default, firstChoice, secondChoice) def delete_mr(): saved_mr_mapping = get_YN_input("Have you saved MR host mapping using action save-mr-mapping [y/n] (n)? ", False) if not saved_mr_mapping: raise FatalException(1, "Ensure MAPREDUCE host component mapping is saved before deleting it. 
Use action " "save-mr-mapping.") SERVICE_URL_FORMAT = Options.CLUSTER_URL + '/services/MAPREDUCE' COMPONENT_URL_FORMAT = Options.CLUSTER_URL + '/hosts/%s/host_components/%s' NON_CLIENTS = ["JOBTRACKER", "TASKTRACKER", "HISTORYSERVER"] PUT_IN_DISABLED = { "HostRoles": { "state": "DISABLED" } } hostmapping = read_mapping() for key, value in hostmapping.items(): if (key in NON_CLIENTS) and (len(value) > 0): for host in value: curl(COMPONENT_URL_FORMAT % (host, key), request_type="PUT", data=PUT_IN_DISABLED, validate=True) curl(SERVICE_URL_FORMAT, request_type="DELETE", validate=True) def get_cluster_stackname(): VERSION_URL_FORMAT = Options.CLUSTER_URL + '?fields=Clusters/version' structured_resp = curl(VERSION_URL_FORMAT, validate=True, parse=True) if 'Clusters' in structured_resp: if 'version' in structured_resp['Clusters']: return structured_resp['Clusters']['version'] raise FatalException(-1, "Unable to get the cluster version") def has_component_in_stack_def(stack_name, service_name, component_name): STACK_COMPONENT_URL_FORMAT = Options.ROOT_URL + '/stacks2/{0}/versions/{1}/stackServices/{2}/serviceComponents/{3}' stack, stack_version = stack_name.split('-') try: curl(STACK_COMPONENT_URL_FORMAT.format(stack, stack_version, service_name, component_name), validate=True) return True except FatalException: return False def add_services(): SERVICE_URL_FORMAT = Options.CLUSTER_URL + '/services/{0}' COMPONENT_URL_FORMAT = SERVICE_URL_FORMAT + '/components/{1}' HOST_COMPONENT_URL_FORMAT = Options.CLUSTER_URL + '/hosts/{0}/host_components/{1}' service_comp = { "YARN": ["NODEMANAGER", "RESOURCEMANAGER", "YARN_CLIENT"], "MAPREDUCE2": ["HISTORYSERVER", "MAPREDUCE2_CLIENT"]} new_old_host_map = { "NODEMANAGER": "TASKTRACKER", "HISTORYSERVER": "HISTORYSERVER", "RESOURCEMANAGER": "JOBTRACKER", "YARN_CLIENT": "MAPREDUCE_CLIENT", "MAPREDUCE2_CLIENT": "MAPREDUCE_CLIENT"} stack_name = get_cluster_stackname() stack_has_ats = has_component_in_stack_def(stack_name, "YARN", "APP_TIMELINE_SERVER") # if upgrading to stack > 2.1 (which has ats) if stack_has_ats: service_comp["YARN"].append("APP_TIMELINE_SERVER") new_old_host_map["APP_TIMELINE_SERVER"] = "JOBTRACKER" hostmapping = read_mapping() for service in service_comp.keys(): curl(SERVICE_URL_FORMAT.format(service), validate=True, request_type="POST") for component in service_comp[service]: curl(COMPONENT_URL_FORMAT.format(service, component), validate=True, request_type="POST") for host in hostmapping[new_old_host_map[component]]: curl(HOST_COMPONENT_URL_FORMAT.format(host, component), validate=True, request_type="POST") def update_config(properties, config_type, attributes=None): tag = "version" + str(int(time.time() * 1000)) properties_payload = {"Clusters": {"desired_config": {"type": config_type, "tag": tag, "properties": properties}}} if attributes is not None: properties_payload["Clusters"]["desired_config"]["properties_attributes"] = attributes expect_body = config_type != "cluster-env" # ToDo: make exceptions more flexible curl(Options.CLUSTER_URL, request_type="PUT", data=properties_payload, validate=True, soft_validation=True) def build_all_options(desired_configs): """ Get all configs in the old-fashion way ( versions below 1.7.0 doesn't support "properties" filter ) """ config_url_tpl = Options.CLUSTER_URL + "/configurations?type={0}&tag={1}" all_options = {CatConst.ITEMS_TAG: []} for config in desired_configs: cfg_item = curl(config_url_tpl.format(config, desired_configs[config]["tag"]), parse=True, validate=True) if CatConst.ITEMS_TAG in 
cfg_item and len(cfg_item[CatConst.ITEMS_TAG]) == 1: all_options[CatConst.ITEMS_TAG].append(cfg_item[CatConst.ITEMS_TAG][0]) return all_options def get_config_resp_all(): desired_configs = {} config_all_properties_url = Options.CLUSTER_URL + "/configurations?fields=properties" desired_configs_resp = curl(Options.CLUSTER_URL + "?fields=Clusters/desired_configs", validate=True, parse=True) if 'Clusters' in desired_configs_resp: if 'desired_configs' in desired_configs_resp['Clusters']: desired_configs_resp = desired_configs_resp['Clusters']['desired_configs'] else: return None else: return None if Options.isPropertyAttributesSupported(): config_all_properties_url += ",properties_attributes" all_options = curl(config_all_properties_url, validate=True, parse=True) else: all_options = build_all_options(desired_configs_resp) if CatConst.ITEMS_TAG in all_options: all_options = all_options[CatConst.ITEMS_TAG] else: return None all_options = filter( lambda x: x[CatConst.TYPE_TAG] in desired_configs_resp and x["tag"] == desired_configs_resp[x[CatConst.TYPE_TAG]][ "tag"], all_options) for item in all_options: dc_item = {} if CatConst.STACK_PROPERTIES in item: # config item could not contain any property dc_item[CatConst.STACK_PROPERTIES] = item[CatConst.STACK_PROPERTIES] else: dc_item[CatConst.STACK_PROPERTIES] = {} if CatConst.STACK_PROPERTIES_ATTRIBUTES in item: dc_item[CatConst.STACK_PROPERTIES_ATTRIBUTES] = item[CatConst.STACK_PROPERTIES_ATTRIBUTES] if "tag" in item: dc_item["tag"] = item["tag"] if dc_item != {}: desired_configs[item[CatConst.TYPE_TAG]] = dc_item return desired_configs def is_services_exists(required_services): """ return true, if required_services is a part of Options.SERVICES :param required_services: list :return: bool """ # sets are equal if Options.SERVICES == set(required_services): return True return set(map(lambda x: x.upper(), required_services)) < Options.SERVICES def get_cluster_services(): services_url = Options.CLUSTER_URL + '/services' raw_services = curl(services_url, parse=True) # expected structure: # items: [ {"href":"...", "ServiceInfo":{"cluster_name":"..", "service_name":".."}}, ..., ... 
] if raw_services is not None and "items" in raw_services and isinstance(raw_services["items"], list): return list(map(lambda item: item["ServiceInfo"]["service_name"], raw_services["items"])) Options.logger.warning("Failed to load services list, functionality that depends on them couldn't work") return [] def get_zookeeper_quorum(): zoo_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.ZOOKEEPER_SERVER), validate=False, parse=True) zoo_quorum = [] zoo_def_port = "2181" if Options.server_config_factory is not None and Options.ZK_OPTIONS in Options.server_config_factory.items(): props = Options.server_config_factory.get_config(Options.ZK_OPTIONS) if Options.ZK_CLIENTPORT in props.properties: zoo_def_port = props.properties[Options.ZK_CLIENTPORT] if "host_components" in zoo_cfg: for item in zoo_cfg["host_components"]: zoo_quorum.append("%s:%s" % (item["HostRoles"]["host_name"], zoo_def_port)) return ",".join(zoo_quorum) def get_tez_history_url_base(): try: tez_view = curl(Options.TEZ_VIEW_URL, validate=False, parse=True) except HTTPError as e: raise TemplateProcessingException(str(e)) version = "" if "versions" in tez_view and \ len(tez_view['versions']) > 0 and \ "ViewVersionInfo" in tez_view['versions'][0] and \ 'version' in tez_view['versions'][0]['ViewVersionInfo']: version = tez_view['versions'][0]['ViewVersionInfo']['version'] url = '{0}://{1}:{2}/#/main/views/TEZ/{3}/TEZ_CLUSTER_INSTANCE'.format(Options.API_PROTOCOL, Options.HOST, Options.API_PORT, version) return url def get_kafka_listeners(): kafka_host = "localhost" kafka_port = "6667" if Options.server_config_factory is not None and Options.KAFKA_BROKER_CONF in Options.server_config_factory.items(): props = Options.server_config_factory.get_config(Options.KAFKA_BROKER_CONF) if Options.KAFKA_PORT in props.properties: kafka_port = props.properties[Options.KAFKA_PORT] # Default kafka listeners string kafka_listeners = "PLAINTEXT://{0}:{1}".format(kafka_host, kafka_port) return kafka_listeners def get_ranger_xaaudit_hdfs_destination_directory(): namenode_hostname="localhost" namenode_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.NAMENODE), validate=False, parse=True) if "host_components" in namenode_cfg: namenode_hostname = namenode_cfg["host_components"][0]["HostRoles"]["host_name"] return "hdfs://{0}:8020/ranger/audit".format(namenode_hostname) def get_ranger_policymgr_external_url(): url = "{{policymgr_mgr_url}}" if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) if Options.RANGER_EXTERNAL_URL in props.properties: url = props.properties[Options.RANGER_EXTERNAL_URL] return url def get_jdbc_driver(): driver = "{{jdbc_driver}}" if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) if "DB_FLAVOR" in props.properties: db = props.properties["DB_FLAVOR"] if db.lower() == "mysql": driver = "com.mysql.jdbc.Driver" elif db.lower() == "oracle": driver = "oracle.jdbc.OracleDriver" return driver def get_audit_jdbc_url(): audit_jdbc_url = "{{audit_jdbc_url}}" if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) if "DB_FLAVOR" in props.properties: xa_audit_db_flavor = props.properties["DB_FLAVOR"] if "db_host" in props.properties: xa_db_host = 
props.properties["db_host"] if "audit_db_name" in props.properties: xa_audit_db_name = props.properties["audit_db_name"] if xa_audit_db_flavor.lower() == 'mysql': audit_jdbc_url = "jdbc:mysql://{0}/{1}".format(xa_db_host, xa_audit_db_name) elif xa_audit_db_flavor.lower() == 'oracle': audit_jdbc_url = "jdbc:oracle:thin:\@//{0}".format(xa_db_host) return audit_jdbc_url def get_audit_db_passwd(): audit_db_passwd = "crypted" if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) if "audit_db_password" in props.properties: audit_db_passwd = props.properties['audit_db_password'] return audit_db_passwd def get_audit_to_db_enabled(config_name): audit_to_db = "true" if Options.server_config_factory is not None and config_name in Options.server_config_factory.items(): props = Options.server_config_factory.get_config(config_name) if "XAAUDIT.DB.IS_ENABLED" in props.properties: audit_to_db = props.properties["XAAUDIT.DB.IS_ENABLED"] return audit_to_db def get_audit_to_hdfs_enabled(config_name): audit_to_hdfs = "false" if Options.server_config_factory is not None and config_name in Options.server_config_factory.items(): props = Options.server_config_factory.get_config(config_name) if "XAAUDIT.HDFS.IS_ENABLED" in props.properties: audit_to_hdfs = props.properties["XAAUDIT.HDFS.IS_ENABLED"] return audit_to_hdfs def get_hdfs_batch_filespool_dir(config_name, component): if component == 'hdfs': path = '/var/log/hadoop/hdfs/audit/hdfs/spool' else: path = '/var/log/{0}/audit/hdfs/spool'.format(component) if Options.server_config_factory is not None and config_name in Options.server_config_factory.items(): props = Options.server_config_factory.get_config(config_name) if "XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY" in props.properties: path = props.properties["XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY"] return path def get_jt_host(catalog): """ :type catalog: UpgradeCatalog :rtype str """ if catalog.get_parsed_version()["from"] == 13: return read_mapping()["JOBTRACKER"][0] return "" def get_jh_host(catalog): """ :type catalog: UpgradeCatalog :rtype str """ if catalog.get_parsed_version()["from"] == 13: return read_mapping()["HISTORYSERVER"][0] return "" def get_ranger_host(): ranger_config = curl(Options.COMPONENTS_FORMAT.format('RANGER_ADMIN'), validate=False, parse=True) ranger_host_list = [] if "host_components" in ranger_config: for item in ranger_config["host_components"]: ranger_host_list.append(item["HostRoles"]["host_name"]) return ranger_host_list[0] def get_ranger_service_details(): server_cfg_factory = Options.server_config_factory server_cfg_catalog = server_cfg_factory.get_config('admin-properties') properties_latest = server_cfg_catalog.properties data = {} if properties_latest['DB_FLAVOR'].lower() == 'mysql': data['RANGER_JDBC_DRIVER'] = 'com.mysql.jdbc.Driver' data['RANGER_JDBC_DIALECT'] = 'org.eclipse.persistence.platform.database.MySQLPlatform' data['RANGER_JDBC_URL'] = 'jdbc:mysql://{0}/{1}'.format(properties_latest['db_host'], properties_latest['db_name']) data['RANGER_AUDIT_JDBC_URL'] = 'jdbc:mysql://{0}/{1}'.format(properties_latest['db_host'], properties_latest['audit_db_name']) elif properties_latest['DB_FLAVOR'].lower() == 'oracle': data['RANGER_JDBC_DRIVER'] = 'oracle.jdbc.OracleDriver' data['RANGER_JDBC_DIALECT'] = 'org.eclipse.persistence.platform.database.OraclePlatform' data['RANGER_JDBC_URL'] = 'jdbc:oracle:thin:@//{0}'.format(properties_latest['db_host']) 
data['RANGER_AUDIT_JDBC_URL'] = 'jdbc:oracle:thin:@//{0}'.format(properties_latest['db_host']) return data def get_hive_security_authorization_setting(): # this pattern should be used only once, changes here mimic UpgradeCatalog210.java -> updateRangerHiveConfigs scf = Options.server_config_factory response = "None" if "hive-env" in scf.items() and "hive_security_authorization" in scf.get_config("hive-env").properties: response = scf.get_config("hive-env").properties["hive_security_authorization"] old_ranger_catalog = "ranger-hive-plugin-properties" old_ranger_setting = "ranger-hive-plugin-enabled" hive_server_catalog = "hiveserver2-site" hive_sec_property = "hive.security.authorization.enabled" if scf is not None and old_ranger_catalog in scf.items(): cfg = scf.get_config(old_ranger_catalog) prop = cfg.properties if old_ranger_setting in prop and cfg.properties[old_ranger_setting].upper() == "YES": response = "Ranger" if hive_server_catalog in scf.items(): hive_props = scf.get_config(hive_server_catalog).properties hive_props[hive_sec_property] = "true" if old_ranger_setting in prop: del prop[old_ranger_setting] # workaround for buggy stack advisor if "HIVE" in Options.SERVICES and response == "None": if hive_server_catalog not in scf.items(): scf.create_config(hive_server_catalog) scf.get_config(hive_server_catalog).properties[hive_sec_property] = "false" return response def get_hbase_coprocessmaster_classes(): scf = Options.server_config_factory prop = "hbase.coprocessor.master.classes" hbase_ranger_enabled = False old_value = "" if "hbase-site" in scf.items(): if prop in scf.get_config("hbase-site").properties: old_value = scf.get_config("hbase-site").properties[prop] if "hbase.security.authorization" in scf.get_config("hbase-site").properties and \ scf.get_config("hbase-site").properties["hbase.security.authorization"].upper() == "TRUE": hbase_ranger_enabled = True if hbase_ranger_enabled and "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor" not in old_value: if "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor" in old_value: old_value = old_value.replace("com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor", "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") else: val = [] if old_value.strip() == "" else old_value.split(',') val.append("org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") old_value = ','.join(val) return old_value def get_rpc_scheduler_factory_class(): if Options.PHOENIX_QUERY_SERVER in Options.ambari_server.components: return "org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory" else: return "" def get_hbase_rpc_controllerfactory_class(): if Options.PHOENIX_QUERY_SERVER in Options.ambari_server.components: return "org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory" else: return "" def get_hbase_regionserver_wal_codec(): prop = "phoenix_sql_enabled" scf = Options.server_config_factory if "hbase-env" in scf.items(): if prop in scf.get_config("hbase-env").properties and scf.get_config("hbase-env").properties[prop].upper() == "TRUE": return "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec" return "org.apache.hadoop.hbase.regionserver.wal.WALCellCodec" def get_hbase_coprocessor_region_classes(): scf = Options.server_config_factory prop = "hbase.coprocessor.region.classes" hbase_ranger_enabled = False old_value = "" if "hbase-site" in scf.items(): if prop in scf.get_config("hbase-site").properties: old_value = 
scf.get_config("hbase-site").properties[prop] if "hbase.security.authorization" in scf.get_config("hbase-site").properties and \ scf.get_config("hbase-site").properties["hbase.security.authorization"].upper() == "TRUE": hbase_ranger_enabled = True if hbase_ranger_enabled and "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor" not in old_value: if "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor" in old_value: old_value = old_value.replace("com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor", "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") else: val = [] if old_value.strip() == "" else old_value.split(',') val.append("org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") old_value = ','.join(val) return old_value def _substitute_handler(upgrade_catalog, tokens, value): """ Substitute handler :param upgrade_catalog: UpgradeCatalog :param tokens: list :param value: str :rtype str """ for token in tokens: if token == "{JOBHISTORY_HOST}": value = value.replace(token, get_jh_host(upgrade_catalog)) elif token == "{RESOURCEMANAGER_HOST}": value = value.replace(token, get_jt_host(upgrade_catalog)) elif token == "{HBASE_REGIONSERVER_WAL_CODEC}": value = value.replace(token, get_hbase_regionserver_wal_codec()) elif token == "{HBASE_REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS}": value = value.replace(token, get_rpc_scheduler_factory_class()) elif token == "{HBASE_RPC_CONTROLLERFACTORY_CLASS}": value = value.replace(token, get_hbase_rpc_controllerfactory_class()) elif token == "{ZOOKEEPER_QUORUM}": value = value.replace(token, get_zookeeper_quorum()) elif token == "{HBASE_COPROCESS_MASTER_CLASSES}": value = value.replace(token, get_hbase_coprocessmaster_classes()) elif token == "{HBASE_COPROCESSOR_REGION_CLASSES}": value = value.replace(token, get_hbase_coprocessor_region_classes()) elif token == "{HIVE_SECURITY_AUTHORIZATION}": value = value.replace(token, get_hive_security_authorization_setting()) elif token == "{TEZ_HISTORY_URL_BASE}": value = value.replace(token, get_tez_history_url_base()) elif token == "{RANGER_JDBC_DRIVER}": value = value.replace(token, get_ranger_service_details()['RANGER_JDBC_DRIVER']) elif token == "{RANGER_JDBC_URL}": value = value.replace(token, get_ranger_service_details()['RANGER_JDBC_URL']) elif token == "{RANGER_AUDIT_JDBC_URL}": value = value.replace(token, get_ranger_service_details()['RANGER_AUDIT_JDBC_URL']) elif token == "{RANGER_HOST}": value = value.replace(token, get_ranger_host()) elif token == "{RANGER_JDBC_DIALECT}": value = value.replace(token, get_ranger_service_details()['RANGER_JDBC_DIALECT']) elif token == "{KAFKA_LISTENERS}": value = value.replace(token, get_kafka_listeners()) elif token == "{RANGER_PLUGIN_HBASE_POLICY_CACHE_DIR}": value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_hbase")) elif token == "{RANGER_PLUGIN_HDFS_POLICY_CACHE_DIR}": value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_hadoop")) elif token == "{RANGER_PLUGIN_HIVE_POLICY_CACHE_DIR}": value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_hive")) elif token == "{RANGER_PLUGIN_KNOX_POLICY_CACHE_DIR}": value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_knox")) elif token == "{RANGER_PLUGIN_STORM_POLICY_CACHE_DIR}": value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_storm")) elif token == 
"{RANGER_HBASE_KEYSTORE_CREDENTIAL_FILE}": value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_hbase")) elif token == "{RANGER_HDFS_KEYSTORE_CREDENTIAL_FILE}": value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_hadoop")) elif token == "{RANGER_HIVE_KEYSTORE_CREDENTIAL_FILE}": value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_hive")) elif token == "{RANGER_KNOX_KEYSTORE_CREDENTIAL_FILE}": value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_knox")) elif token == "{RANGER_STORM_KEYSTORE_CREDENTIAL_FILE}": value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_storm")) elif token == "{XAAUDIT_HDFS_DESTINATION_DIRECTORY}": value = value.replace(token, get_ranger_xaaudit_hdfs_destination_directory()) elif token == "{HBASE_RANGER_REPO_NAME}": value = value.replace(token, Options.CLUSTER_NAME+"_hbase") elif token == "{HDFS_RANGER_REPO_NAME}": value = value.replace(token, Options.CLUSTER_NAME+"_hadoop") elif token == "{HIVE_RANGER_REPO_NAME}": value = value.replace(token, Options.CLUSTER_NAME+"_hive") elif token == "{KNOX_RANGER_REPO_NAME}": value = value.replace(token, Options.CLUSTER_NAME+"_knox") elif token == "{STORM_RANGER_REPO_NAME}": value = value.replace(token, Options.CLUSTER_NAME+"_storm") elif token == "{POLICYMGR_MGR_URL}": value = value.replace(token, get_ranger_policymgr_external_url()) elif token == "{HDFS_JDBC_DRIVER}": value = value.replace(token, get_jdbc_driver()) elif token == "{HBASE_JDBC_DRIVER}": value = value.replace(token, get_jdbc_driver()) elif token == "{HIVE_JDBC_DRIVER}": value = value.replace(token, get_jdbc_driver()) elif token == "{KNOX_JDBC_DRIVER}": value = value.replace(token, get_jdbc_driver()) elif token == "{STORM_JDBC_DRIVER}": value = value.replace(token, get_jdbc_driver()) elif token == "{HDFS_AUDIT_JDBC_URL}": value = value.replace(token, get_audit_jdbc_url()) elif token == "{HBASE_AUDIT_JDBC_URL}": value = value.replace(token, get_audit_jdbc_url()) elif token == "{HIVE_AUDIT_JDBC_URL}": value = value.replace(token, get_audit_jdbc_url()) elif token == "{KNOX_AUDIT_JDBC_URL}": value = value.replace(token, get_audit_jdbc_url()) elif token == "{STORM_AUDIT_JDBC_URL}": value = value.replace(token, get_audit_jdbc_url()) elif token == "{AUDIT_DB_PASSWD}": value = value.replace(token, get_audit_db_passwd()) elif token == "{AUDIT_TO_DB_HDFS}": value = value.replace(token, get_audit_to_db_enabled("ranger-hdfs-plugin-properties")) elif token == "{AUDIT_TO_DB_HBASE}": value = value.replace(token, get_audit_to_db_enabled("ranger-hbase-plugin-properties")) elif token == "{AUDIT_TO_DB_HIVE}": value = value.replace(token, get_audit_to_db_enabled("ranger-hive-plugin-properties")) elif token == "{AUDIT_TO_DB_KNOX}": value = value.replace(token, get_audit_to_db_enabled("ranger-knox-plugin-properties")) elif token == "{AUDIT_TO_DB_STORM}": value = value.replace(token, get_audit_to_db_enabled("ranger-storm-plugin-properties")) elif token == "{AUDIT_TO_HDFS_HDFS}": value = value.replace(token, get_audit_to_hdfs_enabled("ranger-hdfs-plugin-properties")) elif token == "{AUDIT_TO_HDFS_HIVE}": value = value.replace(token, get_audit_to_hdfs_enabled("ranger-hive-plugin-properties")) elif token == "{AUDIT_TO_HDFS_HBASE}": value = value.replace(token, get_audit_to_hdfs_enabled("ranger-hbase-plugin-properties")) elif token == 
"{AUDIT_TO_HDFS_KNOX}": value = value.replace(token, get_audit_to_hdfs_enabled("ranger-knox-plugin-properties")) elif token == "{AUDIT_TO_HDFS_STORM}": value = value.replace(token, get_audit_to_hdfs_enabled("ranger-storm-plugin-properties")) elif token == "{AUDIT_HDFS_FILESPOOL_DIR_HDFS}": value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-hdfs-plugin-properties", "hdfs")) elif token == "{AUDIT_HDFS_FILESPOOL_DIR_HIVE}": value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-hive-plugin-properties", "hive")) elif token == "{AUDIT_HDFS_FILESPOOL_DIR_HBASE}": value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-hbase-plugin-properties", "hbase")) elif token == "{AUDIT_HDFS_FILESPOOL_DIR_KNOX}": value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-knox-plugin-properties", "knox")) elif token == "{AUDIT_HDFS_FILESPOOL_DIR_STORM}": value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-storm-plugin-properties", "storm")) return value def modify_config_item(config_type, catalog, server_config_factory): """ Modify configuration item :type config_type str :type catalog UpgradeCatalog :type server_config_factory ServerConfigFactory """ # if config group is absent on the server, we will create it if config_type not in server_config_factory.items(): server_config_factory.create_config(config_type) server_config_catalog = server_config_factory.get_config(config_type) server_config_catalog.merge(catalog) def modify_configs(): if len(Options.ARGS) > 1: config_type = Options.ARGS[1] else: config_type = None catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json) # Load upgrade catalog catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, Options.OPTIONS.to_stack) # get desired version of catalog Options.stack_advisor = StackAdvisorFactory().get_instance(catalog.name, catalog.target_version) # load all desired configs from the server # ToDo: implement singleton for that class Options.server_config_factory = ServerConfigFactory() if catalog is None: raise FatalException(1, "Upgrade catalog for version %s-%s not found, no configs was modified" % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)) # add user-defined template processing function catalog.add_handler(CatConst.TEMPLATE_HANDLER, _substitute_handler) if config_type is not None and config_type not in catalog.items: raise FatalException("Config type %s not exists, no configs was modified" % config_type) if config_type is not None: modify_config_item(config_type, catalog, Options.server_config_factory) else: for collection_name in catalog.items: modify_config_item(collection_name, catalog, Options.server_config_factory) Options.server_config_factory.process_mapping_transformations(catalog) # commit changes to server, if any will be found Options.server_config_factory.commit() def backup_configs(conf_type=None): dir = "backups_%d" % time.time() file_pattern = "%s%s%s_%s.json" configs = get_config_resp_all() if configs is None: Options.logger.error("Unexpected response from the server") return -1 if conf_type is not None and conf_type in configs: configs = {conf_type: configs[conf_type]} if not os.path.exists(dir): os.mkdir(dir) for item in configs: filename = file_pattern % (dir, os.path.sep, item, configs[item]["tag"]) if os.path.exists(filename): os.remove(filename) try: with open(filename, "w") as f: f.write(json.dumps(configs[item][CatConst.STACK_PROPERTIES], indent=4)) Options.logger.info("Catalog \"%s\" stored to %s", item, filename) except IOError as e: 
Options.logger.error("Unable to store \"%s\": %s", item, e) def install_services(): SERVICE_URL_FORMAT = Options.CLUSTER_URL + '/services/{0}' SERVICES = ["MAPREDUCE2", "YARN"] PUT_IN_INSTALLED = [ { "RequestInfo": { "context": "Install MapReduce2" }, "Body": { "ServiceInfo": { "state": "INSTALLED" } } }, { "RequestInfo": { "context": "Install YARN" }, "Body": { "ServiceInfo": { "state": "INSTALLED" } } } ] err_retcode = 0 err_message = "" for index in [0, 1]: try: curl(SERVICE_URL_FORMAT.format(SERVICES[index]), validate=True, request_type="PUT", data=PUT_IN_INSTALLED[index]) except FatalException as e: if not e.code == 0: err_retcode = e.code err_message = err_message + " Error while installing " + SERVICES[index] + ". Details: " + e.message + "." if err_retcode != 0: raise FatalException(err_retcode, err_message + "(Services may already be installed or agents are not yet started.)") Options.OPTIONS.exit_message = "Requests has been submitted to install YARN and MAPREDUCE2. Use Ambari Web to monitor " \ "the status of the install requests." def generate_auth_header(user, password): token = "%s:%s" % (user, password) token = base64.encodestring(token) return {"Authorization": "Basic %s" % token.replace('\n', '')} def curl(url, tokens=None, headers=None, request_type="GET", data=None, parse=False, validate=False, soft_validation=False): """ :rtype type """ _headers = {} handler_chain = [] post_req = ["POST", "PUT"] get_req = ["GET", "DELETE"] print_url = Options.CURL_PRINT_ONLY is not None write_only_print = Options.CURL_WRITE_ONLY is not None if request_type not in post_req + get_req: raise IOError("Wrong request type \"%s\" passed" % request_type) if data is not None and isinstance(data, dict): data = json.dumps(data) if tokens is not None: _headers.update(generate_auth_header(tokens["user"], tokens["pass"])) elif Options.API_TOKENS is not None: _headers.update(generate_auth_header(Options.API_TOKENS["user"], Options.API_TOKENS["pass"])) if request_type in post_req and data is not None: _headers["Content-Length"] = len(data) if headers is not None: _headers.update(headers) if Options.HEADERS is not None: _headers.update(Options.HEADERS) director = build_opener(*handler_chain) if request_type in post_req: _data = bytes(data) req = Request(url, headers=_headers, data=_data) else: req = Request(url, headers=_headers) req.get_method = lambda: request_type if print_url: if write_only_print: if request_type in post_req: Options.logger.info(url) if data is not None: Options.logger.info("POST Data: \n" + str(data)) else: Options.logger.info(url) if request_type in post_req and data is not None: Options.logger.info("POST Data: \n" + str(data)) code = 200 if not (print_url and request_type in post_req): try: resp = director.open(req) out = resp.read() if isinstance(out, bytes): out = out.decode("utf-8") code = resp.code except URLError as e: Options.logger.error(str(e)) if isinstance(e, HTTPError): raise e else: raise FatalException(-1, str(e)) else: if not print_url: Options.logger.info(url) out = "{}" if validate and not print_url and (code > 299 or code < 200): if soft_validation: Options.logger.warning("Response validation failed, please check previous action result manually.") else: raise FatalException(code, "Response validation failed, please check previous action result manually.") if parse: return json.loads(out) else: return out def configuration_item_diff(collection_name, catalog, actual_properties_list): """ Merge catalog item with actual config item on the server Diff item response: { 
"property" : name, "catalog_item": value, "catalog_value": value, "actual_value": value } :param collection_name: :param catalog: :param actual_properties_list :return: """ verified_catalog = [] catalog_properties = dict(catalog) actual_properties = dict(actual_properties_list) if actual_properties is None: verified_catalog = map(lambda x: { "property": x, "catalog_item": catalog_properties[x], "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG], "actual_value": None }, catalog_properties.keys()) else: # build list of properties according to actual properties verified_catalog = map(lambda x: { "property": x, "catalog_item": catalog_properties[x] if x in catalog_properties else None, "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if x in catalog_properties else None, "actual_value": actual_properties[x] }, actual_properties.keys()) # build list of properties according to catalog properties verified_catalog_catalog = map(lambda x: { "property": x, "catalog_item": catalog_properties[x], "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if CatConst.PROPERTY_VALUE_TAG in catalog_properties[x] else None, "actual_value": actual_properties[x] if x in actual_properties else None, }, catalog_properties.keys()) # append properties, which are listened in catalog but doesn't present in the actual configuration verified_catalog += filter(lambda x: x["property"] not in actual_properties, verified_catalog_catalog) return verified_catalog def configuration_diff_analyze(diff_list): report = {} for item_key in diff_list.keys(): property_diff_list = diff_list[item_key] item_stat = { "skipped": {"count": 0, "items": []}, "ok": {"count": 0, "items": []}, "fail": {"count": 0, "items": []}, "total": {"count": len(property_diff_list), "items": []} } def push_status(status, _property_item): item_stat[status]["count"] += 1 item_stat[status]["items"].append(_property_item) for property_item in property_diff_list: # process properties which can be absent # item was removed, from actual configs according to catalog instructions if property_item["actual_value"] is None \ and CatConst.PROPERTY_REMOVE_TAG in property_item["catalog_item"] \ and property_item["catalog_item"][CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG: push_status("ok", property_item) # currently skip values with template tag, as there no filter implemented # ToDo: implement possibility to filter values without filter handler, # ToDo: currently filtering is possible only on update-configs stage elif property_item["actual_value"] is not None and property_item["catalog_value"] is not None \ and CatConst.VALUE_TEMPLATE_TAG in property_item["catalog_item"] \ and property_item["catalog_item"][CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG: push_status("skipped", property_item) # item not present in actual config, but present in catalog and no remove tag is present elif property_item["actual_value"] is None and property_item["catalog_value"] is not None: push_status("fail", property_item) # property exists in actual configuration, but not described in catalog configuration elif property_item["actual_value"] is not None and property_item["catalog_value"] is None: push_status("skipped", property_item) # actual and catalog properties are equal elif property_item["catalog_value"] == property_item["actual_value"]: push_status("ok", property_item) elif property_item["catalog_value"] != property_item["actual_value"]: push_status("fail", property_item) report[item_key] = item_stat return report def 
def verify_configuration():
  diff_list = {}

  if len(Options.ARGS) > 1:
    config_type = Options.ARGS[1]
  else:
    config_type = None

  catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json)  # load upgrade catalog
  catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)  # get desired version of catalog
  server_configs = ServerConfigFactory()

  if catalog is None:
    raise FatalException(1, "Upgrade catalog for version %s-%s not found"
                         % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack))

  if config_type is not None and config_type not in catalog.items.keys() and config_type not in server_configs.items():
    raise FatalException(1, "Config type %s does not exist" % config_type)

  # fetch all options from the server at one time and filter only the desired versions
  if config_type is not None:
    diff_list[config_type] = configuration_item_diff(config_type, catalog.items[config_type],
                                                     server_configs.get_config(config_type).properties)
  else:
    for collection_name in catalog.items.keys():
      diff_list[collection_name] = configuration_item_diff(collection_name, catalog.items[collection_name],
                                                           server_configs.get_config(collection_name).properties)

  analyzed_list = configuration_diff_analyze(diff_list)

  report_file = None
  if Options.REPORT_FILE is not None:
    try:
      report_file = open(Options.REPORT_FILE, "w")
    except IOError as e:
      Options.logger.error("Report file open error: %s" % e.message)

  for config_item in analyzed_list:
    if analyzed_list[config_item]["fail"]["count"] != 0:
      Options.logger.info(
        "%s: %s missing configuration(s) - please look in the output file for the missing params" % (
          config_item,
          analyzed_list[config_item]["fail"]["count"]
        )
      )
      if report_file is not None:
        report_formatter(report_file, config_item, analyzed_list[config_item])
    else:
      Options.logger.info("%s: verified" % config_item)

  if report_file is not None:
    try:
      report_file.close()
    except IOError as e:
      Options.logger.error("Report file close error: %s" % e.message)


def report_formatter(report_file, config_item, analyzed_list_item):
  prefix = "Configuration item %s" % config_item
  if analyzed_list_item["fail"]["count"] > 0:
    for item in analyzed_list_item["fail"]["items"]:
      report_file.write("%s: property \"%s\" is set to \"%s\", but should be set to \"%s\"\n" % (
        prefix, item["property"], item["actual_value"], item["catalog_value"]
      ))
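# Example report line written by report_formatter() (comment only; property and values are hypothetical):
#   Configuration item hdfs-site: property "dfs.namenode.rpc-address" is set to "host2:8020",
#   but should be set to "host1:8020"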
def main():
  action_list = {  # list of supported actions
    Options.GET_MR_MAPPING_ACTION: get_mr1_mapping,
    Options.DELETE_MR_ACTION: delete_mr,
    Options.ADD_YARN_MR2_ACTION: add_services,
    Options.MODIFY_CONFIG_ACTION: modify_configs,
    Options.INSTALL_YARN_MR2_ACTION: install_services,
    Options.BACKUP_CONFIG_ACTION: backup_configs,
    Options.VERIFY_ACTION: verify_configuration
  }

  parser = optparse.OptionParser(usage="usage: %prog [options] action\n  Valid actions: "
                                       + ", ".join(action_list.keys())
                                       + "\n  update-configs accepts a config type, e.g. hdfs-site, to update specific configs")

  parser.add_option("-n", "--printonly", action="store_true", dest="printonly", default=False,
                    help="Prints all the curl commands to be executed (no post/update request will be performed)")
  parser.add_option("-w", "--writeonly", action="store_true", dest="writeonly", default=False,
                    help="In combination with --printonly, print only post/update requests")
  parser.add_option("-o", "--log", dest="logfile", default=None, help="Log file")
  parser.add_option("--report", dest="report", default=None, help="Report file output location")

  parser.add_option('--upgradeCatalog', default=None, help="Upgrade Catalog file full path", dest="upgrade_json")
  parser.add_option('--fromStack', default=None, help="Stack version to upgrade from", dest="from_stack")
  parser.add_option('--toStack', default=None, help="Stack version to upgrade to", dest="to_stack")

  parser.add_option('--hostname', default=None, help="Hostname for Ambari server", dest="hostname")
  parser.add_option('--port', default='8080', help="Port number for Ambari server", dest="port")
  parser.add_option('--https', default=False, action="store_true", dest="https",
                    help="Use https protocol for connection to the server")
  parser.add_option('--user', default=None, help="Ambari admin user", dest="user")
  parser.add_option('--password', default=None, help="Ambari admin password", dest="password")
  parser.add_option('--clustername', default=None, help="Cluster name", dest="clustername")

  (options, args) = parser.parse_args()
  Options.initialize_logger(options.logfile)
  options.warnings = []

  if len(args) == 0:
    parser.error("No action entered")

  if options.user is None:
    options.warnings.append("User name must be provided (e.g. admin)")
  if options.hostname is None:
    options.warnings.append("Ambari server host name must be provided")
  if options.clustername is None:
    options.warnings.append("Cluster name must be provided")
  if options.password is None:
    options.password = getpass.getpass("Please enter Ambari admin password: ")
    if options.password == "":
      options.warnings.append("Ambari admin user's password must be provided (e.g. admin)")

  if options.https:
    Options.API_PROTOCOL = "https"
  if options.port:
    Options.API_PORT = str(options.port)

  action = args[0]

  # check params according to the executed action
  if action == Options.MODIFY_CONFIG_ACTION or action == Options.VERIFY_ACTION:
    if options.upgrade_json is None:
      options.warnings.append("Upgrade catalog option needs to be set")
    if options.from_stack is None:
      options.warnings.append("fromStack option should be provided")
    if options.to_stack is None:
      options.warnings.append("toStack option should be provided")

  if action == Options.VERIFY_ACTION:
    if options.report is None:
      options.warnings.append("Report option should be provided")

  if len(options.warnings) != 0:
    parser.print_help()
    for warning in options.warnings:
      Options.logger.warn(warning)
    raise FatalException(1, "Not all required options were set")

  options.exit_message = "Upgrade action '%s' completed successfully." % action
  if options.printonly:
    Options.CURL_PRINT_ONLY = "yes"
    options.exit_message = "Simulated execution of action '%s'. Verify the list of edit calls." % action
  if options.writeonly:
    Options.CURL_WRITE_ONLY = "yes"

  Options.ARGS = args
  Options.OPTIONS = options
  Options.HOST = options.hostname
  Options.CLUSTER_NAME = options.clustername
  Options.API_TOKENS = {
    "user": options.user,
    "pass": options.password
  }
  Options.REPORT_FILE = options.report

  if action in action_list:
    Options.initialize()
    action_list[action]()
  else:
    parser.error("Invalid action")

  if options.exit_message is not None:
    Options.logger.info(options.exit_message)


if __name__ == "__main__":
  try:
    main()
  except (KeyboardInterrupt, EOFError):
    print("\nAborting ... Keyboard Interrupt.")
    sys.exit(1)
  except HTTPError as e:
    print("\nResponse error, " + str(e))
    sys.exit(1)
  except FatalException as e:
    if e.reason is not None:
      error = "Exiting with exit code {0}. Reason: {1}".format(e.code, e.reason)
      if Options.logger is not None:
        Options.logger.error(error)
    sys.exit(e.code)
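# Example invocations (comment only; the script name, host, credentials, cluster name, stack versions
# and catalog path shown here are placeholders - the action names come from the Options constants):
#
#   python upgradeHelper.py --hostname ambari.example.com --user admin --password admin \
#     --clustername c1 backup-configs
#
#   python upgradeHelper.py --hostname ambari.example.com --user admin --password admin \
#     --clustername c1 --upgradeCatalog /path/to/UpgradeCatalog.json \
#     --fromStack 2.2 --toStack 2.3 update-configs [config-type]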