DEV Community

Cover image for Unexpected Expected Thriller: A Tale of Coding Curiosity
Prayson Wilfred Daniel
Prayson Wilfred Daniel

Posted on • Edited on

Unexpected Expected Thriller: A Tale of Coding Curiosity

Do you remember the last time you dived deep into a coding session, losing track of time, just for the sheer fun of it? It's not about deadlines, efficiency, or even perfection. It's the allure of experimentation, breaking, and rebuilding, all while discovering new paths.

Today, I'm going to take you on a thrilling coding adventure inspired by a LinkedIn code snippet, where I tangled with FastAPI, River, Watchdog, and Tenacity. Ready? Buckle up!

⚠️ Disclaimer:

For the adventurous spirit: Don\'t attempt this at home \`{home == production}\`. For clarity, the code has been condensed into three files.
Enter fullscreen mode Exit fullscreen mode

Once Upon a FastAPI...

LinkedInPost

It all started when my eyes caught a captivating MantisNLP's LinkedIn post. The post showcased FastAPI code snippets intended for educational purposes. What intrigued me was the swift model reloading. In the snippet, which carried out an ML prediction, the authors first loaded the model and then executed the prediction.

This stirred a question in me: Could I modify an API's model state based on an event change? If achievable, could we serve a machine learning model that's a shell or semi-trained and continues learning over time?

Using the penguins classification dataset as my canvas, I envisioned something like this:

from pydantic import BaseModel
from fastapi import FastAPI


app = FastAPI(title="😂 Pure Joy")

class Attributes(BaseModel):
    island:str
    bill_length_mm: float | None
    bill_depth_mm: float | None
    flipper_length_mm: float | None
    body_mass_g: float | None
    sex: str | None

class LearnAttributes(BaseModel):
    attributes: Attributes
    species: str

@app.on_event("startup")
def startup_event():
    # check and use if the ml model exists else create one
    ...


@app.post("/predict")
def predict(attributes: Attributes) -> dict:

    # predict
    ...

@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict
    # learn and update the ml model
    ...
Enter fullscreen mode Exit fullscreen mode

Swimming the River of Learning

But just tweaking the API felt too straightforward. I wanted both the API and the machine learning model to be dynamic, evolving in tandem. Enter River, a tool tailor-made for online machine learning. As new data streamed in, the model continually refines its skills.

Here's a basic model:

# ml.py
from river import compose
from river import preprocessing, stats
from river import naive_bayes


def penguins_model():
    island_transformation = compose.Select("island") | preprocessing.OneHotEncoder(
        drop_first=True
    )

    sex_transformation = (
        compose.Select("sex")
        | preprocessing.StatImputer(("sex", stats.Mode()))
        | preprocessing.OneHotEncoder(drop_first=True)
    )

    numeric_transformation = compose.Select(
        "bill_length_mm",
        "bill_depth_mm",
        "flipper_length_mm",
        "body_mass_g",
    ) | preprocessing.StatImputer(
        ("bill_length_mm", stats.Mean()),
        ("bill_depth_mm", stats.Mean()),
        ("flipper_length_mm", stats.Mean()),
        ("body_mass_g", stats.Mean()),
    )

    model = (
        island_transformation + sex_transformation + numeric_transformation
        | naive_bayes.MultinomialNB(alpha=1)
    )

    return model
Enter fullscreen mode Exit fullscreen mode

modelflow

Putting the Lego pieces together into a complete, testable code.

from pathlib import Path
import pickle
...

MODEL_FILE = "model/naive.pickle"
app = FastAPI(title="😂 Pure Joy")

...

@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    if not model_file.exists():
        from ml import penguins_model

        app.state.ml = penguins_model()
    else:
        app.state.ml = pickle.loads(model_file.read_bytes())


@app.post("/predict")
def predict(attributes: Attributes) -> str | None:
    X = attributes.model_dump()
    return app.state.ml.predict_one(X)


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str]:
    X = learn_attributes.attributes.model_dump()
    y = learn_attributes.species

    y_pred = app.state.ml.predict_one(X)
    app.state.ml.learn_one(X, y)

    Path(MODEL_FILE).write_bytes(pickle.dumps(app.state.ml))

    return {"status": f"we learned {y}. We initially predicted {y_pred}"}

Enter fullscreen mode Exit fullscreen mode

Boom! That was easy. Let's run

uvicorn app:app --reload
Enter fullscreen mode Exit fullscreen mode

🎉 Hurrah ... The code worked seamlessly. That is, until multiple workers entered the fray, leading to unexpected pandemonium. Why? The reason lies in uvicorn's workers: each operates independently, causing utter chaos, especially when executing:

uvicorn app:app --reload --workers 4
Enter fullscreen mode Exit fullscreen mode

To better illustrate the conundrum, let's introduce counters for both predictions and learnings:

...

@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    if not model_file.exists():
       ...
        ml.meta = {"predicted": 0,
                   "learned": 0}

        app.state.ml = ml
    else:
        app.state.ml = pickle.loads(model_file.read_bytes())


@app.post("/predict")
def predict(attributes: Attributes) -> dict[str, str | int | None]:
    X = attributes.model_dump()
    y_pred  = app.state.ml.predict_one(X)
    app.state.ml.meta["predicted"] += 1

    return {"predicted": y_pred,
            **app.state.ml.meta,}


