#!/usr/bin/env python2 # -*- mode: python; -*- # xatk is a window switcher which assigns keybindings to windows dynamically # Copyright (C) 2010-2012,2015 Vyacheslav Levit # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import ConfigParser import codecs import errno import fcntl import functools import locale import logging.handlers import optparse import os.path import re import signal import StringIO import subprocess import sys from ConfigParser import RawConfigParser from collections import OrderedDict try: from Xlib import X from Xlib import XK from Xlib import Xatom from Xlib import display from Xlib import protocol import Xlib.error from Xlib.error import BadWindow except ImportError: XLIB_PRESENT = False else: XLIB_PRESENT = True PROG_NAME = os.path.basename(sys.argv[0]) VERSION = (0, 2, 2) CONFIG_PATH = ('~/.xatkrc', '~/.xatk/xatkrc') CONFIG_DIR = os.path.expanduser("~/.xatk") LOCK_PATH = os.path.join(CONFIG_DIR, "lock") VERSION_NAME = '%s %s' % (PROG_NAME, '.'.join(map(str, VERSION))) FIELDS = ('config', 'windows', 'keys', 'signals', 'X', 'lock') ENCODING = locale.getpreferredencoding() def escape(string): """ Escape non-ascii characters in the string (unicode, str or object) and return ascii representation of the string. 
""" if isinstance(string, unicode): return repr(string)[2:-1] elif isinstance(string, str): return repr(string)[1:-1] else: return repr(unicode(string))[2:-1] def unicode_error_ignore_and_log(exception): """ UnicodeError handler which ignores malformed data and does logging. """ Log.exception('encoding', exception) return codecs.ignore_errors(exception) class Log(object): """ Provide static methods for logging similar to those in the logging module. """ SYSINFO = 5 STDERR = 45 STDOUT = 25 CATLEN = 7 FORMAT_DICT = OrderedDict(( ('time', '%(asctime)-8s,%(msecs)03d'), ('level', '%(levelname)-8s'), ('category', '%(catstr)-' + str(CATLEN) + 's'), ('message', '%(message)s'))) MSG_FORMAT_FULL = ' - '.join(FORMAT_DICT.values()) MSG_FORMAT = '%s - %s' % (FORMAT_DICT['level'], FORMAT_DICT['message']) DATE_FORMAT = '%H:%M:%S' logging.addLevelName(SYSINFO, 'SYSINFO') logging.addLevelName(STDERR, 'STDERR') logging.addLevelName(STDOUT, 'STDOUT') logger = logging.getLogger('root') logger.setLevel(SYSINFO) handler = logging.StreamHandler() logger.addHandler(handler) formatter = logging.Formatter(MSG_FORMAT, DATE_FORMAT) # don't print exception traceback to stderr formatter.formatException = lambda exc_info: '' handler.setFormatter(formatter) categories = set() categoryFilter = logging.Filter('root') logfileCreated = False rotatingFileHandler = None setLevel = handler.setLevel class SessionRotatingFileHandler(logging.handlers.RotatingFileHandler): """ Handler for logging to a set of files, which switches from one file to the next every new session. 
""" def __init__(self, filename, backupCount=0): self.fileExists = os.path.exists(filename) logging.handlers.BaseRotatingHandler.__init__(self, filename, 'a', 'utf-8', 0) self.backupCount = backupCount def shouldRollover(self, record): if not Log.logfileCreated: Log.logfileCreated = True if self.fileExists: return True return False class StdLog(object): """File-like object intended to redirect stdout and stderr.""" def __init__(self, std): """ Create a stderr-like or stdout-like object. std value should be either Log.STDERR or Log.STDOUT. """ if std == Log.STDOUT: stdbackup = sys.stdout elif std == Log.STDERR: stdbackup = sys.stderr else: raise ValueError('invalid value of std: %d' % std) self._std = std self.stdbackup = stdbackup self.closed = stdbackup.closed self.encoding = stdbackup.encoding self.errors = stdbackup.errors self.mode = stdbackup.mode self.name = '<log>' self.newlines = stdbackup.newlines self.softspace = stdbackup.softspace self.__iter__ = stdbackup.__iter__ self.next = stdbackup.next self.close = stdbackup.close self.seek = stdbackup.seek self.tell = stdbackup.tell self.read = stdbackup.read self.readline = stdbackup.readline self.truncate = stdbackup.truncate def isatty(self): return False def write(self, s): for l in s.splitlines(): if l != '': Log.log(self._std, '', l) def writelines(self, iterable): for l in iterable: self.write(l) def flush(self): Log.handler.flush() if hasattr(Log, 'rotatingFileHandler'): Log.rotatingFileHandler.flush() @staticmethod def _update_extra(kwargs, category): """ Update `extra` dictionary in `kwargs` dictionary with `catset` and `catstr` items obtained from `category`. `category` is expected to be a string or a tuple of strings. 
""" if isinstance(category, basestring): catset = set([category]) else: catset = set(category) # form `catstr` string with length not larger than CATLEN catlen = (Log.CATLEN - len(catset) + 1) / len(catset) rem = (Log.CATLEN - len(catset) + 1) % len(catset) cuts = [] for i, cat in enumerate(catset): if len(cat) < catlen: rem += catlen - len(cat) cuts.append(cat[:catlen]) continue add = rem / (len(catset) - i) cuts.append(cat[:catlen + add]) rem -= add catstr = ','.join(cuts) catdict = {'catset': catset, 'catstr': catstr} if 'extra' not in kwargs: kwargs['extra'] = catdict else: kwargs['extra'].update(catdict) @staticmethod def capture_stdout(): sys.stdout = Log.StdLog(Log.STDOUT) @staticmethod def capture_stderr(): sys.stderr = Log.StdLog(Log.STDERR) @staticmethod def release_stdout(): sys.stdout = sys.stdout.stdbackup @staticmethod def release_stderr(): sys.stderr = sys.stderr.stdbackup @staticmethod def configHandler(stream): level = Log.handler.level Log.handler.close() Log.logger.removeHandler(Log.handler) Log.handler = logging.StreamHandler(stream) Log.logger.addHandler(Log.handler) Log.formatter = logging.Formatter(Log.MSG_FORMAT, Log.DATE_FORMAT) # don't print exception traceback to stderr Log.formatter.formatException = lambda exc_info: '' Log.handler.setFormatter(Log.formatter) Log.handler.setLevel(level) @staticmethod def removeHandler(): Log.handler.close() Log.logger.removeHandler(Log.handler) @staticmethod def configFilter(categories): """ Pass only log messages whose `category` attribute belong to the `categories` iterable. 
""" Log.categories = set(categories) def filter_(record): if record.levelno >= logging.WARNING: return True if Log.categories.intersection(record.catset): return True return False Log.categoryFilter.filter = filter_ Log.handler.addFilter(Log.categoryFilter) @staticmethod def resetFilter(): """Remove filter added by `Log.configFilter`.""" Log.categories = set() Log.handler.removeFilter(Log.categoryFilter) @staticmethod def configFormatter(format): """ Change the format string to include the fields in `format` iterable in specified order. """ try: fields = map(Log.FORMAT_DICT.__getitem__, format) except KeyError, e: raise ValueError("invalid format string: %s" % e.args[0]) Log.formatter = logging.Formatter(' - '.join(fields), Log.DATE_FORMAT) # don't print exception traceback to stderr Log.formatter.formatException = lambda exc_info: '' Log.handler.setFormatter(Log.formatter) @staticmethod def resetFormatter(): """Reset to the default formatter.""" Log.formatter = logging.Formatter(Log.MSG_FORMAT, Log.DATE_FORMAT) # don't print exception traceback to stderr Log.formatter.formatException = lambda exc_info: '' Log.handler.setFormatter(Log.formatter) @staticmethod def configRotatingFileHandler(filename, backupCount=0): Log.rotatingFileHandler = Log.SessionRotatingFileHandler( filename, backupCount) Log.rotatingFileHandler.setLevel(Log.SYSINFO) formatter = logging.Formatter(Log.MSG_FORMAT_FULL, Log.DATE_FORMAT) Log.rotatingFileHandler.setFormatter(formatter) Log.logger.addHandler(Log.rotatingFileHandler) @staticmethod def resetRotatingFileHandler(): if Log.rotatingFileHandler is not None: Log.logger.removeHandler(Log.rotatingFileHandler) @staticmethod def sysinfo(category, msg, *args, **kwargs): Log._update_extra(kwargs, category) Log.logger.log(Log.SYSINFO, msg, *args, **kwargs) @staticmethod def debug(category, msg, *args, **kwargs): Log._update_extra(kwargs, category) Log.logger.debug(msg, *args, **kwargs) @staticmethod def info(category, msg, *args, **kwargs): 
Log._update_extra(kwargs, category) Log.logger.info(msg, *args, **kwargs) @staticmethod def warning(category, msg, *args, **kwargs): Log._update_extra(kwargs, category) Log.logger.warning(msg, *args, **kwargs) @staticmethod def error(category, msg, *args, **kwargs): Log._update_extra(kwargs, category) Log.logger.error(msg, *args, **kwargs) @staticmethod def critical(category, msg, *args, **kwargs): Log._update_extra(kwargs, category) Log.logger.critical(msg, *args, **kwargs) @staticmethod def exception(category, msg, *args, **kwargs): Log._update_extra(kwargs, category) kwargs['exc_info'] = True Log.logger.error(msg, *args, **kwargs) @staticmethod def log(level, category, msg, *args, **kwargs): Log._update_extra(kwargs, category) Log.logger.log(level, msg, *args, **kwargs) @staticmethod def log_system_information(): Log.sysinfo('uname', '%s %s' % (os.uname()[0], os.uname()[2])) Log.sysinfo('python', sys.version[0:5]) Log.sysinfo('xlib', Xlib.__version_string__) Log.sysinfo(PROG_NAME, VERSION_NAME) Log.sysinfo('wm', Xtool.get_wm_name() or 'n/a') Log.sysinfo('encoding', ENCODING) class ConfigError(Exception): """Base class for Config exceptions.""" class ParseError(ConfigError): """Wrapper for all exceptions of ConfigParser module.""" class MissingSectionError(ConfigError): """Configuration file misses some section.""" def __init__(self, section): self.section = section def __str__(self): return "missing section %s" % self.section class OptionValueError(ConfigError): """Raised when option has invalid value.""" def __init__(self, section, option, value, values=None, message=None): """Either possible `values` or `message` must be specified.""" self.section = section self.option = option self.value = value self.values = values self.message = message def __str__(self): msg = ("section: %s, option: %s, value: %s" % (self.section, self.option, self.value)) if self.values is not None: return ("%s. 
The value should be one of the following: %s" % (msg, ', '.join(self.values))) elif self.message is not None: if self.message != '': msg = '%s: %s' % (msg, self.message) return msg else: raise TypeError("Either values or message must be specified") class RuleOptionValueError(OptionValueError): """Raised when rule in RULES section has invalid format.""" def __init__(self, prop, regex, awn, command=None, message=None, msg_only=False): self.prop = prop self.regex = regex self.awn = awn self.command = command self.message = message self.msg_only = msg_only def __str__(self): if self.msg_only: msg = self.message else: msg = ("section: RULES, prop: %s, regex: %s, awn: %s" % (self.prop, self.regex, self.awn)) if self.command: msg = '%s, command: %s' % (msg, self.command) if self.message: msg = '%s: %s' % (msg, self.message) else: msg = '%s: %s' % (msg, "invalid rule") return msg class History(OrderedDict): """ Extend OrderedDict. Contain (awn, key) pairs. Number of these pairs is controlled by `Config.history_length` value. 
""" def parse(self, history): Log.debug('config', 'parsing history...') history.reverse() for item in history: if len(item[1]) == 1 and Config.keyboard_layout.isalpha(item[1]): self[item[0]] = item[1] else: Log.warning('config', 'shortcut should be a latin ' "alphabetical character: '%s', ignored", item[1]) self.truncate() Log.info('config', 'parsed history: %s', str(self)) def update_item(self, awn, base_key): """Update history with a new window or its new base key.""" if awn in self: del self[awn] self[awn] = base_key self.truncate() def write(self): """Rewrite a configuration file with the current history.""" items = [] for awn in self: if awn.strip() != '': items.insert(0, '%s = %s' % (awn, self[awn])) body = '\n'.join(items) try: Config.write_section(u'HISTORY', body) except (IOError, OSError, MissingSectionError), e: Log.exception('config', 'when writing the history: %s' % e) else: Log.info('config', 'history written: %s' % str(self)) def truncate(self): """Leave `Config.history_length` last entries in the history.""" for i in range(len(self) - Config.history_length): self.popitem(last=False) class Rule(object): """Represent a mapping of window properties to awn.""" props = ['instance', 'class', 'title'] template = None # list of strings and indexes of Rule.props template_str = None # remember for debugging regpat = None # compiled regex pattern awn = None command = None def __init__(self, prop, regex, awn, command=None): """ Initialize Rule values by parsing prop, and initialising regex, awn and command (optional). Raise OptionValueError. 
""" self.parse(prop, regex, awn, command) def parse(self, prop, regex, awn, command=None): self.awn = awn self.command = command self.template_str = prop template = [self.template_str] prop_found = False for i, prop in enumerate(Rule.props): j = 0 while j != len(template): token = template[j] if isinstance(token, basestring): pos = token.find(prop) if pos != -1: prop_found = True if pos != 0: template[j:j] = [token[:pos]] j += 1 template[j] = i if len(token) != pos + len(prop): j += 1 template[j:j] = [token[pos+len(prop):]] j += 1 if not prop_found: raise RuleOptionValueError(escape(prop), escape(regex), escape(awn), escape(command), message="template must contain at least one of the following " "properties: %s" % ', '.join(Rule.props)) self.template = template try: self.regpat = re.compile(regex, re.I | re.UNICODE) except re.error: raise RuleOptionValueError(escape(prop), escape(regex), escape(awn), escape(command), message="invalid regex") def get_property(self, window): """Get property string from the rule's template and the specified window.""" prop = [] for token in self.template: if isinstance(token, int): if Rule.props[token] == 'class': prop.append(window.klass) elif Rule.props[token] == 'title': prop.append(window.name) else: # instance prop.append(window.instance) else: # basestring prop.append(token) return ''.join(prop) def match(self, window): """Does rule matches the specified window?""" prop = self.get_property(window) m = self.regpat.match(prop) match = False if m and (self.regpat.pattern or not prop): match = True Log.debug('windows', "matching template: %s - %s", self.template_str, prop) return match def substitute(self, window): """ Substitute window properties to form awn. Return awn or None if window is not matching or error occurred when substituting. 
""" awn = None prop = self.get_property(window) m = self.regpat.match(prop) if m and (self.regpat.pattern or not prop): try: awn = self.regpat.sub(self.awn, prop[:m.end()], 1) except re.error, e: Log.exception('config', '%s: awn = %s ' % (e, self.awn)) return awn def __str__(self): return ', '.join([self.template_str, self.regpat.pattern, self.awn, "%s" % self.command]) class Rules(list): """Extend list. Contain list of Rule objects.""" permkey_pos = 0 # position of the last permananet key in Rules permanent_keys = set() # permanent keys must be unique re_cmd_rule = re.compile(ur""" (?P<prop>.+?)[.] (?P<regex>.*?)\s*[=:]\s* (?P<awn>[!].+?)\s*[=:]\s* (?P<command>.+) """, re.VERBOSE) re_rule = re.compile(ur""" (?P<prop>.+?)[.] (?P<regex>.*(?<!\s))\s*[=:]\s* (?P<awn>.*) """, re.VERBOSE) def parse(self, section): """Parse RULES section.""" self.permanent_keys = set() bodylines = section.splitlines()[1:] for line in bodylines: stripline = line.lstrip() if stripline == '' or stripline.startswith('#'): continue m = self.re_cmd_rule.match(line) if m: prop = m.group('prop') regex = m.group('regex') awn = m.group('awn') command = m.group('command') else: m = self.re_rule.match(line) if m: prop = m.group('prop') regex = m.group('regex') awn = m.group('awn') command = None else: raise RuleOptionValueError(None, None, None, message="invalid rule: %s" % escape(line)) rule = Rule(prop, regex, awn, command) if awn.startswith('!'): if len(awn) == 1 or not Config.keyboard_layout.isalpha(awn[1]): raise RuleOptionValueError(escape(prop), escape(regex), escape(awn), escape(command), message="invalid key: %s" % escape(awn[1])) if awn[1] not in self.permanent_keys: self.permanent_keys.add(awn[1]) else: raise RuleOptionValueError(escape(prop), escape(regex), escape(awn), escape(command), message="permanent key duplicate: %s" % escape(awn[1])) self.insert(self.permkey_pos, rule) self.permkey_pos += 1 else: self.append(rule) Log.info('config', 'parsed rules: %s', self) def __str__(self): 
return ', '.join(['(%s)' % r for r in self]) def lookup_rule(self, window): """ Find the first rule which matches the window. Return matching Rule instance, or None if not found. """ for rule in self: if rule.match(window): return rule def get_permanent_keys(self): return self.permanent_keys class Config(object): """Object that reads, parses, and writes a configuration file.""" _path = None rules = Rules() history = History() @staticmethod def set_path(path): """Set a user-defined configuration file path.""" Config._path = path @staticmethod def get_path(): """ Obtain the configuration file path either standard or user-defined. Return a config file name set by `set_path`. If it is not set, search for first existent file name in CONFIG_PATH tuple. If no file exists, return None. The function has a side effect: only the first call will cause searching for a file name, it will remember the name, and will use it for the next time. """ if Config._path is None: for p in CONFIG_PATH: path = os.path.expanduser(p) if os.path.exists(path): Config._path = path return Config._path else: return Config._path @staticmethod def get_dirpath(): """Get directory path of the configuration file.""" return os.path.dirname(Config.get_path()) @staticmethod def use_defaults(): """Set `Config` attributes to default values.""" Config._parse_options(Config._get_defaults(), Config._get_valid_opts()) @staticmethod def get_default_config(): return Config._config_str % Config._get_defaults() @staticmethod def parse(): """ Parse the configuration file, assign option values to the corresponding `Config` attributes. Parse rules and history. 
""" try: f = open(Config.get_path()) except IOError, e: raise ConfigError(e) try: conf = f.read() except IOError, e: raise ConfigError(e) finally: f.close() try: uconf = conf.decode(ENCODING) except UnicodeDecodeError: raise ConfigError('cannot decode configuration file contents ' 'with encoding %s' % ENCODING) uconfIO = StringIO.StringIO(uconf) config = RawConfigParser(OrderedDict(), OrderedDict) try: config.readfp(uconfIO) except ConfigParser.Error, e: raise ParseError(e) else: for sec in config.sections(): if sec not in ['SETTINGS', 'HISTORY', 'RULES']: Log.warning('config', "unrecognized section %s", sec) if config.has_section('SETTINGS'): items = dict(config.items('SETTINGS')) Log.info('config', 'option values: %s', str(items)) options = set(items.keys()) keys = set(Config._defaults.keys()) missing = keys.difference(options) unrecognized = options.difference(keys) for opt in missing: Log.warning('config', 'missing option %s', opt) for opt in unrecognized: Log.warning('config', 'unrecognized option %s ', opt) del items[opt] Config._parse_options(items, Config._get_valid_opts()) else: Log.warning('config', 'missing section SETTINGS') if config.has_section('RULES'): start, end = Config.find_section('RULES', uconf) Config.rules.parse(uconf[start:end]) else: Log.warning('config', 'missing section RULES') if Config.history_length != 0: if config.has_section('HISTORY'): Config.history.parse(config.items('HISTORY')) else: Log.warning('config', 'missing section HISTORY') @staticmethod def write(config=None): """ Write a default configuration file if `config` is None. Otherwise write a `config` string to the configuration file. 
""" if config is None: config = Config.get_default_config() path = Config.get_path() rewrite = os.path.exists(path) try: if not rewrite: f = open(path, 'w') else: dirpath = Config.get_dirpath() temppath = os.path.join(dirpath, '%s.%s~' % (path, PROG_NAME)) f = open(temppath, 'w') f.write(config.encode(ENCODING)) f.flush() os.fsync(f.fileno()) if rewrite: os.rename(temppath, path) except (IOError, OSError): raise else: Log.info('config', 'config written') finally: f.close() if rewrite and os.path.exists(temppath): os.remove(temppath) @staticmethod def _get_defaults(): return dict([(k, Config._defaults[k][0]) for k in Config._defaults]) @staticmethod def _get_valid_opts(): return dict([(k, Config._defaults[k][1]) for k in Config._defaults]) @staticmethod def read(): f = open(Config.get_path(), 'r') try: config = f.read() except IOError: raise else: try: uconfig = config.decode(ENCODING) except UnicodeDecodeError: raise ParseError('cannot decode configuration file contents ' 'with encoding %s' % ENCODING) return uconfig finally: f.close() SECRE = ur""" (?P<section>^\[%s\].*?) # a section header and a body (?=\s* # don't include whitespaces (?: (?:^\[[^]]+\]) # a new section | \Z) ) # or the end of the string """ @staticmethod def find_section(section, config): """ Return a tuple containing the start and end positions of `section` in `config` string. If not found `MissingSectionError` is raised. 
""" secre = re.compile(Config.SECRE % section, re.DOTALL | re.MULTILINE | re.VERBOSE | re.UNICODE) m = secre.search(config) if m is None: raise MissingSectionError(escape(section)) else: return m.span() @staticmethod def write_section(secname, secbody): config = Config.read() start, end = Config.find_section(secname, config) header = '[%s]\n' % secname newconfig = config[:start] + header + secbody + config[end:] Config.write(newconfig) @staticmethod def _parse_options(options, valid_opts): for opt in options: value = options[opt] possible = valid_opts[opt] if isinstance(possible, tuple): if value in possible: setattr(Config, opt, value) continue else: raise OptionValueError('SETTINGS', escape(opt), escape(value), possible) elif callable(possible): setattr(Config, opt, possible(value)) elif possible is None: if value.startswith('"') and value.endswith('"'): value = value[1:-1] setattr(Config, opt, value) continue else: raise TypeError(type(possible)) def _parse_keyboard_layout(keyboard_layout): possible = ('QWERTY', 'QWERTZ', 'AZERTY', 'Dvorak') if keyboard_layout not in possible: raise OptionValueError('SETTINGS', 'keyboard_layout', escape(keyboard_layout), possible) else: return KeyboardLayout(keyboard_layout) def _parse_prefix(prefix): mods = prefix.split('+') try: fake_kb = Keybinding(mods + ['a'], lambda: None) except KeybindingError, e: raise OptionValueError('SETTINGS', 'modifiers', escape(prefix), message=escape(e)) return fake_kb.modifiers + fake_kb.keys[:-1] def _parse_title_format(title_format): """Check title_format contains not more than one %t and %s.""" if title_format.count('%t') > 1 or title_format.count('%s') > 1: raise OptionValueError("SETTINGS", "title_format", escape(title_format), message="only one occurrence of %t or %s is possible") return title_format def _parse_history_length(history_length): try: hist_len = int(history_length) except ValueError: raise OptionValueError("SETTINGS", "history_length", escape(history_length), message="invalid 
number") if(hist_len < 0): raise OptionValueError("SETTINGS", "history_length", escape(history_length), message="the value must be positive") return hist_len known_window_types = ['NORMAL', 'DIALOG', 'UTILITY', 'MENU', 'TOOLBAR', 'DESKTOP', 'DOCK', 'SPLASH'] def _parse_window_types(types): types = types.split() for t in types: if t == 'All': return [] if t not in Config.known_window_types: raise OptionValueError("SETTINGS", "window_types", escape(t), message="unknown window type, window types are %s" % ', '.join(Config.known_window_types)) return types _defaults = { 'keyboard_layout': ('QWERTY', _parse_keyboard_layout), 'prefix': ('Super', _parse_prefix), 'group_windows_by': ('AWN', ('AWN', 'Group', 'None')), 'title_format': ('%t /%s/', _parse_title_format), 'history_length': ('20', _parse_history_length), 'desktop_action': ('SwitchDesktop', ('SwitchDesktop', 'MoveWindow', 'None')), 'window_types': ('NORMAL DIALOG UTILITY MENU', _parse_window_types)} """ Dictionary with keys containing options, and values containing tuples of the default value and a list of possible valid values. Format: {option: (default_value, (variants) or parse_function or None), ...}, where None means that arbitrary string is allowed. Enclosing double quotes around arbitrary strings will be stripped. """ _config_str = re.compile('^ +', re.M).sub('', u"""# -*- mode: conf; -*- # All option values are case sensitive unless other noted. # List of modifiers: # - Control (aliases: C, Ctrl) # - Shift (aliases: S) # - Mod1 (aliases: A, Alt) # - Mod4 (aliases: U, Super) # List of keys can be obtained with --print-keys option. [SETTINGS] # Keyboard Layout. This is used to produce suffix keys close to base keys. # Possible values: Dvorak, QWERTY, QWERTZ, AZERTY. 
keyboard_layout = %(keyboard_layout)s # Keybinding prefix consists of a series of modifiers and/or keys # separated by + # Examples: Ctrl+Alt, U+a, A+x+w, F12, XF86_RotateWindows prefix = %(prefix)s # All windows of the same application are grouped. Windows of the same # group are bound to the keys with the same base. The following option # specifies which windows should belong to the same group. # Possible values: # - AWN -- two windows belong to the same group if they have equal awns. # - Group -- group windows as window manager normally does. # - None -- do not group at all. group_windows_by = %(group_windows_by)s # Put shortcuts in window titles. # %%t and %%s are replaced by window title and shortcut accordingly. # Only one occurrence of %%t and %%s in title_format is allowed. # Set to None not to modify window titles. title_format = %(title_format)s # Action when the window is not on the current desktop. # Possible values: # - SwitchDesktop -- switch to the desktop which the window is on. # - MoveWindow -- move the window to the current desktop. # - None -- just activate the window (actual behaviour may differ # in different window managers). desktop_action = %(desktop_action)s # List of allowed window types. # Possible values: # All NORMAL DIALOG UTILITY MENU TOOLBAR DESKTOP DOCK SPLASH window_types = NORMAL DIALOG UTILITY MENU # History of shortcuts is used to prevent them floating between # different windows across the sessions. # Set history_length to 0 to disable history. history_length = %(history_length)s [HISTORY] [RULES] # This section specifies rules according to which window classes or names # (titles) are transformed to abstract window names (AWNs). When a new # window appears, the program tries out the rules (from top to bottom) # until it finds out one that matches the window property. If no # suitable rule is found a window class will be assigned to AWN. # AWNs are used to determine window shortcuts. 
For example, if AWN is # 'xterm' than keybinding will more likely 'mod+x'. If it is already # assigned to another window or is used by an another program the next # keybinding to try out will be 'mod+t'. It sorts out the alphabetical # characters of AWN until it finds one whose corresponding key is not # grabbed. If it turns out there is no such a key, the window will be # bound to any different unused key. # Format: # property.[regex] = [[!]awn [= command]] # where property is one of the following: title, class, instance. # regex matching is case insensitive. awn may contain backreferences, # e.g. \\1 is replaced with the first group of regex. If awn is omitted # the window will not be bound to any of keys. If regex is omitted it # implies windows without the property or with empty property. Exclamation # mark before awn denotes that the shortcut should be permanent. Only the # first symbol after ! is used to compose a shortcut. Permanent shortcuts # have a higher priority than history shortcuts and other rules, and they # are never assigned to different windows. Rules with permanent shortcuts # may have commands. Command is executed when no window matches the rule. # Commands are executed with /bin/sh. # Examples: # set awn to firefox for all the windows whose titles end with firefox # title..*firefox$ = firefox # remove prefix gnome- from window classes # class.gnome-(.*) = \\1 # transform classes icecat, iceweasel, and icedove to awns cat, weasel, # dove respectively # class.ice(cat|weasel|dove) = \\1 # don't bind windows that don't have class property # class. = # always bind emacs window to 'e' # class.emacs = !e # run emacs if it is not running yet # class.emacs = !e = emacs # In place of property a template can be specified. template # combines different properties within one rule. With regular expressions # it is possible to simulate logical operators. 
# Ignore window with class myclass only if its title isn't mytitle: # class and not title.myclass and not (?!mytitle$) = """) class KeyboardLayout(object): """ Object holding information about the order of keys of different keboard layouts. The idea of such definition is totally wrong. Positions of non-alphabetic symbols may differ among different languages of the same layout. We need a more robust way to describe/determine layouts. """ dvorak = '\',.pyfgcrlaoeuidhtns;qjkxbmwvz' qwerty = 'qwertyuiopasdfghjkl;zxcvbnm,./' qwertz = 'qwertzuiopasdfghjkl;yxcvbnm,.-' azerty = 'azertyuiopqsdfghjklmwxcvbn,;:!' def __init__(self, layout="QWERTY"): if layout == "Dvorak": # dvorak layout with 3 rows by 10 characters self.keys = self.dvorak elif layout == "QWERTY": # qwerty layout with 3 rows by 10 characters self.keys = self.qwerty elif layout == "QWERTZ": # qwertz layout with 3 rows by 10 characters self.keys = self.qwertz elif layout == "AZERTY": # azerty layout with 3 rows by 10 characters self.keys = self.azerty else: raise ValueError("Unknown keyboard layout name: %s" % escape(layout)) self.indexes = dict(zip(self.keys, range(len(self.keys)))) def __contains__(self, char): return char in self.indexes def isalpha(self, char): return char in self and char.isalpha() class ShortcutGenerator(object): """ Class which generates shortcuts for specified windows taking into account windows' and window list's information. """ def _get_direction(self, base): """Determine where next suffix key would be from the base key Return 1 if to the right, and -1 if to the left""" return 1 if Config.keyboard_layout.indexes[base] % 10 < 5 else -1 def _next_suffix(self, shortcuts): """ Return a new free suffix. shortcuts must have the same base key and must be sorted by shortcut_sort_key. 
""" layout = Config.keyboard_layout base = shortcuts[0][0] dir_ = self._get_direction(base) suffixes = [s[1] for s in shortcuts if len(s) == 2] if not suffixes: # first shortcut with suffix return layout.keys[layout.indexes[base] + dir_] suffix_indexes = [layout.indexes[s] for s in suffixes] # get last suffix index first_index = layout.indexes[suffixes[0]] left_indexes = [i for i in suffix_indexes if i < first_index] right_indexes = [i for i in suffix_indexes if i > first_index] if dir_ == 1: # move right if left_indexes: # crossed over the rightmost symbol last_index = max(left_indexes) elif right_indexes: last_index = max(right_indexes) else: # only one suffix last_index = first_index else: # move left if right_indexes: # crossed over the leftmost symbol last_index = min(right_indexes) elif left_indexes: last_index = min(left_indexes) else: # only one suffix last_index = first_index next_index = (last_index + dir_) % len(layout.keys) next_suffix = layout.keys[next_index] if next_suffix == base: # all suffixes are over return None else: return next_suffix def _new_base(self, name, bases): layout = Config.keyboard_layout for base in name: if base not in bases and layout.isalpha(base): return base free_bases = set(layout.keys).symmetric_difference(bases) for base in free_bases: if layout.isalpha(base): return base return None # all the bases are overed _forbidden_bases = set() def forbid_base(self, base): """ Tell `ShortcutGenerator` not to use the `base` key for new shortcuts. """ self._forbidden_bases.add(base) def new_shortcut(self, window, window_list): """Return a new shortcut generated for `window`. Return None if no new shortcut is possible. `wid` and `gid` attributes of `window` must be initialised before the method call. 
class WindowList(list):
    """A list of `Window` objects with lookups by id, group and name."""

    # Most recently issued group id; see get_unique_group_id().
    last_unique_group_id = 0

    def get_window(self, wid):
        """Return the `Window` with the window id `wid`, None if absent."""
        return next((w for w in self if w.wid == wid), None)

    def get_windows(self, wids):
        """Return a list of the windows whose window ids are in `wids`."""
        return [w for w in self if w.wid in wids]

    def get_group_id(self, name):
        """
        Return the group id of the first window whose awn equals `name`,
        or 0 if there is no such window.
        """
        for candidate in self:
            if candidate.awn == name:
                return candidate.gid
        return 0

    def get_group_windows(self, gid):
        """
        Return a list of `Window` objects with the window group id `gid`
        sorted by shortcuts.
        """
        members = [w for w in self if w.gid == gid]
        members.sort(key=lambda w: w.shortcut_sort_key)
        return members

    def get_group_shortcuts(self, gid):
        """Return a sorted list of shortcuts of the window group `gid`."""
        bound = [w for w in self if w.gid == gid and w.shortcut]
        bound.sort(key=lambda w: w.shortcut_sort_key)
        return [w.shortcut for w in bound]

    def get_all_bases(self):
        """Return a set of all used base keys."""
        return set(w.shortcut[0] for w in self if w.shortcut)

    def get_all_awns(self):
        """Return a set of awns of the window list."""
        return set(w.awn for w in self)

    def get_unique_group_id(self):
        """Return a group id which has not been issued by this list yet."""
        fresh = self.last_unique_group_id + 1
        self.last_unique_group_id = fresh
        return fresh
