diff --git a/docs/examples/extending/schedule_source.py b/docs/examples/extending/schedule_source.py index 59efa29c..8c0ec9f9 100644 --- a/docs/examples/extending/schedule_source.py +++ b/docs/examples/extending/schedule_source.py @@ -1,4 +1,4 @@ -from typing import List +from typing import Any, Coroutine, List from taskiq import ScheduledTask, ScheduleSource @@ -24,10 +24,14 @@ async def get_schedules(self) -> List["ScheduledTask"]: # This method is optional. You may not implement this. # It's just a helper to people to be able to interact with your source. - # This method can be either sync or async. - def add_schedule(self, schedule: "ScheduledTask") -> None: + async def add_schedule(self, schedule: "ScheduledTask") -> None: print("New schedule added:", schedule) + # This method is completely optional, but if you want to support + # schedule cancelation, you must implement it. + async def delete_schedule(self, schedule_id: str) -> None: + print("Deleting schedule:", schedule_id) + # This method is optional. You may not implement this. # It's just a helper to people to be able to interact with your source. async def pre_send(self, task: "ScheduledTask") -> None: diff --git a/docs/extending-taskiq/schedule-sources.md b/docs/extending-taskiq/schedule-sources.md index 3ac3150e..c4a130ae 100644 --- a/docs/extending-taskiq/schedule-sources.md +++ b/docs/extending-taskiq/schedule-sources.md @@ -10,3 +10,5 @@ To create new `schedule source` you have to implement the `taskiq.abc.schedule_s Here's a minimal example of a schedule source: @[code python](../examples/extending/schedule_source.py) + +You can implement a schedule source that write schedules in the database and have delayed tasks in runtime. diff --git a/taskiq/abc/schedule_source.py b/taskiq/abc/schedule_source.py index 8a279ed4..1341d460 100644 --- a/taskiq/abc/schedule_source.py +++ b/taskiq/abc/schedule_source.py @@ -18,10 +18,10 @@ async def shutdown(self) -> None: # noqa: B027 async def get_schedules(self) -> List["ScheduledTask"]: """Get list of taskiq schedules.""" - def add_schedule( + async def add_schedule( self, schedule: "ScheduledTask", - ) -> Union[None, Coroutine[Any, Any, None]]: + ) -> None: """ Add a new schedule. @@ -40,6 +40,19 @@ def add_schedule( f"The source {self.__class__.__name__} does not support adding schedules.", ) + async def delete_schedule(self, schedule_id: str) -> None: + """ + Method to delete schedule by id. + + This is useful for schedule cancelation. + + :param schedule_id: id of schedule to delete. + """ + raise NotImplementedError( + f"The source {self.__class__.__name__} does " + "not support deleting schedules.", + ) + def pre_send( # noqa: B027 self, task: "ScheduledTask", diff --git a/taskiq/decor.py b/taskiq/decor.py index baefb7e9..eb30a09f 100644 --- a/taskiq/decor.py +++ b/taskiq/decor.py @@ -14,6 +14,7 @@ from typing_extensions import ParamSpec from taskiq.kicker import AsyncKicker +from taskiq.scheduler.created_schedule import CreatedSchedule from taskiq.task import AsyncTaskiqTask if TYPE_CHECKING: # pragma: no cover @@ -103,7 +104,7 @@ async def schedule_by_cron( cron: Union[str, "CronSpec"], *args: _FuncParams.args, **kwargs: _FuncParams.kwargs, - ) -> None: + ) -> CreatedSchedule[_ReturnType]: """ Schedule task to run on cron. @@ -114,8 +115,9 @@ async def schedule_by_cron( :param cron: cron string or a CronSpec instance. :param args: function's arguments. :param kwargs: function's key word arguments. + :return: schedule id. """ - await self.kicker().schedule_cron( + return await self.kicker().schedule_by_cron( source, cron, *args, @@ -128,7 +130,7 @@ async def schedule_by_time( time: datetime, *args: _FuncParams.args, **kwargs: _FuncParams.kwargs, - ) -> None: + ) -> CreatedSchedule[_ReturnType]: """ Schedule task to run on specific time. @@ -139,8 +141,9 @@ async def schedule_by_time( :param time: time to run task. :param args: function's arguments. :param kwargs: function's key word arguments. + :return: schedule id. """ - await self.kicker().schedule_time( + return await self.kicker().schedule_by_time( source, time, *args, diff --git a/taskiq/kicker.py b/taskiq/kicker.py index cc99f2c4..7ec8451f 100644 --- a/taskiq/kicker.py +++ b/taskiq/kicker.py @@ -20,6 +20,7 @@ from taskiq.compat import model_dump from taskiq.exceptions import SendTaskError from taskiq.message import TaskiqMessage +from taskiq.scheduler.created_schedule import CreatedSchedule from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask from taskiq.task import AsyncTaskiqTask from taskiq.utils import maybe_awaitable @@ -146,13 +147,13 @@ async def kiq( result_backend=self.broker.result_backend, ) - async def schedule_cron( + async def schedule_by_cron( self, source: "ScheduleSource", cron: Union[str, "CronSpec"], *args: _FuncParams.args, **kwargs: _FuncParams.kwargs, - ) -> None: + ) -> CreatedSchedule[_ReturnType]: """ Function to schedule task with cron. @@ -161,7 +162,10 @@ async def schedule_cron( :param args: function's args. :param cron_offset: cron offset. :param kwargs: function's kwargs. + + :return: schedule id. """ + schedule_id = self.broker.id_generator() message = self._prepare_message(*args, **kwargs) cron_offset = None if isinstance(cron, CronSpec): @@ -169,26 +173,25 @@ async def schedule_cron( cron_offset = cron.offset else: cron_str = cron - await maybe_awaitable( - source.add_schedule( - ScheduledTask( - task_name=message.task_name, - labels=message.labels, - args=message.args, - kwargs=message.kwargs, - cron=cron_str, - cron_offset=cron_offset, - ), - ), + scheduled = ScheduledTask( + schedule_id=schedule_id, + task_name=message.task_name, + labels=message.labels, + args=message.args, + kwargs=message.kwargs, + cron=cron_str, + cron_offset=cron_offset, ) + await source.add_schedule(scheduled) + return CreatedSchedule(self, source, scheduled) - async def schedule_time( + async def schedule_by_time( self, source: "ScheduleSource", time: datetime, *args: _FuncParams.args, **kwargs: _FuncParams.kwargs, - ) -> None: + ) -> CreatedSchedule[_ReturnType]: """ Function to schedule task to run at specific time. @@ -197,18 +200,18 @@ async def schedule_time( :param args: function's args. :param kwargs: function's kwargs. """ + schedule_id = self.broker.id_generator() message = self._prepare_message(*args, **kwargs) - await maybe_awaitable( - source.add_schedule( - ScheduledTask( - task_name=message.task_name, - labels=message.labels, - args=message.args, - kwargs=message.kwargs, - time=time, - ), - ), + scheduled = ScheduledTask( + schedule_id=schedule_id, + task_name=message.task_name, + labels=message.labels, + args=message.args, + kwargs=message.kwargs, + time=time, ) + await source.add_schedule(scheduled) + return CreatedSchedule(self, source, scheduled) @classmethod def _prepare_arg(cls, arg: Any) -> Any: diff --git a/taskiq/scheduler/created_schedule.py b/taskiq/scheduler/created_schedule.py new file mode 100644 index 00000000..8e870834 --- /dev/null +++ b/taskiq/scheduler/created_schedule.py @@ -0,0 +1,59 @@ +from typing import TYPE_CHECKING, Any, Coroutine, Generic, TypeVar, overload + +from taskiq.abc.schedule_source import ScheduleSource +from taskiq.scheduler.scheduled_task import ScheduledTask +from taskiq.task import AsyncTaskiqTask + +if TYPE_CHECKING: + from taskiq.kicker import AsyncKicker + +_ReturnType = TypeVar("_ReturnType") +_T = TypeVar("_T") + + +class CreatedSchedule(Generic[_ReturnType]): + """A schedule that has been created.""" + + def __init__( + self, + kicker: "AsyncKicker[Any,_ReturnType]", + source: ScheduleSource, + task: ScheduledTask, + ) -> None: + self.kicker = kicker + self.source = source + self.task = task + self.schedule_id = task.schedule_id + + @overload + async def kiq( + self: "CreatedSchedule[Coroutine[Any,Any, _T]]", + ) -> AsyncTaskiqTask[_T]: + ... + + @overload + async def kiq(self: "CreatedSchedule[_ReturnType]") -> AsyncTaskiqTask[_ReturnType]: + ... + + async def kiq(self) -> Any: + """Kick the task as if you were not scheduling it.""" + return await self.kicker.kiq( + *self.task.args, + **self.task.kwargs, + ) + + async def unschedule(self) -> None: + """Unschedule the task.""" + await self.source.delete_schedule(self.task.schedule_id) + + def __str__(self) -> str: + return ( + "CreatedSchedule(" + f"id={self.schedule_id}, " + f"time={self.task.time}, " + f"cron={self.task.cron}, " + f"cron_offset={self.task.cron_offset or 'UTC'}, " + f"task_name={self.task.task_name}, " + f"args={self.task.args}, " + f"kwargs={self.task.kwargs})" + ) diff --git a/taskiq/scheduler/scheduled_task.py b/taskiq/scheduler/scheduled_task.py index 90f83741..4e567d32 100644 --- a/taskiq/scheduler/scheduled_task.py +++ b/taskiq/scheduler/scheduled_task.py @@ -1,3 +1,4 @@ +import uuid from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Union @@ -28,6 +29,7 @@ class ScheduledTask: labels: Dict[str, Any] args: List[Any] kwargs: Dict[str, Any] + schedule_id: str = field(default_factory=lambda: uuid.uuid4().hex) cron: Optional[str] = field(default=None) cron_offset: Optional[Union[str, timedelta]] = field(default=None) time: Optional[datetime] = field(default=None) diff --git a/tests/schedule_sources/test_label_based.py b/tests/schedule_sources/test_label_based.py index 7226ee05..9e683917 100644 --- a/tests/schedule_sources/test_label_based.py +++ b/tests/schedule_sources/test_label_based.py @@ -30,6 +30,7 @@ def task() -> None: schedules = await source.get_schedules() assert schedules == [ ScheduledTask( + schedule_id=schedules[0].schedule_id, cron=schedule_label[0].get("cron"), time=schedule_label[0].get("time"), task_name="test_task", diff --git a/tests/scheduler/test_label_based_sched.py b/tests/scheduler/test_label_based_sched.py index f288f15d..156e8498 100644 --- a/tests/scheduler/test_label_based_sched.py +++ b/tests/scheduler/test_label_based_sched.py @@ -34,6 +34,7 @@ def task() -> None: schedules = await LabelScheduleSource(broker).get_schedules() assert schedules == [ ScheduledTask( + schedule_id=schedules[0].schedule_id, cron=schedule_label[0].get("cron"), time=schedule_label[0].get("time"), task_name="test_task",