This is the first of a two part series on implementing authorization in a FastAPI application using Deta. In this article, we will learn about JWT tokens, set up the project, and build the auth logic. In the next article, we will implement the auth logic in a FastAPI application. The full code is available here.
Introduction
Implementing authorization can be useful, as it provides the client access to a specific set of functions, actions, data, etc. Consider an e-commerce website, you would want to make sure users are authorized before they can look at items in the cart. Another example is a chat application where only the owner has the right to add/remove people.
JWT (or JSON web tokens) are simply base64 strings that encode some information about the client. These tokens are signed using a secret key or a public/private key. We will implement the former method. Essentially, when the client is logged in, the server sends back a response with a signed token. Subsequently, the client can send requests to the server with the token as a header to access authorized routes, data, functions etc.
Let's get started
Agenda
- Setup
- FastAPI app skeleton + Auth logic
Setup
Tools
- FastAPI: we are using FastAPI to build the application
- Deta Base (Base) : database for our application
- Deta Micro: host our application
-
pyjwt
: library for encoding and decoding JWT tokens -
passlib[bcrypt]
: password hashing library
Install
To get started, create a folder for this project fastapi-jwt
, and create a requirements.txt
file with the following lines:
deta
fastapi
uvicorn
pyjwt
passlib[bcrypt]
Run the following command to install the libraries
pip install -r requirements.txt
Before we begin with the project we also need to get a Deta project key to use with Deta Base. We are using Base to store user account information such as username and hashed password.
To do that, navigate to the Deta Console then click on the arrow on the top left.
If you don't already have a Deta account, create one for free. Once you confirm your email, Deta will automatically generate a Project Key, this is the one we need, copy it and store it securely.
Create a new project and make sure to save the key in a secure place!
Add the key to your environment variables like this DETA_PROJECT_KEY=YOUR_COPIED_PROJECT_KEY
That's it for the setup, we have everything we need to get rolling. Let's go!
FastAPI app skeleton and Auth logic
Here is how our folder structure will look like at the end:
fastapi-jwt/
├── main.py
├── auth.py
├── user_model.py
└── requirements.txt
In main.py
, let's set up our FastAPI application, Deta Base, and skeletons for all the endpoints.
from fastapi import FastAPI, HTTPException
from deta import Deta
deta = Deta()
users_db = deta.Base('users')
app = FastAPI()
@app.post('/signup')
def signup():
return 'Sign up endpoint'
@app.post('/login')
def login():
return 'Login user endpoint'
@app.get('/refresh_token')
def refresh_token():
return 'New token'
@app.post('/secret')
def secret_data():
return 'Secret data'
@app.get('/notsecret')
def not_secret_data():
return 'Not secret data'
users_db
is our base where we store the account's hashed password. The schema for users
will look like the following:
{
key: str, # username
encoded_password: str
}
Now let's head over to auth.py
, to handle the authentication logic:
import os
import jwt # used for encoding and decoding jwt tokens
from fastapi import HTTPException # used to handle error handling
from passlib.context import CryptContext # used for hashing the password
from datetime import datetime, timedelta # used to handle expiry time for tokens
class Auth():
hasher= CryptContext(schemes=['bcrypt'])
secret = os.getenv("APP_SECRET_STRING")
def encode_password(self, password):
return self.hasher.hash(password)
def verify_password(self, password, encoded_password):
return self.hasher.verify(password, encoded_password)
So far, we just imported all the tools from the libraries, and we created the Auth
class with two functions. We don't want to store the plain text password in our users
Base. Therefore, we can use the encode_password
function to encode the password using the passlib['bcrypt']
library. We can store this encoded password in our users_db
base when the user makes an account.
We also have another function verify_password
which checks if the plain password and the encoded password from users_db
match. This can be useful to verify user in the /login
endpoint.
Notice that we get the variable secret
from our environment, make sure to generate a long secure string and store it in your environment variables under the name APP_SECRET_STRING
.
Now that we have a way to verify passwords, and hash passwords, it is time to handle the logic for encoding and decoding JSON web tokens. The tokens are the essence of auth logic.
Inside the Auth
class, add the following functions.
def encode_token(self, username):
payload = {
'exp' : datetime.utcnow() + timedelta(days=0, minutes=30),
'iat' : datetime.utcnow(),
'scope': 'access_token',
'sub' : username
}
return jwt.encode(
payload,
self.secret,
algorithm='HS256'
)
def decode_token(self, token):
try:
payload = jwt.decode(token, self.secret, algorithms=['HS256'])
if (payload['scope'] == 'access_token'):
return payload['sub']
raise HTTPException(status_code=401, detail='Scope for the token is invalid')
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail='Token expired')
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail='Invalid token')
The encode_token
function takes a username as a parameter and uses pyjwt
to create an access_token
. We are using timedelta
to set the expiry of the token for 30 mins. The scope parameter can be used to verify that the client uses an access_token
. We can use this function inside the /login
endpoint, and return a token to the client.
decode_token
takes a token as a parameter, and attempts to decode it using the secret
. If there are any errors like expired token or an invalid token, we can simply raise an HTTPException
. Otherwise, we can return the username. This will be helpful to us when the client interacts with protected data, functions, etc. We can use this function to simply verify if they have access to the response.
When the token expires, the application forces the user to login. To avoid this, we can create two tokens access_token
and refresh_token
when the user logins. The refresh token usually has a longer expiry time than the access_token
, and will only be used to create a new token. Everytime the access_token
expires, the client sends a request to the server to create a new access_token
using the refresh_token
. If the refresh_token
expires then the client will be forced to login.
We need two more function to handle the refresh_token
logic.
def encode_refresh_token(self, username):
payload = {
'exp' : datetime.utcnow() + timedelta(days=0, hours=10),
'iat' : datetime.utcnow(),
'scope': 'refresh_token',
'sub' : username
}
return jwt.encode(
payload,
self.secret,
algorithm='HS256'
)
def refresh_token(self, refresh_token):
try:
payload = jwt.decode(refresh_token, self.secret, algorithms=['HS256'])
if (payload['scope'] == 'refresh_token'):
username = payload['sub']
new_token = self.encode_token(username)
return new_token
raise HTTPException(status_code=401, detail='Invalid scope for token')
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail='Refresh token expired')
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail='Invalid refresh token')
As you can tell, we are setting the expiry time of the refresh_token
to be 10 hours which is more than the access_token
. Plus, we are simply using refresh_token
to create a new access_token
. The scope
parameter from JWT token ensures that the refresh_token
is used only for creating new tokens, and access_token
is used only for interacting with protected endpoints.
That is all we need for the auth logic! Here is what the file looks like at the end:
auth.py
import os
import jwt # used for encoding and decoding jwt tokens
from fastapi import HTTPException # used to handle error handling
from passlib.context import CryptContext # used for hashing the password
from datetime import datetime, timedelta # used to handle expiry time for tokens
class Auth():
hasher= CryptContext(schemes=['bcrypt'])
secret = os.getenv("APP_SECRET_STRING")
def encode_password(self, password):
return self.hasher.hash(password)
def verify_password(self, password, encoded_password):
return self.hasher.verify(password, encoded_password)
def encode_token(self, username):
payload = {
'exp' : datetime.utcnow() + timedelta(days=0, minutes=30),
'iat' : datetime.utcnow(),
'scope': 'access_token',
'sub' : username
}
return jwt.encode(
payload,
self.secret,
algorithm='HS256'
)
def decode_token(self, token):
try:
payload = jwt.decode(token, self.secret, algorithms=['HS256'])
if (payload['scope'] == 'access_token'):
return payload['sub']
raise HTTPException(status_code=401, detail='Scope for the token is invalid')
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail='Token expired')
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail='Invalid token')
def encode_refresh_token(self, username):
payload = {
'exp' : datetime.utcnow() + timedelta(days=0, hours=10),
'iat' : datetime.utcnow(),
'scope': 'refresh_token',
'sub' : username
}
return jwt.encode(
payload,
self.secret,
algorithm='HS256'
)
def refresh_token(self, refresh_token):
try:
payload = jwt.decode(refresh_token, self.secret, algorithms=['HS256'])
if (payload['scope'] == 'refresh_token'):
username = payload['sub']
new_token = self.encode_token(username)
return new_token
raise HTTPException(status_code=401, detail='Invalid scope for token')
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail='Refresh token expired')
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail='Invalid refresh token')
In the next article, we will implement the logic in a FastAPI application and deploy our app on Deta micros! The full code is available here.
Top comments (7)
🚨UPDATE ALERT 🚨
Hey everyone, I updated the post with some edits to the auth logic to support refresh token. Please take a look at the code, and check out part 2! dev.to/deta/get-started-with-fasta...
great article, looking forward for part 2.
It's coming soon 👀
Thanks for such an awesome article.
Thank you for reading it!
This is awesome!
not an expert here, but isn't it strongly recommended to add salt to the password before hashing? this is exactly the reason why i really really don't want to build my own auth service.