Skip to main content

Schedule source

...

Schedule source

Schedule sources are used to get schedule for tasks. To create new schedule source you have to implement the taskiq.abc.schedule_source.ScheduleSource abstract class.

Here's a minimal example of a schedule source:

from typing import List

from taskiq import ScheduledTask, ScheduleSource


class MyScheduleSource(ScheduleSource):
    async def startup(self) -> None:
        """Do something when starting broker."""

    async def shutdown(self) -> None:
        """Do something on shutdown."""

    async def get_schedules(self) -> List["ScheduledTask"]:
        # Here you must return list of scheduled tasks from your source.
        return [
            ScheduledTask(
                task_name="",
                labels={},
                args=[],
                kwargs={},
                cron="* * * * *",
            ),
        ]

    # This method is optional. You may not implement this.
    # It's just a helper to people to be able to interact with your source.
    async def add_schedule(self, schedule: "ScheduledTask") -> None:
        print("New schedule added:", schedule)

    # This method is completely optional, but if you want to support
    # schedule cancelation, you must implement it.
    async def delete_schedule(self, schedule_id: str) -> None:
        print("Deleting schedule:", schedule_id)

    # This method is optional. You may not implement this.
    # It's just a helper to people to be able to interact with your source.
    async def pre_send(self, task: "ScheduledTask") -> None:
        """
        Actions to execute before task will be sent to broker.

        This method may raise ScheduledTaskCancelledError.
        This cancels the task execution.

        :param task: task that will be sent
        """

    # This method is optional. You may not implement this.
    # It's just a helper to people to be able to interact with your source.
    async def post_send(self, task: "ScheduledTask") -> None:
        """
        Actions to execute after task was sent to broker.

        :param task: task that just have sent
        """

You can implement a schedule source that write schedules in the database and have delayed tasks in runtime.