volsu-contests/backend/app/routers/leaderboard.py
2025-11-30 19:55:50 +03:00

144 lines
4.4 KiB
Python

from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from pydantic import BaseModel
from app.database import get_db
from app.models.user import User
from app.models.contest import Contest
from app.models.problem import Problem
from app.models.submission import Submission
from app.dependencies import get_current_user
# Router that the leaderboard endpoint below registers itself on.
# NOTE(review): the URL prefix/tags are presumably set where the application
# includes this router — that is not visible in this file.
router = APIRouter()
class LeaderboardEntry(BaseModel):
    """One participant's row in a contest leaderboard response."""

    # 1-based position on the leaderboard (higher total score ranks first).
    rank: int
    # Identifier of the ranked user.
    user_id: int
    # Username copied from the user record.
    username: str
    # Avatar URL copied from the user record; may be None.
    avatar_url: str | None
    # Sum over problems of the user's best submission score in the contest.
    total_score: int
    # Number of contest problems whose best score equals the problem's
    # total_points.
    problems_solved: int
    # Creation time of the user's most recent submission in the contest.
    last_submission_time: datetime | None
class LeaderboardResponse(BaseModel):
    """Response body of the contest leaderboard endpoint."""

    # Identifier of the contest the leaderboard belongs to.
    contest_id: int
    # Title of that contest.
    contest_title: str
    # True when standings are withheld from the requester (the contest is
    # running and the requester is not an admin); entries is then empty.
    is_hidden: bool
    # Leaderboard rows, best total score first.
    entries: list[LeaderboardEntry]
@router.get("/{contest_id}", response_model=LeaderboardResponse)
async def get_leaderboard(
    contest_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the leaderboard of a contest.

    A user's total score is the sum of their best submission score on each
    problem of the contest. A problem counts as solved when that best score
    equals the problem's ``total_points``. Entries are ordered by total score
    (highest first); ties are broken by the earlier last submission and then
    by user id so the ordering is stable between requests.

    While the contest is running, non-admin users receive a response with
    ``is_hidden=True`` and no entries.

    Args:
        contest_id: Identifier of the contest taken from the URL path.
        db: Async database session.
        current_user: The authenticated user making the request.

    Returns:
        A ``LeaderboardResponse`` for the contest.

    Raises:
        HTTPException: 404 if no contest with ``contest_id`` exists.
    """
    result = await db.execute(select(Contest).where(Contest.id == contest_id))
    contest = result.scalar_one_or_none()
    if not contest:
        raise HTTPException(status_code=404, detail="Contest not found")

    # Standings are withheld from everyone except admins during the contest.
    is_hidden = contest.is_running and current_user.role != "admin"
    if is_hidden:
        return LeaderboardResponse(
            contest_id=contest.id,
            contest_title=contest.title,
            is_hidden=True,
            entries=[],
        )

    # Best score and latest submission time per (user, problem).
    best_scores = (
        select(
            Submission.user_id,
            Submission.problem_id,
            func.max(Submission.score).label("best_score"),
            func.max(Submission.created_at).label("last_submission"),
        )
        .where(Submission.contest_id == contest_id)
        .group_by(Submission.user_id, Submission.problem_id)
        .subquery()
    )

    # COALESCE keeps users whose scores are all NULL at 0 points; without it
    # backends that sort NULL first in descending order (e.g. PostgreSQL)
    # would place them at the top of the leaderboard.
    total_score = func.coalesce(func.sum(best_scores.c.best_score), 0)
    last_submission_time = func.max(best_scores.c.last_submission)

    result = await db.execute(
        select(
            best_scores.c.user_id,
            total_score.label("total_score"),
            last_submission_time.label("last_submission_time"),
        )
        .group_by(best_scores.c.user_id)
        .order_by(
            total_score.desc(),
            # Deterministic tie-break: earlier finisher first, then user id.
            last_submission_time.asc(),
            best_scores.c.user_id.asc(),
        )
    )
    rows = result.all()

    if not rows:
        # Nobody has submitted yet: nothing more to look up.
        return LeaderboardResponse(
            contest_id=contest.id,
            contest_title=contest.title,
            is_hidden=False,
            entries=[],
        )

    # Load the user records of everyone on the leaderboard in one query.
    user_ids = [row.user_id for row in rows]
    users_result = await db.execute(select(User).where(User.id.in_(user_ids)))
    users = {u.id: u for u in users_result.scalars().all()}

    # Maximum attainable points per problem, used to decide "solved".
    problems_result = await db.execute(
        select(Problem).where(Problem.contest_id == contest_id)
    )
    problems = {p.id: p.total_points for p in problems_result.scalars().all()}

    # Fetch every (user, problem) best score in a single query instead of
    # issuing one query per ranked user.
    per_problem_result = await db.execute(
        select(
            best_scores.c.user_id,
            best_scores.c.problem_id,
            best_scores.c.best_score,
        )
    )
    solved_counts: dict[int, int] = {}
    for ps in per_problem_result.all():
        if ps.problem_id in problems and ps.best_score == problems[ps.problem_id]:
            solved_counts[ps.user_id] = solved_counts.get(ps.user_id, 0) + 1

    entries = []
    for row in rows:
        user = users.get(row.user_id)
        if user is None:
            # Submission rows without a matching user record are left out.
            continue
        entries.append(LeaderboardEntry(
            # Rank counts only emitted entries so skipped rows leave no gaps.
            rank=len(entries) + 1,
            user_id=row.user_id,
            username=user.username,
            avatar_url=user.avatar_url,
            total_score=row.total_score or 0,
            problems_solved=solved_counts.get(row.user_id, 0),
            last_submission_time=row.last_submission_time,
        ))

    return LeaderboardResponse(
        contest_id=contest.id,
        contest_title=contest.title,
        is_hidden=False,
        entries=entries,
    )