{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Week 2: arXiv API Integration & PDF Processing\n", "\n", "**What We're Building This Week:**\n", "\n", "Week 2 implements the core data ingestion pipeline that automatically fetches, processes, and stores arXiv papers. This pipeline is the foundation that feeds our RAG system with fresh academic content.\n", "\n", "## Week 2 Focus Areas\n", "\n", "### 🎯 Core Objectives\n", "- **arXiv API Integration**: Build a robust client with rate limiting and retry logic\n", "- **PDF Processing Pipeline**: Download and parse scientific PDFs with structured content extraction\n", "- **Database Storage**: Persist paper metadata and content in PostgreSQL\n", "- **Error Handling**: Implement comprehensive error handling and graceful degradation\n", "- **Automation Ready**: Prepare components for Airflow orchestration\n", "\n", "### 🔧 What We'll Test In This Notebook\n", "1. **arXiv API Client** - Fetch cs.AI papers with proper rate limiting\n", "2. **PDF Download System** - Download and cache PDFs with error handling\n", "3. **Docling PDF Parser** - Extract structured content (sections, tables, figures)\n", "4. **Database Integration** - Store and retrieve papers from PostgreSQL\n", "5. **Complete Pipeline** - End-to-end processing from arXiv to database\n", "6. 
**Production Readiness** - Error handling, logging, and performance metrics\n", "\n", "\n", "### 📊 Success Metrics\n", "- arXiv API calls succeed with proper rate limiting\n", "- PDF download and caching work reliably\n", "- Docling extracts structured content from scientific PDFs\n", "- Database stores complete paper metadata\n", "- Pipeline handles errors gracefully and continues processing\n", "- All components are ready for Airflow automation (Week 2+)\n", "\n", "---\n", "\n", "## Week 2 Component Status\n", "| Component | Purpose | Status |\n", "|-----------|---------|--------|\n", "| **arXiv API Client** | Fetch cs.AI papers with rate limiting | ✅ Complete |\n", "| **PDF Downloader** | Download and cache PDFs locally | ✅ Complete |\n", "| **Docling Parser** | Extract structured content from PDFs | ✅ Complete |\n", "| **Metadata Fetcher** | Orchestrate complete pipeline | ✅ Complete |\n", "| **Database Storage** | Store papers in PostgreSQL | ⚠️ Needs volume refresh |\n", "| **Airflow DAGs** | Automate daily ingestion | ⚠️ Needs container update |\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ⚠️ IMPORTANT: Week 2 Database Schema Update\n", "\n", "**NEW USERS OR SCHEMA CONFLICTS**: If you're starting Week 2 fresh or experiencing database schema conflicts, use this clean start approach:\n", "\n", "### Fresh Start (Recommended for Week 2)\n", "```bash\n", "# Complete clean slate - removes all data but ensures correct schema\n", "docker compose down -v\n", "\n", "# Build fresh containers with latest code\n", "docker compose up --build -d\n", "```\n", "\n", "**When to use this:**\n", "- You are running Week 2 for the first time\n", "- You see schema errors or missing-column errors\n", "- You want to start with a clean database\n", "- Your previous Week 1 data is not important\n", "\n", "**Note**: This destroys existing data but ensures you have the correct Week 2 schema with all new columns for PDF processing and arXiv metadata.\n", "\n", "---\n", "\n", "## 
Prerequisites Check\n", "\n", "**Before starting:**\n", "1. Week 1 infrastructure completed\n", "2. UV environment activated\n", "3. Docker Desktop running\n", "\n", "**Why fresh containers?** Week 2 includes new Airflow dependencies and code changes that require rebuilding images rather than using cached layers." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check if Fresh Containers are Built and All Services Healthy\n", "import subprocess\n", "import requests\n", "from pathlib import Path\n", "\n", "print(\"WEEK 2 CONTAINER & SERVICE HEALTH CHECK\")\n", "print(\"=\" * 50)\n", "\n", "# Find project root\n", "current_dir = Path.cwd()\n", "if current_dir.name == \"week2\" and current_dir.parent.name == \"notebooks\":\n", "    project_root = current_dir.parent.parent\n", "elif (current_dir / \"compose.yml\").exists():\n", "    project_root = current_dir\n", "else:\n", "    print(\"✗ Could not find project root\")\n", "    exit()\n", "\n", "print(f\"Project root: {project_root}\")\n", "\n", "# Step 1: Check if containers are built and running\n", "print(\"\\n1. Checking container status...\")\n", "try:\n", "    result = subprocess.run(\n", "        [\"docker\", \"compose\", \"ps\", \"--format\", \"table\"],\n", "        cwd=str(project_root),\n", "        capture_output=True,\n", "        text=True,\n", "        timeout=10\n", "    )\n", "\n", "    if result.returncode == 0 and result.stdout.strip():\n", "        print(\"✓ Containers are running:\")\n", "        for line in result.stdout.strip().split('\\n'):\n", "            print(f\" {line}\")\n", "    else:\n", "        print(\"✗ No containers running or docker compose failed\")\n", "        print(\"Please run the build commands from the markdown cell above\")\n", "        exit()\n", "\n", "except Exception as e:\n", "    print(f\"✗ Error checking containers: {e}\")\n", "    print(\"Please run the build commands from the markdown cell above\")\n", "    exit()\n", "\n", "# Step 2: Check all service health (corrected endpoints)\n", "print(\"\\n2. 
Checking service health...\")\n", "services_to_test = {\n", "    \"FastAPI\": \"http://localhost:8000/api/v1/health\",\n", "    \"PostgreSQL (via API)\": \"http://localhost:8000/api/v1/health\",\n", "    \"Ollama\": \"http://localhost:11434/api/version\",\n", "    \"OpenSearch\": \"http://localhost:9200/_cluster/health\",\n", "    \"Airflow\": \"http://localhost:8080/health\"\n", "}\n", "\n", "all_healthy = True\n", "for service_name, url in services_to_test.items():\n", "    try:\n", "        response = requests.get(url, timeout=5)\n", "        if response.status_code == 200:\n", "            print(f\"✓ {service_name}: Healthy\")\n", "        else:\n", "            print(f\"✗ {service_name}: HTTP {response.status_code}\")\n", "            all_healthy = False\n", "    except requests.exceptions.ConnectionError:\n", "        print(f\"✗ {service_name}: Not accessible\")\n", "        all_healthy = False\n", "    except Exception as e:\n", "        print(f\"✗ {service_name}: {type(e).__name__}\")\n", "        all_healthy = False\n", "\n", "print(\"\\n\" + \"=\" * 50)\n", "if all_healthy:\n", "    print(\"✓ ALL SERVICES HEALTHY! 
Ready for Week 2 development.\")\n", "else:\n", "    print(\"✗ Some services need attention.\")\n", "    print(\"If you just rebuilt containers, wait 1-2 minutes and run this cell again.\")\n", "    print(\"Airflow and OpenSearch take longest to start up.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Environment Check\n", "import sys\n", "from pathlib import Path\n", "\n", "print(f\"Python Version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}\")\n", "print(f\"Environment: {sys.executable}\")\n", "\n", "# Find project root\n", "current_dir = Path.cwd()\n", "if current_dir.name == \"week2\" and current_dir.parent.name == \"notebooks\":\n", "    project_root = current_dir.parent.parent\n", "elif (current_dir / \"compose.yml\").exists():\n", "    project_root = current_dir\n", "else:\n", "    project_root = None\n", "\n", "if project_root and (project_root / \"compose.yml\").exists():\n", "    print(f\"✓ Project root: {project_root}\")\n", "    # Add project to Python path\n", "    sys.path.insert(0, str(project_root))\n", "else:\n", "    print(\"✗ Missing compose.yml - check directory\")\n", "    exit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Service Health Verification\n", "\n", "Ensure all services from Week 1 are still running correctly:\n", "\n", "### 🔗 Service Access Points\n", "- **FastAPI**: http://localhost:8000/docs (API documentation)\n", "- **PostgreSQL**: via API or `docker exec -it rag-postgres psql -U rag_user -d rag_db`\n", "- **OpenSearch**: http://localhost:9200/_cluster/health\n", "- **Ollama**: http://localhost:11434 (LLM service)\n", "- **Airflow**: http://localhost:8080 (Username: `admin`, Password: `admin`)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test Service Connectivity\n", "import requests\n", "import subprocess\n", "import json\n", "\n", "services_to_test = {\n", "    \"FastAPI\": \"http://localhost:8000/api/v1/health\",\n", "    \"PostgreSQL (via API)\": \"http://localhost:8000/api/v1/health\",\n", "    \"Ollama\": \"http://localhost:11434/api/version\",\n", "    \"OpenSearch\": \"http://localhost:9200/_cluster/health\",\n", "    \"Airflow\": \"http://localhost:8080/health\"\n", "}\n", "\n", "print(\"WEEK 2 PREREQUISITE CHECK\")\n", "print(\"=\" * 50)\n", "\n", "all_healthy = True\n", "\n", "for service_name, url in services_to_test.items():\n", "    try:\n", "        response = requests.get(url, timeout=5)\n", "        if response.status_code == 200:\n", "            print(f\"✓ {service_name}: Healthy\")\n", "        else:\n", "            print(f\"✗ {service_name}: HTTP {response.status_code}\")\n", "            all_healthy = False\n", "    except requests.exceptions.ConnectionError:\n", "        print(f\"✗ {service_name}: Not accessible\")\n", "        all_healthy = False\n", "    except Exception as e:\n", "        print(f\"✗ {service_name}: {type(e).__name__}\")\n", "        all_healthy = False\n", "\n", "print()\n", "if all_healthy:\n", "    print(\"All services healthy! Ready for Week 2 development.\")\n", "else:\n", "    print(\"Some services need attention. Check Week 1 notebook.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. 
arXiv API Client Testing\n", "\n", "Test the arXiv API client with rate limiting and retry logic:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test arXiv API Client\n", "import asyncio\n", "from datetime import datetime, timedelta\n", "\n", "# Import our arXiv client\n", "from src.services.arxiv.factory import make_arxiv_client\n", "\n", "print(\"TESTING ARXIV API CLIENT\")\n", "print(\"=\" * 40)\n", "\n", "# Create client\n", "arxiv_client = make_arxiv_client()\n", "print(f\"✓ Client created: {arxiv_client.base_url}\")\n", "print(f\" Rate limit: {arxiv_client.rate_limit_delay}s\")\n", "print(f\" Max results: {arxiv_client.max_results}\")\n", "print(f\" Category: {arxiv_client.search_category}\")\n", "print()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test Paper Fetching\n", "async def test_paper_fetching():\n", "    \"\"\"Test fetching papers from arXiv with rate limiting.\"\"\"\n", "\n", "    print(\"Test 1: Fetch Recent cs.AI Papers\")\n", "    try:\n", "        papers = await arxiv_client.fetch_papers(\n", "            max_results=2,\n", "            sort_by=\"submittedDate\",\n", "            sort_order=\"descending\"\n", "        )\n", "\n", "        print(f\"✓ Fetched {len(papers)} papers\")\n", "\n", "        if papers:\n", "            for i, paper in enumerate(papers[:2], 1):\n", "                print(f\" {i}. [{paper.arxiv_id}] {paper.title[:60]}...\")\n", "                print(f\" Authors: {', '.join(paper.authors[:2])}{'...' 
if len(paper.authors) > 2 else ''}\")\n", "                print(f\" Categories: {', '.join(paper.categories)}\")\n", "                print(f\" Published: {paper.published_date}\")\n", "                print()\n", "\n", "        return papers\n", "\n", "    except Exception as e:\n", "        print(f\"✗ Error fetching papers: {e}\")\n", "        if \"503\" in str(e):\n", "            print(\" arXiv API temporarily unavailable (normal)\")\n", "            print(\" Rate limiting and error handling working correctly\")\n", "        return []\n", "\n", "# Run the test\n", "papers = await test_paper_fetching()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test Date Filtering\n", "async def test_date_filtering():\n", "    \"\"\"Test date range filtering functionality.\"\"\"\n", "\n", "    print(\"Test 2: Date Range Filtering\")\n", "\n", "    # Use specific dates\n", "    from_date = \"20250808\"\n", "    to_date = \"20250809\"\n", "    try:\n", "        date_papers = await arxiv_client.fetch_papers(\n", "            max_results=5,\n", "            from_date=from_date,\n", "            to_date=to_date\n", "        )\n", "\n", "        print(f\"✓ Date filtering test: {len(date_papers)} papers from {from_date}-{to_date}\")\n", "\n", "        if date_papers:\n", "            for i, paper in enumerate(date_papers, 1):\n", "                print(f\" {i}. [{paper.arxiv_id}] {paper.title[:60]}...\")\n", "                print(f\" Authors: {', '.join(paper.authors[:2])}{'...' if len(paper.authors) > 2 else ''}\")\n", "                print(f\" Categories: {', '.join(paper.categories)}\")\n", "                print(f\" Published: {paper.published_date}\")\n", "                print()\n", "\n", "        return date_papers\n", "\n", "    except Exception as e:\n", "        print(f\"✗ Date filtering error: {e}\")\n", "        return []\n", "\n", "# Run date filtering test\n", "date_papers = await test_date_filtering()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. 
PDF Download and Caching\n", "\n", "Test PDF download functionality with caching:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test PDF Download\n", "async def test_pdf_download(test_papers):\n", "    \"\"\"Test PDF downloading with caching.\"\"\"\n", "\n", "    print(\"Test 3: PDF Download & Caching\")\n", "\n", "    if not test_papers:\n", "        print(\"No papers available for PDF download test\")\n", "        return None\n", "\n", "    # Test with first paper\n", "    test_paper = test_papers[0]\n", "    print(f\"Testing PDF download for: {test_paper.arxiv_id}\")\n", "    print(f\"Title: {test_paper.title[:60]}...\")\n", "\n", "    try:\n", "        # Download PDF\n", "        pdf_path = await arxiv_client.download_pdf(test_paper)\n", "\n", "        if pdf_path and pdf_path.exists():\n", "            size_mb = pdf_path.stat().st_size / (1024 * 1024)\n", "            print(f\"✓ PDF downloaded: {pdf_path.name} ({size_mb:.2f} MB)\")\n", "\n", "            return pdf_path\n", "        else:\n", "            print(\"✗ PDF download failed\")\n", "            return None\n", "\n", "    except Exception as e:\n", "        print(f\"✗ PDF download error: {e}\")\n", "        return None\n", "\n", "# Run PDF download test\n", "pdf_path = await test_pdf_download(date_papers[:1])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. 
Docling PDF Processing\n", "\n", "Test PDF parsing with Docling for structured content extraction:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test PDF Parsing with Docling\n", "from src.services.pdf_parser.factory import make_pdf_parser_service\n", "from src.config import get_settings\n", "from pathlib import Path\n", "\n", "print(\"Test 4: PDF Parsing with Docling\")\n", "print(\"=\" * 40)\n", "\n", "# Create PDF parser\n", "pdf_parser = make_pdf_parser_service()\n", "settings = get_settings()\n", "print(\"PDF parser service created\")\n", "print(f\"Config: {settings.pdf_parser.max_pages} pages, {settings.pdf_parser.max_file_size_mb}MB\")\n", "\n", "# Test parsing with actual PDF files\n", "cache_dir = Path(\"data/arxiv_pdfs\")\n", "if cache_dir.exists():\n", "    pdf_files = list(cache_dir.glob(\"*.pdf\"))\n", "    print(f\"\\nFound {len(pdf_files)} PDF files to test parsing\")\n", "\n", "    if pdf_files:\n", "        # Test parsing the first PDF\n", "        test_pdf = pdf_files[0]\n", "        print(f\"Testing PDF parsing with: {test_pdf.name}\")\n", "\n", "        try:\n", "            pdf_content = await pdf_parser.parse_pdf(test_pdf)\n", "\n", "            if pdf_content:\n", "                print(f\"✓ PDF parsing successful!\")\n", "                print(f\" Sections: {len(pdf_content.sections)}\")\n", "                print(f\" Raw text length: {len(pdf_content.raw_text)} characters\")\n", "                print(f\" Parser used: {pdf_content.parser_used}\")\n", "\n", "                # Show first section as example\n", "                if pdf_content.sections:\n", "                    first_section = pdf_content.sections[0]\n", "                    print(f\" First section: '{first_section.title}' ({len(first_section.content)} chars)\")\n", "            else:\n", "                print(\"✗ PDF parsing failed (Docling compatibility issue)\")\n", "                print(\"This is expected - not all PDFs work with Docling\")\n", "\n", "        except Exception as e:\n", "            print(f\"✗ PDF parsing error: {e}\")\n", "            print(\"This demonstrates the error handling in action\")\n", "    else:\n", "        print(\"No PDF files available for 
parsing test\")\n", "else:\n", "    print(\"No PDF cache directory found\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. Database Storage Testing\n", "\n", "Test storing papers in PostgreSQL database:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test Database Storage\n", "from src.db.factory import make_database\n", "from src.repositories.paper import PaperRepository\n", "from src.schemas.arxiv.paper import PaperCreate\n", "from dateutil import parser as date_parser\n", "\n", "print(\"Test 5: Database Storage\")\n", "print(\"=\" * 40)\n", "\n", "# Create database connection\n", "database = make_database()\n", "print(\"✓ Database connection created\")\n", "\n", "if papers:\n", "    test_paper = papers[0]\n", "    print(f\"Storing paper: {test_paper.arxiv_id}\")\n", "\n", "    try:\n", "        with database.get_session() as session:\n", "            paper_repo = PaperRepository(session)\n", "\n", "            # Convert to database format\n", "            published_date = date_parser.parse(test_paper.published_date) if isinstance(test_paper.published_date, str) else test_paper.published_date\n", "\n", "            paper_create = PaperCreate(\n", "                arxiv_id=test_paper.arxiv_id,\n", "                title=test_paper.title,\n", "                authors=test_paper.authors,\n", "                abstract=test_paper.abstract,\n", "                categories=test_paper.categories,\n", "                published_date=published_date,\n", "                pdf_url=test_paper.pdf_url\n", "            )\n", "\n", "            # Store paper (upsert to avoid duplicates)\n", "            stored_paper = paper_repo.upsert(paper_create)\n", "\n", "            if stored_paper:\n", "                print(f\"✓ Paper stored with ID: {stored_paper.id}\")\n", "                print(f\" Database ID: {stored_paper.id}\")\n", "                print(f\" arXiv ID: {stored_paper.arxiv_id}\")\n", "                print(f\" Title: {stored_paper.title[:50]}...\")\n", "                print(f\" Authors: {len(stored_paper.authors)} authors\")\n", "                print(f\" Categories: {', '.join(stored_paper.categories)}\")\n", "\n", "                # Test retrieval\n", "                retrieved_paper = 
paper_repo.get_by_arxiv_id(test_paper.arxiv_id)\n", "                if retrieved_paper:\n", "                    print(f\"✓ Paper retrieval test passed\")\n", "                else:\n", "                    print(f\"✗ Paper retrieval failed\")\n", "            else:\n", "                print(\"✗ Paper storage failed\")\n", "\n", "    except Exception as e:\n", "        print(f\"✗ Database error: {e}\")\n", "else:\n", "    print(\"No papers available for database storage test\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test Complete Pipeline\n", "from src.services.metadata_fetcher import make_metadata_fetcher\n", "\n", "print(\"Test 6: Complete Metadata Fetcher Pipeline\")\n", "print(\"=\" * 50)\n", "\n", "# Create metadata fetcher\n", "metadata_fetcher = make_metadata_fetcher(arxiv_client, pdf_parser)\n", "print(\"✓ Metadata fetcher service created\")\n", "\n", "# Test with small batch\n", "print(\"Running small batch test (2 papers, no PDF processing for speed)...\")\n", "\n", "try:\n", "    with database.get_session() as session:\n", "        results = await metadata_fetcher.fetch_and_process_papers(\n", "            max_results=2,\n", "            process_pdfs=False,\n", "            store_to_db=True,\n", "            db_session=session\n", "        )\n", "\n", "    print(\"\\nPIPELINE RESULTS:\")\n", "    print(f\" Papers fetched: {results.get('papers_fetched', 0)}\")\n", "    print(f\" PDFs downloaded: {results.get('pdfs_downloaded', 0)}\")\n", "    print(f\" PDFs parsed: {results.get('pdfs_parsed', 0)}\")\n", "    print(f\" Papers stored: {results.get('papers_stored', 0)}\")\n", "    print(f\" Processing time: {results.get('processing_time', 0):.1f}s\")\n", "    print(f\" Errors: {len(results.get('errors', []))}\")\n", "\n", "    if results.get('errors'):\n", "        print(\"\\nErrors encountered:\")\n", "        for error in results.get('errors', [])[:3]:  # Show first 3 errors\n", "            print(f\" - {error}\")\n", "\n", "    if results.get('papers_fetched', 0) > 0:\n", "        print(\"\\n✓ Pipeline test successful!\")\n", "    else:\n", "        print(\"\\nNo papers fetched - may be arXiv API 
unavailability\")\n", "\n", "except Exception as e:\n", "    print(f\"✗ Pipeline error: {e}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test Airflow DAGs\n", "import subprocess\n", "\n", "print(\"Test 7: Airflow DAG Status\")\n", "print(\"=\" * 40)\n", "\n", "print(\" Airflow UI Access:\")\n", "print(\" URL: http://localhost:8080\")\n", "print(\" Username: admin\")\n", "print(\" Password: admin\")\n", "print()\n", "\n", "# Check DAG status using docker exec\n", "try:\n", "    result = subprocess.run(\n", "        [\"docker\", \"exec\", \"rag-airflow\", \"airflow\", \"dags\", \"list\"],\n", "        capture_output=True,\n", "        text=True,\n", "        timeout=10\n", "    )\n", "\n", "    if result.returncode == 0:\n", "        lines = result.stdout.strip().split('\\n')\n", "        dag_lines = [line for line in lines if 'arxiv' in line.lower() or 'hello' in line.lower()]\n", "\n", "        print(\"Available DAGs:\")\n", "        for line in dag_lines:\n", "            if '|' in line:\n", "                parts = [part.strip() for part in line.split('|')]\n", "                if len(parts) >= 3:\n", "                    dag_id = parts[0]\n", "                    is_paused = parts[2]\n", "                    status = \"Active\" if is_paused == \"False\" else \"Paused\"\n", "                    print(f\" - {dag_id}: {status}\")\n", "\n", "        # Check for import errors\n", "        error_result = subprocess.run(\n", "            [\"docker\", \"exec\", \"rag-airflow\", \"airflow\", \"dags\", \"list-import-errors\"],\n", "            capture_output=True,\n", "            text=True,\n", "            timeout=10\n", "        )\n", "\n", "        if \"docling\" in error_result.stderr:\n", "            print(\"\\nKnown Issue: Docling not installed in Airflow container\")\n", "            print(\" - This is expected for Week 2\")\n", "            print(\" - DAG structure is complete, runtime needs container fix\")\n", "            print(\" - Solution: Add docling to Airflow container startup\")\n", "        elif error_result.returncode == 0:\n", "            print(\"\\n✓ No DAG import errors found\")\n", "\n", "    else:\n", "        print(f\"✗ Could not list DAGs: {result.stderr}\")\n", "\n", "except Exception as e:\n", "    print(f\"✗ Airflow test 
error: {e}\")\n", "\n", "print(\"\\n To view DAGs graphically:\")\n", "print(\" 1. Open http://localhost:8080 in your browser\")\n", "print(\" 2. Login with admin/admin\")\n", "print(\" 3. Click on 'arxiv_paper_ingestion' DAG to see the workflow\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test Complete Pipeline with PDF Processing\n", "print(\"Test 8: Complete Pipeline with PDF Processing\")\n", "print(\"=\" * 50)\n", "\n", "# Reuse metadata fetcher from Test 6\n", "print(\"✓ Using metadata fetcher service from previous test\")\n", "\n", "# Test with small batch including PDF processing\n", "print(\"Running enhanced test (3 papers with PDF processing)...\")\n", "\n", "try:\n", "    with database.get_session() as session:\n", "        results = await metadata_fetcher.fetch_and_process_papers(\n", "            max_results=3,  # Small batch\n", "            from_date=\"20250813\",  # Recent date\n", "            to_date=\"20250814\",\n", "            process_pdfs=True,\n", "            store_to_db=True,\n", "            db_session=session\n", "        )\n", "\n", "    print(\"\\nENHANCED PIPELINE RESULTS:\")\n", "    print(f\" Papers fetched: {results.get('papers_fetched', 0)}\")\n", "    print(f\" PDFs downloaded: {results.get('pdfs_downloaded', 0)}\")\n", "    print(f\" PDFs parsed: {results.get('pdfs_parsed', 0)}\")\n", "    print(f\" Papers stored: {results.get('papers_stored', 0)}\")\n", "    print(f\" Processing time: {results.get('processing_time', 0):.1f}s\")\n", "    print(f\" Errors: {len(results.get('errors', []))}\")\n", "\n", "    # Show success rates\n", "    if results.get('papers_fetched', 0) > 0:\n", "        download_rate = (results['pdfs_downloaded'] / results['papers_fetched']) * 100\n", "        parse_rate = (results['pdfs_parsed'] / results['pdfs_downloaded']) * 100 if results.get('pdfs_downloaded', 0) > 0 else 0\n", "        print(f\" Download success rate: {download_rate:.1f}%\")\n", "        print(f\" Parse success rate: {parse_rate:.1f}%\")\n", "\n", "    if results.get('errors'):\n", "        print(\"\\nErrors encountered 
(showing graceful error handling):\")\n", "        for error in results.get('errors', [])[:3]:  # Show first 3 errors\n", "            print(f\" - {error}\")\n", "\n", "    if results.get('papers_fetched', 0) > 0:\n", "        print(\"\\n✓ Enhanced pipeline test successful!\")\n", "        if results.get('errors'):\n", "            print(\"✓ System continued processing despite PDF failures\")\n", "    else:\n", "        print(\"\\n! No papers fetched - the arXiv API may be unavailable\")\n", "\n", "except Exception as e:\n", "    print(f\"✗ Pipeline error: {e}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.11" } }, "nbformat": 4, "nbformat_minor": 4 }