Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions taskiq/abc/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ def post_execute(
:param result: result of execution for current task.
"""

def post_save(
self,
message: "TaskiqMessage",
result: "TaskiqResult[Any]",
) -> "Union[None, Coroutine[Any, Any, None]]":
"""
Post save hook.

This function is called after result of
the executions is saved in the result_backend.

:param message: processed message.
:param result: returned value.
"""

def on_error(
self,
message: "TaskiqMessage",
Expand Down
6 changes: 5 additions & 1 deletion taskiq/cli/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def __init__(self, broker: AsyncBroker, cli_args: TaskiqArgs) -> None:
max_workers=cli_args.max_threadpool_threads,
)

async def callback( # noqa: C901
async def callback( # noqa: C901, WPS213
self,
message: BrokerMessage,
raise_err: bool = False,
Expand Down Expand Up @@ -142,6 +142,10 @@ async def callback( # noqa: C901
if raise_err:
raise exc

for middleware in self.broker.middlewares:
if middleware.__class__.post_save != TaskiqMiddleware.post_save:
await maybe_awaitable(middleware.post_save(taskiq_msg, result))

async def run_task( # noqa: C901, WPS210
self,
target: Callable[..., Any],
Expand Down