DEV Community

Dhruvil Shah
Dhruvil Shah

Posted on

Testing ZBLog

`run steps

!/bin/sh

sudo apt-get update -y
sudo apt-get install python3-pip -y
pip3 install pytest==5.3.1 urllib3==1.25.7 Flask==1.1.1 Flask-HTTPAuth==3.3.0 flask-marshmallow==0.10.1 Flask-Migrate==2.5.2 Flask-Script==2.0.6 Flask-SQLAlchemy==2.4.1 Flask-Testing==0.8.0 marshmallow==3.2.2
cd /home/labuser/Desktop/Project/t4_ecommerce_flaskh/ecommerceapp;
export FLASK_APP=main.py;
python -m flask seeddata

  1. set FLASK_APP=main
  2. python -m flask db init (ignore the errormigrations folder not empty)
  3. python -m flask db migrate
  4. python -m flask db upgrade
  5. python -m flask run

$env:FLASK_APP="main.py"

sqlite3’ command will get you into sqlite3 terminal.

  1. ‘.open dbname.db’ will get that database as core. All further operations will be made on that database.
  2. ‘.tables’ will list the tables in that database.
  3. ‘select * from tablename; ’ will list the rows of table.

virtual env

ctrl shift p
python interpreter`
enter interpreterpath
select scripts python exe

python -m pytest tests.py

seed.py

from werkzeug.security import generate_password_hash
from . import db
from .models import Product, Role, Cart, CartProduct, Category,User

db.drop_all()
db.session.commit()
try:
db.drop_all()
db.create_all()
c1=Category(category_name='Fashion')
db.session.add(c1)
db.session.commit()

c2=Category(category_name='Electronics')
db.session.add(c2)
db.session.commit()

c3=Category(category_name='Books')
db.session.add(c3)
db.session.commit()

c4=Category(category_name='Groceries')
db.session.add(c4)
db.session.commit()

c5=Category(category_name='Medicines')
db.session.add(c5)
db.session.commit()


r1=Role(role_name='CONSUMER')
db.session.add(r1)
db.session.commit()

r2=Role(role_name='SELLER')
db.session.add(r2)
db.session.commit()

password=generate_password_hash("pass_word",method='pbkdf2:sha256',salt_length=8)
u1=User(user_name='jack',password=password,user_role=1)
db.session.add(u1)
db.session.commit()

u2=User(user_name='bob',password=password,user_role=1)
db.session.add(u2)
db.session.commit()

u3=User(user_name='apple',password=password,user_role=2)
db.session.add(u3)
db.session.commit()

u4=User(user_name='glaxo',password=password,user_role=2)
db.session.add(u4)
db.session.commit()

cart1=Cart(total_amount=20, user_id=1)
db.session.add(cart1)
db.session.commit()

cart2=Cart(total_amount=0, user_id=2)
db.session.add(cart2)
db.session.commit()

p1=Product(price=29190,product_name='ipad',category_id=2,seller_id=3)
db.session.add(p1)
db.session.commit()
p2=Product(price=10,product_name='crocin',category_id=5,seller_id=4)
db.session.add(p2)
db.session.commit()

cp1=CartProduct(cart_id=1,product_id=2,quantity=2)
db.session.add(cp1)
db.session.commit()
print('database successfully initialized')
Enter fullscreen mode Exit fullscreen mode

except Exception:
db.session.rollback()
print('error in adding data to db')

routes.py

from flask import (
Flask,
request,
jsonify,
make_response,
session,
app,
Response,
url_for,
)
from ecommerceapp import app, db
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask_migrate import Migrate, MigrateCommand

from datetime import datetime, timedelta
import os
from sqlalchemy.orm import sessionmaker
from .models import *

from sqlalchemy.orm import joinedload

import base64

import json
import jwt
from functools import wraps
from flask import abort, current_app, request

"""
NOTE:
Use jsonify function to return the outputs and status code

Example:
with output
return jsonify(output),2000

only status code
return '',404

"""

Use this token_required method for your routes where ever needed.

def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
print(request.headers)
if "x-access-token" in request.headers:
token = request.headers["x-access-token"]
print(token)
if not token:
return jsonify({"Message": " Token is missing"}), 401
try:
print("yes")
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")
print(data)
current_user = User.query.filter_by(user_id=data["user_id"]).first()
print(current_user.user_id)
print(current_user.user_role)
except:
return jsonify({"Message": "Token is invalid"}), 401
return f(current_user, *args, **kwargs)

