From 2df9776912e243bbdc39d6b4bac0749599998ddd Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Mon, 16 Oct 2023 14:09:19 +0400 Subject: [PATCH 1/2] Added API for easy-scheduling. Signed-off-by: Pavel Kirilin --- docs/examples/extending/schedule_source.py | 5 +- docs/examples/schedule/intro.py | 2 +- taskiq/__init__.py | 3 +- taskiq/abc/schedule_source.py | 7 ++- taskiq/api/scheduler.py | 2 +- taskiq/cli/scheduler/run.py | 3 +- taskiq/decor.py | 54 +++++++++++++++++ taskiq/kicker.py | 70 +++++++++++++++++++++- taskiq/schedule_sources/label_based.py | 2 +- taskiq/scheduler/__init__.py | 9 --- taskiq/scheduler/merge_functions.py | 2 +- taskiq/scheduler/scheduled_task.py | 42 +++++++++++++ taskiq/scheduler/scheduler.py | 31 ++-------- tests/cli/scheduler/test_task_delays.py | 2 +- tests/schedule_sources/test_label_based.py | 2 +- tests/scheduler/test_label_based_sched.py | 3 +- 16 files changed, 189 insertions(+), 50 deletions(-) create mode 100644 taskiq/scheduler/scheduled_task.py diff --git a/docs/examples/extending/schedule_source.py b/docs/examples/extending/schedule_source.py index 8d811bfe..59efa29c 100644 --- a/docs/examples/extending/schedule_source.py +++ b/docs/examples/extending/schedule_source.py @@ -24,8 +24,9 @@ async def get_schedules(self) -> List["ScheduledTask"]: # This method is optional. You may not implement this. # It's just a helper to people to be able to interact with your source. - async def add_schedule(self, schedule: "ScheduledTask") -> None: - return await super().add_schedule(schedule) + # This method can be either sync or async. + def add_schedule(self, schedule: "ScheduledTask") -> None: + print("New schedule added:", schedule) # This method is optional. You may not implement this. # It's just a helper to people to be able to interact with your source. diff --git a/docs/examples/schedule/intro.py b/docs/examples/schedule/intro.py index dd44750b..2faf91cb 100644 --- a/docs/examples/schedule/intro.py +++ b/docs/examples/schedule/intro.py @@ -1,7 +1,7 @@ from taskiq_aio_pika import AioPikaBroker from taskiq.schedule_sources import LabelScheduleSource -from taskiq.scheduler import TaskiqScheduler +from taskiq import TaskiqScheduler broker = AioPikaBroker("amqp://guest:guest@localhost:5672/") diff --git a/taskiq/__init__.py b/taskiq/__init__.py index 8fed981f..42779b34 100644 --- a/taskiq/__init__.py +++ b/taskiq/__init__.py @@ -28,7 +28,8 @@ from taskiq.middlewares.prometheus_middleware import PrometheusMiddleware from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware from taskiq.result import TaskiqResult -from taskiq.scheduler import ScheduledTask, TaskiqScheduler +from taskiq.scheduler.scheduled_task import ScheduledTask +from taskiq.scheduler.scheduler import TaskiqScheduler from taskiq.state import TaskiqState from taskiq.task import AsyncTaskiqTask diff --git a/taskiq/abc/schedule_source.py b/taskiq/abc/schedule_source.py index d550a76b..8a279ed4 100644 --- a/taskiq/abc/schedule_source.py +++ b/taskiq/abc/schedule_source.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING, Any, Coroutine, List, Union if TYPE_CHECKING: # pragma: no cover - from taskiq.scheduler.scheduler import ScheduledTask + from taskiq.scheduler.scheduled_task import ScheduledTask class ScheduleSource(ABC): @@ -18,7 +18,10 @@ async def shutdown(self) -> None: # noqa: B027 async def get_schedules(self) -> List["ScheduledTask"]: """Get list of taskiq schedules.""" - async def add_schedule(self, schedule: "ScheduledTask") -> None: + def add_schedule( + self, + schedule: "ScheduledTask", + ) -> Union[None, Coroutine[Any, Any, None]]: """ Add a new schedule. diff --git a/taskiq/api/scheduler.py b/taskiq/api/scheduler.py index 17ec7eb0..97c59c78 100644 --- a/taskiq/api/scheduler.py +++ b/taskiq/api/scheduler.py @@ -1,5 +1,5 @@ from taskiq.cli.scheduler.run import run_scheduler_loop -from taskiq.scheduler import TaskiqScheduler +from taskiq.scheduler.scheduler import TaskiqScheduler async def run_scheduler_task( diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py index 6aa5198a..5246dfd3 100644 --- a/taskiq/cli/scheduler/run.py +++ b/taskiq/cli/scheduler/run.py @@ -10,7 +10,8 @@ from taskiq.abc.schedule_source import ScheduleSource from taskiq.cli.scheduler.args import SchedulerArgs from taskiq.cli.utils import import_object, import_tasks -from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler +from taskiq.scheduler.scheduled_task import ScheduledTask +from taskiq.scheduler.scheduler import TaskiqScheduler logger = getLogger(__name__) diff --git a/taskiq/decor.py b/taskiq/decor.py index 1441367c..baefb7e9 100644 --- a/taskiq/decor.py +++ b/taskiq/decor.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import ( TYPE_CHECKING, Any, @@ -6,6 +7,7 @@ Dict, Generic, TypeVar, + Union, overload, ) @@ -16,6 +18,8 @@ if TYPE_CHECKING: # pragma: no cover from taskiq.abc.broker import AsyncBroker + from taskiq.abc.schedule_source import ScheduleSource + from taskiq.scheduler.scheduled_task import CronSpec _T = TypeVar("_T") _FuncParams = ParamSpec("_FuncParams") @@ -93,6 +97,56 @@ async def kiq( """ return await self.kicker().kiq(*args, **kwargs) + async def schedule_by_cron( + self, + source: "ScheduleSource", + cron: Union[str, "CronSpec"], + *args: _FuncParams.args, + **kwargs: _FuncParams.kwargs, + ) -> None: + """ + Schedule task to run on cron. + + This method requires a schedule source, + which is capable of dynamically adding new schedules. + + :param source: schedule source. + :param cron: cron string or a CronSpec instance. + :param args: function's arguments. + :param kwargs: function's key word arguments. + """ + await self.kicker().schedule_cron( + source, + cron, + *args, + **kwargs, + ) + + async def schedule_by_time( + self, + source: "ScheduleSource", + time: datetime, + *args: _FuncParams.args, + **kwargs: _FuncParams.kwargs, + ) -> None: + """ + Schedule task to run on specific time. + + This method requires a schedule source, + which is capable of dynamically adding new schedules. + + :param source: schedule source. + :param time: time to run task. + :param args: function's arguments. + :param kwargs: function's key word arguments. + """ + await self.kicker().schedule_time( + source, + time, + *args, + **kwargs, + ) + def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]: """ This function returns kicker object. diff --git a/taskiq/kicker.py b/taskiq/kicker.py index 67651a82..cc99f2c4 100644 --- a/taskiq/kicker.py +++ b/taskiq/kicker.py @@ -1,4 +1,5 @@ from dataclasses import asdict, is_dataclass +from datetime import datetime from logging import getLogger from typing import ( TYPE_CHECKING, @@ -16,13 +17,16 @@ from typing_extensions import ParamSpec from taskiq.abc.middleware import TaskiqMiddleware +from taskiq.compat import model_dump from taskiq.exceptions import SendTaskError from taskiq.message import TaskiqMessage +from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask from taskiq.task import AsyncTaskiqTask from taskiq.utils import maybe_awaitable if TYPE_CHECKING: # pragma: no cover from taskiq.abc.broker import AsyncBroker + from taskiq.abc.schedule_source import ScheduleSource _T = TypeVar("_T") _FuncParams = ParamSpec("_FuncParams") @@ -142,6 +146,70 @@ async def kiq( result_backend=self.broker.result_backend, ) + async def schedule_cron( + self, + source: "ScheduleSource", + cron: Union[str, "CronSpec"], + *args: _FuncParams.args, + **kwargs: _FuncParams.kwargs, + ) -> None: + """ + Function to schedule task with cron. + + :param source: schedule source. + :param cron: cron expression. + :param args: function's args. + :param cron_offset: cron offset. + :param kwargs: function's kwargs. + """ + message = self._prepare_message(*args, **kwargs) + cron_offset = None + if isinstance(cron, CronSpec): + cron_str = cron.to_cron() + cron_offset = cron.offset + else: + cron_str = cron + await maybe_awaitable( + source.add_schedule( + ScheduledTask( + task_name=message.task_name, + labels=message.labels, + args=message.args, + kwargs=message.kwargs, + cron=cron_str, + cron_offset=cron_offset, + ), + ), + ) + + async def schedule_time( + self, + source: "ScheduleSource", + time: datetime, + *args: _FuncParams.args, + **kwargs: _FuncParams.kwargs, + ) -> None: + """ + Function to schedule task to run at specific time. + + :param source: schedule source. + :param time: time to run task at. + :param args: function's args. + :param kwargs: function's kwargs. + """ + message = self._prepare_message(*args, **kwargs) + await maybe_awaitable( + source.add_schedule( + ScheduledTask( + task_name=message.task_name, + labels=message.labels, + args=message.args, + kwargs=message.kwargs, + time=time, + ), + ), + ) + @classmethod def _prepare_arg(cls, arg: Any) -> Any: """ @@ -154,7 +222,7 @@ def _prepare_arg(cls, arg: Any) -> Any: :return: Formatted argument. """ if isinstance(arg, BaseModel): - arg = arg.dict() + arg = model_dump(arg) if is_dataclass(arg): arg = asdict(arg) return arg diff --git a/taskiq/schedule_sources/label_based.py b/taskiq/schedule_sources/label_based.py index a484c130..e9116fd9 100644 --- a/taskiq/schedule_sources/label_based.py +++ b/taskiq/schedule_sources/label_based.py @@ -3,7 +3,7 @@ from taskiq.abc.broker import AsyncBroker from taskiq.abc.schedule_source import ScheduleSource -from taskiq.scheduler.scheduler import ScheduledTask +from taskiq.scheduler.scheduled_task import ScheduledTask logger = getLogger(__name__) diff --git a/taskiq/scheduler/__init__.py b/taskiq/scheduler/__init__.py index a0463e01..8e9ba4ad 100644 --- a/taskiq/scheduler/__init__.py +++ b/taskiq/scheduler/__init__.py @@ -1,10 +1 @@ """Scheduler package.""" -from taskiq.scheduler.merge_functions import only_unique, preserve_all -from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler - -__all__ = [ - "only_unique", - "preserve_all", - "ScheduledTask", - "TaskiqScheduler", -] diff --git a/taskiq/scheduler/merge_functions.py b/taskiq/scheduler/merge_functions.py index 077f1692..58c34da0 100644 --- a/taskiq/scheduler/merge_functions.py +++ b/taskiq/scheduler/merge_functions.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING, List if TYPE_CHECKING: # pragma: no cover - from taskiq.scheduler.scheduler import ScheduledTask + from taskiq.scheduler.scheduled_task import ScheduledTask def preserve_all( diff --git a/taskiq/scheduler/scheduled_task.py b/taskiq/scheduler/scheduled_task.py new file mode 100644 index 00000000..dcfdeba4 --- /dev/null +++ b/taskiq/scheduler/scheduled_task.py @@ -0,0 +1,42 @@ +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional, Union + + +@dataclass +class CronSpec: + """Cron specification for running tasks.""" + + minutes: Optional[str | int] = "*" + hours: Optional[str | int] = "*" + days: Optional[str | int] = "*" + months: Optional[str | int] = "*" + weekdays: Optional[str | int] = "*" + + offset: Optional[Union[str, timedelta]] = None + + def to_cron(self) -> str: + """Converts cron spec to cron string.""" + return f"{self.minutes} {self.hours} {self.days} {self.months} {self.weekdays}" + + +@dataclass(frozen=True, eq=True) +class ScheduledTask: + """Abstraction over task schedule.""" + + task_name: str + labels: Dict[str, Any] + args: List[Any] + kwargs: Dict[str, Any] + cron: Optional[str] = field(default=None) + cron_offset: Optional[Union[str, timedelta]] = field(default=None) + time: Optional[datetime] = field(default=None) + + def __post_init__(self) -> None: + """ + This method validates, that either `cron` or `time` field is present. + + :raises ValueError: if cron and time are none. + """ + if self.cron is None and self.time is None: + raise ValueError("Either cron or datetime must be present.") diff --git a/taskiq/scheduler/scheduler.py b/taskiq/scheduler/scheduler.py index 086ebb3b..3087c7a6 100644 --- a/taskiq/scheduler/scheduler.py +++ b/taskiq/scheduler/scheduler.py @@ -1,44 +1,21 @@ -from dataclasses import dataclass, field -from datetime import datetime, timedelta -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Callable, List -from taskiq.abc.broker import AsyncBroker from taskiq.kicker import AsyncKicker from taskiq.scheduler.merge_functions import only_new +from taskiq.scheduler.scheduled_task import ScheduledTask from taskiq.utils import maybe_awaitable if TYPE_CHECKING: # pragma: no cover + from taskiq.abc.broker import AsyncBroker from taskiq.abc.schedule_source import ScheduleSource -@dataclass(frozen=True, eq=True) -class ScheduledTask: - """Abstraction over task schedule.""" - - task_name: str - labels: Dict[str, Any] - args: List[Any] - kwargs: Dict[str, Any] - cron: Optional[str] = field(default=None) - cron_offset: Optional[Union[str, timedelta]] = field(default=None) - time: Optional[datetime] = field(default=None) - - def __post_init__(self) -> None: - """ - This method validates, that either `cron` or `time` field is present. - - :raises ValueError: if cron and time are none. - """ - if self.cron is None and self.time is None: - raise ValueError("Either cron or datetime must be present.") - - class TaskiqScheduler: """Scheduler class.""" def __init__( self, - broker: AsyncBroker, + broker: "AsyncBroker", sources: List["ScheduleSource"], merge_func: Callable[ [List["ScheduledTask"], List["ScheduledTask"]], diff --git a/tests/cli/scheduler/test_task_delays.py b/tests/cli/scheduler/test_task_delays.py index 124c213a..7e48e210 100644 --- a/tests/cli/scheduler/test_task_delays.py +++ b/tests/cli/scheduler/test_task_delays.py @@ -5,7 +5,7 @@ from tzlocal import get_localzone from taskiq.cli.scheduler.run import get_task_delay -from taskiq.scheduler.scheduler import ScheduledTask +from taskiq.scheduler.scheduled_task import ScheduledTask def test_should_run_success() -> None: diff --git a/tests/schedule_sources/test_label_based.py b/tests/schedule_sources/test_label_based.py index 6b0d6079..7226ee05 100644 --- a/tests/schedule_sources/test_label_based.py +++ b/tests/schedule_sources/test_label_based.py @@ -5,7 +5,7 @@ from taskiq.brokers.inmemory_broker import InMemoryBroker from taskiq.schedule_sources.label_based import LabelScheduleSource -from taskiq.scheduler.scheduler import ScheduledTask +from taskiq.scheduler.scheduled_task import ScheduledTask @pytest.mark.anyio diff --git a/tests/scheduler/test_label_based_sched.py b/tests/scheduler/test_label_based_sched.py index 064079a6..f288f15d 100644 --- a/tests/scheduler/test_label_based_sched.py +++ b/tests/scheduler/test_label_based_sched.py @@ -9,7 +9,8 @@ from taskiq.cli.scheduler.args import SchedulerArgs from taskiq.cli.scheduler.run import run_scheduler from taskiq.schedule_sources.label_based import LabelScheduleSource -from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler +from taskiq.scheduler.scheduled_task import ScheduledTask +from taskiq.scheduler.scheduler import TaskiqScheduler @pytest.mark.anyio From 6524f88d2497f5dcfe67596dababa214ef0ac064 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Mon, 16 Oct 2023 14:19:12 +0400 Subject: [PATCH 2/2] Fixed types. Signed-off-by: Pavel Kirilin --- taskiq/scheduler/scheduled_task.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/taskiq/scheduler/scheduled_task.py b/taskiq/scheduler/scheduled_task.py index dcfdeba4..90f83741 100644 --- a/taskiq/scheduler/scheduled_task.py +++ b/taskiq/scheduler/scheduled_task.py @@ -7,11 +7,11 @@ class CronSpec: """Cron specification for running tasks.""" - minutes: Optional[str | int] = "*" - hours: Optional[str | int] = "*" - days: Optional[str | int] = "*" - months: Optional[str | int] = "*" - weekdays: Optional[str | int] = "*" + minutes: Optional[Union[str, int]] = "*" + hours: Optional[Union[str, int]] = "*" + days: Optional[Union[str, int]] = "*" + months: Optional[Union[str, int]] = "*" + weekdays: Optional[Union[str, int]] = "*" offset: Optional[Union[str, timedelta]] = None