feat: enhance contact access control; add tests for member viewing and updating foreign contacts
Test / test (push) Successful in 14s
Details
Test / test (push) Successful in 14s
Details
This commit is contained in:
parent
6db1e865f6
commit
472cb654d8
|
|
@ -60,7 +60,6 @@ class ContactRepository:
|
||||||
user_id: int,
|
user_id: int,
|
||||||
) -> Contact | None:
|
) -> Contact | None:
|
||||||
stmt = select(Contact).where(Contact.id == contact_id, Contact.organization_id == organization_id)
|
stmt = select(Contact).where(Contact.id == contact_id, Contact.organization_id == organization_id)
|
||||||
stmt = self._apply_role_clause(stmt, role, user_id)
|
|
||||||
result = await self._session.scalars(stmt)
|
result = await self._session.scalars(stmt)
|
||||||
return result.first()
|
return result.first()
|
||||||
|
|
||||||
|
|
@ -124,14 +123,4 @@ class ContactRepository:
|
||||||
if role == OrganizationRole.MEMBER:
|
if role == OrganizationRole.MEMBER:
|
||||||
raise ContactAccessError("Members cannot filter by owner")
|
raise ContactAccessError("Members cannot filter by owner")
|
||||||
stmt = stmt.where(Contact.owner_id == params.owner_id)
|
stmt = stmt.where(Contact.owner_id == params.owner_id)
|
||||||
return self._apply_role_clause(stmt, role, user_id)
|
|
||||||
|
|
||||||
def _apply_role_clause(
|
|
||||||
self,
|
|
||||||
stmt: Select[tuple[Contact]],
|
|
||||||
role: OrganizationRole,
|
|
||||||
user_id: int,
|
|
||||||
) -> Select[tuple[Contact]]:
|
|
||||||
if role == OrganizationRole.MEMBER:
|
|
||||||
return stmt.where(Contact.owner_id == user_id)
|
|
||||||
return stmt
|
return stmt
|
||||||
|
|
|
||||||
|
|
@ -124,6 +124,115 @@ async def test_member_cannot_assign_foreign_owner(
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_member_can_view_foreign_contacts(
|
||||||
|
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
|
||||||
|
) -> None:
|
||||||
|
scenario = await prepare_scenario(session_factory)
|
||||||
|
token = make_token(scenario.user_id, scenario.user_email)
|
||||||
|
|
||||||
|
async with session_factory() as session:
|
||||||
|
membership = await session.scalar(
|
||||||
|
select(OrganizationMember).where(
|
||||||
|
OrganizationMember.organization_id == scenario.organization_id,
|
||||||
|
OrganizationMember.user_id == scenario.user_id,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert membership is not None
|
||||||
|
membership.role = OrganizationRole.MEMBER
|
||||||
|
|
||||||
|
other_user = User(
|
||||||
|
email="viewer@example.com",
|
||||||
|
hashed_password="hashed",
|
||||||
|
name="Viewer",
|
||||||
|
is_active=True,
|
||||||
|
)
|
||||||
|
session.add(other_user)
|
||||||
|
await session.flush()
|
||||||
|
|
||||||
|
session.add(
|
||||||
|
OrganizationMember(
|
||||||
|
organization_id=scenario.organization_id,
|
||||||
|
user_id=other_user.id,
|
||||||
|
role=OrganizationRole.MANAGER,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
session.add(
|
||||||
|
Contact(
|
||||||
|
organization_id=scenario.organization_id,
|
||||||
|
owner_id=other_user.id,
|
||||||
|
name="Foreign Owner",
|
||||||
|
email="foreign@example.com",
|
||||||
|
phone=None,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
response = await client.get(
|
||||||
|
"/api/v1/contacts/",
|
||||||
|
headers=auth_headers(token, scenario),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
names = {contact["name"] for contact in response.json()}
|
||||||
|
assert {"John Doe", "Foreign Owner"}.issubset(names)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_member_patch_foreign_contact_forbidden(
|
||||||
|
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
|
||||||
|
) -> None:
|
||||||
|
scenario = await prepare_scenario(session_factory)
|
||||||
|
token = make_token(scenario.user_id, scenario.user_email)
|
||||||
|
|
||||||
|
async with session_factory() as session:
|
||||||
|
membership = await session.scalar(
|
||||||
|
select(OrganizationMember).where(
|
||||||
|
OrganizationMember.organization_id == scenario.organization_id,
|
||||||
|
OrganizationMember.user_id == scenario.user_id,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert membership is not None
|
||||||
|
membership.role = OrganizationRole.MEMBER
|
||||||
|
|
||||||
|
other_user = User(
|
||||||
|
email="owner2@example.com",
|
||||||
|
hashed_password="hashed",
|
||||||
|
name="Owner2",
|
||||||
|
is_active=True,
|
||||||
|
)
|
||||||
|
session.add(other_user)
|
||||||
|
await session.flush()
|
||||||
|
|
||||||
|
session.add(
|
||||||
|
OrganizationMember(
|
||||||
|
organization_id=scenario.organization_id,
|
||||||
|
user_id=other_user.id,
|
||||||
|
role=OrganizationRole.MANAGER,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
contact = Contact(
|
||||||
|
organization_id=scenario.organization_id,
|
||||||
|
owner_id=other_user.id,
|
||||||
|
name="Locked Contact",
|
||||||
|
email="locked@example.com",
|
||||||
|
phone=None,
|
||||||
|
)
|
||||||
|
session.add(contact)
|
||||||
|
await session.commit()
|
||||||
|
contact_id = contact.id
|
||||||
|
|
||||||
|
response = await client.patch(
|
||||||
|
f"/api/v1/contacts/{contact_id}",
|
||||||
|
json={"name": "Hacked"},
|
||||||
|
headers=auth_headers(token, scenario),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_patch_contact_updates_fields(
|
async def test_patch_contact_updates_fields(
|
||||||
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
|
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
|
||||||
|
|
|
||||||
|
|
@ -182,6 +182,40 @@ async def test_member_owner_filter_forbidden(session: AsyncSession) -> None:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_member_can_view_foreign_contacts(session: AsyncSession) -> None:
|
||||||
|
owner = _make_user("owner")
|
||||||
|
member = _make_user("member")
|
||||||
|
context, repo, contact = await _setup_contact(
|
||||||
|
session,
|
||||||
|
role=OrganizationRole.MEMBER,
|
||||||
|
owner=owner,
|
||||||
|
context_user=member,
|
||||||
|
)
|
||||||
|
service = ContactService(repository=repo)
|
||||||
|
|
||||||
|
contacts = await service.list_contacts(filters=ContactListFilters(), context=context)
|
||||||
|
|
||||||
|
assert contacts and contacts[0].id == contact.id
|
||||||
|
assert contacts[0].owner_id == owner.id != context.user_id
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_member_cannot_update_foreign_contact(session: AsyncSession) -> None:
|
||||||
|
owner = _make_user("owner")
|
||||||
|
member = _make_user("member")
|
||||||
|
context, repo, contact = await _setup_contact(
|
||||||
|
session,
|
||||||
|
role=OrganizationRole.MEMBER,
|
||||||
|
owner=owner,
|
||||||
|
context_user=member,
|
||||||
|
)
|
||||||
|
service = ContactService(repository=repo)
|
||||||
|
|
||||||
|
with pytest.raises(ContactForbiddenError):
|
||||||
|
await service.update_contact(contact, ContactUpdateData(name="Blocked"), context=context)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_update_contact_allows_nullifying_fields(session: AsyncSession) -> None:
|
async def test_update_contact_allows_nullifying_fields(session: AsyncSession) -> None:
|
||||||
context, repo, contact = await _setup_contact(session)
|
context, repo, contact = await _setup_contact(session)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue