Middlewares
October 30, 2025
Middlewares are super helpful: they let you inject code that runs before or after a task's execution.
Middlewares must subclass the taskiq.abc.middleware.TaskiqMiddleware abstract class. Every method of a middleware can be either sync or async, and Taskiq will call it correctly either way.
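How a framework can accept both kinds of hook is easy to sketch: call the hook, then await the result only if it turns out to be awaitable. The snippet below illustrates that pattern using only the standard library. It is not Taskiq's actual implementation, and `call_hook`, `SyncHooks` and `AsyncHooks` are hypothetical names made up for this example.

```python
import asyncio
import inspect
from typing import Any, Callable, List


async def call_hook(hook: Callable[..., Any], *args: Any) -> Any:
    """Call a hook and await its result only if it is awaitable."""
    result = hook(*args)
    if inspect.isawaitable(result):
        # The hook was an `async def` (or returned another awaitable).
        result = await result
    return result


class SyncHooks:
    # Hypothetical stand-in for a middleware with a sync hook.
    def pre_execute(self, message: str) -> str:
        return message.upper()


class AsyncHooks:
    # Hypothetical stand-in for a middleware with an async hook.
    async def pre_execute(self, message: str) -> str:
        await asyncio.sleep(0)
        return message.upper()


async def main() -> List[str]:
    results = []
    # The caller treats both hook styles exactly the same way.
    for middleware in (SyncHooks(), AsyncHooks()):
        results.append(await call_hook(middleware.pre_execute, "hello"))
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints ['HELLO', 'HELLO']
```

With a pattern like this, a sync hook runs to completion before anything else on the event loop can proceed, so long blocking work is usually better placed in an async hook. In Taskiq itself you don't write any dispatch code; you just define each hook in the style you prefer.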
For example:
sync
from time import sleep
from typing import Any

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult


class MyMiddleware(TaskiqMiddleware):
    # Called when the broker starts up.
    def startup(self) -> None:
        print("RUN STARTUP")
        sleep(1)

    # Called when the broker shuts down.
    def shutdown(self) -> None:
        print("RUN SHUTDOWN")
        sleep(1)

    # Called before the task function is executed.
    def pre_execute(self, message: "TaskiqMessage") -> TaskiqMessage:
        sleep(1)
        return message

    # Called after the task's result has been saved.
    def post_save(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
        sleep(1)
        print("Post save")
async
from asyncio import sleep
from typing import Any

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult


class MyMiddleware(TaskiqMiddleware):
    # Called when the broker starts up.
    async def startup(self) -> None:
        print("RUN STARTUP")
        await sleep(1)

    # Called when the broker shuts down.
    async def shutdown(self) -> None:
        print("RUN SHUTDOWN")
        await sleep(1)

    # Called before the task function is executed.
    async def pre_execute(self, message: "TaskiqMessage") -> TaskiqMessage:
        await sleep(1)
        return message

    # Called after the task's result has been saved.
    async def post_save(
        self,
        message: "TaskiqMessage",
        result: "TaskiqResult[Any]",
    ) -> None:
        await sleep(1)
        print("Post save")
Middlewares also always hold a reference to the current broker in the self.broker field. If you want to kick a message while one of the middleware hooks is running, you can use self.broker to do so.
For example, taskiq-pipelines uses middlewares to call the next task in a pipeline.
Contributors
Dmitrii Anfimov