DEV Community

Cover image for FastAPI, Pydantic, Psycopg3: the holy trinity for Python web APIs
Sam
Sam

Posted on

FastAPI, Pydantic, Psycopg3: the holy trinity for Python web APIs

Part 1: Discussion

Enter FastAPI

First of all, take the title with a pinch of salt.

If I was starting from scratch with Python web API development today, I would probably look more closely at LiteStar, which seems to me to be a better architected and with a better project governance structure.

But we have FastAPI and it's not going anywhere soon. I use it for a lot of personal and professional projects and still enjoy its simplicity.

For a guide on FastAPI design patterns, look no further than this page.

fastapi

Retrieving Database Data

Despite FastAPI being great at the actual 'API' part, there has been one persistent uncertainty for me: how to best access the database, particularly if we need to also handle geospatial data types.

Let's review our options.

Note 1: we are only interested in async libraries here, as FastAPI is ASGI.

Note 2: I will only discuss connecting to PostgreSQL, although parts of the discussion are still relevant to other databases.

orm

Simple To Code | Complex Design: ORMs

Handles your database connection and parsing of data from your database table into Python objects.

  • SQLAlchemy2: the biggest contender in the Python ORM world. Personally I really dislike the syntax, but each to their own.

  • TortoiseORM: I personally really like this Django-inspired async ORM; it's clean and nice to use.

  • Alternative ORMs: there are many such as peewee, PonyORM, etc.

The Middle Ground: Query Builders

No database connection. Simply output raw SQL from a Python-based query and pass it to the database driver.

  • SQLAlchemy Core: the core SQL query builder, without the mapping to objects part. There is also a higher level ORM built on this called databases that looks very nice. I do wonder how actively developed the project is however.

  • PyPika: I don't know much about this one.

Simple Design: Database Drivers

  • asyncpg: this was the gold standard async database driver for Postgres, being one of the first to market and most performant. While all other drivers use the C library libpq to interface with Postgres, MagicStack opted to rewrite their own custom implementation and also deviate from Python DBAPI spec. If performance is your main criteria here, then asyncpg is probably the best option.

  • psycopg3: well psycopg2 was clearly the king of the synchronous database driver world for Python/Postgres. psycopg3 (rebranded to simply psycopg) is the next, fully async, iteration of this library. This library has really come into it's own in recent years & I wish to discuss it further. See this interesting blog from the author about the early days of psycopg3.

Note that there is clearly a broader, more conceptual, discussion to be had here around ORMs vs query builders vs raw SQL. I won't cover that here.

Duplicated Models

Pydantic is bundled with FastAPI and is excellent for modelling, validating, and serialising API responses.

If we decide to use an ORM to retrieve data from our database, isn't it a bit inefficient keeping two sets of database models in sync? (one for the ORM, another for Pydantic)?

Wouldn't it be great if we could just use Pydantic to model the database?

This is exactly the problem the creator of FastAPI tried to solve with the library SQLModel.

While this could very well be a great solution to the problem, I have a few concerns:

  • Will this project suffer from the single-maintainer syndrome like FastAPI?

  • It's still a reasonably young project and concept, where documentation isn't fantastic.

  • It's intrinsically tied up with Pydantic and SQLAlchemy, meaning migration away would be extremely difficult.

  • For more complex queries, dropping down to SQLAlchemy underneath may be required.

Back To Basics

So many options! Analysis paralysis.

analysis-paralysis

When there is uncertainty I would use the following precept: keep it simple.

SQL was invented 50yrs ago and is still a key skill for any developer to learn. It's syntax is consistently easy to grasp and uncomplicated to write for most use cases (for the die-hard ORM users out there, give it a try, you might be surprised).

Hell, we can even use open-source LLMs these days to generate (mostly working) SQL queries and save you the typing.

While ORMs and query builders may come and go, database drivers are likely more consistent. The original psycopg2 library was written nearly 20yrs ago now and is still actively used in production globally.

Using Psycopg with Pydantic Models

As discussed, while psycopg may not be as performant as asyncpg (the real world implications of this theoretical performance is debatable though), psycopg focuses on ease of use and a familiar API.

The killer feature for me is Row Factories.

This functionality allows you to map returned database data to any Python object, including standard lib dataclasses, models from the great attrs library, and yes, Pydantic models!

For me, this is the best compromise of approaches: the ultimate flexibility of raw SQL, with the validation / type safety capabilities of Pydantic to model the database. Psycopg also handles things like variable input sanitation to avoid SQL injection.

It should be noted that asyncpg can also handle mapping to Pydantic models, but as more of a workaround than a built-in feature. See this issue thread for details. I also don't know if this approach plays nicely with other modelling libraries.

