#VERSION: 1.10
#AUTHORS: victorBuzdugan
# https://github.com/victorBuzdugan/QbittorrentFilelistSearchPlugin

# LICENSING INFORMATION

# FileList.io search engine plugin for qBittorrent.

import json
import logging
import os
import re
from http.client import HTTPResponse
from http.cookiejar import CookieJar
from tempfile import NamedTemporaryFile
from typing import Optional, Union
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import HTTPCookieProcessor, build_opener

from novaprinter import prettyPrinter

# Header attached to every request; impersonates a desktop Safari browser
# because the tracker rejects the default urllib user agent (see the
# HTTP 403 handling in _make_request).
USER_AGENT: tuple = (
    'User-Agent',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
    'AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15')
# maximum number of request retries
MAX_REQ_RETRIES = 3
# maximum number of search results pages
MAX_PAGES = 5

# Directory of this plugin file; log and credentials files live next to it.
FILE_PATH = os.path.dirname(os.path.realpath(__file__))

# region: logging configuration
logging.basicConfig(
    filename=os.path.join(FILE_PATH, 'filelist.log'),
    filemode='w',
    # qBittorrent installs Python 3.8 by default; encoding was added in 3.9
    # encoding='UTF-8',
    format='%(asctime)s %(levelname)-8s %(message)s',
    datefmt='%d.%m %H:%M:%S',
    level=logging.DEBUG
)
logger = logging.getLogger(__name__)
# endregion

# region: LOGIN credentials
# enter credentials in filelist_credentials.json file
# get model from github repo and rename it to filelist_credentials.json
# OR ENTER YOUR LOGIN DATA HERE:
credentials = {
    'username': 'your_username_here',
    'password': 'your_password_here'
}

# try to get login credentials from json file if it exists
CREDENTIALS_FILE = os.path.join(FILE_PATH, 'filelist_credentials.json')
try:
    with open(CREDENTIALS_FILE, mode='r', encoding='UTF-8') as file:
        credentials = json.load(file)
        logger.debug('Credentials file found. Credentials loaded.')
except FileNotFoundError:
    logger.debug('Credentials file not found. Using data from current file.')
except json.JSONDecodeError:
    # BUGFIX: a malformed credentials file used to raise an uncaught
    # exception at import time and kill the whole plugin; fall back to
    # the in-file credentials instead, matching the FileNotFoundError path.
    logger.error('Credentials file is not valid JSON. '
                 'Using data from current file.')
# endregion

# region: regex patterns
# NOTE(review): several patterns below contain empty groups ('(?<=)',
# '(?:)') and comments that announce an anchor which is not present —
# they look like HTML anchors lost in transit. Verify each pattern
# against the live filelist.io results page markup before relying on it.
RE_VALIDATOR = re.compile(r"""
# starts with but not including
(?<=name='validator'\svalue=')
# capture
(.*)
# ends with but not included
(?='\s\/>)
""", re.VERBOSE)
RE_ALL_RESULTS = re.compile(r"""
# starts with
# any char, including new line
[\S\s]*?
# ends with
<\/div>\s*<\/div>
""", re.VERBOSE)
RE_GET_ID = re.compile(r"""
# starts with but not including
(?<=id=)
# one or more digits
\d+
""", re.VERBOSE)
RE_GET_NAME = re.compile(r"""
# starts with but not including
(?<=title=')
# one or more characters (excluding new line '.')
.+?
# ends with '
(?=')
""", re.VERBOSE)
RE_GET_SIZE = re.compile(r"""
# starts with but not including
(?<=)
# catch group 1
([\d.]+)
# continues with but not included
(?:)
# catch group 2
(\w+)
""", re.VERBOSE)
RE_GET_SEEDERS = re.compile(r"""
# starts with but not including (hex color 'w{6}')
(?<=)
# catch digits
\d+
""", re.VERBOSE)
RE_GET_LEECHERS = re.compile(
    r"(?<=vertical-align:middle;display:table-cell;'>)\d+"
)
RE_NEXT_PAGE = re.compile(
    r"»"
)
# endregion


