Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#8: Allow to update, bulk edit, delete and stop TimeEntries #62

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions tests/responses/time_entry_put_and_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from __future__ import annotations

from typing import Dict, List, Union


BULK_EDIT_TIME_ENTRIES_RESPONSE: Dict[str, List[Union[int, Dict[str, Union[int, str]]]]] = {
"success": [3544298808],
"failure": [
{
"id": 202793182,
"message": "Time entry with ID: 202793182 was not found/is not accessible",
}
],
}
192 changes: 189 additions & 3 deletions tests/test_time_entry.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, Union
from random import randint
from typing import TYPE_CHECKING, Dict, List, Union
from unittest.mock import Mock, patch

import pytest
from httpx import Response
from pydantic import ValidationError
from toggl_python.exceptions import BadRequest
from toggl_python.schemas.time_entry import (
BulkEditTimeEntriesFieldNames,
BulkEditTimeEntriesOperation,
BulkEditTimeEntriesOperations,
BulkEditTimeEntriesResponse,
MeTimeEntryResponse,
MeTimeEntryWithMetaResponse,
MeWebTimerResponse,
)

from tests.responses.me_get import ME_WEB_TIMER_RESPONSE
from tests.responses.time_entry_get import ME_TIME_ENTRY_RESPONSE, ME_TIME_ENTRY_WITH_META_RESPONSE
from tests.responses.time_entry_put_and_patch import BULK_EDIT_TIME_ENTRIES_RESPONSE


if TYPE_CHECKING:
from respx import MockRouter
from toggl_python.entities.user import CurrentUser
from toggl_python.entities.workspace import Workspace


