Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ per-file-ignores =
S101,
; Found magic number
WPS432,
; Missing parameter(s) in Docstring
DAR101,

exclude =
./.git,
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,7 @@ jobs:
- name: Upload coverage reports to Codecov with GitHub Action
uses: codecov/codecov-action@v3
if: matrix.os == 'ubuntu-latest' && matrix.py_version == '3.9'
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
verbose: true
2 changes: 0 additions & 2 deletions taskiq/abc/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
from typing import ( # noqa: WPS235
TYPE_CHECKING,
Any,
AsyncGenerator,
Callable,
Coroutine,
Dict,
List,
NoReturn,
Optional,
TypeVar,
Union,
Expand Down
4 changes: 2 additions & 2 deletions taskiq/abc/middleware.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import TYPE_CHECKING, Any, Coroutine, Union

if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from taskiq.abc.broker import AsyncBroker
from taskiq.message import TaskiqMessage
from taskiq.result import TaskiqResult


class TaskiqMiddleware:
class TaskiqMiddleware: # pragma: no cover
"""Base class for middlewares."""

def __init__(self) -> None:
Expand Down
49 changes: 22 additions & 27 deletions taskiq/brokers/inmemory_broker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import inspect
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Coroutine, Optional, TypeVar

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
from taskiq.cli.async_task_runner import run_task
from taskiq.exceptions import ResultSetError, TaskiqError
from taskiq.cli.args import TaskiqArgs
from taskiq.cli.receiver import Receiver
from taskiq.exceptions import TaskiqError
from taskiq.message import BrokerMessage

_ReturnType = TypeVar("_ReturnType")
Expand Down Expand Up @@ -100,16 +100,16 @@ def __init__( # noqa: WPS211
result_backend=result_backend,
task_id_generator=task_id_generator,
)
self.executor = ThreadPoolExecutor(max_workers=sync_tasks_pool_size)
self.cast_types = cast_types
if logs_format is None:
logs_format = (
"[%(asctime)s]"
"[%(levelname)-7s]"
"[%(module)s:%(funcName)s:%(lineno)d] "
"%(message)s"
)
self.logs_format = logs_format
self.receiver = Receiver(
self,
TaskiqArgs(
broker="",
modules=[],
max_threadpool_threads=sync_tasks_pool_size,
no_parse=not cast_types,
log_collector_format=logs_format or TaskiqArgs.log_collector_format,
),
)

async def kick(self, message: BrokerMessage) -> None:
"""
Expand All @@ -119,25 +119,20 @@ async def kick(self, message: BrokerMessage) -> None:

:param message: incomming message.

:raises ResultSetError: if cannot save results in result backend.
:raises TaskiqError: if someone wants to kick unknown task.
"""
target_task = self.available_tasks.get(message.task_name)
taskiq_message = self.formatter.loads(message=message)
if target_task is None:
raise TaskiqError("Unknown task.")
result = await run_task(
target=target_task.original_func,
signature=inspect.signature(target_task.original_func),
message=taskiq_message,
log_collector_format=self.logs_format,
executor=self.executor,
middlewares=self.middlewares,
)
try:
await self.result_backend.set_result(message.task_id, result)
except Exception as exc:
raise ResultSetError("Cannot set result.") from exc
if self.receiver.task_signatures:
if not self.receiver.task_signatures.get(target_task.task_name):
self.receiver.task_signatures[
target_task.task_name
] = inspect.signature(
target_task.original_func,
)

await self.receiver.callback(message=message)

async def listen(
self,
Expand Down
15 changes: 13 additions & 2 deletions taskiq/brokers/zmq_broker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio
from logging import getLogger
from typing import Any, Callable, Coroutine, Optional, TypeVar

from taskiq.abc.broker import AsyncBroker
Expand All @@ -12,6 +14,8 @@

_T = TypeVar("_T") # noqa: WPS111

logger = getLogger(__name__)


class ZeroMQBroker(AsyncBroker):
"""
Expand Down Expand Up @@ -67,6 +71,13 @@ async def listen(

:param callback: function to call when message received.
"""
while True: # noqa: WPS457
loop = asyncio.get_event_loop()
while True:
with self.socket.connect(self.sub_host) as sock:
await callback(BrokerMessage.parse_raw(await sock.recv_string()))
received_str = await sock.recv_string()
try:
broker_msg = BrokerMessage.parse_raw(received_str)
except ValueError:
logger.warning("Cannot parse received message %s", received_str)
continue
loop.create_task(callback(broker_msg))
27 changes: 13 additions & 14 deletions taskiq/cli/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@ class TaskiqArgs:
"""Taskiq worker CLI arguments."""

broker: str
tasks_pattern: str
modules: List[str]
fs_discover: bool
log_level: str
workers: int
log_collector_format: str
max_threadpool_threads: int
no_parse: bool
shutdown_timeout: float
reload: bool
no_gitignore: bool
tasks_pattern: str = "tasks.py"
fs_discover: bool = False
log_level: str = "INFO"
workers: int = 2
log_collector_format: str = (
"[%(asctime)s][%(levelname)-7s][%(module)s:%(funcName)s:%(lineno)d] %(message)s"
)
max_threadpool_threads: int = 10
no_parse: bool = False
shutdown_timeout: float = 5
reload: bool = False
no_gitignore: bool = False

@classmethod
def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WPS213
Expand Down Expand Up @@ -128,8 +130,5 @@ def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WP
help="Do not use gitignore to check for updated files.",
)

if args is None:
namespace = parser.parse_args(args)
else:
namespace = parser.parse_args()
namespace = parser.parse_args(args)
return TaskiqArgs(**namespace.__dict__)
Loading