#!/usr/bin/env python3 """ =head1 NAME arris - MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85 and compatible cable modems =head1 DESCRIPTION Connect to the web-frontend and get current DOCSIS status of upstream and downstream channels. (Signal Power, SNR, Lock Status) =head1 REQUIREMENTS =over 4 =item BeautifulSoup =item pycryptodome =back =head1 CONFIGURATION =head2 Example [arris] env.url env.username admin env.password yourpassword =head2 Parameters url - URL to web-frontend username - defaults to "admin" password - valid password =head1 REFERENCES https://www.arris.com/products/touchstone-tg3442-cable-voice-gateway/ =head1 AUTHOR Copyright (c) 2019 Daniel Hiepler Copyright (c) 2004-2009 Nicolas Stransky Copyright (c) 2018 Lars Kruse =head1 LICENSE Permission to use, copy, and modify this software with or without fee is hereby granted, provided that this entire notice is included in all source code copies of any software which is or includes a copy or modification of this software. THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRESS OR IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR PURPOSE. =head1 MAGIC MARKERS #%# family=contrib =cut """ import binascii from bs4 import BeautifulSoup from Crypto.Cipher import AES import hashlib import json import re import requests import sys import os """ The CREDENTIAL_COOKIE below equals the following: base64.encodebytes(b'{ "unique":"280oaPSLiF", "family":"852", "modelname":"TG2492LG-85", ' '"name":"technician", "tech":true, "moca":0, "wifi":5, "conType":"WAN", ' '"gwWan":"f", "DefPasswdChanged":"YES" }').decode() """ CREDENTIAL_COOKIE = "eyAidW5pcXVlIjoiMjgwb2FQU0xpRiIsICJmYW1pbHkiOiI4NTIiLCAibW9kZWxuYW1lIjoiVEcy"\ "NDkyTEctODUiLCAibmFtZSI6InRlY2huaWNpYW4iLCAidGVjaCI6dHJ1ZSwgIm1vY2EiOjAsICJ3"\ "aWZpIjo1LCAiY29uVHlwZSI6IldBTiIsICJnd1dhbiI6ImYiLCAiRGVmUGFzc3dkQ2hhbmdlZCI6"\ "IllFUyIgfQ==" def login(session, url, username, password): """login to """ # get login page r = session.get(f"{url}") # parse HTML h = BeautifulSoup(r.text, "lxml") # get session id from javascript in head current_session_id = re.search(r".*var currentSessionId = '(.+)';.*", h.head.text)[1] # encrypt password salt = os.urandom(8) iv = os.urandom(8) key = hashlib.pbkdf2_hmac( 'sha256', bytes(password.encode("ascii")), salt, iterations=1000, dklen=128 / 8 ) secret = {"Password": password, "Nonce": current_session_id} plaintext = bytes(json.dumps(secret).encode("ascii")) associated_data = "loginPassword" # initialize cipher cipher = AES.new(key, AES.MODE_CCM, iv) # set associated data cipher.update(bytes(associated_data.encode("ascii"))) # encrypt plaintext encrypt_data = cipher.encrypt(plaintext) # append digest encrypt_data += cipher.digest() # return login_data = { 'EncryptData': binascii.hexlify(encrypt_data).decode("ascii"), 'Name': username, 'Salt': binascii.hexlify(salt).decode("ascii"), 'Iv': binascii.hexlify(iv).decode("ascii"), 'AuthData': associated_data } # login r = session.put( f"{url}/php/ajaxSet_Password.php", headers={ "Content-Type": "application/json", "csrfNonce": "undefined" }, data=json.dumps(login_data) ) # parse result result = json.loads(r.text) # success? if result['p_status'] == "Fail": print("login failure", file=sys.stderr) exit(-1) # remember CSRF nonce csrf_nonce = result['nonce'] # prepare headers session.headers.update({ "X-Requested-With": "XMLHttpRequest", "csrfNonce": csrf_nonce, "Origin": f"{url}/", "Referer": f"{url}/" }) # set credentials cookie session.cookies.set("credential", CREDENTIAL_COOKIE) # set session r = session.post(f"{url}/php/ajaxSet_Session.php") def docsis_status(session): """get current DOCSIS status page, parse and return channel data""" r = session.get(f"{url}/php/status_docsis_data.php") # extract json from javascript json_downstream_data = re.search(r".*json_dsData = (.+);.*", r.text)[1] json_upstream_data = re.search(r".*json_usData = (.+);.*", r.text)[1] # parse json downstream_data = json.loads(json_downstream_data) upstream_data = json.loads(json_upstream_data) # convert lock status to numeric values for d in [upstream_data, downstream_data]: for c in d: if c['LockStatus'] == "ACTIVE" or c['LockStatus'] == "Locked": c['LockStatus'] = 1 else: c['LockStatus'] = 0 return downstream_data, upstream_data # ----------------------------------------------------------------------------- if __name__ == "__main__": # get config url = os.getenv("url") username = os.getenv("username") password = os.getenv("password") # validate config if not url or not username or not password: print("Set url, username and password first.", file=sys.stderr) exit(1) # create session session = requests.Session() # login with username and password login(session, url, username, password) # get DOCSIS status downstream, upstream = docsis_status(session) # prepare munin graph info graph_descriptions = [ { "name": "up_signal", "title": "DOCSIS Upstream signal strength", "vlabel": "dBmV", "info": "DOCSIS upstream signal strength by channel", "data": upstream, "key": "PowerLevel" }, { "name": "up_lock", "title": "DOCSIS Upstream lock", "vlabel": "locked", "info": "DOCSIS upstream channel lock status", "data": upstream, "key": "LockStatus" }, { "name": "down_signal", "title": "DOCSIS Downstream signal strength", "vlabel": "dBmV", "info": "DOCSIS downstream signal strength by channel", "data": downstream, "key": "PowerLevel" }, { "name": "down_lock", "title": "DOCSIS Downstream lock", "vlabel": "locked", "info": "DOCSIS downstream channel lock status", "data": downstream, "key": "LockStatus" }, { "name": "down_snr", "title": "DOCSIS Downstream signal/noise ratio", "vlabel": "dB", "info": "SNR/MER", "data": downstream, "key": "SNRLevel" } ] # configure ? if len(sys.argv) > 1 and "config" == sys.argv[1]: # process all graphs for g in graph_descriptions: # graph config print(f"multigraph docsis_{g['name']}") print(f"graph_title {g['title']}") print("graph_category network") print(f"graph_vlabel {g['vlabel']}") print(f"graph_info {g['info']}") print("graph_scale no") # channels for c in g['data']: # only use channels with PowerLevel if not c['PowerLevel']: continue info_text = f"Channel type: {c['ChannelType']}, Modulation: {c['Modulation']}" print(f"channel_{c['ChannelID']}.label {c['ChannelID']} ({c['Frequency']} MHz)") print(f"channel_{c['ChannelID']}.info {info_text}") # output values ? else: # process all graphs for g in graph_descriptions: print(f"multigraph docsis_{g['name']}") # channels for c in g['data']: # only use channels with PowerLevel if not c['PowerLevel']: continue print(f"channel_{c['ChannelID']}.value {c[g['key']]}")