Introduction
In this blog post, I'll discuss how I built a Retrieval-Augmented Generation (RAG) system capable of processing and retrieving information from multiple PDFs on my local machine, with the end goal of deploying it to production on AWS and GCP.
With cost, security, and performance in mind, I explored affordable alternatives for handling terabytes of data in a real-world scenario. It's crucial to recognize that not all PDFs are created equal. Developers must handle various PDF text extraction challenges, such as AES encryption, watermarks, or slow processing times, to ensure a smooth user experience.
While powerful and costly AWS and GCP services could handle PDF processing, they are not feasible for production due to cost concerns. Therefore, I developed a solution using two open-source tools: PyPDF and PyTesseract.
Additionally, I implemented what I call 'pre-cloud-development-observability' features, such as OpenAI token usage and API costs, application execution time, and MongoDB-specific operation metrics, all logged for analysis. After all, who doesn't enjoy delving into log files to optimize performance? 🙋🏻♀️
Note: This blog post is an in-depth explanation of this application. For the setup guide and the Python application script, refer to the GitHub repository.
Application stack:
- Streamlit - Front End
- OpenAI - LLM/Foundation Model
- Langchain - NLP Orchestration
- MongoDB Atlas Vector Search - Cloud-based Vector Database
- Dotenv - Local secret management
- PyPDF - PDF text extraction
- PyTesseract - OCR on AES-encrypted PDFs, or on PDFs with background images where plain text extraction would come back empty
Key Features
- Secure API/token key connections, with the keys kept in the `.env` file
- Processes multiple files (up to 200MB each, Streamlit's default upload limit) in a single upload operation
- Capability to answer questions based on pre-processed documents stored in the database, so there is no need to re-upload the same PDFs
- Text extraction from AES-encrypted PDFs or those with background images
- Parallel OCR for PDFs larger than 5MB for improved performance
- A 'Clear Chat History' button
- Observability/logging features for future Cloud Development considerations:
  - A Langchain callback function that calculates OpenAI token usage
  - MongoDB operation-specific logs recorded through the `pymongo` driver
  - Script execution time measurement
System Architecture Overview
The entire application runs from one Python file named `chatbot-app.py`. The UI, built with Streamlit, processes PDFs using either simple text extraction or OCR. Langchain serves as the application's 'master brain,' creating vector embeddings, sending them to the database, and communicating with the foundation model, OpenAI.
PDF upload and text extraction
Two Python packages are used for text extraction:
- PyPDF for regular PDFs
- pytesseract for OCR on PDFs that require it
Users upload multiple files (each up to Streamlit's default 200MB limit) in the UI's sidebar and click 'Process'. Streamlit then invokes the `get_pdf_text` function, which calls `process_pdf` on each uploaded file. `process_pdf` attempts text extraction in the following order:
- Simple extraction with PyPDF
- If text extraction fails or returns no text (e.g., due to AES encryption or a watermark in the background), `ocr_on_pdf` is invoked for OCR processing, using parallel processing through a `ThreadPoolExecutor` for files larger than 5MB.
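```python
import logging
import os
import tempfile

from pypdf import PdfReader


def process_pdf(pdf):
    """Extract text from one uploaded PDF, falling back to OCR (ocr_on_pdf, below) when needed."""
    temp_pdf_path = None
    try:
        # Write the upload to disk so PdfReader and pdf2image can open it by path
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
            temp_pdf.write(pdf.read())
            temp_pdf_path = temp_pdf.name

        file_size = os.path.getsize(temp_pdf_path) / (1024 * 1024)  # Size in MB
        logging.info(f"Processing PDF: {pdf.name}, Size: {file_size:.2f} MB")
        if file_size == 0:
            logging.warning(f"The PDF file '{pdf.name}' is empty.")
            return ""

        try:
            # Opening an encrypted file may already fail, so the reader is created inside the try
            pdf_reader = PdfReader(temp_pdf_path)
            text_from_pdf = "".join(page.extract_text() or "" for page in pdf_reader.pages)
        except Exception as e:
            # Catch the specific dependency error pypdf raises for AES encryption
            if "cryptography>=3.1 is required for AES algorithm" in str(e):
                logging.warning(f"PDF '{pdf.name}' is AES encrypted. Performing OCR.")
                return ocr_on_pdf(temp_pdf_path)
            raise

        # No text (or only whitespace) usually means a scanned or image-only PDF
        if not text_from_pdf.strip():
            logging.warning(f"No text extracted from '{pdf.name}'. Performing OCR.")
            return ocr_on_pdf(temp_pdf_path)

        logging.info(f"Processed PDF: {pdf.name}")
        return text_from_pdf
    except Exception as e:
        logging.error(f"Error processing PDF: {pdf.name}. Error: {e}")
        return ""
    finally:
        # delete=False leaves the temp file behind, so remove it once we're done
        if temp_pdf_path and os.path.exists(temp_pdf_path):
            os.remove(temp_pdf_path)


def get_pdf_text(pdf_docs):
    # Newline between documents so the "\n"-based text splitter can separate them
    return "\n".join(process_pdf(pdf) for pdf in pdf_docs)
```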
Below is the `ocr_on_pdf` function with `pytesseract` and the `ThreadPoolExecutor`:
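```python
import logging
from concurrent.futures import ThreadPoolExecutor
from os import getenv, path

import pytesseract
from pdf2image import convert_from_path


def ocr_single_page(image):
    # Assumed helper (its body is not shown in this post): OCR one rendered page
    return pytesseract.image_to_string(image)


def ocr_on_pdf(pdf_path):
    try:
        # Use the Tesseract binary configured in .env, if one is provided
        tesseract_path = getenv("TESSERACT_PATH")
        if tesseract_path:
            pytesseract.pytesseract.tesseract_cmd = tesseract_path
        images = convert_from_path(pdf_path)  # one PIL image per page (requires Poppler)
        file_size = path.getsize(pdf_path) / (1024 * 1024)  # Size in MB
        if file_size > 5:  # If file is larger than 5MB
            # pytesseract runs Tesseract as a subprocess, so threads can OCR pages concurrently
            with ThreadPoolExecutor() as executor:
                extracted_texts = list(executor.map(ocr_single_page, images))
            extracted_text = "\n".join(extracted_texts)
            logging.info(f"Parallel OCR completed for large file: {pdf_path}")
        else:
            extracted_text = "\n".join(ocr_single_page(image) for image in images)
            logging.info(f"Sequential OCR completed for small file: {pdf_path}")
        return extracted_text
    except Exception as e:
        logging.error(f"Error during OCR on PDF: {e}")
        return ""
```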
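One detail makes the parallel branch safe: `ThreadPoolExecutor.map` yields results in the order of its inputs, not in completion order, so page text is joined in page order even when later pages finish first. The sketch below demonstrates this with a hypothetical `fake_ocr` stand-in (it just sleeps and echoes) in place of Tesseract, and a hypothetical `ocr_pages` helper that reuses the 5MB threshold:

```python
import time
from concurrent.futures import ThreadPoolExecutor

PARALLEL_THRESHOLD_MB = 5  # same cut-off used above for switching to threads


def fake_ocr(page):
    # Hypothetical stand-in for ocr_single_page: earlier pages sleep longer,
    # so in parallel mode they finish last
    page_number, delay = page
    time.sleep(delay)
    return f"text of page {page_number}"


def ocr_pages(pages, file_size_mb, ocr_fn=fake_ocr):
    """Run ocr_fn over pages, in parallel above the size threshold."""
    if file_size_mb > PARALLEL_THRESHOLD_MB:
        with ThreadPoolExecutor() as executor:
            # map() returns results in input order, regardless of completion order
            return "\n".join(executor.map(ocr_fn, pages))
    return "\n".join(ocr_fn(page) for page in pages)


if __name__ == "__main__":
    pages = [(1, 0.3), (2, 0.2), (3, 0.1)]
    print(ocr_pages(pages, file_size_mb=12))
```

Both branches produce identical text; only the wall-clock time differs.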
Text conversion into vectors, storage and retrieval
Once the text is extracted, Langchain begins its 'orchestration magic' by splitting it into chunks of roughly 1000 characters, with a 200-character overlap between consecutive chunks, using the `CharacterTextSplitter` class:
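```python
from langchain.text_splitter import CharacterTextSplitter


def get_text_chunks(text):
    # Split on newlines, then merge pieces into ~1000-character chunks overlapping by 200
    text_splitter = CharacterTextSplitter(
        separator="\n",
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    chunks = text_splitter.split_text(text)
    return chunks
```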
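To build intuition for `chunk_size` and `chunk_overlap`, here is a simplified fixed-window splitter. It is a hypothetical `sliding_window_chunks` helper, not LangChain's algorithm: `CharacterTextSplitter` first splits on the separator and then merges pieces up to `chunk_size`, so its boundaries follow newlines. The core idea is the same, though: each chunk starts `chunk_size - chunk_overlap` characters after the previous one, so neighbouring chunks share context that would otherwise be cut in half.

```python
def sliding_window_chunks(text, chunk_size=1000, chunk_overlap=200):
    """Split text into fixed-size windows that overlap by chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - chunk_overlap
    # Stop once a window reaches the end of the text; later windows would only repeat it
    last_start = max(len(text) - chunk_overlap, 1)
    return [text[start:start + chunk_size] for start in range(0, last_start, step)]


print(sliding_window_chunks("abcdefghij", chunk_size=4, chunk_overlap=2))
# ['abcd', 'cdef', 'efgh', 'ghij']
```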
The text chunks are vectorized using Langchain's `OpenAIEmbeddings` class and stored in the vector database:
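```python
import logging
from os import getenv
from typing import Any, Dict, List, Optional

from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langchain_openai import OpenAIEmbeddings
from pymongo import MongoClient

# Connection settings come from .env (these variable names are assumptions)
ATLAS_URI = getenv("ATLAS_URI")
MONGODB_DB = getenv("MONGODB_DB")
MONGODB_COLLECTION = getenv("MONGODB_COLLECTION")


def get_vectorstore(
    text_chunks: List[str], metadatas: Optional[List[Dict[str, Any]]] = None
) -> MongoDBAtlasVectorSearch:
    embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
    mongo_client = MongoClient(ATLAS_URI)
    collection = mongo_client[MONGODB_DB][MONGODB_COLLECTION]
    vector_search = MongoDBAtlasVectorSearch(
        collection=collection,
        embedding=embeddings,
        index_name="vector_index",  # Atlas Vector Search index on this collection
        text_key="text",            # field holding the raw chunk text
        embedding_key="embedding",  # field holding the vector
        relevance_score_fn="cosine",
    )
    # add_texts embeds and inserts the chunks in batches, returning one id per chunk
    ids = vector_search.add_texts(text_chunks, metadatas)
    logging.info(f"Added {len(ids)} embeddings to the vector store")
    return vector_search
```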
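The `index_name="vector_index"` passed to `MongoDBAtlasVectorSearch` has to exist on the collection before queries return anything. One way to create it from Python is sketched below. It assumes PyMongo 4.7 or newer, which added the `type` argument to `SearchIndexModel`, and both helper names are hypothetical. `text-embedding-ada-002` returns 1536-dimensional vectors, and `path` must match the `embedding_key` used above. The same JSON definition can also be pasted into the Atlas UI instead.

```python
ADA_002_DIMENSIONS = 1536  # output size of text-embedding-ada-002


def vector_index_definition(path="embedding", dimensions=ADA_002_DIMENSIONS, similarity="cosine"):
    """Atlas Vector Search index definition for the field that stores embeddings."""
    return {
        "fields": [
            {"type": "vector", "path": path, "numDimensions": dimensions, "similarity": similarity}
        ]
    }


def create_vector_index(collection, name="vector_index"):
    """Create the index on a pymongo Collection (needs PyMongo 4.7+ and an Atlas cluster)."""
    # Imported here so vector_index_definition() works without pymongo installed
    from pymongo.operations import SearchIndexModel

    model = SearchIndexModel(definition=vector_index_definition(), name=name, type="vectorSearch")
    return collection.create_search_index(model=model)
```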
This is how vectorized texts appear in MongoDB's GUI:
MongoDB Atlas Vector Search stores each text chunk and its vector in a document identified by an `ObjectId`, following the document data model, which simplifies integration with larger applications that already use this model:
The `get_conversation_chain` function builds a chain that retrieves the relevant chunks from MongoDB and sends them, together with the question, to OpenAI for answering:
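```python
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain_openai import ChatOpenAI


def get_conversation_chain(vectorstore):
    llm = ChatOpenAI(model_name="gpt-4")
    # Memory keeps earlier turns so follow-up questions have context
    memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
    conversation_chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=vectorstore.as_retriever(),  # MongoDB Atlas Vector Search as the retriever
        memory=memory,
    )
    return conversation_chain
```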
The geeky 🤓 Cloud Developer in me was thrilled to see how accurately MongoDB's nearest-neighbor vector search (Atlas Vector Search performs an approximate k-nearest neighbors search by default) retrieved the right chunks for each answer. On a side note, since this kind of search demands a lot of compute from the database, it would be interesting to explore its performance in a production environment with terabytes of data, but that is a discussion for another blog. 📖 👩🏻💻
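To look at that nearest-neighbor search directly, the query can be issued as a `$vectorSearch` aggregation stage, which is what recent versions of LangChain's `MongoDBAtlasVectorSearch` run under the hood. The stage examines `numCandidates` approximate neighbours and returns the top `limit`. The helper name and the `k`/`num_candidates` defaults below are illustrative assumptions, not the values LangChain sends; running the pipeline with `collection.aggregate(...)` is handy for debugging retrieval quality.

```python
def vector_search_pipeline(query_vector, k=4, num_candidates=100,
                           index="vector_index", path="embedding"):
    """Aggregation pipeline for an Atlas Vector Search nearest-neighbour query."""
    if num_candidates < k:
        raise ValueError("num_candidates must be >= k")
    return [
        {
            "$vectorSearch": {
                "index": index,
                "path": path,
                "queryVector": list(query_vector),
                "numCandidates": num_candidates,  # approximate candidates examined
                "limit": k,                       # nearest neighbours returned
            }
        },
        # Keep the chunk text and expose the similarity score
        {"$project": {"_id": 0, "text": 1, "score": {"$meta": "vectorSearchScore"}}},
    ]


# Usage (needs a real embedding and an Atlas collection):
# query_vector = OpenAIEmbeddings(model="text-embedding-ada-002").embed_query("How do I reset the phone?")
# for doc in collection.aggregate(vector_search_pipeline(query_vector)):
#     print(doc["score"], doc["text"][:80])
```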
Streamlit Setup and 'Gotchas'
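Throughout the application flow, `st.session_state` holds the conversation chain, the vector store, and the chat history (including clearing it). Both page configuration and session state initialization should happen at the beginning of the script to avoid errors: `st.set_page_config` has historically had to be the first Streamlit command, and every session-state key must exist before a function reads it: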
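```python
import streamlit as st

st.set_page_config(page_title="Chat with PDF Manuals", page_icon=":telephone_receiver:")
for key, default in (("chat_history", []), ("conversation", None), ("vectorstore", None)):
    if key not in st.session_state:  # initialize each key the app reads later
        st.session_state[key] = default
```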
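In the `handle_userinput` function, `session_state` holds the conversation chain and the chat history; each question and answer is appended to that history, giving the user the option to ask follow-up questions. Langchain's `get_openai_callback` tracks the OpenAI token usage of each call: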
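```python
import logging

import streamlit as st
from langchain_community.callbacks import get_openai_callback


def handle_userinput(user_question):
    # .get() avoids an AttributeError if the key was never initialized
    if st.session_state.get("vectorstore") is None:
        st.warning("Please upload PDFs first or wait until the database is initialized.")
    else:
        # The callback counts tokens and cost for every OpenAI call made inside the block
        with get_openai_callback() as cb:
            response = st.session_state.conversation.invoke({"question": user_question})
            st.session_state.chat_history.append({"type": "user", "content": user_question})
            st.session_state.chat_history.append({"type": "bot", "content": response["answer"]})
            logging.info(f"\n\tOpenAI Token Usage:\n\t{cb}")
```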
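The `cb` object yielded by `get_openai_callback` is an `OpenAICallbackHandler` with `prompt_tokens`, `completion_tokens`, `total_tokens` and `total_cost` (in USD) attributes. Logging them as one structured line, instead of the multi-line `str(cb)`, makes the `.log` file easier to parse when comparing costs later. A minimal sketch, where `log_token_usage` is a hypothetical helper that would be called inside the `with` block:

```python
import json
import logging


def log_token_usage(cb):
    """Log an OpenAI callback handler's counters as a single JSON line and return them."""
    record = {
        "prompt_tokens": cb.prompt_tokens,
        "completion_tokens": cb.completion_tokens,
        "total_tokens": cb.total_tokens,
        "total_cost_usd": round(cb.total_cost, 6),
    }
    # A fixed prefix makes the usage lines easy to grep out of the log file
    logging.info("openai_usage %s", json.dumps(record))
    return record
```

Because each question produces one `openai_usage` line, summing `total_cost_usd` over a session becomes a one-line log query.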
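The `clear_chat_history` function, triggered by a button in the `main` function, resets the chat history and the conversation memory:
```python
def clear_chat_history():
    logging.info("Clearing chat history")
    st.session_state.chat_history = []
    # Rebuild the chain (fresh ConversationBufferMemory) instead of leaving it as None,
    # which would make the next conversation.invoke() call fail
    vectorstore = st.session_state.get("vectorstore")
    st.session_state.conversation = get_conversation_chain(vectorstore) if vectorstore else None
    st.rerun()
```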
Streamlit's default sidebar in the `main` function handles multiple PDF uploads:
```python
with st.sidebar:
    st.subheader("Your PDF Manuals")
    uploaded_files = st.file_uploader(
        "Upload your PDFs here and click on 'Process'", accept_multiple_files=True, type=["pdf"]
    )
    if st.button("Process") and uploaded_files:
        with st.spinner("Processing..."):
            raw_text = get_pdf_text(uploaded_files)
            text_chunks = get_text_chunks(raw_text)
            vectorstore = get_vectorstore(text_chunks)
            st.session_state.vectorstore = vectorstore
            st.session_state.conversation = get_conversation_chain(vectorstore)
        st.success("Processing complete.")
```
While building the UI, I experimented with a real-time text extraction display and a progress bar, but these features cluttered the UI. I opted for simplicity, relying on the default `st.spinner` for processing feedback.
Observability
Understanding application behavior is crucial before deploying to the Cloud. I set up two loggers, both writing to a `.log` file:
- A standard Python logger to observe application activity
- A MongoDB-specific performance logger built on the `pymongo` `monitoring` module
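A minimal sketch of that file-based setup, assuming the standard library's `logging.basicConfig` and a hypothetical `app.log` filename. Since the MongoDB listener also reports through `logging`, a single root configuration sends both loggers' output to the same file:

```python
import logging


def configure_logging(log_path="app.log", level=logging.INFO):
    """Send every logging call (application and MongoDB listener) to one file."""
    logging.basicConfig(
        filename=log_path,
        level=level,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        force=True,  # replace handlers configured earlier (Python 3.8+)
    )


if __name__ == "__main__":
    configure_logging()
    logging.info("Application started")
```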
Application Observability
While processing large PDFs, I monitored the script execution time by measuring the duration from the start to the end of the `main` function. Two key observations were made:
- OCR on PDFs larger than 5MB took considerable time on my M1 MacBook Pro, prompting the addition of parallel processing through a `ThreadPoolExecutor` to avoid performance issues in the Cloud.
- Function services like AWS Lambda or GCP Cloud Functions may not be a good fit for this application, given how long OCR can run. Since I don't plan on maintaining a constantly running VM, this observation indicated that an architecture using serverless containers (such as AWS ECS with Fargate or GCP Cloud Run) would be the optimal deployment approach. These containers would only run when the application is invoked, offering cost-efficiency with the option to autoscale. More on this in future blogs 📝.
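The start-to-end timing of `main` described above can be sketched as a small decorator. This is one possible wiring, not necessarily the script's own timing code, and `log_execution_time` is a hypothetical name. `time.perf_counter` is used because it is a monotonic, high-resolution clock intended for measuring intervals.

```python
import functools
import logging
import time


def log_execution_time(func):
    """Log how long each call to func takes, even if it raises."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            logging.info(f"{func.__name__} executed in {elapsed:.2f} seconds")
    return wrapper


@log_execution_time
def main():
    time.sleep(0.1)  # stand-in for the Streamlit app body
    return "done"
```

Because Streamlit re-executes the script on every interaction, this logs one duration per rerun, which is what makes long OCR runs stand out in the log.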
To gauge the cost implications of using OpenAI's foundation model, I tracked API usage with Langchain's `get_openai_callback` functionality. This made it easier to understand the actual costs associated with each use of the application:
MongoDB Logs
Coming from the DevOps world 👩🏻🏭, and having a passion for understanding databases under the hood, I used this chatbot application to implement `pymongo` event loggers. I created a class that aggregates the count of successful and failed operations and their average duration each time the program runs:
```python
import logging
from collections import defaultdict

from pymongo import monitoring


# MongoDB Event Listeners
class AggregatedCommandLogger(monitoring.CommandListener):
    """Aggregates command counts and durations instead of logging every command."""

    def __init__(self):
        self.operation_counts = defaultdict(int)
        self.total_duration = 0
        self.total_operations = 0
        self.failed_operations = 0

    def started(self, event):
        pass  # only completed commands are aggregated

    def succeeded(self, event):
        self.operation_counts[getattr(event, "database_name", "unknown")] += 1
        self.total_duration += event.duration_micros
        self.total_operations += 1

    def failed(self, event):
        # Command events use __slots__ (no __dict__), so read attributes with getattr
        database_name = getattr(event, "database_name", "unknown")
        self.failed_operations += 1
        logging.info(
            f"Command failed: operation_id={event.operation_id}, "
            f"duration_micros={event.duration_micros}, database_name={database_name}"
        )

    def summarize_and_reset(self):
        if self.total_operations > 0:
            avg_duration = self.total_duration / self.total_operations
            summary = f"MongoDB operations summary: {self.total_operations} successful operations, "
            summary += f"{self.failed_operations} failed, "
            summary += f"average duration: {avg_duration:.2f} microseconds. "
            summary += "Operations per database: " + ", ".join(
                f"{db}: {count}" for db, count in self.operation_counts.items()
            )
            logging.info(summary)
        # Reset counters
        self.operation_counts.clear()
        self.total_duration = 0
        self.total_operations = 0
        self.failed_operations = 0


aggregated_logger = AggregatedCommandLogger()
monitoring.register(aggregated_logger)  # applies to MongoClients created after this call


def log_mongodb_summary():
    aggregated_logger.summarize_and_reset()
```
The results aligned with expectations: no errors occurred, and operations between my local machine and MongoDB Atlas were fast and reliable. By building these `pymongo.monitoring` event loggers, I preemptively simplified potential troubleshooting in a Cloud infrastructure, while also gaining insights into how the MongoDB database should be sized for real-world use.
Security
All of the environment variables, such as the OpenAI API key, the Tesseract CLI location, the MongoDB connection string, and the database and collection names, were stored securely in the `.env` file. A sample `.env` is included in the GitHub repository.
For Cloud deployment, these variables will be managed via a Secrets Manager — either AWS or GCP — ensuring consistent security practices across environments.
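A local `.env` file (loaded with python-dotenv's `load_dotenv()`) and cloud secret injection (ECS task definitions and Cloud Run can both expose secrets as environment variables) both end up in the process environment. So the application code can stay identical across environments if it only reads environment variables. Below is a minimal sketch that fails fast when a secret is missing. Apart from `OPENAI_API_KEY`, which LangChain's OpenAI classes read by default, the variable names and the `load_settings` helper are assumptions:

```python
import os

REQUIRED_SETTINGS = (
    "OPENAI_API_KEY",      # read by LangChain's OpenAI classes by default
    "ATLAS_URI",           # hypothetical names for the MongoDB settings
    "MONGODB_DB",
    "MONGODB_COLLECTION",
)


def load_settings(required=REQUIRED_SETTINGS, environ=None):
    """Return the required settings, raising one error that lists every missing name."""
    environ = os.environ if environ is None else environ
    missing = [name for name in required if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return {name: environ[name] for name in required}


# Locally: from dotenv import load_dotenv; load_dotenv(); settings = load_settings()
```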
Conclusion
This application showcases a blend of open-source tools, observability practices, and database management, offering a blueprint for scaling in AWS or GCP. Building it from scratch with a cloud-centric vision helped identify and address potential issues early on. The main challenge was handling different types of PDFs while balancing cost-efficiency, speed, and security.
Future improvements for the 🤖 include:
- Adding a 'Web URL Input' for users to upload a file or provide a PDF URL
- Implementing PDF metadata extraction and storing it in a separate MongoDB Atlas Database, allowing users to track previously vectorized PDFs and ask questions about them
- Introducing a dropdown box in the UI to view available PDF file names