'a' or 'bn', where 'a' is the base key, 'b' is the prefix, and 'n' is the suffix) - `shortcut_sort_key`: variable by which windows can be sorted - `keybinding`: keybinding object """ wid = None gid = None _awn = None name = None instance = None klass = None kind = None _shortcut = None shortcut_sort_key = [sys.maxint] keybinding = None @property def awn(self): return self._awn @awn.setter def awn(self, awn): if isinstance(awn, basestring): self._awn = awn.lower() elif awn is None: self._awn = None else: raise TypeError('awn must be a string object or None') @property def shortcut(self): return self._shortcut @shortcut.setter def shortcut(self, shortcut): if isinstance(shortcut, basestring): self._shortcut = shortcut.lower() elif shortcut is None: self._shortcut = None else: raise TypeError('Shortcut must be a string object or None') def __str__(self): d = self.__dict__ str_or_hex = lambda k, v: hex(v) if k in ('wid', 'gid') else unicode(v) return ', '.join(['%s: %s' % (k, str_or_hex(k, d[k])) for k in d]) class ConnectionClosedError(Exception): """Wrapper for Xlib's ConnectionClosedError exception.""" class Xtool(object): """Wrapper for Xlib related methods.""" @staticmethod def connect(displaystr=None): Xtool._display = display.Display(displaystr) Xtool._root = Xtool._display.screen().root Xtool._root.change_attributes(event_mask=X.KeyPressMask | X.KeyReleaseMask | X.PropertyChangeMask) Xtool._load_keys() Xtool._init_mod_keycodes() # Keyboard related methods # @staticmethod def grab_key(keycode, mask, onerror=None): Xtool._root.grab_key(keycode, mask, 1, X.GrabModeAsync, X.GrabModeAsync, onerror=onerror) Xtool._root.grab_key(keycode, mask | X.Mod2Mask, 1, X.GrabModeAsync, X.GrabModeAsync, onerror=onerror) Xtool._root.grab_key(keycode, mask | X.LockMask, 1, X.GrabModeAsync, X.GrabModeAsync, onerror=onerror) Xtool._root.grab_key(keycode, mask | X.Mod2Mask | X.LockMask, 1, X.GrabModeAsync, X.GrabModeAsync, onerror=onerror) @staticmethod def ungrab_key(keycode, mask, 
@staticmethod
def get_all_keys():
    """
    Return the names of the non-modifier keys found on the keyboard.

    A name is an `XK_*` attribute name of the `XK` module with the `XK_`
    prefix stripped. It is included when its keysym is the one mapped
    (at index 0) to some keycode of the display that is not a modifier.
    Names come in the order of `dir(XK)`.
    """
    info = Xtool._display.display.info
    keysyms = set()
    # The second argument of range() is a stop value, not a count: scan
    # every keycode from min_keycode up to and including max_keycode.
    for keycode in range(info.min_keycode, info.max_keycode + 1):
        if keycode != XK.NoSymbol and not Xtool.is_modifier(keycode):
            keysyms.add(Xtool._display.keycode_to_keysym(keycode, 0))
    return [name[3:] for name in dir(XK)
            if name.startswith('XK_') and getattr(XK, name) in keysyms]
