DEV Community

Moshe Uminer for SirixDB


Building A Time-Traveling Contacts App with SirixDB

Why SirixDB

SirixDB is a temporal database, built from the ground up with temporality in mind. "Temporal" means time-related: using SirixDB, prior revisions of updated and deleted data are retained efficiently.

The use cases for SirixDB divide into two main categories:

  1. Where you need to be able to view older versions of a given record. In these use cases, SirixDB obviates the need for a history table to reconstruct the state of the database at a given time. Instead, you can simply specify a timestamp for your queries to use.
  2. Where you need to query multiple versions of your data at once. In these cases, we cannot simply use a history table to rebuild the state at a given time, because we need to compare the state of the database at many timestamps. With SirixDB, however, it is easy to write such time-travel queries.

There are various solutions for these purposes, such as history tables and event sourcing. But using SirixDB is generally a simple and effective solution.

Some Caveats

At this time, SirixDB does not support relational data. Instead, databases and resources (the equivalent of a table in SQL) may contain either XML or JSON, and are queried with XQuery and its JSONiq extension (modified to use => instead of the . operator).

It should be noted that SirixDB is ACID compliant.

SirixDB is currently in alpha.

A Demo of SirixDB and PySirix

We will build a simple contacts app to demonstrate the temporal capabilities of SirixDB. A simple contacts app would support the following operations:

  • add a contact
  • view a contact
  • update a contact
  • remove a contact
  • search for a contact

However, once we have the time-traveling capabilities of SirixDB, we can easily support operations such as:

  • view previous versions of contact information
    • view a contact as it was at a specific date and time
    • view all versions ever stored for a given contact
  • view a deleted contact
  • search deleted contacts
  • search all contacts, including deleted contacts

It is completely possible to support these operations in another database system, but it is generally simpler and more efficient to use a temporal database system like SirixDB.

Before We Begin

We will only build an API server with endpoints for the above mentioned functionality. We will not build a frontend.

We'll write the API in Python using the FastAPI framework, with the pysirix library for connecting to SirixDB.

A SirixDB server stores databases, and each database contains resources of a single type (either XML or JSON). A resource can be thought of as one large document file (though internally, SirixDB uses a binary encoding: a huge, durable, persistent tree). Thus, a JSON resource represents a large JSON file, which we can query with XQuery and its JSONiq extension.

However, instead of writing custom queries, we will (for the most part) use the JsonStore abstraction provided by the pysirix package. A JsonStore creates a resource with an empty array as its root, and inserts arbitrary JSON objects into (and retrieves them from) this array. In this respect, it is similar to how MongoDB is used.
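To make this mental model concrete, here is a toy, in-memory stand-in for the JsonStore idea: a root array that records are appended to and filtered from by exact field matches. It is only an illustration under our own assumptions. ToyJsonStore is a hypothetical class, it keeps no history at all, and it is not how pysirix or SirixDB actually store data.

```python
from typing import Any


class ToyJsonStore:
    """Hypothetical toy model of a JsonStore, not the pysirix implementation."""

    def __init__(self) -> None:
        # The resource root is an (initially empty) array of JSON objects.
        self.root: list[dict[str, Any]] = []

    def insert_one(self, record: dict[str, Any]) -> int:
        # Append a copy of the record; its array position stands in for a key.
        self.root.append(dict(record))
        return len(self.root) - 1

    def find_all(self, query_dict: dict[str, Any]) -> list[dict[str, Any]]:
        # A record matches when every queried field equals the given value exactly.
        return [
            record
            for record in self.root
            if all(record.get(field) == value for field, value in query_dict.items())
        ]


store = ToyJsonStore()
store.insert_one({"name": "SirixDB", "phone": None})
store.insert_one({"name": "Ada", "email": "ada@example.com"})
print(len(store.find_all({})))  # 2
print(store.find_all({"name": "SirixDB"}))  # [{'name': 'SirixDB', 'phone': None}]
```

An empty query dictionary matches every record, which is the same convention the list endpoint relies on when it calls find_all with {}.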

Let's begin.

Environment and Dependencies

We first need to set up SirixDB itself. Currently, the practical way to do this is using Docker Compose. Place these files (except for wait.sh, which is not needed) in a directory together, and simply run

docker-compose up -d

This will create both SirixDB and Keycloak (used for authentication) containers. The create-sirix-users.sh file is configured to create a user named admin with the password admin.

Then, to install FastAPI, Uvicorn (for running the FastAPI server) and PySirix, run

pip install fastapi uvicorn pysirix

pysirix can be used with both regular and async code. We will use it both ways - sync mode in the initialization script, and async mode in the FastAPI app.

Code

The completed tutorial can be found on GitHub.

Initialization Script

Before we begin, we will create a script that will initialize the database and resource on the SirixDB server (running in docker) for our project.

import pysirix
import httpx


def init(database_name: str, resource_name: str):
    client = httpx.Client(base_url="http://localhost:9443")
    sirix = pysirix.sirix_sync("admin", "admin", client)
    store = sirix.database(database_name, pysirix.DBType.JSON).json_store(resource_name)
    if not store.exists():
        # database will be created implicitly if it does not exist when the resource is created
        store.create()
        print(f"created resource {resource_name} in database {database_name}")
    else:
        print(f"resource {resource_name} in database {database_name} already exists")
    client.close()


if __name__ == "__main__":
    init("contacts", "contacts")


This code first creates an httpx client to connect to http://localhost:9443, the URL of our local SirixDB server. The client is then passed to the sirix_sync function, a helper that both creates a Sirix instance and authenticates to Keycloak.

On the third line of the init() function, we create a JsonStoreSync instance, and on the fourth line we check whether the resource exists. If it exists, we do not recreate it, as this would overwrite the existing resource. If it does not exist, we create it. Finally, we close the httpx client, and the script exits.

I have named this script initialize.py and placed it in a scripts directory, so I can run it as follows:

python scripts/initialize.py

App Code

We first need to insert new contacts into the store. In order to do so, our code would look something like this:

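from pysirix import sirix_async, DBType
import httpx


async def insert_contact(contact):
    httpx_client = httpx.AsyncClient(base_url="http://localhost:9443")
    # create a Sirix instance and authenticate to Keycloak
    sirix = await sirix_async("admin", "admin", httpx_client)
    store = sirix.database("contacts", DBType.JSON).json_store("contacts")
    await store.insert_one(contact.dict())
    # release the Sirix instance and the HTTP connection pool
    sirix.dispose()
    await httpx_client.aclose()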


However, since we will need a JsonStoreAsync instance in most of our endpoints, we will use FastAPI's dependency injection system to pass a store instance to our endpoint functions:

from typing import AsyncIterator

from fastapi import FastAPI
import httpx
from pysirix import sirix_async, DBType, JsonStoreAsync


app = FastAPI()


async def get_json_store() -> AsyncIterator[JsonStoreAsync]:
    httpx_client = httpx.AsyncClient(base_url="http://localhost:9443")
    sirix = await sirix_async("admin", "admin", httpx_client)
    store = sirix.database("contacts", DBType.JSON).json_store("contacts")
    try:
        yield store
    finally:
        # runs after the endpoint has finished handling the request
        sirix.dispose()
        await httpx_client.aclose()

Here, we recreate both the Sirix instance and the httpx AsyncClient instance every time this function is called (which is every time an endpoint that depends on it is called). This also incurs the overhead of authenticating to Keycloak on every call to sirix_async. An alternative, which reuses the Python objects and the connection pool, and removes the re-authentication overhead, looks like this:

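import httpx
from fastapi import FastAPI
from pysirix import Sirix

app = FastAPI()

httpx_client = httpx.AsyncClient(base_url="http://localhost:9443")

# created once and shared by all requests; authentication happens at startup
sirix = Sirix("admin", "admin", httpx_client)


@app.on_event("startup")
async def init_sirix():
    await sirix.authenticate()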


Here, I have demonstrated how to initialize the Sirix class directly, instead of using the sirix_sync or sirix_async helper functions; it is authenticated when the app starts. We cannot authenticate at module level, because the authenticate method (like all other methods) is asynchronous when using an httpx.AsyncClient, so it must be awaited inside the startup handler.

However, even with this method, instead of simply using the sirix global, we will use FastAPI's dependency injection system to inject a JsonStoreAsync instance into our endpoint handlers. For this, we simply need a function that will initialize and return a JsonStoreAsync instance.

def get_json_store() -> JsonStoreAsync:
    # uses the module-level sirix instance, authenticated at startup
    return sirix.database("contacts", DBType.JSON).json_store("contacts")


Create a Contact

Now, we can define our first endpoint, /contact/new:

from fastapi import Depends, status

from . import schemas


@app.post("/contact/new", status_code=status.HTTP_204_NO_CONTENT)
async def new_contact(
    contact: schemas.Contact, json_store: JsonStoreAsync = Depends(get_json_store)
):
    """
    Create a new contact
    """
    await json_store.insert_one(contact.dict())


The only thing we still need for this endpoint is to define what our Contact object should look like, so let's add the following to schemas.py:

from pydantic import BaseModel, root_validator
from typing import Optional


class Contact(BaseModel):
    name: Optional[str]
    phone: Optional[str]
    email: Optional[str]
    address: Optional[str]

    @root_validator
    def check_not_empty(cls, fields):
        """
        At least 1 field must be truthy.
        """
        assert any(
            fields.values()
        ), "At least 1 field must not be None/null, and not empty"
        return fields


All fields are optional (so they can be None, or null in JSON), so as to allow creating a contact without all the fields containing a value. Of course, this means that a contact could be created without any fields present. We rectify this by adding a validator that asserts that at least one field contains a value.
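The root validator boils down to a single truthiness check over the field values. The standalone function below isolates that check so it can run on its own (has_any_value is a hypothetical name, and the snippet deliberately avoids pydantic). It shows that an empty string is rejected just like None, because both are falsy.

```python
from typing import Optional


def has_any_value(fields: dict[str, Optional[str]]) -> bool:
    # Same check as Contact.check_not_empty: at least one value must be truthy,
    # so both None and "" count as "empty".
    return any(fields.values())


print(has_any_value({"name": None, "phone": None, "email": None, "address": None}))  # False
print(has_any_value({"name": "", "phone": None, "email": None, "address": None}))  # False
print(has_any_value({"name": None, "phone": "555-0100", "email": None, "address": None}))  # True
```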

Our /contact/new endpoint should now work as expected. If we run our app with uvicorn, and navigate to http://localhost:8000/docs, we can test it out, and store a new contact.

List Contacts

Our second endpoint will simply return all contacts. By default, we will return all current contacts, but if a revision number is supplied, we will return the contacts as they were in that revision. Alternatively, if a UTC timestamp is supplied, we will return the contacts that existed at that time.

from typing import Optional


@app.get("/contact/list", response_model=list[schemas.ContactWithMeta])
async def list_contacts(
    revision_id: Optional[int] = None,
    revision_timestamp: Optional[str] = None,
    json_store: JsonStoreAsync = Depends(get_json_store),
):
    """
    List all contacts. Optionally, a revision may be specified.
    """
    results = await json_store.find_all(
        {}, revision=parse_revision(revision_id, revision_timestamp), hash=True
    )
    return [
        schemas.ContactWithMeta(**result, key=result["nodeKey"]) for result in results
    ]


We will use the find_all method to return all contacts matching the query_dict. We have supplied an empty dictionary, so all records will match. If we supplied {"name": "SirixDB"}, then only those records whose name field is exactly SirixDB would be returned.

By default, the find_all method also returns a nodeKey for every record returned. This is a stable and unique identifier for the root of the record it is returned with. It also optionally returns a hash for the revision of the record. We will use both shortly for updates and deletes.

In order to return the nodeKeys (and hashes) to the client, we will define a new ContactWithMeta class that requires key and hash fields:

class ContactWithMeta(Contact):
    key: int
    hash: str


Also, because we will often offer the option of providing either a revision number or a stringified timestamp, we will define a utility function that converts the provided revision parameters into a value pysirix accepts.

from datetime import datetime
from typing import Optional, Union


def parse_revision(
    revision_id: Optional[int], revision_timestamp: Optional[str]
) -> Union[int, datetime, None]:
    """
    A utility function to return either a revision ID or a revision timestamp, or ``None``,
    given two possible values (``revision_id`` and ``revision_timestamp``).

    :return: an ``int`` or ``datetime`` representing a revision, or ``None`` for the latest
    """
    if revision_id is not None:
        return revision_id
    if revision_timestamp:
        # the timestamp must include fractional seconds, e.g. 2021-03-01T12:30:00.000000
        return datetime.strptime(revision_timestamp, "%Y-%m-%dT%H:%M:%S.%f")
    return None


If a number was provided as revision_id, the number will be returned, otherwise, if a timestamp was provided, it will be returned as a datetime object. Else, None will be returned, which is also a valid value to pass to pysirix, to indicate the most recent revision.
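Note that the format string passed to strptime requires fractional seconds. The stdlib-only snippet below shows a timestamp that parses, one without the .%f part that is rejected, and one way a client could build a matching string from the current UTC time. The constant name REVISION_TIMESTAMP_FORMAT is ours, chosen for illustration.

```python
from datetime import datetime, timezone

REVISION_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"  # same format as in parse_revision

# Parses: fractional seconds are present.
ok = datetime.strptime("2021-03-01T12:30:00.000000", REVISION_TIMESTAMP_FORMAT)
print(ok)  # 2021-03-01 12:30:00

# Rejected: without the ".%f" part, strptime raises ValueError.
try:
    datetime.strptime("2021-03-01T12:30:00", REVISION_TIMESTAMP_FORMAT)
except ValueError as error:
    print("rejected:", error)

# A client can produce a compatible string from the current UTC time.
now_utc = datetime.now(timezone.utc).strftime(REVISION_TIMESTAMP_FORMAT)
print(now_utc)
```

The %f directive accepts one to six digits, so a shorter fraction such as .5 is also accepted.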

Delete a Contact

The JsonStoreAsync class does not currently provide a method to delete a record by nodeKey, only by query_dict. A simple way to delete a record by nodeKey is to use the Resource class:

from fastapi import Depends, Response, status
from pysirix import Resource, SirixServerError


@app.delete("/contact/{contact_key}")
async def delete_contact(
    contact_key: int, hash: str, resource: Resource = Depends(get_json_resource)
):
    """
    Delete the contact with the given key.
    If the record has changed since the hash was obtained, a 409 error is returned.
    """
    try:
        await resource.delete(contact_key, hash)
    except SirixServerError:
        return Response(status_code=status.HTTP_409_CONFLICT)
    return Response(status_code=status.HTTP_204_NO_CONTENT)


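We need to handle a possible SirixServerError: Resource.delete raises it when the server returns an error response, for example when the supplied hash no longer matches the record. We translate it into a 409 Conflict response.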

Ideally, you would wish to be certain that the record has not changed between the last time you retrieved it, and the time of deletion. One way of doing so is to retrieve a hash associated with the record upon retrieval, and to provide the hash when deleting. If the hashes do not match, then the server returns an error response, and a SirixServerError is raised.
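This hash check is a form of optimistic concurrency control: the client proves which version it saw, and the delete only goes through if that version is still current. The toy model below illustrates the idea in memory. Everything in it (the ToyResource class, the SHA-256-over-sorted-JSON hash, and the Conflict exception) is a hypothetical stand-in; SirixDB computes and compares its own hashes on the server.

```python
import hashlib
import json
from typing import Any


class Conflict(Exception):
    """Raised when the caller's hash no longer matches the stored record (think HTTP 409)."""


def record_hash(record: dict[str, Any]) -> str:
    # Hypothetical hash: SHA-256 over the record serialized with sorted keys.
    return hashlib.sha256(json.dumps(record, sort_keys=True).encode()).hexdigest()


class ToyResource:
    """Hypothetical in-memory resource keyed by a stand-in for nodeKey."""

    def __init__(self) -> None:
        self.records: dict[int, dict[str, Any]] = {}

    def read(self, key: int) -> tuple[dict[str, Any], str]:
        # Return a copy of the record together with the hash the client should keep.
        record = self.records[key]
        return dict(record), record_hash(record)

    def delete(self, key: int, expected_hash: str) -> None:
        # Refuse the delete if the record changed since the client read it.
        if record_hash(self.records[key]) != expected_hash:
            raise Conflict(f"record {key} was modified since it was read")
        del self.records[key]


resource = ToyResource()
resource.records[1] = {"name": "Ada", "phone": None}
_, seen_hash = resource.read(1)
resource.records[1]["phone"] = "555-0100"  # someone else updates the record
try:
    resource.delete(1, seen_hash)
except Conflict as error:
    print("conflict:", error)  # the endpoint would answer 409 here
_, fresh_hash = resource.read(1)
resource.delete(1, fresh_hash)  # succeeds with the current hash
print(resource.records)  # {}
```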

To provide the Resource to our function, we can create a simple dependency injection function (here I will use the alternate method shown above):

from pysirix import Resource


def get_json_resource() -> Resource:
    return sirix.database("contacts", DBType.JSON).resource("contacts")


Update a Contact

@app.put("/contact/{contact_key}", status_code=status.HTTP_204_NO_CONTENT)
async def update_contact(
    contact_key: int,
    contact: schemas.Contact,
    json_store: JsonStoreAsync = Depends(get_json_store),
):
    """
    Update a contact. Fields in the new contact object will
            overwrite fields in the old version of the contact.
    """
    await json_store.update_by_key(contact_key, contact.dict())


The update_by_key method takes a key and a dictionary. Any fields present in the dictionary override the corresponding fields currently present in the record.
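If update_by_key behaves as described (every field in the supplied dictionary replaces the stored one), it resembles a shallow dictionary merge. The sketch below models that assumption in plain Python; merge_update is a hypothetical helper, not pysirix code. It surfaces a detail worth checking: contact.dict() includes every field, so a field the client left out arrives as None and would overwrite the stored value with null. With pydantic v1 (the root_validator style used in schemas.py), contact.dict(exclude_unset=True) is one way to send only the fields the client actually provided.

```python
from typing import Any, Optional


def merge_update(stored: dict[str, Any], update: dict[str, Optional[str]]) -> dict[str, Any]:
    # Assumed semantics: every key in `update` replaces the stored value.
    return {**stored, **update}


stored = {"name": "Ada", "phone": "555-0100", "email": None, "address": None}

# All four fields, as contact.dict() would produce them for a body of {"phone": "555-0199"}.
full = {"name": None, "phone": "555-0199", "email": None, "address": None}
print(merge_update(stored, full))
# {'name': None, 'phone': '555-0199', 'email': None, 'address': None}

# Only the fields the client set, as contact.dict(exclude_unset=True) would produce them.
partial = {"phone": "555-0199"}
print(merge_update(stored, partial))
# {'name': 'Ada', 'phone': '555-0199', 'email': None, 'address': None}
```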

View Old Versions of a Contact

@app.get("/contact/{contact_key}", response_model=schemas.Contact)
async def view_contact(
    contact_key: int,
    revision_id: Optional[int] = None,
    revision_timestamp: Optional[str] = None,
    json_store: JsonStoreAsync = Depends(get_json_store),
):
    """
    Return a contact, given its key. Can return the contact as it was in different points in time.
    By default, the current version is returned.
    """
    result = await json_store.find_by_key(
        contact_key, parse_revision(revision_id, revision_timestamp)
    )
    return schemas.Contact(**result)


This function is similar to the /contact/list endpoint, except that it returns only a single contact, as it was at a given time.

View All Revisions of a Contact

@app.get(
    "/contact/{contact_key}/history",
    response_model=Union[list[schemas.HistoricalContact], list[schemas.Revision]],
)
async def view_contact_history(
    contact_key: int,
    revision_id: Optional[int] = None,
    revision_timestamp: Optional[str] = None,
    embed: bool = False,
    json_store: JsonStoreAsync = Depends(get_json_store),
):
    """
    Return the history of a contact, given its key.
    If `embed` is `False`, then only the metadata of each revision will be returned.
    Else, the contact (as it was at that revision) will be returned as well.

    If the contact does not currently exist, a `revision_id` or `revision_timestamp`
    of when the contact _did_ exist can be supplied.
    """
    if embed:
        results = await json_store.history_embed(
            contact_key, parse_revision(revision_id, revision_timestamp)
        )
        return [schemas.HistoricalContact(**result) for result in results]
    else:
        return await json_store.history(
            contact_key, revision=parse_revision(revision_id, revision_timestamp)
        )


We also need to define the schemas for this method:

class Revision(BaseModel):
    """
    This schema is of the form of pysirix.types.SubtreeRevision
    """

    revisionTimestamp: str
    revisionNumber: int


class HistoricalContact(Revision):
    """
    This schema is of the form of pysirix.types.QueryResult
    """
    revision: Contact


Search (Within a Revision)

We will now implement search. We will first implement searching within a particular revision (or the current revision), and later will implement another endpoint for all-time search.

We will first declare the QueryTerm class that will contain a search term.

class QueryTerm(BaseModel):
    # the term to match against
    term: str
    # whether to match when the field string contains `term`, instead of looking for an exact match
    fuzzy: bool = False
    # which field in the record to match against
    field: str


And now our endpoint handler:

from fastapi import HTTPException


@app.post("/contact/search", response_model=list[schemas.ContactWithMeta])
async def search_contacts(
    query_terms: list[schemas.QueryTerm],
    revision_id: Optional[int] = None,
    revision_timestamp: Optional[str] = None,
    resource: Resource = Depends(get_json_resource),
):
    """
    Search for a contact. If an empty list is provided instead of a list of
            search terms, a 400 error is returned.
    Provide a `revision_id` or `revision_timestamp` to search a particular revision,
    instead of the latest.
    """
    if len(query_terms) == 0:
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST,
            "when not using search terms, use the /contact/list endpoint instead",
        )
    if revision_id:
        open_resource = f"jn:doc('contacts','contacts', {revision_id})"
    elif revision_timestamp:
        open_resource = (
            f"jn:open('contacts','contacts', xs:dateTime('{revision_timestamp}'))"
        )
    else:
        open_resource = "."
    query_list = []
    for query_term in query_terms:
        if query_term.fuzzy:
            query_list.append(
                f"(typeswitch($i=>{query_term.field}) "
                f"case xs:string return contains(xs:string($i=>{query_term.field}), '{query_term.term}')"
                " default return false())"
            )
        else:
            query_list.append(f"$i=>{query_term.field} eq '{query_term.term}'")
    query_filter = " and ".join(query_list)
    query = (
        f"for $i in bit:array-values({open_resource}) where {query_filter}"
        " return {$i, 'nodeKey': sdb:nodekey($i), 'hash': sdb:hash($i)}"
    )
    results = await resource.query(query)
    return [
        schemas.ContactWithMeta(**result, key=result["nodeKey"])
        for result in results["rest"]
    ]


First, we create the open_resource variable, containing the XQuery code to open the resource. If we wish to query the latest revision, we can simply use . (the context item) to indicate the resource, since we are querying through the Resource class. If we wish to open a particular revision, however, we must use jn:doc (for a revision number) or jn:open (for a timestamp); in the latter case, we also need to convert the timestamp string into an xs:dateTime.
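The choice of open_resource can be factored into a small pure function, which makes it easy to check the generated XQuery without a running server. The name open_resource_expr is hypothetical; the strings it returns are the ones built inline in the search endpoint, including its truthiness checks.

```python
from typing import Optional


def open_resource_expr(
    revision_id: Optional[int] = None, revision_timestamp: Optional[str] = None
) -> str:
    # A specific revision, selected by number.
    if revision_id:
        return f"jn:doc('contacts','contacts', {revision_id})"
    # The revision current at a point in time; the string is cast to xs:dateTime.
    if revision_timestamp:
        return f"jn:open('contacts','contacts', xs:dateTime('{revision_timestamp}'))"
    # The latest revision: the context item is the resource opened by the Resource class.
    return "."


print(open_resource_expr(revision_id=3))  # jn:doc('contacts','contacts', 3)
```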

We then create the query_list, where each item in the list is an expression returning either true or false. If the search is not fuzzy, we will simply do a comparison of the search term and the field value. If we wish to do a fuzzy search, we use the contains method to check if the search term is contained in the field.

However, there is a potential bug in using the contains method. Since a field must be converted to xs:string before using the contains method, if the field is null, it will be converted to the string "null", and matched. Also, if for some reason the field of that record is not a string (as SirixDB does not enforce type schemas on the resources), there may even be an error thrown by a failed conversion.

To work around this issue, we use a typeswitch statement to first check that the field is a string, otherwise, the typeswitch returns false.
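Since the same filter-building loop appears in both search endpoints, it could be factored into one helper. The sketch below is a refactor under stated assumptions rather than the tutorial's code: build_query_filter and xquery_string are hypothetical names, and QueryTerm is redeclared as a plain dataclass so the snippet runs without pydantic. It also escapes search terms, because a term containing an apostrophe (a name such as O'Brien) would otherwise end the XQuery string literal early. The XQuery specification escapes an apostrophe inside an apostrophe-delimited literal by doubling it; it is worth verifying that SirixDB's query engine behaves the same way. Field names are still interpolated as-is.

```python
from dataclasses import dataclass


@dataclass
class QueryTerm:
    # Plain stand-in for schemas.QueryTerm, so this runs without pydantic.
    term: str
    field: str
    fuzzy: bool = False


def xquery_string(value: str) -> str:
    # Quote a value as an XQuery string literal; an embedded ' is written as ''.
    return "'" + value.replace("'", "''") + "'"


def build_query_filter(query_terms: list[QueryTerm]) -> str:
    conditions = []
    for query_term in query_terms:
        field = f"$i=>{query_term.field}"
        term = xquery_string(query_term.term)
        if query_term.fuzzy:
            # Only call contains() when the field really is a string; otherwise false().
            conditions.append(
                f"(typeswitch({field}) "
                f"case xs:string return contains(xs:string({field}), {term}) "
                "default return false())"
            )
        else:
            conditions.append(f"{field} eq {term}")
    return " and ".join(conditions)


print(build_query_filter([QueryTerm("O'Brien", "name")]))  # $i=>name eq 'O''Brien'
```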

Finally, we declare the query itself: for $i in bit:array-values({open_resource}) opens the resource and iterates through the array at the base of the resource. where {query_filter} then filters out any record ($i) that does not match our query terms, and then finally return {$i, 'nodeKey': sdb:nodekey($i), 'hash': sdb:hash($i)} returns an object - for every record that passed the filter - containing the object, the key, and the hash.

As we will soon see, if we simply returned $i, we would get an object compatible with the schemas.HistoricalContact class declared above. However, when we wrap it in an object constructor ({$i, ...}), only the record itself (plus the fields we add) is returned.

Search (all-time)

We will now implement a similar query to search all contacts stored, including deleted contacts.

@app.post("/contact/search/all-time", response_model=list[schemas.HistoricalContact])
async def search_contacts_all_time(
    query_terms: list[schemas.QueryTerm],
    existing: bool = True,
    resource: Resource = Depends(get_json_resource),
):
    """
    Search for a contact, even if it does not currently exist.
    All contacts are returned if an empty list is provided.

    If `existing` is `True`, then currently existing contacts will be returned as well.
    """
    if len(query_terms) == 0:
        query_filter = ""
    else:
        query_list = []
        for query_term in query_terms:
            if query_term.fuzzy:
                query_list.append(
                    f"(typeswitch($i=>{query_term.field}) "
                    f"case xs:string return contains(xs:string($i=>{query_term.field}), '{query_term.term}')"
                    " default return false())"
                )
            else:
                query_list.append(f"$i=>{query_term.field} eq '{query_term.term}'")
        query_filter = " and ".join(query_list)
        query_filter = f"where {query_filter}"
    query_deleted_only = "where sdb:is-deleted($i)" if not existing else ""
    deduplicate = (
        "if (not(exists(jn:future($i)))) then $i "
        "else if (sdb:hash($i) ne sdb:hash(jn:future($i))) then $i "
        "else ()"
    )
    query = (
        "for $rev in jn:all-times(.) for $i in bit:array-values($rev) "
        f"{query_deleted_only} {query_filter} return {deduplicate}"
    )
    results = await resource.query(query)
    return results["rest"]


In this route, we handle requests without any query terms by not filtering the results (instead of raising an error and directing the client to a different route, as the non-temporal search route does). In that case, query_filter will be an empty string. If query terms are provided, however, we build a filter expression similar to the one used in the non-temporal search.

We then create query_deleted_only, which will be an empty string if existing contacts should be included, or an additional where clause that keeps only deleted records if they should not.

We then define a deduplicate expression, where we check if the hash of the record is the same as the hash of that record in the next revision. If it is the same, then we return the empty sequence () to indicate we are returning nothing. If, on the other hand, the hashes are not equal (indicating that a change has occurred), or it does not exist at all in the next revision (meaning it was deleted), then we return the record.
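The deduplication rule can be modeled in plain Python to see which versions survive. In the sketch below, revisions_to_report is a hypothetical helper and the history format is our own assumption: one hash per revision, indexed by revision number, with None where the record does not exist. Following the description above, a revision is reported when the record is missing from the next revision or its hash differs there.

```python
from typing import Optional


def revisions_to_report(hashes: list[Optional[str]]) -> list[int]:
    """Return the revision numbers whose version of the record would be returned."""
    reported = []
    for revision, current in enumerate(hashes):
        if current is None:
            continue  # the record does not exist in this revision
        following = hashes[revision + 1] if revision + 1 < len(hashes) else None
        # Report if the record is absent next (deleted, or this is the latest
        # revision) or if its content changed in the next revision.
        if following is None or following != current:
            reported.append(revision)
    return reported


# Unchanged in revisions 0-1, updated in 2, unchanged in 3, deleted in 4.
print(revisions_to_report(["a", "a", "b", "b", None]))  # [1, 3]
```

In this model, each distinct version is reported once, at the last revision in which it appeared, and the current version of a record that still exists is always reported because there is no next revision to compare against.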

We then define the query itself, iterating through all revisions of the resource (for $rev in jn:all-times(.)), applying our search filters and deduplicate filters.

Join Us!

Find us on Slack, Discourse, or GitHub Discussions.
