In [None]:
import io
import requests
import docx
from typing import List, Dict

if 'data_loader' not in globals():
 from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
 from mage_ai.data_preparation.decorators import test

def clean_line(line: str) -> str:
    """
    Remove surrounding whitespace and byte-order marks (U+FEFF) from a line.

    Whitespace and BOM characters can be interleaved at either end of the
    text (for example a BOM followed by a space). The BOM is not treated as
    whitespace by ``str.strip()``, so a single whitespace strip followed by a
    single BOM strip would leave behind any whitespace that was hidden by the
    BOM. Both strips are therefore repeated until the text stops changing.

    Args:
        line (str): Raw text to clean.

    Returns:
        str: The text without leading/trailing whitespace or BOM characters.
        Characters in the middle of the text are left untouched.
    """
    previous = None
    while line != previous:
        previous = line
        line = line.strip().strip('\uFEFF')
    return line

def _parse_faq_paragraphs(paragraphs) -> List[Dict]:
    """
    Group document paragraphs into question/answer records.

    Paragraphs styled "heading 1" start a new section, paragraphs styled
    "heading 2" start a new question, and every other non-empty paragraph is
    appended to the answer of the current question.

    Args:
        paragraphs: Iterable of objects exposing ``style.name`` and ``text``
            (as the paragraphs of a python-docx document do).

    Returns:
        List[Dict]: One dict per question with the keys 'text', 'section'
        and 'question'. Questions without answer text are skipped.
    """
    section_heading_style = 'heading 1'
    question_heading_style = 'heading 2'

    records = []
    section_title = ''
    question_title = ''
    answer_lines = []

    def flush() -> None:
        # Emit the pending question (if it is complete) and start a fresh answer.
        answer_text = '\n'.join(answer_lines).strip()
        if answer_text and section_title and question_title:
            records.append({
                'text': answer_text,
                'section': section_title,
                'question': question_title,
            })
        # Always reset, so text that precedes the first question of a section
        # is not glued onto that question's answer.
        answer_lines.clear()

    for p in paragraphs:
        style = p.style.name.lower()
        p_text = clean_line(p.text)

        if not p_text:
            continue

        if style == section_heading_style:
            # Flush BEFORE switching sections; otherwise the last question of
            # the previous section would be recorded under the new section.
            flush()
            section_title = p_text
            # Text between a section heading and its first question does not
            # belong to the previous question.
            question_title = ''
            continue

        if style == question_heading_style:
            flush()
            question_title = p_text
            continue

        answer_lines.append(p_text)

    # The final question has no following heading to trigger a flush.
    flush()

    return records


def read_faq(file_id: str, timeout: float = 30.0) -> List[Dict]:
    """
    Fetch and parse the FAQ document from Google Docs.

    Args:
        file_id (str): Google Docs file ID to be downloaded and processed.
        timeout (float): Seconds to wait for the download before giving up.
            Without a timeout the request could block indefinitely.

    Returns:
        List[Dict]: A list of dictionaries containing the parsed questions and
        answers, each with the keys 'text', 'section' and 'question'.

    Raises:
        requests.HTTPError: If the export request returns an error status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    url = f'https://docs.google.com/document/d/{file_id}/export?format=docx'

    response = requests.get(url, timeout=timeout)
    response.raise_for_status()

    with io.BytesIO(response.content) as f_in:
        doc = docx.Document(f_in)

    return _parse_faq_paragraphs(doc.paragraphs)

@data_loader
def process_faq_documents(*args, **kwargs) -> List[Dict]:
    """
    Download and parse every configured FAQ document.

    Args:
        *args: Positional arguments (unused).
        **kwargs: Keyword arguments; only ``faq_documents`` is read.

    Keyword Args:
        faq_documents (Dict[str, str]): Mapping of course name to Google Docs
            file ID. A single example document is used when it is not given.

    Returns:
        List[Dict]: One entry per course, each with the keys 'course' (the
        course name) and 'documents' (the records returned by ``read_faq``).
    """
    default_sources = {
        'another-course': '1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E',  # Example document
    }
    sources = kwargs.get('faq_documents', default_sources)

    documents = [
        {'course': course_name, 'documents': read_faq(doc_file_id)}
        for course_name, doc_file_id in sources.items()
    ]

    # Report how many entries (one per course) were produced.
    print(f"Processed {len(documents)} documents")

    return documents

@test
def test_process_faq_documents(output, *args) -> None:
    """
    Validate the output of the process_faq_documents block.

    Checks that the output is non-empty and that every entry carries both
    the 'course' and the 'documents' key.
    """
    assert len(output) > 0, 'No documents were processed'

    required_keys = ('course', 'documents')
    for entry in output:
        has_all_keys = all(key in entry for key in required_keys)
        assert has_all_keys, 'Processed document missing required keys'


In [None]:
import hashlib
from typing import Any, Dict, List

def generate_document_id(doc: Dict[str, Any]) -> str:
    """
    Derive a short identifier for a document.

    The identifier is the first 8 hex characters of the MD5 digest of
    "<course>-<question>-<first 10 characters of text>". The digest is used
    only as a fingerprint, not for anything security related.

    Args:
        doc (Dict[str, Any]): Document with the keys 'course', 'question'
            and 'text'.

    Returns:
        str: An 8-character hexadecimal identifier.
    """
    course = doc['course']
    question = doc['question']
    text_prefix = doc['text'][:10]

    fingerprint_source = f"{course}-{question}-{text_prefix}"
    return hashlib.md5(fingerprint_source.encode()).hexdigest()[:8]

@transformer
def chunk_documents(data: Any, *args, **kwargs) -> List[Dict[str, Any]]:
    """
    Attach the course name and a generated document ID to every document.

    This assumes that the documents already have well-defined boundaries and
    no chunking is needed. The incoming document dicts are not modified;
    each returned dict is a shallow copy with the extra keys added.

    Args:
        data (Any): Mapping with the keys 'course' (course name) and
            'documents' (iterable of document dicts that contain at least
            'question' and 'text').
        *args: Positional arguments (unused).
        **kwargs: Keyword arguments (unused).

    Returns:
        List[Dict[str, Any]]: The documents, each extended with 'course'
        and 'document_id'.
    """
    documents = []

    for doc in data['documents']:
        # Copy instead of mutating the caller's dict, so the input stays
        # unchanged if it is reused elsewhere.
        enriched = {**doc, 'course': data['course']}
        # previously we used just "id" for document ID
        enriched['document_id'] = generate_document_id(enriched)
        documents.append(enriched)

    print(len(documents))

    print(documents)

    return documents
