Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions docs/examples/extending/schedule_source.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import Any, Coroutine, List

from taskiq import ScheduledTask, ScheduleSource

Expand All @@ -24,10 +24,14 @@ async def get_schedules(self) -> List["ScheduledTask"]:

# This method is optional. You may not implement this.
# It's just a helper to people to be able to interact with your source.
# This method can be either sync or async.
def add_schedule(self, schedule: "ScheduledTask") -> None:
async def add_schedule(self, schedule: "ScheduledTask") -> None:
print("New schedule added:", schedule)

# This method is completely optional, but if you want to support
# schedule cancelation, you must implement it.
async def delete_schedule(self, schedule_id: str) -> None:
print("Deleting schedule:", schedule_id)

# This method is optional. You may not implement this.
# It's just a helper to people to be able to interact with your source.
async def pre_send(self, task: "ScheduledTask") -> None:
Expand Down
2 changes: 2 additions & 0 deletions docs/extending-taskiq/schedule-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ To create new `schedule source` you have to implement the `taskiq.abc.schedule_s
Here's a minimal example of a schedule source:

@[code python](../examples/extending/schedule_source.py)

You can implement a schedule source that write schedules in the database and have delayed tasks in runtime.
17 changes: 15 additions & 2 deletions taskiq/abc/schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ async def shutdown(self) -> None: # noqa: B027
async def get_schedules(self) -> List["ScheduledTask"]:
"""Get list of taskiq schedules."""

def add_schedule(
async def add_schedule(
self,
schedule: "ScheduledTask",
) -> Union[None, Coroutine[Any, Any, None]]:
) -> None:
"""
Add a new schedule.

Expand All @@ -40,6 +40,19 @@ def add_schedule(
f"The source {self.__class__.__name__} does not support adding schedules.",
)

async def delete_schedule(self, schedule_id: str) -> None:
"""
Method to delete schedule by id.

This is useful for schedule cancelation.

:param schedule_id: id of schedule to delete.
"""
raise NotImplementedError(
f"The source {self.__class__.__name__} does "
"not support deleting schedules.",
)

def pre_send( # noqa: B027
self,
task: "ScheduledTask",
Expand Down
11 changes: 7 additions & 4 deletions taskiq/decor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing_extensions import ParamSpec

from taskiq.kicker import AsyncKicker
from taskiq.scheduler.created_schedule import CreatedSchedule
from taskiq.task import AsyncTaskiqTask

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -103,7 +104,7 @@ async def schedule_by_cron(
cron: Union[str, "CronSpec"],
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> None:
) -> CreatedSchedule[_ReturnType]:
"""
Schedule task to run on cron.

Expand All @@ -114,8 +115,9 @@ async def schedule_by_cron(
:param cron: cron string or a CronSpec instance.
:param args: function's arguments.
:param kwargs: function's key word arguments.
:return: schedule id.
"""
await self.kicker().schedule_cron(
return await self.kicker().schedule_by_cron(
source,
cron,
*args,
Expand All @@ -128,7 +130,7 @@ async def schedule_by_time(
time: datetime,
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> None:
) -> CreatedSchedule[_ReturnType]:
"""
Schedule task to run on specific time.

Expand All @@ -139,8 +141,9 @@ async def schedule_by_time(
:param time: time to run task.
:param args: function's arguments.
:param kwargs: function's key word arguments.
:return: schedule id.
"""
await self.kicker().schedule_time(
return await self.kicker().schedule_by_time(
source,
time,
*args,
Expand Down
53 changes: 28 additions & 25 deletions taskiq/kicker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from taskiq.compat import model_dump
from taskiq.exceptions import SendTaskError
from taskiq.message import TaskiqMessage
from taskiq.scheduler.created_schedule import CreatedSchedule
from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask
from taskiq.task import AsyncTaskiqTask
from taskiq.utils import maybe_awaitable
Expand Down Expand Up @@ -146,13 +147,13 @@ async def kiq(
result_backend=self.broker.result_backend,
)

async def schedule_cron(
async def schedule_by_cron(
self,
source: "ScheduleSource",
cron: Union[str, "CronSpec"],
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> None:
) -> CreatedSchedule[_ReturnType]:
"""
Function to schedule task with cron.

Expand All @@ -161,34 +162,36 @@ async def schedule_cron(
:param args: function's args.
:param cron_offset: cron offset.
:param kwargs: function's kwargs.

:return: schedule id.
"""
schedule_id = self.broker.id_generator()
message = self._prepare_message(*args, **kwargs)
cron_offset = None
if isinstance(cron, CronSpec):
cron_str = cron.to_cron()
cron_offset = cron.offset
else:
cron_str = cron
await maybe_awaitable(
source.add_schedule(
ScheduledTask(
task_name=message.task_name,
labels=message.labels,
args=message.args,
kwargs=message.kwargs,
cron=cron_str,
cron_offset=cron_offset,
),
),
scheduled = ScheduledTask(
schedule_id=schedule_id,
task_name=message.task_name,
labels=message.labels,
args=message.args,
kwargs=message.kwargs,
cron=cron_str,
cron_offset=cron_offset,
)
await source.add_schedule(scheduled)
return CreatedSchedule(self, source, scheduled)

async def schedule_time(
async def schedule_by_time(
self,
source: "ScheduleSource",
time: datetime,
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> None:
) -> CreatedSchedule[_ReturnType]:
"""
Function to schedule task to run at specific time.

Expand All @@ -197,18 +200,18 @@ async def schedule_time(
:param args: function's args.
:param kwargs: function's kwargs.
"""
schedule_id = self.broker.id_generator()
message = self._prepare_message(*args, **kwargs)
await maybe_awaitable(
source.add_schedule(
ScheduledTask(
task_name=message.task_name,
labels=message.labels,
args=message.args,
kwargs=message.kwargs,
time=time,
),
),
scheduled = ScheduledTask(
schedule_id=schedule_id,
task_name=message.task_name,
labels=message.labels,
args=message.args,
kwargs=message.kwargs,
time=time,
)
await source.add_schedule(scheduled)
return CreatedSchedule(self, source, scheduled)

@classmethod
def _prepare_arg(cls, arg: Any) -> Any:
Expand Down
59 changes: 59 additions & 0 deletions taskiq/scheduler/created_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import TYPE_CHECKING, Any, Coroutine, Generic, TypeVar, overload

from taskiq.abc.schedule_source import ScheduleSource
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.task import AsyncTaskiqTask

if TYPE_CHECKING:
from taskiq.kicker import AsyncKicker

_ReturnType = TypeVar("_ReturnType")
_T = TypeVar("_T")


class CreatedSchedule(Generic[_ReturnType]):
"""A schedule that has been created."""

def __init__(
self,
kicker: "AsyncKicker[Any,_ReturnType]",
source: ScheduleSource,
task: ScheduledTask,
) -> None:
self.kicker = kicker
self.source = source
self.task = task
self.schedule_id = task.schedule_id

@overload
async def kiq(
self: "CreatedSchedule[Coroutine[Any,Any, _T]]",
) -> AsyncTaskiqTask[_T]:
...

@overload
async def kiq(self: "CreatedSchedule[_ReturnType]") -> AsyncTaskiqTask[_ReturnType]:
...

async def kiq(self) -> Any:
"""Kick the task as if you were not scheduling it."""
return await self.kicker.kiq(
*self.task.args,
**self.task.kwargs,
)

async def unschedule(self) -> None:
"""Unschedule the task."""
await self.source.delete_schedule(self.task.schedule_id)

def __str__(self) -> str:
return (
"CreatedSchedule("
f"id={self.schedule_id}, "
f"time={self.task.time}, "
f"cron={self.task.cron}, "
f"cron_offset={self.task.cron_offset or 'UTC'}, "
f"task_name={self.task.task_name}, "
f"args={self.task.args}, "
f"kwargs={self.task.kwargs})"
)
2 changes: 2 additions & 0 deletions taskiq/scheduler/scheduled_task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
Expand Down Expand Up @@ -28,6 +29,7 @@ class ScheduledTask:
labels: Dict[str, Any]
args: List[Any]
kwargs: Dict[str, Any]
schedule_id: str = field(default_factory=lambda: uuid.uuid4().hex)
cron: Optional[str] = field(default=None)
cron_offset: Optional[Union[str, timedelta]] = field(default=None)
time: Optional[datetime] = field(default=None)
Expand Down
1 change: 1 addition & 0 deletions tests/schedule_sources/test_label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def task() -> None:
schedules = await source.get_schedules()
assert schedules == [
ScheduledTask(
schedule_id=schedules[0].schedule_id,
cron=schedule_label[0].get("cron"),
time=schedule_label[0].get("time"),
task_name="test_task",
Expand Down
1 change: 1 addition & 0 deletions tests/scheduler/test_label_based_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def task() -> None:
schedules = await LabelScheduleSource(broker).get_schedules()
assert schedules == [
ScheduledTask(
schedule_id=schedules[0].schedule_id,
cron=schedule_label[0].get("cron"),
time=schedule_label[0].get("time"),
task_name="test_task",
Expand Down