import hashlib
import html
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Union
from urllib.parse import urlparse

import helpers.main_helper as main_helper
from apis.onlyfans.classes.auth_model import create_auth
from apis.onlyfans.classes.hightlight_model import create_highlight
from apis.onlyfans.classes.message_model import create_message
from apis.onlyfans.classes.post_model import create_post
from apis.onlyfans.classes.story_model import create_story
from apis.onlyfans.classes.user_model import create_user
from apis.onlyfans.onlyfans import start
from classes.prepare_metadata import prepare_reformat
from modules.module_streamliner import StreamlinedDatascraper


class OnlyFansDataScraper(StreamlinedDatascraper):
    """OnlyFans-specific scraper built on top of ``StreamlinedDatascraper``."""

    def __init__(self, api: start) -> None:
        """Store the API handle and initialise the base scraper with ``self``."""
        self.api = api
        StreamlinedDatascraper.__init__(self, self)

    async def media_scraper(
        self,
        post_result: Union[create_story, create_post, create_message],
        subscription: create_user,
        formatted_directory: Path,
        api_type: str,
    ):
        """Scrape the API result ``post_result`` for downloadable content.

        For every media category in ``self.media_types`` the medias attached
        to ``post_result`` are inspected; each media whose ``type`` belongs to
        the category gets a download link, a target directory and a filename.

        Args:
            post_result: The story, post or message being processed.
            subscription: The user the content belongs to.
            formatted_directory: Base directory handed to the path formatter.
            api_type: Content category label (e.g. "Posts", "Mass Messages").

        Returns:
            ``None`` when the API has no site settings; otherwise a dict with
            ``"content"`` (list of post dicts, each carrying a ``"medias"``
            list) and ``"directories"`` (every directory the medias use).
        """
        authed = subscription.get_authed()
        api = authed.api
        site_settings = api.get_site_settings()
        if not site_settings:
            return
        new_set: dict[str, Any] = {}
        new_set["content"] = []
        directories: list[Path] = []
        download_path = formatted_directory
        model_username = subscription.username
        date_format = site_settings.date_format
        locations = self.media_types
        subdomain_matches = ["us", "uk", "ca", "ca2", "de"]
        for media_type, alt_media_types in locations.__dict__.items():
            date_today = datetime.now()
            master_date = datetime.strftime(date_today, "%d-%m-%Y %H:%M:%S")
            file_directory_format = site_settings.file_directory_format
            post_id = post_result.id
            new_post = {}
            new_post["medias"] = []
            new_post["archived"] = False
            rawText = ""
            text = ""
            previews = []
            date = None
            price = None

            if isinstance(post_result, create_story):
                date = post_result.createdAt
            if isinstance(post_result, create_post):
                if post_result.isReportedByMe:
                    continue
                rawText = post_result.rawText
                text = post_result.text
                previews = post_result.preview
                date = post_result.postedAt
                price = post_result.price
                new_post["archived"] = post_result.isArchived
            if isinstance(post_result, create_message):
                if post_result.isReportedByMe:
                    continue
                text = post_result.text
                previews = post_result.previews
                date = post_result.createdAt
                price = post_result.price
                if api_type == "Mass Messages":
                    # Only keep mass messages sent by the model itself.
                    media_user = post_result.fromUser
                    media_username = media_user.username
                    if media_username != model_username:
                        continue
            final_text = rawText if rawText else text

            # The sentinel timestamp is treated as "no usable date". A missing
            # date takes the same fallback: the previous code passed the
            # "%d-%m-%Y %H:%M:%S" string to fromisoformat(), which raises
            # ValueError because that string is not ISO 8601.
            if not date or date == "-001-11-30T00:00:00+00:00":
                date_string = master_date
                date_object = datetime.strptime(master_date, "%d-%m-%Y %H:%M:%S")
            else:
                date_object = datetime.fromisoformat(date)
                date_string = date_object.replace(tzinfo=None).strftime(
                    "%d-%m-%Y %H:%M:%S"
                )
                master_date = date_string

            new_post["post_id"] = post_id
            new_post["user_id"] = subscription.id
            if isinstance(post_result, create_message):
                new_post["user_id"] = post_result.fromUser.id
            new_post["text"] = final_text
            new_post["postedAt"] = date_string
            new_post["paid"] = False
            new_post["preview_media_ids"] = previews
            new_post["api_type"] = api_type
            new_post["price"] = 0
            if price is None:
                price = 0
            # A priced item counts as paid only when every media is viewable.
            if price and all(media["canView"] for media in post_result.media):
                new_post["paid"] = True
            new_post["price"] = price

            for media in post_result.media:
                media_id = media["id"]
                preview_link = ""
                link = await post_result.link_picker(media, site_settings.video_quality)
                if not link:
                    continue
                url = urlparse(link)
                if not url.hostname:
                    continue
                subdomain = url.hostname.split(".")[0]
                if "files" in media:
                    if "preview" in media["files"] and "url" in media["files"]["preview"]:
                        preview_link = media["files"]["preview"]["url"]
                else:
                    preview_link = media["preview"]
                # NOTE(review): `subdomain in nm` is a substring test against
                # each entry (e.g. "c" matches "ca"); presumably exact
                # membership was intended - confirm before changing.
                if any(subdomain in nm for nm in subdomain_matches):
                    subdomain = url.hostname.split(".")[1]
                    if "upload" in subdomain:
                        continue
                    if "convert" in subdomain:
                        link = preview_link
                if link == "" and preview_link == "":
                    continue

                new_media: dict[str, Any] = dict()
                new_media["media_id"] = media_id
                new_media["links"] = []
                new_media["media_type"] = media_type
                new_media["preview"] = False
                new_media["created_at"] = new_post["postedAt"]
                if isinstance(post_result, create_story):
                    # Stories carry a per-media creation timestamp.
                    date_object = datetime.fromisoformat(media["createdAt"])
                    date_string = date_object.replace(tzinfo=None).strftime(
                        "%d-%m-%Y %H:%M:%S"
                    )
                    new_media["created_at"] = date_string
                if int(media_id) in new_post["preview_media_ids"]:
                    new_media["preview"] = True
                # Record the first non-empty link, preferring the full link.
                for xlink in link, preview_link:
                    if xlink:
                        new_media["links"].append(xlink)
                        break

                if media["type"] not in alt_media_types:
                    continue
                ignored_matches = [
                    s for s in site_settings.ignored_keywords if s in final_text
                ]
                if ignored_matches:
                    print("Ignoring - ", f"PostID: {post_id}")
                    continue

                filename = link.rsplit("/", 1)[-1]
                filename, ext = os.path.splitext(filename)
                # Drop the leading dot and any query string after the extension.
                ext = ext.__str__().replace(".", "").split("?")[0]
                final_api_type = (
                    os.path.join("Archived", api_type)
                    if new_post["archived"]
                    else api_type
                )
                option: dict[str, Any] = {}
                option = option | new_post
                option["site_name"] = api.site_name
                option["media_id"] = media_id
                option["filename"] = filename
                option["api_type"] = final_api_type
                option["media_type"] = media_type
                option["ext"] = ext
                option["profile_username"] = authed.username
                option["model_username"] = model_username
                option["date_format"] = date_format
                option["postedAt"] = new_media["created_at"]
                option["text_length"] = site_settings.text_length
                option["directory"] = download_path
                option["preview"] = new_media["preview"]
                option["archived"] = new_post["archived"]

                prepared_format = prepare_reformat(option)
                file_directory = await prepared_format.reformat_2(file_directory_format)
                prepared_format.directory = file_directory
                file_path = await prepared_format.reformat_2(
                    site_settings.filename_format
                )
                new_media["directory"] = os.path.join(file_directory)
                new_media["filename"] = os.path.basename(file_path)
                if file_directory not in directories:
                    directories.append(file_directory)

                # Link this media to an already-scraped media with the same
                # filename found under a different api type.
                new_media["linked"] = None
                for k, v in subscription.temp_scraped:
                    if k == api_type:
                        continue
                    if k == "Archived":
                        v = getattr(v, api_type, [])
                    if not v:
                        continue
                    for post in v:
                        found_medias = []
                        medias = post.media
                        if medias:
                            for temp_media in medias:
                                temp_filename = temp_media.get("filename")
                                if (
                                    temp_filename
                                    and temp_filename == new_media["filename"]
                                ):
                                    found_medias.append(temp_media)
                        if found_medias:
                            for found_media in found_medias:
                                found_media["linked"] = api_type
                            new_media["linked"] = post["api_type"]
                            new_media["filename"] = f"linked_{new_media['filename']}"
                new_post["medias"].append(new_media)

            # Merge medias into an existing entry for the same post, if any.
            found_post = [x for x in new_set["content"] if x["post_id"] == post_id]
            if found_post:
                found_post = found_post[0]
                found_post["medias"] += new_post["medias"]
            else:
                new_set["content"].append(new_post)
        new_set["directories"] = directories
        return new_set

    # NOTE(review): this function takes no `self`. Assuming it lives inside the
    # class, it only works when called through the class object, not through
    # an instance - confirm against the call sites.
    async def process_mass_messages(
        authed: create_auth, mass_messages: list[create_message]
    ):
        """Match queued mass messages to chat messages and persist the result.

        Each entry of ``mass_messages`` is accessed like a mapping. Entries
        without a match are looked up in the cached chats ("Chats.json") and,
        failing that, through ``authed.search_messages``. Matches older than a
        day, or recorded under a different salt hash, are re-fetched by id.
        Chats and mass messages are written back to the profile's metadata
        directory.

        Returns:
            ``None`` when config or site settings are unavailable; otherwise
            the list of matched messages that contain media.
        """

        def compare_message(queue_id, remote_messages):
            """Return the queued message whose ``queueId`` matches, else None."""
            for message in remote_messages:
                if "isFromQueue" in message and message["isFromQueue"]:
                    if queue_id == message["queueId"]:
                        return message

        global_found = []
        chats = []
        api = authed.get_api()
        site_settings = api.get_site_settings()
        config = api.config
        if not (config and site_settings):
            return
        settings = config.settings
        salt = settings.random_string
        encoded = f"{salt}"
        encoded = encoded.encode("utf-8")
        # MD5 serves as a change marker for the configured random string.
        salt_hash = hashlib.md5(encoded).hexdigest()
        profile_directory = authed.directory_manager.profile.metadata_directory
        mass_message_path = profile_directory.joinpath("Mass Messages.json")
        chats_path = profile_directory.joinpath("Chats.json")
        if os.path.exists(chats_path):
            chats = main_helper.import_json(chats_path)
        date_object = datetime.today()
        date_string = date_object.strftime("%d-%m-%Y %H:%M:%S")

        for mass_message in mass_messages:
            if "status" not in mass_message:
                mass_message["status"] = ""
            if "found" not in mass_message:
                mass_message["found"] = {}
            if "hashed_ip" not in mass_message:
                mass_message["hashed_ip"] = ""
            # NOTE(review): "hashed_ip" always exists at this point, so the
            # `salt_hash` default below is never used - confirm the intent.
            mass_message["hashed_ip"] = mass_message.get("hashed_ip", salt_hash)
            mass_message["date_hashed"] = mass_message.get("date_hashed", date_string)
            if mass_message["isCanceled"]:
                continue
            queue_id = mass_message["id"]
            text = mass_message["textCropped"]
            text = html.unescape(text)
            mass_found = mass_message["found"]
            media_type = mass_message.get("mediaType")
            media_types = mass_message.get("mediaTypes")
            # Skip entries that are already matched or that carry no media.
            if mass_found or (not media_type and not media_types):
                continue
            identifier = None
            if chats:
                for chat in chats:
                    identifier = chat["identifier"]
                    messages = chat["messages"]["list"]
                    mass_found = compare_message(queue_id, messages)
                    if mass_found:
                        mass_message["found"] = mass_found
                        mass_message["status"] = True
                        break
            if not mass_found:
                # NOTE(review): search_messages/get_messages are not awaited
                # here although get_message_by_id is awaited below - confirm
                # they are synchronous.
                list_chats = authed.search_messages(text=text, limit=2)
                if not list_chats:
                    continue
                for item in list_chats["list"]:
                    user = item["withUser"]
                    identifier = user["id"]
                    messages = []
                    print("Getting Messages")
                    keep = ["id", "username"]
                    list_chats2 = [x for x in chats if x["identifier"] == identifier]
                    if list_chats2:
                        chat2 = list_chats2[0]
                        messages = chat2["messages"]["list"]
                        messages = authed.get_messages(
                            identifier=identifier, resume=messages
                        )
                        for message in messages:
                            message["withUser"] = {k: item["withUser"][k] for k in keep}
                            message["fromUser"] = {
                                k: message["fromUser"][k] for k in keep
                            }
                        mass_found = compare_message(queue_id, messages)
                        if mass_found:
                            mass_message["found"] = mass_found
                            mass_message["status"] = True
                            break
                    else:
                        item2 = {}
                        item2["identifier"] = identifier
                        item2["messages"] = authed.get_messages(identifier=identifier)
                        chats.append(item2)
                        messages = item2["messages"]["list"]
                        for message in messages:
                            message["withUser"] = {k: item["withUser"][k] for k in keep}
                            message["fromUser"] = {
                                k: message["fromUser"][k] for k in keep
                            }
                        mass_found = compare_message(queue_id, messages)
                        if mass_found:
                            mass_message["found"] = mass_found
                            mass_message["status"] = True
                            break
            if not mass_found:
                mass_message["status"] = False
        main_helper.export_json(chats, chats_path)

        for mass_message in mass_messages:
            found = mass_message["found"]
            if found and found["media"]:
                user = found["withUser"]
                identifier = user["id"]
                date_hashed_object = datetime.strptime(
                    mass_message["date_hashed"], "%d-%m-%Y %H:%M:%S"
                )
                next_date_object = date_hashed_object + timedelta(days=1)
                # Re-fetch when the salt hash changed or the match is over a
                # day old.
                if (
                    mass_message["hashed_ip"] != salt_hash
                    or date_object > next_date_object
                ):
                    print("Getting Message By ID")
                    x = await authed.get_message_by_id(
                        identifier=identifier, identifier2=found["id"], limit=1
                    )
                    new_found = x["result"]["list"][0]
                    new_found["withUser"] = found["withUser"]
                    mass_message["found"] = new_found
                    mass_message["hashed_ip"] = salt_hash
                    mass_message["date_hashed"] = date_string
                # NOTE(review): the pre-refresh `found` is what gets returned,
                # not `new_found` - confirm this is intended.
                global_found.append(found)
        main_helper.export_json(mass_messages, mass_message_path)
        return global_found

    async def get_all_stories(self, subscription: create_user):
        """Collect stories, archived stories and highlight contents.

        Returns:
            One list holding the results of ``get_stories()``, then
            ``get_archived_stories()``, then the items returned by
            ``get_highlights(hightlight_id=...)`` for each highlight.
        """
        master_set: list[create_highlight | create_story] = []
        master_set.extend(await subscription.get_stories())
        master_set.extend(await subscription.get_archived_stories())
        highlights = await subscription.get_highlights()
        for highlight in highlights:
            # The keyword spelling "hightlight_id" is the name the call uses.
            highlight_items = await subscription.get_highlights(
                hightlight_id=highlight.id
            )
            master_set.extend(highlight_items)
        return master_set

    async def get_all_subscriptions(
        self,
        authed: create_auth,
        identifiers: list[int | str] | None = None,
        refresh: bool = True,
    ):
        """Fetch subscriptions and sort them by ``subscribedByData["expiredAt"]``.

        Args:
            authed: Authenticated session used to query subscriptions.
            identifiers: Optional identifiers forwarded to the API; ``None``
                is treated as an empty list.
            refresh: Forwarded to ``authed.get_subscriptions``.

        Returns:
            The subscription list, sorted ascending by expiry value.
        """
        # A fresh list per call avoids sharing one mutable default object.
        if identifiers is None:
            identifiers = []
        results = await authed.get_subscriptions(
            identifiers=identifiers, refresh=refresh
        )
        results.sort(key=lambda x: x.subscribedByData["expiredAt"])
        return results