Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ warn_unused_ignores = false
profile = "black"
multi_line_output = 3

[tool.coverage.run]
omit = [
"taskiq/__main__.py",
"taskiq/abc/cmd.py",
"taskiq/cli/scheduler/args.py",
"taskiq/cli/scheduler/cmd.py",
"taskiq/cli/utils.py",
"taskiq/cli/worker/args.py",
"taskiq/cli/worker/async_task_runner.py",
"taskiq/cli/worker/cmd.py",
]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
4 changes: 2 additions & 2 deletions taskiq/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from taskiq.abc.cmd import TaskiqCMD


def main() -> None: # noqa: C901, WPS210
def main() -> None: # noqa: C901, WPS210 # pragma: no cover
"""
Main entrypoint of the taskiq.

Expand Down Expand Up @@ -72,5 +72,5 @@ def main() -> None: # noqa: C901, WPS210
command.exec(sys.argv[1:])


if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
main()
7 changes: 4 additions & 3 deletions taskiq/abc/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ async def listen(
def task(
self,
task_name: Callable[_FuncParams, _ReturnType],
**lavels: Any,
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: # pragma: no cover
...

@overload
def task(
self,
task_name: Optional[str] = None,
**labels: Union[str, int],
**labels: Any,
) -> Callable[
[Callable[_FuncParams, _ReturnType]],
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
Expand All @@ -184,7 +185,7 @@ def task(
def task( # type: ignore[misc]
self,
task_name: Optional[str] = None,
**labels: Union[str, int],
**labels: Any,
) -> Any:
"""
Decorator that turns function into a task.
Expand Down Expand Up @@ -223,7 +224,7 @@ def inner(
if inner_task_name is None:
fmodule = func.__module__
if fmodule == "__main__": # pragma: no cover
fmodule = ".".join( # noqa: WPS220
fmodule = ".".join(
sys.argv[0]
.removesuffix(
".py",
Expand Down
2 changes: 1 addition & 1 deletion taskiq/abc/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Sequence


class TaskiqCMD(ABC):
class TaskiqCMD(ABC): # pragma: no cover
"""Base class for new commands."""

short_help = ""
Expand Down
36 changes: 1 addition & 35 deletions taskiq/decor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from typing_extensions import ParamSpec

from taskiq.kicker import AsyncKicker
from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
from taskiq.task import AsyncTaskiqTask

if TYPE_CHECKING: # pragma: no cover
from taskiq.abc.broker import AsyncBroker
Expand Down Expand Up @@ -93,40 +93,6 @@ async def kiq(
"""
return await self.kicker().kiq(*args, **kwargs)

@overload
def kiq_sync(
self: "AsyncTaskiqDecoratedTask[_FuncParams, Coroutine[Any, Any, _T]]",
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> SyncTaskiqTask[_T]:
...

@overload
def kiq_sync(
self: "AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]",
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> SyncTaskiqTask[_ReturnType]:
...

def kiq_sync(
self,
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> Any:
"""
This method sends function call over the network.

It gets current broker and calls it's kick method,
returning what it returns.

:param args: function's arguments.
:param kwargs: function's key word arguments.

:returns: taskiq task.
"""
return self.kicker().kiq_sync(*args, **kwargs)

def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
"""
This function returns kicker object.
Expand Down
14 changes: 0 additions & 14 deletions taskiq/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,6 @@ def __eq__(self, rhs: object) -> bool:
rhs.kwargs,
)

def __str__(self) -> str:
if self.dependency is None:
dep_name = "<from hint>"
else:
dep_name = (
f"{self.dependency.__module__}:" # noqa: WPS237
f"{self.dependency.__name__}"
)
return (
f"TaskiqDepends({dep_name}, "
f"use_cache={self.use_cache}, "
f"kwargs={self.kwargs})"
)


class DependencyResolveContext:
"""
Expand Down
38 changes: 2 additions & 36 deletions taskiq/kicker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.exceptions import SendTaskError
from taskiq.message import TaskiqMessage
from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
from taskiq.utils import maybe_awaitable, run_sync
from taskiq.task import AsyncTaskiqTask
from taskiq.utils import maybe_awaitable

if TYPE_CHECKING: # pragma: no cover
from taskiq.abc.broker import AsyncBroker
Expand Down Expand Up @@ -142,40 +142,6 @@ async def kiq( # noqa: C901
result_backend=self.broker.result_backend,
)

@overload
def kiq_sync(
self: "AsyncKicker[_FuncParams, Coroutine[Any, Any, _T]]",
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> SyncTaskiqTask[_T]:
...

@overload
def kiq_sync(
self: "AsyncKicker[_FuncParams, _ReturnType]",
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> SyncTaskiqTask[_ReturnType]:
...

def kiq_sync(
self,
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> Any:
"""
This method sends function call over the network.

It just wraps async kiq call in run_sync
funcion.

:param args: function's arguments.
:param kwargs: function's key word arguments.

:returns: sync taskiq task.
"""
return SyncTaskiqTask(run_sync(self.kiq(*args, **kwargs)))

@classmethod
def _prepare_arg(cls, arg: Any) -> Any:
"""
Expand Down
2 changes: 2 additions & 0 deletions taskiq/schedule_sources/label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ async def get_schedules(self) -> List["ScheduledTask"]:
if task.broker != self.broker:
continue
for schedule in task.labels.get("schedule", []):
if "cron" not in schedule:
continue
labels = schedule.get("labels", {})
labels.update(task.labels)
schedules.append(
Expand Down
55 changes: 0 additions & 55 deletions taskiq/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
ResultIsReadyError,
TaskiqResultTimeoutError,
)
from taskiq.utils import run_sync

if TYPE_CHECKING: # pragma: no cover
from taskiq.abc.result_backend import AsyncResultBackend
Expand Down Expand Up @@ -67,60 +66,6 @@ def wait_result( # noqa: WPS234
"""


class SyncTaskiqTask(_Task[_ReturnType]):
"""Sync wrapper over AsyncTaskiqTask."""

def __init__(self, async_task: "AsyncTaskiqTask[_ReturnType]") -> None:
self.async_task = async_task

def is_ready(self) -> bool:
"""
Checks if task is completed.

:return: True if task is completed.
"""
return run_sync(self.async_task.is_ready())

def get_result(self, with_logs: bool = False) -> "TaskiqResult[_ReturnType]":
"""
Get result of a task from result backend.

:param with_logs: whether you want to fetch logs from worker.

:return: task's return value.
"""
return run_sync(self.async_task.get_result(with_logs=with_logs))

def wait_result(
self,
check_interval: float = 0.2,
timeout: float = -1,
with_logs: bool = False,
) -> "TaskiqResult[_ReturnType]":
"""
Waits until result is ready.

This method just checks whether the task is
ready. And if it is it returns the result.

It may throw TaskiqResultTimeoutError if
task didn't became ready in provided
period of time.

:param check_interval: How often checks are performed.
:param timeout: timeout for the result.
:param with_logs: whether you want to fetch logs from worker.
:return: task's return value.
"""
return run_sync(
self.async_task.wait_result(
check_interval=check_interval,
timeout=timeout,
with_logs=with_logs,
),
)


class AsyncTaskiqTask(_Task[_ReturnType]):
"""AsyncTask for AsyncResultBackend."""

Expand Down
33 changes: 2 additions & 31 deletions taskiq/utils.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,11 @@
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Awaitable, Coroutine, TypeVar, Union
from typing import Any, Coroutine, TypeVar, Union

_T = TypeVar("_T") # noqa: WPS111


def run_sync(coroutine: "Coroutine[Any, Any, _T]") -> _T:
"""
Run the coroutine synchronously.

This function tries to run corouting using asyncio.run.

If it's not possible, it manually creates executor and
runs async function returns it's result.

1. When called within a coroutine.
2. When called from ``python -m asyncio``, or iPython with %autoawait
enabled, which means an event loop may already be running in the
current thread.

:param coroutine: awaitable to execute.
:returns: the same type as if it were awaited.
"""
try:
# We try this first, as in most situations this will work.
return asyncio.run(coroutine)
except RuntimeError:
# An event loop already exists.
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, coroutine)
return future.result()


async def maybe_awaitable(
possible_coroutine: "Union[_T, Awaitable[_T]]",
possible_coroutine: "Union[_T, Coroutine[Any, Any, _T]]",
) -> _T:
"""
Awaits coroutine if needed.
Expand Down
File renamed without changes.
File renamed without changes.
Loading