#!/usr/bin/env python3
"""Convert proxy subscriptions between share links, Base64 and Clash YAML."""

import base64
import json
import re
import socket
import urllib.parse

import geoip2.database
import requests
import yaml
from requests.adapters import HTTPAdapter

# Sentinel strings returned (not raised) by the public helpers on failure.
_PARSE_ERROR = '订阅内容解析错误'
_URL_ERROR = 'Url 解析错误'

# Matches the start of any share link understood by the splitter.
_SHARE_LINK_RE = re.compile(r'(?:vmess|vless|trojan|ssr|ss)://')

# Characters that force a value to be double-quoted when YAML text is repaired.
_QUOTE_TRIGGERS = ('|', '?', '[', ']', '@', '!', '%', ':')

# Optional fields that must agree (when both nodes carry them) for two nodes
# with the same server/port to count as duplicates.
_DEDUP_FIELDS = ('net', 'network', 'tls', 'id', 'uuid',
                 'password', 'cipher', 'type')

# Country database read by makeup() when nodes are renamed.
_GEOIP_DB_PATH = './utils/Country.mmdb'

_COUNTRY_CODES = (
    'AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD BE BF BG BH BI '
    'BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH CI CK CL CM CN '
    'CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER ES ET EU FI FJ '
    'FK FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT GU GW GY HK '
    'HM HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE KG KH KI KM '
    'KN KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD ME MF MG MH '
    'MK ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF NG NI NL NO '
    'NP NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA RE RO RS RU '
    'RW SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX SY SZ TC TD '
    'TF TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ VA VC VE VG '
    'VI VN VU WF WS XK YE YT ZA ZM ZW'
).split()

# A flag emoji is the two regional-indicator symbols for the code's letters
# (regional indicator "A" is U+1F1E6).
_FLAG_EMOJI = {
    code: ''.join(chr(0x1F1E6 + ord(letter) - ord('A')) for letter in code)
    for code in _COUNTRY_CODES
}
_FLAG_EMOJI['RELAY'] = '\U0001F3C1'          # chequered flag
_FLAG_EMOJI['NOWHERE'] = _FLAG_EMOJI['AQ']   # fallback for unknown locations


