Skip to content

Authentication and Authorization

To restrict access to the crime data, Batman added authentication and authorization to the application. He decided to use JWT (JSON Web Token) for authentication. He created a new table for users and added an endpoint for user registration.

User Model

Batman added a new User model to represent the users who can access the application.

py
# models.py
from sqlalchemy import Column, Integer, String, Boolean

class User(Base):
    """Account that is allowed to access the application.

    Rows are stored in the ``users`` table. The model keeps a password hash
    in ``hashed_password``; it has no plain-text ``password`` attribute.
    """

    __tablename__ = "users"

    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Login name; declared unique, so two rows cannot share a username.
    username = Column(String, unique=True, index=True)
    # Hash of the user's password.
    hashed_password = Column(String)

    def __repr__(self):
        """Return a debug representation that omits the password hash."""
        # The hash is deliberately left out so that printing or logging a
        # user object never exposes credential material.
        return f"<User(id={self.id}, username={self.username})>"

Then, in crud.py, he added a new function to create a user.

py
# crud.py
# Also requires: pip install "passlib[bcrypt]"

from sqlalchemy.orm import Session
from .models import User

from passlib.context import CryptContext

# Password-hashing context shared by the helpers in this module,
# configured to use bcrypt.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)

def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-wide ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest

def get_user(db: Session, user_id: int):
    """Fetch the user whose primary key equals ``user_id``.

    Returns the first matching ``User`` row, or ``None`` when no row matches.
    """
    matching_users = db.query(User).filter(User.id == user_id)
    return matching_users.first()

def create_user(db: Session, user):
    """Create and persist a new user, storing only a hash of the password.

    Args:
        db: Open SQLAlchemy session used to write the row.
        user: Registration data carrying the plain-text credentials, either
            as a dict with ``"username"`` and ``"password"`` keys (for
            example a decoded JSON body) or as an object exposing
            ``username`` and ``password`` attributes. This is *not* a
            ``User`` model instance: the model only has ``hashed_password``.

    Returns:
        The newly stored ``User`` row, refreshed after the commit.

    Raises:
        KeyError / AttributeError: If ``user`` lacks a username or password.
    """
    if isinstance(user, dict):
        username, password = user["username"], user["password"]
    else:
        username, password = user.username, user.password

    db_user = User(
        username=username,
        hashed_password=get_password_hash(password),
    )
    db.add(db_user)
    db.commit()
    # Reload the row so database-generated values are present on the object.
    db.refresh(db_user)
    return db_user

Authentication Utilities

Batman created utility functions to handle authentication: hashing and verifying passwords, creating and decoding JWT access tokens, and authenticating a user by username and password.

py
# crud.py
# Also requires: pip install "passlib[bcrypt]"
# pip install "python-jose[cryptography]"
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext

# bcrypt-based passlib context used to hash and verify passwords.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)

def verify_password(plain_password, hashed_password):
    """Tell whether ``plain_password`` matches the stored ``hashed_password``.

    The comparison is delegated to ``pwd_context.verify``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match

def get_password_hash(password):
    """Return the hash of ``password`` produced by ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed

# Signing algorithm passed to jose when encoding and decoding tokens.
ALGORITHM = "HS256"
# Key used to sign and verify tokens. It is read from the SECRET_KEY
# environment variable so the real key stays out of source control; the
# literal is only a placeholder fallback for local experiments and must be
# overridden in any real deployment.
SECRET_KEY = os.environ.get("SECRET_KEY", "your_secret_key")

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` expiry claim.

    Args:
        data: Claims to embed in the token (for example ``{"sub": username}``).
            The dict is copied, so the caller's object is not modified.
        expires_delta: Lifetime of the token. When omitted (``None``) the
            token expires 15 minutes after creation.

    Returns:
        The token produced by ``jwt.encode``, signed with ``SECRET_KEY``
        using ``ALGORITHM``.
    """
    # Compare against None rather than truthiness so that an explicit
    # timedelta(0) is honoured instead of silently becoming 15 minutes.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def decode_access_token(token: str):
    """Decode ``token`` and return what ``jwt.decode`` yields for it.

    The token is checked against ``SECRET_KEY`` and only ``ALGORITHM`` is
    accepted. Nothing is caught here, so any error raised by ``jwt.decode``
    propagates to the caller.
    """
    accepted_algorithms = [ALGORITHM]
    claims = jwt.decode(token, SECRET_KEY, algorithms=accepted_algorithms)
    return claims


def authenticate_user(db: Session, username: str, password: str):
    """Check a username/password pair and issue an access token.

    Args:
        db: Open SQLAlchemy session used to look the user up.
        username: Login name supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        An access token whose ``sub`` claim is the user's username, or
        ``False`` when the user is unknown or the password does not match.
        Callers must therefore test the result for falsiness rather than
        comparing it with ``None``.
    """
    # NOTE(review): get_user_by_username is not defined in the visible code;
    # presumably it returns the matching User or None -- confirm.
    user = get_user_by_username(db, username)
    if user is None or not verify_password(password, user.hashed_password):
        return False

    return create_access_token(data={"sub": user.username})

User Registration Endpoint

Batman added new endpoints to allow users to register and to log in.

py
from . import crud

@app.post("/users/register")
async def register_user(request):
    """Register a new user from a JSON body with ``username`` and ``password``.

    NOTE(review): ``request`` is assumed to be the framework's request object
    with an awaitable ``json()``; under FastAPI the parameter must be
    annotated as ``Request`` for it to be injected -- confirm against the
    imports of this module, which are not visible here.

    Returns:
        A dict with the new user's ``id`` and ``username``. The password hash
        is intentionally not included in the response.

    Raises:
        HTTPException: 422 when the body lacks ``username`` or ``password``.
    """
    from types import SimpleNamespace

    # json() is a coroutine on Starlette/FastAPI requests; without ``await``
    # the handler would pass the coroutine object on instead of the body.
    payload = await request.json()
    try:
        # crud.create_user reads ``.username`` / ``.password`` attributes, so
        # wrap the decoded JSON in a simple attribute container.
        user = SimpleNamespace(
            username=payload["username"],
            password=payload["password"],
        )
    except (KeyError, TypeError):
        raise HTTPException(
            status_code=422, detail="username and password are required"
        ) from None

    with SessionLocal() as db:
        created_user = crud.create_user(db, user)
        # Read the fields while the session is still open.
        response = {"id": created_user.id, "username": created_user.username}
    return response

@app.post("/users/login")
async def login_user(request):
    """Exchange a username/password JSON body for an access token.

    NOTE(review): ``request`` is assumed to be the framework's request object
    with an awaitable ``json()``; under FastAPI the parameter must be
    annotated as ``Request`` for it to be injected -- confirm against the
    imports of this module, which are not visible here.

    Returns:
        ``{"access_token": <token>}`` on success.

    Raises:
        HTTPException: 422 when the body lacks ``username`` or ``password``;
            401 when the credentials are rejected.
    """
    # json() is a coroutine on Starlette/FastAPI requests and must be awaited.
    credentials = await request.json()
    try:
        # Pick the two expected fields explicitly instead of forwarding
        # arbitrary client-supplied keys as keyword arguments.
        username = credentials["username"]
        password = credentials["password"]
    except (KeyError, TypeError):
        raise HTTPException(
            status_code=422, detail="username and password are required"
        ) from None

    with SessionLocal() as db:
        token = crud.authenticate_user(db, username, password)

    # authenticate_user signals failure with False, so test for falsiness;
    # an ``is None`` check would let the failure through as a "token".
    if not token:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    return {"access_token": token}

Released under the MIT License.