diff --git a/app/repositories/contact_repo.py b/app/repositories/contact_repo.py index 7257669..25ef1f6 100644 --- a/app/repositories/contact_repo.py +++ b/app/repositories/contact_repo.py @@ -60,7 +60,6 @@ class ContactRepository: user_id: int, ) -> Contact | None: stmt = select(Contact).where(Contact.id == contact_id, Contact.organization_id == organization_id) - stmt = self._apply_role_clause(stmt, role, user_id) result = await self._session.scalars(stmt) return result.first() @@ -124,14 +123,4 @@ class ContactRepository: if role == OrganizationRole.MEMBER: raise ContactAccessError("Members cannot filter by owner") stmt = stmt.where(Contact.owner_id == params.owner_id) - return self._apply_role_clause(stmt, role, user_id) - - def _apply_role_clause( - self, - stmt: Select[tuple[Contact]], - role: OrganizationRole, - user_id: int, - ) -> Select[tuple[Contact]]: - if role == OrganizationRole.MEMBER: - return stmt.where(Contact.owner_id == user_id) return stmt diff --git a/tests/api/v1/test_contacts.py b/tests/api/v1/test_contacts.py index 0922979..002d4f5 100644 --- a/tests/api/v1/test_contacts.py +++ b/tests/api/v1/test_contacts.py @@ -124,6 +124,115 @@ async def test_member_cannot_assign_foreign_owner( assert response.status_code == 403 +@pytest.mark.asyncio +async def test_member_can_view_foreign_contacts( + session_factory: async_sessionmaker[AsyncSession], client: AsyncClient +) -> None: + scenario = await prepare_scenario(session_factory) + token = make_token(scenario.user_id, scenario.user_email) + + async with session_factory() as session: + membership = await session.scalar( + select(OrganizationMember).where( + OrganizationMember.organization_id == scenario.organization_id, + OrganizationMember.user_id == scenario.user_id, + ) + ) + assert membership is not None + membership.role = OrganizationRole.MEMBER + + other_user = User( + email="viewer@example.com", + hashed_password="hashed", + name="Viewer", + is_active=True, + ) + session.add(other_user) + await session.flush() + + session.add( + OrganizationMember( + organization_id=scenario.organization_id, + user_id=other_user.id, + role=OrganizationRole.MANAGER, + ) + ) + + session.add( + Contact( + organization_id=scenario.organization_id, + owner_id=other_user.id, + name="Foreign Owner", + email="foreign@example.com", + phone=None, + ) + ) + await session.commit() + + response = await client.get( + "/api/v1/contacts/", + headers=auth_headers(token, scenario), + ) + + assert response.status_code == 200 + names = {contact["name"] for contact in response.json()} + assert {"John Doe", "Foreign Owner"}.issubset(names) + + +@pytest.mark.asyncio +async def test_member_patch_foreign_contact_forbidden( + session_factory: async_sessionmaker[AsyncSession], client: AsyncClient +) -> None: + scenario = await prepare_scenario(session_factory) + token = make_token(scenario.user_id, scenario.user_email) + + async with session_factory() as session: + membership = await session.scalar( + select(OrganizationMember).where( + OrganizationMember.organization_id == scenario.organization_id, + OrganizationMember.user_id == scenario.user_id, + ) + ) + assert membership is not None + membership.role = OrganizationRole.MEMBER + + other_user = User( + email="owner2@example.com", + hashed_password="hashed", + name="Owner2", + is_active=True, + ) + session.add(other_user) + await session.flush() + + session.add( + OrganizationMember( + organization_id=scenario.organization_id, + user_id=other_user.id, + role=OrganizationRole.MANAGER, + ) + ) + + contact = Contact( + organization_id=scenario.organization_id, + owner_id=other_user.id, + name="Locked Contact", + email="locked@example.com", + phone=None, + ) + session.add(contact) + await session.commit() + contact_id = contact.id + + response = await client.patch( + f"/api/v1/contacts/{contact_id}", + json={"name": "Hacked"}, + headers=auth_headers(token, scenario), + ) + + assert response.status_code == 403 + + @pytest.mark.asyncio async def test_patch_contact_updates_fields( session_factory: async_sessionmaker[AsyncSession], client: AsyncClient diff --git a/tests/services/test_contact_service.py b/tests/services/test_contact_service.py index f1e7a3b..84bda88 100644 --- a/tests/services/test_contact_service.py +++ b/tests/services/test_contact_service.py @@ -182,6 +182,40 @@ async def test_member_owner_filter_forbidden(session: AsyncSession) -> None: ) +@pytest.mark.asyncio +async def test_member_can_view_foreign_contacts(session: AsyncSession) -> None: + owner = _make_user("owner") + member = _make_user("member") + context, repo, contact = await _setup_contact( + session, + role=OrganizationRole.MEMBER, + owner=owner, + context_user=member, + ) + service = ContactService(repository=repo) + + contacts = await service.list_contacts(filters=ContactListFilters(), context=context) + + assert contacts and contacts[0].id == contact.id + assert contacts[0].owner_id == owner.id != context.user_id + + +@pytest.mark.asyncio +async def test_member_cannot_update_foreign_contact(session: AsyncSession) -> None: + owner = _make_user("owner") + member = _make_user("member") + context, repo, contact = await _setup_contact( + session, + role=OrganizationRole.MEMBER, + owner=owner, + context_user=member, + ) + service = ContactService(repository=repo) + + with pytest.raises(ContactForbiddenError): + await service.update_contact(contact, ContactUpdateData(name="Blocked"), context=context) + + @pytest.mark.asyncio async def test_update_contact_allows_nullifying_fields(session: AsyncSession) -> None: context, repo, contact = await _setup_contact(session)