Skip to main content

Middlewares

...

Middlewares

Middlewares are super helpful. You can inject some code before or after task's execution.

Middlewares must implement taskiq.abc.middleware.TaskiqMiddlewareopen in new window abstract class. Every method of a middleware can be either sync or async. Taskiq will execute it as you expect.

For example:

sync
from time import sleep
from typing import Any

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult


class MyMiddleware(TaskiqMiddleware):
    def startup(self) -> None:
        print("RUN STARTUP")
        sleep(1)

    def shutdown(self) -> None:
        print("RUN SHUTDOWN")
        sleep(1)

    def pre_execute(self, message: "TaskiqMessage") -> TaskiqMessage:
        sleep(1)
        return message

    def post_save(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
        sleep(1)
        print("Post save")

Also, middlewares always have reference to the current broker in self.broker field. If you want to kick a message during the execution of some middleware hooks, you may use self.broker to do so.

Taskiq-pipelinesopen in new window uses middlewares to call next tasks.