#!/usr/bin/env python3 # vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab ######################################################################### # Copyright 2011-2013 Niko Will # Copyright 2017 Bernd Meiners Bernd.Meiners@mail.de ######################################################################### # This file is part of SmartHomeNG. https://github.com/smarthomeNG// # # SmartHomeNG is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # SmartHomeNG is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with SmartHomeNG. If not, see . ########################################################################## # Item Data Format # # Each UZSU item is of type list. Each list entry has to be a dict with specific key and value pairs. # Here are the possible keys and their purpose: # # dtstart: a datetime object. Exact datetime as start value for the rrule algorithm. Important e.g. for FREQ=MINUTELY rrules (optional). # # value: the value which will be set to the item. # # active: True if the entry is activated, False if not. # A deactivated entry is stored to the database but doesn't trigger the setting of the value. # It can be enabled later with the update method. # # time: time as string to use sunrise/sunset arithmetics like in the crontab # examples: # 17:008:00 # 17:00 '1.5.1': self._items[item] = item() else: self._items[item] = copy.deepcopy(item()) _sunrise = self._uzsu_sun.rise() _sunset = self._uzsu_sun.set() if _sunrise.tzinfo == tzutc(): _sunrise = _sunrise.astimezone(self._timezone) if _sunset.tzinfo == tzutc(): _sunset = _sunset.astimezone(self._timezone) self._items[item]['sunrise'] = '{:02}:{:02}'.format(_sunrise.hour, _sunrise.minute) self._items[item]['sunset'] = '{:02}:{:02}'.format(_sunset.hour, _sunset.minute) self.logger.debug('Updated sun entries for item {}, triggered by {}. sunrise: {}, sunset: {}'.format( item, caller, self._items[item]['sunrise'], self._items[item]['sunset'])) success = True except Exception: success = False return success def _update_suncalc(self, item, entry, entryindex, entryvalue): update = False if entry.get('calculated'): update = True if entry == self._items[item]['list'][entryindex] else update with mock.patch.dict(entry, calculated=entryvalue): update = True if entry == self._items[item]['list'][entryindex] else update else: update = True if update is True and not entry.get('calculated') == entryvalue: self.logger.debug("Updated calculated time for item {} entry {} with value {}.".format( item, self._items[item]['list'][entryindex], entryvalue)) self._items[item]['list'][entryindex]['calculated'] = entryvalue item(self._items[item], 'UZSU Plugin', 'update_sun') else: self.logger.debug("Sun calculation {} entry not updated for item {} with value {}".format( entryvalue, item, entry.get('calculated'))) def _add_type(self, item): """ Adding the type of the item that is changed by the uzsu to the item dict :param item: uzsu item :type item: item :return: The item type of the item that is changed """ _itemforuzsu = self.get_iattr_value(item.conf, ITEM_TAG[0]) try: _uzsuitem = self.itemsApi.return_item(_itemforuzsu) except Exception as err: _uzsuitem = None self.logger.warning("Item to be set by uzsu '{}' does not exist. Error: {}".format(_itemforuzsu, err)) try: _itemtype = _uzsuitem.property.type except Exception as err: try: _itemtype = _uzsuitem.type() except Exception: _itemtype = 'foo' if _uzsuitem is not None else None if _itemtype is None: self.logger.warning("Item to be set by uzsu '{}' does not exist. Error: {}".format(_itemforuzsu, err)) else: self.logger.warning("Item to be set by uzsu '{}' does not have a type attribute. Error: {}".format(_itemforuzsu, err)) return _itemtype def _logics_lastvalue(self, item=None): lastvalue = self._items[item].get('lastvalue') self.logger.debug("Last value of item {} is: {}.".format(item, lastvalue)) return lastvalue def _logics_activate(self, activevalue=None, item=None): if isinstance(activevalue, str): if activevalue.lower() in ['1', 'yes', 'true', 'on']: activevalue = True elif activevalue.lower() in ['0', 'no', 'false', 'off']: activevalue = False else: self.logger.warning("Value to activate item '{}' has to be True or False".format(item)) if isinstance(activevalue, bool): if '.'.join(VERSION.split('.', 3)[:3]) > '1.5.1': self._items[item] = item() else: self._items[item] = copy.deepcopy(item()) self._items[item]['active'] = activevalue self.logger.info("Item {} is set via logic to: {}".format(item, activevalue)) item(self._items[item], 'UZSU Plugin', 'logic') return activevalue if activevalue is None: return self._items[item].get('active') def _logics_interpolation(self, type=None, interval=None, backintime=None, item=None): interval = self._interpolation_interval if interval is None else interval backintime = self._backintime if backintime is None else backintime if type is None: return self._items[item].get('interpolation') else: if '.'.join(VERSION.split('.', 3)[:3]) > '1.5.1': self._items[item] = item() else: self._items[item] = copy.deepcopy(item()) self._items[item]['interpolation']['type'] = str(type).lower() self._items[item]['interpolation']['interval'] = abs(int(interval)) self._items[item]['interpolation']['initage'] = int(backintime) self.logger.info("Item {} interpolation is set via logic to: type={}, interval={}, backintime={}".format( item, type, abs(interval), backintime)) item(self._items[item], 'UZSU Plugin', 'logic') return self._items[item].get('interpolation') def _logics_clear(self, clear=False, item=None): if isinstance(clear, str): if clear.lower() in ['1', 'yes', 'true', 'on']: clear = True else: self.logger.warning("Value to clear uzsu item '{}' has to be True".format(item)) if isinstance(clear, bool) and clear is True: self._items[item].clear() self._items[item] = {'interpolation': {}, 'active': False} self.logger.info("UZSU settings for item '{}' are cleared".format(item)) item(self._items[item], 'UZSU Plugin', 'clear') return True else: return False def _logics_itpl(self, clear=False, item=None): if isinstance(clear, str): if clear.lower() in ['1', 'yes', 'true', 'on']: clear = True if isinstance(clear, bool) and clear is True: self._itpl[item].clear() self.logger.info("UZSU interpolation dict for item '{}' is cleared".format(item)) return self._itpl[item] else: self.logger.info("UZSU interpolation dict for item '{}' is: {}".format(item, self._itpl[item])) return self._itpl[item] def _logics_planned(self, item=None): if self._planned.get(item) not in [None, {}, 'notinit'] and self._items[item].get('active') is True: self.logger.info("Item '{}' is going to be set to {} at {}".format( item, self._planned[item]['value'], self._planned[item]['next'])) return self._planned[item] elif self._planned.get(item) == 'notinit' and self._items[item].get('active') is True: self.logger.info("Item '{}' is active but not fully initialized yet.".format(item)) return None elif not self._planned.get(item) and self._items[item].get('active') is True: self.logger.warning("Item '{}' is active but has no (active) entries.".format(item)) self._planned.update({item: None}) return None else: self.logger.info("Nothing planned for item '{}'.".format(item)) return None def parse_item(self, item): """ Default plugin parse_item method. Is called when the plugin is initialized. The plugin can, corresponding to its attribute keywords, decide what to do with the item in future, like adding it to an internal array for future reference :param item: The item to process. :type item: item :return: If the plugin needs to be informed of an items change you should return a call back function like the function update_item down below. An example when this is needed is the knx plugin where parse_item returns the update_item function when the attribute knx_send is found. This means that when the items value is about to be updated, the call back function is called with the item, caller, source and dest as arguments and in case of the knx plugin the value can be sent to the knx with a knx write function within the knx plugin. """ if self.has_iattr(item.conf, ITEM_TAG[0]): item.expand_relativepathes(ITEM_TAG[0], '', '') # add functions for use in logics and webif item.activate = functools.partial(self._logics_activate, item=item) item.lastvalue = functools.partial(self._logics_lastvalue, item=item) item.interpolation = functools.partial(self._logics_interpolation, item=item) item.clear = functools.partial(self._logics_clear, item=item) item.planned = functools.partial(self._logics_planned, item=item) item.itpl = functools.partial(self._logics_itpl, item=item) if '.'.join(VERSION.split('.', 3)[:3]) > '1.5.1': self._items[item] = item() else: self._items[item] = copy.deepcopy(item()) try: self._items[item]['interpolation']['initialized'] = False except Exception: self._items[item]['interpolation'] = {} self._items[item]['interpolation']['type'] = 'none' self._items[item]['interpolation']['initialized'] = False if self._items[item].get('list'): for entry, _ in enumerate(self._items[item]['list']): self._items[item]['list'][entry].pop('condition', None) self._items[item]['list'][entry].pop('holiday', None) self._items[item]['list'][entry].pop('delayedExec', None) else: self._items[item]['list'] = [] if not self._items[item].get('active'): self._items[item]['active'] = False item(self._items[item], 'UZSU Plugin', 'init') self._planned.update({item: 'notinit'}) self.logger.debug('Dict for item {} is: {}'.format(item, self._items[item])) return self.update_item def _remove_dupes(self, item): self._items[item]['list'] = [i for n, i in enumerate(self._items[item]['list']) if i not in self._items[item]['list'][:n]] self.logger.debug('Removed duplicate entries for item {}.'.format(item)) compare_entries = item.prev_value() if compare_entries.get('list'): newentries = [] [newentries.append(i) for i in self._items[item]['list'] if i not in compare_entries['list']] self.logger.debug('Got update for item {}: {}'.format(item, newentries)) for entry in self._items[item]['list']: for new in newentries: found = False if not entry.get('value') == new.get('value') and new.get('active') is True: try: with mock.patch.dict(entry, value=new.get('value'), calculated=new['calculated']): found = True if entry == new else found except Exception: with mock.patch.dict(entry, value=new.get('value')): found = True if entry == new else found if found is True: self._items[item]['list'][self._items[item]['list'].index(entry)].update({'active': False}) time = entry['time'] oldvalue, newvalue = entry['value'], new['value'] self.logger.warning("Set old entry for item '{}' at {} with value {} to inactive" " because newer active entry with value {} found.".format( item, time, oldvalue, newvalue)) item(self._items[item], 'UZSU Plugin', 'update') def _check_rruleandplanned(self, item): if self._items[item].get('list'): _inactive = 0 count = 0 for entry in self._items[item]['list']: if entry.get('active') is False: _inactive += 1 if entry.get('rrule') == '': try: _index = self._items[item]['list'].index(entry) self._items[item]['list'][_index]['rrule'] = 'FREQ=WEEKLY;BYDAY=MO,TU,WE,TH,FR,SA,SU' count += 1 except Exception as err: self.logger.warning("Error creating rrule: {}".format(err)) if count > 0: self.logger.debug("Updated {} rrule entries for item: {}".format(count, item)) item(self._items[item], 'UZSU Plugin', 'create_rrule') if _inactive >= len(self._items[item]['list']): self._planned.update({item: None}) def update_item(self, item, caller=None, source=None, dest=None): """ This is called by smarthome engine when the item changes, e.g. by Visu or by the command line interface The relevant item is put into the internal item list and registered to the scheduler :param item: item to be updated towards the plugin :param caller: if given it represents the callers name :param source: if given it represents the source :param dest: if given it represents the dest """ if '.'.join(VERSION.split('.', 3)[:3]) > '1.5.1': self._items[item] = item() else: self._items[item] = copy.deepcopy(item()) cond = (not caller == 'UZSU Plugin') or source == 'logic' self.logger.debug('Update Item {}, Caller {}, Source {}, Dest {}. Will update: {}'.format( item, caller, source, dest, cond)) if not source == 'create_rrule': self._check_rruleandplanned(item) # Removing Duplicates if self._remove_duplicates is True and self._items[item].get('list') and cond: self._remove_dupes(item) if cond and self._items[item].get('active') is False and not source == 'update_sun': self._items[item]['lastvalue'] = None self.logger.debug('lastvalue for item {} set to None because UZSU is deactivated'.format(item)) item(self._items[item], 'UZSU Plugin', 'item_deactivated') if cond: self._schedule(item, caller='update') def _schedule(self, item, caller=None): """ This function schedules an item: First the item is removed from the scheduler. If the item is active then the list is searched for the nearest next execution time. No matter if active or not the calculation for the execution time is triggered. :param item: item to be updated towards the plugin :param caller: if given it represents the callers name """ self.scheduler_remove('uzsu_{}'.format(item.property.path)) self.logger.debug('Schedule Item {}, Trigger: {}, Changed by: {}'.format( item, caller, item.changed_by())) _next = None _value = None self._update_sun(item, caller='schedule') if self._items[item].get('interpolation') is None: self.logger.error("Something is wrong with your UZSU item. You most likely use a wrong smartVISU widget version! Use the latest device.uzsu from SV 2.9") if not self._items[item]['interpolation'].get('itemtype'): self.logger.error("item '{}' to be set by uzsu does not exist.".format( self.get_iattr_value(item.conf, ITEM_TAG[0]))) elif not self._items[item].get('list') and self._items[item].get('active') is True: self.logger.warning("item '{}' is active but has no entries.".format(item)) self._planned.update({item: None}) elif self._items[item].get('active') is True: self._itpl[item] = OrderedDict() for i, entry in enumerate(self._items[item]['list']): next, value = self._get_time(entry, 'next', item, i) previous, previousvalue = self._get_time(entry, 'previous', item, i) item(self._items[item], 'UZSU Plugin', 'schedule') cond1 = next is None and previous is not None cond2 = previous is not None and next is not None and previous < next if cond1 or cond2: next = previous value = previousvalue if next is not None: self.logger.debug("uzsu active entry for item {} with datetime {}, value {}" " and tzinfo {}".format(item, next, value, next.tzinfo)) if _next is None: _next = next _value = value elif next and next < _next: self.logger.debug("uzsu active entry for item {} using now {}, value {}" " and tzinfo {}".format(item, next, value, next.tzinfo)) _next = next _value = value else: self.logger.debug("uzsu active entry for item {} keep {}, value {} and tzinfo {}".format( item, _next, _value, _next.tzinfo)) if _next and _value is not None and self._items[item].get('active') is True: _reset_interpolation = False _interval = self._items[item]['interpolation'].get('interval') _interval = self._interpolation_interval if not _interval else int(_interval) if _interval < 0: _interval = abs(int(_interval)) self._items[item]['interpolation']['interval'] = _interval item(self._items[item], 'UZSU Plugin', 'intervalchange') _interpolation = self._items[item]['interpolation'].get('type') _interpolation = self._interpolation_type if not _interpolation else _interpolation _initage = self._items[item]['interpolation'].get('initage') _initage = 0 if not _initage else int(_initage) _initialized = self._items[item]['interpolation'].get('initialized') _initialized = False if not _initialized else _initialized entry_now = datetime.now(self._timezone).timestamp() * 1000.0 self._itpl[item][entry_now] = 'NOW' itpl_list = sorted(list(self._itpl[item].items())) entry_index = itpl_list.index((entry_now, 'NOW')) _inittime = itpl_list[entry_index - min(1, entry_index)][0] _initvalue = itpl_list[entry_index - min(1, entry_index)][1] itpl_list = itpl_list[entry_index - min(2, entry_index):entry_index + min(3, len(itpl_list))] itpl_list.remove((entry_now, 'NOW')) self._items[item]['lastvalue'] = _initvalue item(self._items[item], 'UZSU Plugin', 'lastvalue') _timediff = datetime.now(self._timezone) - timedelta(minutes=_initage) try: _value = float(_value) except ValueError: pass cond1 = _inittime - _timediff.timestamp() * 1000.0 >= 0 cond2 = _interpolation.lower() in ['cubic', 'linear'] cond3 = _initialized is False cond4 = _initage > 0 cond5 = isinstance(_value, float) self._itpl[item] = OrderedDict(itpl_list) if not cond2 and cond3 and cond4: self.logger.info("Looking if there was a value set after {} for item {}".format( _timediff, item)) self._items[item]['interpolation']['initialized'] = True item(self._items[item], 'UZSU Plugin', 'init') if cond1 and not cond2 and cond3: self._set(item=item, value=_initvalue, caller='scheduler') self.logger.info("Updated item {} on startup with value {} from time {}".format( item, _initvalue, datetime.fromtimestamp(_inittime/1000.0))) _itemtype = self._items[item]['interpolation'].get('itemtype') if cond2 and not REQUIRED_PACKAGE_IMPORTED: self.logger.warning("Interpolation is set to {} but scipy not installed. Ignoring interpolation".format( _interpolation)) elif cond2 and _interval < 1: self.logger.warning("Interpolation is set to {} but interval is {}. Ignoring interpolation".format( _interpolation, _interval)) elif cond2 and _itemtype not in ['num']: self.logger.warning("Interpolation is set to {} but type of item is {}." " Ignoring interpolation and setting UZSU interpolation to none.".format( _interpolation, _itemtype)) _reset_interpolation = True elif _interpolation.lower() == 'cubic' and _interval > 0: try: tck = interpolate.PchipInterpolator(list(self._itpl[item].keys()), list(self._itpl[item].values())) _nextinterpolation = datetime.now(self._timezone) + timedelta(minutes=_interval) _next = _nextinterpolation if _next > _nextinterpolation else _next _value = round(float(tck(_next.timestamp() * 1000.0)), self._interpolation_precision) _value_now = round(float(tck(entry_now)), self._interpolation_precision) self._set(item=item, value=_value_now, caller='scheduler') self.logger.info("Updated: {}, cubic interpolation value: {}, based on dict: {}." " Next: {}, value: {}".format(item, _value_now, self._itpl[item], _next, _value)) except Exception as e: self.logger.error("Error cubic interpolation for item {} " "with interpolation list {}: {}".format(item, self._itpl[item], e)) elif _interpolation.lower() == 'linear' and _interval > 0: try: tck = interpolate.interp1d(list(self._itpl[item].keys()), list(self._itpl[item].values())) _nextinterpolation = datetime.now(self._timezone) + timedelta(minutes=_interval) _next = _nextinterpolation if _next > _nextinterpolation else _next _value = round(float(tck(_next.timestamp() * 1000.0)), self._interpolation_precision) _value_now = round(float(tck(entry_now)), self._interpolation_precision) self._set(item=item, value=_value_now, caller='scheduler') self.logger.info("Updated: {}, linear interpolation value: {}, based on dict: {}." " Next: {}, value: {}".format(item, _value_now, self._itpl[item], _next, _value)) except Exception as e: self.logger.error("Error linear interpolation: {}".format(e)) if cond5 and _value < 0: self.logger.warning("value {} for item '{}' is negative. This might be due" " to not enough values set in the UZSU.".format(_value, item)) if _reset_interpolation is True: self._items[item]['interpolation']['type'] = 'none' item(self._items[item], 'UZSU Plugin', 'reset_interpolation') self.logger.debug("will add scheduler named uzsu_{} with datetime {} and tzinfo {}" " and value {}".format(item.property.path, _next, _next.tzinfo, _value)) self._planned.update({item: {'value': _value, 'next': _next.strftime('%Y-%m-%d %H:%M')}}) self._update_count['done'] = self._update_count.get('done') + 1 self.scheduler_add('uzsu_{}'.format(item.property.path), self._set, value={'item': item, 'value': _value}, next=_next) if self._update_count.get('done') == self._update_count.get('todo'): self.scheduler_trigger('uzsu_sunupdate', by='UZSU Plugin') self._update_count = {'done': 0, 'todo': 0} elif self._items[item].get('active') is True and self._items[item].get('list'): self.logger.warning("item '{}' is active but has no active entries.".format(item)) self._planned.update({item: None}) def _set(self, item=None, value=None, caller=None): """ This function sets the specific item :param item: item to be updated towards the plugin :param value: value the item should be set to :param caller: if given it represents the callers name """ _uzsuitem = self.itemsApi.return_item(self.get_iattr_value(item.conf, ITEM_TAG[0])) _uzsuitem(value, 'UZSU Plugin', 'set') if not caller: self._schedule(item, caller='set') def _get_time(self, entry, timescan, item=None, entryindex=None): """ Returns the next and previous execution time and value :param entry: a dictionary that may contain the following keys: value active date rrule dtstart :param item: item to be updated towards the plugin :param timescan: defines whether to find values in the future or past """ try: if not isinstance(entry, dict): return None, None if 'value' not in entry: return None, None if 'active' not in entry: return None, None if 'time' not in entry: return None, None value = entry['value'] active = entry['active'] today = datetime.today() tomorrow = today + timedelta(days=1) yesterday = today - timedelta(days=1) weekbefore = today - timedelta(days=7) time = entry['time'] if not active: return None, None if 'rrule' in entry: if entry['rrule'] == '': entry['rrule'] = 'FREQ=WEEKLY;BYDAY=MO,TU,WE,TH,FR,SA,SU' if 'dtstart' in entry: rrule = rrulestr(entry['rrule'], dtstart=entry['dtstart']) else: try: rrule = rrulestr(entry['rrule'], dtstart=datetime.combine( weekbefore, parser.parse(time.strip()).time())) self.logger.debug("Created rrule: '{}' for time:'{}'".format( str(rrule).replace('\n', ';'), time)) except ValueError: self.logger.debug("Could not create a rrule from rrule: '{}' and time:'{}'".format( entry['rrule'], time)) if 'sun' in time: rrule = rrulestr(entry['rrule'], dtstart=datetime.combine( weekbefore, self._sun(datetime.combine(weekbefore.date(), datetime.min.time()).replace(tzinfo=self._timezone), time, timescan).time())) self.logger.debug("Looking for {} sun-related time. Found rrule: {}".format( timescan, str(rrule).replace('\n', ';'))) else: rrule = rrulestr(entry['rrule'], dtstart=datetime.combine(weekbefore, datetime.min.time())) self.logger.debug("Looking for {} time. Found rrule: {}".format( timescan, str(rrule).replace('\n', ';'))) dt = datetime.now() while self.alive: dt = rrule.before(dt) if timescan == 'previous' else rrule.after(dt) if dt is None: return None, None if 'sun' in time: sleep(0.01) next = self._sun(datetime.combine(dt.date(), datetime.min.time()).replace(tzinfo=self._timezone), time, timescan) self.logger.debug("Result parsing time (rrule) {}: {}".format(time, next)) if entryindex is not None: self._update_suncalc(item, entry, entryindex, next.strftime("%H:%M")) else: next = datetime.combine(dt.date(), parser.parse(time.strip()).time()).replace(tzinfo=self._timezone) if next and next.date() == dt.date(): self._itpl[item][next.timestamp() * 1000.0] = value if next - timedelta(seconds=1) > datetime.now().replace(tzinfo=self._timezone): self.logger.debug("Return from rrule {}: {}, value {}.".format(timescan, next, value)) return next, value else: self.logger.debug("Not returning {} rrule {} because it's in the past.".format(timescan, next)) if 'sun' in time: next = self._sun(datetime.combine(today, datetime.min.time()).replace( tzinfo=self._timezone), time, timescan) cond_future = next > datetime.now(self._timezone) if cond_future: self.logger.debug("Result parsing time today (sun) {}: {}".format(time, next)) if entryindex is not None: self._update_suncalc(item, entry, entryindex, next.strftime("%H:%M")) else: self._itpl[item][next.timestamp() * 1000.0] = value self.logger.debug("Include previous today (sun): {}, value {} for interpolation.".format(next, value)) if entryindex: self._update_suncalc(item, entry, entryindex, next.strftime("%H:%M")) next = self._sun(datetime.combine(tomorrow, datetime.min.time()).replace( tzinfo=self._timezone), time, timescan) self.logger.debug("Result parsing time tomorrow (sun) {}: {}".format(time, next)) else: next = datetime.combine(today, parser.parse(time.strip()).time()).replace(tzinfo=self._timezone) cond_future = next > datetime.now(self._timezone) if not cond_future: self._itpl[item][next.timestamp() * 1000.0] = value self.logger.debug("Include {} today: {}, value {} for interpolation.".format(timescan, next, value)) next = datetime.combine(tomorrow, parser.parse(time.strip()).time()).replace(tzinfo=self._timezone) cond_today = next.date() == today.date() cond_yesterday = next.date() - timedelta(days=1) == yesterday.date() cond_tomorrow = next.date() == tomorrow.date() cond_next = next > datetime.now(self._timezone) cond_previous_today = next - timedelta(seconds=1) < datetime.now(self._timezone) cond_previous_yesterday = next - timedelta(days=1) < datetime.now(self._timezone) if next and cond_today and cond_next: self._itpl[item][next.timestamp() * 1000.0] = value self.logger.debug("Return next today: {}, value {}".format(next, value)) return next, value if next and cond_tomorrow and cond_next: self._itpl[item][next.timestamp() * 1000.0] = value self.logger.debug("Return next tomorrow: {}, value {}".format(next, value)) return next, value if next and cond_today and cond_previous_today: self._itpl[item][(next - timedelta(seconds=1)).timestamp() * 1000.0] = value self.logger.debug("Not returning previous today {} because it's in the past.".format(next)) if next and cond_yesterday and cond_previous_yesterday: self._itpl[item][(next - timedelta(days=1)).timestamp() * 1000.0] = value self.logger.debug("Not returning previous yesterday {} because it's in the past.".format(next)) except Exception as e: self.logger.error("Error '{}' parsing time: {}".format(time, e)) return None, None def _create_sun(self): """ Creates a sun object for sun calculations """ # checking preconditions from configuration: uzsu_sun = None if not self._sh.sun: # no sun object created self.logger.error("No latitude/longitude specified. Not possible to create sun object.") # create an own sun object: if not self._uzsu_sun: try: longitude = self._sh.sun._obs.long latitude = self._sh.sun._obs.lat elevation = self._sh.sun._obs.elev uzsu_sun = lib.orb.Orb('sun', longitude, latitude, elevation) self.logger.debug("Created a new sun object with latitude={}, longitude={}, elevation={}".format( latitude, longitude, elevation)) except Exception as e: self.logger.error("Error '{}' creating a new sun object. You could not " "use sunrise/sunset as UZSU entry.".format(e)) else: uzsu_sun = self._uzsu_sun return uzsu_sun def _sun(self, dt, tstr, timescan): """ parses a given string with a time range to determine it's timely boundaries and returns a time :param dt: contains a datetime object, :param tstr: contains a string with '[H:M<](sunrise|sunset)[+|-][offset][ dmax: self.logger.error("Wrong times: the earliest time should be smaller than the " "latest time in {}".format(tstr)) return try: next_time = dmin if dmin > next_time else next_time except Exception: pass try: next_time = dmax if dmax < next_time else next_time except Exception: pass return next_time def _get_dependant(self, item): """ Getting the value of the dependent item for the webif :param item: uzsu item :type item: item :return: The item value of the item that is changed """ try: _uzsuitem = self.itemsApi.return_item(self.get_iattr_value(item.conf, ITEM_TAG[0])) except Exception as err: _uzsuitem = None self.logger.warning("Item to be queried '{}' does not exist. Error: {}".format( self.get_iattr_value(item.conf, ITEM_TAG[0]), err)) try: _itemvalue = _uzsuitem() except Exception as err: _itemvalue = None self.logger.warning("Item to be queried '{}' does not have a type attribute. Error: {}".format( self.get_iattr_value(item.conf, ITEM_TAG[0]), err)) return _itemvalue def get_items(self): """ Getting a sorted item list with uzsu config :return: sorted itemlist """ sortedlist = sorted([k.id() for k in self._items.keys()]) finallist = [] for i in sortedlist: finallist.append(self.itemsApi.return_item(i)) return finallist def init_webinterface(self): """" Initialize the web interface for this plugin This method is only needed if the plugin is implementing a web interface """ try: self.mod_http = Modules.get_instance().get_module('http') except Exception: self.mod_http = None if self.mod_http is None: self.logger.error("Plugin '{}': Not initializing the web interface".format(self.get_shortname())) return False import sys if "SmartPluginWebIf" not in list(sys.modules['lib.model.smartplugin'].__dict__): self.logger.warning("Plugin '{}': Web interface needs SmartHomeNG v1.5 and up. Not initializing the web interface".format(self.get_shortname())) return False # set application configuration for cherrypy webif_dir = self.path_join(self.get_plugin_dir(), 'webif') config = { '/': { 'tools.staticdir.root': webif_dir, }, '/static': { 'tools.staticdir.on': True, 'tools.staticdir.dir': 'static' } } # Register the web interface as a cherrypy app self.mod_http.register_webif(WebInterface(webif_dir, self), self.get_shortname(), config, self.get_classname(), self.get_instance_name(), description='') return True # ------------------------------------------ # Webinterface of the plugin # ------------------------------------------ import cherrypy from jinja2 import Environment, FileSystemLoader class WebInterface(SmartPluginWebIf): def __init__(self, webif_dir, plugin): """ Initialization of instance of class WebInterface :param webif_dir: directory where the webinterface of the plugin resides :param plugin: instance of the plugin :type webif_dir: str :type plugin: object """ self.logger = logging.getLogger(__name__) self.webif_dir = webif_dir self.plugin = plugin self.tplenv = self.init_template_environment() @cherrypy.expose def index(self, action=None, item_id=None, item_path=None, reload=None): """ Build index.html for cherrypy Render the template and return the html file to be delivered to the browser :return: contents of the template after beeing rendered """ item = self.plugin.get_sh().return_item(item_path) tmpl = self.tplenv.get_template('index.html') # add values to be passed to the Jinja2 template eg: tmpl.render(p=self.plugin, interface=interface, ...) return tmpl.render(p=self.plugin, language=self.plugin._sh.get_defaultlanguage(), now=self.plugin.shtime.now())