@staticmethod
def _init_mod_keycodes():
    """
    Store in `Xtool._mod_keycodes` the set of keycodes which the
    display's modifier mapping assigns to Control, Shift, Lock and
    Mod1..Mod5. `XK.NoSymbol` entries are left out.
    """
    Xtool._mod_keycodes = keycodes = set()
    mapping = Xtool._display.get_modifier_mapping()
    modifier_indexes = (
        X.ControlMapIndex, X.ShiftMapIndex, X.LockMapIndex,
        X.Mod1MapIndex, X.Mod2MapIndex, X.Mod3MapIndex,
        X.Mod4MapIndex, X.Mod5MapIndex,
    )
    for index in modifier_indexes:
        keycodes.update(mapping[index])
    keycodes.discard(XK.NoSymbol)
@staticmethod
def get_window_class(wid):
    """
    Retrieve a window class by the resource id. Raise BadWindow.

    Return an (instance, class) tuple of unicode strings taken from the
    first two NUL-separated fields of the WM_CLASS property. A missing
    field, or a missing property, yields an empty string.
    """
    prop = Xtool._get_full_property(Xtool._get_window(wid),
                                    Xatom.WM_CLASS, Xatom.STRING)
    if not prop:
        return (u'', u'')
    fields = prop.value.split('\0')
    decoded = [field.decode('latin_1', 'ignore_log')
               for field in fields[:2]]
    if len(decoded) < 2:
        decoded.append(u'')
    return tuple(decoded)
