DEV Community

Ben M
Ben M

Posted on

Simple backend example

from flask import Flask, jsonify, request
from flask_cors import CORS
from werkzeug.security import generate_password_hash, check_password_hash
import mysql.connector
import os
from dotenv import load_dotenv

load_dotenv()

app = Flask(__name__)
CORS(app)


# ---------- DB CONNECTION ----------
def get_cursor():
    conn = mysql.connector.connect(
        host=os.getenv("DB_HOST", "localhost"),
        port=int(os.getenv("DB_PORT", 3306)),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        database=os.getenv("DB_NAME", "task_db")
    )
    return conn, conn.cursor(dictionary=True)


# ---------- REGISTER ----------
@app.post("/api/register")
def register():
    conn, cursor = get_cursor()
    try:
        data = request.json or {}

        firstName       = data.get("firstName", "").strip()
        lastName        = data.get("lastName", "").strip()
        email           = data.get("email", "").strip()
        password        = data.get("password", "").strip()
        confirmPassword = data.get("confirmPassword", "").strip()

        # Validation
        if not firstName:
            return jsonify({"success": False, "message": "First name is required."}), 400
        if not lastName:
            return jsonify({"success": False, "message": "Last name is required."}), 400
        if not email:
            return jsonify({"success": False, "message": "Email is required."}), 400
        if not password:
            return jsonify({"success": False, "message": "Password is required."}), 400
        if len(password) < 8:
            return jsonify({"success": False, "message": "Password must be at least 8 characters."}), 400
        if password != confirmPassword:
            return jsonify({"success": False, "message": "Passwords do not match."}), 400

        # Check duplicate email
        cursor.execute("SELECT id FROM user WHERE email = %s", (email,))
        if cursor.fetchone():
            return jsonify({"success": False, "message": "Email already registered."}), 400

        # Hash password before storing
        hashed_password = generate_password_hash(password)

        # Insert new user (role_id 2 = customer)
        cursor.execute("""
            INSERT INTO user (role_id, firstName, lastName, email, password_hash)
            VALUES (%s, %s, %s, %s, %s)
        """, (2, firstName, lastName, email, hashed_password))

        conn.commit()

        # Get new user ID
        cursor.execute("SELECT LAST_INSERT_ID() AS id")
        new_user = cursor.fetchone()

        return jsonify({
            "success": True,
            "message": "User registered successfully!",
            "user": {"id": new_user["id"]}
        }), 200

    except Exception as e:
        conn.rollback()
        print("REGISTER ERROR:", e)
        return jsonify({"success": False, "message": "Server error"}), 500

    finally:
        cursor.close()
        conn.close()


# ---------- LOGIN ----------
@app.post("/api/login")
def login():
    conn, cursor = get_cursor()
    try:
        data     = request.json or {}
        email    = data.get("email", "").strip()
        password = data.get("password", "").strip()

        # Validation
        if not email:
            return jsonify({"success": False, "message": "Email is required."}), 400
        if not password:
            return jsonify({"success": False, "message": "Password is required."}), 400

        cursor.execute("SELECT * FROM user WHERE email = %s", (email,))
        user = cursor.fetchone()

        if not user:
            return jsonify({"success": False, "message": "Email not found."}), 400

        # Check hashed password
        if not check_password_hash(user["password_hash"], password):
            return jsonify({"success": False, "message": "Incorrect password."}), 400

        return jsonify({
            "success": True,
            "message": "Login successful.",
            "user": {
                "id":        user["id"],
                "firstName": user["firstName"],
                "lastName":  user["lastName"],
                "email":     user["email"],
                "phone":     user.get("phone"),
            }
        }), 200

    except Exception as e:
        print("LOGIN ERROR:", e)
        return jsonify({"success": False, "message": "Server error"}), 500

    finally:
        cursor.close()
        conn.close()


# ---------- GET PROFILE ----------
@app.get("/api/user/profile")
def get_profile():
    conn, cursor = get_cursor()
    try:
        user_id = request.args.get("userId")

        if not user_id:
            return jsonify({"success": False, "message": "Missing userId"}), 400

        cursor.execute("""
            SELECT id, firstName, lastName, email, phone
            FROM user
            WHERE id = %s
        """, (user_id,))

        user = cursor.fetchone()

        if not user:
            return jsonify({"success": False, "message": "User not found"}), 404

        return jsonify({"success": True, "user": user}), 200

    except Exception as e:
        print("PROFILE ERROR:", e)
        return jsonify({"success": False, "message": "Server error"}), 500

    finally:
        cursor.close()
        conn.close()


# ---------- UPDATE PROFILE ----------
@app.post("/api/user/update")
def update_profile():
    conn, cursor = get_cursor()
    try:
        data    = request.json or {}
        user_id = data.get("userId")

        if not user_id:
            return jsonify({"success": False, "message": "Missing userId"}), 400

        firstName = data.get("firstName", "").strip()
        lastName  = data.get("lastName", "").strip()
        email     = data.get("email", "").strip()
        phone     = data.get("phone", "").strip()

        if not firstName:
            return jsonify({"success": False, "message": "First name is required."}), 400
        if not lastName:
            return jsonify({"success": False, "message": "Last name is required."}), 400
        if not email:
            return jsonify({"success": False, "message": "Email is required."}), 400

        # Check if new email is taken by another user
        cursor.execute("SELECT id FROM user WHERE email = %s AND id != %s", (email, user_id))
        if cursor.fetchone():
            return jsonify({"success": False, "message": "Email already in use."}), 400

        cursor.execute("""
            UPDATE user
            SET firstName=%s, lastName=%s, email=%s, phone=%s
            WHERE id=%s
        """, (firstName, lastName, email, phone, user_id))

        conn.commit()

        return jsonify({"success": True, "message": "Profile updated"}), 200

    except Exception as e:
        conn.rollback()
        print("UPDATE ERROR:", e)
        return jsonify({"success": False, "message": "Server error"}), 500

    finally:
        cursor.close()
        conn.close()


if __name__ == "__main__":
    app.run(debug=True)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)