@app.post("/learn")
def learn(learn_features: LearnAttributes) -> dict[str, str|int]:
    ...

    app.state.ml.meta["learned"] += 1

    Path(MODEL_FILE).write_bytes(pickle.dumps(app.state.ml)

    return {"status": f"we learned {y}. We initially predicted {y_pred}",
            **app.state.ml.meta,}

Enter fullscreen mode Exit fullscreen mode

messy

The tally for predictions and learnings is inexplicably out of sync.

😭 My joy short lived. The core issue was that our ML model's knowledge wasn't uniformly distributed across workers. It felt like having several chefs in a kitchen, each adding their unique flavour, unaware of the others' contributions.


The Watchdog's Bark and Bite

That's when Watchdog barked into the scene. Its event handling capabilities ensured machine learning model updates happened seamlessly based on file changes across workers. Let's add the modification.

...
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


MODEL_FILE = "model/naive.pickle"
app = FastAPI(title="😂 Pure Joy")


class FileHandler(FileSystemEventHandler):
    def on_modified(self, event):
        print(f"path={event.src_path} event={event.event_type}")
        app.state.ml = pickle.loads(Path(MODEL_FILE).read_bytes())

...

handler = FileHandler()
observer = Observer()


@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    if not model_file.exists():
        ...
    else:
        ...

    observer.schedule(handler, path=MODEL_FILE, recursive=False)
    observer.start()


@app.on_event("shutdown")
def shutdown_event():
    observer.stop()
    observer.join()


@app.post("/predict")
def predict(attributes: Attributes) -> dict[str, str | int | None]:
   ...


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str | int]:
   ...
Enter fullscreen mode Exit fullscreen mode

unmessy

🎉 Hurrah! Again … It worked. But watchdog had its quirks. It bite back. Workers still occasionally collided, each clamouring to update the model and sometimes finding the file "in use" error. It is chaotic, to say the least. But isn't that what coding for fun is all about? Facing challenges and finding creative solutions.


Tenacity to the Rescue!

That's where Tenacity strutted in. Instead of letting workers clash and give up, Tenacity made them... well, tenacious! It equipped them with smart retries. If a worker faced an "in-use" error, it patiently waited and retried, ensuring that every update was eventually accounted for.

We are going to update our machine learning file to have the loading and saving in order to add the tenaciousness. More over, we can move code arround to cleanup a little. The final code looks like:

Getting schemas out

# schemas.py
from pydantic import BaseModel


class Attributes(BaseModel):
    island: str
    bill_length_mm: float | None
    bill_depth_mm: float | None
    flipper_length_mm: float | None
    body_mass_g: float | None
    sex: str | None


class LearnAttributes(BaseModel):
    attributes: Attributes
    species: str


class Predictions(BaseModel):
    species: str | None
    predicted: int
    learned: int


class Learnings(BaseModel):
    status: str
    predicted: int
    learned: int

Enter fullscreen mode Exit fullscreen mode

Adding model io(input and out) function that is tenacious

# ml.py
import pickle
from pathlib import Path
from typing import Literal

...
from tenacity import retry, retry_if_exception_type

...

@retry(retry=retry_if_exception_type(EOFError))
def ml_io(
    model_file: Path,
    mode: Literal["wb", "rb"] = "rb",
    ml_object: compose.Pipeline | None = None,
):
    if mode == "rb" and not model_file.exists():
        ml = penguins_model()
        ml.meta = {
            "predicted": 0,
            "learned": 0,
        }
        # save the first local copy (god like ...)
        model_file.write_bytes(pickle.dumps(ml_object))

        return ml

    elif mode == "rb" and model_file.exists():
        return pickle.loads(model_file.read_bytes())

    elif mode == "wb":
        return model_file.write_bytes(pickle.dumps(ml_object))
    else:
        NotImplemented(f"mode can only be `wb` or `rb`")
Enter fullscreen mode Exit fullscreen mode

And for our application:

# app.py
from pathlib import Path

from fastapi import FastAPI
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from ml import ml_io
from schemas import Attributes, LearnAttributes, Predictions, Learnings


MODEL_FILE = "model/naive.pickle"
app = FastAPI(title="😂 Pure Joy")


class FileHandler(FileSystemEventHandler):
    def on_modified(self, event):
        # print(f"path={event.src_path} event={event.event_type}")
        app.state.ml = ml_io(model_file=Path(MODEL_FILE), mode="rb")


handler = FileHandler()
observer = Observer()


@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    app.state.ml = ml_io(model_file=model_file, mode="rb")

    observer.schedule(handler, path=MODEL_FILE, recursive=False)
    observer.start()


@app.on_event("shutdown")
def shutdown_event():
    observer.stop()
    observer.join()


@app.post("/predict")
def predict(attributes: Attributes) -> Predictions:
    X = attributes.model_dump()
    y_pred = app.state.ml.predict_one(X)
    app.state.ml.meta["predicted"] += 1

    return {
        "species": y_pred,
        **app.state.ml.meta,
    }


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> Learnings:
    X = learn_attributes.attributes.model_dump()
    y = learn_attributes.species

    y_pred = app.state.ml.predict_one(X)
    app.state.ml.learn_one(X, y)

    app.state.ml.meta["learned"] += 1

    ml_io(model_file=Path(MODEL_FILE), mode="wb", ml_object=app.state.ml)

    return {
        "status": f"learned {y}. Initially predicted {y_pred}",
        **app.state.ml.meta,
    }
Enter fullscreen mode Exit fullscreen mode

The Sunset of Our Tale

This journey was not about creating the code for production. It was a thrilling adventure through challenges and solutions, a testament to the beauty of coding for fun. I experimented, learned, broke things, and eventually built something magnificent.

The biggest takeaway? Embrace the chaos and curiosity. Revel in the unpredictability. And remember, coding for fun should always be fun. And when in doubt, be tenacious!

Until then, keep on coding curiously

Top comments (0)