@staticmethod
def get_wm_name():
    """
    Get window manager name.

    The name is read from the window referenced by the root window's
    _NET_SUPPORTING_WM_CHECK property. Return an empty unicode string if
    the property is not set or the referenced window is bad (the latter
    is logged).
    """
    check = Xtool._get_full_property(
        Xtool._root, Xtool._atom('_NET_SUPPORTING_WM_CHECK'), Xatom.WINDOW)
    if not check:
        return u''
    wm_wid = check.value[0]
    try:
        return Xtool.get_window_name(wm_wid)
    except BadWindow as e:
        Log.exception('windows', e)
        return u''
@staticmethod
def raise_window(wid, desktop_action=DESKTOP_IGNORE):
    """
    Activate the window `wid` by sending _NET_ACTIVE_WINDOW.

    `desktop_action` selects what is done beforehand:

    - `DESKTOP_IGNORE`: nothing;
    - `DESKTOP_SWITCH_DESKTOP`: send _NET_CURRENT_DESKTOP with the
      desktop number of the window;
    - `DESKTOP_MOVE_WINDOW`: send _NET_WM_DESKTOP for the window with
      the number of the current desktop.

    The desktop message is skipped if the desktop number is unknown.
    Raise BadWindow. Raise ValueError for any other `desktop_action`.
    """
    known_actions = (Xtool.DESKTOP_IGNORE,
                     Xtool.DESKTOP_SWITCH_DESKTOP,
                     Xtool.DESKTOP_MOVE_WINDOW)
    if desktop_action not in known_actions:
        raise ValueError('invalid desktop_action: %d' % desktop_action)

    if desktop_action == Xtool.DESKTOP_SWITCH_DESKTOP:
        window_desktop = Xtool._get_desktop_number(wid)
        if window_desktop is not None:
            Xtool._send_client_message(
                None, '_NET_CURRENT_DESKTOP',
                [window_desktop, Xtool._last_key_event_time, 0, 0, 0])
    elif desktop_action == Xtool.DESKTOP_MOVE_WINDOW:
        current_desktop = Xtool._get_desktop_number()
        if current_desktop is not None:
            # NOTE(review): the 2 is presumably the EWMH source
            # indication -- confirm against the specification.
            Xtool._send_client_message(
                wid, '_NET_WM_DESKTOP', [current_desktop, 2, 0, 0, 0])

    Xtool._send_client_message(
        wid, '_NET_ACTIVE_WINDOW',
        [2, Xtool._last_key_event_time, 0, 0, 0])