return decorated
Enter fullscreen mode Exit fullscreen mode

def role_required(role):
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
# user_id=kwargs('user_id',None)
user_id = kwargs.pop("user_id", None)
print(user_id)

        user_role = User.objects.get(user_id=user_id)
        print(user_role)

        roleName = user_role.user_role.role_name
        print(roleName)

        if roleName != role:
            abort(403)
        return fn(*args, **kwargs)

    return decorated_view

return wrapper
Enter fullscreen mode Exit fullscreen mode

@app.route("/")
def index():
return "Index Page"

@app.route("/api/public/login", methods=["GET"])
def login():

uname = request.headers["Authorization"][6:]
print(uname)
pass1 = str(base64.b64decode(uname))[2:-1]
username = pass1.split(":")[0]
password = pass1.split(":")[1]

# Find the user in the database based on the provided username
user = User.query.filter_by(user_name=username).first()

if user and check_password_hash(user.password, password):
    # Authentication successful
    token = jwt.encode(
        {"user_id": user.user_id, "exp": datetime.utcnow() + timedelta(hours=5)},
        app.config["SECRET_KEY"],
        algorithm="HS256",
    )
    # token = jwt.encode({'user_id': user.user_id, 'exp' :datetime.utcnow() + timedelta(minutes=30)}, app.config['SECRET_KEY'], algorithm="HS256")
    return jsonify({"token": token}), 200
else:
    # Authentication failed
    return jsonify({"token": "0"}), 401
Enter fullscreen mode Exit fullscreen mode

@app.route("/api/public/product/search", methods=["GET"])
def search_products():
keyword = request.args.get("keyword")

# Perform the join operation to retrieve the matching products
products = (
    db.session.query(Product)
    .join(Category)
    .filter(Product.product_name.contains(keyword))
    .all()
)

if not products:
    return jsonify("Null"), 400

# Prepare the response data
response = []
for product in products:
    category = {
        "category_id": product.category.category_id,
        "category_name": product.category.category_name,
    }
    product_data = {
        "category": category,
        "product_id": product.product_id,
        "product_name": product.product_name,
        "price": product.price,
        "seller_id": product.seller_id,
    }
    response.append(product_data)

return jsonify(response), 200
Enter fullscreen mode Exit fullscreen mode

#------------------------------------Seller endpoints---------------------------------

#----------------URL /api/auth/seller/product

@app.route("/api/auth/seller/product", methods=["GET"])
@token_required
def get_seller_products(current_user):
if current_user.user_role != 2:
return {
"message": "Unauthorized Endpoint",
}, 403 # Implement your own authentication logic

# Retrieve the products owned by the seller
products = Product.query.filter_by(seller_id=current_user.user_id).all()

# If no products are found, return a 404 status code
if not products:
    return {"message": "Product not found"}, 403

# Prepare the response body
response1 = []
for product in products:
    response1.append(
        {
            "category": {
                "category_id": product.category.category_id,
                "category_name": product.category.category_name,
            },
            "price": product.price,
            "product_id": product.product_id,
            "product_name": product.product_name,
            "seller_id": product.seller_id,
        }
    )
print(jsonify(response1))

# return json.dumps(response1),200

return jsonify(response1), 200
Enter fullscreen mode Exit fullscreen mode

@app.route("/api/auth/seller/product/int:product_id", methods=["GET"])
@token_required
def get_product(current_user, product_id):
if current_user.user_role != 2:
return {
"message": "Unauthorized Endpoint",
}, 403

product = Product.query.filter_by(
    product_id=product_id, seller_id=current_user.user_id
).first()
if not product:
    return jsonify({"error": "Product not found"}), 404

response = [
    {
        "category": {
            "category_id": product.category.category_id,
            "category_name": product.category.category_name,
        },
        "price": product.price,
        "product_id": product.product_id,
        "product_name": product.product_name,
        "seller_id": product.seller_id,
    }
]
return jsonify(response), 200
Enter fullscreen mode Exit fullscreen mode

@app.route("/api/auth/seller/product", methods=["POST"])
@token_required
def add_product(current_user):
if current_user.user_role != 2: # Assuming seller role_id is 2
return jsonify({"message": "Forbidden"}), 403

