    r'<tr[^>]*id="(?P<tor_id>\d+)"[^>]*>.*?'
    r'<a[^>]*class="genmed"[^>]*href="\.\/viewtopic\.php\?t=(?P<topic_id>\d+)"[^>]*>'
    r'(?:<span[^>]*>\s*</span> )?(?P<name>.+?)</a>.*?'
    r'<a[^>]*class="small tr-dl"[^>]*href="\./download\.php\?id=(?P<dl_id>\d+)">'
    r'(?P<size_text>[\d\., KMGTBkmg]+)</a>.*?'
    r'<td[^>]*class="row4 seedmed"[^>]*>\s*(?P<seeds>[-\d]+).*?'
    r'<td[^>]*class="row4 leechmed"[^>]*>\s*(?P<leech>[-\d]+).*?'
    r'<td[^>]*class="row4 small nowrap"[^>]*>\s*(?P<pub_date>\d+)',
    re.S | re.I
)
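# named groups consumed by Tapochek.draw(): tor_id, topic_id, name, dl_id,
# size_text, seeds, leech, pub_date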
RE_RESULTS = re.compile(r'Результатов\sпоиска:\s(\d{1,6})', re.S)
PATTERNS = ("%stracker.php?nm=%s&%s", "%s&start=%s")
PAGES = 50
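# PATTERNS[0] builds the first search URL (base url, query, category filter);
# PATTERNS[1] appends a "start" offset for the following result pages.
# PAGES is the number of results the tracker shows per page.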
ICON = ("iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAC10lEQVQ4jW2TX4hVZRTFf3t/3zn3zFznoolNTTkTZmH/KLKoFB9KJIViKAQftKiXoOjFgsweTJrIl+qhIhBDE6awEjQfJCkoh3DSdAxTuoMpaJZ0G6f5c2fm3nvO+XYPd7oYtNl7v63F2ou15ZbV/cteePr6/kVdnV15MINm/29Jc8XO6adfDR3xhesel50HBitzOuYv2DdwlkLsUBFUBBNBrAlq0hpmhgVjqpayY9OjPPHS9o/9rTdeW9qye4jRao3IKyqKSFNE5BxZCAhgZoRZgompOnu/Heaum7sXax7Miu0R7YWIQhyThoAhjE2lHH5vLZvX38/kTEpmYAhJIWpNnofgMSNyHvWOPAQObuvl4LELtCceFWHlvQs5/cgSepcv4syFEd794iTeO1QVDLwBLlK8E8wc5/6c5NnVt7d8KyYRr224D4Avj5wniT01yVEVABTAqeLV8Xe1zsRkA4CJqTqbPhrkwwM/t8guXZmmWsuJnLYIPICqUOpI+Pz1NczrKACwedePXKxUOTr8F92dJR57oIf3X1xB+eIoG7Z9g4hiGIqBU4cIVMamWxkoFWNEFXWOYuIBqDUyKuM14mhWQcsDrzRmAs+8M8Cr6+6h96Ee3nhqKbu+PkvX/HYevvsGAF7e/gPnLo8zb04bKled4ETw6lkw13PnTXMBiLzjuTVL/hPEZXd0cnlsmkYjZxaPgiEqiAqxF9LM2HFomE++Ow9AtZbSt+cUg+UKUeRwIqiCXK1AVHAqFGLPlv4hkkJMPQ30PtjN0fII5Uvj/PLbKCFAWxLRyHL+leC9czpTz6inATQQRw6nQqkYs3HnCQxoSzxpQ0gtI00D9TSQZQGneH+8/MfIK+uWdg2c+p0k8TjncE5b2QgELA+kWSC3QJ4GZhoZTy7voe/Nt38V9JpVz2/94K3bFnd35nlmACKGMftRAgTBNCCmIM3o79l/6KfvP+vb+A+G/CMLnWXurwAAAABJRU5ErkJggg==")
# setup logging
logging.basicConfig(
filemode="w",
filename=FILE_L,
format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s",
datefmt="%m-%d %H:%M",
level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
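# rng(total) yields the "start" offsets for the extra result pages,
# e.g. rng(120) -> range(50, 150, 50), i.e. offsets 50 and 100.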
def rng(t: int) -> range:
return range(PAGES, -(-t // PAGES) * PAGES, PAGES)
class EngineError(Exception):
...
@dataclass
class Config:
username: str = "USERNAME"
password: str = "PASSWORD"
cookies: str = "COOKIES"
proxy: bool = False
proxies: dict = field(default_factory=lambda: {"http": "", "https": ""})
ua: str = ("Mozilla/5.0 (X11; Linux i686; rv:38.0) Gecko/20100101 "
"Firefox/38.0 ")
def __post_init__(self):
try:
if not self._validate_json(json.loads(FILE_J.read_text())):
raise ValueError("Incorrect json scheme.")
except Exception as e:
logger.error(e)
FILE_J.write_text(self.to_str())
(BASEDIR / f"{FILENAME}.ico").write_bytes(base64.b64decode(ICON))
def to_str(self) -> str:
return json.dumps(self.to_dict(), indent=4)
def to_dict(self) -> dict:
return {self._to_camel(k): v for k, v in self.__dict__.items()}
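    # Returns True only when every camelCase key is present in the JSON with
    # the expected type; valid values are copied onto the instance, and bad
    # or missing dict sub-keys are reset to their defaults.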
def _validate_json(self, obj: dict) -> bool:
is_valid = True
for k, v in self.__dict__.items():
_val = obj.get(self._to_camel(k))
if type(_val) is not type(v):
is_valid = False
continue
if type(_val) is dict:
for dk, dv in v.items():
if type(_val.get(dk)) is not type(dv):
_val[dk] = dv
is_valid = False
setattr(self, k, _val)
return is_valid
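    # e.g. _to_camel("some_key") -> "someKey"; the JSON file stores
    # the config keys in camelCase.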
@staticmethod
def _to_camel(s: str) -> str:
return "".join(x.title() if i else x
for i, x in enumerate(s.split("_")))
config = Config()
class Tapochek:
name = "Tapochek"
url = "https://tapochek.net/"
url_dl = url + "download.php?id="
supported_categories = {"all": "-1"} # not used, but kept for compatibility
# cookies
mcj = MozillaCookieJar()
# establish connection
session = build_opener(HTTPCookieProcessor(mcj))
def search(self, what: str, cat: str = "all") -> None:
self._catch_errors(self._search, what, cat)
def download_torrent(self, url: str) -> None:
self._catch_errors(self._download_torrent, url)
def login(self) -> None:
"""
        Try the cookies from the config first; if they do not work,
        fall back to a username/password login. Either way, save the
        resulting cookies to the local .cookie file.
"""
# clear local in-memory jar
self.mcj.clear()
# try cookies string from config
if config.cookies != "COOKIES" and config.cookies:
for cookie in config.cookies.split("; "):
if "=" not in cookie:
continue
name, value = cookie.split("=", 1)
# domain set to tapochek.net
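                # positional arguments follow http.cookiejar.Cookie's signature:
                # version, name, value, port, port_specified, domain, ...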
self.mcj.set_cookie(Cookie(
0, name, value, None, False, "tapochek.net", True, False,
"/", True, False, None, False, None, None, {}
))
            logger.debug(f"Cookies we have: {[cookie for cookie in self.mcj]}")
# quick check
if self._check_login_page():
self.mcj.save(str(FILE_C), ignore_discard=True, ignore_expires=True)
                logger.info("Successfully authorized with the provided cookies")
return
# if cookies not valid or not provided, use username/password POST
if config.username == "USERNAME" and config.password == "PASSWORD":
raise EngineError("Empty credentials in config file")
# prepare POST
form = {
"login_username": config.username,
"login_password": config.password,
"autologin": "1",
"login": "Вход"
}
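        # the tracker's login form expects cp1251-encoded field values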
data = urllib.parse.urlencode(form, encoding="cp1251").encode()
logger.debug(f"Login. Data before: {form}")
# request login page
resp = self.session.open(self.url + "login.php", data)
logger.debug(f"Login response URL: {resp.geturl()}")
# check login
if not self._check_login_page():
logger.debug(f"Cookies after login attempt: {[cookie for cookie in self.mcj]}")
            raise EngineError("Not authorized, please check your credentials!")
self.mcj.save(str(FILE_C), ignore_discard=True, ignore_expires=True)
        logger.info("Successfully authorized")
def _check_login_page(self) -> bool:
try:
page = self._request(self.url).decode("cp1251", "ignore")
return "ucp.php?mode=logout" in page or "Выход" in page
except Exception:
return False
    def searching(self, query: str, first: bool = False) -> int:
        page, torrents_found = self._request(query).decode("cp1251", "ignore"), -1
        if first:
            match = RE_RESULTS.search(page)
            if match is None:
                # no explicit results counter on the page - parse it anyway
                # and report the total as unknown (-1)
                logger.debug("Results counter not found, parsing page anyway")
            else:
                torrents_found = int(match.group(1))
                if torrents_found <= 0:
                    return 0
        self.draw(page)
        return torrents_found
def draw(self, html: str) -> None:
TAG_RE = re.compile(r'<[^>]+>')
for tor in RE_TORRENTS.finditer(html):
try:
tor_id = tor.group("tor_id")
                name = unescape(TAG_RE.sub("", tor.group("name"))).strip()
dl_id = tor.group("dl_id")
link = self.url_dl + dl_id if dl_id else self.url
                size_text = tor.group("size_text").replace("\xa0", " ").strip()  # normalize no-break spaces
seeds = int(tor.group("seeds"))
leech = int(tor.group("leech"))
pub_date = int(tor.group("pub_date")) if tor.group("pub_date") else int(time.time())
                prettyPrinter({
                    "link": link,
                    "name": name,
                    "size": size_text,
                    "seeds": seeds,
                    "leech": leech,
                    "engine_url": self.url,
                    "desc_link": f"{self.url}viewtopic.php?t={tor.group('topic_id')}",
                    "pub_date": pub_date
                })
except Exception as e:
logger.exception("Failed to parse torrent row: %s", e)
def _catch_errors(self, handler: Callable, *args: str):
try:
self._init()
handler(*args)
except EngineError as ex:
logger.exception(ex)
self.pretty_error(args[0], str(ex))
except Exception as ex:
self.pretty_error(args[0] if args else "", "Unexpected error, please check logs")
logger.exception(ex)
def _init(self) -> None:
# add proxy handler if needed
if config.proxy:
if not any(config.proxies.values()):
raise EngineError("Proxy enabled, but not set!")
# socks5 support
for proxy_str in config.proxies.values():
if not proxy_str:
continue
if not proxy_str.lower().startswith("socks"):
continue
url = urlparse(proxy_str)
socks.set_default_proxy(
socks.PROXY_TYPE_SOCKS5,
url.hostname,
url.port,
True,
url.username,
url.password
)
socket.socket = socks.socksocket # type: ignore
break
else:
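                # no socks proxy configured - fall back to urllib's ProxyHandler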
self.session.add_handler(ProxyHandler(config.proxies))
logger.debug("Proxy is set!")
# change user-agent
self.session.addheaders = [("User-Agent", config.ua)]
# load local cookies
try:
self.mcj.load(str(FILE_C), ignore_discard=True)
# quick check for some session cookie presence
if any("bb" in cookie.name or "phpbb" in cookie.name or "session" in cookie.name for cookie in self.mcj):
return logger.info("Local cookies loaded")
            logger.info("Local cookies expired or invalid, trying to log in")
            logger.debug(f"Cookies we have: {[cookie for cookie in self.mcj]}")
except FileNotFoundError:
            logger.info("Local cookies do not exist, trying to log in")
# perform login procedure
return self.login()
def _search(self, what: str, cat: str = "all") -> None:
c = self.supported_categories.get(cat, "-1")
query = PATTERNS[0] % (self.url, quote(unquote(what)), "f=-1" if c == "-1" else "c=" + c)
# first request
t0, total = time.time(), self.searching(query, True)
# if results are more than page size - request more in parallel
if total and total > PAGES:
qrs = [PATTERNS[1] % (query, x) for x in rng(total)]
with ThreadPoolExecutor(len(qrs)) as executor:
executor.map(self.searching, qrs, timeout=30)
logger.debug(f"--- {time.time() - t0} seconds ---")
logger.info(f"Found torrents: {total}")
def _download_torrent(self, url: str) -> None:
response = self._request(url)
with NamedTemporaryFile(suffix=".torrent", delete=False) as fd:
fd.write(response)
logger.debug(fd.name + " " + url)
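            # qBittorrent reads "<local file path> <original url>" from stdout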
print(fd.name + " " + url)
def _request(
self,
url: str,
data: Optional[bytes] = None,
repeated: bool = False,
) -> bytes:
try:
with self.session.open(url, data, 5) as r:
# checking that tracker isn't blocked
if r.geturl().startswith((self.url, self.url_dl)):
return r.read()
raise EngineError(f"{url} is blocked. Try another proxy.")
except (URLError, HTTPError) as err:
logger.debug(err)
error = str(getattr(err, "reason", err))
            reason = f"{url} is not responding! Maybe it is blocked."
if "timed out" in error:
if not repeated:
logger.debug("Request timed out. Repeating...")
return self._request(url, data, True)
reason = "Request timed out"
if "no host given" in error:
reason = "Proxy is bad, try another!"
elif hasattr(err, "code"):
reason = f"Request to {url} failed with status: {err.code}"
raise EngineError(reason)
def pretty_error(self, what: str, error: str) -> None:
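        # emit a fake result row so the error text shows up in the search results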
prettyPrinter({
"engine_url": self.url,
"desc_link": f"file://{FILE_L}",
"name": f"[{unquote(what)}][Error]: {error}",
"link": self.url + "error",
"size": "1 TB",
"seeds": 100,
"leech": 100,
"pub_date": int(time.time())
})
# qBittorrent expects the engine class to be named after the plugin file,
# so alias the pep8-style class name to the lowercase one.
tapochek = Tapochek
if __name__ == "__main__":
if BASEDIR.parent.joinpath("settings_gui.py").exists():
from settings_gui import EngineSettingsGUI
EngineSettingsGUI(FILENAME)
engine = tapochek()
engine.search("doctor")