#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
a really really basic EPG for TVHeadend
Main information:
https://github.com/speculatrix/tvh_epg
Chromecast information:
https://github.com/speculatrix/tvh_epg/blob/master/CHROMECAST.md
note that not only do you need to download pychromecast from
https://github.com/balloob/pychromecast
and put it into a subdirectory alongside this cgi-bin, but you
need to pip install zeroconf to get it to work.
'''
import cgi
import cgitb
import codecs
import configparser
import datetime
import hashlib
import json
import os
import stat
import sys
import time
import collections
import socket
import urllib
import requests
from requests.auth import HTTPDigestAuth
# Chromecast support is optional: pychromecast has to be installed manually,
# so carry on without it (CAST_SUPPORT = False) when the import fails.
try:
import pychromecast
CAST_SUPPORT = True
except ImportError:
CAST_SUPPORT = False
# pylint:disable=global-statement
# requires making code less readable:
# pylint:disable=too-many-branches
# pylint:disable=too-many-lines
# pylint:disable=too-many-locals
# pylint:disable=too-many-nested-blocks
# pylint:disable=too-many-statements
# broken in pylint3:
# pylint:disable=global-variable-not-assigned
##########################################################################################
# GitHub API URL queried by get_github_hash_self(); the 'sha' field of its JSON
# reply is returned as the hash of the published script
URL_GITHUB_HASH_SELF = 'https://api.github.com/repos/speculatrix/tvh_epg/contents/tvh_epg.py'
# paths on the TVHeadend server; the ones used in this part of the file are
# appended to the configured TS_URL setting to form the request URL
TS_URL_CHN = 'api/channel/grid'
TS_URL_CTG = 'api/channeltag/grid'
TS_URL_CBE = 'api/dvr/entry/create_by_event'
TS_URL_DCG = 'api/dvr/config/grid'
TS_URL_DEG = 'api/dvr/entry/grid_finished'
TS_URL_EPG = 'api/epg/events/grid'
TS_URL_STC = 'api/status/connections'
TS_URL_STI = 'api/status/inputs'
TS_URL_SVI = 'api/serverinfo'
TS_URL_STR = 'stream/channel'
TS_URL_DVF = 'dvrfile/'
# CGI request parameters, parsed once when the module is loaded
CGI_PARAMS = cgi.FieldStorage()
SECS_P_PIXEL = 10 # how many seconds per pixel
#MAX_FUTURE = 28800 # 8 hours - how far into the future to show a prog
#MAX_FUTURE = 18000 # 5 hours - how far into the future to show a prog
MAX_FUTURE = 14400 # 4 hours - how far into the future to show a prog
#MAX_PAST = 900 # 15 mins - how much of past programs to show
MAX_PAST = 720 # 12 mins - how much of past programs to show
#MAX_PAST = 400 # 6 mins 40 secs - how much of past programs to show
CHAN_TABLE_COLUMNS = 4
# character -> HTML entity replacements
# NOTE(review): going by the name, this is applied to values written into
# HTML form inputs - confirm against the code that uses it
INPUT_FORM_ESCAPE_TABLE = {
    '"': "&quot;",
    # "'": "&rsquo;",  # thanks DavidG
    "&": "&amp;",
}
# character -> percent-encoding replacements
# NOTE(review): presumably applied when building URLs - confirm against callers
URL_ESCAPE_TABLE = {
    " ": "%20",
}
# markup for an empty table cell
# NOTE(review): the literal was split across two lines (a syntax error) with its
# markup missing; reconstructed from the constant's name - confirm
TD_EMPTY_CELL = '<td>&nbsp;</td>'
# state files, queues, logs and so on are stored in this directory;
# check_load_config_file() verifies it exists and is writable
CONTROL_DIR = '/var/lib/tvh_epg'
# the settings file is stored in the control directory
SETTINGS_FILE = 'tvh_epg_settings.ini'
# section of the settings file that holds every option named below
SETTINGS_SECTION = 'user'
# option names inside SETTINGS_SECTION; each is also a key of SETTINGS_DEFAULTS
CHAN_COLUMNS = 'channel_column_count'
MAX_CHANS = 'max_chans'
SH_LOGO = 'sh_ch_logo'
TS_AUTH = 'auth_plain_digest'
TS_PASS = 'ts_pass'
TS_PAUTH = 'ts_pauth'
TS_PROF_STRM = 'profile_strm'
TS_PROF_CAST = 'profile_chromecasting'
TS_URL = 'ts_url'
TS_URL_ICONS = 'ts_url_icons'
TS_URL_CAST = 'ts_url_icon_cast'
TS_USER = 'ts_user'
# keys of the per-setting description dicts held in SETTINGS_DEFAULTS
TITLE = 'title'
DFLT = 'default'
TYPE = 'type'
# more option names inside SETTINGS_SECTION
LOCAL_ICON_DIR = 'local_icon_dir'
ICON_WIDTH = 'forced_icon_width'
ICON_HEIGHT = 'forced_icon_height'
BG_COL_PAGE = 'bg_col_page'
BG_COL_INPUT = 'bg_col_input'
# default values for the two background colour settings (hex, no leading '#')
BG_COL_DEF_PAGE = 'f4f4f4'
BG_COL_DEF_INPUT = 'f8f8f8'
# default values of the settings when being created
# each entry maps an option name to a dict holding:
#   TITLE - human readable label for the setting
#   DFLT  - value used when the setting is first created
#   TYPE  - 'text' or 'password'
#           (presumably the HTML input type on the settings page - confirm)
SETTINGS_DEFAULTS = {
    TS_URL: {
        TITLE: 'URL of TV Headend Server',
        DFLT: 'http://tvh.example.com:9981',
        TYPE: 'text',
    },
    TS_URL_ICONS: {
        TITLE: 'URL to picons',
        DFLT: 'http://tvh.example.com/TVLogos/',
        TYPE: 'text',
    },
    LOCAL_ICON_DIR: {
        # NOTE(review): this literal was split across two lines, which is a
        # syntax error; the line break is reconstructed as '<br />' - confirm
        TITLE: ('Local icon directory, if set, checks icon file exists<br />'
                '(to avoid broken images)'),
        DFLT: '/home/hts/TVLogos/',
        TYPE: 'text',
    },
    TS_URL_CAST: {
        TITLE: 'URL to chromecast icon',
        DFLT: 'http://tvh.example.com/ic_cast_connected_white_24dp.png',
        TYPE: 'text',
    },
    TS_USER: {
        TITLE: 'Username on TVH server',
        DFLT: TS_USER,  # placeholder: the option name itself
        TYPE: 'text',
    },
    TS_PASS: {
        TITLE: 'Password on TVH server',
        DFLT: TS_PASS,  # placeholder: the option name itself
        TYPE: 'password',
    },
    TS_AUTH: {
        TITLE: 'Authentication, plain or digest',
        DFLT: 'plain',
        TYPE: 'text',
    },
    TS_PAUTH: {
        TITLE: 'Persistent Auth Token',
        DFLT: TS_PAUTH,  # placeholder: the option name itself
        TYPE: 'password',
    },
    TS_PROF_STRM: {
        TITLE: 'profile for streaming',
        DFLT: 'default',
        TYPE: 'text',
    },
    TS_PROF_CAST: {
        TITLE: 'profile for chromecasting',
        DFLT: 'chromecast',
        TYPE: 'text',
    },
    SH_LOGO: {
        TITLE: 'Show Channel Logos',
        DFLT: '0',
        TYPE: 'text',
    },
    MAX_CHANS: {
        TITLE: 'Maximum Number Of Channels',
        DFLT: '500',
        TYPE: 'text',
    },
    CHAN_COLUMNS: {
        TITLE: 'Columns In Channel Table',
        DFLT: '4',
        TYPE: 'text',
    },
    ICON_HEIGHT: {
        TITLE: 'Force icon height to this, 0 for off',
        DFLT: '64',
        TYPE: 'text',
    },
    ICON_WIDTH: {
        TITLE: 'Force icon width to this, 0 for off',
        DFLT: '80',
        TYPE: 'text',
    },
    BG_COL_PAGE: {
        TITLE: 'page background colour',
        DFLT: BG_COL_DEF_PAGE,
        TYPE: 'text',
    },
    BG_COL_INPUT: {
        TITLE: 'input field background colour',
        DFLT: BG_COL_DEF_INPUT,
        TYPE: 'text',
    },
}
DOCROOT_DEFAULT = '/home/hts'
##########################################################################################
def _writable_via_owner_or_group(path_stat, user_id, group_id):
    '''True when the stat result shows write permission for user_id as the
    owner, or for group_id as the owning group (mode bits only)'''
    owner_ok = (path_stat.st_uid == user_id
                and (path_stat.st_mode & stat.S_IWUSR) != 0)
    group_ok = (path_stat.st_gid == group_id
                and (path_stat.st_mode & stat.S_IWGRP) != 0)
    return owner_ok or group_ok


def check_load_config_file():
    # pylint:disable=too-many-return-statements
    '''check the control directory and settings file are usable, then load
    the settings file into MY_SETTINGS.

    returns a tuple (status, message) where status is
       0 - settings were loaded, message is 'OK'
      -1 - fatal problem, the rest of the page should be aborted; message
           says what is wrong and which commands fix it
       1 - settings file exists but is empty, which should trigger
           rendering of the settings page'''

    global CONFIG_FILE_NAME
    global MY_SETTINGS

    # who am i?
    my_euser_id = os.geteuid()
    my_egroup_id = os.getegid()

    ################################################
    # verify that CONTROL_DIR exists and is writable
    try:
        cdir_stat = os.stat(CONTROL_DIR)
    except OSError:
        # error so severe, no point in continuing
        return (-1,
                f'Error, directory "{CONTROL_DIR}" doesn\'t appear to exist.\n'
                'Please do the following - needs root:\n'
                f'\tsudo mkdir "{CONTROL_DIR}"'
                f' && sudo chgrp {my_egroup_id} "{CONTROL_DIR}"'
                f' && sudo chmod g+ws "{CONTROL_DIR}"')

    # owned by me and writable by me, or same group as me and writable through that group?
    if not _writable_via_owner_or_group(cdir_stat, my_euser_id, my_egroup_id):
        # error so severe, no point in continuing
        return (-1,
                f'Error, won\'t be able to write to directory "{CONTROL_DIR}".\n'
                'Please do the following:\n'
                f'\tsudo chgrp {my_egroup_id} "{CONTROL_DIR}"'
                f' && sudo chmod g+ws "{CONTROL_DIR}"')

    ########
    # verify the settings file exists and is writable
    if not os.path.isfile(CONFIG_FILE_NAME):
        return (-1,
                f'Error, can\'t open "{CONFIG_FILE_NAME}" for reading.\n'
                'Please do the following - needs root:\n'
                f'\tsudo touch "{CONFIG_FILE_NAME}"'
                f' && sudo chgrp {my_egroup_id} "{CONFIG_FILE_NAME}"'
                f' && sudo chmod g+w "{CONFIG_FILE_NAME}"')

    # owned by me and writable by me, or same group as me and writable through that group?
    config_stat = os.stat(CONFIG_FILE_NAME)
    if not _writable_via_owner_or_group(config_stat, my_euser_id, my_egroup_id):
        # the group id goes to chgrp and the file name follows it; these
        # were previously passed in the wrong order
        return (-1,
                f'Error, won\'t be able to write to file "{CONFIG_FILE_NAME}"\n'
                'Please do the following - needs root:\n'
                f'\tsudo chgrp {my_egroup_id} "{CONFIG_FILE_NAME}"'
                f' && sudo chmod g+w "{CONFIG_FILE_NAME}"')

    # file is zero bytes?
    if config_stat.st_size == 0:
        return (1, 'Config file is empty, please go to settings and submit to save\n')

    # read() returns the list of files it parsed, so empty means failure;
    # a malformed file raises instead and is reported the same way
    try:
        files_read = MY_SETTINGS.read(CONFIG_FILE_NAME)
    except configparser.Error as parse_err:
        return (-1,
                f'Error, failed to parse config file "{CONFIG_FILE_NAME}": {parse_err}')
    if not files_read:
        return (-1,
                f'Error, failed to open and read config file "{CONFIG_FILE_NAME}"')

    return (0, 'OK')
##########################################################################################
def get_github_hash_self():
    """fetches the git hash of the version of this script held in github

    Queries URL_GITHUB_HASH_SELF and returns the 'sha' field of the JSON
    reply. Network failures raise a requests exception, and a reply that
    has no 'sha' field raises KeyError.
    """

    # without a timeout an unreachable github would hang this request forever
    gh_resp = requests.get(URL_GITHUB_HASH_SELF, timeout=10)
    gh_json = gh_resp.json()
    return gh_json['sha']
##########################################################################################
def get_githash_self():
    """calculates the git hash of the running script

    Returns the 40 character hex digest that "git hash-object" would print
    for this file: SHA-1 over the header 'blob <size>' plus a NUL byte,
    followed by the file content.
    """

    # read this entire file into memory
    with open(__file__, 'rb') as script_fh:
        script_bytes = script_fh.read()

    # do what "git hash-object" does; the size is taken from the bytes
    # actually read so the header always matches the hashed content
    sha_obj = hashlib.sha1()
    sha_obj.update(b'blob %d\0' % len(script_bytes))
    sha_obj.update(script_bytes)
    return sha_obj.hexdigest()
##########################################################################################
def epoch_to_human_duration(epoch_time):
    '''takes numeric sec since unix epoch and returns the local time of day
    as HH:MM'''

    moment = datetime.datetime.fromtimestamp(epoch_time)
    return format(moment, "%H:%M")
##########################################################################################
def epoch_to_human_date(epoch_time):
    '''takes numeric sec since unix epoch and returns the local date and
    time as DD-MM-YYYY HH:MM'''

    moment = datetime.datetime.fromtimestamp(epoch_time)
    return format(moment, "%d-%m-%Y %H:%M")
##########################################################################################
#def load_channel_dict_from_cache():
# '''load channel dict from cache file - FIXME'''
#
##########################################################################################
#def save_channel_dict_to_cache():
# '''saves channel dict to cache file - FIXME'''
#
##########################################################################################
def get_channeltag_grid():
    '''gets the channeltag/grid values

    Reads the server URL, auth mode and credentials from MY_SETTINGS and
    requests TS_URL_CTG from the server, using plain auth when the auth
    setting is 'plain' and digest auth otherwise.

    Returns the decoded JSON reply, or an empty dict (after printing the
    status code and body) when the reply is not HTTP 200.
    '''

    global MY_SETTINGS

    ts_url = MY_SETTINGS.get(SETTINGS_SECTION, TS_URL)
    ts_auth = MY_SETTINGS.get(SETTINGS_SECTION, TS_AUTH)
    ts_user = MY_SETTINGS.get(SETTINGS_SECTION, TS_USER)
    ts_pass = MY_SETTINGS.get(SETTINGS_SECTION, TS_PASS)
    ts_query = f'{ts_url}/{TS_URL_CTG}'

    if ts_auth == 'plain':
        ts_creds = (ts_user, ts_pass)
    else:
        ts_creds = HTTPDigestAuth(ts_user, ts_pass)
    ts_response = requests.get(ts_query, auth=ts_creds)

    # NOTE(review): the original printed an empty f-string here, i.e. a blank
    # line; it looks like a debug marker whose text was lost - confirm
    print()

    if ts_response.status_code != 200:
        print(f'Error code {ts_response.status_code}\n{ts_response.content}\n')
        return {}

    # strict=False lets control characters through inside JSON strings
    return json.loads(ts_response.text, strict=False)
##########################################################################################
def get_channel_dict():
    '''gets the channel listing and generates an ordered dict by name

    Requests TS_URL_CHN from the server, limited to the MAX_CHANS setting,
    using plain auth when the auth setting is 'plain' and digest auth
    otherwise.

    Returns an OrderedDict keyed by channel name in case-insensitive order.
    Each value is a dict with 'number' and 'uuid', plus 'tags' and
    'icon_public_url' when the server supplied them. A channel without a
    name is called 'unknown N'; a channel without a number gets a unique
    negative number. Returns an empty dict (after printing the status code
    and body) when the reply is not HTTP 200.
    '''

    global MY_SETTINGS

    ts_url = MY_SETTINGS.get(SETTINGS_SECTION, TS_URL)
    ts_auth = MY_SETTINGS.get(SETTINGS_SECTION, TS_AUTH)
    ts_user = MY_SETTINGS.get(SETTINGS_SECTION, TS_USER)
    ts_pass = MY_SETTINGS.get(SETTINGS_SECTION, TS_PASS)
    ts_max_ch = MY_SETTINGS.get(SETTINGS_SECTION, MAX_CHANS)
    ts_query = f'{ts_url}/{TS_URL_CHN}?limit={ts_max_ch}'

    if ts_auth == 'plain':
        ts_creds = (ts_user, ts_pass)
    else:
        ts_creds = HTTPDigestAuth(ts_user, ts_pass)
    ts_response = requests.get(ts_query, auth=ts_creds)

    # NOTE(review): the original printed an empty f-string here, i.e. a blank
    # line; it looks like a debug marker whose text was lost - confirm
    print()

    if ts_response.status_code != 200:
        print(f'Error code {ts_response.status_code}\n{ts_response.content}\n')
        return {}

    # strict=False lets control characters through inside JSON strings
    ts_json = json.loads(ts_response.text, strict=False)

    channel_map = {}        # full channel info, keyed by channel name
    name_unknown = 0        # suffix for the next channel lacking a name
    number_unknown = -1     # number for the next channel lacking a number

    entries = ts_json['entries'] if 'entries' in ts_json else []
    for entry in entries:
        # start building a dict with channel name as key
        if 'name' in entry:
            channel_name = entry['name']
        else:
            channel_name = 'unknown ' + str(name_unknown)
            name_unknown += 1

        # store the channel specific info; a repeated name updates the
        # entry that is already there
        ch_map = channel_map.setdefault(channel_name, {})
        if 'tags' in entry:
            ch_map['tags'] = entry['tags']
        if 'number' in entry:
            ch_map['number'] = entry['number']
        else:
            ch_map['number'] = number_unknown
            # step the *number* counter; stepping name_unknown here used to
            # undo the increment above and make placeholder names collide
            number_unknown -= 1
        ch_map['uuid'] = entry['uuid']
        if 'icon_public_url' in entry:
            ch_map['icon_public_url'] = entry['icon_public_url']

    # case insensitive sort of channel names produces the ordered dict
    ordered_channel_map = collections.OrderedDict()
    for chan in sorted(channel_map, key=str.casefold):
        ordered_channel_map[chan] = channel_map[chan]

    return ordered_channel_map
##########################################################################################
def get_dvr_config_grid():
    '''gets the dvr/config/grid dict

    Reads the server URL, auth mode and credentials from MY_SETTINGS and
    requests TS_URL_DCG from the server, using plain auth when the auth
    setting is 'plain' and digest auth otherwise.

    Returns the decoded JSON reply, or an empty dict (after printing the
    status code and body) when the reply is not HTTP 200, matching
    get_channeltag_grid() and get_channel_dict().
    '''

    global MY_SETTINGS

    ts_url = MY_SETTINGS.get(SETTINGS_SECTION, TS_URL)
    ts_auth = MY_SETTINGS.get(SETTINGS_SECTION, TS_AUTH)
    ts_user = MY_SETTINGS.get(SETTINGS_SECTION, TS_USER)
    ts_pass = MY_SETTINGS.get(SETTINGS_SECTION, TS_PASS)
    ts_query = f'{ts_url}/{TS_URL_DCG}'

    if ts_auth == 'plain':
        ts_creds = (ts_user, ts_pass)
    else:
        ts_creds = HTTPDigestAuth(ts_user, ts_pass)
    ts_response = requests.get(ts_query, auth=ts_creds)

    # NOTE(review): the original printed an empty f-string here, i.e. a blank
    # line; it looks like a debug marker whose text was lost - confirm
    print()

    # an error reply is not fed to the JSON parser
    if ts_response.status_code != 200:
        print(f'Error code {ts_response.status_code}\n{ts_response.content}\n')
        return {}

    # strict=False lets control characters through inside JSON strings
    return json.loads(ts_response.text, strict=False)
##########################################################################################
def html_page_footer():
'''no surprises'''
print('''