data = request.get_json()
product_id = data.get("product_id")

# Check if the product_id already exists
existing_product = Product.query.filter_by(product_id=product_id).first()
if existing_product:
    return jsonify({"message": "Product ID already exists"}), 409

# Create and save the new product
product = Product(
    product_id=product_id,
    product_name=data.get("product_name"),
    price=data.get("price"),
    seller_id=current_user.user_id,
    category_id=data.get("category_id"),
)
db.session.add(product)
db.session.commit()

return jsonify(product.product_id), 201
Enter fullscreen mode Exit fullscreen mode

@app.route("/api/auth/seller/product", methods=["PUT"])
@token_required
def update_product(current_user):
if current_user.user_role != 2: # Assuming seller role_id is 2
return jsonify({"message": "Forbidden"}), 403

data = request.get_json()
product_id = data.get("product_id")
price = data.get("price")

# Check if the product is owned by the seller
product = Product.query.filter_by(
    product_id=product_id, seller_id=current_user.user_id
).first()
if not product:
    return jsonify({"message": "Product not found or not owned by the seller"}), 404

if product.price != price:
    cartProducts = CartProduct.query.filter_by(product_id=product_id)
    for cp in cartProducts:
        cp.cart.total_amount += cp.quantity * (price - product.price)

# Update the product price
product.price = price
db.session.commit()

return jsonify({"message": "Product price updated successfully"}), 200
Enter fullscreen mode Exit fullscreen mode

@app.route("/api/auth/seller/product/int:prodid", methods=["DELETE"])
@token_required
def delete_product(current_user, prodid):
if current_user.user_role != 2: # Assuming seller role_id is 2
return jsonify({"message": "Forbidden"}), 403

# Check if the product is owned by the seller
product = Product.query.filter_by(
    product_id=prodid, seller_id=current_user.user_id
).first()
if not product:
    return jsonify({"message": "Product not found or not owned by the seller"}), 404

cartProducts = CartProduct.query.filter_by(product_id=prodid)

for cp in cartProducts:
    cp.cart.total_amount -= cp.quantity * product.price

# Delete the product
db.session.delete(product)
db.session.commit()

return jsonify({"message": "Product deleted successfully"}), 200
Enter fullscreen mode Exit fullscreen mode

#------------------------------------Consumer endpoints---------------------------------

#----------------URL /api/auth/consumer/product

@app.route("/api/auth/consumer/cart", methods=["GET"])
@token_required
def get_consumer_cart(current_user):
if current_user.user_role != 1: # Assuming consumer role_id is 1
return jsonify({"message": "Forbidden"}), 403

# Perform the join on Cart, CartProduct, Product, and Category tables
cart_data = (
    Cart.query.join(CartProduct, Cart.cart_id == CartProduct.cart_id)
    .join(Product, CartProduct.product_id == Product.product_id)
    .join(Category, Product.category_id == Category.category_id)
    .filter(Cart.user_id == current_user.user_id)
    .with_entities(
        CartProduct.cp_id,
        Cart.cart_id,
        Cart.total_amount,
        Product.product_id,
        Product.price,
        Product.product_name,
        Category.category_id,
        Category.category_name,
    )
    .all()
)

cart_items = []
for cart_product in cart_data:
    cart_item = {
        "cartproducts": {
            "product": {
                "product_id": cart_product.product_id,
                "price": cart_product.price,
                "product_name": cart_product.product_name,
                "category": {
                    "category_name": cart_product.category_name,
                    "category_id": cart_product.category_id,
                },
            },
            "cp_id": cart_product.cp_id,
        },
        "cart_id": cart_product.cart_id,
        "total_amount": cart_product.total_amount,
    }
    cart_items.append(cart_item)

return jsonify(cart_items), 200
Enter fullscreen mode Exit fullscreen mode

@app.route("/api/auth/consumer/cart", methods=["POST"])
@token_required
def add_to_consumer_cart(current_user):
if current_user.user_role != 1: # Assuming consumer role_id is 1
return jsonify({"message": "Forbidden"}), 403

data = request.get_json()
product_id = data.get("product_id")
quantity = data.get("quantity")