def test_get_time_entry__without_query_params(
Expand Down Expand Up @@ -111,7 +118,7 @@ def test_get_time_entries__with_meta_query_param(
assert result == expected_result


@patch("toggl_python.schemas.time_entry.datetime")
@patch("toggl_python.schemas.base.datetime")
@pytest.mark.parametrize(
argnames="query_params, method_kwargs",
argvalues=(
Expand Down Expand Up @@ -190,7 +197,7 @@ def test_get_time_entries__invalid_query_params(
_ = authed_current_user.get_time_entries(**query_params)


@patch("toggl_python.schemas.time_entry.datetime")
@patch("toggl_python.schemas.base.datetime")
def test_get_time_entries__too_old_since_value(
mocked_datetime: Mock, authed_current_user: CurrentUser
) -> None:
Expand All @@ -212,3 +219,182 @@ def test_get_web_timer__ok(response_mock: MockRouter, authed_current_user: Curre

assert mocked_route.called is True
assert result == expected_result


@pytest.mark.parametrize(
argnames=("field_name", "field_value"),
argvalues=[
("billable", True),
("description", "updated description"),
("duration", -1),
("project_id", 757542305),
("shared_with_user_ids", [1243543643, 676586868]),
("start", "2020-11-11T09:30:00-04:00"),
("stop", "2010-01-29T19:50:00+02:00"),
("tag_ids", [24032, 354742502]),
("tags", ["new tag"]),
("task_id", 1593268409),
("user_id", 573250897),
],
)
def test_workspace_update_time_entry__ok(
field_name: str,
field_value: Union[bool, str, int, List[int]],
response_mock: MockRouter,
authed_workspace: Workspace,
) -> None:
workspace_id = 123
time_entry_id = 98765
payload = {field_name: field_value}
fake_response = ME_TIME_ENTRY_RESPONSE.copy()
fake_response.update(**payload)
mocked_route = response_mock.put(
f"/workspaces/{workspace_id}/time_entries/{time_entry_id}"
).mock(
return_value=Response(status_code=200, json=fake_response),
)
expected_result = MeTimeEntryResponse.model_validate(fake_response)

result = authed_workspace.update_time_entry(workspace_id, time_entry_id, **payload)

assert mocked_route.called is True
assert result == expected_result


def test_update_time_entry__user_cannot_access_project(
response_mock: MockRouter, authed_workspace: Workspace
) -> None:
workspace_id = 123
time_entry_id = 98765
error_message = "User cannot access the selected project"
mocked_route = response_mock.put(
f"/workspaces/{workspace_id}/time_entries/{time_entry_id}"
).mock(
return_value=Response(status_code=400, text=error_message),
)

with pytest.raises(BadRequest, match=error_message):
_ = authed_workspace.update_time_entry(workspace_id, time_entry_id, project_id=125872350)

assert mocked_route.called is True


def test_delete_time_entry__ok(response_mock: MockRouter, authed_workspace: Workspace) -> None:
workspace_id = 123
time_entry_id = 98765
mocked_route = response_mock.delete(
f"/workspaces/{workspace_id}/time_entries/{time_entry_id}"
).mock(
return_value=Response(status_code=200),
)

result = authed_workspace.delete_time_entry(workspace_id, time_entry_id)

assert mocked_route.called is True
assert result is True


def test_bulk_edit_time_entries__too_much_ids(authed_workspace: Workspace) -> None:
workspace_id = 123
time_entry_ids = [randint(100000, 999999) for _ in range(101)] # noqa: S311
error_message = "Limit to max TimeEntry IDs exceeded. "

with pytest.raises(ValueError, match=error_message):
_ = authed_workspace.bulk_edit_time_entries(workspace_id, time_entry_ids, operations=[])


def test_bulk_edit_time_entries__empty_time_entry_ids(authed_workspace: Workspace) -> None:
workspace_id = 123
error_message = "Specify at least one TimeEntry ID"

with pytest.raises(ValueError, match=error_message):
_ = authed_workspace.bulk_edit_time_entries(workspace_id, time_entry_ids=[], operations=[])


def test_bulk_edit_time_entries__empty_operations(authed_workspace: Workspace) -> None:
workspace_id = 123
time_entry_ids = [12345677]
error_message = "Specify at least one edit operation"

with pytest.raises(ValueError, match=error_message):
_ = authed_workspace.bulk_edit_time_entries(workspace_id, time_entry_ids, operations=[])


@pytest.mark.parametrize(
argnames=("operation"), argvalues=[item.value for item in BulkEditTimeEntriesOperations]
)
@pytest.mark.parametrize(
argnames=("field_name", "field_value"),
argvalues=[
(BulkEditTimeEntriesFieldNames.billable.value, True),
(BulkEditTimeEntriesFieldNames.description.value, "updated description"),
(BulkEditTimeEntriesFieldNames.duration.value, -1),
(BulkEditTimeEntriesFieldNames.project_id.value, 757542305),
(BulkEditTimeEntriesFieldNames.shared_with_user_ids.value, [1243543643, 676586868]),
(BulkEditTimeEntriesFieldNames.start.value, datetime(2024, 5, 10, tzinfo=timezone.utc)),
(BulkEditTimeEntriesFieldNames.stop.value, datetime(2022, 4, 15, tzinfo=timezone.utc)),
(BulkEditTimeEntriesFieldNames.tag_ids.value, [24032, 354742502]),
(BulkEditTimeEntriesFieldNames.tags.value, ["new tag"]),
(BulkEditTimeEntriesFieldNames.task_id.value, 1593268409),
(BulkEditTimeEntriesFieldNames.user_id.value, 573250897),
],
)
def test_bulk_edit_time_entries__ok(
field_name: BulkEditTimeEntriesFieldNames,
field_value: Union[str, int],
operation: BulkEditTimeEntriesOperations,
response_mock: MockRouter,
authed_workspace: Workspace,
) -> None:
workspace_id = 123
time_entry_ids = [98765, 43210]
edit_operation = BulkEditTimeEntriesOperation(
operation=operation, field_name=field_name, field_value=field_value
)
mocked_route = response_mock.patch(
f"/workspaces/{workspace_id}/time_entries/{time_entry_ids}"
).mock(
return_value=Response(status_code=200, json=BULK_EDIT_TIME_ENTRIES_RESPONSE),
)
expected_result = BulkEditTimeEntriesResponse.model_validate(BULK_EDIT_TIME_ENTRIES_RESPONSE)

result = authed_workspace.bulk_edit_time_entries(
workspace_id, time_entry_ids, operations=[edit_operation]
)

assert mocked_route.called is True
assert result == expected_result


def test_stop_time_entry__ok(response_mock: MockRouter, authed_workspace: Workspace) -> None:
workspace_id = 123
time_entry_id = 98765
mocked_route = response_mock.patch(
f"/workspaces/{workspace_id}/time_entries/{time_entry_id}/stop"
).mock(
return_value=Response(status_code=200, json=ME_TIME_ENTRY_RESPONSE),
)
expected_result = MeTimeEntryResponse.model_validate(ME_TIME_ENTRY_RESPONSE)

result = authed_workspace.stop_time_entry(workspace_id, time_entry_id)

assert mocked_route.called is True
assert result == expected_result


def test_stop_time_entry__already_stopped(
response_mock: MockRouter, authed_workspace: Workspace
) -> None:
workspace_id = 123
time_entry_id = 98765
error_message = "Time entry already stopped"
mocked_route = response_mock.patch(
f"/workspaces/{workspace_id}/time_entries/{time_entry_id}/stop"
).mock(
return_value=Response(status_code=409, text=error_message),
)

with pytest.raises(BadRequest, match=error_message):
_ = authed_workspace.stop_time_entry(workspace_id, time_entry_id)

assert mocked_route.called is True
99 changes: 99 additions & 0 deletions toggl_python/entities/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

from toggl_python.api import ApiWrapper
from toggl_python.schemas.project import ProjectQueryParams, ProjectResponse
from toggl_python.schemas.time_entry import (
BulkEditTimeEntriesOperation,
BulkEditTimeEntriesResponse,
MeTimeEntryResponse,
TimeEntryRequest,
)
from toggl_python.schemas.workspace import GetWorkspacesQueryParams, WorkspaceResponse


Expand Down Expand Up @@ -85,3 +91,96 @@ def get_projects( # noqa: PLR0913 - Too many arguments in function definition (
response_body = response.json()

return [ProjectResponse.model_validate(project_data) for project_data in response_body]

def update_time_entry( # noqa: PLR0913 - Too many arguments in function definition (13 > 12)
self,
workspace_id: int,
time_entry_id: int,
billable: Optional[bool] = None,
description: Optional[str] = None,
duration: Optional[int] = None,
project_id: Optional[int] = None,
shared_with_user_ids: Optional[List[int]] = None,
start: Optional[datetime] = None,
stop: Optional[datetime] = None,
tag_ids: Optional[List[int]] = None,
tags: Optional[List[str]] = None,
task_id: Optional[int] = None,
user_id: Optional[int] = None,
) -> MeTimeEntryResponse:
"""Some params from docs are not listed because API don't use them to change object."""
request_body_schema = TimeEntryRequest(
billable=billable,
description=description,
duration=duration,
project_id=project_id,
shared_with_user_ids=shared_with_user_ids,
start=start,
stop=stop,
tag_ids=tag_ids,
tags=tags,
task_id=task_id,
user_id=user_id,
)
request_body = request_body_schema.model_dump(mode="json", exclude_none=True)

response = self.client.put(
url=f"{self.prefix}/{workspace_id}/time_entries/{time_entry_id}", json=request_body
)
self.raise_for_status(response)

response_body = response.json()

return MeTimeEntryResponse.model_validate(response_body)

def delete_time_entry(self, workspace_id: int, time_entry_id: int) -> bool:
response = self.client.delete(
url=f"{self.prefix}/{workspace_id}/time_entries/{time_entry_id}"
)
self.raise_for_status(response)

return response.is_success

def bulk_edit_time_entries(
self,
workspace_id: int,
time_entry_ids: List[int],
operations: List[BulkEditTimeEntriesOperation],
) -> MeTimeEntryResponse:
if not time_entry_ids:
error_message = "Specify at least one TimeEntry ID"
raise ValueError(error_message)

max_time_entries_ids = 100
if len(time_entry_ids) > max_time_entries_ids:
error_message = (
f"Limit to max TimeEntry IDs exceeded. "
f"Max {max_time_entries_ids} ids per request are allowed"
)
raise ValueError(error_message)
if not operations:
error_message = "Specify at least one edit operation"
raise ValueError(error_message)

request_body = [
operation.model_dump(mode="json", exclude_none=True) for operation in operations
]

response = self.client.patch(
url=f"{self.prefix}/{workspace_id}/time_entries/{time_entry_ids}", json=request_body
)
self.raise_for_status(response)

response_body = response.json()

return BulkEditTimeEntriesResponse.model_validate(response_body)

def stop_time_entry(self, workspace_id: int, time_entry_id: int) -> MeTimeEntryResponse:
response = self.client.patch(
url=f"{self.prefix}/{workspace_id}/time_entries/{time_entry_id}/stop"
)
self.raise_for_status(response)

response_body = response.json()

return MeTimeEntryResponse.model_validate(response_body)
Loading