""" Xtool._window_list_listener = window_list_listener @staticmethod def register_window_name_listener(window_name_listener): """ Register `window_name_listener` which must have `on_window_name_changed` method. """ Xtool._window_name_listener = window_name_listener @staticmethod def _window_list_changed(event): return (event.type == X.PropertyNotify and event.atom == Xtool._atom("_NET_CLIENT_LIST")) @staticmethod def _window_name_changed(event): return (event.type == X.PropertyNotify and (event.atom == Xtool._atom("_NET_WM_NAME") or event.atom == Xtool._atom("WM_NAME"))) @staticmethod def _check_listeners(): """Check if all listeners are registered before entering event_loop.""" if not hasattr(Xtool, '_key_listener'): raise AttributeError('no key_listener') elif not (hasattr(Xtool._key_listener, 'on_key_press') and hasattr(Xtool._key_listener, 'on_key_release')): raise AttributeError('bad key_listener') if not hasattr(Xtool, '_window_list_listener'): raise AttributeError('no window_list_listener') elif not hasattr(Xtool._window_list_listener, 'on_window_list_changed'): raise AttributeError('bad window_list_listener') _pressed_keys = set() @staticmethod def _is_key_press_fake(keycode): """Return True if KeyPress event was caused by auto-repeat mode.""" if keycode in Xtool._pressed_keys: return True else: Xtool._pressed_keys.add(keycode) return False @staticmethod def _is_key_release_fake(keycode): """Return True if KeyRelease event was caused by auto-repeat mode.""" if Xtool.is_modifier(keycode): return False # modifiers are never auto-repeated if not Xtool._is_key_pressed(keycode): try: Xtool._pressed_keys.remove(keycode) except KeyError: # Some key had been pressed before the keyboard was grabbed # and now it is released while the keyboard is still # grabbed. Actually this is not a fake event, though ignore it. return True return False return True @staticmethod def event_loop(): """ Event loop. 
class InvalidKeyError(KeybindingError):
    """
    Bad key name, or it is not present on the keyboard, or bad modifier
    name.

    `symbols` is the list of symbols of the keybinding. `key` is the
    offending symbol, or None (or an empty string) when the keybinding
    contains no key at all.
    """
    def __init__(self, symbols, key):
        self.keybinding = escape('+'.join(symbols))
        # Keep None as it is: escape(None) would produce the non-empty
        # string 'None' and __str__ would never report 'no key'.
        self.key = escape(key) if key is not None else None

    def __str__(self):
        if self.key:
            return ("keybinding '%s': invalid key or modifier name: '%s'"
                    % (self.keybinding, self.key))
        # self.keybinding has been escaped in __init__ already; escaping
        # it once more would double its backslashes.
        return "keybinding '%s': no key" % self.keybinding
def _parse(self, symbols):
    """
    Split symbols into modifiers and keys, obtain modmask and keycodes.

    `symbols` is a list of modifier names followed by key names.
    Return (modifiers, modmask, keys, keycodes) tuple.
    Raise InvalidKeyError if there is no key in `symbols` or if a key
    has no keycode.
    """
    modmask = 0
    # `i` is the index of the first key. It must be bound even when
    # `symbols` is empty and the loop body never runs.
    i = 0
    for i, mod in enumerate(symbols):
        mask = self._get_modmask(mod)
        if mask is None:
            break
        modmask |= mask
    # NOTE(review): if every symbol is a modifier name, the loop ends
    # without a break and the last symbol is still taken as the key
    # while its mask is already part of modmask. Kept as it is, since
    # keybindings like Super+U may rely on it.
    modifiers = symbols[:i]
    keys = symbols[i:]
    if not keys:
        raise InvalidKeyError(symbols, None)
    keycodes = []
    for key in keys:
        keycode = Xtool.get_keycode(key)
        if keycode == 0:
            raise InvalidKeyError(symbols, key)
        keycodes.append(keycode)
    return modifiers, modmask, keys, keycodes
class KeybindingList(list):
    """
    A list of keybinding objects which refuses colliding keybindings
    and remembers a marker: the last cyclic keybinding.
    """

    def __init__(self):
        self._marker = None  # last cyclic keybinding

    def set_marker(self, keybinding):
        """Remember `keybinding` as the marker."""
        self._marker = keybinding

    def reset_marker(self):
        """Forget the marker."""
        self._marker = None

    def append(self, keybinding):
        """
        Add `keybinding` to the end of the list.

        Raise KeybindingCollisionError if it collides with a keybinding
        which is already in the list.
        """
        for present in self:
            if present.collideswith(keybinding):
                raise KeybindingCollisionError(keybinding, present)
        keybinding._marker = self._marker
        list.append(self, keybinding)

    def remove(self, keybinding):
        """
        Delete the first keybinding which fully matches the keycodes and
        the modmask of `keybinding`. Raise ValueError if there is none.
        """
        position = 0
        for present in self:
            if present.match_full(keybinding.keycodes, keybinding.modmask):
                del self[position]
                return
            position += 1
        raise ValueError(keybinding)

    def find_partial(self, keycodes, modmask):
        """
        Return a keybinding object if only one matching keybinding exists.
        Return 1 if more than one, and 0 if no such keybinding.
        """
        single = None
        for candidate in self:
            if not candidate.match_partial(keycodes, modmask):
                continue
            if single is not None:
                return 1
            single = candidate
        if single is None:
            return 0
        return single

    def find_full(self, keycodes, modmask):
        """
        Return the keybinding with exact matching of `keycodes` and
        `modmask`, or None if there is no such keybinding.
        """
        return next((candidate for candidate in self
                     if candidate.match_full(keycodes, modmask)), None)

    def find_cyclic(self, keycodes, modmask):
        """
        Return the first matching cyclic keybinding after the marker if it
        is set and matches. Otherwise, return the first matching cyclic
        keybinding (None if nothing matches).
        """
        earliest = None
        passed_marker = False
        for candidate in self:
            if not candidate.match_cyclic(keycodes, modmask):
                continue
            if passed_marker:
                return candidate
            if earliest is None:
                earliest = candidate
            if candidate == self._marker:
                passed_marker = True
        return earliest
def unbind_all(self):
    """Delete all the keybindings and ungrab related keys."""
    # Several keybindings may share the first key and the modmask, which
    # are grabbed only once.
    grabbed = set((kb.keycodes[0], kb.modmask) for kb in self._kblist)
    for keycode, modmask in grabbed:
        Xtool.ungrab_key(keycode, modmask)
    # Python 2 lists have no clear() method. Empty the list in place, so
    # the key listener which shares this list object sees the change.
    del self._kblist[:]
def _make_raise_window_func(self, wid):
    """
    Return a callable which activates the window `wid` when called
    without arguments.

    The desktop action is derived from `Config.desktop_action` once, at
    creation time. A BadWindow error raised on activation is logged and
    not propagated.
    """
    desktop_actions = {
        'SwitchDesktop': Xtool.DESKTOP_SWITCH_DESKTOP,
        'MoveWindow': Xtool.DESKTOP_MOVE_WINDOW,
    }
    chosen = desktop_actions.get(Config.desktop_action,
                                 Xtool.DESKTOP_IGNORE)

    def raise_window(wid=wid, action=chosen):
        try:
            Xtool.raise_window(wid, action)
        except BadWindow as e:
            Log.exception('windows', e)

    return raise_window
def rebind_to_window(self, shortcut, window):
    """
    Rebind the existing keybinding of `shortcut` to raise `window`.

    On success set the keybinding, shortcut and shortcut_sort_key
    attributes of `window` and return the keybinding. If the new
    keybinding is an invalid cyclic one, log it, leave `window`
    untouched and return None.
    """
    symbols = Config.prefix + list(shortcut)
    raise_window = self._make_raise_window_func(window.wid)
    # The old keybinding is only used to look up the bound one.
    oldkb = Keybinding(symbols, lambda: None)
    # Build the cyclic keybinding inside the try block only: constructing
    # it beforehand as well would let CyclicKeybindingError escape.
    try:
        newkb = Keybinding(symbols, raise_window, raise_window)
    except CyclicKeybindingError as e:
        Log.info('keys', e)
        return None
    window.keybinding = self._keybinder.rebind(oldkb, newkb)
    window.shortcut = shortcut
    window.shortcut_sort_key = self._shortgen.shortcut_sort_key(
        window.shortcut)
    return window.keybinding
def try_shortcut(self, shortcut, window):
    """
    Bind `window` to the given `shortcut` instead of generating one as
    pickup_shortcut does.

    On success set shortcut, shortcut_sort_key and keybinding of the
    window. On failure (the key is already grabbed or the keybinding is
    an invalid cyclic one) log it and don't modify window attributes.
    """
    try:
        keybinding = self.bind_window(shortcut, window)
    except (GrabError, CyclicKeybindingError) as e:
        Log.info('keys', e)
    else:
        window.shortcut = shortcut
        sort_key = self._shortgen.shortcut_sort_key(window.shortcut)
        window.shortcut_sort_key = sort_key
        window.keybinding = keybinding
        Log.info(('windows', 'keys'),
                 "window '%s' (id=0x%x) was bound to '%s'",
                 window.awn, window.wid, window.shortcut)
def _get_window_type(self, wid):
    """
    Get window type. Raise BadWindow.

    Return the first type of the window which is among the known window
    types. If there is none, return the NORMAL type atom for a window
    without a TRANSIENT_FOR window and the DIALOG type atom otherwise.
    """
    for type_atom in Xtool.get_window_types(wid):
        if type_atom in self.known_window_types:
            return type_atom
    if Xtool.get_transient_for(wid) is None:
        fallback = 'NORMAL'
    else:
        fallback = 'DIALOG'
    return Xtool.get_window_type_atom(fallback)
def on_window_list_changed(self):
    """
    Compare the tracked windows with the current window list, then
    handle the new windows (in ascending id order) and the closed ones.
    """
    tracked = set(win.wid for win in self._windows)
    present = set(Xtool.get_window_list())
    created = present - tracked
    closed = tracked - present
    for wid in sorted(created):
        Log.debug('windows', 'new window (id=0x%x)', wid)
        self._on_window_create(wid)
    if not closed:
        return
    closed_ids = ', '.join(hex(wid) for wid in closed)
    Log.debug('windows', 'windows closed (ids: %s)' % closed_ids)
    self._on_windows_close(sorted(closed))
