infohash

Dynamically Create Database & Tables With Async SQLAlchemy

Services often run a startup function that creates the data objects they depend on during initialization. A common example is creating the database tables that various components of your service expect to be present before they can read and write data. SQLAlchemy, a Python library, lets you interact with the database programmatically through a database-agnostic Object Relational Mapper (ORM). Let's see how to create a database and its tables using the SQLAlchemy ORM with its async APIs.

Note: All examples below use the async API and are written for SQLAlchemy 2.0+.

Setup

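I'm using a PostgreSQL server running in Docker. Each environment variable needs its own -e flag, and the port is published so that the examples can reach the server on localhost:5432.

$ docker run --name postgres-server -e POSTGRES_USER=admin -e POSTGRES_PASSWORD=admin -p 5432:5432 -d postgres:latest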

Database Creation

You may want the startup hook of your service to create the database if it does not exist yet. Here's how:

import contextlib

from sqlalchemy import text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.ext.asyncio import create_async_engine


async def init_app() -> None:
    async with create_async_engine(
        url='postgresql+asyncpg://admin:admin@localhost:5432/postgres',
        isolation_level='AUTOCOMMIT').begin() as conn:
        # If db already exists, suppress the exception.
        with contextlib.suppress(ProgrammingError):
            await conn.execute(text(
                'create database testdatabase owner admin'))


if __name__ == '__main__':
    import asyncio

    asyncio.run(init_app())

Here's what's happening:

  1. ProgrammingError is suppressed so that the call does nothing when the database already exists. Be aware that this also hides other errors of the same class, such as a syntax error in the statement.
  2. We connect as a superuser to the default postgres database, because CREATE DATABASE has to be issued from an existing database by a role that is allowed to create databases.
  3. PostgreSQL does not allow CREATE DATABASE inside a transaction block, and SQLAlchemy runs statements in a transaction by default. Setting isolation_level to AUTOCOMMIT gets around this.
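
As an alternative to suppressing ProgrammingError, the existence check can be made explicit by querying the pg_database catalog first. The following is a minimal sketch, not the method used above: the helper names create_database_sql and ensure_database and the identifier rule are assumptions made for illustration. It expects a connection opened in AUTOCOMMIT mode, and two services starting at the same moment could still race between the check and the CREATE DATABASE.

```python
import re

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection

# Hypothetical rule: only simple lowercase identifiers are accepted, because
# CREATE DATABASE cannot take the database name as a bind parameter.
_IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")


def create_database_sql(name: str, owner: str) -> str:
    """Build the CREATE DATABASE statement after validating both identifiers."""
    for identifier in (name, owner):
        if not _IDENTIFIER.fullmatch(identifier):
            raise ValueError(f"unsafe identifier: {identifier!r}")
    return f'CREATE DATABASE "{name}" OWNER "{owner}"'


async def ensure_database(conn: AsyncConnection, name: str, owner: str) -> bool:
    """Create the database if pg_database has no row for it.

    Returns True when the database was created, False when it already existed.
    The connection is assumed to run with isolation_level='AUTOCOMMIT'.
    """
    exists = await conn.scalar(
        text("SELECT 1 FROM pg_database WHERE datname = :name"),
        {"name": name},
    )
    if exists:
        return False
    await conn.execute(text(create_database_sql(name, owner)))
    return True
```

Inside init_app this would be called as await ensure_database(conn, 'testdatabase', 'admin') in place of the suppress block.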

Tables

Declare Models

Here is an example adapted from the SQLAlchemy documentation:

import typing

from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import (DeclarativeBase,
                            Mapped,
                            mapped_column,
                            relationship)


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String, nullable=False)
    fullname: Mapped[typing.Optional[str]] = mapped_column(String)

    addresses: Mapped[list["Address"]] = relationship(
        back_populates="user", cascade="all, delete-orphan"
        )


class Address(Base):
    __tablename__ = "address"

    id: Mapped[int] = mapped_column(primary_key=True)
    email_address: Mapped[str] = mapped_column(String, nullable=False)
    user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))

    user: Mapped["User"] = relationship(back_populates="addresses")

Creating An Async Engine

Whether you created the database dynamically or manually, pass its URL to the engine.

from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

async_engine: AsyncEngine = create_async_engine(
    url='postgresql+asyncpg://admin:admin@localhost:5432/testdatabase')

Creating Tables

async def init_app() -> None:
    async with async_engine.begin() as conn:
        await conn.run_sync(lambda sync_conn: Base.metadata.create_all(
            bind=sync_conn, checkfirst=True))

Here's what's happening:

  1. We use run_sync because calling create_all directly on an AsyncConnection or AsyncEngine object is not currently supported; there is not yet an awaitable form. run_sync invokes our function with a synchronous connection, sync_conn, which we pass as the bind argument.

  2. checkfirst=True, which is also the default, makes create_all skip any table that already exists.

Use Case

Services are usually not given superuser access. But if your service onboards new tenants in a multi-tenant cluster, it may need to create a database dynamically during onboarding. You can combine dynamic database and table creation for this.

When creating tables, you have to create another engine object that points to the newly created database. You don't have to pass isolation_level to this engine unless you want to change the default, which is READ COMMITTED for PostgreSQL.
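
One way to keep the two engines pointed at the same server is to derive the second URL from the first instead of typing it twice. This is a small sketch using SQLAlchemy's URL object; the helper name database_url and the ADMIN_URL constant are assumptions for illustration.

```python
from sqlalchemy.engine import URL, make_url

# Assumed admin URL, matching the Docker setup used in this article.
ADMIN_URL = "postgresql+asyncpg://admin:admin@localhost:5432/postgres"


def database_url(admin_url: str, database: str) -> URL:
    """Return a copy of admin_url that targets a different database."""
    # URL objects are immutable; set() returns a modified copy.
    return make_url(admin_url).set(database=database)


if __name__ == "__main__":
    url = database_url(ADMIN_URL, "testdatabase")
    # str(url) masks the password, so render it explicitly when needed.
    print(url.render_as_string(hide_password=False))
```

The returned URL object can be passed directly to create_async_engine, so the database name only has to be written in one place.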

import contextlib

from sqlalchemy import text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.ext.asyncio import create_async_engine

from .models import Base


async def init_app():
    async with create_async_engine(
        url='postgresql+asyncpg://admin:admin@localhost:5432/postgres',
        isolation_level='AUTOCOMMIT').begin() as conn:
        # If db already exists, suppress the exception.
        with contextlib.suppress(ProgrammingError):
            # The name must match the database in the second engine's URL.
            await conn.execute(text('create database testdatabase owner admin'))

    # Create another engine object which points to the newly created database.
    async with create_async_engine(
        url='postgresql+asyncpg://admin:admin@localhost:5432/testdatabase').begin() as conn:
        await conn.run_sync(lambda sync_conn: Base.metadata.create_all(
            bind=sync_conn, checkfirst=True))


if __name__ == '__main__':
    import asyncio

    asyncio.run(init_app())