As I mentioned above, I typically work with geospatial data: an area often neglected by ORMs and query builders. Dropping to the raw SQL gives me the ability to parse and unparse geospatial data as I need to more acceptable types in pure Python. See my related article on this topic.

Part 2: Example Usage

Create A Database Table

Here we create a simple database table called user in raw SQL.

I would also consider handling database creation and migrations using SQL only, but this is a topic for another article.

init_db.sql

CREATE TYPE public.userrole AS ENUM (
    'READ_ONLY',
    'STANDARD',
    'ADMIN'
);

CREATE TABLE public.users (
    id integer NOT NULL,
    username character varying,
    role public.userrole NOT NULL DEFAULT 'STANDARD',
    profile_img character varying,
    email_address character varying,
    is_email_verified boolean DEFAULT false,
    registered_at timestamp with time zone DEFAULT now()
);
Enter fullscreen mode Exit fullscreen mode

Model Your Database With Pydantic

Here we create a model called DbUser:

db_models.py

from typing import Optional
from enum import Enum
from datetime import datetime
from pydantic import BaseModel
from pydantic.functional_validators import field_validator
from geojson_pydantic import Feature

class UserRole(str, Enum):
    """Types of user, mapped to database enum userrole."""

    READ_ONLY = "READ_ONLY"
    STANDARD = "STANDARD"
    ADMIN = "ADMIN"

class DbUser(BaseModel):
    """Table users."""

    id: int
    username: str
    role: Optional[UserRole] = UserRole.STANDARD
    profile_img: Optional[str] = None
    email_address: Optional[str] = None
    is_email_verified: bool = False
    registered_at: Optional[datetime]
    # This is a geospatial type I will handle in the SQL
    favourite_place: Optional[dict]

    # DB computed fields (handled in the SQL)
    total_users: Optional[int] = None

    # This example isn't very realistic, but you get the idea
    @field_validator("is_email_verified", mode="before")
    @classmethod
    def i_want_my_ints_as_bools(cls, value: int) -> bool:
        """Example of a validator to convert data type."""
        return bool(value)
Enter fullscreen mode Exit fullscreen mode

Here we get the type safety and validation of Pydantic.

We can add any form of validation or data transformation to this model for when the data is extracted from the database.

Setting Up Psycopg With FastAPI

We use psycopg_pool to create a pooled database connection:

db.py

from typing import cast
from fastapi import Request
from psycopg import Connection
from psycopg_pool import AsyncConnectionPool

# You should be using environment variables in a settings file here
from app.config import settings


def get_db_connection_pool() -> AsyncConnectionPool:
    """Get the connection pool for psycopg.

    NOTE the pool connection is opened in the FastAPI server startup (lifespan).

    Also note this is also a sync `def`, as it only returns a context manager.
    """
    return AsyncConnectionPool(
        conninfo=settings.DB_URL.unicode_string(), open=False
    )


async def db_conn(request: Request) -> Connection:
    """Get a connection from the psycopg pool.

    Info on connections vs cursors:
    https://www.psycopg.org/psycopg3/docs/advanced/async.html

    Here we are getting a connection from the pool, which will be returned
    after the session ends / endpoint finishes processing.

    In summary:
    - Connection is created on endpoint call.
    - Cursors are used to execute commands throughout endpoint.
      Note it is possible to create multiple cursors from the connection,
      but all will be executed in the same db 'transaction'.
    - Connection is closed on endpoint finish.
    """
    db_pool = cast(AsyncConnectionPool, request.state.db_pool)
    async with db_pool.connection() as conn:
        yield conn
Enter fullscreen mode Exit fullscreen mode

Next we open the connection pool in the FastAPI lifespan event:

main.py

from typing import AsyncIterator
from contextlib import asynccontextmanager
from fastapi import FastAPI

from .db import get_db_connection_pool

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """FastAPI startup/shutdown event."""
    log.debug("Starting up FastAPI server.")

    # Create a pooled db connection and make available in lifespan state
    # https://asgi.readthedocs.io/en/latest/specs/lifespan.html#lifespan-state
    # NOTE to use within a request (this is wrapped in database.py already):
    # from typing import cast
    # db_pool = cast(AsyncConnectionPool, request.state.db_pool)
    # async with db_pool.connection() as conn:
    db_pool = get_db_connection_pool()
    await db_pool.open()

    yield

    # Shutdown events
    print("Shutting down FastAPI server.")
    # Here we make sure to close the connection pool
    await db_pool.close()
Enter fullscreen mode Exit fullscreen mode

Now when you FastAPI app starts, you should have an open connection pool, ready to take connection from inside endpoints.

Helper Methods For The Pydantic Model

It would be useful to add a few methods to the Pydantic model for common functionality: getting one user, all users, creating a user, updating a user, deleting a user.

But first we should create some Pydantic models for input validation (to create a new user) and output serialisation (your JSON response via the API).

