diff --git a/docs/available-components/README.md b/docs/available-components/README.md new file mode 100644 index 00000000..76396ddc --- /dev/null +++ b/docs/available-components/README.md @@ -0,0 +1,13 @@ +--- +order: 1 +dir: + order: 3 +--- + +# Available components + +In this section, you can find a list of officially supported plugins for the taskiq. + +* [Available brokers](./brokers.md) +* [Available result backends](./result-backends.md) +* [Available schedule sources](./scheduler-sources.md) diff --git a/docs/guide/available-brokers.md b/docs/available-components/brokers.md similarity index 99% rename from docs/guide/available-brokers.md rename to docs/available-components/brokers.md index 1fdb72bf..7c6cf977 100644 --- a/docs/guide/available-brokers.md +++ b/docs/available-components/brokers.md @@ -1,5 +1,5 @@ --- -order: 5 +order: 2 --- # Available brokers diff --git a/docs/guide/result_backends.md b/docs/available-components/result-backends.md similarity index 95% rename from docs/guide/result_backends.md rename to docs/available-components/result-backends.md index 1883cc03..9d258cb9 100644 --- a/docs/guide/result_backends.md +++ b/docs/available-components/result-backends.md @@ -1,8 +1,8 @@ --- -order: 6 +order: 3 --- -# Result backends +# Available result backends Result backends are used to store execution results. This includes: diff --git a/docs/available-components/schedule-sources.md b/docs/available-components/schedule-sources.md new file mode 100644 index 00000000..fdac8039 --- /dev/null +++ b/docs/available-components/schedule-sources.md @@ -0,0 +1,59 @@ +--- +order: 4 +--- + + +# Available schedule sources + +These objects are used to fetch current schedule for tasks. +Currently we have only one schedule source. + + +## LabelScheduleSource + +This source parses labels of tasks, and if it finds a `schedule` label, it considers this task as scheduled. 
+ +The format of the schedule label is the following: + +```python +@broker.task( + schedule=[ + { + "cron": "* * * * *", # type: str, required argument. + "args": [], # type List[Any] | None, can be omitted. + "kwargs": {}, # type: Dict[str, Any] | None, can be omitted. + "labels": {}, # type: Dict[str, Any] | None, can be omitted. + } + ] +) +async def my_task(): + ... +``` + +Parameters: +* `cron` - crontab string when to run the task. +* `args` - args to use, when invoking the task. +* `kwargs` - key-word arguments to use when invoking the task. +* `labels` - additional labels to use wehn invoking the task. + +Usage: + +```python +from taskiq.scheduler import TaskiqScheduler +from taskiq.schedule_sources import LabelScheduleSource + +broker = ... + +scheduler = TaskiqScheduler( + broker=broker, + sources=[LabelScheduleSource(broker)], +) +``` + + +::: warning Cool notice! + +In order to resolve all labels correctly, don't forget to import +all task modules using CLI interface. + +::: diff --git a/docs/examples/extending/result_backend.py b/docs/examples/extending/result_backend.py new file mode 100644 index 00000000..5bb5e6d3 --- /dev/null +++ b/docs/examples/extending/result_backend.py @@ -0,0 +1,44 @@ +from typing import TypeVar + +from taskiq import TaskiqResult +from taskiq.abc.result_backend import AsyncResultBackend + +_ReturnType = TypeVar("_ReturnType") + + +class MyResultBackend(AsyncResultBackend[_ReturnType]): + async def startup(self) -> None: + """Do something when starting broker.""" + + async def shutdown(self) -> None: + """Do something on shutdown.""" + + async def set_result( + self, + task_id: str, + result: TaskiqResult[_ReturnType], + ) -> None: + # Here you must set result somewhere. + pass + + async def get_result( + self, + task_id: str, + with_logs: bool = False, + ) -> TaskiqResult[_ReturnType]: + # Here you must retrieve result by id. + + # Logs is a part of a result. 
+ # Here we have a parameter whether you want to + # fetch result with logs or not, because logs + # can have a lot of info and sometimes it's critical + # to get only needed information. + pass + + async def is_result_ready( + self, + task_id: str, + ) -> bool: + # This function checks if result of a task exists, + # without actual fetching the result. + pass diff --git a/docs/examples/extending/schedule_source.py b/docs/examples/extending/schedule_source.py new file mode 100644 index 00000000..a441a173 --- /dev/null +++ b/docs/examples/extending/schedule_source.py @@ -0,0 +1,28 @@ +from typing import List + +from taskiq import ScheduledTask, ScheduleSource + + +class MyScheduleSource(ScheduleSource): + async def startup(self) -> None: + """Do something when starting broker.""" + + async def shutdown(self) -> None: + """Do something on shutdown.""" + + async def get_schedules(self) -> List["ScheduledTask"]: + # Here you must return list of scheduled tasks from your source. + return [ + ScheduledTask( + task_name="", + labels={}, + args=[], + kwargs={}, + cron="* * * * *", + ), + ] + + # This method is optional. You may not implement this. + # It's just a helper to people to be able to interact with your source. 
+ async def add_schedule(self, schedule: "ScheduledTask") -> None: + return await super().add_schedule(schedule) diff --git a/docs/examples/schedule/intro.py b/docs/examples/schedule/intro.py new file mode 100644 index 00000000..dd44750b --- /dev/null +++ b/docs/examples/schedule/intro.py @@ -0,0 +1,16 @@ +from taskiq_aio_pika import AioPikaBroker + +from taskiq.schedule_sources import LabelScheduleSource +from taskiq.scheduler import TaskiqScheduler + +broker = AioPikaBroker("amqp://guest:guest@localhost:5672/") + +scheduler = TaskiqScheduler( + broker=broker, + sources=[LabelScheduleSource(broker)], +) + + +@broker.task(schedule=[{"cron": "*/5 * * * *", "args": [1]}]) +async def heavy_task(value: int) -> int: + return value + 1 diff --git a/docs/examples/schedule/without_schedule.py b/docs/examples/schedule/without_schedule.py new file mode 100644 index 00000000..ac28fca0 --- /dev/null +++ b/docs/examples/schedule/without_schedule.py @@ -0,0 +1,8 @@ +from taskiq_aio_pika import AioPikaBroker + +broker = AioPikaBroker("amqp://guest:guest@localhost:5672/") + + +@broker.task +async def heavy_task(value: int) -> int: + return value + 1 diff --git a/docs/extending-taskiq/README.md b/docs/extending-taskiq/README.md index a615c78f..366a7893 100644 --- a/docs/extending-taskiq/README.md +++ b/docs/extending-taskiq/README.md @@ -10,3 +10,12 @@ Taskiq is super extendable. The core library comes with different abstract class You can implement these abstract classes to extend functionality. All abstract classes can be found in `taskiq.abc` package. 
+ + +## Contents: + +* [Brokers](./broker.md) +* [Brokers](./broker.md) +* [Result backends](./resutl-backend.md) +* [CLI](./cli.md) +* [Schedule sources](./schedule-sources.md) diff --git a/docs/extending-taskiq/cli.md b/docs/extending-taskiq/cli.md index f5d044e0..d2cb3e4a 100644 --- a/docs/extending-taskiq/cli.md +++ b/docs/extending-taskiq/cli.md @@ -27,7 +27,7 @@ from setuptools import setup setup( # ..., entry_points={ - 'taskiq-cli': [ + 'taskiq_cli': [ 'demo = my_project.cmd:MyCommand', ] } @@ -37,14 +37,14 @@ setup( @tab setuptools pyproject.toml ```toml -[project.entry-points.taskiq-cli] +[project.entry-points.taskiq_cli] demo = "my_project.cmd:MyCommand" ``` @tab poetry ```toml -[tool.poetry.plugins.taskiq-cli] +[tool.poetry.plugins.taskiq_cli] demo = "my_project.cmd:MyCommand" ``` diff --git a/docs/extending-taskiq/resutl-backend.md b/docs/extending-taskiq/resutl-backend.md index 79065533..57db701a 100644 --- a/docs/extending-taskiq/resutl-backend.md +++ b/docs/extending-taskiq/resutl-backend.md @@ -10,52 +10,7 @@ To create new `result_backend` you have to implement `taskiq.abc.result_backend. Here's a minimal example of a result backend: -```python -from typing import TypeVar -from taskiq import TaskiqResult -from taskiq.abc.result_backend import AsyncResultBackend - -_ReturnType = TypeVar("_ReturnType") - - -class MyResultBackend(AsyncResultBackend[_ReturnType]): - async def startup(self) -> None: - """Do something when starting broker.""" - - async def shutdown(self) -> None: - """Do something on shutdown.""" - - async def set_result( - self, - task_id: str, - result: TaskiqResult[_ReturnType], - ) -> None: - # Here you must set result somewhere. - pass - - async def get_result( - self, - task_id: str, - with_logs: bool = False, - ) -> TaskiqResult[_ReturnType]: - # Here you must retrieve result by id. - - # Logs is a part of a result. 
- # Here we have a parameter whether you want to - # fetch result with logs or not, because logs - # can have a lot of info and sometimes it's critical - # to get only needed information. - pass - - async def is_result_ready( - self, - task_id: str, - ) -> bool: - # This function checks if result of a task exists, - # without actual fetching the result. - pass - -``` +@[code python](../examples/extending/result_backend.py) ::: info Cool tip! It's a good practice to skip fetching logs from the storage unless `with_logs=True` is explicitly specified. diff --git a/docs/extending-taskiq/schedule-sources.md b/docs/extending-taskiq/schedule-sources.md new file mode 100644 index 00000000..3ac3150e --- /dev/null +++ b/docs/extending-taskiq/schedule-sources.md @@ -0,0 +1,12 @@ +--- +order: 5 +--- + +# Schedule source + +Schedule sources are used to get schedule for tasks. +To create new `schedule source` you have to implement the `taskiq.abc.schedule_source.ScheduleSource` abstract class. + +Here's a minimal example of a schedule source: + +@[code python](../examples/extending/schedule_source.py) diff --git a/docs/guide/architecture-overview.md b/docs/guide/architecture-overview.md index 92ee7c09..29a55c25 100644 --- a/docs/guide/architecture-overview.md +++ b/docs/guide/architecture-overview.md @@ -241,7 +241,7 @@ This can be done with context. Context holds information about the current broker and current incoming message. To get it, simply add the context parameter with `type-hint`. -::: warning Cool warning! +::: danger Cool warning! Context injected only if you have a type hint. ::: diff --git a/docs/guide/cli.md b/docs/guide/cli.md index 0429eb9a..5040e303 100644 --- a/docs/guide/cli.md +++ b/docs/guide/cli.md @@ -7,7 +7,7 @@ order: 4 Core library comes with CLI programm called `taskiq`, which is used to run different subcommands. -By default taskiq is shipped with only one command: `worker`. 
You can search for more taskiq plugins +By default taskiq is shipped with only two commands: `worker` and `scheduler`. You can search for more taskiq plugins using pypi. Some plugins may add new commands to taskiq. ## Worker @@ -51,3 +51,29 @@ To enable this option simply pass the `--reload` or `-r` option to taskiq CLI. Also this option supports `.gitignore` files. If you have such files in your directory. It won't reload worker, if you cange ignored file's contents. To disable this functionality pass `--do-not-use-gitignore` option. + + +## Scheduler + +Scheduler is used to schedule tasks as described in [Scheduling tasks](./scheduling-tasks.md) section. + +To run it simply run + +``` +taskiq scheduler [optional module to import]... +``` + +For example + +```python +taskiq scheduler my_project.broker:scheduler my_project.module1 my_project.module2 +``` + +### Parameters + +Path to scheduler is the only required argument. + +* `--tasks-pattern` or `-tp`. + It's a name of files to import. By default is searches for all `tasks.py` files. +* `--fs-discover` or `-fsd`. This option enables search of task files in current directory recursively, using the given pattern. +* `--log-level` is used to set a log level. diff --git a/docs/guide/scheduling-tasks.md b/docs/guide/scheduling-tasks.md new file mode 100644 index 00000000..571eae0b --- /dev/null +++ b/docs/guide/scheduling-tasks.md @@ -0,0 +1,64 @@ +--- +order: 7 +--- + +# Scheduling tasks + +Sometimes you may want to execute some tasks according to some schedule. +For example, you want to call a function every day at 2 pm. + +It's easy to do with taskiq. We have primitives that can help you to solve your problems. + +Let's imagine we have a module, as shown below, and we want to execute the `heavy_task` every 5 minutes. +What should we do? 
+
+@[code python](../examples/schedule/without_schedule.py)
+
+Of course, we can implement a loop like this:
+
+```python
+    while True:
+        await heavy_task.kiq(1)
+        await asyncio.sleep(timedelta(minutes=5).total_seconds())
+```
+
+But if you have many schedules it may be a little painful to implement. So let me introduce you to the `TaskiqScheduler`.
+Let's add a scheduler to our module.
+
+@[code python](../examples/schedule/intro.py)
+
+That's it.
+
+Now we need to start our scheduler with the `taskiq scheduler` command. Like this:
+
+```bash:no-line-numbers
+taskiq scheduler module:scheduler
+```
+
+::: danger Be careful!
+
+Please always run only one instance of the scheduler!
+If you run more than one scheduler at a time, please be careful since
+it may execute one task N times, where N is the number of running scheduler instances.
+
+:::
+
+This command will import the scheduler you defined and start sending tasks to your broker.
+
+You can check the list of available schedule sources in the [Available schedule sources](../available-components/schedule-sources.md) section.
+
+
+## Multiple sources
+
+Sometimes you may want to use multiple sources to assemble a schedule for tasks. The `TaskiqScheduler` can do so.
+But it's not obvious how to merge schedules from different sources.
+
+That's why you can pass a custom merge function to resolve all possible conflicts or if you want to have more
+complex logic aside from sources. For example, filter out some task schedules.
+
+Currently we have only two default functions to merge tasks. You can find them in the `taskiq.scheduler.merge_functions` module.
+
+* `preserve_all` - simply adds new schedules to the old ones.
+* `only_unique` - adds a schedule only if it was not added by previous sources.
+
+Every time we update the schedule, it gets tasks from the source and executes this function to merge them together.
diff --git a/poetry.lock b/poetry.lock index 6e841c8a..7f75a3b3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -637,6 +637,14 @@ category = "main" optional = true python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +[[package]] +name = "pycron" +version = "3.0.0" +description = "Simple cron-like parser, which determines if current datetime matches conditions." +category = "main" +optional = false +python-versions = ">=3.5" + [[package]] name = "pydantic" version = "1.10.2" @@ -981,7 +989,7 @@ zmq = ["pyzmq"] [metadata] lock-version = "1.1" python-versions = "^3.7" -content-hash = "bd939735931a7b22fb879088796c49ca7f75195e1eba290a798c721b43afd831" +content-hash = "fc7269926cf306cf1b11898b8cbd3f03bee332141831dbe0d79b4006f7ba8077" [metadata.files] anyio = [ @@ -1384,6 +1392,9 @@ pycparser = [ {file = "pycparser-2.21-py2.py3-none-any.whl", hash = "sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9"}, {file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"}, ] +pycron = [ + {file = "pycron-3.0.0.tar.gz", hash = "sha256:b916044e3e8253d5409c68df3ac64a3472c4e608dab92f40e8f595e5d3acb3de"}, +] pydantic = [ {file = "pydantic-1.10.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:bb6ad4489af1bac6955d38ebcb95079a836af31e4c4f74aba1ca05bb9f6027bd"}, {file = "pydantic-1.10.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:a1f5a63a6dfe19d719b1b6e6106561869d2efaca6167f84f5ab9347887d78b98"}, diff --git a/pyproject.toml b/pyproject.toml index b6ec1b31..d413dac0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ uvloop = { version = ">=0.16.0,<1", optional = true } watchdog = "^2.1.9" gitignore-parser = "^0.1.0" importlib-metadata = "<4.3" +pycron = "^3.0.0" [tool.poetry.dev-dependencies] @@ -59,8 +60,9 @@ uv = ["uvloop"] [tool.poetry.scripts] taskiq = "taskiq.__main__:main" -[tool.poetry.plugins.taskiq-cli] +[tool.poetry.plugins.taskiq_cli] worker = 
"taskiq.cli.worker.cmd:WorkerCMD" +scheduler = "taskiq.cli.scheduler.cmd:SchedulerCMD" [tool.mypy] strict = true diff --git a/taskiq/__init__.py b/taskiq/__init__.py index c9f8eefe..4a12354c 100644 --- a/taskiq/__init__.py +++ b/taskiq/__init__.py @@ -3,6 +3,7 @@ from taskiq.abc.formatter import TaskiqFormatter from taskiq.abc.middleware import TaskiqMiddleware from taskiq.abc.result_backend import AsyncResultBackend +from taskiq.abc.schedule_source import ScheduleSource from taskiq.brokers.inmemory_broker import InMemoryBroker from taskiq.brokers.shared_broker import async_shared_broker from taskiq.brokers.zmq_broker import ZeroMQBroker @@ -11,6 +12,7 @@ from taskiq.funcs import gather from taskiq.message import BrokerMessage, TaskiqMessage from taskiq.result import TaskiqResult +from taskiq.scheduler import ScheduledTask, TaskiqScheduler from taskiq.task import AsyncTaskiqTask __all__ = [ @@ -23,6 +25,9 @@ "TaskiqMessage", "BrokerMessage", "InMemoryBroker", + "ScheduleSource", + "ScheduledTask", + "TaskiqScheduler", "TaskiqFormatter", "AsyncTaskiqTask", "TaskiqMiddleware", diff --git a/taskiq/__main__.py b/taskiq/__main__.py index e6d4356a..019a7888 100644 --- a/taskiq/__main__.py +++ b/taskiq/__main__.py @@ -18,8 +18,7 @@ def main() -> None: # noqa: C901, WPS210 All arguments are passed to them as it was a normal call. """ - plugins = entry_points().select(group="taskiq-cli") - found_plugins = len(plugins) + found_plugins = len(entry_points().select(group="taskiq_cli")) parser = argparse.ArgumentParser( description=f""" CLI for taskiq. Distributed task queue. 
@@ -44,10 +43,11 @@ def main() -> None: # noqa: C901, WPS210 metavar="", dest="subcommand", ) - for entrypoint in entry_points().select(group="taskiq-cli"): + for entrypoint in entry_points().select(group="taskiq_cli"): try: cmd_class = entrypoint.load() except ImportError: + print(f"Could not load {entrypoint.value}") # noqa: WPS421 continue if issubclass(cmd_class, TaskiqCMD): subparsers.add_parser( diff --git a/taskiq/abc/schedule_source.py b/taskiq/abc/schedule_source.py new file mode 100644 index 00000000..f4b35505 --- /dev/null +++ b/taskiq/abc/schedule_source.py @@ -0,0 +1,35 @@ +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, List + +if TYPE_CHECKING: + from taskiq.scheduler.scheduler import ScheduledTask + + +class ScheduleSource(ABC): + """Abstract class for source of scheduled tasks.""" + + async def startup(self) -> None: + """Action to execute during startup.""" + + async def shutdown(self) -> None: + """Actions to execute during shutdown.""" + + @abstractmethod + async def get_schedules(self) -> List["ScheduledTask"]: + """Get list of taskiq schedules.""" + + async def add_schedule(self, schedule: "ScheduledTask") -> None: + """ + Add a new schedule. + + This function is used to add new schedules. + It's a convenient helper for people who want to add new schedules + for the current source. + + As an example, if your source works with a database, + you may want to add new rows to the table. + + Note that this function may do nothing. + + :param schedule: schedule to add. 
+ """ diff --git a/taskiq/cli/common_args.py b/taskiq/cli/common_args.py new file mode 100644 index 00000000..f8a55151 --- /dev/null +++ b/taskiq/cli/common_args.py @@ -0,0 +1,11 @@ +import enum + + +class LogLevel(str, enum.Enum): # noqa: WPS600 + """Different log levels.""" + + INFO = "INFO" + WARNING = "WARNING" + DEBUG = "DEBUG" + ERROR = "ERROR" + FATAL = "FATAL" diff --git a/taskiq/cli/scheduler/__init__.py b/taskiq/cli/scheduler/__init__.py new file mode 100644 index 00000000..b75dcfa4 --- /dev/null +++ b/taskiq/cli/scheduler/__init__.py @@ -0,0 +1 @@ +"""Package to run scheduled jobs.""" diff --git a/taskiq/cli/scheduler/args.py b/taskiq/cli/scheduler/args.py new file mode 100644 index 00000000..9517e3b9 --- /dev/null +++ b/taskiq/cli/scheduler/args.py @@ -0,0 +1,62 @@ +from argparse import ZERO_OR_MORE, ArgumentDefaultsHelpFormatter, ArgumentParser +from dataclasses import dataclass +from typing import List, Optional, Sequence + +from taskiq.cli.common_args import LogLevel + + +@dataclass +class SchedulerArgs: + """Arguments for scheduler.""" + + scheduler: str + modules: List[str] + log_level: str = LogLevel.INFO.name + fs_discover: bool = False + pattern: str = "tasks.py" + tasks_pattern: str = "tasks.py" + + @classmethod + def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs": + """ + Build scheduler args from CLI arguments. + + This method takes arguments as args parameter. + + :param args: current CLI arguments, defaults to None + :return: instance of scheduler args. 
+ """ + parser = ArgumentParser( + formatter_class=ArgumentDefaultsHelpFormatter, + description="Subcommand to run scheduler", + ) + parser.add_argument("scheduler", help="Path to scheduler") + parser.add_argument( + "modules", + help="List of modules where to look for tasks.", + nargs=ZERO_OR_MORE, + ) + parser.add_argument( + "--fs-discover", + "-fsd", + action="store_true", + help=( + "If this option is on, " + "taskiq will try to find tasks modules " + "in current directory recursievly. Name of file to search for " + "can be configured using `--tasks-pattern` option." + ), + ) + parser.add_argument( + "--tasks-pattern", + "-tp", + default="tasks.py", + help="Name of files in which taskiq will try to find modules.", + ) + parser.add_argument( + "--log-level", + default=LogLevel.INFO.name, + choices=[level.name for level in LogLevel], + help="scheduler log level", + ) + return cls(**parser.parse_args(args).__dict__) diff --git a/taskiq/cli/scheduler/cmd.py b/taskiq/cli/scheduler/cmd.py new file mode 100644 index 00000000..76500247 --- /dev/null +++ b/taskiq/cli/scheduler/cmd.py @@ -0,0 +1,26 @@ +import asyncio +from typing import Sequence + +from taskiq.abc.cmd import TaskiqCMD +from taskiq.cli.scheduler.args import SchedulerArgs +from taskiq.cli.scheduler.run import run_scheduler + + +class SchedulerCMD(TaskiqCMD): + """Command for taskiq scheduler.""" + + short_help = "Run task scheduler" + + def exec(self, args: Sequence[str]) -> None: + """ + Run task scheduler. + + This function starts scheduler function. + + It periodically loads schedule for tasks + and executes them. + + :param args: CLI arguments. 
+        """
+        parsed = SchedulerArgs.from_cli(args)
+        asyncio.run(run_scheduler(parsed))
diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py
new file mode 100644
index 00000000..980f4ac2
--- /dev/null
+++ b/taskiq/cli/scheduler/run.py
@@ -0,0 +1,109 @@
+import asyncio
+from datetime import datetime, timedelta
+from logging import basicConfig, getLevelName, getLogger
+from typing import List
+
+from pycron import is_now
+
+from taskiq.cli.scheduler.args import SchedulerArgs
+from taskiq.cli.utils import import_object, import_tasks
+from taskiq.kicker import AsyncKicker
+from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler
+
+logger = getLogger(__name__)
+
+
+async def schedules_updater(
+    scheduler: TaskiqScheduler,
+    current_schedules: List[ScheduledTask],
+) -> None:
+    """
+    Periodic update to schedules.
+
+    This task periodically checks for new schedules,
+    assembles the final list and replaces current
+    schedule with a new one.
+
+    :param scheduler: current scheduler.
+    :param current_schedules: list of schedules.
+    """
+    while True:
+        logger.debug("Started schedule update.")
+        new_schedules: "List[ScheduledTask]" = []
+        for source in scheduler.sources:
+            try:
+                schedules = await source.get_schedules()
+            except Exception as exc:
+                logger.warning(
+                    "Cannot update schedules with source: %s",
+                    source,
+                )
+                logger.debug(exc, exc_info=True)
+                continue
+            # Rebind to the merged result; appending it would duplicate entries.
+            new_schedules = scheduler.merge_func(new_schedules, schedules)
+        current_schedules.clear()
+        current_schedules.extend(new_schedules)
+        await asyncio.sleep(scheduler.refresh_delay)
+
+
+async def run_scheduler(args: SchedulerArgs) -> None:  # noqa: C901, WPS210, WPS213
+    """
+    Runs scheduler loop.
+
+    This function imports taskiq scheduler
+    and runs tasks when needed.
+
+    :param args: parsed CLI args.
+ """ + scheduler = import_object(args.scheduler) + if not isinstance(scheduler, TaskiqScheduler): + print( # noqa: WPS421 + "Imported scheduler is not a subclass of TaskiqScheduler.", + ) + exit(1) # noqa: WPS421 + import_tasks(args.modules, args.pattern, args.fs_discover) + basicConfig( + level=getLevelName(args.log_level), + format=( + "[%(asctime)s][%(levelname)-7s]" + "[%(module)s:%(funcName)s:%(lineno)d]" + " %(message)s" + ), + ) + for source in scheduler.sources: + await source.startup() + loop = asyncio.get_event_loop() + tasks: "List[ScheduledTask]" = [] + loop.create_task(schedules_updater(scheduler, tasks)) + logger.info("Starting scheduler.") + await scheduler.startup() + logger.info("Startup completed.") + while True: # noqa: WPS457 + not_fired_tasks = [] + for task in tasks: + try: + ready = is_now(task.cron) + except ValueError: + logger.warning( + "Cannot parse cron: %s for task: %s", + task.cron, + task.task_name, + ) + continue + if ready: + logger.info("Sending task %s.", task.task_name) + loop.create_task( + AsyncKicker(task.task_name, scheduler.broker, task.labels).kiq( + *task.args, + **task.kwargs, + ), + ) + else: + not_fired_tasks.append(task) + delay = ( + datetime.now().replace(second=1, microsecond=0) + + timedelta(minutes=1) + - datetime.now() + ) + await asyncio.sleep(delay.total_seconds()) diff --git a/taskiq/cli/worker/args.py b/taskiq/cli/worker/args.py index 31450433..68968f4f 100644 --- a/taskiq/cli/worker/args.py +++ b/taskiq/cli/worker/args.py @@ -1,17 +1,8 @@ -import enum from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser from dataclasses import dataclass from typing import List, Optional, Sequence - -class LogLevel(str, enum.Enum): # noqa: WPS600 - """Different log levels.""" - - INFO = "INFO" - WARNING = "WARNING" - DEBUG = "DEBUG" - ERROR = "ERROR" - FATAL = "FATAL" +from taskiq.cli.common_args import LogLevel @dataclass diff --git a/taskiq/schedule_sources/__init__.py b/taskiq/schedule_sources/__init__.py 
new file mode 100644 index 00000000..1ad5a6fd --- /dev/null +++ b/taskiq/schedule_sources/__init__.py @@ -0,0 +1,6 @@ +"""Package for schedule sources.""" +from taskiq.schedule_sources.label_based import LabelScheduleSource + +__all__ = [ + "LabelScheduleSource", +] diff --git a/taskiq/schedule_sources/label_based.py b/taskiq/schedule_sources/label_based.py new file mode 100644 index 00000000..b9d63102 --- /dev/null +++ b/taskiq/schedule_sources/label_based.py @@ -0,0 +1,42 @@ +from typing import List + +from taskiq.abc.broker import AsyncBroker +from taskiq.abc.schedule_source import ScheduleSource +from taskiq.scheduler.scheduler import ScheduledTask + + +class LabelScheduleSource(ScheduleSource): + """Schedule source based on labels.""" + + def __init__(self, broker: AsyncBroker) -> None: + self.broker = broker + + async def get_schedules(self) -> List["ScheduledTask"]: + """ + Collect schedules for all tasks. + + this function checks labels for all + tasks available to the broker. + + If task has a schedule label, + it will be parsed and retuned. + + :return: list of schedules. 
+ """ + schedules = [] + for task_name, task in self.broker.available_tasks.items(): + if task.broker != self.broker: + continue + for schedule in task.labels.get("schedule", []): + labels = schedule.get("labels", {}) + labels.update(task.labels) + schedules.append( + ScheduledTask( + task_name=task_name, + labels=labels, + args=schedule.get("args", []), + kwargs=schedule.get("kwargs", {}), + cron=schedule["cron"], + ), + ) + return schedules diff --git a/taskiq/scheduler/__init__.py b/taskiq/scheduler/__init__.py new file mode 100644 index 00000000..a0463e01 --- /dev/null +++ b/taskiq/scheduler/__init__.py @@ -0,0 +1,10 @@ +"""Scheduler package.""" +from taskiq.scheduler.merge_functions import only_unique, preserve_all +from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler + +__all__ = [ + "only_unique", + "preserve_all", + "ScheduledTask", + "TaskiqScheduler", +] diff --git a/taskiq/scheduler/merge_functions.py b/taskiq/scheduler/merge_functions.py new file mode 100644 index 00000000..ac3b7a1e --- /dev/null +++ b/taskiq/scheduler/merge_functions.py @@ -0,0 +1,42 @@ +from typing import TYPE_CHECKING, List + +if TYPE_CHECKING: + from taskiq.scheduler.scheduler import ScheduledTask + + +def preserve_all( + old_tasks: List["ScheduledTask"], + new_tasks: List["ScheduledTask"], +) -> List["ScheduledTask"]: + """ + This function simply merges two lists. + + It adds new tasks to others. + + :param old_tasks: previously discovered tasks. + :param new_tasks: newly discovered tasks. + :return: merged list. + """ + return old_tasks + new_tasks + + +def only_unique( + old_tasks: List["ScheduledTask"], + new_tasks: List["ScheduledTask"], +) -> List["ScheduledTask"]: + """ + This function preserves only unique schedules. + + It checks every task and if the schedule is already + in list, it won't be added. + + :param old_tasks: previously discovered tasks. + :param new_tasks: newly discovered tasks. + :return: list of unique schedules. 
+    """
+    merged = []
+    merged.extend(old_tasks)
+    for task in new_tasks:
+        if task not in merged:
+            merged.append(task)
+    return merged
diff --git a/taskiq/scheduler/scheduler.py b/taskiq/scheduler/scheduler.py
new file mode 100644
index 00000000..d94b1d86
--- /dev/null
+++ b/taskiq/scheduler/scheduler.py
@@ -0,0 +1,47 @@
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Callable, Dict, List
+
+from taskiq.abc.broker import AsyncBroker
+from taskiq.scheduler.merge_functions import preserve_all
+
+if TYPE_CHECKING:
+    from taskiq.abc.schedule_source import ScheduleSource
+
+
+@dataclass(frozen=True, eq=True)
+class ScheduledTask:
+    """Abstraction over task schedule."""
+
+    task_name: str
+    labels: Dict[str, Any]
+    args: List[Any]
+    kwargs: Dict[str, Any]
+    cron: str
+
+
+class TaskiqScheduler:
+    """Scheduler class."""
+
+    def __init__(
+        self,
+        broker: AsyncBroker,
+        sources: List["ScheduleSource"],
+        merge_func: Callable[
+            [List["ScheduledTask"], List["ScheduledTask"]],
+            List["ScheduledTask"],
+        ] = preserve_all,
+        refresh_delay: float = 30.0,
+    ) -> None:
+        self.broker = broker
+        self.sources = sources
+        self.refresh_delay = refresh_delay
+        self.merge_func = merge_func
+
+    async def startup(self) -> None:
+        """
+        This method is called on startup.
+
+        Here you can do stuff, like creating
+        connections or anything you'd like.
+        """
+        await self.broker.startup()