{
 "nbformat": 4,
 "nbformat_minor": 0,
 "metadata": {
  "colab": {
   "private_outputs": true,
   "provenance": [],
   "authorship_tag": "ABX9TyN+X7HKQL6ujawLQnVIkp/G",
   "include_colab_link": true
  },
  "kernelspec": {
   "name": "python3",
   "display_name": "Python 3"
  },
  "language_info": {
   "name": "python"
  }
 },
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "byuTeAlxpoWs"
   },
   "outputs": [],
   "source": [
    "!pip install beautifulsoup4 google-api-python-client google-auth google-auth-httplib2 httplib2 requests\n",
    "!pip install newspaper3k\n",
    "# Pin the pre-1.0 SDK so that openai.ChatCompletion (used below) is available\n",
    "!pip install openai==0.28\n",
    "!pip install transformers\n"
   ]
  },
  {
   "cell_type": "code",
   "source": [
    "import requests\n",
    "import requests.exceptions\n",
    "from concurrent.futures import ThreadPoolExecutor, as_completed\n",
    "from collections import deque\n",
    "from bs4 import BeautifulSoup\n",
    "from urllib.parse import urljoin\n",
    "import pandas as pd\n",
    "from newspaper import Article\n",
    "import openai\n",
    "from transformers import GPT2Tokenizer\n",
    "import time\n",
    "\n",
    "\n",
    "openai.api_key = \"Your OpenAI Key\"  # replace with your own API key\n",
    "\n",
    "# Load the GPT-2 tokenizer once; it is only used to trim page content to a token budget\n",
    "tokenizer = GPT2Tokenizer.from_pretrained(\"gpt2\")\n",
    "\n",
    "\n",
    "def truncate_string_to_max_tokens(input_string, max_tokens=1700):\n",
    "    # Tokenize the input string and keep at most max_tokens tokens\n",
    "    tokens = tokenizer.encode(input_string)\n",
    "    truncated_tokens = tokens[:max_tokens]\n",
    "\n",
    "    # Convert the truncated tokens back to a string\n",
    "    truncated_string = tokenizer.decode(truncated_tokens)\n",
    "\n",
    "    return truncated_string\n",
    "\n",
    "\n",
    "def process_url(current_url, base_url):\n",
    "    data = []\n",
    "    try:\n",
    "        response = requests.get(current_url, timeout=10)\n",
    "        soup = BeautifulSoup(response.content, 'html.parser')\n",
    "        title_tag = soup.find('title')\n",
    "        page_title = title_tag.text.strip() if title_tag else \"No Title\"\n",
    "        print(page_title)\n",
    "\n",
    "        for link in soup.find_all('a'):\n",
    "            href = link.get('href')\n",
    "            if href and not href.startswith('#') and not href.startswith('mailto:'):\n",
    "                full_url = urljoin(base_url, href)\n",
    "                # Only follow links that stay on the site being crawled\n",
    "                if base_url in full_url:\n",
    "                    try:\n",
    "                        article = Article(full_url)\n",
    "                        article.download()\n",
    "                        article.parse()\n",
    "                        content = article.text\n",
    "                    except Exception:\n",
    "                        print(f\"Error downloading or parsing page: {full_url}\")\n",
    "                        content = ''\n",
    "                    content = truncate_string_to_max_tokens(content, max_tokens=2000)\n",
    "                    # Ask the model for a recommended anchor text for the linked page\n",
    "                    prompt = f\"Content to analyze: \\n ### {content} ### \\n Recommended Anchor Text:\"\n",
    "                    gpt_response = openai.ChatCompletion.create(\n",
    "                        model=\"gpt-3.5-turbo\",\n",
    "                        messages=[\n",
    "                            {\n",
    "                                \"role\": \"system\",\n",
    "                                \"content\": \"Based on the following content, suggest an appropriate anchor text with a focus on SEO, intent, and clickability.\"\n",
    "                            },\n",
    "                            {\n",
    "                                \"role\": \"user\",\n",
    "                                \"content\": f\"{prompt}\"\n",
    "                            }\n",
    "                        ],\n",
    "                        max_tokens=20,\n",
    "                        n=1,\n",
    "                        stop=None,\n",
    "                        temperature=0.5,\n",
    "                    )\n",
    "                    anchor_text_recommendation = gpt_response[\"choices\"][0][\"message\"][\"content\"].strip()\n",
    "                    print(anchor_text_recommendation)\n",
    "                    entry = {\n",
    "                        'url': full_url,\n",
    "                        'anchor_text': link.text.strip(),\n",
    "                        'page_title': page_title,\n",
    "                        'linked_content': content,\n",
    "                        'recommended_anchor_text': anchor_text_recommendation\n",
    "                    }\n",
    "                    data.append(entry)\n",
"\n", " # Save to CSV\n", " df_entry = pd.DataFrame([entry])\n", " with open('output.csv', mode='a') as f:\n", " df_entry.to_csv(f, header=False, index=False)\n", "\n", " except requests.exceptions.SSLError:\n", " pass\n", "\n", " return data\n", "\n", "\n", "def crawl_website(base_url, max_pages=None):\n", " visited_links = set()\n", " links_to_visit = deque([base_url])\n", " data = []\n", " pages_crawled = 0\n", "\n", " # Initialize CSV output file with headers\n", " df_header = pd.DataFrame(columns=['url', 'anchor_text', 'page_title', 'linked_content', 'recommended_anchor_text'])\n", " df_header.to_csv('output.csv', index=False)\n", "\n", " with ThreadPoolExecutor(max_workers=5) as executor:\n", " while links_to_visit and (max_pages is None or pages_crawled < max_pages):\n", " futures = []\n", " while links_to_visit and (max_pages is None or pages_crawled < max_pages):\n", " current_url = links_to_visit.popleft()\n", " if current_url not in visited_links:\n", " visited_links.add(current_url)\n", " futures.append(executor.submit(process_url, current_url, base_url))\n", " pages_crawled += 1\n", "\n", " for future in as_completed(futures):\n", " try:\n", " new_data = future.result()\n", " for entry in new_data:\n", " full_url = entry['url']\n", " if full_url not in visited_links:\n", " links_to_visit.append(full_url)\n", " data.append(entry)\n", " except requests.exceptions.SSLError:\n", " pass\n", "\n", " return data\n", "\n", "def main():\n", " base_url = 'https://frac.tl'\n", " max_crawl_limit = 3\n", " data = crawl_website(base_url, max_pages=max_crawl_limit)\n", "\n", " # Deduplicate data and drop blanks\n", " deduped_data = []\n", " unique_data = set()\n", " for entry in data:\n", " unique_key = (entry['url'], entry['anchor_text'], entry['page_title'])\n", " if unique_key not in unique_data and entry['anchor_text']:\n", " deduped_data.append(entry)\n", " unique_data.add(unique_key)\n", "\n", " df = pd.DataFrame(deduped_data)\n", " print(df)\n", " return df\n", "\n", "result = main()" ], "metadata": { "id": "fI_P5xQBvEJQ" }, "execution_count": null, "outputs": [] } ] }