class filelist():
    ''' filelist.io search class.

    The class name must stay lowercase and match the plugin file name:
    that is how qBittorrent's nova2 engine discovers search plugins.
    '''
    url: str = 'https://filelist.io'
    name: str = 'FileList'
    # mapping of qBittorrent category names to filelist.io category ids
    supported_categories: dict = {
        'all': '0',
        'movies': '19',
        'tv': '21',
        'music': '11',
        'games': '9',
        'anime': '24',
        'software': '8'
    }
    url_dl: str = url + '/download.php?id='
    url_login: str = url + '/login.php'
    url_login_post: str = url + '/takelogin.php'
    url_search: str = url + '/browse.php?search'
    url_details: str = url + '/details.php?id='
    url_download: str = url + '/download.php?id='
    # set by _login/_make_request on fatal problems; checked by the
    # public entry points, which then report via _return_error
    critical_error: bool = False
    # number of consecutive failed requests (see _make_request)
    request_retry: int = 0
    # initialize cookie jar
    cj = CookieJar()
    # initialize an OpenerDirector with the cookie jar attached
    session = build_opener(HTTPCookieProcessor(cj))
    session.addheaders = [USER_AGENT]

    def __init__(self):
        """ Initialize the class and log in immediately. """
        logger.debug('New filelist object created.')
        self._login()

    def _login(self) -> None:
        ''' Login to filelist.

        Login to filelist passing custom header, cookie with session id
        from cookie jar and username, password and validator from
        encoded payload.

        On any failure sets `self.critical_error` and returns early.
        '''
        # region: create payload
        self.payload = {
            'unlock': '1',
            'returnto': '%2F',
        }
        if (credentials['username'] == 'your_username_here'
                or credentials['password'] == 'your_password_here'):
            if credentials['username'] == 'your_username_here':
                logger.critical('Default username! Change username!')
            else:
                logger.critical('Default password! Change password!')
            self.critical_error = True
            return
        else:
            self.payload['username'] = credentials['username']
            self.payload['password'] = credentials['password']
        # load cookies and get validator value
        login_page = self._make_request(self.url_login)
        if not login_page:
            logger.critical("Can't acces login page!")
            self.critical_error = True
            return
        # BUGFIX: re.search returns None on no match; the previous code
        # called .group() on it unconditionally, raising AttributeError
        # before the "could not retrieve" branch could ever run.
        validator = re.search(RE_VALIDATOR, login_page)
        if not validator:
            logger.critical('Could not retrieve validator key!')
            self.critical_error = True
            return
        self.payload['validator'] = validator.group()
        logger.debug('Retrieved validator key.')
        # check if cookie is in the jar
        if "PHPSESSID" not in [cookie.name for cookie in self.cj]:
            logger.critical('Could not load cookie!')
            self.critical_error = True
            return
        else:
            logger.debug('Cookie is in the jar.')
        # encode payload to a percent-encoded ASCII text string
        # and encode to bytes
        self.payload = urlencode(self.payload).encode()
        # endregion

        # POST form and login
        main_page = self._make_request(self.url_login_post, data=self.payload)
        if main_page:
            logger.info('Logged in.')

    def _make_request(
            self,
            url: str,
            data: Optional[bytes] = None,
            decode: bool = True
    ) -> Optional[Union[str, bytes]]:
        ''' GET and POST to 'url'.

        If 'data' is passed results a POST.

        `url`: the address to request
        `data`: url-encoded payload bytes; presence switches GET to POST
        `decode`: return UTF-8 text when True, raw bytes when False

        Returns None on failure (after at most MAX_REQ_RETRIES retries
        for transient errors) and sets `self.critical_error` for fatal
        ones (login error page, HTTP 403).
        '''
        if data:
            logger.debug('POST data to %s with decode = %s', url, decode)
        else:
            logger.debug('GET data from %s with decode = %s', url, decode)

        # recursion guard: give up after MAX_REQ_RETRIES consecutive retries
        if self.request_retry > MAX_REQ_RETRIES:
            self.request_retry = 0
            return
        try:
            with self.session.open(url, data=data, timeout=10) as response:
                response: HTTPResponse
                logger.debug('Response status: %s', response.status)
                # landing back on takelogin.php means the login POST was
                # rejected; the page body says why (in Romanian)
                if response.url == self.url_login_post:
                    logger.critical('Redirected to error page!')
                    bad_response = response.read().decode('UTF-8', 'replace')
                    if 'Numarul maxim permis de actiuni' in bad_response:
                        logger.error('Exceeded maximum number of '
                                     'login attempts. Retry in an hour!')
                    elif 'User sau parola gresite.' in bad_response:
                        logger.error('Wrong username and/or password!')
                    elif 'Invalid login attempt!' in bad_response:
                        logger.error('Wrong validator key '
                                     'or cookie not loaded!')
                    self.critical_error = True
                    return
                else:
                    good_response = response.read()
                    if decode:
                        logger.debug('Returned url decoded as string.')
                        self.request_retry = 0
                        return good_response.decode('UTF-8', 'replace')
                    else:
                        logger.debug('Returned url raw as bytes.')
                        self.request_retry = 0
                        return good_response
        except HTTPError as error:
            if error.code == 403:
                # fatal: bad user-agent or missing header — retrying won't help
                logger.critical('Error 403: Connection refused! '
                                'Bad "user-agent" or header not loaded.')
                self.critical_error = True
                return
            if error.code == 404:
                logger.error('Error 404: Page not found!')
            self.request_retry += 1
            logger.debug('Retry %d/%d', self.request_retry, MAX_REQ_RETRIES)
            return self._make_request(url, data, decode)
        except URLError as error:
            logger.error(error.reason)
            self.request_retry += 1
            logger.debug('Retry %d/%d', self.request_retry, MAX_REQ_RETRIES)
            return self._make_request(url, data, decode)
        except TimeoutError:
            logger.error('Request timed out')
            self.request_retry += 1
            logger.debug('Retry %d/%d', self.request_retry, MAX_REQ_RETRIES)
            return self._make_request(url, data, decode)

    def download_torrent(self, url: str) -> None:
        """ Return download link to qBittorrent.

        Downloads the .torrent file at `url` into a temporary file and
        prints '<temp file path> <url>' to stdout, which is the format
        qBittorrent's nova2dl expects.
        """
        if self.critical_error:
            self._return_error()
            return
        # Download url
        response = self._make_request(url, decode=False)
        if not response:
            logger.error('Cannot acces download torrent url!')
            return
        # Create a torrent file; delete=False because qBittorrent reads
        # the file after this process exits
        with NamedTemporaryFile(suffix=".torrent", delete=False) as temp_file:
            temp_file.write(response)
            # return file path and url
            logger.info('Returned to qBittorrent:"%s %s"',
                        temp_file.name, url)
            print(temp_file.name + " " + url)

    def search(self, what: str, cat: str = 'all') -> None:
        """ Search for torrent and return with prettyPrint(your_dict).

        `what` to search for
        `cat` is the name of a search category

        Walks up to MAX_PAGES result pages, passing each result row to
        _parse_torrent, which prettyPrints it.
        """
        if self.critical_error:
            self._return_error()
            return

        # qBittorrent url-encodes spaces as %20; the tracker expects '+'
        what = what.replace('%20', '+')
        logger.debug('Searching for "%s" in category "%s"', what, cat)
        if cat not in self.supported_categories:
            logger.warning('Category "%s" not found, setting to "all".', cat)
            cat = 'all'

        def create_search_link(page_no: int = 0) -> str:
            ''' Creates a search link based on page number. '''
            # create a list of tuples for urlencode
            query = [
                # ('https://filelist.io/browse.php?search', search_string)
                (self.url_search, what),
                # category
                ('cat', self.supported_categories[cat]),
                # 0: Name & Description, 1: Name, 2: Description, 3: IMDB id
                ('searchin', '1'),
                # 0: Hybrid, 1: Relevance, 2: Date, 3: Size,
                # 4: Downloads, 5:Seeders
                ('sort', '5'),
                ('page', str(page_no))
            ]
            # safe chars keep the base url and '+' separators intact
            encoded = urlencode(query, safe=':/?+')
            logger.debug('Encoded url: %s', encoded)
            return encoded

        page_no = 0
        total_results = 0
        while page_no < MAX_PAGES:
            logger.debug('Requesting page number %d', page_no)
            search_page = self._make_request(create_search_link(page_no))
            if not search_page:
                logger.error('Cannot access results page!')
                break
            if 'Rezultatele cautarii dupa' in search_page:
                logger.info('Accessed results page.')
                if 'Nu s-a găsit nimic!' not in search_page:
                    torrent_rows = re.finditer(RE_ALL_RESULTS, search_page)
                    for torrent in torrent_rows:
                        self._parse_torrent(torrent.group())
                        total_results += 1
                    # check if there is a next page
                    # every page has 20 elements
                    # '»' signals there is a next page
                    if ((total_results % 20 == 0)
                            and re.search(RE_NEXT_PAGE, search_page)):
                        page_no += 1
                    else:
                        break
                else:
                    logger.info('No results found.')
                    break
            else:
                logger.error('Cannot access results page!')
                break
        logger.info('Parsed %d torrents.', total_results)

    def _parse_torrent(self, torrent: str) -> None:
        """ Get details from torrent and prettyPrint.

        `torrent`: one result-row HTML fragment matched by RE_ALL_RESULTS

        Missing fields fall back to defaults (name placeholder, size -1,
        0 seeders/leechers); only a missing torrent id aborts the row.
        """
        torrent_data = {'engine_url': f"{self.url}"}
        # BUGFIX: guard the re.search result before calling .group();
        # previously a non-matching row raised AttributeError instead of
        # reaching the error branches below.
        id_match = re.search(RE_GET_ID, torrent)
        if not id_match:
            logger.error('Cannot retrieve torrent id!')
            return
        tid = id_match.group()
        # download link
        torrent_data['link'] = f'{self.url_download}{tid}'
        # description page
        torrent_data['desc_link'] = f'{self.url_details}{tid}'
        # name
        name_match = re.search(RE_GET_NAME, torrent)
        if name_match:
            torrent_data['name'] = name_match.group()
        else:
            logger.warning('Cannot retrieve torrent name. Setting a default.')
            torrent_data['name'] = 'Could not retrieve torrent name'
        # size
        size = re.search(RE_GET_SIZE, torrent)
        if size:
            # two capture groups: numeric value and unit, e.g. '1.5' 'GB'
            size = size.groups()
            torrent_data['size'] = ' '.join(size)
            logger.debug("Torrent size: %s", torrent_data['size'])
        else:
            logger.debug('Could not retrieve torrent size. Setting -1.')
            torrent_data['size'] = -1
        # seeders
        seeders = re.search(RE_GET_SEEDERS, torrent)
        if seeders:
            torrent_data['seeds'] = seeders.group()
        else:
            logger.debug('Could not retrieve number of seeders. Setting 0.')
            torrent_data['seeds'] = '0'
        # leechers
        leechers = re.search(RE_GET_LEECHERS, torrent)
        if leechers:
            torrent_data['leech'] = leechers.group()
        else:
            logger.debug('Could not retrieve number of leechers. Setting 0.')
            torrent_data['leech'] = '0'

        logger.info('Sending to prettyPrinter: %s|%s|%s|%s|%s|%s|%s|',
                    torrent_data["link"],
                    torrent_data["name"],
                    torrent_data["seeds"],
                    torrent_data["size"],
                    torrent_data["leech"],
                    torrent_data["engine_url"],
                    torrent_data["desc_link"])
        prettyPrinter(torrent_data)

    def _return_error(self) -> None:
        """ Surface a critical error as a fake search result. """
        # intended high seeds, leech and big size to see the error when sorting
        logger.info('Sending critical error to prettyPrinter!')
        prettyPrinter({
            'engine_url': self.url,
            'desc_link':
                'https://github.com/victorBuzdugan/QbittorrentFilelistSearchPlugin',
            'name': 'CRITICAL error check log file',
            'link': self.url + "/error",
            'size': '1 TB',
            'seeds': 100,
            'leech': 100})
        self.critical_error = False


if __name__ == "__main__":
    # test = filelist()
    # test.search('linux', 'all')
    # test.download_torrent('https://filelist.io/download.php?id=60739')
    pass