Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/examples/extending/schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ async def get_schedules(self) -> List["ScheduledTask"]:

# This method is optional. You may not implement this.
# It's just a helper to people to be able to interact with your source.
async def add_schedule(self, schedule: "ScheduledTask") -> None:
return await super().add_schedule(schedule)
# This method can be either sync or async.
def add_schedule(self, schedule: "ScheduledTask") -> None:
print("New schedule added:", schedule)

# This method is optional. You may not implement this.
# It's just a helper to people to be able to interact with your source.
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/schedule/intro.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from taskiq_aio_pika import AioPikaBroker

from taskiq.schedule_sources import LabelScheduleSource
from taskiq.scheduler import TaskiqScheduler
from taskiq import TaskiqScheduler

broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")

Expand Down
3 changes: 2 additions & 1 deletion taskiq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from taskiq.middlewares.prometheus_middleware import PrometheusMiddleware
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
from taskiq.result import TaskiqResult
from taskiq.scheduler import ScheduledTask, TaskiqScheduler
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.scheduler.scheduler import TaskiqScheduler
from taskiq.state import TaskiqState
from taskiq.task import AsyncTaskiqTask

Expand Down
7 changes: 5 additions & 2 deletions taskiq/abc/schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import TYPE_CHECKING, Any, Coroutine, List, Union

if TYPE_CHECKING: # pragma: no cover
from taskiq.scheduler.scheduler import ScheduledTask
from taskiq.scheduler.scheduled_task import ScheduledTask


class ScheduleSource(ABC):
Expand All @@ -18,7 +18,10 @@ async def shutdown(self) -> None: # noqa: B027
async def get_schedules(self) -> List["ScheduledTask"]:
"""Get list of taskiq schedules."""

async def add_schedule(self, schedule: "ScheduledTask") -> None:
def add_schedule(
self,
schedule: "ScheduledTask",
) -> Union[None, Coroutine[Any, Any, None]]:
"""
Add a new schedule.

Expand Down
2 changes: 1 addition & 1 deletion taskiq/api/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from taskiq.cli.scheduler.run import run_scheduler_loop
from taskiq.scheduler import TaskiqScheduler
from taskiq.scheduler.scheduler import TaskiqScheduler


async def run_scheduler_task(
Expand Down
3 changes: 2 additions & 1 deletion taskiq/cli/scheduler/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from taskiq.abc.schedule_source import ScheduleSource
from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.utils import import_object, import_tasks
from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.scheduler.scheduler import TaskiqScheduler

logger = getLogger(__name__)

Expand Down
54 changes: 54 additions & 0 deletions taskiq/decor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -6,6 +7,7 @@
Dict,
Generic,
TypeVar,
Union,
overload,
)

Expand All @@ -16,6 +18,8 @@

if TYPE_CHECKING: # pragma: no cover
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.schedule_source import ScheduleSource
from taskiq.scheduler.scheduled_task import CronSpec

_T = TypeVar("_T")
_FuncParams = ParamSpec("_FuncParams")
Expand Down Expand Up @@ -93,6 +97,56 @@ async def kiq(
"""
return await self.kicker().kiq(*args, **kwargs)

