Brokers
To add a new broker, you need to implement two methods, kick and listen, of the taskiq.abc.broker.AsyncBroker abstract class. Two optional helper methods, startup and shutdown, are also available for setting up and tearing down resources.
Here is a template for new brokers:
from typing import AsyncGenerator, Union

from taskiq import AckableMessage, AsyncBroker, BrokerMessage


class MyBroker(AsyncBroker):
    def __init__(self) -> None:
        # Please call this super method to set default values
        # for many different fields.
        super().__init__()

    async def startup(self) -> None:
        # Here you can do some startup magic,
        # like opening a connection.
        return await super().startup()

    async def shutdown(self) -> None:
        # Here you can perform shutdown operations,
        # like closing connections.
        return await super().shutdown()

    async def kick(self, message: BrokerMessage) -> None:
        # Send message.message (the raw bytes of the task).
        pass

    async def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]:
        while True:
            # Get a new message.
            new_message: bytes = ...  # type: ignore
            # Yield it!
            yield new_message
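To make the kick/listen contract more concrete, here is a rough sketch that mimics both methods with an in-process asyncio.Queue. It deliberately does not subclass AsyncBroker so that it runs with the standard library alone. The names InMemoryTransport and demo are hypothetical and are not part of taskiq.

```python
import asyncio
from typing import AsyncGenerator, List


class InMemoryTransport:
    """Hypothetical stand-in that mirrors the kick/listen shape of a broker."""

    def __init__(self) -> None:
        self._queue: "asyncio.Queue[bytes]" = asyncio.Queue()

    async def kick(self, payload: bytes) -> None:
        # A real broker would send BrokerMessage.message to its backend here.
        await self._queue.put(payload)

    async def listen(self) -> AsyncGenerator[bytes, None]:
        while True:
            # Wait for the next message and hand it to the consumer.
            yield await self._queue.get()


async def demo() -> List[bytes]:
    transport = InMemoryTransport()
    await transport.kick(b"first")
    await transport.kick(b"second")
    received: List[bytes] = []
    async for raw in transport.listen():
        received.append(raw)
        if len(received) == 2:
            break
    return received


print(asyncio.run(demo()))
```

In a real broker the queue would be replaced by a connection opened in startup and closed in shutdown.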
About kick and listen
The kick method takes a BrokerMessage as a parameter. BrokerMessage is a helper class for brokers; you can use the information it carries to alter the delivery method.
Warning
As a broker developer, please send only the raw bytes from the message field of a BrokerMessage if possible. Serializing them to a string may cause problems if the message bytes are not UTF-8 compatible.
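The failure mode is easy to reproduce with plain Python: not every byte sequence is valid UTF-8, so decoding an arbitrary payload can raise an error. The payload below is an arbitrary value chosen only for illustration.

```python
# Arbitrary binary payload; the byte 0xff never appears in valid UTF-8.
payload = b"\xff\x00binary"

try:
    payload.decode("utf-8")
    decoded_ok = True
except UnicodeDecodeError:
    # Converting to str is not possible for this payload.
    decoded_ok = False

# Passing the bytes through untouched always preserves them.
forwarded = bytes(payload)
```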
Acknowledgement
The listen method should yield the raw bytes of a message. If your broker supports acknowledging or rejecting messages, it should instead yield a taskiq.AckableMessage with the required fields.
For example:
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
    for message in self.my_channel:
        yield AckableMessage(
            data=message.bytes,
            # Ack is a function that takes no parameters.
            # So you either pass a method of the message here,
            # or you can make a closure.
            ack=message.ack,
            # Can be set to None if the broker doesn't support it.
            reject=message.reject,
        )
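If the underlying client acknowledges by identifier instead of exposing a bound method, a closure can adapt it to the no-argument callable described in the comments above. This is only a sketch: FakeChannel, ack_by_tag, and make_ack are hypothetical stand-ins, not part of taskiq or of any real client library.

```python
from typing import Callable, List


class FakeChannel:
    """Hypothetical client that acknowledges messages by delivery tag."""

    def __init__(self) -> None:
        self.acked: List[int] = []

    def ack_by_tag(self, tag: int) -> None:
        self.acked.append(tag)


def make_ack(channel: FakeChannel, tag: int) -> Callable[[], None]:
    # Capture channel and tag so the returned function takes no parameters.
    def ack() -> None:
        channel.ack_by_tag(tag)

    return ack


channel = FakeChannel()
ack = make_ack(channel, 7)
ack()  # This callable is what would be passed as ack=...
```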
Conventions
There are several conventions for brokers. Implementing them is encouraged but optional.
- If the message has the delay label with an int or float value, the task's execution must be delayed by that number of seconds.
- If the message has the priority label, the message must be sent with that priority. Tasks with higher priorities are executed sooner.
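One possible way to read these labels is sketched below. It works on a plain dict of labels. The helper names read_delay and read_priority are hypothetical, and treating a missing or non-numeric value as "no delay" or "default priority" is an assumption, not a documented rule.

```python
from typing import Any, Dict, Optional


def read_delay(labels: Dict[str, Any]) -> Optional[float]:
    """Return the delay in seconds, or None if no usable delay label is set."""
    value = labels.get("delay")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return float(value)
    return None


def read_priority(labels: Dict[str, Any], default: int = 0) -> int:
    """Return the priority label as an int, falling back to a default."""
    try:
        return int(labels.get("priority", default))
    except (TypeError, ValueError, OverflowError):
        # Assumption: an unusable priority is treated as the default.
        return default
```

A kick implementation could call these helpers on the message labels and then choose a delayed or prioritized delivery path if the backend offers one.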