Check if it has not already been # removed from the window list if win is not None: self._update_window_name(win, win.shortcut) else: Log.warning('windows', 'name of the window (id=0%x) changed ' + 'while it is not in the window list', wid) def _on_windows_close(self, wids): """ Remove windows from the list, unbind them, rebind others if necessary. """ closed_windows = self._windows.get_windows(wids) closed_groups = set([w.gid for w in closed_windows]) for group in closed_groups: wins = self._windows.get_group_windows(group) sh_queue = [] for w in wins: if w.wid in wids and w.shortcut: sh_queue.append(w.shortcut) elif sh_queue: if w.shortcut: sh_queue.append(w.shortcut) prev_shortcut = w.shortcut self._keyman.rebind_to_window(sh_queue.pop(0), w) Log.info('keys', 'Rebinding: %s -> %s', prev_shortcut, w.shortcut) self._update_window_name(w, prev_shortcut) for shortcut in sh_queue: if shortcut: self._keyman.unbind_window(shortcut) for w in closed_windows: self._windows.remove(w) def _on_window_create(self, wid): """ Create window, initialise its attributes, add to the window list, possibly change its name, and register the window for watching its name. 
""" window = Window() window.wid = wid try: window.name = Xtool.get_window_name(wid) window.instance, window.klass = Xtool.get_window_class(wid) window.kind = self._get_window_type(wid) except BadWindow, e: Log.exception('windows', e) return # consider only appropriate window types if not self.window_types or window.kind in self.window_types: window.awn = self._get_awn(window) window.gid = self._get_group(window) if window.awn is not None: self._add_shortcut(window) Log.info('windows', 'new window attributes: %s' % window) if window.shortcut is not None: if (not window.awn.startswith('!') and window.awn not in self._windows.get_all_awns()): Config.history.update_item(window.awn, window.shortcut[0]) self._update_window_name(window, window.shortcut) if window.awn is not None: try: Xtool.listen_window_name(window.wid) except BadWindow, e: Log.exception('windows', e) self._windows.append(window) def _update_window_name(self, window, prev_shortcut): """Change the window name, so it includes the shortcut.""" if Config.title_format == 'None': return if window.shortcut is None: return try: new_name = Xtool.get_window_name(window.wid) except BadWindow, e: Log.exception('windows', e) return if prev_shortcut is not None: edges = Config.title_format.split('%t') start = edges[0].replace('%s', prev_shortcut) end = edges[1].replace('%s', prev_shortcut) if new_name.startswith(start) and new_name.endswith(end): new_name = new_name[len(start):len(new_name)-len(end)] if (new_name == window.name and prev_shortcut == window.shortcut): return # window name wasn't changed if new_name != window.name: Log.info('windows', "window name '%s' (id=0x%x) changed to " "'%s'", window.name, window.wid, new_name) window.name = new_name new_name = Config.title_format.replace('%t', new_name) new_name = new_name.replace('%s', window.shortcut) try: Xtool.set_window_name(window.wid, new_name) except BadWindow, e: Log.exception('windows', e) class SignalHandler: """ Object holding static methods that 
implement the program behaviour when recieving signals. """ @staticmethod def graceful_exit_handler(sig, frame): """Handle graceful exit of the program.""" Log.info('signals', 'signal recieved: %i', sig) graceful_exit(history=True) @staticmethod def save_histoy_handler(sig, frame): """Save the current history to the configuration file.""" try: Config.history.write() except (IOError, OSError, MissingSectionError), e: Log.exception('config', e) @staticmethod def handle_all(): """Handle all the signals defined in `SignalHandler` class""" signal.signal(signal.SIGTERM, SignalHandler.graceful_exit_handler) signal.signal(signal.SIGHUP, signal.SIG_IGN) signal.signal(signal.SIGUSR1, SignalHandler.save_histoy_handler) class KeepLock(Exception): """ Raised to exit the program keeping the lock file. Useful for creating a daemon. """ class FileLock: """ Class which provides file locking functionality. It also implements context manager protocol. """ fd = None def __enter__(self): """ Return (lock, err) tuple lock is a FileLock instance if the lock was acquired successfully, err is None or an unexpected exception (IOError or OSError). """ err = None lock = None try: lock = self if self.lock() else None except EnvironmentError, e: err = e return lock, err def __exit__(self, exc_type, exc_value, traceback): if exc_type == KeepLock: return True else: self.remove() def lock(self): """ Acquire a lock, return True if the lock was successful. Return False if the lock had already been set (by another xatk instance). Raise IOError or OSError if the error occurred while opening, locking or writing to a file. 
""" status = True fd = os.open(LOCK_PATH, os.O_RDWR | os.O_CREAT, 0600) try: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError, e: os.close(fd) if e.errno == errno.EWOULDBLOCK: status = False else: raise if status: os.ftruncate(fd, 0) os.write(fd, str(os.getpid()) + '\n') self.fd = fd return status def remove(self): """Remove the lock.""" try: if self.fd: os.close(self.fd) self.fd = None os.remove(LOCK_PATH) except OSError, e: Log.exception('lock', e) def update_pid(self): if self.fd: try: os.ftruncate(self.fd, 0) os.lseek(self.fd, 0, os.SEEK_SET) os.write(self.fd, str(os.getpid()) + '\n') except OSError, e: Log.exception('lock', e) def graceful_exit(exit_code=0, history=False): """Optionally write history and remove lock, shutdown logging, and exit.""" if history: try: Config.history.write() except (IOError, OSError, MissingSectionError), e: Log.exception('config', e) logging.shutdown() sys.exit(exit_code) def kill_running_instance(): """Kill running xatk instance. Return status code.""" status = 1 try: with open(LOCK_PATH) as lockfile: pid = int(lockfile.readline().rstrip()) except IOError as e: if e.errno == errno.ENOENT: sys.stderr.write('Lock file not found. xatk is probably not ' 'running.\n') else: sys.stderr.write("Error reading the lock file '%s': %s\n" % (LOCK_PATH, e)) except ValueError: sys.stderr.write("The lock file content is invalid.") else: os.kill(pid, signal.SIGTERM) status = 0 return status def parse_options(): """ Parse command line options and return an object holding option values. 
""" def splitstr(option, opt_str, value, parser, choice): splits = value.split(',') for s in splits: if not s in choice: raise optparse.OptionValueError('option %s: invalid field: ' '%s (valid fields: %s)' % (opt_str, s, ', '.join(choice))) setattr(parser.values, option.dest, splits) usage = 'usage: %s [options]' % PROG_NAME optparser = optparse.OptionParser(usage=usage, version=VERSION_NAME, add_help_option=False, conflict_handler='resolve') optparser.add_option('-h', '--help', action='help', help="Show this help message and exit.") optparser.add_option('-V', '--version', action='version', help="Show program's version number and exit.") optparser.add_option('-f', '--file', dest='filepath', metavar='FILE', help='Specify a configuration file. By default, the ' 'configuration file is searched for in the following ' 'places: %s' % ', '.join(CONFIG_PATH)) optparser.add_option('-d', '--display', dest='display', metavar='DISPLAY', type='string', help='Specify X display name to connect to. If not ' 'given the environment variable $DISPLAY is used.') optparser.add_option('-n', '--no-daemon', dest='daemon', action='store_false', default=True, help='Don\'t start as daemon.') optparser.add_option('-p', '--print-defaults', dest='print_defaults', action='store_true', default=False, help='Print a default configuration file on the ' 'standard output.') optparser.add_option('-k', '--print-keys', dest='keys', action='store_true', default=False, help='Print all available keys on the standard ' 'output.') optparser.add_option('--kill', dest='kill', action='store_true', default=False, help='Gracefully stop running xatk instance.') debgroup = optparse.OptionGroup(optparser, 'Debugging Options') debgroup.add_option('-v', '--verbose', dest='verbosity', action='count', default=0, help='Provide verbose output. 
When the option is ' 'given twice the verbosity increases.') debgroup.add_option('-t', '--format', dest='fields', type='string', action='callback', callback=splitstr, callback_args=(Log.FORMAT_DICT.keys(), ), metavar='field1[,field2[,...]]', help='Specify which fields to print and their order. ' 'Possible fields: %s.' % ', '.join(Log.FORMAT_DICT.keys())) debgroup.add_option('-r', '--filter', dest='categories', type='string', action='callback', callback=splitstr, callback_args=(FIELDS, ), metavar='category1[,category2[,...]]', help='Print only those messages that belong to given ' 'categories (this doesn\'t apply to errors and ' 'warnings which are always printed). Possible ' 'categories: %s.' % ', '.join(FIELDS)) debgroup.add_option('-l', '--log-file', dest='logfile', metavar='FILE', help='Specify a file where to write a log. Options ' '-v/--verbose, -t/--format and -r/--filter don\'t ' 'affect logging to the file.') debgroup.add_option('-b', '--backup-count', dest='backup_count', type='int', default=0, metavar='NUMBER', help='How many log files to store not counting the ' 'current one (specified by -l/--log-file option). ' 'Default value is %%default. If NUMBER is not 0 ' 'log files will be rotated on every %s\'s start. ' 'The name of the oldest file will have the largest ' 'number at the end (e.g. %s.log.5).' 
% ((PROG_NAME, )*2)) optparser.add_option_group(debgroup) (options, args) = optparser.parse_args() if args: optparser.error('no argument was expected: %s' % ', '.join(args)) if options.verbosity == 0: if options.fields is not None: optparser.error('option -t/--format: -v/--verbose should' ' be specified') if options.categories is not None: optparser.error('option -r/--filter: -v/--verbose should ' 'be specified') if options.logfile is None: if options.backup_count !=0: optparser.error('option -b/--backup-count: -l/--log-file should ' 'be specified') if options.backup_count < 0: optparser.error('option -b/--backup-count: value should be 0 or ' 'larger: %d' % options.backup_count) return options def daemonize(): """ Do the UNIX double-fork magic, see Stevens' "Advanced Programming in the UNIX Environment" for details (ISBN 0201563177) http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 Based on Sander Marechal's Daemon class http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/ """ try: pid = os.fork() if pid > 0: # exit first parent raise KeepLock except OSError, e: Log.critical(PROG_NAME, 'fork #1 failed: %d (%s)', e.errno, e.strerror) graceful_exit(1) # decouple from parent environment os.chdir('/') os.setsid() # do second fork try: pid = os.fork() if pid > 0: # exit from second parent raise KeepLock except OSError, e: Log.critical(PROG_NAME, 'fork #2 failed: %d (%s)', e.errno, e.strerror) graceful_exit(1) # redirect standard file descriptors sys.stdout.flush() sys.stderr.flush() si = file(os.devnull, 'r') so = file(os.devnull, 'a+') se = file(os.devnull, 'a+', 0) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) def main(): options = parse_options() if options.print_defaults: print Config.get_default_config().encode(ENCODING) graceful_exit() if options.kill: status = kill_running_instance() graceful_exit(status) if options.verbosity == 0: 
Log.setLevel(logging.WARNING) elif options.verbosity == 1: Log.setLevel(logging.INFO) else: Log.setLevel(logging.DEBUG) if options.fields is not None: Log.configFormatter(options.fields) if options.categories is not None: Log.configFilter(options.categories) if options.logfile is not None: Log.configRotatingFileHandler(options.logfile, options.backup_count) if not XLIB_PRESENT: Log.error('X', 'can\'t import Xlib, probably python-xlib ' 'is no installed') graceful_exit(1) codecs.register_error('ignore_log', unicode_error_ignore_and_log) Log.capture_stderr() Log.capture_stdout() try: Xtool.connect(options.display) except Xlib.error.DisplayError, e: Log.exception('X', e) graceful_exit(1) Log.log_system_information() if options.keys: for key in Xtool.get_all_keys(): sys.stdout.stdbackup.write('%s\n' % key) graceful_exit() with FileLock() as (filelock, err): if err: Log.exception('lock', err) graceful_exit(1) if not filelock: Log.error('lock', "xatk is already running. See " "%s for pid", LOCK_PATH) graceful_exit(1) if options.filepath is not None: Config.set_path(options.filepath) Config.use_defaults() filepath = Config.get_path() if filepath and os.path.exists(filepath): try: Config.parse() except ConfigError, e: Log.exception('config', e) graceful_exit(1) else: pathstr = '%s ' % filepath if options.filepath else '' Log.error('config', ("configuration file %sdoesn't exist. Create " "it first: mkdir -p ~/.xatk && %s --print-defaults > " "%s") % (pathstr, PROG_NAME, CONFIG_PATH[1])) graceful_exit(1) if options.daemon: Log.release_stderr() Log.release_stdout() daemonize() if options.logfile is not None: Log.capture_stderr() Log.capture_stdout() Log.removeHandler() filelock.update_pid() SignalHandler.handle_all() try: WindowManager() # everything starts here Xtool.event_loop() # and continues here except ConnectionClosedError, e: Log.exception('X', e) graceful_exit(1, history=True) if __name__ == "__main__": main()