# Copyright (c) 2021 by xfangfang. All Rights Reserved. # # NVA protocol # # Macast Metadata # NVA Protocol # NVAProtocol # darwin,win32,linux # 0.32 # 0.7 # xfangfang # NVA protocol support for Macast. Known as "哔哩必连" v0.31: Fix proxy related problems. import re import os import urllib import socket import cheroot.server import cherrypy from cherrypy import _cpnative_server, Tool import email.utils import logging import threading import struct import json import time from urllib import parse import errno import requests from lxml import etree import select from macast.protocol import DLNAProtocol, DLNAHandler, Protocol from macast.renderer import Renderer from macast.utils import SETTING_DIR, XMLPath, load_xml, Setting LF = b'\n' CRLF = b'\r\n' TAB = b'\t' SPACE = b' ' COLON = b':' SEMICOLON = b';' EMPTY = b'' ASTERISK = b'*' FORWARD_SLASH = b'/' QUOTED_SLASH = b'%2F' QUOTED_SLASH_REGEX = re.compile(b''.join((b'(?i)', QUOTED_SLASH))) COMMAND = b'Command' SEND_CMD = b'\xe0' RET_CMD = b'\xc0' PING = b'\xe4' NVA_SERVICE = """ 1 0 GetAppInfo PackageName out A_ARG_TYPE_PackageName AppKey out A_ARG_TYPE_AppKey Signature out A_ARG_TYPE_Signature CurrentSignedIn out SignedIn LoginWithCode Code in A_ARG_TYPE_UnlimitedString PrepareForMirrorProjection ScreenWidth out A_ARG_TYPE_ScreenResolution ScreenHeight out A_ARG_TYPE_ScreenResolution PushUrl out A_ARG_TYPE_UnlimitedString SetDanmakuSwitch DesiredSwitch in DanmakuSwitch AppendDanmaku Content in A_ARG_TYPE_UnlimitedString Size in A_ARG_TYPE_UnlimitedInt Type in A_ARG_TYPE_UnlimitedInt Color in A_ARG_TYPE_UnlimitedInt DanmakuId in A_ARG_TYPE_UnlimitedString Action in A_ARG_TYPE_UnlimitedString GetPlayInfo Params in A_ARG_TYPE_UnlimitedString Content out A_ARG_TYPE_UnlimitedString GetAccountInfo VipInfo out A_ARG_TYPE_UnlimitedInt SwitchQuality Qn in A_ARG_TYPE_UnlimitedInt A_ARG_TYPE_UnlimitedString string A_ARG_TYPE_PackageName string com.xiaodianshi.tv.yst A_ARG_TYPE_AppKey string 0000000000000000 A_ARG_TYPE_Signature string 0000000000000000 A_ARG_TYPE_UnlimitedInt i4 A_ARG_TYPE_ScreenResolution ui4 SignedIn boolean 1 DanmakuSwitch boolean """.encode() logger = logging.getLogger("NVAPRotocol") logger.setLevel(logging.INFO) # NVAHTTPServer 替换cherrypy中的 CPHTTPServer 负责适配NVA协议 # 主要适配代码位于 NVAHTTPRequest 中 # 主要修改的内容是,适配NVA的协议头:NVA/1.0(等同HTTP看待) class NVAHTTPRequest(cheroot.server.HTTPRequest): def __init__(self, server, conn, proxy_mode=False, strict_mode=True): super(NVAHTTPRequest, self).__init__(server, conn, proxy_mode, strict_mode) # NVA PATCH: Add a variable indicating whether it is NVA protocol self.is_nva = False def read_request_line(self): """Read and parse first line of the HTTP request. Returns: bool: True if the request line is valid or False if it's malformed. """ # HTTP/1.1 connections are persistent by default. If a client # requests a page, then idles (leaves the connection open), # then rfile.readline() will raise socket.error("timed out"). # Note that it does this based on the value given to settimeout(), # and doesn't need the client to request or acknowledge the close # (although your TCP stack might suffer for it: cf Apache's history # with FIN_WAIT_2). request_line = self.rfile.readline() # Set started_request to True so communicate() knows to send 408 # from here on out. self.started_request = True if not request_line: return False if request_line == CRLF: # RFC 2616 sec 4.1: "...if the server is reading the protocol # stream at the beginning of a message and receives a CRLF # first, it should ignore the CRLF." # But only ignore one leading line! else we enable a DoS. request_line = self.rfile.readline() if not request_line: return False if not request_line.endswith(CRLF): self.simple_response( '400 Bad Request', 'HTTP requires CRLF terminators', ) return False try: method, uri, req_protocol = request_line.strip().split(SPACE, 2) # NVA PATCH: Fit for NVA protocol if b'NVA' in req_protocol: req_protocol = req_protocol.replace(b'NVA/1.0', b'HTTP/1.0') self.is_nva = True # let the http server forget the socket when first request is done self.close_connection = True self.conn.linger = True if not req_protocol.startswith(b'HTTP/'): self.simple_response( '400 Bad Request', 'Malformed Request-Line: bad protocol', ) return False rp = req_protocol[5:].split(b'.', 1) if len(rp) != 2: self.simple_response( '400 Bad Request', 'Malformed Request-Line: bad version', ) return False rp = tuple(map(int, rp)) # Minor.Major must be threat as integers if rp > (1, 1): self.simple_response( '505 HTTP Version Not Supported', 'Cannot fulfill request', ) return False except (ValueError, IndexError): self.simple_response('400 Bad Request', 'Malformed Request-Line') return False self.uri = uri self.method = method.upper() if self.strict_mode and method != self.method: resp = ( 'Malformed method name: According to RFC 2616 ' '(section 5.1.1) and its successors ' 'RFC 7230 (section 3.1.1) and RFC 7231 (section 4.1) ' 'method names are case-sensitive and uppercase.' ) self.simple_response('400 Bad Request', resp) return False try: scheme, authority, path, qs, fragment = urllib.parse.urlsplit(uri) except UnicodeError: self.simple_response('400 Bad Request', 'Malformed Request-URI') return False uri_is_absolute_form = (scheme or authority) if self.method == b'OPTIONS': # TODO: cover this branch with tests path = ( uri # https://tools.ietf.org/html/rfc7230#section-5.3.4 if (self.proxy_mode and uri_is_absolute_form) else path ) elif self.method == b'CONNECT': # TODO: cover this branch with tests if not self.proxy_mode: self.simple_response('405 Method Not Allowed') return False # `urlsplit()` above parses "example.com:3128" as path part of URI. # this is a workaround, which makes it detect netloc correctly uri_split = urllib.parse.urlsplit(b''.join((b'//', uri))) _scheme, _authority, _path, _qs, _fragment = uri_split _port = EMPTY try: _port = uri_split.port except ValueError: pass # FIXME: use third-party validation to make checks against RFC # the validation doesn't take into account, that urllib parses # invalid URIs without raising errors # https://tools.ietf.org/html/rfc7230#section-5.3.3 invalid_path = ( _authority != uri or not _port or any((_scheme, _path, _qs, _fragment)) ) if invalid_path: self.simple_response( '400 Bad Request', 'Invalid path in Request-URI: request-' 'target must match authority-form.', ) return False authority = path = _authority scheme = qs = fragment = EMPTY else: disallowed_absolute = ( self.strict_mode and not self.proxy_mode and uri_is_absolute_form ) if disallowed_absolute: # https://tools.ietf.org/html/rfc7230#section-5.3.2 # (absolute form) """Absolute URI is only allowed within proxies.""" self.simple_response( '400 Bad Request', 'Absolute URI not allowed if server is not a proxy.', ) return False invalid_path = ( self.strict_mode and not uri.startswith(FORWARD_SLASH) and not uri_is_absolute_form ) if invalid_path: # https://tools.ietf.org/html/rfc7230#section-5.3.1 # (origin_form) and """Path should start with a forward slash.""" resp = ( 'Invalid path in Request-URI: request-target must contain ' 'origin-form which starts with absolute-path (URI ' 'starting with a slash "/").' ) self.simple_response('400 Bad Request', resp) return False if fragment: self.simple_response( '400 Bad Request', 'Illegal #fragment in Request-URI.', ) return False if path is None: # FIXME: It looks like this case cannot happen self.simple_response( '400 Bad Request', 'Invalid path in Request-URI.', ) return False # Unquote the path+params (e.g. "/this%20path" -> "/this path"). # https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2 # # But note that "...a URI must be separated into its components # before the escaped characters within those components can be # safely decoded." https://www.ietf.org/rfc/rfc2396.txt, sec 2.4.2 # Therefore, "/this%2Fpath" becomes "/this%2Fpath", not # "/this/path". try: # TODO: Figure out whether exception can really happen here. # It looks like it's caught on urlsplit() call above. atoms = [ urllib.parse.unquote_to_bytes(x) for x in QUOTED_SLASH_REGEX.split(path) ] except ValueError as ex: self.simple_response('400 Bad Request', ex.args[0]) return False path = QUOTED_SLASH.join(atoms) if not path.startswith(FORWARD_SLASH): path = FORWARD_SLASH + path if scheme is not EMPTY: self.scheme = scheme self.authority = authority self.path = path # Note that, like wsgiref and most other HTTP servers, # we "% HEX HEX"-unquote the path but not the query string. self.qs = qs # Compare request and server HTTP protocol versions, in case our # server does not support the requested protocol. Limit our output # to min(req, server). We want the following output: # request server actual written supported response # protocol protocol response protocol feature set # a 1.0 1.0 1.0 1.0 # b 1.0 1.1 1.1 1.0 # c 1.1 1.0 1.0 1.0 # d 1.1 1.1 1.1 1.1 # Notice that, in (b), the response will be "HTTP/1.1" even though # the client only understands 1.0. RFC 2616 10.5.6 says we should # only return 505 if the _major_ version is different. sp = int(self.server.protocol[5]), int(self.server.protocol[7]) if sp[0] != rp[0]: self.simple_response('505 HTTP Version Not Supported') return False self.request_protocol = req_protocol # NVA PATCH: Set the correct protocol name to self.response_protocol if self.is_nva: self.response_protocol = 'NVA/%s.%s' % min(rp, sp) else: self.response_protocol = 'HTTP/%s.%s' % min(rp, sp) return True def send_headers(self): # noqa: C901 # FIXME """Assert, process, and send the HTTP response message-headers. You must set ``self.status``, and :py:attr:`self.outheaders ` before calling this. """ hkeys = [key.lower() for key, value in self.outheaders] status = int(self.status[:3]) if status == 413: # Request Entity Too Large. Close conn to avoid garbage. self.close_connection = True elif b'content-length' not in hkeys: # "All 1xx (informational), 204 (no content), # and 304 (not modified) responses MUST NOT # include a message-body." So no point chunking. if status < 200 or status in (204, 205, 304): pass else: needs_chunked = ( self.response_protocol == 'HTTP/1.1' and self.method != b'HEAD' ) if needs_chunked: # Use the chunked transfer-coding self.chunked_write = True self.outheaders.append((b'Transfer-Encoding', b'chunked')) else: # Closing the conn is the only way to determine len. self.close_connection = True # Override the decision to not close the connection if the connection # manager doesn't have space for it. if not self.close_connection: can_keep = self.server.can_add_keepalive_connection self.close_connection = not can_keep if b'connection' not in hkeys: if self.response_protocol == 'HTTP/1.1': # Both server and client are HTTP/1.1 or better if self.close_connection: self.outheaders.append((b'Connection', b'close')) else: # Server and/or client are HTTP/1.0 if not self.close_connection: self.outheaders.append((b'Connection', b'Keep-Alive')) if (b'Connection', b'Keep-Alive') in self.outheaders: self.outheaders.append(( b'Keep-Alive', u'timeout={connection_timeout}'. format(connection_timeout=self.server.timeout). encode('ISO-8859-1'), )) if (not self.close_connection) and (not self.chunked_read): # Read any remaining request body data on the socket. # "If an origin server receives a request that does not include an # Expect request-header field with the "100-continue" expectation, # the request includes a request body, and the server responds # with a final status code before reading the entire request body # from the transport connection, then the server SHOULD NOT close # the transport connection until it has read the entire request, # or until the client closes the connection. Otherwise, the client # might not reliably receive the response message. However, this # requirement is not be construed as preventing a server from # defending itself against denial-of-service attacks, or from # badly broken client implementations." remaining = getattr(self.rfile, 'remaining', 0) if remaining > 0: self.rfile.read(remaining) if b'date' not in hkeys: self.outheaders.append(( b'Date', email.utils.formatdate(usegmt=True).encode('ISO-8859-1'), )) if b'server' not in hkeys: self.outheaders.append(( b'Server', self.server.server_name.encode('ISO-8859-1'), )) # NVA PATCH: Returns the NVA protocol name if self.is_nva: proto = self.response_protocol.encode('ascii') else: proto = self.server.protocol.encode('ascii') buf = [proto + SPACE + self.status + CRLF] for k, v in self.outheaders: buf.append(k + COLON + SPACE + v + CRLF) buf.append(CRLF) self.conn.wfile.write(EMPTY.join(buf)) class NVAHTTPConnection(cheroot.server.HTTPConnection): RequestHandlerClass = NVAHTTPRequest class NVAHTTPServer(_cpnative_server.CPHTTPServer): ConnectionClass = NVAHTTPConnection # 负责下载的函数 class NetworkManager: proxies = None @staticmethod def GET(url): headers = { "User-Agent": "Macast", "Referer": "https://www.bilibili.com/client", "Origin": "https://www.bilibili.com" } try: data = requests.get(url, headers=headers, proxies=NetworkManager.proxies) return data except OSError as e: # fix proxy error with clash logger.error(f'nva requests.get OSError:{e}') if 'proxy' in str(e): try: NetworkManager.proxies = {'http': None, "https": None} data = requests.get(url, proxies=NetworkManager.proxies) return data except Exception as e: logger.error(f'nva requests.get Exception:{e}') cherrypy.engine.publish('app_notify', 'ERROR', 'Network error') raise Exception('Cannot get any data from network.') # DanmakuManager 负责处理弹幕下载和渲染 # 通过cid下载xml实时弹幕,转换为ass,保存为本地文件 # todo 实现protobuf实时弹幕下载与转换 # todo 实现弹幕文字大小随视频比例变化 class DanmakuManager: @staticmethod def get_danmaku(cid: str, file_path: str): """ :param cid: :param file_path: :param is_portrait: :return: """ # http://www.perlfu.co.uk/projects/asa/ass-specs.doc api = f'https://comment.bilibili.com/{cid}.xml' exist_time = 10 res_x = 638 res_y = 447 lines = res_y // 25 # todo fix layers = [[-1 for _ in range(lines)] for _ in range(4)] def int2color(color_int): """ :return: string, \c&Hbbggrr& """ if color_int == 16777215: # default color #FFFFFF return 'dark', '' color_int = hex(color_int)[2:] if len(color_int) < 6: color_int = '0' * (6 - len(color_int)) + color_int r = int(color_int[0:2], 16) g = int(color_int[2:4], 16) b = int(color_int[4:6], 16) gray = (r * 299 + g * 587 + b * 114) / 1000 border_style = 'dark' if gray < 60: border_style = 'light' color_str = color_int[4:6] + color_int[2:4] + color_int[0:2] return border_style, rf'\c&H{color_str}&' def sec2str(t: float) -> str: sec = int(t) return f'{sec // 3600}:{(sec % 3600) // 60:02d}:{sec % 60:02d}.{int(t * 100) % 100:02d}' def create_postion(danmaku_position_type, layer_index, danmaku_text, font_size, danmaku_start_time) -> str: if layer_index > 1: # 是否是字幕层 layer_index = 1 y = -100 if danmaku_position_type == 5: # 顶部弹幕 layer_index = 2 for index, l in enumerate(layers[layer_index]): if danmaku_start_time < l: continue layers[layer_index][index] = danmaku_start_time + exist_time y = index * 25 break position_str = rf'\an8\pos({int(res_x / 2)},{y})' elif danmaku_position_type == 4: # 底部弹幕 layer_index = 3 for index, l in enumerate(layers[layer_index]): if danmaku_start_time < l: continue layers[layer_index][index] = danmaku_start_time + exist_time y = res_y - index * 25 break position_str = rf'\an2\pos({int(res_x / 2)},{y})' else: # 普通弹幕 length = len(danmaku_text) * font_size for index, l in enumerate(layers[layer_index]): danmaku_text_length = len(danmaku_text) * font_size danmaku_text_time = (res_x * exist_time) / (res_x + danmaku_text_length) if (danmaku_start_time + danmaku_text_time) < l: continue # 找到空位 layers[layer_index][index] = danmaku_start_time + exist_time y = index * 25 # fix font size break position_str = rf'\move({res_x},{y},{-length},{y})' # todo 根据视频大小 动态改变字体大小 font_size = int(font_size) font = '' if font_size == 18: font = r'\fs18' elif font_size == 36: font = r'\fs36' return position_str + font try: danmaku = etree.fromstring(NetworkManager.GET(api).content) danmaku = danmaku.xpath("/i/d") ass = """[Script Info] Title: 弹幕 Original Script: """ + api + """ Script Updated By: https://github.com/xfangfang/Macast-Plugin Update Details: xml to ass ScriptType: V4.00+ Collisions: Normal PlayResX: 638 PlayResY: 447 PlayDepth: 8 Timer: 100.0 WrapStyle: 2 [v4+ Styles] Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding Style: dark, sans-serif, 25, &H36FFFFFF, &H36FFFFFF, &H36000000, &H36000000, 1, 0, 0, 0, 100, 100, 0.00, 0.00, 1, 1, 0, 7, 0, 0, 0, 0 Style: light, sans-serif, 25, &H36FFFFFF, &H36FFFFFF, &H36FFFFFF, &H36000000, 1, 0, 0, 0, 100, 100, 0.00, 0.00, 1, 1, 0, 7, 0, 0, 0, 0 [Events] Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text """ dms = [] for i in danmaku: a = i.attrib['p'].split(',') a.append(i.text) dms.append(a) dms.sort(key=lambda e: float(e[0])) for data in dms: text = data[-1] danmaku_type = int(data[1]) danmaku_layer = int(data[5]) if danmaku_type < 7: start_time = float(data[0]) end_time = start_time + exist_time position = create_postion(danmaku_type, danmaku_layer, text, int(data[2]), start_time) style, color = int2color(int(data[3])) comment = f'Dialogue: {danmaku_layer},{sec2str(start_time)},{sec2str(end_time)},' + \ f'{style},{data[8]},0000,0000,0000,,{{{position}{color}}}{text}\n' ass += comment with open(file_path, 'w', encoding='utf-8') as f: f.write(ass) except Exception as e: logger.error(f'Error create sub file: {e}') cherrypy.engine.publish('app_notify', 'ERROR', f'Error create sub file: {e}') return None return file_path # NVAConectionHandler 负责保持每个客户端的长连接 # 解析和处理NVA协议数据,定时发送心跳包 class NVAConectionBaseHandler: def __init__(self, conn, req): self.sock = conn self.sock_lock = threading.Lock() self.counter = 0 self.counter_lock = threading.Lock() self.cache = b'' self.session = req.headers.get('Session', '') self.terminated = False self.ping_thread_running = False self.ping_thread = None def start(self): self.ping_thread_running = True self.ping_thread = threading.Thread(target=self.send_ping_thread, name='NVA_PING_THREAD', daemon=False) self.ping_thread.start() def cmd_from_client(self, method, counter, params=None): pass def res_from_client(self, counter, params=None): pass def terminate(self): self.ping_thread_running = False self.terminated = True if self.sock: try: self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() except: pass finally: self.sock = None self.ping_thread.join() def send_ping_thread(self, freq=1): logger.info("NVA_PING_THREAD START") self.sock.setblocking(True) while self.ping_thread_running: time.sleep(freq) self.send_ping() logger.info("NVA_PING_THREAD DONE") def send_res(self, counter, data=None): logger.info(f"SEND_RES{counter} data:{data}") if data is None: binary = struct.pack(f">cBI", RET_CMD, 0, counter) else: data = json.dumps(data, separators=(',', ':')).encode() length = len(data) binary = struct.pack(f">cB2I{length}s", RET_CMD, 1, counter, length, data) logger.debug(f'SEND_RES {binary}') self.send(binary) def send_ping(self): logger.info(f'NVA_PING_THREAD send ping {self.counter}') with self.counter_lock: self.counter += 1 binary = struct.pack(f">cBI", PING, 0, self.counter) self.send(binary) def send_cmd(self, cmd, data=None): logger.info(f"SEND_CMD{self.counter} cmd:{cmd} data:{data}") with self.counter_lock: self.counter += 1 cmd = cmd.encode() cmd_length = len(cmd) if data is None: binary = struct.pack(f">cBI2B7sB{cmd_length}s", SEND_CMD, 2, self.counter, 1, 7, COMMAND, cmd_length, cmd) else: data = json.dumps(data, separators=(',', ':')).encode() length = len(data) binary = struct.pack(f">cBI2B7sB{cmd_length}sI{length}s", SEND_CMD, 3, self.counter, 1, 7, COMMAND, cmd_length, cmd, length, data) logger.debug(f"SEND_CMD {binary}") self.send(binary) def send(self, data): with self.sock_lock: try: self.sock.sendall(data) except Exception as e: logger.error(e) logger.error(self) # we will handle these socket related errors when receiving datas. pass def parse_request(self, data): index = 0 self.cache += data data = self.cache length = len(data) while index < length: if length - index < 6: # the smallest package length break if data[index] not in [SEND_CMD[0], RET_CMD[0]]: # wrong header index += 1 continue try: header, all_l, counter = struct.unpack('>cBI', data[index:index + 6]) if data[index] == SEND_CMD[0]: assert all_l in [2, 3] if length - index - 6 < 10: break n, cmd_l, cmd_str, method_l = struct.unpack('>BB7sB', data[index + 6:index + 16]) assert n == 1 and cmd_l == 7 and cmd_str == COMMAND if all_l == 2: # eg: Command GetVolume if length - index - 16 < method_l: break method = struct.unpack(f'>{method_l}s', data[index + 16:index + 16 + method_l])[0] self.cmd_from_client(method.decode(), counter) index += 16 + method_l else: # eg: Command OnProgress {"duration": 422, "position": 64} if length - index - 16 < method_l + 4: break method, param_l = struct.unpack(f'>{method_l}sI', data[index + 16:index + method_l + 20]) if length - index - method_l - 20 < param_l: break params = \ struct.unpack(f'>{param_l}s', data[index + method_l + 20:index + method_l + 20 + param_l])[ 0] try: params = json.loads(params) except Exception as e: # error decode params print(e) else: self.cmd_from_client(method.decode(), counter, params) finally: index += method_l + 20 + param_l else: assert all_l in [0, 1] if all_l == 0: # return nothing self.res_from_client(counter) index += 6 else: # {"volume": 0} if length - index - 6 < 4: break param_l = struct.unpack(f'>I', data[index + 6:index + 10])[0] if length - index - 10 < param_l: break params = struct.unpack(f'>{param_l}s', data[index + 10:index + 10 + param_l])[0] try: params = json.loads(params) except Exception as e: # error decode params print(e) else: self.res_from_client(counter, params) finally: index += 10 + param_l except Exception as e: print(e) index += 1 # try to get next batch of data self.cache = data[index:] class NVAConectionHandler(NVAConectionBaseHandler): def __init__(self, conn, req): super(NVAConectionHandler, self).__init__(conn, req) self.session = req.headers.get('Session', '') self.uuid = req.headers.get('UUID', '') self.playlist = [] self.sub = os.path.join(SETTING_DIR, 'macast.ass') logger.info(f'创建 NVAConectionHandler session:{self.session[-4:]} uuid:{self.uuid[-4:]} {self}') self.aid = '' self.oid = '' self.cid = '' self.epid = '' self.season_id = '' self.access_key = '' self.current_qn = 0 self.desire_qn = 0 self.desire_speed = 1 self.qn = {} self.playurl_type = 1 self.content_type = 1 self.title = '' self.title_p = '' self.is_portrait = False self.danmaku_switch_save = True def __del__(self): logger.info(f'销毁 NVAConectionHandler session:{self.session[-4:]} uuid:{self.uuid[-4:]} {self}') @property def renderer(self) -> Renderer: renderers = cherrypy.engine.publish('get_renderer') if len(renderers) == 0: logger.error("Unable to find an available renderer.") return Renderer() return renderers.pop() @property def protocol(self) -> Protocol: protocols = cherrypy.engine.publish('get_protocol') if len(protocols) == 0: logger.error("Unable to find an available protocol.") return NVAProtocol() return protocols.pop() def get_video_info(self): url = 'https://api.bilibili.com/x/tv/card/view_v2?' params = { # 'access_key': self.access_key, 'auto_play': 0, 'build': 104600, 'card_type': 2 if self.epid != 0 else 1, 'fourk': 0, 'is_ad': 'false', 'mobi_app': 'android_tv_yst', 'object_id': self.season_id if self.epid != 0 else self.oid, 'view_type': 2 # todo 验证参数含义 } url += '&'.join([f'{i}={params[i]}' for i in params]) play_list = [] play_index = 0 try: json_text = NetworkManager.GET(url).text json_obj = json.loads(json_text) if json_obj['code'] != 0: logger.error(f"Error getting video info 1: {json_obj['message']}") cherrypy.engine.publish('app_notify', 'ERROR', json_obj['message']) raise Exception('error') self.title = json_obj['data']['title'] video_list = json_obj['data']['auto_play']['cid_list'] for index, video in enumerate(video_list): title_p = video.get('title', '') if video.get('long_title', '') != '': title_p = video.get('long_title', '') play_list.append({ 'oid': video['playurl_args']['object_id'], 'epid': video['playurl_args']['object_id'], 'cid': video['playurl_args']['cid'], 'aid': video['aid'], 'title': title_p, 'is_portrait': video['is_portrait'] }) if int(self.cid) == int(video['playurl_args']['cid']): play_index = index logger.info(f"当前正在播放 分集{index + 1} {title_p}") self.is_portrait = video['is_portrait'] self.renderer.set_media_title(f'{self.title} {title_p}') except Exception as e: logger.error(f"Error getting video info 2: {e}") cherrypy.engine.publish('app_notify', 'ERROR', f"Error getting video info: {e}") finally: self.protocol.play_list = play_list self.protocol.play_index = play_index def get_video_url(self) -> (str, dict): base_url = 'https://api.bilibili.com/x/tv/playurl?' params = { 'build': 104600, 'is_proj': 1, 'device_type': 1, 'mobi_app': 'android_tv_yst', 'platform': 'android', 'fnval': 0, # 16 | 64 | 128 | 256 # todo 16为dash视频,尝试加载dash视频 'fnver': 0, 'fourk': 1, # allowed 4k 'playurl_type': self.playurl_type, 'protocol': 1, 'cid': self.cid, 'qn': self.desire_qn, 'object_id': self.oid, 'mobile_access_key': self.access_key } url = base_url + '&'.join(f'{i}={params[i]}' for i in params) try: res = NetworkManager.GET(url).text res = json.loads(res) print(res) if res['code'] != 0: error_msg = res.get('message', '') if error_msg != '': cherrypy.engine.publish('app_notify', 'ERROR', error_msg) print(error_msg, params) return None, {} qn_support = {} qn_extras = res['data'].get('qn_extras', []) for i in qn_extras: q = i['qn'] if q not in qn_support: qn_support[q] = {} qn_support[q]['quality'] = q qn_support[q]['needVip'] = i.get('need_vip', False) qn_support[q]['needLogin'] = i.get('need_login', False) support_formats = res['data'].get('support_formats', []) for i in support_formats: q = i['quality'] if q not in qn_support: qn_support[q] = {} qn_support[q]['description'] = i.get('new_description', '') qn_support[q]['displayDesc'] = i.get('display_desc', '') qn_support[q]['superscript'] = "Macast " + i.get('superscript', '') # todo: 专有尾巴 qn_support = [qn_support[i] for i in qn_support] self.current_qn = res['data'].get('quality', 0) self.qn = {"curQn": self.current_qn, "supportQnList": qn_support, "userDesireQn": self.desire_qn } # todo 清晰度切换 durl = res['data']['durl'] if durl: return durl[0]['url'], self.qn # todo: fix error when durl have more than one videos dash = res['data']['dash'] if dash: return dash['video'][0]['base_url'], self.qn except Exception as e: logger.error(f'error getting video urls {e}') cherrypy.engine.publish('app_notify', 'ERROR', f'error getting video urls {e}') return None, {} def send_play_cmd(self, url: str, start='0'): self.protocol.set_state_url(url) self.renderer.set_media_url(url, start) self.renderer.set_media_speed(self.desire_speed) self.renderer.set_media_title(f'{self.title} {self.title_p}') cherrypy.engine.publish('renderer_av_uri', url) # 通知客户端,正在加载 cherrypy.engine.publish('nva-broadcast', 'OnPlayState', {"playState": 3}) self.update_play_state() def get_extra_info(): logger.info("start get_extra_info") DanmakuManager.get_danmaku(self.cid, self.sub) self.renderer.set_media_sub_file({ 'url': self.sub, 'title': '弹幕' }) self.get_video_info() logger.info("end get_extra_info") threading.Thread(target=get_extra_info).start() def update_play_state(self): danmuku_open = True if self.danmaku_switch_save: danmuku_open = bool(self.protocol.get_state_display_subtitle()) self.renderer.set_media_sub_show(danmuku_open) cherrypy.engine.publish('nva-broadcast', 'OnDanmakuSwitch', {'open': danmuku_open}) cherrypy.engine.publish('nva-broadcast', 'OnEpisodeSwitch', {"playItem": {"aid": self.aid, "cid": self.cid, "contentType": self.content_type, "epId": self.epid, "seasonId": self.season_id}, "qnDesc": self.qn, "title": f'{self.title} {self.title_p}'}) cherrypy.engine.publish('nva-broadcast', 'OnQnSwitch', self.qn) current_speed = round(float(self.protocol.get_state_speed()), 2) support_speed = [0.5, 0.75, 1, 1.25, 1.5, 2] cherrypy.engine.publish('nva-broadcast', 'SpeedChanged', {"currSpeed": current_speed, "supportSpeedList": support_speed}) def cmd_from_client(self, method, counter, params=None): logger.info(f'CMD{counter} method:{method} params{params}') if method == 'GetVolume': volume = self.protocol.get_state_volume() self.send_res(counter, {'volume': volume}) elif method == 'SetVolume': volume = params.get('volume', -1) if 0 <= volume <= 100: self.renderer.set_media_volume(volume) elif method == 'Pause': self.send_res(counter) self.renderer.set_media_pause() elif method == 'Resume': self.send_res(counter) self.renderer.set_media_resume() elif method == 'SendDanmaku': # todo 1.发送弹幕,2.通过附字幕实现实时显示 # 1:滚动 # 5:上 # 4:下 # {'size': 25, 'mRemoteDmId': 1184473088, 'content': '感动', 'action': '', 'type': 1, 'color': 16777215} # {'size': 18, 'mRemoteDmId': -1382023168, 'content': '?!', 'action': '', 'type': 4, 'color': 16777215} self.send_res(counter) self.renderer.set_media_text(f'暂未支持弹幕:{params["content"]}', 4000) elif method == 'SwitchDanmaku': self.send_res(counter) self.renderer.set_media_sub_show(False if params['open'] == 'false' else True) elif method == 'SwitchSpeed': speed = float(params['speed']) self.renderer.set_media_speed(speed) self.renderer.set_media_text(f'修改倍速:{speed}X', 2000) elif method == 'SwitchQn': self.send_res(counter) self.desire_qn = params['qn'] url, qn = self.get_video_url() print(f'url: {url}\nqn: {qn}') position = self.protocol.get_state_position() self.send_play_cmd(url, position) elif method == 'Stop': self.send_res(counter) self.renderer.set_media_stop() elif method == 'Play': self.send_res(counter) self.aid = params['aid'] self.oid = params.get('oid', self.aid) self.cid = params['cid'] self.epid = int(params.get('epId', 0)) self.access_key = params.get('accessKey', '') self.current_qn = params.get('userDesireQn', 0) self.desire_qn = params.get('userDesireQn', 0) self.content_type = params.get('contentType', 1) self.season_id = int(params.get('seasonId', 0)) self.playurl_type = 1 self.danmaku_switch_save = params.get('danmakuSwitchSave', True) self.desire_speed = float(params.get('userDesireSpeed', 1)) self.title = '' if int(self.epid) != 0: # 番剧 self.oid = self.epid self.playurl_type = 2 url, qn = self.get_video_url() print(f'url: {url}\nqn: {qn}') self.send_play_cmd(url, params.get('seekTs', 0)) elif method == 'PlayUrl': # todo 开始时seek self.send_res(counter) url = params.get('url', '') title = params.get('title', '') self.title = title video_info = json.loads(parse.parse_qs(url)['nva_ext'][0]) ver = video_info.get('ver', -1) if ver != 2: logger.error("Maybe error in url parse") params = video_info.get('content', {}) self.qn = {"curQn": 0, "supportQnList": [{"description": "", "displayDesc": "", "needLogin": False, "needVip": False, "quality": 0, "superscript": ""}], "userDesireQn": 0} print(params) # todo self.aid = params['aid'] self.oid = params.get('oid', self.aid) self.cid = params['cid'] self.epid = int(params.get('epId', 0)) self.access_key = params.get('accessKey', '') self.current_qn = params.get('userDesireQn', 0) self.desire_qn = params.get('userDesireQn', 0) self.content_type = params.get('contentType', 1) self.playurl_type = 1 self.season_id = int(params.get('seasonId', 0)) self.desire_speed = float(params.get('userDesireSpeed', 1)) # todo danmuku switch save self.danmaku_switch_save = params.get('danmakuSwitchSave', True) if self.epid != 0: # 番剧 self.oid = self.epid self.playurl_type = 2 self.send_play_cmd(url, start=params.get('seekTs', 0)) elif method == 'Seek': self.send_res(counter) position = params.get('seekTs', 0) # second? duration = NVAProtocol.position_to_second(self.protocol.get_state_duration()) cherrypy.engine.publish('nva-broadcast', 'OnProgress', {"duration": duration, "position": position }) position = f'{position // 3600}:{(position % 3600) // 60:02d}:{position % 60:02d}' self.renderer.set_media_position(position) else: self.send_res(counter) def res_from_client(self, counter, params=None): logger.info(f'RET{counter} params:{params}') def terminate(self): super(NVAConectionHandler, self).terminate() # NVATool 负责将tcp长连接从 NVAHTTPServer 中分离,生成 NVAConectionHandler # NVAHandler 修改自 DLNAHandler,负责HTTP服务的处理 # 增加了NVA协议的相关适配,启动时需调用 reload() 设置后端服务为 NVAHTTPServer class NVATool(Tool): """NVA protocol tools for cherrypy """ def __init__(self): Tool.__init__(self, 'before_request_body', self.set_nva_handler) def _setup(self): conf = self._merged_args() hooks = cherrypy.serving.request.hooks p = conf.pop("priority", getattr(self.callable, "priority", self._priority)) hooks.attach(self._point, self.callable, priority=p, **conf) hooks.attach('before_finalize', self.nva_response_header, priority=70) hooks.attach('on_end_request', self.nva_start, priority=70) def set_nva_handler(self, handler_cls=NVAConectionHandler): print("NVATool set_nva_handler") request = cherrypy.serving.request conn = request.rfile.rfile.raw._sock request.nva_handler = handler_cls(conn, request) def nva_start(self): request = cherrypy.request if not hasattr(request, 'nva_handler'): return nva_handler = request.nva_handler request.nva_handler = None delattr(request, 'nva_handler') # By doing this we detach the socket from the CherryPy stack avoiding memory leaks request.rfile.rfile.detach() print("NVATool nva_start", nva_handler, request.remote.ip, request.remote.port) if request.method == 'SETUP': cherrypy.engine.publish('nva-add', nva_handler) else: cherrypy.engine.publish('nva-restore', nva_handler) def nva_response_header(self): cherrypy.response.headers['Session'] = cherrypy.request.headers.get('Session', '') cherrypy.response.headers['UUID'] = Setting.get_usn() cherrypy.response.headers['NvaVersion'] = '1' cherrypy.tools.nva = NVATool() @cherrypy.expose class NVAHandler(DLNAHandler): def reload(self): cherrypy.server.httpserver = NVAHTTPServer(cherrypy.server) self.build_description() def build_description(self): self.description = load_xml(XMLPath.DESCRIPTION.value).format( friendly_name='我的小电视', manufacturer="Bilibili Inc.", manufacturer_url="https://bilibili.com/", model_description="云视听小电视", model_name="Macast", model_url="https://app.bilibili.com/", model_number=Setting.get_version(), uuid=Setting.get_usn(), serial_num=1024, header_extra="""Macast 25 104600 master 254""", service_extra=""" urn:app-bilibili-com:service:NirvanaControl:3 urn:app-bilibili-com:serviceId:NirvanaControl NirvanaControl/action NirvanaControl/event dlna/NirvanaControl.xml """ ).encode() def GET(self, param=None, xml=None, **kwargs): if param == 'dlna' and xml == 'NirvanaControl.xml': return NVA_SERVICE return super(NVAHandler, self).GET(param, xml, **kwargs) @cherrypy.tools.nva() def SETUP(self, p): logger.info(f'SETUP ----{p}') print(cherrypy.request.headers) @cherrypy.tools.nva() def RESTORE(self, p): logger.info(f'RESTORE ----{p}') print(cherrypy.request.headers) @cherrypy.tools.nva() def STARTRESTORE(self, p): logger.info(f'STARTRESTORE ----{p}') print(cherrypy.request.headers) # NVA 协议主要代码 class SelectPoller(object): """ copy from ws4py.manager.SelectPoller """ def __init__(self, timeout=0.1): """ A socket poller that uses the `select` implementation to determines which file descriptors have data available to read. It is available on all platforms. """ self._fds = [] self.timeout = timeout def release(self): """ Cleanup resources. """ self._fds = [] def register(self, fd): """ Register a new file descriptor to be part of the select polling next time around. """ if fd not in self._fds: self._fds.append(fd) def unregister(self, fd): """ Unregister the given file descriptor. """ if fd in self._fds: self._fds.remove(fd) def poll(self): """ Polls once and returns a list of ready-to-be-read file descriptors. """ if not self._fds: time.sleep(self.timeout) return [] try: r, w, x = select.select(self._fds, [], [], self.timeout) except IOError as e: return [] return r class NVAProtocol(DLNAProtocol): """ Some code is from ws4py.manager.WebSocketManager """ def __init__(self): super(NVAProtocol, self).__init__() self.nva_manager = None self.lock = threading.Lock() self.clients = {} self.poller = SelectPoller(timeout=0.5) self.media_playing = False self.play_list = [] # [{oid:'', cid:'', title:'', is_portrait: false}...] self.play_index = 0 def start(self): super(NVAProtocol, self).start() self.nva_manager = threading.Thread(target=self.run_manager, name="NVA_MANAGER_THREAD", daemon=True) self.nva_manager.start() cherrypy.engine.subscribe('nva-add', self.add) cherrypy.engine.subscribe('nva-broadcast', self.broadcast) cherrypy.engine.subscribe('nva-restore', self.restore) def stop(self): super(NVAProtocol, self).stop() cherrypy.engine.unsubscribe('nva-add', self.add) cherrypy.engine.unsubscribe('nva-broadcast', self.broadcast) cherrypy.engine.unsubscribe('nva-restore', self.restore) with self.lock: for fd in self.clients: self.clients[fd].terminate() self.clients.clear() self.poller.release() def restore(self, nva: NVAConectionHandler): self.add(nva) with self.lock: clients = self.clients.copy() nva_iter = iter(clients.values()) for nva in nva_iter: if not nva.terminated: nva.update_play_state() def remove(self, nva: NVAConectionHandler): if nva not in self.clients: return logger.info(f"NVA Client {nva} leave.") with self.lock: fd = nva.sock.fileno() self.clients.pop(fd, None) self.poller.unregister(fd) nva.terminate() def add(self, nva: NVAConectionHandler): if nva in self.clients: return logger.info(f"NVA Client {nva} added.") with self.lock: new_fd = nva.sock.fileno() self.poller.register(new_fd) uuid = nva.uuid session = nva.session clients_remove_list = [] for fd in self.clients: if self.clients[fd].uuid == uuid: # 曾经连接过的设备重新连接 if self.clients[fd].session == session: logger.info('设备断线重连:恢复之前的session') # 恢复之前的session:手机断连,恢复session # 移除旧连接 clients_remove_list.append(fd) self.clients[fd].terminate() # 将新的socket转移到旧连接上 self.clients[new_fd] = self.clients[fd] self.clients[new_fd].sock = nva.sock self.clients[new_fd].counter = 0 self.clients[new_fd].terminated = False else: # 新的session:手机应用重启,重新投屏 # 移除旧连接 logger.info('设备断线重连:移除旧连接') clients_remove_list.append(fd) self.clients[fd].terminate() # 添加新的连接 self.clients[new_fd] = nva break else: # 新设备连接 logger.info('新设备连接') self.clients[new_fd] = nva for fd in clients_remove_list: self.poller.unregister(fd) self.clients.pop(fd, None) # 启动ping线程 logger.info("启动ping线程") self.clients[new_fd].start() def broadcast(self, cmd, params=None): with self.lock: clients = self.clients.copy() nva_iter = iter(clients.values()) for nva in nva_iter: if not nva.terminated: try: nva.send_cmd(cmd, params) except: pass def set_playlist(self, play_list, play_index): self.play_index = play_index self.play_list = play_list @staticmethod def position_to_second(position: str) -> int: pos = position.split(':') if len(pos) < 3: return 0 return int(pos[0]) * 3600 + int(pos[1]) * 60 + int(pos[2]) def set_state_position(self, data: str): """ :param data: string, eg: 00:00:00 :return: """ if data != self.get_state_position(): position = self.position_to_second(data) duration = self.position_to_second(self.get_state_duration()) if duration > 0: self.broadcast('OnProgress', {"duration": duration, "position": position }) super(NVAProtocol, self).set_state_position(data) # todo 直播时 总时长会不断增加,其实直播时无须返回时长信息 # 目前nva协议并没有涉及直播投放 def set_state_duration(self, data: str): """ :param data: string, eg: 00:00:00 :return: """ if data != self.get_state_duration(): duration = self.position_to_second(data) if duration > 0: # 有时候播放器初始化时会设置duration为0,这时候不需要发送给客户端 position = self.position_to_second(self.get_state_position()) self.broadcast('OnProgress', {"duration": duration, "position": position }) super(NVAProtocol, self).set_state_duration(data) def set_state_transport(self, data: str): super(NVAProtocol, self).set_state_transport(data) # 以下对playState的猜测,暂未得到确切验证 # 3 加载中 # 4 播放中 # 5 暂停 # 6 媒体播放结束 end of file # 7 停止 if data == 'PLAYING': self.broadcast('OnPlayState', {"playState": 4}) if not self.media_playing: # 播放成功 self.broadcast('PLAY_SUCCESS') self.media_playing = True return elif data == 'PAUSED_PLAYBACK': self.broadcast('OnPlayState', {"playState": 5}) return elif data == 'STOPPED': self.broadcast('OnPlayState', {"playState": 7}) elif data == 'NO_MEDIA_PRESENT': self.broadcast('OnPlayState', {"playState": 6}) self.play_index += 1 if self.play_index < len(self.play_list): logger.info(f"准备播放 分集{self.play_index + 1} {self.play_list[self.play_index]}") # 切换下一集 with self.lock: clients = self.clients.copy() nva_iter = iter(clients.values()) next_video = self.play_list[self.play_index] for nva in nva_iter: if nva: nva.cid = next_video['cid'] nva.oid = nva.aid = next_video['aid'] if next_video['epid'] != 0: nva.oid = nva.epid = next_video['epid'] nva.title_p = next_video['title'] url, qn = nva.get_video_url() print(f'url: {url}\nqn: {qn}') nva.send_play_cmd(url) break else: logger.error("客户端断开连接,播放结束") else: logger.info(f'分集{self.play_index} {self.play_list}') logger.info("没有下一集,播放结束") self.media_playing = False # 不同播放状态应返回的 playState 序列 # 播放一条视频 # 加载中(3)、播放(4)、播放成功(PLAY_SUCCESS) # 分集1播放完毕,自动播放分集2 # 媒体播放结束(6)、加载下一条视频(3)、播放(4)、播放成功(PLAY_SUCCESS) # MPVRenderer实现的顺序 媒体播放结束(6)、播放停止(7)、加载下一条视频(3)、播放(4)、播放成功(PLAY_SUCCESS) # 当所有分集播放完毕时 # 播放停止(7)、媒体播放结束(6)、加载推荐视频(3)、播放(4)、播放成功(PLAY_SUCCESS) # MPVRenderer实现的顺序:媒体播放结束(6)、播放停止(7) # 在正在播放时,切投其他视频 # 暂停(5)、播放(4)、加载中(3)、播放(4)、播放成功(PLAY_SUCCESS) # MPVRenderer实现的顺序实现的顺序:停止(7)、加载中(3)、播放(4)、播放成功(PLAY_SUCCESS) def set_state_transport_error(self): """ :return: """ super(NVAProtocol, self).set_state_transport_error() self.broadcast('OnPlayState', {"playState": 3}) # todo 反向控制手机静音/貌似没有这个功能 def set_state_mute(self, data: bool): """ :param data: bool :return: """ super(NVAProtocol, self).set_state_mute(data) # todo 反向控制手机音量/貌似没有这个功能 def set_state_volume(self, data: int): """ :param data: int, range from 0 to 100 :return: """ super(NVAProtocol, self).set_state_volume(data) def set_state_speed(self, data: str): super(NVAProtocol, self).set_state_speed(data) current_speed = round(float(data), 2) support_speed = [0.5, 0.75, 1, 1.25, 1.5, 2] self.broadcast('SpeedChanged', {"currSpeed": current_speed, "supportSpeedList": support_speed}) def set_state_display_subtitle(self, data: bool): super(NVAProtocol, self).set_state_display_subtitle(data) self.broadcast('OnDanmakuSwitch', {'open': data}) def run_manager(self): while self.running: with self.lock: polled = self.poller.poll() logger.debug(f'polled {polled}') if not self.running: break for fd in polled: if not self.running: break nva = self.clients.get(fd, None) if nva and not nva.terminated: try: data = nva.sock.recv(2048) if data == b'': raise Exception('socket received null data') nva.parse_request(data) except (socket.error, OSError, Exception) as e: if hasattr(e, "errno") and e.errno == errno.EINTR: logger.error("socket: errno.EINTR") pass else: logger.error(f"ERROR received data: {e}") nva.terminate() with self.lock: self.poller.unregister(fd) # todo 移除长时间未连接的客户端 # 下面的注释代码是立刻移除断开连接的客户端 # 但是立刻移除会导致意外断开连接的客户端无法完成重连 # 当断开连接的客户端数量过多时可能会导致内存占用小幅增加 # 不过日常使用的情况影响不大,所以暂时先不处理这个问题 # with self.lock: # self.clients.pop(fd, None) # self.poller.unregister(fd) else: logger.debug(f'retained clients {self.clients}') del nva @property def handler(self): if self._handler is None: self._handler = NVAHandler() return self._handler def init_services(self, description=XMLPath.DESCRIPTION.value): super(NVAProtocol, self).init_services() self.build_action('urn:app-bilibili-com:service:NirvanaControl:3', 'NirvanaControl', etree.fromstring(NVA_SERVICE)) if __name__ == '__main__': from macast import cli from macast_renderer.mpv import MPVRenderer cli(renderer=MPVRenderer(path='mpv'), protocol=NVAProtocol())