# Check if the product is already present in the cart
existing_cart_product = (
    CartProduct.query.join(Cart, CartProduct.cart_id == Cart.cart_id)
    .filter(
        Cart.user_id == current_user.user_id, CartProduct.product_id == product_id
    )
    .first()
)

if existing_cart_product:
    return jsonify({"message": "Product already in cart"}), 409

# Get the price of the product
product = Product.query.filter_by(product_id=product_id).first()
if not product:
    return jsonify({"message": "Product not found"}), 404

# Calculate the total amount
total_amount = current_user.cart.total_amount + (product.price * quantity)

# Create a new CartProduct instance and add it to the cart
new_cart_product = CartProduct(
    cart_id=current_user.cart.cart_id, product_id=product_id, quantity=quantity
)
db.session.add(new_cart_product)
db.session.commit()

# Update the total amount of the cart
current_user.cart.total_amount = total_amount
db.session.commit()

return jsonify(total_amount), 200
Enter fullscreen mode Exit fullscreen mode

@app.route("/api/auth/consumer/cart", methods=["PUT"])
@token_required
def update_consumer_cart(current_user):
if current_user.user_role != 1: # Assuming consumer role_id is 1
return jsonify({"message": "Forbidden"}), 403

data = request.get_json()
product_id = data.get("product_id")
quantity = data.get("quantity")

# Check if the product is present in the cart
cart_product = (
    CartProduct.query.join(Cart, CartProduct.cart_id == Cart.cart_id)
    .filter(
        Cart.user_id == current_user.user_id, CartProduct.product_id == product_id
    )
    .first()
)

if not cart_product:
    return jsonify({"message": "Product not found in cart"}), 404

# Get the price of the product
product = Product.query.filter_by(product_id=product_id).first()
if not product:
    return jsonify({"message": "Product not found"}), 404

# Update the quantity and total amount
total_amount = current_user.cart.total_amount + (
    quantity - cart_product.quantity
) * (product.price)
cart_product.quantity = quantity
current_user.cart.total_amount = total_amount
db.session.commit()

return jsonify(total_amount), 200
Enter fullscreen mode Exit fullscreen mode

@app.route("/api/auth/consumer/cart", methods=["DELETE"])
@token_required
def remove_from_consumer_cart(current_user):
if current_user.user_role != 1: # Assuming consumer role_id is 1
return jsonify({"message": "Forbidden"}), 403

data = request.get_json()
product_id = data.get("product_id")

# Check if the product is present in the cart
cart_product = (
    CartProduct.query.join(Cart, CartProduct.cart_id == Cart.cart_id)
    .filter(
        Cart.user_id == current_user.user_id, CartProduct.product_id == product_id
    )
    .first()
)

if not cart_product:
    return jsonify({"message": "Product not found in cart"}), 404

# Get the price and quantity of the product
product = Product.query.filter_by(product_id=product_id).first()
if not product:
    return jsonify({"message": "Product not found"}), 404

# Calculate the total amount
total_amount = current_user.cart.total_amount - (
    product.price * cart_product.quantity
)

# Remove the cart product from the cart
db.session.delete(cart_product)
db.session.commit()

print(total_amount)

# Update the total amount of the cart
current_user.cart.total_amount = total_amount
db.session.commit()

return jsonify(total_amount), 200
Enter fullscreen mode Exit fullscreen mode

Password encode code

import base64
print(str(base64.b64decode("YXBwbGU6cGFzc193b3Jk"))[2:-1])

models.py

from ecommerceapp import db
from datetime import datetime

class Product(db.Model):
product_id=db.Column(db.Integer,primary_key=True)
product_name=db.Column(db.String(80),nullable=False)
price=db.Column(db.Float,nullable=False)
seller_id=db.Column(db.Integer, db.ForeignKey('user.user_id'))
category_id=db.Column(db.Integer, db.ForeignKey('category.category_id'))

class Category(db.Model):
category_id=db.Column(db.Integer,primary_key=True)
category_name=db.Column(db.String(80),nullable=False)
products=db.relationship("Product", cascade="all, delete-orphan", backref='category')

class Cart(db.Model):
cart_id=db.Column(db.Integer,primary_key=True)
total_amount=db.Column(db.Float,nullable=False)
user_id=db.Column(db.Integer, db.ForeignKey('user.user_id'))
cartproducts=db.relationship("CartProduct", cascade="all, delete-orphan", backref='cart')

