# -*- coding: utf-8 -*- """ Support for DLNA DMR (Device Media Renderer) """ import asyncio import functools import logging import re import urllib.parse import xml.etree.ElementTree as ET from datetime import datetime from datetime import timedelta import aiohttp import async_timeout import voluptuous as vol import homeassistant.helpers.config_validation as cv from homeassistant.components.http.view import ( request_handler_factory, HomeAssistantView) from homeassistant.components.media_player import ( SUPPORT_PLAY, SUPPORT_PAUSE, SUPPORT_STOP, SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET, SUPPORT_PLAY_MEDIA, SUPPORT_PREVIOUS_TRACK, SUPPORT_NEXT_TRACK, MediaPlayerDevice, PLATFORM_SCHEMA) from homeassistant.const import ( EVENT_HOMEASSISTANT_STOP, CONF_URL, CONF_NAME, STATE_OFF, STATE_ON, STATE_IDLE, STATE_PLAYING, STATE_PAUSED) from homeassistant.helpers.aiohttp_client import async_get_clientsession REQUIREMENTS = ['async_upnp_client==0.10.0'] DEFAULT_NAME = 'DLNA_DMR' CONF_MAX_VOLUME = 'max_volume' CONF_PICKY_DEVICE = 'picky_device' PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ vol.Required(CONF_URL): cv.string, vol.Optional(CONF_NAME): cv.string, vol.Optional(CONF_MAX_VOLUME): cv.positive_int, vol.Optional(CONF_PICKY_DEVICE): cv.boolean, }) NS = { 'didl_lite': 'urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/', 'dc': 'http://purl.org/dc/elements/1.1/', } SERVICE_TYPES = { 'RC': 'urn:schemas-upnp-org:service:RenderingControl:1', 'AVT': 'urn:schemas-upnp-org:service:AVTransport:1', } HOME_ASSISTANT_UPNP_CLASS_MAPPING = { 'music': 'object.item.audioItem', 'tvshow': 'object.item.videoItem', 'video': 'object.item.videoItem', 'episode': 'object.item.videoItem', 'channel': 'object.item.videoItem', 'playlist': 'object.item.playlist', } UPNP_DEVICE_MEDIA_RENDERER = 'urn:schemas-upnp-org:device:MediaRenderer:1' _LOGGER = logging.getLogger(__name__) def requires_action(service_type, action_name, value_not_connected=None): """Raise NotImplemented() if connected but service/action not available.""" def call_wrapper(func): """Call wrapper for decorator""" @functools.wraps(func) def wrapper(self, *args, **kwargs): """ Require device is connected and has service/action. If device is not connected, value_not_connected is returned. """ # pylint: disable=protected-access if not self._is_connected: return value_not_connected service = self._service(service_type) if not service: _LOGGER.error('requires_state_variable(): ' '%s.%s: no service: %s', self, func.__name__, service_type) raise NotImplementedError() action = service.action(action_name) if not action: _LOGGER.error('requires_action(): %s.%s: no action: %s.%s', self, func.__name__, service_type, action_name) raise NotImplementedError() return func(self, action, *args, **kwargs) return wrapper return call_wrapper def requires_state_variable(service_type, state_variable_name, value_not_connected=None): """ Raise NotImplemented() if connected but service/state_variable not available. """ def call_wrapper(func): """Call wrapper for decorator.""" @functools.wraps(func) def wrapper(self, *args, **kwargs): """ Require device is connected and has service/state_variable. If device is not connected, value_not_connected is returned. """ # pylint: disable=protected-access if not self._is_connected: return value_not_connected service = self._service(service_type) if not service: _LOGGER.error('requires_state_variable(): ' '%s.%s: no service: %s', self, func.__name__, service_type) raise NotImplementedError() state_var = service.state_variable(state_variable_name) if not state_var: _LOGGER.error('requires_state_variable(): ' '%s.%s: no state_variable: %s.%s', self, func.__name__, service_type, state_variable_name) raise NotImplementedError() return func(self, state_var, *args, **kwargs) return wrapper return call_wrapper def start_notify_view(hass): """Register notify view.""" hass_data = hass.data[__name__] name = 'notify_view' if name in hass_data: return hass_data[name] view = UpnpNotifyView(hass) hass_data[name] = view hass.http.register_view(view) return view def start_proxy_view(hass): """Register proxy view.""" hass_data = hass.data[__name__] name = 'proxy_view' if name in hass_data: return hass_data[name] view = PickyDeviceProxyView(hass) hass_data[name] = view hass.http.register_view(view) return view def setup_platform(hass, config, add_devices, discovery_info=None): """Set up DLNA DMR platform.""" if discovery_info and \ 'upnp_device_type' in discovery_info and \ discovery_info['upnp_device_type'] != UPNP_DEVICE_MEDIA_RENDERER: _LOGGER.debug('Device is not a MediaRenderer: %s', discovery_info.get('ssdp_description')) return is_picky = False if config.get(CONF_URL) is not None: url = config.get(CONF_URL) name = config.get(CONF_NAME) elif discovery_info is not None: url = discovery_info['ssdp_description'] name = discovery_info['name'] # Samsung TVs are particular picky with regard to their sources manufacturer = discovery_info.get('manufacturer', '').lower() is_samsung = 'samsung' in manufacturer is_tv = 'tv' in discovery_info.get('name', '').lower() is_picky = is_samsung and is_tv cfg_extra = { CONF_MAX_VOLUME: config.get(CONF_MAX_VOLUME), CONF_PICKY_DEVICE: config.get(CONF_PICKY_DEVICE) or is_picky, } # set up our Views, if not already done so if __name__ not in hass.data: hass.data[__name__] = {} hass.async_run_job(start_notify_view, hass) hass.async_run_job(start_proxy_view, hass) from async_upnp_client import UpnpFactory requester = HassUpnpRequester(hass) factory = UpnpFactory(requester) device = DlnaDmrDevice(hass, url, name, factory, **cfg_extra) _LOGGER.debug("Adding device: %s", device) add_devices([device]) @asyncio.coroutine def fetch_headers(hass, url, headers): """Fetch headers from URL, first by trying HEAD, then by trying a GET.""" # try a HEAD request to the source src_response = None try: session = async_get_clientsession(hass) src_response = yield from session.head(url, headers=headers) yield from src_response.release() except aiohttp.ClientError: pass if src_response and 200 <= src_response.status < 300: return src_response.headers # try a GET request to the source, but ignore all the data session = async_get_clientsession(hass) src_response = yield from session.get(url, headers=headers) yield from src_response.release() return src_response.headers class UpnpNotifyView(HomeAssistantView): """Callback view for UPnP NOTIFY messages""" url = '/api/dlna_dmr.notify' name = 'api:dlna_dmr:notify' requires_auth = False def __init__(self, hass): self.hass = hass self._registered_services = {} self._backlog = {} def register(self, router): """Register the view with a router.""" handler = request_handler_factory(self, self.async_notify) router.add_route('notify', UpnpNotifyView.url, handler) @asyncio.coroutine def async_notify(self, request): """Callback method for NOTIFY requests.""" t_sid = request.headers.get('SID', 'missing') _LOGGER.debug('%s.async_notify(): SID: %s, service: %s', self, t_sid, self._registered_services.get(t_sid)) if 'SID' not in request.headers: return aiohttp.web.Response(status=422) headers = request.headers sid = headers['SID'] body = yield from request.text() # find UpnpService by SID if sid not in self._registered_services: self._backlog[sid] = {'headers': headers, 'body': body} return aiohttp.web.Response(status=202) service = self._registered_services[sid] service.on_notify(headers, body) return aiohttp.web.Response(status=200) @property def callback_url(self): """Full URL to be called by device/service.""" base_url = self.hass.config.api.base_url return urllib.parse.urljoin(base_url, self.url) def register_service(self, sid, service): """ Register a UpnpService under SID. """ _LOGGER.debug('%s.register_service(): SID: %s, service: %s', self, sid, service) if sid in self._registered_services: raise RuntimeError('SID {} already registered.'.format(sid)) self._registered_services[sid] = service if sid in self._backlog: item = self._backlog[sid] service.on_notify(item['headers'], item['body']) del self._backlog[sid] def unregister_service(self, sid): """Unregister service by SID.""" if sid in self._registered_services: del self._registered_services[sid] class PickyDeviceProxyView(HomeAssistantView): """View to serve device""" url = '/api/dlna_dmr.proxy/{key}' proxy_path = '/api/dlna_dmr.proxy' name = 'api:dlna_dmr:proxy' requires_auth = False DLNA_CONTENT_FEATURES = \ 'DLNA.ORG_OP=01;' \ 'DLNA.ORG_CI=0;' \ 'DLNA.ORG_FLAGS=01700000000000000000000000000000' DLNA_TRANSFER_MODE = 'Streaming' def __init__(self, hass): self.hass = hass self._entries = {} def register(self, router): """Register the view with a router.""" handler = request_handler_factory(self, self.async_head) router.add_route('head', self.url, handler) handler = request_handler_factory(self, self.async_get) router.add_route('get', self.url, handler) def _prune_entries(self): """Prune entries older than 24 hours.""" max_age = timedelta(hours=24) now = datetime.now() to_remove = [] for key, entry in self._entries.items(): age = now - entry['added_at'] if age > max_age: to_remove.append(key) for key in to_remove: del self._entries[key] def add_url(self, url): """Add a new URL to the proxy, valid for 24 hours.""" self._prune_entries() import hashlib key = hashlib.sha256(url.encode('utf-8')).hexdigest() self._entries[key] = { 'url': url, 'added_at': datetime.now(), } return key @property def callback_url(self): """Full URL to be called by device/service.""" base_url = self.hass.config.api.base_url return urllib.parse.urljoin(base_url, self.url) @asyncio.coroutine def async_head(self, request, **args): """Handle HEAD request.""" url = None if 'key' in args: key = args['key'] entry = self._entries[key] url = entry['url'] else: return aiohttp.web.Response(body="Missing URL", status=422) src_headers = yield from fetch_headers(self.hass, url, request.headers) headers = { 'Accept-Ranges': 'bytes', 'transferMode.dlna.org': self.DLNA_TRANSFER_MODE, 'contentFeatures.dlna.org': self.DLNA_CONTENT_FEATURES, } headers.update(src_headers) return aiohttp.web.Response(headers=headers) @asyncio.coroutine def async_get(self, request, **args): """Handle GET request.""" url = None if 'key' in args: key = args['key'] entry = self._entries[key] url = entry['url'] else: return aiohttp.web.Response(body="Missing URL", status=422) # get data from source session = async_get_clientsession(self.hass) src_response = yield from session.get(url, headers=request.headers) src_data = yield from src_response.read() headers = { 'Accept-Ranges': 'bytes', 'transferMode.dlna.org': self.DLNA_TRANSFER_MODE, 'contentFeatures.dlna.org': self.DLNA_CONTENT_FEATURES, } headers.update(src_response.headers) if 'range' in request.headers: range_ = request.headers['range'] parts = [int(x) for x in range_.replace('bytes=', '').split('-') if x] from_ = parts[0] to_ = parts[1] if len(parts) == 2 else len(src_data) chunk_size = (to_ - from_) headers['Content-Range'] = 'bytes {}-{}/{}'.format(from_, to_, len(src_data)) headers['Content-Length'] = str(chunk_size) src_data = src_data[from_:to_] return aiohttp.web.Response(body=src_data, status=206, headers=headers) return aiohttp.web.Response(body=src_data, status=200, headers=headers) class HassUpnpRequester(object): """async_upnp_client.UpnpRequester for home-assistant.""" def __init__(self, hass): self.hass = hass @asyncio.coroutine def async_http_request(self, method, url, headers=None, body=None): """Do a HTTP request.""" session = async_get_clientsession(self.hass) with async_timeout.timeout(5, loop=self.hass.loop): response = yield from session.request(method, url, headers=headers, data=body) response_body = yield from response.text() yield from response.release() yield from asyncio.sleep(0.25) return response.status, response.headers, response_body class DlnaDmrDevice(MediaPlayerDevice): """Representation of a DLNA DMR device.""" def __init__(self, hass, url, name, factory, **additional_configuration): self.hass = hass self._url = url self._name = name self._factory = factory self._additional_configuration = additional_configuration self._notify_view = hass.data[__name__]['notify_view'] self._device = None self._is_connected = False hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self._async_on_hass_stop) @property def available(self): """Device is avaiable?""" return self._is_connected @asyncio.coroutine def _async_on_hass_stop(self, event): """Event handler on HASS stop.""" yield from self.async_unsubscribe_all() def _service(self, service_type): """Get UpnpService by service_type or alias.""" if not self._device: return None service_type = SERVICE_TYPES.get(service_type, service_type) return self._device.service(service_type) @asyncio.coroutine def async_unsubscribe_all(self): """ Disconnect from device. This removes all UpnpServices. """ if not self._device: return for service in self._device.services.values(): try: sid = service.subscription_sid if sid: self._notify_view.unregister_service(sid) yield from service.async_unsubscribe(True) except (asyncio.TimeoutError, aiohttp.ClientError): pass @asyncio.coroutine def _async_init_device(self): """Fetch and init services.""" self._device = yield from self._factory.async_create_device(self._url) # set name if self.name is None or self.name == DEFAULT_NAME: self._name = self._device.name # subscribe services for events callback_url = self._notify_view.callback_url for service in self._device.services.values(): service.on_state_variable_change = self.on_state_variable_change sid = yield from service.async_subscribe(callback_url) if sid: self._notify_view.register_service(sid, service) @asyncio.coroutine def async_update(self): """Retrieve the latest data.""" if not self._device: try: yield from self._async_init_device() except (asyncio.TimeoutError, aiohttp.ClientError): # Not yet seen alive, leave for now, gracefully return # call GetTransportInfo/GetPositionInfo regularly try: avt_service = self._service('AVT') if avt_service: get_transport_info_action = \ avt_service.action('GetTransportInfo') state = yield from self._async_poll_transport_info( get_transport_info_action) yield from asyncio.sleep(0.25) if state == STATE_PLAYING or state == STATE_PAUSED: # playing something... get position info get_position_info_action = avt_service.action( 'GetPositionInfo') yield from self._async_poll_position_info( get_position_info_action) else: yield from self._device.async_ping() self._is_connected = True except (asyncio.TimeoutError, aiohttp.ClientError) as ex: _LOGGER.debug('%s.async_update(): error on update: %s', self, ex) self._is_connected = False yield from self.async_unsubscribe_all() @asyncio.coroutine def _async_poll_transport_info(self, action): """Update transport info from device.""" result = yield from action.async_call(InstanceID=0) # set/update state_variable 'TransportState' service = action.service state_var = service.state_variable('TransportState') old_value = state_var.value state_var.value = result['CurrentTransportState'] if old_value != result['CurrentTransportState']: self.on_state_variable_change(service, [state_var]) return self.state @asyncio.coroutine def _async_poll_position_info(self, action): """Update position info""" result = yield from action.async_call(InstanceID=0) service = action.service track_duration = service.state_variable('CurrentTrackDuration') track_duration.value = result['TrackDuration'] time_position = service.state_variable('RelativeTimePosition') time_position.value = result['RelTime'] self.on_state_variable_change(service, [track_duration, time_position]) def on_state_variable_change(self, service, state_variables): """State variable(s) changed, let homeassistant know""" self.schedule_update_ha_state() @property def supported_features(self): """Flag media player features that are supported.""" _LOGGER.debug('%s.supported_features()', self) supported_features = 0 if not self._device: return supported_features rc_service = self._service('RC') _LOGGER.debug('%s.supported_features(): rc_service: %s', self, rc_service) if rc_service: _LOGGER.debug('%s.supported_features(): rc_service: %s, mute: %s', self, rc_service, rc_service.state_variable('Mute')) if rc_service.state_variable('Mute'): supported_features |= SUPPORT_VOLUME_MUTE _LOGGER.debug('%s.supported_features(): rc_service: %s, volume: %s', self, rc_service, rc_service.state_variable('Volume')) if rc_service.state_variable('Volume'): supported_features |= SUPPORT_VOLUME_SET avt_service = self._service('AVT') if avt_service: state_var = avt_service.state_variable('CurrentTransportActions') if state_var: value = state_var.value or '' actions = value.split(',') if 'Play' in actions: supported_features |= SUPPORT_PLAY if 'Stop' in actions: supported_features |= SUPPORT_STOP if 'Pause' in actions: supported_features |= SUPPORT_PAUSE current_track_var = avt_service.state_variable('CurrentTrack') num_tracks_var = avt_service.state_variable('NumberOfTracks') if current_track_var and \ num_tracks_var and \ current_track_var.value is not None and \ num_tracks_var.value is not None: current_track = current_track_var.value num_tracks = num_tracks_var.value if current_track > 1: supported_features |= SUPPORT_PREVIOUS_TRACK if num_tracks > current_track: supported_features |= SUPPORT_NEXT_TRACK play_media_action = avt_service.action('SetAVTransportURI') play_action = avt_service.action('Play') if play_media_action and play_action: supported_features |= SUPPORT_PLAY_MEDIA return supported_features @property @requires_state_variable('RC', 'Volume') def volume_level(self, state_variable): """Volume level of the media player (0..1).""" # pylint: disable=arguments-differ _LOGGER.debug('%s.volume_level(): state_var: %s', self, state_variable) value = state_variable.value if value is None: _LOGGER.debug('%s.volume_level(): Got no value', self) return None override_max = self._additional_configuration.get('max_volume', None) max_value = override_max or state_variable.max_value or 100 return min(value / max_value, 1.0) @asyncio.coroutine @requires_action('RC', 'SetVolume') def async_set_volume_level(self, action, volume): """Set volume level, range 0..1.""" # pylint: disable=arguments-differ _LOGGER.debug('%s.async_set_volume_level(): action: %s, volume: %s', self, action, volume) argument = action.argument('DesiredVolume') state_variable = argument.related_state_variable min_ = state_variable.min_value or 0 override_max = self._additional_configuration.get('max_volume', None) max_ = override_max or state_variable.max_value or 100 desired_volume = int(min_ + volume * (max_ - min_)) yield from action.async_call(InstanceID=0, Channel='Master', DesiredVolume=desired_volume) @property @requires_state_variable('RC', 'Mute') def is_volume_muted(self, state_variable): """Boolean if volume is currently muted.""" # pylint: disable=arguments-differ _LOGGER.debug('%s.is_volume_muted(): state_var: %s', self, state_variable) value = state_variable.value if value is None: _LOGGER.debug('%s.is_volume_muted(): Got no value', self) return None return value @asyncio.coroutine @requires_action('RC', 'SetMute') def async_mute_volume(self, action, mute): """Mute the volume.""" # pylint: disable=arguments-differ _LOGGER.debug('%s.async_set_volume_level(): action: %s, mute: %s', self, action, mute) desired_mute = bool(mute) yield from action.async_call(InstanceID=0, Channel='Master', DesiredMute=desired_mute) @asyncio.coroutine @requires_action('AVT', 'Pause') def async_media_pause(self, action): """Send pause command.""" # pylint: disable=arguments-differ yield from action.async_call(InstanceID=0) @asyncio.coroutine @requires_action('AVT', 'Play') def async_media_play(self, action): """Send play command.""" # pylint: disable=arguments-differ yield from action.async_call(InstanceID=0, Speed='1') @asyncio.coroutine @requires_action('AVT', 'Stop') def async_media_stop(self, action): """Send stop command.""" # pylint: disable=arguments-differ yield from action.async_call(InstanceID=0) @asyncio.coroutine @requires_action('AVT', 'SetAVTransportURI') def async_play_media(self, action, media_type, media_id, **kwargs): """Play a piece of media.""" # pylint: disable=arguments-differ picky_device = self._additional_configuration.get(CONF_PICKY_DEVICE, False) media_info = { 'media_url': media_id, 'upnp_class': HOME_ASSISTANT_UPNP_CLASS_MAPPING[media_type], } src_headers = None try: req_src_headers = { 'GetContentFeatures.dlna.org': '1' } src_headers = yield from fetch_headers(self.hass, media_id, req_src_headers) if 'Content-Type' in src_headers: media_info['mime_type'] = src_headers['Content-Type'] if 'ContentFeatures.dlna.org' in media_info: media_info['dlna_features'] = \ src_headers['contentFeatures.dlna.org'] except aiohttp.ClientError: pass is_dlna_source = src_headers and \ 'contentFeatures.dlna.org' in src_headers if not is_dlna_source: if picky_device: _LOGGER.debug('%s.async_play_media(): detected invalid source,' ' routing through proxy', self) # get proxy url proxy_view = self.hass.data[__name__]['proxy_view'] base_url = self.hass.config.api.base_url proxy_url = urllib.parse.urljoin( base_url, PickyDeviceProxyView.proxy_path) key = proxy_view.add_url(media_id) media_info['media_url'] = '{}/{}'.format(proxy_url, key) media_info['dlna_features'] = \ PickyDeviceProxyView.DLNA_CONTENT_FEATURES else: media_info['dlna_features'] = \ PickyDeviceProxyView.DLNA_CONTENT_FEATURES.replace('17', '00') meta_data = """ Home Assistant {upnp_class} {media_url} """.format(**media_info) yield from action.async_call(InstanceID=0, CurrentURI=media_id, CurrentURIMetaData=meta_data) yield from asyncio.sleep(0.25) # send play command yield from self.async_media_play() yield from asyncio.sleep(0.25) @asyncio.coroutine @requires_action('AVT', 'Previous') def async_media_previous_track(self, action): """Send previous track command.""" # pylint: disable=arguments-differ yield from action.async_call(InstanceID=0) @asyncio.coroutine @requires_action('AVT', 'Next') def async_media_next_track(self, action): """Send next track command.""" # pylint: disable=arguments-differ yield from action.async_call(InstanceID=0) @property @requires_state_variable('AVT', 'CurrentTrackMetaData') def media_title(self, state_variable): """Title of current playing media.""" # pylint: disable=arguments-differ xml = state_variable.value if not xml: return None root = ET.fromstring(xml) title_xml = root.find('.//dc:title', NS) if title_xml is None: return None return title_xml.text @property @requires_state_variable('AVT', 'CurrentTrackMetaData') def media_image_url(self, state_variable): """Image url of current playing media.""" # pylint: disable=arguments-differ xml = state_variable.value if not xml: return None root = ET.fromstring(xml) for res in root.findall('.//didl_lite:res', NS): protocol_info = res.attrib.get('protocolInfo') or '' if protocol_info.startswith('http-get:*:image/'): url = protocol_info.text return url return None @property def state(self): """State of the player.""" if not self._is_connected: return STATE_OFF avt_service = self._service('AVT') if not avt_service: return STATE_ON transport_state = avt_service.state_variable('TransportState') if not transport_state: return STATE_ON elif transport_state.value == 'PLAYING': return STATE_PLAYING elif transport_state.value == 'PAUSED_PLAYBACK': return STATE_PAUSED return STATE_IDLE @property @requires_state_variable('AVT', 'CurrentTrackDuration') def media_duration(self, state_variable): """Duration of current playing media in seconds.""" # pylint: disable=arguments-differ if state_variable is None or state_variable.value is None: return None split = [int(v) for v in re.findall(r"[\w']+", state_variable.value)] delta = timedelta(hours=split[0], minutes=split[1], seconds=split[2]) return delta.seconds @property @requires_state_variable('AVT', 'RelativeTimePosition') def media_position(self, state_variable): """Position of current playing media in seconds.""" # pylint: disable=arguments-differ if state_variable is None or state_variable.value is None: return None split = [int(v) for v in re.findall(r"[\w']+", state_variable.value)] delta = timedelta(hours=split[0], minutes=split[1], seconds=split[2]) return delta.seconds @property @requires_state_variable('AVT', 'RelativeTimePosition') def media_position_updated_at(self, state_variable): """When was the position of the current playing media valid. Returns value from homeassistant.util.dt.utcnow(). """ # pylint: disable=arguments-differ return state_variable.updated_at @property def name(self): """Return the name of the device.""" return self._name @property def unique_id(self) -> str: """Return an unique ID.""" return "{}.{}".format(__name__, self._url) def __str__(self): return "".format(self._url)