Middlewares
...
Middlewares
Middlewares are super helpful. You can inject some code before or after task's execution.
Middlewares must implement taskiq.abc.middleware.TaskiqMiddleware
abstract class. Every method of a middleware can be either sync or async. Taskiq will execute it as you expect.
For example:
sync
from time import sleep
from typing import Any
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult
class MyMiddleware(TaskiqMiddleware):
def startup(self) -> None:
print("RUN STARTUP")
sleep(1)
def shutdown(self) -> None:
print("RUN SHUTDOWN")
sleep(1)
def pre_execute(self, message: "TaskiqMessage") -> TaskiqMessage:
sleep(1)
return message
def post_save(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
sleep(1)
print("Post save")
async
from asyncio import sleep
from typing import Any
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult
class MyMiddleware(TaskiqMiddleware):
async def startup(self) -> None:
print("RUN STARTUP")
await sleep(1)
async def shutdown(self) -> None:
print("RUN SHUTDOWN")
await sleep(1)
async def pre_execute(self, message: "TaskiqMessage") -> TaskiqMessage:
await sleep(1)
return message
async def post_save(
self,
message: "TaskiqMessage",
result: "TaskiqResult[Any]",
) -> None:
await sleep(1)
print("Post save")
Also, middlewares always have reference to the current broker in self.broker
field. If you want to kick a message during the execution of some middleware hooks, you may use self.broker
to do so.
Taskiq-pipelines uses middlewares to call next tasks.