`run steps
!/bin/sh
sudo apt-get update -y
sudo apt-get install python3-pip -y
pip3 install pytest==5.3.1 urllib3==1.25.7 Flask==1.1.1 Flask-HTTPAuth==3.3.0 flask-marshmallow==0.10.1 Flask-Migrate==2.5.2 Flask-Script==2.0.6 Flask-SQLAlchemy==2.4.1 Flask-Testing==0.8.0 marshmallow==3.2.2
cd /home/labuser/Desktop/Project/t4_ecommerce_flaskh/ecommerceapp;
export FLASK_APP=main.py;
python -m flask seeddata
- set FLASK_APP=main
- python -m flask db init (ignore the errormigrations folder not empty)
- python -m flask db migrate
- python -m flask db upgrade
- python -m flask run
$env:FLASK_APP="main.py"
sqlite3’ command will get you into sqlite3 terminal.
- ‘.open dbname.db’ will get that database as core. All further operations will be made on that database.
- ‘.tables’ will list the tables in that database.
- ‘select * from tablename; ’ will list the rows of table.
virtual env
ctrl shift p
python interpreter`
enter interpreterpath
select scripts python exe
python -m pytest tests.py
seed.py
from werkzeug.security import generate_password_hash
from . import db
from .models import Product, Role, Cart, CartProduct, Category,User
db.drop_all()
db.session.commit()
try:
db.drop_all()
db.create_all()
c1=Category(category_name='Fashion')
db.session.add(c1)
db.session.commit()
c2=Category(category_name='Electronics')
db.session.add(c2)
db.session.commit()
c3=Category(category_name='Books')
db.session.add(c3)
db.session.commit()
c4=Category(category_name='Groceries')
db.session.add(c4)
db.session.commit()
c5=Category(category_name='Medicines')
db.session.add(c5)
db.session.commit()
r1=Role(role_name='CONSUMER')
db.session.add(r1)
db.session.commit()
r2=Role(role_name='SELLER')
db.session.add(r2)
db.session.commit()
password=generate_password_hash("pass_word",method='pbkdf2:sha256',salt_length=8)
u1=User(user_name='jack',password=password,user_role=1)
db.session.add(u1)
db.session.commit()
u2=User(user_name='bob',password=password,user_role=1)
db.session.add(u2)
db.session.commit()
u3=User(user_name='apple',password=password,user_role=2)
db.session.add(u3)
db.session.commit()
u4=User(user_name='glaxo',password=password,user_role=2)
db.session.add(u4)
db.session.commit()
cart1=Cart(total_amount=20, user_id=1)
db.session.add(cart1)
db.session.commit()
cart2=Cart(total_amount=0, user_id=2)
db.session.add(cart2)
db.session.commit()
p1=Product(price=29190,product_name='ipad',category_id=2,seller_id=3)
db.session.add(p1)
db.session.commit()
p2=Product(price=10,product_name='crocin',category_id=5,seller_id=4)
db.session.add(p2)
db.session.commit()
cp1=CartProduct(cart_id=1,product_id=2,quantity=2)
db.session.add(cp1)
db.session.commit()
print('database successfully initialized')
except Exception:
db.session.rollback()
print('error in adding data to db')
routes.py
from flask import (
Flask,
request,
jsonify,
make_response,
session,
app,
Response,
url_for,
)
from ecommerceapp import app, db
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask_migrate import Migrate, MigrateCommand
from datetime import datetime, timedelta
import os
from sqlalchemy.orm import sessionmaker
from .models import *
from sqlalchemy.orm import joinedload
import base64
import json
import jwt
from functools import wraps
from flask import abort, current_app, request
"""
NOTE:
Use jsonify function to return the outputs and status code
Example:
with output
return jsonify(output),2000
only status code
return '',404
"""
Use this token_required method for your routes where ever needed.
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
print(request.headers)
if "x-access-token" in request.headers:
token = request.headers["x-access-token"]
print(token)
if not token:
return jsonify({"Message": " Token is missing"}), 401
try:
print("yes")
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")
print(data)
current_user = User.query.filter_by(user_id=data["user_id"]).first()
print(current_user.user_id)
print(current_user.user_role)
except:
return jsonify({"Message": "Token is invalid"}), 401
return f(current_user, *args, **kwargs)
return decorated
def role_required(role):
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
# user_id=kwargs('user_id',None)
user_id = kwargs.pop("user_id", None)
print(user_id)
user_role = User.objects.get(user_id=user_id)
print(user_role)
roleName = user_role.user_role.role_name
print(roleName)
if roleName != role:
abort(403)
return fn(*args, **kwargs)
return decorated_view
return wrapper
@app.route("/")
def index():
return "Index Page"
@app.route("/api/public/login", methods=["GET"])
def login():
uname = request.headers["Authorization"][6:]
print(uname)
pass1 = str(base64.b64decode(uname))[2:-1]
username = pass1.split(":")[0]
password = pass1.split(":")[1]
# Find the user in the database based on the provided username
user = User.query.filter_by(user_name=username).first()
if user and check_password_hash(user.password, password):
# Authentication successful
token = jwt.encode(
{"user_id": user.user_id, "exp": datetime.utcnow() + timedelta(hours=5)},
app.config["SECRET_KEY"],
algorithm="HS256",
)
# token = jwt.encode({'user_id': user.user_id, 'exp' :datetime.utcnow() + timedelta(minutes=30)}, app.config['SECRET_KEY'], algorithm="HS256")
return jsonify({"token": token}), 200
else:
# Authentication failed
return jsonify({"token": "0"}), 401
@app.route("/api/public/product/search", methods=["GET"])
def search_products():
keyword = request.args.get("keyword")
# Perform the join operation to retrieve the matching products
products = (
db.session.query(Product)
.join(Category)
.filter(Product.product_name.contains(keyword))
.all()
)
if not products:
return jsonify("Null"), 400
# Prepare the response data
response = []
for product in products:
category = {
"category_id": product.category.category_id,
"category_name": product.category.category_name,
}
product_data = {
"category": category,
"product_id": product.product_id,
"product_name": product.product_name,
"price": product.price,
"seller_id": product.seller_id,
}
response.append(product_data)
return jsonify(response), 200
#------------------------------------Seller endpoints---------------------------------
#----------------URL /api/auth/seller/product
@app.route("/api/auth/seller/product", methods=["GET"])
@token_required
def get_seller_products(current_user):
if current_user.user_role != 2:
return {
"message": "Unauthorized Endpoint",
}, 403 # Implement your own authentication logic
# Retrieve the products owned by the seller
products = Product.query.filter_by(seller_id=current_user.user_id).all()
# If no products are found, return a 404 status code
if not products:
return {"message": "Product not found"}, 403
# Prepare the response body
response1 = []
for product in products:
response1.append(
{
"category": {
"category_id": product.category.category_id,
"category_name": product.category.category_name,
},
"price": product.price,
"product_id": product.product_id,
"product_name": product.product_name,
"seller_id": product.seller_id,
}
)
print(jsonify(response1))
# return json.dumps(response1),200
return jsonify(response1), 200
@app.route("/api/auth/seller/product/int:product_id", methods=["GET"])
@token_required
def get_product(current_user, product_id):
if current_user.user_role != 2:
return {
"message": "Unauthorized Endpoint",
}, 403
product = Product.query.filter_by(
product_id=product_id, seller_id=current_user.user_id
).first()
if not product:
return jsonify({"error": "Product not found"}), 404
response = [
{
"category": {
"category_id": product.category.category_id,
"category_name": product.category.category_name,
},
"price": product.price,
"product_id": product.product_id,
"product_name": product.product_name,
"seller_id": product.seller_id,
}
]
return jsonify(response), 200
@app.route("/api/auth/seller/product", methods=["POST"])
@token_required
def add_product(current_user):
if current_user.user_role != 2: # Assuming seller role_id is 2
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
# Check if the product_id already exists
existing_product = Product.query.filter_by(product_id=product_id).first()
if existing_product:
return jsonify({"message": "Product ID already exists"}), 409
# Create and save the new product
product = Product(
product_id=product_id,
product_name=data.get("product_name"),
price=data.get("price"),
seller_id=current_user.user_id,
category_id=data.get("category_id"),
)
db.session.add(product)
db.session.commit()
return jsonify(product.product_id), 201
@app.route("/api/auth/seller/product", methods=["PUT"])
@token_required
def update_product(current_user):
if current_user.user_role != 2: # Assuming seller role_id is 2
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
price = data.get("price")
# Check if the product is owned by the seller
product = Product.query.filter_by(
product_id=product_id, seller_id=current_user.user_id
).first()
if not product:
return jsonify({"message": "Product not found or not owned by the seller"}), 404
if product.price != price:
cartProducts = CartProduct.query.filter_by(product_id=product_id)
for cp in cartProducts:
cp.cart.total_amount += cp.quantity * (price - product.price)
# Update the product price
product.price = price
db.session.commit()
return jsonify({"message": "Product price updated successfully"}), 200
@app.route("/api/auth/seller/product/int:prodid", methods=["DELETE"])
@token_required
def delete_product(current_user, prodid):
if current_user.user_role != 2: # Assuming seller role_id is 2
return jsonify({"message": "Forbidden"}), 403
# Check if the product is owned by the seller
product = Product.query.filter_by(
product_id=prodid, seller_id=current_user.user_id
).first()
if not product:
return jsonify({"message": "Product not found or not owned by the seller"}), 404
cartProducts = CartProduct.query.filter_by(product_id=prodid)
for cp in cartProducts:
cp.cart.total_amount -= cp.quantity * product.price
# Delete the product
db.session.delete(product)
db.session.commit()
return jsonify({"message": "Product deleted successfully"}), 200
#------------------------------------Consumer endpoints---------------------------------
#----------------URL /api/auth/consumer/product
@app.route("/api/auth/consumer/cart", methods=["GET"])
@token_required
def get_consumer_cart(current_user):
if current_user.user_role != 1: # Assuming consumer role_id is 1
return jsonify({"message": "Forbidden"}), 403
# Perform the join on Cart, CartProduct, Product, and Category tables
cart_data = (
Cart.query.join(CartProduct, Cart.cart_id == CartProduct.cart_id)
.join(Product, CartProduct.product_id == Product.product_id)
.join(Category, Product.category_id == Category.category_id)
.filter(Cart.user_id == current_user.user_id)
.with_entities(
CartProduct.cp_id,
Cart.cart_id,
Cart.total_amount,
Product.product_id,
Product.price,
Product.product_name,
Category.category_id,
Category.category_name,
)
.all()
)
cart_items = []
for cart_product in cart_data:
cart_item = {
"cartproducts": {
"product": {
"product_id": cart_product.product_id,
"price": cart_product.price,
"product_name": cart_product.product_name,
"category": {
"category_name": cart_product.category_name,
"category_id": cart_product.category_id,
},
},
"cp_id": cart_product.cp_id,
},
"cart_id": cart_product.cart_id,
"total_amount": cart_product.total_amount,
}
cart_items.append(cart_item)
return jsonify(cart_items), 200
@app.route("/api/auth/consumer/cart", methods=["POST"])
@token_required
def add_to_consumer_cart(current_user):
if current_user.user_role != 1: # Assuming consumer role_id is 1
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
quantity = data.get("quantity")
# Check if the product is already present in the cart
existing_cart_product = (
CartProduct.query.join(Cart, CartProduct.cart_id == Cart.cart_id)
.filter(
Cart.user_id == current_user.user_id, CartProduct.product_id == product_id
)
.first()
)
if existing_cart_product:
return jsonify({"message": "Product already in cart"}), 409
# Get the price of the product
product = Product.query.filter_by(product_id=product_id).first()
if not product:
return jsonify({"message": "Product not found"}), 404
# Calculate the total amount
total_amount = current_user.cart.total_amount + (product.price * quantity)
# Create a new CartProduct instance and add it to the cart
new_cart_product = CartProduct(
cart_id=current_user.cart.cart_id, product_id=product_id, quantity=quantity
)
db.session.add(new_cart_product)
db.session.commit()
# Update the total amount of the cart
current_user.cart.total_amount = total_amount
db.session.commit()
return jsonify(total_amount), 200
@app.route("/api/auth/consumer/cart", methods=["PUT"])
@token_required
def update_consumer_cart(current_user):
if current_user.user_role != 1: # Assuming consumer role_id is 1
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
quantity = data.get("quantity")
# Check if the product is present in the cart
cart_product = (
CartProduct.query.join(Cart, CartProduct.cart_id == Cart.cart_id)
.filter(
Cart.user_id == current_user.user_id, CartProduct.product_id == product_id
)
.first()
)
if not cart_product:
return jsonify({"message": "Product not found in cart"}), 404
# Get the price of the product
product = Product.query.filter_by(product_id=product_id).first()
if not product:
return jsonify({"message": "Product not found"}), 404
# Update the quantity and total amount
total_amount = current_user.cart.total_amount + (
quantity - cart_product.quantity
) * (product.price)
cart_product.quantity = quantity
current_user.cart.total_amount = total_amount
db.session.commit()
return jsonify(total_amount), 200
@app.route("/api/auth/consumer/cart", methods=["DELETE"])
@token_required
def remove_from_consumer_cart(current_user):
if current_user.user_role != 1: # Assuming consumer role_id is 1
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
# Check if the product is present in the cart
cart_product = (
CartProduct.query.join(Cart, CartProduct.cart_id == Cart.cart_id)
.filter(
Cart.user_id == current_user.user_id, CartProduct.product_id == product_id
)
.first()
)
if not cart_product:
return jsonify({"message": "Product not found in cart"}), 404
# Get the price and quantity of the product
product = Product.query.filter_by(product_id=product_id).first()
if not product:
return jsonify({"message": "Product not found"}), 404
# Calculate the total amount
total_amount = current_user.cart.total_amount - (
product.price * cart_product.quantity
)
# Remove the cart product from the cart
db.session.delete(cart_product)
db.session.commit()
print(total_amount)
# Update the total amount of the cart
current_user.cart.total_amount = total_amount
db.session.commit()
return jsonify(total_amount), 200
Password encode code
import base64
print(str(base64.b64decode("YXBwbGU6cGFzc193b3Jk"))[2:-1])
models.py
from ecommerceapp import db
from datetime import datetime
class Product(db.Model):
product_id=db.Column(db.Integer,primary_key=True)
product_name=db.Column(db.String(80),nullable=False)
price=db.Column(db.Float,nullable=False)
seller_id=db.Column(db.Integer, db.ForeignKey('user.user_id'))
category_id=db.Column(db.Integer, db.ForeignKey('category.category_id'))
class Category(db.Model):
category_id=db.Column(db.Integer,primary_key=True)
category_name=db.Column(db.String(80),nullable=False)
products=db.relationship("Product", cascade="all, delete-orphan", backref='category')
class Cart(db.Model):
cart_id=db.Column(db.Integer,primary_key=True)
total_amount=db.Column(db.Float,nullable=False)
user_id=db.Column(db.Integer, db.ForeignKey('user.user_id'))
cartproducts=db.relationship("CartProduct", cascade="all, delete-orphan", backref='cart')
class CartProduct(db.Model):
cp_id=db.Column(db.Integer,primary_key=True)
cart_id=db.Column(db.Integer, db.ForeignKey('cart.cart_id'))
product_id=db.Column(db.Integer, db.ForeignKey('product.product_id'))
quantity=db.Column(db.Integer)
class Role(db.Model):
role_id=db.Column(db.Integer,primary_key=True)
role_name=db.Column(db.String(20),nullable=False)
users=db.relationship("User", cascade="all, delete-orphan", backref='role')
class User(db.Model):
user_id=db.Column(db.Integer,primary_key=True)
user_name=db.Column(db.String(20),nullable=False)
password=db.Column(db.String(150),nullable=False)
user_role=db.Column(db.Integer, db.ForeignKey('role.role_id'))
cart = db.relationship('Cart', backref='user', uselist=False)
init.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from .config import BaseConfig
from flask_migrate import Migrate
app = Flask(name)
app.config.from_object(BaseConfig)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
from . import routes, models
do not delete these below methods
@app.cli.command()
def erasedata():
db.drop_all()
db.session.commit()
db.create_all()
@app.cli.command()
def seeddata():
from . import seed
Fac
Skip to main content
Example
Test blog
June 13, 2024
Run Steps
$env:FLASK_APP = "main"
python -m flask db init (ignore the errormigrations folder not empty)
python -m flask db migrate
python -m flask db upgrade
python -m flask run
POSTMAN
Authorization
Select Bearer Token and direcltly add token at token no need to write Bearer ahead itwill be bydefault
ERASE DATA
python -m flask erasedata
SeedDATA
python -m flask seeddata
init.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from .config import BaseConfig
from flask_migrate import Migrate
app = Flask(name)
app.config.from_object(BaseConfig)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
app.app_context().push()
from . import routes, models
do not delete these below methods
@app.cli.command()
def erasedata():
db.drop_all()
db.session.commit()
# db.create_all()
@app.cli.command()
def seeddata():
from . import seed
seed.py
from werkzeug.security import generate_password_hash
from . import db
from .models import *
db.drop_all()
db.session.commit()
try:
db.create_all()
# Role creating
r1 = RoleModel(rolename='ADMIN')
db.session.add(r1)
db.session.commit()
r2 = RoleModel(rolename='USER')
db.session.add(r2)
db.session.commit()
# Creating Users
u1 = UserModel(username='admin1', email='adminemail@gmail.com', password='admin123$', role=1)
db.session.add(u1)
db.session.commit()
u2 = UserModel(username='admin2', email='adminemail2@gmail.com', password='admin789$', role=1)
db.session.add(u2)
db.session.commit()
u3 = UserModel(username='user', email='useremail@gmail.com', password='user123$', role=2)
db.session.add(u3)
db.session.commit()
print('Database successfully initialized')
except Exception as e:
db.session.rollback()
print(f'Error in adding data to db: {e}')
config.py
import os
basedir = os.path.abspath(os.path.dirname(file))
class BaseConfig(object):
DEBUG = True
TESTING = False
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'ecommerceapp.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
SECRET_KEY = os.environ.get('SECRET_KEY') or 'heythisisthesecretkey'
class TestingConfig(BaseConfig):
TESTING = True
SQLALCHEMY_TRACK_MODIFICATIONS = False
SECRET_KEY = os.environ.get('SECRET_KEY') or 'heythisisthesecretkey'
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'testecommerce.db')
models.py
from ecommerceapp import db
from datetime import datetime
class RoleModel(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
rolename = db.Column(db.String(50), unique=True, nullable=False)
class UserModel(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(50), nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
password = db.Column(db.String(100), nullable=False)
role = db.Column(db.Integer, db.ForeignKey('role_model.id'), nullable=False)
class FacultyModel(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
faculty_name = db.Column(db.String(100), nullable=False)
qualification = db.Column(db.String(100), nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
gender = db.Column(db.String(10), nullable=False)
department = db.Column(db.String(50), nullable=False)
experience = db.Column(db.Integer, nullable=False)
admin_id = db.Column(db.Integer, db.ForeignKey('user_model.id'), nullable=False)
routes.py
from sqlite3 import IntegrityError
from flask import (
Flask,
abort,
request,
jsonify,
make_response,
session,
app,
Response,
url_for,
)
from ecommerceapp import app, db
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask_migrate import Migrate
from datetime import datetime, timedelta
import os
from sqlalchemy.orm import sessionmaker
import json
import jwt
from functools import wraps
from .models import *
"""
NOTE:
Use jsonify function to return the outputs and status code
Example:
with output
return jsonify(output),2000
only status code
return '',404
"""
# Use this token_required method for your routes where ever needed.
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if "Authorization" in request.headers:
token = request.headers["Authorization"]
print(token)
if not token:
return jsonify({"Message": " Token is missing"}), 401
try:
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")
print(data)
current_user = UserModel.query.filter_by(email=data["email"]).first()
print(current_user)
print(current_user.email)
except:
return jsonify({"Message": "Token is invalid"}), 401
return f(current_user, *args, **kwargs)
return decorated
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if "Authorization" in request.headers:
token = request.headers["Authorization"].split(" ")[
1
] # Extract the token without the "Bearer " prefix
print("Token:", token)
if not token:
return jsonify({"Message": "Token is missing"}), 401
try:
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")
print("Decoded Data:", data)
current_user = UserModel.query.filter_by(email=data["email"]).first()
print("Current User:", current_user)
print("Current User Email:", current_user.email)
except jwt.ExpiredSignatureError:
return jsonify({"Message": "Token has expired"}), 401
except jwt.InvalidTokenError:
return jsonify({"Message": "Token is invalid"}), 401
return f(current_user, *args, **kwargs)
return decorated
#Write your code here for the API end points
Use this token_required method for your routes where ever needed.
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
print(request.headers)
if 'x-access-token' in request.headers:
token = request.headers['x-access-token']
print(token)
if not token:
return jsonify({'Message':' Token is missing'}), 401
try:
print("yes")
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms='HS256')
print(data)
current_user = UserModel.query.filter_by(email=data['email']).first()
print(current_user.email)
print(current_user.role)
except:
return jsonify({'Message': 'Token is invalid'}), 401
return f(current_user, *args, **kwargs)
return decorated
def role_required(role):
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
email = kwargs.pop("email", None)
print(email)
role = UserModel.objects.get(email=email)
print(role)
roleName = role.role.rolename
print(roleName)
if roleName != role:
abort(403)
return fn(*args, **kwargs)
return decorated_view
return wrapper
@app.route("/")
def index():
return "Index Page"
@app.route("/user/login", methods=["POST"])
def user_login():
data = request.get_json()
user = UserModel.query.filter_by(email=data.get("email")).first()
if user and user.password == data.get("password"):
# If authentication is successful, create a JWT token
token = jwt.encode(
{"email": user.email}, app.config["SECRET_KEY"], algorithm="HS256"
)
print(token)
return jsonify({"jwt": token, "status": 200})
else:
return jsonify({"Message": "Invalid credentials"}), 400
Constants for roles
ADMIN_ROLE = 1
USER_ROLE = 2
Endpoint for adding a new faculty
@app.route("/faculty/add", methods=["POST"])
@token_required
def add_faculty(current_user):
print("134 line", current_user.role)
if current_user.role != ADMIN_ROLE:
abort(400) # Only admins can add faculty
data = request.get_json()
new_faculty = FacultyModel(
faculty_name=data.get("faculty_name"),
qualification=data.get("qualification"),
email=data.get("email"),
gender=data.get("gender"),
department=data.get("department"),
experience=data.get("experience"),
admin_id=current_user.id,
)
try:
db.session.add(new_faculty)
db.session.commit()
return (
jsonify(
{
"id": new_faculty.id,
"faculty_name": new_faculty.faculty_name,
"qualification": new_faculty.qualification,
"email": new_faculty.email,
"gender": new_faculty.gender,
"department": new_faculty.department,
"experience": new_faculty.experience,
"admin_id": new_faculty.admin_id,
}
),
201,
)
except IntegrityError as e:
db.session.rollback()
return (
jsonify(
{"Message": "Duplicate email or other integrity constraint violated"}
),
400,
)
except Exception as e:
db.session.rollback()
return jsonify({"Message": "Bad Request on invalid credentials"}), 400
http://127.0.0.1:5000/faculty/list?gender=MALE&department=CE
@app.route("/faculty/list", methods=["GET"])
@token_required
def list_faculty(current_user):
# Check if the user is authenticated
if not current_user:
return jsonify({"Message": "Authentication required"}), 401
# Get parameters from the URL
department = request.args.get("department")
experience = request.args.get("experience")
gender = request.args.get("gender")
# Query faculty based on the provided search criteria
# faculty_query = FacultyModel.query.filter_by(admin_id=current_user.id)
faculty_query = FacultyModel.query
if department:
faculty_query = faculty_query.filter_by(department=department)
if experience:
faculty_query = faculty_query.filter_by(experience=experience)
if gender:
faculty_query = faculty_query.filter_by(gender=gender)
faculty_list = faculty_query.all()
if not faculty_list:
return jsonify({"Message": "No data available"}), 400
# Format the response
formatted_faculty_list = []
for faculty in faculty_list:
formatted_faculty_list.append(
{
"id": faculty.id,
"faculty_name": faculty.faculty_name,
"qualification": faculty.qualification,
"email": faculty.email,
"gender": faculty.gender,
"department": faculty.department,
"experience": faculty.experience,
"admin_id": faculty.admin_id,
}
)
return jsonify(formatted_faculty_list), 200
@app.route("/faculty/update/int:faculty_id", methods=["PATCH"])
@token_required
def update_faculty(current_user, faculty_id):
# Check if the user is authenticated
if not current_user:
return jsonify({"Message": "Authentication required"}), 401
# Query the faculty by ID and check if it belongs to the authenticated user
faculty = FacultyModel.query.get(faculty_id)
if not faculty or faculty.admin_id != current_user.id:
return jsonify({"Message": "Faculty not found or unauthorized access"}), 404
# Get the update data from the request body
data = request.get_json()
# Update the faculty details
faculty.qualification = data.get("qualification", faculty.qualification)
faculty.experience = data.get("experience", faculty.experience)
faculty.department = data.get("department", faculty.department)
try:
db.session.commit()
return (
jsonify(
{
"id": faculty.id,
"faculty_name": faculty.faculty_name,
"qualification": faculty.qualification,
"email": faculty.email,
"gender": faculty.gender,
"department": faculty.department,
"experience": faculty.experience,
"admin_id": faculty.admin_id,
}
),
200,
)
except Exception as e:
db.session.rollback()
return jsonify({"Message": "Bad Request on invalid credentials"}), 400
Endpoint for deleting a faculty by ID
@app.route('/faculty/delete/string:faculty_id', methods=['DELETE'])
@app.route("/faculty/delete/int:faculty_id", methods=["DELETE"])
@token_required
def delete_faculty(current_user, faculty_id):
# Check if the user is authenticated
if not current_user:
return jsonify({"Message": "Authentication required"}), 401
# Query the faculty by ID
faculty = FacultyModel.query.get(faculty_id)
if not faculty:
return jsonify({"Message": "Faculty not found"}), 400
# Check if the authenticated user is an admin or the creator of the faculty
if current_user.role != ADMIN_ROLE and faculty.admin_id != current_user.id:
return jsonify({"Message": "You don’t have permission"}), 403
try:
# Delete the faculty from the database
db.session.delete(faculty)
db.session.commit()
return jsonify({"Message": "Deleted successfully"}), 200
except Exception as e:
db.session.rollback()
return jsonify({"Message": "Bad Request on invalid credentials"}), 400
`
Top comments (0)