user_schemas.py

from typing import Annotated
from pydantic import BaseModel, Field
from pydantic.functional_validators import field_validator
from geojson_pydantic import FeatureCollection, Feature, MultiPolygon, Polygon
from .db_models import DbUser

class UserIn(DbUser):
    """User details for insert into DB."""

    # Exclude fields not required for input
    id: Annotated[int, Field(exclude=True)] = None
    favourite_place: Optional[Feature]

    @field_validator("favourite_place", mode="before")
    @classmethod
    def parse_input_geojson(
        cls,
        value: FeatureCollection | Feature | MultiPolygon | Polygon,
    ) -> Optional[Polygon]:
        """Parse any format geojson into a single Polygon."""
        if value is None:
            return None
        # NOTE I don't include this helper function for brevity
        featcol = normalise_to_single_geom_featcol(value)
        return featcol.get("features")[0].get("geometry")

class UserOut(DbUser):
    """User details for insert into DB."""

    # Ensure it's parsed as a Polygon geojson from db object
    favourite_place: Polygon

    # More logic to append computed values
Enter fullscreen mode Exit fullscreen mode

Then we can define our helper methods: one, all, create:

db_models.py

...previous imports
from typing import Self, Optional
from fastapi.exceptions import HTTPException
from psycopg import Connection
from psycopg.rows import class_row

from .user_schemas import UserIn

class DbUser(BaseModel):
    """Table users."""

    ...the fields

    @classmethod
    async def one(cls, db: Connection, user_id: int) -> Self:
        """Get a user by ID.

        NOTE how the favourite_place field is converted in the db to geojson.
        """
        async with db.cursor(row_factory=class_row(cls)) as cur:
            sql = """
                SELECT
                    u.*,
                    ST_AsGeoJSON(favourite_place)::jsonb AS favourite_place,
                    (SELECT COUNT(*) FROM users) AS total_users
                FROM users u
                WHERE
                    u.id = %(user_id)s
                GROUP BY u.id;
            """

            await cur.execute(
                sql,
                {"user_id": user_id},
            )

            db_project = await cur.fetchone()
            if not db_project:
                raise KeyError(f"User ({user_identifier}) not found.")

            return db_project

    @classmethod
    async def all(
        cls, db: Connection, skip: int = 0, limit: int = 100
    ) -> Optional[list[Self]]:
        """Fetch all users."""
        async with db.cursor(row_factory=class_row(cls)) as cur:
            await cur.execute(
                """
                SELECT
                    *,
                    ST_AsGeoJSON(favourite_place)::jsonb
                FROM users
                OFFSET %(offset)s
                LIMIT %(limit)s;
                """,
                {"offset": skip, "limit": limit},
            )
            return await cur.fetchall()

    @classmethod
    async def create(
        cls,
        db: Connection,
        user_in: UserIn,
    ) -> Optional[Self]:
        """Create a new user."""

        # Omit defaults and empty values from the model
        model_dump = user_in.model_dump(exclude_none=True, exclude_default=True)
        columns = ", ".join(model_dump.keys())
        value_placeholders = ", ".join(f"%({key})s" for key in model_dump.keys())

        sql = f"""
            INSERT INTO users
                ({columns})
            VALUES
                ({value_placeholders})
            RETURNING *;
        """


        async with db.cursor(row_factory=class_row(cls)) as cur:
            await cur.execute(sql, model_dump)
            new_user = await cur.fetchone()

            if new_user is None:
                msg = f"Unknown SQL error for data: {model_dump}"
                print(f"Failed user creation: {model_dump}")
                raise HTTPException(status_code=500, detail=msg)

        return new_user
Enter fullscreen mode Exit fullscreen mode

Usage

routes.py

from typing import Annotated
from fastapi import Depends, HTTPException
from psycopg import Connection

from .main import app
from .db import db_conn
from .models import DbUser
from .user_schemas import UserIn, UserOut

@app.post("/", response_model=UserOut)
async def create_user(
    user_info: UserIn,
    db: Annotated[Connection, Depends(db_conn)],
):
    """Create a new user.

    Here the input is parsed and validated by UserIn
    then the output is parsed and validated by UserOut
    returning the user json data.
    """

    new_user = await DbUser.create(db, user_info)
    if not new_user:
        raise HTTPException(
            status_code=422,
            detail="User creation failed.",
        )

    return new_user

    # NOTE within an endpoint we can also use
    # DbUser.one(db, user_id) and DbUser.all(db)
Enter fullscreen mode Exit fullscreen mode

This is the approach I have started to use in a project I maintain, the FMTM, a tool to collect field data for communities around the world.

See the full codebase here.
And ⭐ if you found this useful!

That's all for now! I hope this helps someone out there 🚀

Top comments (0)