diff --git a/pyproject.toml b/pyproject.toml
index f4131e8a..7e243fef 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -82,6 +82,18 @@ warn_unused_ignores = false
 profile = "black"
 multi_line_output = 3
 
+[tool.coverage.run]
+omit = [
+    "taskiq/__main__.py",
+    "taskiq/abc/cmd.py",
+    "taskiq/cli/scheduler/args.py",
+    "taskiq/cli/scheduler/cmd.py",
+    "taskiq/cli/utils.py",
+    "taskiq/cli/worker/args.py",
+    "taskiq/cli/worker/async_task_runner.py",
+    "taskiq/cli/worker/cmd.py",
+]
+
 [build-system]
 requires = ["poetry-core>=1.0.0"]
 build-backend = "poetry.core.masonry.api"
diff --git a/taskiq/__main__.py b/taskiq/__main__.py
index 019a7888..6fb9fd3b 100644
--- a/taskiq/__main__.py
+++ b/taskiq/__main__.py
@@ -7,7 +7,7 @@
 from taskiq.abc.cmd import TaskiqCMD
 
 
-def main() -> None:  # noqa: C901, WPS210
+def main() -> None:  # noqa: C901, WPS210 # pragma: no cover
     """
     Main entrypoint of the taskiq.
 
@@ -72,5 +72,5 @@ def main() -> None:  # noqa: C901, WPS210
     command.exec(sys.argv[1:])
 
 
-if __name__ == "__main__":
+if __name__ == "__main__":  # pragma: no cover
     main()
diff --git a/taskiq/abc/broker.py b/taskiq/abc/broker.py
index 8e47291c..128739ba 100644
--- a/taskiq/abc/broker.py
+++ b/taskiq/abc/broker.py
@@ -167,6 +167,7 @@ async def listen(
     def task(
         self,
         task_name: Callable[_FuncParams, _ReturnType],
+        **labels: Any,
     ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:  # pragma: no cover
         ...
 
@@ -174,7 +175,7 @@ def task(
         self,
         task_name: Optional[str] = None,
-        **labels: Union[str, int],
+        **labels: Any,
     ) -> Callable[
         [Callable[_FuncParams, _ReturnType]],
         AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
     ]:
@@ -184,7 +185,7 @@ def task(  # type: ignore[misc]
         self,
         task_name: Optional[str] = None,
-        **labels: Union[str, int],
+        **labels: Any,
     ) -> Any:
         """
         Decorator that turns function into a task.
 