class CartProduct(db.Model):
cp_id=db.Column(db.Integer,primary_key=True)
cart_id=db.Column(db.Integer, db.ForeignKey('cart.cart_id'))
product_id=db.Column(db.Integer, db.ForeignKey('product.product_id'))
quantity=db.Column(db.Integer)

class Role(db.Model):
role_id=db.Column(db.Integer,primary_key=True)
role_name=db.Column(db.String(20),nullable=False)
users=db.relationship("User", cascade="all, delete-orphan", backref='role')

class User(db.Model):
user_id=db.Column(db.Integer,primary_key=True)
user_name=db.Column(db.String(20),nullable=False)
password=db.Column(db.String(150),nullable=False)
user_role=db.Column(db.Integer, db.ForeignKey('role.role_id'))
cart = db.relationship('Cart', backref='user', uselist=False)

init.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from .config import BaseConfig
from flask_migrate import Migrate

app = Flask(name)
app.config.from_object(BaseConfig)
db = SQLAlchemy(app)
migrate = Migrate(app, db)

from . import routes, models

do not delete these below methods

@app.cli.command()
def erasedata():
db.drop_all()
db.session.commit()
db.create_all()
@app.cli.command()
def seeddata():
from . import seed


Fac

Skip to main content
Example

Test blog
June 13, 2024

Run Steps

$env:FLASK_APP = "main"
python -m flask db init (ignore the errormigrations folder not empty)
python -m flask db migrate
python -m flask db upgrade
python -m flask run

POSTMAN

Authorization
Select Bearer Token and direcltly add token at token no need to write Bearer ahead itwill be bydefault

ERASE DATA
python -m flask erasedata

SeedDATA
python -m flask seeddata

init.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from .config import BaseConfig
from flask_migrate import Migrate

app = Flask(name)
app.config.from_object(BaseConfig)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
app.app_context().push()

from . import routes, models

do not delete these below methods

@app.cli.command()
def erasedata():
db.drop_all()
db.session.commit()
# db.create_all()

@app.cli.command()
def seeddata():
from . import seed

seed.py

from werkzeug.security import generate_password_hash
from . import db
from .models import *

db.drop_all()
db.session.commit()
try:
db.create_all()

# Role creating
r1 = RoleModel(rolename='ADMIN')
db.session.add(r1)
db.session.commit()

r2 = RoleModel(rolename='USER')
db.session.add(r2)
db.session.commit()

# Creating Users
u1 = UserModel(username='admin1', email='adminemail@gmail.com', password='admin123$', role=1)
db.session.add(u1)
db.session.commit()

u2 = UserModel(username='admin2', email='adminemail2@gmail.com', password='admin789$', role=1)
db.session.add(u2)
db.session.commit()

u3 = UserModel(username='user', email='useremail@gmail.com', password='user123$', role=2)
db.session.add(u3)
db.session.commit()

print('Database successfully initialized')
Enter fullscreen mode Exit fullscreen mode

except Exception as e:
db.session.rollback()
print(f'Error in adding data to db: {e}')

config.py

import os
basedir = os.path.abspath(os.path.dirname(file))

class BaseConfig(object):
DEBUG = True
TESTING = False
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'ecommerceapp.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
SECRET_KEY = os.environ.get('SECRET_KEY') or 'heythisisthesecretkey'

class TestingConfig(BaseConfig):
TESTING = True
SQLALCHEMY_TRACK_MODIFICATIONS = False
SECRET_KEY = os.environ.get('SECRET_KEY') or 'heythisisthesecretkey'
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'testecommerce.db')

models.py

from ecommerceapp import db
from datetime import datetime

class RoleModel(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
rolename = db.Column(db.String(50), unique=True, nullable=False)

class UserModel(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(50), nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
password = db.Column(db.String(100), nullable=False)
role = db.Column(db.Integer, db.ForeignKey('role_model.id'), nullable=False)

class FacultyModel(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
faculty_name = db.Column(db.String(100), nullable=False)
qualification = db.Column(db.String(100), nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
gender = db.Column(db.String(10), nullable=False)
department = db.Column(db.String(50), nullable=False)
experience = db.Column(db.Integer, nullable=False)
admin_id = db.Column(db.Integer, db.ForeignKey('user_model.id'), nullable=False)

routes.py

from sqlite3 import IntegrityError
from flask import (
Flask,
abort,
request,
jsonify,
make_response,
session,
app,
Response,
url_for,
)
from ecommerceapp import app, db
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask_migrate import Migrate

from datetime import datetime, timedelta
import os
from sqlalchemy.orm import sessionmaker

import json
import jwt
from functools import wraps

from .models import *

"""
NOTE:
Use jsonify function to return the outputs and status code

Example:
with output
return jsonify(output),2000

only status code
return '',404

"""

# Use this token_required method for your routes where ever needed.

def token_required(f):

@wraps(f)

def decorated(*args, **kwargs):

token = None

if "Authorization" in request.headers:

token = request.headers["Authorization"]

print(token)

if not token:

return jsonify({"Message": " Token is missing"}), 401

try:

data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")

print(data)

current_user = UserModel.query.filter_by(email=data["email"]).first()

print(current_user)

print(current_user.email)

except:

return jsonify({"Message": "Token is invalid"}), 401

return f(current_user, *args, **kwargs)

return decorated

def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if "Authorization" in request.headers:
token = request.headers["Authorization"].split(" ")[
1
] # Extract the token without the "Bearer " prefix
print("Token:", token)

    if not token:
        return jsonify({"Message": "Token is missing"}), 401

    try:
        data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")
        print("Decoded Data:", data)

        current_user = UserModel.query.filter_by(email=data["email"]).first()
        print("Current User:", current_user)
        print("Current User Email:", current_user.email)

    except jwt.ExpiredSignatureError:
        return jsonify({"Message": "Token has expired"}), 401
    except jwt.InvalidTokenError:
        return jsonify({"Message": "Token is invalid"}), 401

    return f(current_user, *args, **kwargs)

return decorated
Enter fullscreen mode Exit fullscreen mode

#Write your code here for the API end points

Use this token_required method for your routes where ever needed.

def token_required(f):

@wraps(f)

def decorated(*args, **kwargs):

token = None

print(request.headers)

if 'x-access-token' in request.headers:

token = request.headers['x-access-token']

print(token)

if not token:

return jsonify({'Message':' Token is missing'}), 401

try:

print("yes")

data = jwt.decode(token, app.config['SECRET_KEY'], algorithms='HS256')

print(data)

current_user = UserModel.query.filter_by(email=data['email']).first()

print(current_user.email)

print(current_user.role)

except:

return jsonify({'Message': 'Token is invalid'}), 401

return f(current_user, *args, **kwargs)

return decorated

def role_required(role):
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
email = kwargs.pop("email", None)
print(email)

        role = UserModel.objects.get(email=email)
        print(role)

        roleName = role.role.rolename
        print(roleName)

        if roleName != role:
            abort(403)
        return fn(*args, **kwargs)

    return decorated_view

return wrapper
Enter fullscreen mode Exit fullscreen mode

@app.route("/")
def index():
return "Index Page"

@app.route("/user/login", methods=["POST"])
def user_login():
data = request.get_json()
user = UserModel.query.filter_by(email=data.get("email")).first()

if user and user.password == data.get("password"):
    # If authentication is successful, create a JWT token
    token = jwt.encode(
        {"email": user.email}, app.config["SECRET_KEY"], algorithm="HS256"
    )

    print(token)
    return jsonify({"jwt": token, "status": 200})
else:
    return jsonify({"Message": "Invalid credentials"}), 400
Enter fullscreen mode Exit fullscreen mode

Constants for roles

ADMIN_ROLE = 1
USER_ROLE = 2

Endpoint for adding a new faculty

@app.route("/faculty/add", methods=["POST"])
@token_required
def add_faculty(current_user):
print("134 line", current_user.role)
if current_user.role != ADMIN_ROLE:
abort(400) # Only admins can add faculty

data = request.get_json()

new_faculty = FacultyModel(
    faculty_name=data.get("faculty_name"),
    qualification=data.get("qualification"),
    email=data.get("email"),
    gender=data.get("gender"),
    department=data.get("department"),
    experience=data.get("experience"),
    admin_id=current_user.id,
)

try:
    db.session.add(new_faculty)
    db.session.commit()
    return (
        jsonify(
            {
                "id": new_faculty.id,
                "faculty_name": new_faculty.faculty_name,
                "qualification": new_faculty.qualification,
                "email": new_faculty.email,
                "gender": new_faculty.gender,
                "department": new_faculty.department,
                "experience": new_faculty.experience,
                "admin_id": new_faculty.admin_id,
            }
        ),
        201,
    )
except IntegrityError as e:
    db.session.rollback()
    return (
        jsonify(
            {"Message": "Duplicate email or other integrity constraint violated"}
        ),
        400,
    )
except Exception as e:
    db.session.rollback()
    return jsonify({"Message": "Bad Request on invalid credentials"}), 400
Enter fullscreen mode Exit fullscreen mode

http://127.0.0.1:5000/faculty/list?gender=MALE&department=CE

@app.route("/faculty/list", methods=["GET"])
@token_required
def list_faculty(current_user):
# Check if the user is authenticated
if not current_user:
return jsonify({"Message": "Authentication required"}), 401

# Get parameters from the URL
department = request.args.get("department")
experience = request.args.get("experience")
gender = request.args.get("gender")

# Query faculty based on the provided search criteria
# faculty_query = FacultyModel.query.filter_by(admin_id=current_user.id)
faculty_query = FacultyModel.query

if department:
    faculty_query = faculty_query.filter_by(department=department)
if experience:
    faculty_query = faculty_query.filter_by(experience=experience)
if gender:
    faculty_query = faculty_query.filter_by(gender=gender)

faculty_list = faculty_query.all()

if not faculty_list:
    return jsonify({"Message": "No data available"}), 400

# Format the response
formatted_faculty_list = []
for faculty in faculty_list:
    formatted_faculty_list.append(
        {
            "id": faculty.id,
            "faculty_name": faculty.faculty_name,
            "qualification": faculty.qualification,
            "email": faculty.email,
            "gender": faculty.gender,
            "department": faculty.department,
            "experience": faculty.experience,
            "admin_id": faculty.admin_id,
        }
    )

return jsonify(formatted_faculty_list), 200
Enter fullscreen mode Exit fullscreen mode

@app.route("/faculty/update/int:faculty_id", methods=["PATCH"])
@token_required
def update_faculty(current_user, faculty_id):
# Check if the user is authenticated
if not current_user:
return jsonify({"Message": "Authentication required"}), 401

# Query the faculty by ID and check if it belongs to the authenticated user
faculty = FacultyModel.query.get(faculty_id)

if not faculty or faculty.admin_id != current_user.id:
    return jsonify({"Message": "Faculty not found or unauthorized access"}), 404

# Get the update data from the request body
data = request.get_json()

# Update the faculty details
faculty.qualification = data.get("qualification", faculty.qualification)
faculty.experience = data.get("experience", faculty.experience)
faculty.department = data.get("department", faculty.department)

try:
    db.session.commit()
    return (
        jsonify(
            {
                "id": faculty.id,
                "faculty_name": faculty.faculty_name,
                "qualification": faculty.qualification,
                "email": faculty.email,
                "gender": faculty.gender,
                "department": faculty.department,
                "experience": faculty.experience,
                "admin_id": faculty.admin_id,
            }
        ),
        200,
    )
except Exception as e:
    db.session.rollback()
    return jsonify({"Message": "Bad Request on invalid credentials"}), 400
Enter fullscreen mode Exit fullscreen mode

Endpoint for deleting a faculty by ID

@app.route('/faculty/delete/string:faculty_id', methods=['DELETE'])

@app.route("/faculty/delete/int:faculty_id", methods=["DELETE"])
@token_required
def delete_faculty(current_user, faculty_id):
# Check if the user is authenticated
if not current_user:
return jsonify({"Message": "Authentication required"}), 401

# Query the faculty by ID
faculty = FacultyModel.query.get(faculty_id)

if not faculty:
    return jsonify({"Message": "Faculty not found"}), 400

# Check if the authenticated user is an admin or the creator of the faculty
if current_user.role != ADMIN_ROLE and faculty.admin_id != current_user.id:
    return jsonify({"Message": "You don’t have permission"}), 403

try:
    # Delete the faculty from the database
    db.session.delete(faculty)
    db.session.commit()
    return jsonify({"Message": "Deleted successfully"}), 200
except Exception as e:
    db.session.rollback()
    return jsonify({"Message": "Bad Request on invalid credentials"}), 400
Enter fullscreen mode Exit fullscreen mode

`

Top comments (0)