""" Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import urllib2, base64, json, ssl, time, random, sys from optparse import OptionParser from contextlib import closing class SslContext: def build(self, url): if not url.startswith('https') or not hasattr(ssl, 'SSLContext'): return None return ssl.SSLContext(self._protocol()) if self._protocol() else ssl.create_default_context() def _protocol(self): if hasattr(ssl, 'PROTOCOL_TLS'): return ssl.PROTOCOL_TLS elif hasattr(ssl, 'PROTOCOL_TLSv1_2'): return ssl.PROTOCOL_TLSv1_2 elif hasattr(ssl, 'PROTOCOL_TLSv1_1'): return ssl.PROTOCOL_TLSv1_1 elif hasattr(ssl, 'PROTOCOL_TLSv1'): return ssl.PROTOCOL_TLSv1 else: return None class PermissiveSslContext: def build(self, url): context = SslContext().build(url) if hasattr(context, '_https_verify_certificates'): context._https_verify_certificates(False) return context class Url: @classmethod def base(clazz, protocol, host, port): return clazz('%s://%s:%d' % (protocol, host, port)) def __init__(self, url_str): self.base = url_str.rstrip('/') def __div__(self, suffix_url): suffix_str = str(suffix_url) if self._is_absolute(suffix_str): return Url(suffix_str) else: return Url(self.base + (suffix_str if suffix_str.startswith('/') else '/' + suffix_str)) def _is_absolute(self, suffix_str): return suffix_str.startswith(self.base) def query_params(self, a_dict): return Url(self.base + '?' + '&'.join('%s=%s' % (name, value) for name, value in a_dict.items())) def __str__(self): return self.base class Header: @classmethod def csrf(clazz): return clazz('X-Requested-By', 'ambari') def __init__(self, key, value): self.key, self.value = key, value def add_to(self, request): request.add_header(self.key, self.value) class BasicAuth: def __init__(self, user, password): self.header = Header( 'Authorization', 'Basic %s' % base64.encodestring('%s:%s' % (user, password)).replace('\n', '')) def authenticate(self, request): self.header.add_to(request) class ResponseTransformer: @staticmethod def identity(): return lambda url, code, data: (code, data) def __call__(self, url, code, data): raise RuntimeError('Subclass responsibility') class UnexpectedHttpCode(Exception): pass class JsonTransformer(ResponseTransformer): def __call__(self, url, code, data): if 200 <= code <= 299: return code, self._parse(data) else: return UnexpectedHttpCode('Unexpected http code: %d url: %s response: %s' % (code, url, data)) def _parse(self, a_str): if not a_str: return {} try: return json.loads(a_str) except ValueError as e: raise ValueError('Error %s while parsing: %s' % (e, a_str)) class RestClient: def __init__(self, an_url, authenticator, headers=[], ssl_context=SslContext(), request_transformer=lambda r:r, response_transformer=ResponseTransformer.identity()): self.base_url = an_url self.authenticator = authenticator self.headers = headers self.ssl_context = ssl_context self.request_transformer = request_transformer self.response_transformer = response_transformer def get(self, suffix_str): return self._response(*self._request(suffix_str, 'GET')) def post(self, suffix_str, data): return self._response(*self._request(suffix_str, 'POST', data=data)) def put(self, suffix_str, data): return self._response(*self._request(suffix_str, 'PUT', data=data)) def delete(self, suffix_str): return self._response(*self._request(suffix_str, 'DELETE')) def _request(self, suffix_str, http_method, data=""): url = str(self.base_url / suffix_str) request = urllib2.Request(url, data=self.request_transformer(data)) request.get_method = lambda: http_method self.authenticator.authenticate(request) map(lambda each: each.add_to(request), self.headers) return request, self.ssl_context.build(url) def _response(self, request, ssl_context): with closing(urllib2.urlopen(request, context=ssl_context)) as response: return self.response_transformer(request.get_full_url(), response.getcode(), response.read()) def rebased(self, new_base_url): return RestClient( new_base_url, self.authenticator, self.headers, self.ssl_context, self.request_transformer, self.response_transformer) class ServiceComponent: def __init__(self, client, a_dict): self.client = client self.name = a_dict['ServiceComponentInfo']['component_name'] self.component = a_dict def host_names(self): return [each['HostRoles']['host_name'] for each in self.component['host_components']] def __str__(self): return self.name class Service: def __init__(self, client, a_dict): self.client = client self.service = a_dict self.href = self.service['href'] self.name = self.service['ServiceInfo']['service_name'] def delete(self): try: self.client.delete(self.href) except urllib2.HTTPError as e: if e.code != 404: raise e def start(self): _, data = self.client.put(self.href, {'ServiceInfo': {'state' : 'STARTED'}}) return AsyncResult.of(self.client, data) def components(self): return [ServiceComponent(self.client, self.client.get(each['href'])[1]) for each in self.service['components']] def component(self, component_name): matches = [each for each in self.components() if each.name == 'HDFS_CLIENT'] return matches[0] if matches else None def __str__(self): return self.name class Cluster: def __init__(self, cluster_name, host, port=8080, protocol='http', user='admin', password='admin', api_version='v1'): self.cluster_name = cluster_name self.base_url = Url.base(protocol, host, port) / 'api' / api_version self.client = RestClient( self.base_url / 'clusters' / cluster_name, BasicAuth(user, password), headers=[Header.csrf()], ssl_context=PermissiveSslContext(), request_transformer=json.dumps, response_transformer=JsonTransformer()) def version(self): _, data = self.client.get('') return data['Clusters']['version'] def installed_stack(self): stack_name, stack_ver = cluster.version().split('-') return Stack(stack_name, stack_ver, self.client.rebased(self.base_url / 'stacks')) def add_service(self, service_name): self.client.post(Url('services') / service_name, {'ServiceInfo' : {'service_name' : service_name}}) def add_service_component(self, service_name, component_name): self.client.post(Url('services') / service_name / 'components' / component_name, {}) def add_host_component(self, service_name, component_name, host_name): self.client.post( Url('hosts').query_params({'Hosts/host_name': host_name}), {'host_components': [{'HostRoles': {'component_name': component_name}}]}) _, data = self.client.put(Url('services') / service_name, {'ServiceInfo': {'state' : 'INSTALLED'}}) return AsyncResult.of(self.client, data) def service(self, service_name): _, data = self.client.get(Url('services') / service_name) return Service(self.client, data) def services(self): _, data = self.client.get(Url('services')) return [Service(self.client, self.client.get(each['href'])[1]) for each in data['items']] def has_service(self, service_name): return service_name in [each.name for each in self.services()] def add_config(self, config_type, tag, properties): self.client.post(Url('configurations'), { 'type': config_type, 'tag': tag, 'properties' : properties }) self.client.put('', { 'Clusters' : { 'desired_configs': {'type': config_type, 'tag' : tag } } }) def config(self, config_type): code, data = self.client.get(Url('configurations').query_params({'type': config_type})) return Configs(self.client, [Config(self.client, each) for each in data['items']]) def start_all(self): _, data = self.client.put('services', { 'RequestInfo' : { 'context' : '_PARSE_.START.ALL_SERVICES', 'operation_level' : { 'level' : 'CLUSTER', 'cluster_name' : self.cluster_name } }, 'Body' : { 'ServiceInfo' : {'state' : 'STARTED'} } }) return AsyncResult.of(self.client, data) def stop_all(self): _, data = self.client.put('services', { 'RequestInfo' : { 'context' : '_PARSE_.STOP.ALL_SERVICES', 'operation_level' : { 'level' : 'CLUSTER', 'cluster_name' : self.cluster_name } }, 'Body' : { 'ServiceInfo' : {'state' : 'INSTALLED'} } }) return AsyncResult.of(self.client, data) def __str__(self): return 'Cluster: %s (%s)' % (self.cluster_name, self.client.base_url) class OperationFailed(Exception): pass class AsyncResult: @staticmethod def of(client, data): return AsyncResult(client, data) if data else NoResult() def __init__(self, client, a_dict): self.client = client self.status = a_dict['Requests']['status'] self.id = a_dict['Requests']['id'] self.href = a_dict['href'] def request_status(self): _, data = self.client.get(self.href) return data['Requests']['request_status'] def is_finished(self): return self.request_status() in ['FAILED', 'TIMEDOUT', 'ABORTED', 'COMPLETED', 'SKIPPED_FAILED'] def await(self): while not self.is_finished(): time.sleep(1) status = self.request_status() if status != 'COMPLETED': raise OperationFailed("%s failed with status: %s" % (self.id, status)) return status def __str__(self): return "Request status: %s id: %d" % (self.status, self.id) class NoResult: def request_status(): return 'UNKNOWN' def is_finished(self): return True def await(self): pass class Config: def __init__(self, client, a_dict): self.client = client self.config = a_dict def version(self): return int(self.config['version']) def href(self): return self.config['href'] def properties(self): code, data = self.client.get(self.href()) return data['items'][0]['properties'] def __str__(self): return json.dumps(self.config) class Configs: def __init__(self, client, config_list): self.client = client self.configs = sorted(config_list, key=lambda config: config.version()) def latest(self): return self.configs[-1] class Stack: def __init__(self, stack_name, stack_version, client): self.name = stack_name self.version = stack_version self.client = client def has_service(self, service_name): try: _, data = self.client.get(Url(self.name) / 'versions' / self.version / 'services' / service_name) return True except urllib2.HTTPError as e: if e.code == 404: return False else: raise e class CannotLoad(Exception): pass class FsStorage: def save(self, key, value): with open("saved-" + key, 'wt') as f: f.write(repr(value)) def load(self, key): try: with open("saved-" + key, 'rt') as f: return eval(f.read()) except IOError as e: raise CannotLoad(key + ' not found') class Conversion: def __init__(self, cluster, storage): self.cluster = cluster self.storage = storage def check_prerequisites(self): print 'Checking %s' % self.cluster ver = self.cluster.version() print 'Found stack %s' % ver if not ver.startswith('HDP-3.'): print 'Only HDP-3.x stacks are supported.' return False if not self.cluster.installed_stack().has_service('ONEFS'): print 'ONEFS management pack is not installed.' return False sys.stdout.write('Please, confirm you have made backup of the Ambari db [y/n] (n)? ') if raw_input() != 'y': return False return True def perform(self): hdfs_client_hosts = self.find_hdfs_client_hosts() self.stop_all_services() self.read_configs() self.delete_hdfs() self.add_onefs() self.configure_onefs() self.install_onefs_clients(hdfs_client_hosts) self.start_all_services() def find_hdfs_client_hosts(self): if self.cluster.has_service('HDFS'): print 'Collecting hosts with HDFS_CLIENT' hdfs_client_hosts = self.cluster.service('HDFS').component('HDFS_CLIENT').host_names() self.storage.save('hdfs_client_hosts', hdfs_client_hosts) else: print 'Using previously saved HDFS client hosts' hdfs_client_hosts = self.storage.load('hdfs_client_hosts') print 'Found hosts %s' % hdfs_client_hosts return hdfs_client_hosts def stop_all_services(self): print 'Stopping all services..' self.cluster.stop_all().await() def read_configs(self): if self.cluster.has_service('HDFS'): print 'Downloading core-site..' self.core_site = self.cluster.config('core-site').latest().properties() print 'Downloading hdfs-site..' self.hdfs_site = self.cluster.config('hdfs-site').latest().properties() print 'Downloading hadoop-env..' self.hadoop_env = self.cluster.config('hadoop-env').latest().properties() self.storage.save('core-site', self.core_site) self.storage.save('hdfs-site', self.hdfs_site) self.storage.save('hadoop-env', self.hadoop_env) else: print 'Using previously saved HDFS configs' self.core_site = self.storage.load('core-site') self.hdfs_site = self.storage.load('hdfs-site') self.hadoop_env = self.storage.load('hadoop-env') def delete_hdfs(self): print 'Deleting HDFS..' if self.cluster.has_service('HDFS'): self.cluster.service('HDFS').delete() else: print 'Already deleted.' def add_onefs(self): print 'Adding ONEFS..' if self.cluster.has_service('ONEFS'): print 'Already added.' else: self.cluster.add_service('ONEFS') try: self.cluster.add_service_component('ONEFS', 'ONEFS_CLIENT') except urllib2.HTTPError as e: if e.code != 409: raise e def configure_onefs(self): print 'Adding ONEFS config..' self.cluster.add_config('onefs', random_tag('onefs'), { "onefs_host" : self.smart_connect_zone(self.core_site) }) print 'Adding core-site' self.cluster.add_config('core-site', random_tag('new-core-site'), self.core_site) print 'Adding hdfs-site' self.cluster.add_config('hdfs-site', random_tag('new-hdfs-site'), self.hdfs_site) print 'Adding hadoop-env-site' self.cluster.add_config('hadoop-env', random_tag('new-hadoop-env'), self.hadoop_env) def smart_connect_zone(self, core_site): def_fs = core_site['fs.defaultFS'] if '://' in def_fs: def_fs = def_fs.split('://')[1] if ':' in def_fs: def_fs = def_fs.split(':')[0] return def_fs def install_onefs_clients(self, hdfs_client_hosts): print 'Adding ONEFS_CLIENT to hosts: %s' % (hdfs_client_hosts) results = [self.add_onefs_client(each) for each in hdfs_client_hosts] for each in results: each.await() def add_onefs_client(self, hostname): try: return self.cluster.add_host_component('ONEFS', 'ONEFS_CLIENT', hostname) except urllib2.HTTPError as e: if e.code == 409: print 'Already added to host %s' % hostname return NoResult() else: raise e def start_all_services(self): print 'Starting all services..' self.cluster.start_all().await() def random_tag(tag_name): return "%s-%s" % (tag_name, time.time()) class CommandLine: def __init__(self): self.parser = OptionParser() self.parser.add_option("-o", '--host', dest='host', help='Ambari server host', default='localhost') self.parser.add_option("-p", '--port', dest='port', help='Ambari server port', default='8080') self.parser.add_option("-c", '--cluster', dest='cluster_name', help='Cluster name') self.parser.add_option("-u", '--user', dest='admin_user', help='Admin user name', default='admin') self.parser.add_option("-k", '--password', dest='admin_pass', help='Admin user name', default='admin') self.parser.add_option("-t", '--protocol', dest='protocol', help='HTTP protocol', default='http') def parse_options(self): options, args = self.parser.parse_args() if not options.cluster_name: self.parser.error('Missing cluster name.') if not options.protocol or options.protocol.lower() not in ['http', 'https']: self.parser.error('Invalid protocol. Use http or https.') if not options.port or not options.port.isdigit(): self.parser.error('Port should be an integer') return options if __name__ == '__main__': options = CommandLine().parse_options() cluster = Cluster( options.cluster_name, options.host, port=int(options.port), protocol=options.protocol.lower(), user=options.admin_user, password=options.admin_pass) print 'This script will replace the HDFS service to ONEFS' print 'The following prerequisites are required:' print ' * ONEFS management package must be installed' print ' * Ambari must be upgraded to >=v2.7.1' print ' * Stack must be upgraded to >=HDP-3.0' print ' * Is highly recommended to backup ambari database before you proceed.' conversion = Conversion(cluster, FsStorage()) if not conversion.check_prerequisites(): sys.exit() else: conversion.perform()