class sub_convert():
    """Static helpers that convert proxy subscriptions between formats.

    Accepted inputs: input_type in {'url', 'content'}.
    Produced outputs: output_type in {'url', 'content', 'YAML', 'Base64'}.
    Failures are reported by printing a message and returning one of the
    sentinel strings 'Url 解析错误' / '订阅内容解析错误'.
    """

    # ------------------------------------------------------------------ main
    @staticmethod
    def main(raw_input, input_type='url', output_type='url', custom_set=None):
        """Convert a subscription to YAML, Base64 or plain share links.

        Args:
            raw_input: a subscription URL, a list of URLs, or (when
                input_type is 'content') the subscription text itself.
            input_type: 'url' to download raw_input, 'content' to parse it.
            output_type: 'YAML', 'Base64', 'url' or 'content' (the last two
                both yield newline-separated share links).
            custom_set: optional dict with the boolean keys
                'dup_rm_enabled' and 'format_name_enabled'; missing keys
                default to False.

        Returns:
            The converted text, or an error sentinel string.
        """
        options = {'dup_rm_enabled': False, 'format_name_enabled': False}
        if custom_set:
            options.update(custom_set)

        if input_type == 'url':
            try:
                if isinstance(raw_input, list):
                    link_blocks = [
                        sub_convert.yaml_decode(
                            sub_convert.format(sub_convert._download(url)))
                        for url in raw_input
                    ]
                else:
                    sub_content = sub_convert.format(
                        sub_convert._download(raw_input))
            except Exception as err:  # network / decoding boundary
                print(err)
                return _URL_ERROR
            if isinstance(raw_input, list):
                # Join with a newline so the last link of one subscription
                # is not glued to the first link of the next one.
                sub_content = sub_convert.format('\n'.join(link_blocks))
        elif input_type == 'content':
            sub_content = sub_convert.format(raw_input)
        else:
            print('Please define right input type.')
            return _PARSE_ERROR

        if sub_content == _PARSE_ERROR:
            return _PARSE_ERROR

        final_content = sub_convert.makeup(
            sub_content,
            options['dup_rm_enabled'],
            options['format_name_enabled'])
        if final_content == _PARSE_ERROR:
            return _PARSE_ERROR

        if output_type == 'YAML':
            return final_content
        if output_type == 'Base64':
            return sub_convert.base64_encode(
                sub_convert.yaml_decode(final_content))
        if output_type in ('url', 'content'):
            return sub_convert.yaml_decode(final_content)
        print('Please define right output type.')
        return _PARSE_ERROR

    @staticmethod
    def _download(url):
        """Fetch *url* (5 retries, 5 s timeout) and return its UTF-8 text."""
        with requests.Session() as session:
            session.mount('http://', HTTPAdapter(max_retries=5))
            session.mount('https://', HTTPAdapter(max_retries=5))
            print('Downloading from:' + url)
            resp = session.get(url, timeout=5)
            return resp.content.decode('utf-8')

    # ---------------------------------------------------------------- format
    @staticmethod
    def format(sub_content, output=False):
        """Normalise subscription text (Base64, share links or Clash YAML).

        Args:
            sub_content: the raw subscription text.
            output: only honoured for Clash input; when true the repaired
                YAML text is returned instead of the parsed dictionary.

        Returns:
            A dict of the form {'proxies': [...]}, repaired YAML text, or
            '订阅内容解析错误' when the content cannot be interpreted.
        """
        # NOTE(review): the previous guard was `'' not in sub_content`, which
        # is never true and made this function always fail. Presumably the
        # marker it tested for was lost; blank input is rejected instead.
        if not isinstance(sub_content, str) or not sub_content.strip():
            print('订阅内容解析错误')
            return _PARSE_ERROR
        if 'proxies:' not in sub_content:
            return sub_convert._format_share_links(sub_content)
        return sub_convert._format_clash(sub_content, output)

    @staticmethod
    def _split_joined_links(line):
        """Split one line in which several share links were glued together."""
        pieces = []
        segment_start = 0
        for match in _SHARE_LINK_RE.finditer(line):
            # The first 8 characters of a segment hold its own scheme
            # prefix, so only later matches start a new link.
            if match.start() - segment_start >= 8:
                pieces.append(line[segment_start:match.start()])
                segment_start = match.start()
        pieces.append(line[segment_start:])
        return pieces

    @staticmethod
    def _format_share_links(sub_content):
        """Turn share-link (or Base64-wrapped) text into a node dictionary."""
        try:
            if '://' not in sub_content:
                sub_content = sub_convert.base64_decode(sub_content)
            url_list = []
            for raw_line in re.split(r'\r?\n+', sub_content):
                url_list.extend(sub_convert._split_joined_links(raw_line))
            return sub_convert.yaml_encode('\n'.join(url_list), output=False)
        except Exception:
            print('Sub_content 格式错误')
            return _PARSE_ERROR

    @staticmethod
    def _quote_value(value, nested):
        """Double-quote *value* if it holds a character from _QUOTE_TRIGGERS.

        With nested=True trailing closing braces of flow mappings are kept
        outside the quotes.
        """
        if not any(char in value for char in _QUOTE_TRIGGERS):
            return value
        if not nested or ('{' not in value and '}' not in value):
            return '"' + value + '"'
        for closing in ('}}}', '}}', '}'):
            if closing in value:
                return '"' + value.replace(closing, '') + '"' + closing
        return value  # only an opening brace: leave untouched

    @staticmethod
    def _repair_clash_text(sub_content):
        """Strip all quotes, then re-quote the values that need it."""
        sub_content = sub_content.replace('\'', '').replace('"', '')
        repaired_lines = []
        for line in re.split(r'\n+', sub_content):
            values = re.split(r': |, ', line)
            if len(values) > 6:
                nested = True       # a flow-style node line
            elif len(values) == 2:
                nested = False      # a plain "key: value" line
            elif len(values) == 1:
                repaired_lines.append(line)
                continue
            else:
                # Lines with 3-6 fields are dropped, as before.
                continue
            fixed_line = line
            for old in values:
                fixed_line = fixed_line.replace(
                    old, sub_convert._quote_value(old, nested))
            repaired_lines.append(fixed_line)
        return '\n'.join(repaired_lines).replace(
            'False', 'false').replace('True', 'true')

    @staticmethod
    def _format_clash(sub_content, output):
        """Parse Clash YAML, repairing the text when plain parsing fails."""
        # fix clash servers from https://github.com/kxswa/k
        # NOTE(review): this removes every '!' from the document whenever
        # '! ' occurs, which looks lossy - confirm the intended marker.
        if '! ' in sub_content:
            sub_content = sub_content.replace('! ', '').replace('!', '')
        if not output:
            try:
                return yaml.safe_load(sub_content)
            except Exception:
                pass  # fall through to the textual repair below
        try:
            repaired = sub_convert._repair_clash_text(sub_content)
            if output:
                return repaired
            return yaml.safe_load(repaired)
        except Exception:
            print('Sub_content 格式错误')
            return _PARSE_ERROR

    # ---------------------------------------------------------------- makeup
    @staticmethod
    def _is_same_node(first, second):
        """Return True when two node dicts describe the same endpoint."""
        if first.get('server') != second.get('server'):
            return False
        # Ports may be str (from share links) or int (from YAML).
        if str(first.get('port')) != str(second.get('port')):
            return False
        return not any(
            field in first and field in second
            and first[field] != second[field]
            for field in _DEDUP_FIELDS
        )

    @staticmethod
    def _remove_duplicates(proxies_list):
        """Drop later duplicates from *proxies_list* in place."""
        raw_length = len(proxies_list)
        print(f'\n-----Restart-----\nStarting Quantity {raw_length}')
        begin = 0
        while begin < len(proxies_list):
            if (begin + 1) % 100 == 0:
                print(f'Current Benchmark {begin + 1}-----'
                      f'Current Quantity {len(proxies_list)}')
            reference = proxies_list[begin]
            candidate = begin + 1
            while candidate < len(proxies_list):
                if sub_convert._is_same_node(
                        reference, proxies_list[candidate]):
                    # Do not advance: the next element moved into this slot.
                    proxies_list.pop(candidate)
                else:
                    candidate += 1
            begin += 1
        repetition = raw_length - len(proxies_list)
        print(f'Current Benchmark {begin}-----'
              f'Current Quantity {len(proxies_list)}\n'
              f'Number of Repetition {repetition}\n'
              f'-----Deduplication Completed-----\n')

    @staticmethod
    def _rename_by_country(proxy, index, total, ip_reader):
        """Rename *proxy* to '<flag><country>-<ip>-<index>' in place."""
        server = str(proxy['server'])
        if server.replace('.', '').isdigit():
            ip = server
        else:
            try:
                ip = socket.gethostbyname(server)
            except (OSError, UnicodeError):
                ip = server
        try:
            country_code = ip_reader.country(ip).country.iso_code
        except Exception:  # best effort: any lookup failure means unknown
            ip = '0.0.0.0'
            country_code = 'NOWHERE'
        if country_code in ('CLOUDFLARE', 'PRIVATE'):
            country_code = 'RELAY'
        name_emoji = _FLAG_EMOJI.get(country_code, _FLAG_EMOJI['NOWHERE'])
        width = 4 if total >= 999 else 3 if total > 99 else 2
        proxy['name'] = f'{name_emoji}{country_code}-{ip}-{index:0>{width}d}'

    @staticmethod
    def makeup(input, dup_rm_enabled=False, format_name_enabled=False):
        """De-duplicate and/or rename nodes and return Clash YAML text.

        Args:
            input: a {'proxies': [...]} dict, or text accepted by format().
            dup_rm_enabled: remove nodes duplicating an earlier one.
            format_name_enabled: rename nodes by GeoIP country; reads
                './utils/Country.mmdb'.

        Returns:
            YAML text with one flow-style node per line, or
            '订阅内容解析错误' when no proxy list is available.
        """
        sub_content = input if isinstance(
            input, dict) else sub_convert.format(input)
        if not isinstance(sub_content, dict) or not isinstance(
                sub_content.get('proxies'), list):
            print('订阅内容解析错误')
            return _PARSE_ERROR
        proxies_list = sub_content['proxies']

        usable = [node for node in proxies_list
                  if isinstance(node, dict) and 'server' in node]
        if len(usable) != len(proxies_list):
            print(f'Skipped {len(proxies_list) - len(usable)} '
                  f'malformed proxy entries')
            proxies_list[:] = usable

        if dup_rm_enabled:
            print("\nBefore was " + str(len(proxies_list)) + "\n")
            sub_convert._remove_duplicates(proxies_list)
            print("\nNow is " + str(len(proxies_list)) + "\n")

        if format_name_enabled and proxies_list:
            total = len(proxies_list)
            # Open the database once rather than once per node.
            with geoip2.database.Reader(_GEOIP_DB_PATH) as ip_reader:
                for index, proxy in enumerate(proxies_list):
                    sub_convert._rename_by_country(
                        proxy, index, total, ip_reader)

        # Loopback nodes are useless to clients and are filtered out.
        url_list = [str(proxy) for proxy in proxies_list
                    if proxy['server'] != '127.0.0.1']

        # Each node is dumped as the str() of its dict; format(output=True)
        # then strips the quotes and re-quotes values, which yields one
        # flow-style mapping per line.
        yaml_content_raw = yaml.dump(
            {'proxies': url_list}, default_flow_style=False,
            sort_keys=False, allow_unicode=True, width=750, indent=2)
        return sub_convert.format(yaml_content_raw, output=True)

    # ----------------------------------------------------------- yaml_encode
    @staticmethod
    def _is_blank(value):
        """Return True for '', False or None (but not for 0)."""
        return value == '' or value is False or value is None

    @staticmethod
    def _vmess_to_clash(line):
        """Parse a vmess:// link into a Clash node dict (None on failure)."""
        try:
            config = {
                'v': 'Vmess Node', 'ps': 'Vmess Node', 'add': '0.0.0.0',
                'port': 0, 'id': '', 'aid': 0, 'scy': 'auto', 'net': '',
                'type': '', 'host': '', 'path': '/', 'tls': ''
            }
            config.update(json.loads(
                sub_convert.base64_decode(line.replace('vmess://', ''))))
            if config['id'] == '' or config['id'] is None:
                print('节点格式错误')
                return None
            node = {
                'name': urllib.parse.unquote(str(config['ps'])),
                'server': config['add'],
                'port': int(config['port']),
                'type': 'vmess',
                'uuid': config['id'],
                'alterId': int(config['aid']),
                'cipher': config['scy'],
                'skip-cert-verify': True,
                'network': 'tcp' if sub_convert._is_blank(
                    config['net']) else config['net'],
            }
            if config['tls'] == 'tls' or config['net'] in ('h2', 'grpc'):
                node['tls'] = True
            # 'ws-opts' must stay the last key: the YAML repair step relies
            # on the nested braces closing the line.
            ws_opts = {}
            if not sub_convert._is_blank(config['path']):
                ws_opts['path'] = config['path']
            if config['host'] != '':
                ws_opts['headers'] = {'Host': config['host']}
            node['ws-opts'] = ws_opts
            return node
        except Exception as err:
            print(f'yaml_encode 解析 vmess 节点发生错误: {err}')
            return None

    @staticmethod
    def _ss_to_clash(line):
        """Parse an ss:// link into a Clash node dict (None on failure)."""
        if '#' not in line:
            line = line + '#SS%20Node'
        try:
            body, fragment = line.replace('ss://', '').split('#', 1)
            if '@' in body:
                # SIP002: only the "method:password" part is Base64.
                encoded_user, address = body.split('@', 1)
                decoded = f'{sub_convert.base64_decode(encoded_user)}@{address}'
            else:
                decoded = sub_convert.base64_decode(body)
            method, remainder = decoded.split(':', 1)
            # The password may itself contain '@', so split from the right.
            password, address = remainder.rsplit('@', 1)
            server, port = address.split(':', 1)
            return {
                'name': urllib.parse.unquote(fragment),
                'server': server,
                'port': port,
                'type': 'ss',
                'cipher': method,
                'password': password,
            }
        except Exception as err:
            print(f'yaml_encode 解析 ss 节点发生错误: {err}')
            return None

    @staticmethod
    def _ssr_to_clash(line):
        """Parse an ssr:// link into a Clash node dict (None on failure)."""
        try:
            ssr_content = sub_convert.base64_decode(
                line.replace('ssr://', ''))
            # Split from the right so a server containing ':' stays whole.
            parts = ssr_content.rsplit(':', 5)
            if len(parts) != 6:
                print('SSR 格式错误: %s' % ssr_content)
                return None
            server, port, protocol, cipher, obfs, tail = parts
            password_encoded, _, params = tail.partition('/?')
            param_dic = {'remarks': 'U1NSIE5vZGU=', 'obfsparam': '',
                         'protoparam': '', 'group': ''}
            for part in params.split('&'):
                key, has_value, value = part.partition('=')
                if has_value:
                    param_dic[key] = value
            decode = sub_convert.base64_decode
            return {
                'name': decode(param_dic['remarks']),
                'server': server,
                'port': port,
                'type': 'ssr',
                'cipher': cipher,
                'password': decode(password_encoded),
                'obfs': obfs,
                'protocol': protocol,
                'obfsparam': decode(param_dic['obfsparam']),
                'protoparam': decode(param_dic['protoparam']),
                'group': decode(param_dic['group']),
            }
        except Exception as err:
            print(f'yaml_encode 解析 ssr 节点发生错误: {err}')
            return None

    @staticmethod
    def _trojan_to_clash(line):
        """Parse a trojan:// link into a Clash node dict (None on failure)."""
        try:
            body, fragment = line.replace('trojan://', '').split('#', 1)
            fields = re.split(r':|@|\?|&', body)
            node = {
                'name': urllib.parse.unquote(fragment),
                'server': fields[1],
                'port': fields[2],
                'type': 'trojan',
                'password': fields[0],
            }
            for config in fields[3:]:
                if 'sni=' in config:
                    node.setdefault('sni', config[4:])
                elif 'allowInsecure=' in config or 'tls=' in config:
                    # NOTE(review): the previous test `config[-1] == 0`
                    # compared a character with an int and was never true,
                    # so these options have never changed the node. Kept
                    # as a no-op until the intended mapping is confirmed.
                    continue
                elif 'type=' in config:
                    if config[5:] != 'tcp':
                        node.setdefault('network', config[5:])
                elif 'path=' in config:
                    node.setdefault('ws-path', config[5:])
                elif 'security=' in config:
                    if config[9:] != 'tls':
                        node.setdefault('tls', False)
            node.setdefault('skip-cert-verify', True)
            return node
        except Exception as err:
            print(f'yaml_encode 解析 trojan 节点发生错误: {err}')
            return None

    @staticmethod
    def yaml_encode(url_content, output=True):
        """Convert newline-separated share links into Clash YAML.

        Args:
            url_content: text holding one share link per line.
            output: return YAML text when true, else the node dictionary.

        Returns:
            YAML text or {'proxies': [...]}; None if an unexpected error
            occurs (the error is printed).
        """
        try:
            url_list = []
            for line in re.split(r'\n+', url_content):
                try:
                    nodes = []
                    if 'vmess://' in line:
                        nodes.append(sub_convert._vmess_to_clash(line))
                    # 'vmess://' and 'vless://' both contain 'ss://'.
                    if ('ss://' in line and 'vless://' not in line
                            and 'vmess://' not in line):
                        nodes.append(sub_convert._ss_to_clash(line))
                    if 'ssr://' in line:
                        nodes.append(sub_convert._ssr_to_clash(line))
                    if 'trojan://' in line:
                        nodes.append(sub_convert._trojan_to_clash(line))
                    url_list.extend(
                        node for node in nodes if node is not None)
                except Exception as e:
                    print(f'failed to proccess yaml encoding the raw line: '
                          f'{line} & error: {e}')
            yaml_content_dic = {'proxies': url_list}
            if output:
                return yaml.dump(
                    yaml_content_dic, default_flow_style=False,
                    sort_keys=False, allow_unicode=True, width=750, indent=2)
            return yaml_content_dic
        except Exception as err:
            print(f'yaml encode error: {err}')

    # --------------------------------------------------------- base64_encode
    @staticmethod
    def base64_encode(url_content):
        """Return the Base64 text of *url_content* (None counts as '')."""
        if url_content is None:
            url_content = ''
        return base64.b64encode(url_content.encode('utf-8')).decode('ascii')

    # ----------------------------------------------------------- yaml_decode
    @staticmethod
    def _vmess_to_url(proxy):
        """Build a vmess:// link from a Clash node dict."""
        config = {
            'name': 'Vmess Node', 'server': '0.0.0.0', 'port': 0,
            'uuid': '', 'alterId': 0, 'cipher': 'auto', 'network': 'ws',
            'ws-opts': {'path': '', 'headers': {'Host': ''}},
            'tls': '', 'sni': ''
        }
        # Overlay the node on the defaults; without this every link would
        # carry the placeholder values above.
        config.update(proxy)
        vmess_value = {
            'v': 2, 'ps': config['name'], 'add': config['server'],
            'port': config['port'], 'id': config['uuid'],
            'aid': config['alterId'], 'scy': config['cipher'],
            'net': config['network'], 'type': None, 'sni': config['sni']
        }
        if proxy.get('tls') in ('true', True):
            vmess_value['tls'] = 'tls'
        ws_opts = proxy.get('ws-opts')
        if isinstance(ws_opts, dict) and ws_opts:
            host = (ws_opts.get('headers') or {}).get('Host', '')
            if host != '':
                vmess_value['host'] = host
            path = ws_opts.get('path', '')
            if path != '':
                vmess_value['path'] = path
        raw = json.dumps(vmess_value, sort_keys=False,
                         indent=2, ensure_ascii=False)
        return '\nvmess://' + sub_convert.base64_encode(raw) + '\n'

    @staticmethod
    def _ss_to_url(proxy):
        """Build an ss:// link: Base64(method:password@host:port)#name."""
        plain = (str(proxy['cipher']) + ':' + str(proxy['password']) + '@'
                 + str(proxy['server']) + ':' + str(proxy['port']))
        return ('\nss://' + sub_convert.base64_encode(plain) + '#'
                + urllib.parse.quote(str(proxy['name'])) + '\n')

    @staticmethod
    def _trojan_to_url(proxy):
        """Build a trojan:// link (trojan-go style query string)."""
        # https://p4gefau1t.github.io/trojan-go/developer/url/
        query = '?allowInsecure=1'
        network = proxy.get('network')
        if 'tls' in proxy and 'network' in proxy and network != 'tcp':
            if proxy['tls'] == True:  # noqa: E712 - also accepts 1
                query = f'?security=tls&type={network}&headerType=none'
            elif proxy['tls'] == False:  # noqa: E712 - also accepts 0
                query = f'?allowInsecure=0&type={network}&headerType=none'
        if 'sni' in proxy:
            query = query + '&sni=' + str(proxy['sni'])
        return ('\ntrojan://' + str(proxy['password']) + '@'
                + str(proxy['server']) + ':' + str(proxy['port']) + query
                + '#' + urllib.parse.quote(str(proxy['name'])) + '\n')

    @staticmethod
    def _ssr_to_url(proxy):
        """Build an ssr:// link (the whole payload is Base64-encoded)."""
        encode = sub_convert.base64_encode
        remarks = encode(proxy['name']).replace('+', '-')
        password = encode(proxy['password'])
        params = {'group': 'U1NSUHJvdmlkZXI', 'obfsparam': '',
                  'protoparam': ''}
        for key in tuple(params):
            try:
                params[key] = encode(proxy[key])
            except (KeyError, AttributeError):
                pass  # absent or non-text value: keep the default
        payload = (str(proxy['server']) + ':' + str(proxy['port']) + ':'
                   + str(proxy['protocol']) + ':' + str(proxy['cipher'])
                   + ':' + str(proxy['obfs']) + ':' + password
                   + '/?group=' + params['group'] + '&remarks=' + remarks
                   + '&obfsparam=' + params['obfsparam']
                   + '&protoparam=' + params['protoparam'])
        # The newline belongs after the link, not inside the payload.
        return '\nssr://' + encode(payload) + '\n'

    @staticmethod
    def yaml_decode(url_content):
        """Convert Clash YAML (text or dict) into share links.

        Returns:
            Newline-separated share links, or '订阅内容解析错误' on failure.
            Nodes of unsupported types and nodes that fail to convert are
            skipped (the failure is printed).
        """
        builders = {
            'vmess': sub_convert._vmess_to_url,
            'ss': sub_convert._ss_to_url,
            'trojan': sub_convert._trojan_to_url,
            'ssr': sub_convert._ssr_to_url,
        }
        try:
            if isinstance(url_content, dict):
                sub_content = url_content
            else:
                sub_content = sub_convert.format(url_content)
                print("Formatting Completed!")
            proxies_list = sub_content['proxies']

            # Link formats: https://github.com/hoochanlon/fq-book/blob/master/docs/append/srvurl.md
            protocol_url = []
            for proxy in proxies_list:
                try:
                    builder = builders.get(proxy['type'])
                    if builder is not None:
                        protocol_url.append(builder(proxy))
                except Exception as e:
                    print(f'yaml decode Error in coverting servers {e} 错误')

            links = ''.join(protocol_url).split("\n")
            return "\n".join(link for link in links if link != '')
        except Exception as err:
            print(f'yaml decode 发生 {err} 错误')
            return _PARSE_ERROR

    # --------------------------------------------------------- base64_decode
    @staticmethod
    def base64_decode(url_content):
        """Decode standard or URL-safe Base64 text, padded or not.

        Undecodable UTF-8 bytes are dropped. binascii.Error propagates when
        the text is not valid Base64.
        """
        # Whitespace is insignificant and must not count towards padding.
        url_content = ''.join(url_content.split())
        url_content = url_content.replace('-', '+').replace('_', '/')
        missing_padding = len(url_content) % 4
        if missing_padding != 0:
            url_content += '=' * (4 - missing_padding)
        return base64.b64decode(
            url_content.encode('utf-8')).decode('utf-8', 'ignore')

    # -------------------------------------------------------- convert_remote
    @staticmethod
    def convert_remote(url='', output_type='clash',
                       host='http://127.0.0.1:25500'):
        """Convert a subscription through a remote conversion service.

        Args:
            url: the subscription URL.
            output_type: 'clash' (Clash YAML), 'base64' or 'url'.
            host: base address of the conversion service.

        Returns:
            The converted content, 'Url 解析错误' when the request fails or
            yields no nodes, or '订阅内容解析错误' for an unknown
            output_type.
        """
        targets = {'clash': 'clash', 'base64': 'mixed', 'url': 'mixed'}
        if output_type not in targets:
            print('Please define right output type.')
            return _PARSE_ERROR

        # https://docs.python.org/3/library/urllib.parse.html
        quoted_url = urllib.parse.quote(url, safe='')
        converted_url = (host + '/sub?target=' + targets[output_type]
                         + '&url=' + quoted_url
                         + '&insert=false&emoji=true&list=true')
        try:
            resp = requests.get(converted_url)
        except Exception as err:
            print(err)
            return _URL_ERROR
        if resp.text == 'No nodes were found!':
            return _URL_ERROR

        if output_type == 'clash':
            return sub_convert.makeup(
                sub_convert.format(resp.text),
                dup_rm_enabled=False, format_name_enabled=True)
        if output_type == 'base64':
            return sub_convert.base64_encode(resp.text)
        return resp.text


if __name__ == '__main__':
    subscribe = 'https://cdn.jsdelivr.net/gh/mahdibland/ShadowsocksAggregator@master/sub/sub_merge.txt'
    output_path = './output.txt'

    content = sub_convert.main(subscribe, 'url', 'YAML')

    with open(output_path, 'w', encoding='utf-8') as file:
        file.write(content)
    print('Writing content to output.txt\n')