volsu-contests/backend/app/routers/contests.py
2025-11-30 19:55:50 +03:00

206 lines
6.3 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.user import User
from app.models.contest import Contest, ContestParticipant
from app.models.problem import Problem
from app.schemas.contest import ContestCreate, ContestUpdate, ContestResponse, ContestListResponse
from app.dependencies import get_current_user, get_current_admin
# Router on which this module's contest endpoints are registered.
router = APIRouter()
@router.get("/", response_model=list[ContestListResponse])
async def get_contests(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a summary of every contest, latest ``start_time`` first.

    ``current_user`` is not read in the body; it is declared so that the
    ``get_current_user`` dependency is resolved for this route.

    Returns:
        A list with one ``ContestListResponse`` per contest. The problem and
        participant counts are taken with ``len()`` over the relationship
        collections that the query loads via ``selectinload``.
    """
    query = (
        select(Contest)
        .options(selectinload(Contest.problems), selectinload(Contest.participants))
        .order_by(Contest.start_time.desc())
    )
    rows = (await db.execute(query)).scalars().all()

    summaries = []
    for contest in rows:
        summaries.append(
            ContestListResponse(
                id=contest.id,
                title=contest.title,
                start_time=contest.start_time,
                end_time=contest.end_time,
                is_active=contest.is_active,
                is_running=contest.is_running,
                has_ended=contest.has_ended,
                problems_count=len(contest.problems),
                participants_count=len(contest.participants),
            )
        )
    return summaries
@router.get("/{contest_id}", response_model=ContestResponse)
async def get_contest(
    contest_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the details of one contest.

    Args:
        contest_id: Primary key of the contest, taken from the URL path.
        db: Async database session.
        current_user: The requesting user; used to compute ``is_participating``.

    Returns:
        A ``ContestResponse`` including problem/participant counts and whether
        the requesting user appears among the contest's participants.

    Raises:
        HTTPException: 404 when no contest has the given id.
    """
    query = (
        select(Contest)
        .options(selectinload(Contest.problems), selectinload(Contest.participants))
        .where(Contest.id == contest_id)
    )
    contest = (await db.execute(query)).scalar_one_or_none()
    if contest is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Contest not found",
        )

    # Scan the loaded participants for the requesting user; stop at first hit.
    is_participating = False
    for participant in contest.participants:
        if participant.user_id == current_user.id:
            is_participating = True
            break

    return ContestResponse(
        id=contest.id,
        title=contest.title,
        description=contest.description,
        start_time=contest.start_time,
        end_time=contest.end_time,
        is_active=contest.is_active,
        created_by=contest.created_by,
        created_at=contest.created_at,
        is_running=contest.is_running,
        has_ended=contest.has_ended,
        problems_count=len(contest.problems),
        participants_count=len(contest.participants),
        is_participating=is_participating,
    )
@router.post("/", response_model=ContestResponse, status_code=status.HTTP_201_CREATED)
async def create_contest(
    contest_data: ContestCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Create a contest owned by the requesting user.

    The user is resolved through the ``get_current_admin`` dependency and its
    id is stored as ``created_by``.

    Args:
        contest_data: Title, description, start/end time and active flag.
        db: Async database session.
        current_user: The user resolved by ``get_current_admin``.

    Returns:
        A ``ContestResponse`` for the new row. Both counts are reported as 0
        because nothing has been attached to the contest yet in this handler.
    """
    copied_fields = ("title", "description", "start_time", "end_time", "is_active")
    attributes = {name: getattr(contest_data, name) for name in copied_fields}
    contest = Contest(**attributes, created_by=current_user.id)

    db.add(contest)
    await db.commit()
    # Reload the row after the commit before reading its attributes below.
    await db.refresh(contest)

    return ContestResponse(
        id=contest.id,
        title=contest.title,
        description=contest.description,
        start_time=contest.start_time,
        end_time=contest.end_time,
        is_active=contest.is_active,
        created_by=contest.created_by,
        created_at=contest.created_at,
        is_running=contest.is_running,
        has_ended=contest.has_ended,
        problems_count=0,
        participants_count=0,
    )
@router.put("/{contest_id}", response_model=ContestResponse)
async def update_contest(
    contest_id: int,
    contest_data: ContestUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Apply a partial update to an existing contest.

    Only the fields explicitly set in the request body are written
    (``model_dump(exclude_unset=True)``); everything else is left as stored.
    ``current_user`` is not read in the body; it is declared so that the
    ``get_current_admin`` dependency is resolved for this route.

    Args:
        contest_id: Primary key of the contest, taken from the URL path.
        contest_data: Fields to change.
        db: Async database session.

    Returns:
        A ``ContestResponse`` reflecting the contest after the commit.

    Raises:
        HTTPException: 404 when no contest has the given id.
    """
    query = (
        select(Contest)
        .options(selectinload(Contest.problems), selectinload(Contest.participants))
        .where(Contest.id == contest_id)
    )
    contest = (await db.execute(query)).scalar_one_or_none()
    if contest is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Contest not found",
        )

    changes = contest_data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(contest, field_name, changes[field_name])

    await db.commit()
    await db.refresh(contest)

    # NOTE(review): the relationship collections are read after commit + refresh;
    # confirm they are still loaded under this session's configuration.
    problems_count = len(contest.problems)
    participants_count = len(contest.participants)

    return ContestResponse(
        id=contest.id,
        title=contest.title,
        description=contest.description,
        start_time=contest.start_time,
        end_time=contest.end_time,
        is_active=contest.is_active,
        created_by=contest.created_by,
        created_at=contest.created_at,
        is_running=contest.is_running,
        has_ended=contest.has_ended,
        problems_count=problems_count,
        participants_count=participants_count,
    )
@router.delete("/{contest_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_contest(
    contest_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Delete a contest by id and commit the deletion.

    ``current_user`` is not read in the body; it is declared so that the
    ``get_current_admin`` dependency is resolved for this route.

    Args:
        contest_id: Primary key of the contest, taken from the URL path.
        db: Async database session.

    Raises:
        HTTPException: 404 when no contest has the given id.
    """
    lookup = select(Contest).where(Contest.id == contest_id)
    contest = (await db.execute(lookup)).scalar_one_or_none()
    if contest is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Contest not found",
        )

    await db.delete(contest)
    await db.commit()
@router.post("/{contest_id}/join", status_code=status.HTTP_201_CREATED)
async def join_contest(
    contest_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Register the requesting user as a participant of a contest.

    Args:
        contest_id: Primary key of the contest, taken from the URL path.
        db: Async database session.
        current_user: The user who is joining.

    Returns:
        A dict with a single ``"message"`` key confirming the join.

    Raises:
        HTTPException: 404 when the contest does not exist; 400 when the
            contest's ``is_active`` flag is false or the user already has a
            participant row for this contest.
    """
    # The contest must exist and be flagged active.
    contest_lookup = select(Contest).where(Contest.id == contest_id)
    contest = (await db.execute(contest_lookup)).scalar_one_or_none()
    if contest is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Contest not found",
        )
    if not contest.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Contest is not active",
        )

    # Reject a second join by the same user.
    membership_lookup = select(ContestParticipant).where(
        ContestParticipant.contest_id == contest_id,
        ContestParticipant.user_id == current_user.id,
    )
    existing = (await db.execute(membership_lookup)).scalar_one_or_none()
    if existing is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Already joined this contest",
        )

    # Record the participation.
    db.add(ContestParticipant(contest_id=contest_id, user_id=current_user.id))
    await db.commit()

    return {"message": "Successfully joined the contest"}