Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/available-components/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
order: 1
dir:
order: 3
---

# Available components

In this section, you can find a list of officially supported plugins for the taskiq.

* [Available brokers](./brokers.md)
* [Available result backends](./result-backends.md)
* [Available schedule sources](./scheduler-sources.md)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
order: 5
order: 2
---

# Available brokers
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
order: 6
order: 3
---

# Result backends
# Available result backends

Result backends are used to store execution results.
This includes:
Expand Down
59 changes: 59 additions & 0 deletions docs/available-components/schedule-sources.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
---
order: 4
---


# Available schedule sources

These objects are used to fetch current schedule for tasks.
Currently we have only one schedule source.


## LabelScheduleSource

This source parses labels of tasks, and if it finds a `schedule` label, it considers this task as scheduled.

The format of the schedule label is the following:

```python
@broker.task(
schedule=[
{
"cron": "* * * * *", # type: str, required argument.
"args": [], # type List[Any] | None, can be omitted.
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
"labels": {}, # type: Dict[str, Any] | None, can be omitted.
}
]
)
async def my_task():
...
```

Parameters:
* `cron` - crontab string when to run the task.
* `args` - args to use, when invoking the task.
* `kwargs` - key-word arguments to use when invoking the task.
* `labels` - additional labels to use wehn invoking the task.

Usage:

```python
from taskiq.scheduler import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

broker = ...

scheduler = TaskiqScheduler(
broker=broker,
sources=[LabelScheduleSource(broker)],
)
```


::: warning Cool notice!

In order to resolve all labels correctly, don't forget to import
all task modules using CLI interface.

:::
44 changes: 44 additions & 0 deletions docs/examples/extending/result_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import TypeVar

from taskiq import TaskiqResult
from taskiq.abc.result_backend import AsyncResultBackend

_ReturnType = TypeVar("_ReturnType")


class MyResultBackend(AsyncResultBackend[_ReturnType]):
async def startup(self) -> None:
"""Do something when starting broker."""

async def shutdown(self) -> None:
"""Do something on shutdown."""

async def set_result(
self,
task_id: str,
result: TaskiqResult[_ReturnType],
) -> None:
# Here you must set result somewhere.
pass

async def get_result(
self,
task_id: str,
with_logs: bool = False,
) -> TaskiqResult[_ReturnType]:
# Here you must retrieve result by id.

# Logs is a part of a result.
# Here we have a parameter whether you want to
# fetch result with logs or not, because logs
# can have a lot of info and sometimes it's critical
# to get only needed information.
pass

async def is_result_ready(
self,
task_id: str,
) -> bool:
# This function checks if result of a task exists,
# without actual fetching the result.
pass
28 changes: 28 additions & 0 deletions docs/examples/extending/schedule_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import List

from taskiq import ScheduledTask, ScheduleSource


class MyScheduleSource(ScheduleSource):
async def startup(self) -> None:
"""Do something when starting broker."""

async def shutdown(self) -> None:
"""Do something on shutdown."""

async def get_schedules(self) -> List["ScheduledTask"]:
# Here you must return list of scheduled tasks from your source.
return [
ScheduledTask(
task_name="",
labels={},
args=[],
kwargs={},
cron="* * * * *",
),
]

# This method is optional. You may not implement this.
# It's just a helper to people to be able to interact with your source.
async def add_schedule(self, schedule: "ScheduledTask") -> None:
return await super().add_schedule(schedule)
16 changes: 16 additions & 0 deletions docs/examples/schedule/intro.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from taskiq_aio_pika import AioPikaBroker

from taskiq.schedule_sources import LabelScheduleSource
from taskiq.scheduler import TaskiqScheduler

broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")

scheduler = TaskiqScheduler(
broker=broker,
sources=[LabelScheduleSource(broker)],
)


@broker.task(schedule=[{"cron": "*/5 * * * *", "args": [1]}])
async def heavy_task(value: int) -> int:
return value + 1
8 changes: 8 additions & 0 deletions docs/examples/schedule/without_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")


@broker.task
async def heavy_task(value: int) -> int:
return value + 1
9 changes: 9 additions & 0 deletions docs/extending-taskiq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,12 @@ Taskiq is super extendable. The core library comes with different abstract class
You can implement these abstract classes to extend functionality.

All abstract classes can be found in `taskiq.abc` package.


## Contents:

