test_task_crm/app/api/v1/tasks.py

92 lines
3.1 KiB
Python

"""Task API endpoints with inline schemas."""
from __future__ import annotations
from datetime import date, datetime, time, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from app.api.deps import get_organization_context, get_task_service
from app.models.task import TaskCreate, TaskRead
from app.services.organization_service import OrganizationContext
from app.services.task_service import (
TaskDueDateError,
TaskForbiddenError,
TaskListFilters,
TaskOrganizationError,
TaskService,
)
class TaskCreatePayload(BaseModel):
    """Request body for creating a task.

    The due date arrives as a plain calendar date; ``to_domain`` converts it
    to a UTC datetime at midnight before handing it to the domain model.
    """

    # Identifier of the deal the task is attached to.
    deal_id: int
    title: str
    description: str | None = None
    # Optional calendar date; no time-of-day component is accepted here.
    due_date: date | None = None

    def to_domain(self) -> TaskCreate:
        """Build the ``TaskCreate`` domain object from this payload.

        Returns:
            A ``TaskCreate`` whose ``due_date`` is ``None`` when no date was
            supplied, otherwise the supplied date as a UTC-midnight datetime.
        """
        if self.due_date is None:
            due_at = None
        else:
            due_at = _date_to_datetime(self.due_date)

        return TaskCreate(
            deal_id=self.deal_id,
            title=self.title,
            description=self.description,
            due_date=due_at,
        )
def to_range_boundary(value: date | None, *, end_of_day: bool) -> datetime | None:
    """Turn a date query parameter into an inclusive UTC datetime boundary.

    Args:
        value: The calendar date to convert, or ``None`` when the filter
            was not supplied.
        end_of_day: When true, use the last representable instant of the
            day (23:59:59.999999); otherwise use midnight.

    Returns:
        A timezone-aware UTC datetime, or ``None`` if ``value`` is ``None``.
    """
    if value is not None:
        # time.max is 23:59:59.999999 and time.min is 00:00:00.
        clock = time.max if end_of_day else time.min
        return datetime.combine(value, clock, tzinfo=timezone.utc)
    return None
def _date_to_datetime(value: date) -> datetime:
    """Return ``value`` as a timezone-aware UTC datetime at midnight."""
    return datetime(value.year, value.month, value.day, tzinfo=timezone.utc)
# Router for the task endpoints defined in this module; every route is mounted
# under the "/tasks" prefix and tagged "tasks".
router = APIRouter(prefix="/tasks", tags=["tasks"])
@router.get("/", response_model=list[TaskRead])
async def list_tasks(
    deal_id: int | None = None,
    only_open: bool = False,
    due_before: date | None = Query(default=None),
    due_after: date | None = Query(default=None),
    context: OrganizationContext = Depends(get_organization_context),
    service: TaskService = Depends(get_task_service),
) -> list[TaskRead]:
    """List tasks, optionally narrowed by deal, open state and due-date range.

    Args:
        deal_id: Forwarded to the filters as-is; ``None`` when not supplied.
        only_open: Forwarded to the filters as-is.
        due_before: Upper date bound, expanded to the end of that day (UTC).
        due_after: Lower date bound, expanded to the start of that day (UTC).
        context: Organization context resolved by dependency injection.
        service: Task service resolved by dependency injection.

    Returns:
        The service's results, each validated into a ``TaskRead``.
    """
    # Dates are widened to whole-day datetime boundaries before filtering.
    latest_due = to_range_boundary(due_before, end_of_day=True)
    earliest_due = to_range_boundary(due_after, end_of_day=False)

    criteria = TaskListFilters(
        deal_id=deal_id,
        only_open=only_open,
        due_before=latest_due,
        due_after=earliest_due,
    )
    matches = await service.list_tasks(filters=criteria, context=context)
    return list(map(TaskRead.model_validate, matches))
@router.post("/", response_model=TaskRead, status_code=status.HTTP_201_CREATED)
async def create_task(
    payload: TaskCreatePayload,
    context: OrganizationContext = Depends(get_organization_context),
    service: TaskService = Depends(get_task_service),
) -> TaskRead:
    """Create a task and translate service-level failures into HTTP errors.

    Args:
        payload: The request body describing the task to create.
        context: Organization context resolved by dependency injection.
        service: Task service resolved by dependency injection.

    Returns:
        The created task validated into a ``TaskRead``.

    Raises:
        HTTPException: 400 for ``TaskDueDateError``, 403 for
            ``TaskForbiddenError``, 404 for ``TaskOrganizationError``; the
            detail is the string form of the original exception.
    """
    try:
        # The payload conversion stays inside the try so it is covered by the
        # same handlers as the service call.
        new_task = payload.to_domain()
        created = await service.create_task(new_task, context=context)
    except TaskDueDateError as err:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(err),
        ) from err
    except TaskForbiddenError as err:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(err),
        ) from err
    except TaskOrganizationError as err:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(err),
        ) from err
    else:
        return TaskRead.model_validate(created)