async def schedule_by_cron(
self,
source: "ScheduleSource",
cron: Union[str, "CronSpec"],
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> None:
"""
Schedule task to run on cron.

This method requires a schedule source,
which is capable of dynamically adding new schedules.

:param source: schedule source.
:param cron: cron string or a CronSpec instance.
:param args: function's arguments.
:param kwargs: function's key word arguments.
"""
await self.kicker().schedule_cron(
source,
cron,
*args,
**kwargs,
)

async def schedule_by_time(
self,
source: "ScheduleSource",
time: datetime,
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> None:
"""
Schedule task to run on specific time.

This method requires a schedule source,
which is capable of dynamically adding new schedules.

:param source: schedule source.
:param time: time to run task.
:param args: function's arguments.
:param kwargs: function's key word arguments.
"""
await self.kicker().schedule_time(
source,
time,
*args,
**kwargs,
)

def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
"""
This function returns kicker object.
Expand Down
70 changes: 69 additions & 1 deletion taskiq/kicker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dataclasses import asdict, is_dataclass
from datetime import datetime
from logging import getLogger
from typing import (
TYPE_CHECKING,
Expand All @@ -16,13 +17,16 @@
from typing_extensions import ParamSpec

from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.compat import model_dump
from taskiq.exceptions import SendTaskError
from taskiq.message import TaskiqMessage
from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask
from taskiq.task import AsyncTaskiqTask
from taskiq.utils import maybe_awaitable

if TYPE_CHECKING: # pragma: no cover
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.schedule_source import ScheduleSource

_T = TypeVar("_T")
_FuncParams = ParamSpec("_FuncParams")
Expand Down Expand Up @@ -142,6 +146,70 @@ async def kiq(
result_backend=self.broker.result_backend,
)

async def schedule_cron(
self,
source: "ScheduleSource",
cron: Union[str, "CronSpec"],
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> None:
"""
Function to schedule task with cron.

:param source: schedule source.
:param cron: cron expression.
:param args: function's args.
:param cron_offset: cron offset.
:param kwargs: function's kwargs.
"""
message = self._prepare_message(*args, **kwargs)
cron_offset = None
if isinstance(cron, CronSpec):
cron_str = cron.to_cron()
cron_offset = cron.offset
else:
cron_str = cron
await maybe_awaitable(
source.add_schedule(
ScheduledTask(
task_name=message.task_name,
labels=message.labels,
args=message.args,
kwargs=message.kwargs,
cron=cron_str,
cron_offset=cron_offset,
),
),
)

async def schedule_time(
self,
source: "ScheduleSource",
time: datetime,
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> None:
"""
Function to schedule task to run at specific time.

:param source: schedule source.
:param time: time to run task at.
:param args: function's args.
:param kwargs: function's kwargs.
"""
message = self._prepare_message(*args, **kwargs)
await maybe_awaitable(
source.add_schedule(
ScheduledTask(
task_name=message.task_name,
labels=message.labels,
args=message.args,
kwargs=message.kwargs,
time=time,
),
),
)

@classmethod
def _prepare_arg(cls, arg: Any) -> Any:
"""
Expand All @@ -154,7 +222,7 @@ def _prepare_arg(cls, arg: Any) -> Any:
:return: Formatted argument.
"""
if isinstance(arg, BaseModel):
arg = arg.dict()
arg = model_dump(arg)
if is_dataclass(arg):
arg = asdict(arg)
return arg
Expand Down
2 changes: 1 addition & 1 deletion taskiq/schedule_sources/label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.schedule_source import ScheduleSource
from taskiq.scheduler.scheduler import ScheduledTask
from taskiq.scheduler.scheduled_task import ScheduledTask

logger = getLogger(__name__)

Expand Down
9 changes: 0 additions & 9 deletions taskiq/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1 @@
"""Scheduler package."""
from taskiq.scheduler.merge_functions import only_unique, preserve_all
from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler

__all__ = [
"only_unique",
"preserve_all",
"ScheduledTask",
"TaskiqScheduler",
]
2 changes: 1 addition & 1 deletion taskiq/scheduler/merge_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import TYPE_CHECKING, List

if TYPE_CHECKING: # pragma: no cover
from taskiq.scheduler.scheduler import ScheduledTask
from taskiq.scheduler.scheduled_task import ScheduledTask


def preserve_all(
Expand Down
42 changes: 42 additions & 0 deletions taskiq/scheduler/scheduled_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union


@dataclass
class CronSpec:
"""Cron specification for running tasks."""

minutes: Optional[Union[str, int]] = "*"
hours: Optional[Union[str, int]] = "*"
days: Optional[Union[str, int]] = "*"
months: Optional[Union[str, int]] = "*"
weekdays: Optional[Union[str, int]] = "*"

offset: Optional[Union[str, timedelta]] = None

def to_cron(self) -> str:
"""Converts cron spec to cron string."""
return f"{self.minutes} {self.hours} {self.days} {self.months} {self.weekdays}"


@dataclass(frozen=True, eq=True)
class ScheduledTask:
"""Abstraction over task schedule."""

task_name: str
labels: Dict[str, Any]
args: List[Any]
kwargs: Dict[str, Any]
cron: Optional[str] = field(default=None)
cron_offset: Optional[Union[str, timedelta]] = field(default=None)
time: Optional[datetime] = field(default=None)

def __post_init__(self) -> None:
"""
This method validates, that either `cron` or `time` field is present.

:raises ValueError: if cron and time are none.
"""
if self.cron is None and self.time is None:
raise ValueError("Either cron or datetime must be present.")
31 changes: 4 additions & 27 deletions taskiq/scheduler/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,21 @@
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Callable, List

from taskiq.abc.broker import AsyncBroker
from taskiq.kicker import AsyncKicker
from taskiq.scheduler.merge_functions import only_new
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.utils import maybe_awaitable

if TYPE_CHECKING: # pragma: no cover
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.schedule_source import ScheduleSource


@dataclass(frozen=True, eq=True)
class ScheduledTask:
"""Abstraction over task schedule."""

task_name: str
labels: Dict[str, Any]
args: List[Any]
kwargs: Dict[str, Any]
cron: Optional[str] = field(default=None)
cron_offset: Optional[Union[str, timedelta]] = field(default=None)
time: Optional[datetime] = field(default=None)

def __post_init__(self) -> None:
"""
This method validates, that either `cron` or `time` field is present.

:raises ValueError: if cron and time are none.
"""
if self.cron is None and self.time is None:
raise ValueError("Either cron or datetime must be present.")


class TaskiqScheduler:
"""Scheduler class."""

def __init__(
self,
broker: AsyncBroker,
broker: "AsyncBroker",
sources: List["ScheduleSource"],
merge_func: Callable[
[List["ScheduledTask"], List["ScheduledTask"]],
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/scheduler/test_task_delays.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from tzlocal import get_localzone

from taskiq.cli.scheduler.run import get_task_delay
from taskiq.scheduler.scheduler import ScheduledTask
from taskiq.scheduler.scheduled_task import ScheduledTask


def test_should_run_success() -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/schedule_sources/test_label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.schedule_sources.label_based import LabelScheduleSource
from taskiq.scheduler.scheduler import ScheduledTask
from taskiq.scheduler.scheduled_task import ScheduledTask


@pytest.mark.anyio
Expand Down
3 changes: 2 additions & 1 deletion tests/scheduler/test_label_based_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.scheduler.run import run_scheduler
from taskiq.schedule_sources.label_based import LabelScheduleSource
from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.scheduler.scheduler import TaskiqScheduler


@pytest.mark.anyio
Expand Down