📢 Introduction
Picture this: You’re building a real-time LLM-powered app. Your users are expecting fast, continuous updates from the AI, but instead, they’re staring at a frozen screen. What gives?
Spoiler alert — it’s not your LLM that’s slowing things down. It’s your function calls.
Every time your app makes a call to process data, hit an API, or load a large file, you risk blocking the stream. The result? Delays, lag, and an experience that feels anything but “real-time.”
But don’t worry — this bottleneck has 3 simple fixes. In this post, I’ll show you:
- Why function calls block LLM streams
- The 3 strategies to prevent bottlenecks
- How to keep your streams fast, smooth, and uninterrupted
Let’s get into it. 🚀
❌ Why Function Calls Are Slowing You Down
LLM streaming works by sending a steady flow of small chunks of text to the client. But here’s the catch: Every time you call a function during the stream — to process data, access an API, or run a calculation — the stream pauses until the function finishes.
This happens because most functions are synchronous by default, which means they block the current thread. Imagine you’re in a group chat, but one friend keeps pausing the conversation to answer a phone call. Annoying, right?
Here’s what’s really happening:
- 🔁 Synchronous (Blocking) Functions: The stream has to “wait” for these functions to finish before sending the next chunk of data.
- 🔥 Non-blocking (Asynchronous) Functions: The stream continues while the function does its work in the background.
Here’s a visual of the difference:
[ Blocking Call ] ---> Stream Pauses
[ Async Call ] ------> Stream Continues
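To make the difference concrete, here is a minimal sketch (the `fetch_blocking` and `fetch_async` names are just placeholders) of how a synchronous call stalls an async generator while an awaited call does not:

```python
import asyncio
import time

def fetch_blocking():
    time.sleep(2)  # Blocks the whole event loop: no other chunk (or client) is served meanwhile
    return "result"

async def fetch_async():
    await asyncio.sleep(2)  # Yields control: the event loop keeps doing other work while we wait
    return "result"

async def stream():
    # Blocking version: the stream (and everything else on the loop) freezes for 2 seconds
    # yield fetch_blocking()

    # Non-blocking version: this coroutine waits, but the event loop stays free
    yield await fetch_async()
```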
🛠️ 3 Ways to Fix It
To avoid blocking the stream, you need to make your app non-blocking. Here are the 3 best techniques to do just that:
1️⃣ Use Asynchronous Functions
If your function is doing I/O (like hitting an API), make it asynchronous so it can "wait" for the API without pausing the stream. Async functions allow the app to keep streaming while the function completes.
When to use it:
- When calling external APIs
- When reading/writing to files or databases
How it works:
- Use Python’s `async def` for your functions.
- Use `await` to “pause” the function without blocking the stream.
Example: Streaming an LLM While Calling an API
```python
import asyncio

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def async_function(data):
    await asyncio.sleep(2)  # Simulate a slow API call
    return f"Processed: {data}"

async def stream_generator(request: Request):
    data_chunks = ["chunk1", "chunk2", "chunk3"]
    for chunk in data_chunks:
        processed_chunk = await async_function(chunk)
        yield f"data: {processed_chunk}\n\n"
        await asyncio.sleep(0.1)  # Simulate delay between chunks

@app.get("/stream")
async def stream(request: Request):
    return StreamingResponse(stream_generator(request), media_type="text/event-stream")
```
🔍 What’s happening here?
- Each chunk is processed asynchronously.
- The stream keeps flowing while `async_function` is working.
Pro Tip: Use `await asyncio.sleep()` to simulate non-blocking behavior. Replace it with actual I/O tasks like API calls, file reads, or database queries.
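For instance, a real non-blocking API call could look like the sketch below. It assumes the `httpx` library is installed, and `https://api.example.com/process` is a made-up endpoint:

```python
import httpx

async def async_function(data):
    # Real non-blocking I/O instead of asyncio.sleep()
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.example.com/process",  # hypothetical endpoint
            params={"q": data},
        )
    return f"Processed: {response.text}"
```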
2️⃣ Leverage Background Tasks
If you have heavy computations (like ML inference), you don’t want to keep your stream waiting. Instead, offload the task into the background and continue streaming while the computation runs.
When to use it:
- When you have CPU-heavy computations (e.g., model predictions)
- When dealing with large files or datasets
How it works:
- Move heavy functions into background tasks.
- Use `asyncio.create_task()` to launch the work concurrently with the stream. (FastAPI’s `BackgroundTasks` only run after the response has finished sending, so they can’t feed results back into an ongoing stream.)
Example: Stream LLM Responses While Running a Background Computation
```python
import asyncio

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def background_task(data):
    # Simulate a heavy computation; for truly CPU-bound work,
    # wrap the call with asyncio.to_thread() or a process pool.
    await asyncio.sleep(2)
    return f"Processed: {data}"

async def stream_generator(request: Request):
    data_chunks = ["chunk1", "chunk2", "chunk3"]

    # Kick off all computations in the background without blocking the stream
    tasks = [asyncio.create_task(background_task(chunk)) for chunk in data_chunks]

    for chunk in data_chunks:
        yield f"data: Processing {chunk}\n\n"
        await asyncio.sleep(0.1)  # Simulate a slight delay between updates

    # Collect and stream the results once the background work finishes
    for result in await asyncio.gather(*tasks):
        yield f"data: {result}\n\n"

@app.get("/stream")
async def stream(request: Request):
    return StreamingResponse(stream_generator(request), media_type="text/event-stream")
```
🔍 What’s happening here?
- The heavy computation (`background_task`) runs in the background while the generator keeps yielding.
- The stream stays responsive, sending "Processing..." updates in real time, and the results are streamed as soon as the tasks finish.
Pro Tip: Background tasks are a great fit for heavy work like ML inference, large file processing, and batch jobs. For truly CPU-bound code, push it to a thread or process pool so it doesn’t block the event loop (see the sketch below).
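One caveat: an async function still runs on the event loop, so genuinely CPU-bound code will block it just like a sync call. Here is a minimal sketch of pushing that work to a thread or process pool, where `heavy_inference` stands in for your own CPU-bound function:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_inference(data):
    # Placeholder for a genuinely CPU-bound function (e.g., local model inference)
    return f"Processed: {data}"

async def run_in_thread(data):
    # Works well when the underlying library releases the GIL (NumPy, PyTorch, etc.)
    return await asyncio.to_thread(heavy_inference, data)

async def run_in_process(data, pool: ProcessPoolExecutor):
    # Safer for pure-Python CPU work; create the pool once at startup and reuse it
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, heavy_inference, data)
```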
3️⃣ Chunk Your Data
If you have to process large datasets, break them into smaller "chunks" and process each one at a time. This keeps the stream alive, rather than forcing it to wait for the whole dataset to be processed.
When to use it:
- When dealing with large datasets (e.g., CSV files, large JSON)
- When paginating results from a large database query
How it works:
- Divide large datasets into chunks.
- Process each chunk and stream it immediately.
Example: Stream Responses While Processing Large Datasets
```python
import asyncio

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def process_chunk(chunk):
    await asyncio.sleep(1)  # Simulate processing time
    return f"Processed: {chunk}"

async def stream_generator(request: Request):
    data_chunks = ["chunk1", "chunk2", "chunk3", "chunk4", "chunk5"]
    for chunk in data_chunks:
        processed_chunk = await process_chunk(chunk)
        yield f"data: {processed_chunk}\n\n"
        await asyncio.sleep(0.1)  # Simulate delay between chunks

@app.get("/stream")
async def stream(request: Request):
    return StreamingResponse(stream_generator(request), media_type="text/event-stream")
```
🔍 What’s happening here?
- Instead of processing a big file all at once, the data is processed in chunks.
- The stream stays responsive, sending updates as each chunk finishes.
Pro Tip: Use chunked processing for large datasets (like CSVs) to stream "partial results" instead of waiting for the whole job to finish.
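As an example, here is a sketch of streaming partial results while reading a CSV a few rows at a time (the file path and batch size are placeholders):

```python
import asyncio
import csv

async def stream_csv(path="data.csv", batch_size=100):  # hypothetical path and batch size
    # Note: open() and the csv module are blocking; for huge files, consider aiofiles or asyncio.to_thread()
    with open(path, newline="") as f:
        reader = csv.reader(f)
        batch = []
        for row in reader:
            batch.append(row)
            if len(batch) == batch_size:
                yield f"data: processed {len(batch)} rows\n\n"
                batch.clear()
                await asyncio.sleep(0)  # Yield control so the chunk can be flushed to the client
        if batch:
            yield f"data: processed {len(batch)} rows\n\n"
```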
📊 Which Method Should You Use?
| Method | Use For | Use Case Example |
|---|---|---|
| Async Functions | I/O-bound tasks (like API calls) | Streaming responses from API calls |
| Background Tasks | Heavy computation | Running ML inference while streaming |
| Chunked Processing | Large datasets | Streaming data from large files |
🚀 Conclusion
When it comes to LLM streaming, blocking function calls are a hidden bottleneck. They stop the stream, causing lags and bad user experiences.
But now you know the 3 ways to fix it:
1️⃣ Use Async Functions for I/O tasks.
2️⃣ Use Background Tasks for heavy computations.
3️⃣ Use Chunked Processing for large datasets.
By using these techniques, you’ll keep your streams fast, smooth, and real-time.
💡 Want more LLM superpowers? Check out Louis Sanna’s guide on Responsive LLM Applications with Server-Sent Events. It’s the ultimate toolkit for building high-performance, real-time AI apps.
Want to know more about building responsive LLM apps? Check out my course on newline: Responsive LLM Applications with Server-Sent Events