# VERSION: 1.2
# AUTHORS: BurningMop (burning.mop@yandex.com)

# LICENSING INFORMATION
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import gzip
import io
import re
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from html.parser import HTMLParser

from helpers import download_file, htmlentitydecode
from novaprinter import prettyPrinter

headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:121.0) Gecko/20100101 Firefox/121.0',
    'Referer': 'https://xxxclub.to'
}


def retrieve_url(url, customHeaders=headers):
    """ Return the content of the url page as a string """
    req = urllib.request.Request(url, headers=customHeaders)
    try:
        response = urllib.request.urlopen(req)
    except urllib.error.URLError as error:
        print(" ".join(("Connection error:", str(error.reason))))
        return ""
    dat = response.read()
    # Check for the gzip magic number and decompress if present
    if dat[:2] == b'\x1f\x8b':
        compressedstream = io.BytesIO(dat)
        gzipper = gzip.GzipFile(fileobj=compressedstream)
        dat = gzipper.read()
    # Decode using the charset advertised in the Content-Type header,
    # falling back to UTF-8
    info = response.info()
    charset = 'utf-8'
    try:
        ignore, charset = info['Content-Type'].split('charset=')
    except Exception:
        pass
    dat = dat.decode(charset, 'replace')
    dat = htmlentitydecode(dat)
    return dat


class xxxclubto(object):
    url = 'https://xxxclub.to'
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:121.0) Gecko/20100101 Firefox/121.0',
        'Referer': url
    }
    name = 'XXXClub'
    supported_categories = {
        'all': 'All',
        'pictures': '5',
    }

    # Lazy patterns: finditer yields one match per closing </div> / </li>,
    # so the match count doubles as a result count in hasResults()
    container_regex = r'(?s:.)*?<\/div>'
    items_regex = r'(?s:.)*?<\/li>'
    has_results = True

    class MyHtmlParser(HTMLParser):
        def error(self, message):
            pass

        UL, LI, SPAN, A = ('ul', 'li', 'span', 'a')

        def __init__(self, url, headers):
            HTMLParser.__init__(self)
            self.url = url
            self.headers = headers
            self.headers['Referer'] = url
            self.row = {}
            self.column = 0
            self.foundResults = False
            self.foundTable = False
            self.insideRow = False
            self.insideCell = False
            self.insideNameLink = False
            self.foundTableHeading = False
            self.foundRowCatlabe = False
            self.magnet_regex = r'href="magnet:.*"'

        def handle_starttag(self, tag, attrs):
            params = dict(attrs)

            if 'browsetableinside' in params.get('class', ''):
                self.foundResults = True
                return

            if self.foundResults and tag == self.UL:
                self.foundTable = True
                return

            if self.foundTable and tag == self.LI:
                self.insideRow = True
                return

            if self.insideRow and self.foundTableHeading and tag == self.SPAN:
                classList = params.get('class', None)
                if self.foundRowCatlabe:
                    self.insideCell = True
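                    # each span after the first 'catlabe' opens a new data
                    # cell; the running column index lets handle_data map
                    # cells to fields (1: name, 3: size, 4: seeds, 5: leech)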
                    self.column += 1
                if 'catlabe' == classList:
                    self.foundRowCatlabe = True
                return

            if self.insideRow and self.foundTableHeading and self.column == 1 and tag == self.A:
                self.insideNameLink = True
                href = params.get('href')
                link = f'{self.url}{href}'
                self.row['desc_link'] = link
                self.row['link'] = link
                # Fetch the torrent's detail page and extract its first magnet
                # link; keep the page URL as the link if none is found
                torrent_page = retrieve_url(link, self.headers)
                matches = re.finditer(self.magnet_regex, torrent_page, re.MULTILINE)
                magnet_urls = [x.group() for x in matches]
                if magnet_urls:
                    self.row['link'] = magnet_urls[0].split('"')[1]
                return

        def handle_data(self, data):
            if self.insideCell and self.foundRowCatlabe:
                if self.column == 1 and self.insideNameLink:
                    self.row['name'] = data
                if self.column == 3:
                    size = data.replace(',', '')
                    self.row['size'] = size
                if self.column == 4:
                    self.row['seeds'] = data
                if self.column == 5:
                    self.row['leech'] = data
            return

        def handle_endtag(self, tag):
            if self.insideCell and self.insideNameLink and tag == self.A:
                self.insideNameLink = False

            if self.insideCell and tag == self.SPAN:
                self.insideCell = False

            if self.insideRow and tag == self.LI:
                # The first row is the table heading; every later row is a
                # complete result ready to be printed
                if not self.foundTableHeading:
                    self.foundTableHeading = True
                else:
                    self.row['engine_url'] = self.url
                    prettyPrinter(self.row)
                self.insideRow = False
                self.foundRowCatlabe = False
                self.column = 0
                self.row = {}
            return

    def download_torrent(self, info):
        print(download_file(info))

    def getPageUrl(self, what, category, page):
        return f'{self.url}/torrents/browse/{category}/{what}?page={page}'

    def hasResults(self, html):
        container_matches = re.finditer(self.container_regex, html, re.MULTILINE)
        container = [x.group() for x in container_matches]
        if len(container) > 0:
            containerHtml = container[0]
            items_matches = re.finditer(self.items_regex, containerHtml, re.MULTILINE)
            items = [x.group() for x in items_matches]
            # The heading row also matches, so a page only has results when
            # more than one item is found
            self.has_results = len(items) > 1
        else:
            self.has_results = False

    def threaded_search(self, page, what, cat):
        page_url = self.getPageUrl(what, cat, page)
        self.headers['Referer'] = page_url
        retrievedHtml = retrieve_url(page_url, self.headers)
        self.hasResults(retrievedHtml)
        if self.has_results:
            parser = self.MyHtmlParser(self.url, self.headers)
            parser.feed(retrievedHtml)
            parser.close()

    def search(self, what, cat='all'):
        what = what.replace('%20', '+')
        what = what.replace(' ', '+')
        category = self.supported_categories[cat]

        # Spawn one worker per result page until a page reports no results;
        # the short sleep gives each worker time to update has_results before
        # the next page is scheduled
        page = 1
        threads = []
        while self.has_results:
            t = threading.Thread(args=(page, what, category), target=self.threaded_search)
            t.start()
            time.sleep(0.5)
            threads.append(t)
            page += 1

        for t in threads:
            t.join()
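
# Minimal smoke test: a sketch assuming the nova3 helper modules ('helpers',
# 'novaprinter') are importable, as they are when this file sits in
# qBittorrent's search engine directory. Not part of the plugin API;
# qBittorrent itself instantiates the class and calls search().
if __name__ == '__main__':
    engine = xxxclubto()
    engine.search('test')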