@@ -223,7 +224,7 @@ def inner(
             if inner_task_name is None:
                 fmodule = func.__module__
                 if fmodule == "__main__":  # pragma: no cover
-                    fmodule = ".".join(  # noqa: WPS220
+                    fmodule = ".".join(
                         sys.argv[0]
                         .removesuffix(
                             ".py",
diff --git a/taskiq/abc/cmd.py b/taskiq/abc/cmd.py
index fafc7ff2..b6077d1d 100644
--- a/taskiq/abc/cmd.py
+++ b/taskiq/abc/cmd.py
@@ -2,7 +2,7 @@
 from typing import Sequence
 
 
-class TaskiqCMD(ABC):
+class TaskiqCMD(ABC):  # pragma: no cover
     """Base class for new commands."""
 
     short_help = ""
diff --git a/taskiq/decor.py b/taskiq/decor.py
index cb42f6ba..64a41a60 100644
--- a/taskiq/decor.py
+++ b/taskiq/decor.py
@@ -12,7 +12,7 @@
 from typing_extensions import ParamSpec
 
 from taskiq.kicker import AsyncKicker
-from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
+from taskiq.task import AsyncTaskiqTask
 
 if TYPE_CHECKING:  # pragma: no cover
     from taskiq.abc.broker import AsyncBroker
@@ -93,40 +93,6 @@ async def kiq(
         """
         return await self.kicker().kiq(*args, **kwargs)
 
-    @overload
-    def kiq_sync(
-        self: "AsyncTaskiqDecoratedTask[_FuncParams, Coroutine[Any, Any, _T]]",
-        *args: _FuncParams.args,
-        **kwargs: _FuncParams.kwargs,
-    ) -> SyncTaskiqTask[_T]:
-        ...
-
-    @overload
-    def kiq_sync(
-        self: "AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]",
-        *args: _FuncParams.args,
-        **kwargs: _FuncParams.kwargs,
-    ) -> SyncTaskiqTask[_ReturnType]:
-        ...
-
-    def kiq_sync(
-        self,
-        *args: _FuncParams.args,
-        **kwargs: _FuncParams.kwargs,
-    ) -> Any:
-        """
-        This method sends function call over the network.
-
-        It gets current broker and calls it's kick method,
-        returning what it returns.
-
-        :param args: function's arguments.
-        :param kwargs: function's key word arguments.
-
-        :returns: taskiq task.
-        """
-        return self.kicker().kiq_sync(*args, **kwargs)
-
     def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
         """
         This function returns kicker object.
diff --git a/taskiq/dependencies.py b/taskiq/dependencies.py
index be91c167..69774701 100644
--- a/taskiq/dependencies.py
+++ b/taskiq/dependencies.py
@@ -140,20 +140,6 @@ def __eq__(self, rhs: object) -> bool:
             rhs.kwargs,
         )
 
-    def __str__(self) -> str:
-        if self.dependency is None:
-            dep_name = ""
-        else:
-            dep_name = (
-                f"{self.dependency.__module__}:"  # noqa: WPS237
-                f"{self.dependency.__name__}"
-            )
-        return (
-            f"TaskiqDepends({dep_name}, "
-            f"use_cache={self.use_cache}, "
-            f"kwargs={self.kwargs})"
-        )
-
 
 class DependencyResolveContext:
     """
diff --git a/taskiq/kicker.py b/taskiq/kicker.py
index f6676e72..b6c1cee8 100644
--- a/taskiq/kicker.py
+++ b/taskiq/kicker.py
@@ -18,8 +18,8 @@
 from taskiq.abc.middleware import TaskiqMiddleware
 from taskiq.exceptions import SendTaskError
 from taskiq.message import TaskiqMessage
-from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
-from taskiq.utils import maybe_awaitable, run_sync
+from taskiq.task import AsyncTaskiqTask
+from taskiq.utils import maybe_awaitable
 
 if TYPE_CHECKING:  # pragma: no cover
     from taskiq.abc.broker import AsyncBroker
@@ -142,40 +142,6 @@ async def kiq(  # noqa: C901
             result_backend=self.broker.result_backend,
         )
 
-    @overload
-    def kiq_sync(
-        self: "AsyncKicker[_FuncParams, Coroutine[Any, Any, _T]]",
-        *args: _FuncParams.args,
-        **kwargs: _FuncParams.kwargs,
-    ) -> SyncTaskiqTask[_T]:
-        ...
-
-    @overload
-    def kiq_sync(
-        self: "AsyncKicker[_FuncParams, _ReturnType]",
-        *args: _FuncParams.args,
-        **kwargs: _FuncParams.kwargs,
-    ) -> SyncTaskiqTask[_ReturnType]:
-        ...
-
-    def kiq_sync(
-        self,
-        *args: _FuncParams.args,
-        **kwargs: _FuncParams.kwargs,
-    ) -> Any:
-        """
-        This method sends function call over the network.
-
-        It just wraps async kiq call in run_sync
-        funcion.
-
-        :param args: function's arguments.
-        :param kwargs: function's key word arguments.
-
-        :returns: sync taskiq task.
-        """
-        return SyncTaskiqTask(run_sync(self.kiq(*args, **kwargs)))
-
     @classmethod
     def _prepare_arg(cls, arg: Any) -> Any:
         """
diff --git a/taskiq/schedule_sources/label_based.py b/taskiq/schedule_sources/label_based.py
index b9d63102..ff716b59 100644
--- a/taskiq/schedule_sources/label_based.py
+++ b/taskiq/schedule_sources/label_based.py
@@ -28,6 +28,8 @@ async def get_schedules(self) -> List["ScheduledTask"]:
             if task.broker != self.broker:
                 continue
             for schedule in task.labels.get("schedule", []):
+                if "cron" not in schedule:
+                    continue
                 labels = schedule.get("labels", {})
                 labels.update(task.labels)
                 schedules.append(
diff --git a/taskiq/task.py b/taskiq/task.py
index 29d01dea..8c9c1668 100644
--- a/taskiq/task.py
+++ b/taskiq/task.py
@@ -8,7 +8,6 @@
     ResultIsReadyError,
     TaskiqResultTimeoutError,
 )
-from taskiq.utils import run_sync
 
 if TYPE_CHECKING:  # pragma: no cover
     from taskiq.abc.result_backend import AsyncResultBackend
@@ -67,60 +66,6 @@ def wait_result(  # noqa: WPS234
         """
 
 
-class SyncTaskiqTask(_Task[_ReturnType]):
-    """Sync wrapper over AsyncTaskiqTask."""
-
-    def __init__(self, async_task: "AsyncTaskiqTask[_ReturnType]") -> None:
-        self.async_task = async_task
-
-    def is_ready(self) -> bool:
-        """
-        Checks if task is completed.
-
-        :return: True if task is completed.
-        """
-        return run_sync(self.async_task.is_ready())
-
-    def get_result(self, with_logs: bool = False) -> "TaskiqResult[_ReturnType]":
-        """
-        Get result of a task from result backend.
-
-        :param with_logs: whether you want to fetch logs from worker.
-
-        :return: task's return value.
-        """
-        return run_sync(self.async_task.get_result(with_logs=with_logs))
-
-    def wait_result(
-        self,
-        check_interval: float = 0.2,
-        timeout: float = -1,
-        with_logs: bool = False,
-    ) -> "TaskiqResult[_ReturnType]":
-        """
-        Waits until result is ready.
-
-        This method just checks whether the task is
-        ready. And if it is it returns the result.
-
-        It may throw TaskiqResultTimeoutError if
-        task didn't became ready in provided
-        period of time.
-
-        :param check_interval: How often checks are performed.
-        :param timeout: timeout for the result.
-        :param with_logs: whether you want to fetch logs from worker.
-        :return: task's return value.
-        """
-        return run_sync(
-            self.async_task.wait_result(
-                check_interval=check_interval,
-                timeout=timeout,
-                with_logs=with_logs,
-            ),
-        )
-
-
 class AsyncTaskiqTask(_Task[_ReturnType]):
     """AsyncTask for AsyncResultBackend."""
diff --git a/taskiq/utils.py b/taskiq/utils.py
index 32f194fe..02dfad0d 100644
--- a/taskiq/utils.py
+++ b/taskiq/utils.py
@@ -1,40 +1,11 @@
-import asyncio
 import inspect
-from concurrent.futures import ThreadPoolExecutor
-from typing import Any, Awaitable, Coroutine, TypeVar, Union
+from typing import Any, Coroutine, TypeVar, Union
 
 _T = TypeVar("_T")  # noqa: WPS111
 
 
-def run_sync(coroutine: "Coroutine[Any, Any, _T]") -> _T:
-    """
-    Run the coroutine synchronously.
-
-    This function tries to run corouting using asyncio.run.
-
-    If it's not possible, it manually creates executor and
-    runs async function returns it's result.
-
-    1. When called within a coroutine.
-    2. When called from ``python -m asyncio``, or iPython with %autoawait
-       enabled, which means an event loop may already be running in the
-       current thread.
-
-    :param coroutine: awaitable to execute.
-    :returns: the same type as if it were awaited.
-    """
-    try:
-        # We try this first, as in most situations this will work.
-        return asyncio.run(coroutine)
-    except RuntimeError:
-        # An event loop already exists.
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            future = executor.submit(asyncio.run, coroutine)
-            return future.result()
-
-
 async def maybe_awaitable(
-    possible_coroutine: "Union[_T, Awaitable[_T]]",
+    possible_coroutine: "Union[_T, Coroutine[Any, Any, _T]]",
 ) -> _T:
     """
     Awaits coroutine if needed.
diff --git a/taskiq/abc/tests/test_broker.py b/tests/abc/test_broker.py
similarity index 100%
rename from taskiq/abc/tests/test_broker.py
rename to tests/abc/test_broker.py
diff --git a/taskiq/cli/worker/tests/test_log_collector.py b/tests/cli/worker/test_log_collector.py
similarity index 100%
rename from taskiq/cli/worker/tests/test_log_collector.py
rename to tests/cli/worker/test_log_collector.py
diff --git a/taskiq/cli/worker/tests/test_parameters_parsing.py b/tests/cli/worker/test_parameters_parsing.py
similarity index 100%
rename from taskiq/cli/worker/tests/test_parameters_parsing.py
rename to tests/cli/worker/test_parameters_parsing.py
diff --git a/taskiq/cli/worker/tests/test_receiver.py b/tests/cli/worker/test_receiver.py
similarity index 100%
rename from taskiq/cli/worker/tests/test_receiver.py
rename to tests/cli/worker/test_receiver.py
diff --git a/taskiq/conftest.py b/tests/conftest.py
similarity index 100%
rename from taskiq/conftest.py
rename to tests/conftest.py
diff --git a/tests/middlewares/test_simple_retry.py b/tests/middlewares/test_simple_retry.py
new file mode 100644
index 00000000..5ed8113b
--- /dev/null
+++ b/tests/middlewares/test_simple_retry.py
@@ -0,0 +1,79 @@
+import uuid
+
+import pytest
+from mock import AsyncMock
+
+from taskiq.formatters.json_formatter import JSONFormatter
+from taskiq.message import TaskiqMessage
+from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
+from taskiq.result import TaskiqResult
+
+
+@pytest.fixture
+def broker() -> AsyncMock:
+    mocked_broker = AsyncMock()
+    mocked_broker.id_generator = lambda: uuid.uuid4().hex
+    mocked_broker.formatter = JSONFormatter()
+    return mocked_broker
+
+
+@pytest.mark.anyio
+async def test_successfull_retry(broker: AsyncMock) -> None:
+    middleware = SimpleRetryMiddleware()
+    middleware.set_broker(broker)
+    await middleware.on_error(
+        TaskiqMessage(
+            task_id="test_id",
+            task_name="meme",
+            labels={
+                "retry_on_error": True,
+            },
+            args=[],
+            kwargs={},
+        ),
+        TaskiqResult(is_err=True, return_value=None, execution_time=0.0),
+        Exception(),
+    )
+    resend: TaskiqMessage = broker.kick.await_args.args[0]
+    assert resend.task_name == "meme"
+    assert resend.labels["_retries"] == "1"
+    assert resend.labels["_parent"] == "test_id"
+
+
+@pytest.mark.anyio
+async def test_no_retry(broker: AsyncMock) -> None:
+    middleware = SimpleRetryMiddleware()
+    middleware.set_broker(broker)
+    await middleware.on_error(
+        TaskiqMessage(
+            task_id="test_id",
+            task_name="meme",
+            labels={},
+            args=[],
+            kwargs={},
+        ),
+        TaskiqResult(is_err=True, return_value=None, execution_time=0.0),
+        Exception(),
+    )
+    broker.kick.assert_not_called()
+
+
+@pytest.mark.anyio
+async def test_max_retries(broker: AsyncMock) -> None:
+    middleware = SimpleRetryMiddleware(default_retry_count=3)
+    middleware.set_broker(broker)
+    await middleware.on_error(
+        TaskiqMessage(
+            task_id="test_id",
+            task_name="meme",
+            labels={
+                "retry_on_error": True,
+                "_retries": 2,
+            },
+            args=[],
+            kwargs={},
+        ),
+        TaskiqResult(is_err=True, return_value=None, execution_time=0.0),
+        Exception(),
+    )
+    broker.kick.assert_not_called()
diff --git a/tests/schedule_sources/test_label_based.py b/tests/schedule_sources/test_label_based.py
new file mode 100644
index 00000000..70a7ff33
--- /dev/null
+++ b/tests/schedule_sources/test_label_based.py
@@ -0,0 +1,45 @@
+import pytest
+
+from taskiq.brokers.inmemory_broker import InMemoryBroker
+from taskiq.schedule_sources.label_based import LabelScheduleSource
+from taskiq.scheduler.scheduler import ScheduledTask
+
+
+@pytest.mark.anyio
+async def test_label_discovery() -> None:
+    broker = InMemoryBroker()
+
+    @broker.task(
+        task_name="test_task",
+        schedule=[{"cron": "* * * * *"}],
+    )
+    def task() -> None:
+        pass
+
+    source = LabelScheduleSource(broker)
+    schedules = await source.get_schedules()
+    assert schedules == [
+        ScheduledTask(
+            cron="* * * * *",
+            task_name="test_task",
+            labels={"schedule": [{"cron": "* * * * *"}]},
+            args=[],
+            kwargs={},
+        ),
+    ]
+
+
+@pytest.mark.anyio
+async def test_label_discovery_no_cron() -> None:
+    broker = InMemoryBroker()
+
+    @broker.task(
+        task_name="test_task",
+        schedule=[{"args": ["* * * * *"]}],
+    )
+    def task() -> None:
+        pass
+
+    source = LabelScheduleSource(broker)
+    schedules = await source.get_schedules()
+    assert schedules == []
diff --git a/taskiq/tests/test_dependencies.py b/tests/test_dependencies.py
similarity index 100%
rename from taskiq/tests/test_dependencies.py
rename to tests/test_dependencies.py
diff --git a/taskiq/tests/test_funcs.py b/tests/test_funcs.py
similarity index 100%
rename from taskiq/tests/test_funcs.py
rename to tests/test_funcs.py
diff --git a/taskiq/tests/test_state.py b/tests/test_state.py
similarity index 100%
rename from taskiq/tests/test_state.py
rename to tests/test_state.py
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 00000000..bd27f09c
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,20 @@
+import pytest
+
+from taskiq.utils import maybe_awaitable
+
+
+@pytest.mark.anyio
+async def test_maybe_awaitable_coroutine() -> None:
+    async def meme() -> int:
+        return 1
+
+    val: int = await maybe_awaitable(meme())
+    assert val == 1
+
+
+@pytest.mark.anyio
+async def test_maybe_awaitable_sync() -> None:
+    def meme() -> int:
+        return 1
+
+    assert await maybe_awaitable(meme()) == 1