#!/usr/bin/env python2.7 # -*- coding: utf-8 -*- # Copyright 2019 Univention GmbH # # http://www.univention.de/ # # All rights reserved. # # The source code of this program is made available # under the terms of the GNU Affero General Public License version 3 # (GNU AGPL V3) as published by the Free Software Foundation. # # Binary versions of this program provided by Univention to you as # well as other copyrighted, protected or trademarked materials like # Logos, graphics, fonts, specific documentations and configurations, # cryptographic keys etc. are subject to a license agreement between # you and Univention. # # This program is provided in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public # License with the Debian GNU/Linux or Univention distribution in file # /usr/share/common-licenses/AGPL-3; if not, see # . """ Command line tool to import (create/modify/remove) UDM objects defined in a CSV file in UCS. https://github.com/univention/udm_import """ import csv import sys import codecs from six import iteritems, string_types try: from univention.udm import UDM, CreateError, ModifyError, NoObject, NoSuperordinate, UnknownModuleType, UnknownProperty except ImportError: print('This script has to run on a UCS version 4.3 erratum 313 or higher.') print('Run "lsb_release -a" get Linux distribution and version information.') sys.exit(1) try: import magic except ImportError: print('The Python library "magic" is required.') print('Run "univention-install python-magic" to install it.') sys.exit(1) try: import click except ImportError: print('The Python library "click" is required.') print('Run "univention-install python-click" to install it.') sys.exit(1) try: from typing import Any, BinaryIO, Callable, Dict, Iterable, Iterator, List, Optional, Union from univention.udm import BaseModuleTV, BaseObjectTV except ImportError: pass @click.command() @click.argument('udm_module') @click.argument('action', type=click.Choice(['create', 'modify', 'remove'])) @click.argument('filename', type=click.Path(exists=True)) @click.pass_context def main(ctx, udm_module, action, filename): # type: (click.Context, str, str, str) -> None """ UDM_MODULE is the name of a UDM module like "users/user", "groups/group" etc. To see all possible values run "udm modules" on the command line. ACTION is "create", "modify" or "remove". FILENAME is the CSV file to read. """ res = UdmImport(udm_module, action, filename).do_import() ctx.exit(res) class UdmImport(object): obj_non_props = ['dn', 'options', 'policies', 'position', 'superordinate'] def __init__(self, udm_module_name, action, filename): # type: (str, str, str) -> None self.udm_module_name = udm_module_name self.action = action self.filename = filename udm = UDM.admin().version(2) try: self.mod = udm.get(self.udm_module_name) # type: BaseModuleTV except UnknownModuleType as exc: Log.fatal('Could not load UDM module {!r}. Error: {}'.format(self.udm_module_name, exc)) Log.good('Loaded UDM module {!r}.'.format(self.udm_module_name)) def do_import(self): # type: () -> int reader = CsvReader(self.filename) rows = list(reader.read()) # Log.debug('rows=\n{}'.format('\n'.join([repr(row) for row in rows]))) if not rows: Log.fatal('File contains no data.') Log.info('Found {!r} rows in {!r}.'.format(len(rows), self.filename)) self.check_preconditions(rows) labels = { 'create': ('Creating', 'Created'), 'modify': ('Modifying', 'Modified'), 'remove': ('Removing', 'Removed'), }[self.action] errors = 0 with click.progressbar(rows, label='{} {} objects'.format(labels[0], self.udm_module_name), show_pos=True) as bar: for row in bar: try: dn = self.exec_admin(row) Log.good(' -> {!r}'.format(dn), lb=True) except (CreateError, ModifyError, NoObject) as exc: errors += 1 Log.error('{}'.format(exc), lb=True) continue except UnknownProperty as exc: Log.fatal('{}.\nUse "udm {}" to see known attributes.\nRow: {!r}'.format(exc, self.udm_module_name, row)) # TODO: wait_for_replication: # import /usr/lib/nagios/plugins/check_univention_replication # run its main() while returns != 0 and timeout not hit log = Log.error if errors else Log.good log('{} {} {} objects. {} errors.'.format(labels[1], len(rows) - errors, self.udm_module_name, errors)) return 1 if errors else 0 def check_preconditions(self, rows): # type: (List[Dict[unicode, unicode]]) -> None if self.action == 'create' and 'dn' in rows[0].keys(): Log.fatal('Column "dn" not allowed with operation {!r}.'.format(self.action)) if self.action in ('modify', 'remove'): id_prop = self.mod.meta.identifying_property if id_prop not in rows[0].keys() and 'dn' not in rows[0].keys(): Log.fatal('Column {!r} or {!r} required with operation {!r}.'.format('dn', id_prop, self.action)) if self.action in ('create', 'modify'): try: known_props = self.obj_non_props + self.mod.new().props.__dict__.keys() except NoSuperordinate: # handle UnknownProperty during import pass else: unknown_columns = [key for key in rows[0].keys() if key not in known_props] if unknown_columns: Log.fatal( 'Unknown properties: {!r}.\nUse "udm {}" to see known attributes.'.format( unknown_columns, self.udm_module_name)) @classmethod def set_attrs(cls, obj, row): # type: (BaseModuleTV, Dict[unicode, unicode]) -> None for k, v in iteritems(row): if k in cls.obj_non_props: setattr(obj, k, v) else: setattr(obj.props, k, v) def get_obj(self, row): # type: (Dict[unicode, unicode]) -> BaseObjectTV if 'dn' in row: return self.mod.get(row['dn']) else: id_prop = self.mod.meta.identifying_property return self.mod.get_by_id(row[id_prop]) def create(self, row): # type: (Dict[unicode, unicode]) -> str obj = self.mod.new() self.set_attrs(obj, row) obj.save() return obj.dn def modify(self, row): # type: (Dict[unicode, unicode]) -> str obj = self.get_obj(row) self.set_attrs(obj, row) obj.save() return obj.dn def remove(self, row): # type: (Dict[unicode, unicode]) -> str obj = self.get_obj(row) dn = obj.dn obj.delete() return dn def exec_admin(self, row): # type: (Dict[unicode, unicode]) -> str meth = { 'create': self.create, 'modify': self.modify, 'remove': self.remove }[self.action] return meth(row) class Log(object): @staticmethod def _log(msg, color, lb=False): click.secho('{}{}'.format('\n' if lb else '', msg), fg=color) @classmethod def debug(cls, msg, lb=False): cls._log(msg, 'blue', lb) @classmethod def info(cls, msg, lb=False): cls._log(msg, 'reset', lb) @classmethod def error(cls, msg, lb=False): cls._log(msg, 'red', lb) @classmethod def fatal(cls, msg, exit_code=1): cls._log(msg, 'red', True) sys.exit(exit_code) @classmethod def good(cls, msg, lb=False): cls._log(msg, 'green', lb) class CsvReader(object): """ Blatantly copied (and adapted) from ucs-school-import/modules/ucsschool/importer/reader/csv_reader.py """ encoding = "utf-8" def __init__(self, filename): # type: (str) -> None """ :param str filename: Path to file with user data. """ self.filename = filename self.fieldnames = [] # type: Iterable[str] @staticmethod def get_encoding(filename_or_file): # type: (Union[str, BinaryIO]) -> str """ Get encoding of file ``filename_or_file``. Handles both magic libraries. :param filename_or_file: filename or open file :type filename_or_file: str or file :return: encoding of filename_or_file :rtype: str """ if isinstance(filename_or_file, string_types): with open(filename_or_file, 'rb') as fp: txt = fp.read() elif isinstance(filename_or_file, file): old_pos = filename_or_file.tell() txt = filename_or_file.read() filename_or_file.seek(old_pos) else: raise ValueError('Argument "filename_or_file" has unknown type {!r}.'.format(type(filename_or_file))) if hasattr(magic, 'from_file'): encoding = magic.Magic(mime_encoding=True).from_buffer(txt) elif hasattr(magic, 'detect_from_filename'): encoding = magic.detect_from_content(txt).encoding else: raise RuntimeError('Unknown version or type of "magic" library.') # auto detect utf-8 with BOM if encoding == 'utf-8' and txt.startswith(codecs.BOM_UTF8): encoding = 'utf-8-sig' return encoding @staticmethod def get_dialect(fp): # type: (BinaryIO) -> csv.Dialect """ Overwrite me to force a certain CSV dialect. :param file fp: open file to read from :return: CSV dialect :rtype: csv.Dialect """ return csv.Sniffer().sniff(fp.readline()) def read(self): # type: () -> Iterator[Dict[unicode, unicode]] """ Generate dicts from a CSV file. :return: iterator over list of dicts :rtype: Iterator """ with click.open_file(self.filename, "r") as fp: try: dialect = self.get_dialect(fp) except csv.Error as exc: Log.fatal('Could not determine CSV dialect. Error: {}'.format(exc)) fp.seek(0) encoding = self.get_encoding(fp) Log.info('Reading {} with encoding {!r}.'.format(click.format_filename(self.filename), encoding)) fpu = UTF8Recoder(fp, encoding) reader = csv.DictReader(fpu, dialect=dialect) self.fieldnames = reader.fieldnames for row in reader: yield { unicode(key, 'utf-8').strip(): unicode(value or "", 'utf-8').strip() for key, value in iteritems(row) } class UTF8Recoder(object): """ Iterator that reads an encoded stream and reencodes the input to UTF-8. Blatantly copied from docs.python.org/2/library/csv.html """ def __init__(self, f, encoding): # type: (BinaryIO, str) -> None self.reader = codecs.getreader(encoding)(f) def __iter__(self): # type: () -> UTF8Recoder return self def next(self): # type: () -> str return self.reader.next().encode("utf-8") if __name__ == '__main__': main()