Brokers

To add a new broker, implement two methods of the taskiq.abc.broker.AsyncBroker abstract class: kick and listen. The class also has two helper methods, startup and shutdown, which you can override.

Here is a template for new brokers:

from typing import AsyncGenerator, Union

from taskiq import AckableMessage, AsyncBroker, BrokerMessage


class MyBroker(AsyncBroker):
    def __init__(self) -> None:
        # Please call this super method to set default values to
        # many different fields.
        super().__init__()

    async def startup(self) -> None:
        # Here you can do some startup magic.
        # Like opening a connection.
        return await super().startup()

    async def shutdown(self) -> None:
        # Here you can perform shutdown operations.
        # Like closing connections.
        return await super().shutdown()

    async def kick(self, message: BrokerMessage) -> None:
        # Send message.message (the raw bytes) to your queue here.
        pass

    async def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]:
        while True:
            # Get new message.
            new_message: bytes = ...  # type: ignore
            # Yield it!
            yield new_message

About kick and listen

The kick method takes a BrokerMessage as its parameter. BrokerMessage is a helper class for brokers: you can use the information it carries, such as labels, to change how the message is delivered.

Warning

As a broker developer, please send only the raw bytes from the message field of a BrokerMessage whenever possible. Converting them to a string can fail if the bytes are not valid UTF-8.
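The problem described above can be reproduced with plain Python, without any broker. The sketch below shows that an arbitrary payload may not decode as UTF-8. For transports that accept only text, it shows one possible lossless workaround (base64). The payload and the helper names `decode_fails`, `to_text_safe` and `from_text_safe` are illustrative assumptions, not part of taskiq.

```python
import base64

# Example payload (assumed data): valid bytes, but not valid UTF-8.
payload = b"\xff\xfe\x00binary"


def decode_fails(data: bytes) -> bool:
    """Return True if the data cannot be decoded as UTF-8."""
    try:
        data.decode("utf-8")
    except UnicodeDecodeError:
        return True
    return False


def to_text_safe(data: bytes) -> str:
    """Encode arbitrary bytes as ASCII text without losing information."""
    return base64.b64encode(data).decode("ascii")


def from_text_safe(text: str) -> bytes:
    """Reverse to_text_safe and recover the original bytes."""
    return base64.b64decode(text)
```

If the transport can carry bytes directly, sending message.message unchanged remains the simpler choice. The base64 round trip is only a fallback for text-only channels.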

Acknowledgement

The listen method should yield the raw bytes of each message. If your broker supports acknowledging or rejecting messages, it should instead yield a taskiq.AckableMessage with the required fields set.

For example:


async def listen(self) -> AsyncGenerator[AckableMessage, None]:
    for message in self.my_channel:
        yield AckableMessage(
            data=message.bytes,
            # Ack is a function that takes no parameters.
            # So you can either pass a bound method of the message here
            # or make a closure.
            ack=message.ack,
            # Can be set to None if the broker doesn't support it.
            reject=message.reject,
        )
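The closure option mentioned in the comments is useful when the underlying client acknowledges by an identifier rather than through a method on the message. The following is a minimal sketch under that assumption. `FakeChannel`, its `delivery_tag` argument and `make_ack` are hypothetical stand-ins for whatever client your broker wraps, not taskiq APIs.

```python
from typing import Callable, List


class FakeChannel:
    """Hypothetical client whose ack call needs a delivery tag."""

    def __init__(self) -> None:
        self.acked: List[int] = []

    def ack(self, delivery_tag: int) -> None:
        self.acked.append(delivery_tag)


def make_ack(channel: FakeChannel, delivery_tag: int) -> Callable[[], None]:
    """Build a zero-argument ack callable bound to one delivery tag."""

    def ack() -> None:
        channel.ack(delivery_tag)

    return ack


channel = FakeChannel()
# One callable per message; each remembers its own tag.
acks = [make_ack(channel, tag) for tag in (1, 2, 3)]
acks[1]()  # acknowledges only the message with tag 2
```

Building the callable in a factory function, rather than with a lambda inside the loop, means each callable captures its own tag instead of the loop variable's last value.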

Conventions

For brokers, we have several conventions. They are optional: it's good if your broker implements them, but it's fine if it doesn't.

  1. If the message has a delay label with an int or float value, the task's execution must be delayed by that number of seconds.
  2. If the message has a priority label, the message must be sent with that priority. Tasks with higher priority are executed sooner.
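A broker that follows these conventions needs to read the two labels defensively before sending. Below is a minimal sketch, assuming the labels arrive as a plain dictionary. The helper names `get_delay_seconds` and `get_priority`, the default priority of 0 and the choice to ignore invalid or non-positive delays are illustrative assumptions rather than taskiq behaviour.

```python
from typing import Any, Dict, Optional


def get_delay_seconds(labels: Dict[str, Any]) -> Optional[float]:
    """Return the delay in seconds, or None if there is no usable delay."""
    value = labels.get("delay")
    if value is None:
        return None
    try:
        delay = float(value)
    except (TypeError, ValueError):
        # Assumption: an unparsable delay is ignored rather than raised.
        return None
    return delay if delay > 0 else None


def get_priority(labels: Dict[str, Any], default: int = 0) -> int:
    """Return the priority label as an int, or the default if absent or invalid."""
    value = labels.get("priority")
    if value is None:
        return default
    try:
        return int(value)
    except (TypeError, ValueError):
        return default
```

Inside kick, a broker could call these helpers on the message's labels and then pass the results to whatever delayed-delivery or priority mechanism its transport offers.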