* [Brokers](./broker.md)
* [Brokers](./broker.md)
* [Result backends](./resutl-backend.md)
* [CLI](./cli.md)
* [Schedule sources](./schedule-sources.md)
6 changes: 3 additions & 3 deletions docs/extending-taskiq/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ from setuptools import setup
setup(
# ...,
entry_points={
'taskiq-cli': [
'taskiq_cli': [
'demo = my_project.cmd:MyCommand',
]
}
Expand All @@ -37,14 +37,14 @@ setup(
@tab setuptools pyproject.toml

```toml
[project.entry-points.taskiq-cli]
[project.entry-points.taskiq_cli]
demo = "my_project.cmd:MyCommand"
```

@tab poetry

```toml
[tool.poetry.plugins.taskiq-cli]
[tool.poetry.plugins.taskiq_cli]
demo = "my_project.cmd:MyCommand"
```

Expand Down
47 changes: 1 addition & 46 deletions docs/extending-taskiq/resutl-backend.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,7 @@ To create new `result_backend` you have to implement `taskiq.abc.result_backend.

Here's a minimal example of a result backend:

```python
from typing import TypeVar
from taskiq import TaskiqResult
from taskiq.abc.result_backend import AsyncResultBackend

_ReturnType = TypeVar("_ReturnType")


class MyResultBackend(AsyncResultBackend[_ReturnType]):
async def startup(self) -> None:
"""Do something when starting broker."""

async def shutdown(self) -> None:
"""Do something on shutdown."""

async def set_result(
self,
task_id: str,
result: TaskiqResult[_ReturnType],
) -> None:
# Here you must set result somewhere.
pass

async def get_result(
self,
task_id: str,
with_logs: bool = False,
) -> TaskiqResult[_ReturnType]:
# Here you must retrieve result by id.

# Logs is a part of a result.
# Here we have a parameter whether you want to
# fetch result with logs or not, because logs
# can have a lot of info and sometimes it's critical
# to get only needed information.
pass

async def is_result_ready(
self,
task_id: str,
) -> bool:
# This function checks if result of a task exists,
# without actual fetching the result.
pass

```
@[code python](../examples/extending/result_backend.py)

::: info Cool tip!
It's a good practice to skip fetching logs from the storage unless `with_logs=True` is explicitly specified.
Expand Down
12 changes: 12 additions & 0 deletions docs/extending-taskiq/schedule-sources.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
order: 5
---

# Schedule source

Schedule sources are used to get schedule for tasks.
To create new `schedule source` you have to implement the `taskiq.abc.schedule_source.ScheduleSource` abstract class.

Here's a minimal example of a schedule source:

@[code python](../examples/extending/schedule_source.py)
2 changes: 1 addition & 1 deletion docs/guide/architecture-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ This can be done with context.
Context holds information about the current broker and current incoming message.
To get it, simply add the context parameter with `type-hint`.

::: warning Cool warning!
::: danger Cool warning!
Context injected only if you have a type hint.
:::

Expand Down
28 changes: 27 additions & 1 deletion docs/guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ order: 4
Core library comes with CLI programm called `taskiq`, which is used to run different subcommands.


By default taskiq is shipped with only one command: `worker`. You can search for more taskiq plugins
By default taskiq is shipped with only two commands: `worker` and `scheduler`. You can search for more taskiq plugins
using pypi. Some plugins may add new commands to taskiq.

## Worker
Expand Down Expand Up @@ -51,3 +51,29 @@ To enable this option simply pass the `--reload` or `-r` option to taskiq CLI.

Also this option supports `.gitignore` files. If you have such files in your directory. It won't reload worker,
if you cange ignored file's contents. To disable this functionality pass `--do-not-use-gitignore` option.


## Scheduler

Scheduler is used to schedule tasks as described in [Scheduling tasks](./scheduling-tasks.md) section.

To run it simply run

```
taskiq scheduler <path to scheduler> [optional module to import]...
```

For example

```python
taskiq scheduler my_project.broker:scheduler my_project.module1 my_project.module2
```

### Parameters

Path to scheduler is the only required argument.

* `--tasks-pattern` or `-tp`.
It's a name of files to import. By default is searches for all `tasks.py` files.
* `--fs-discover` or `-fsd`. This option enables search of task files in current directory recursively, using the given pattern.
* `--log-level` is used to set a log level.
Loading