Skip to main content

Dynamic Environments

...

This article is for people who want to:

  • Create brokers dynamically.
  • Register tasks, and run them inside their code.
  • Implement more complex logic.

Taskiq allows you to set up broker instances throughout your application and register tasks for dynamic execution. However, tasks created this way won't be found by the taskiq worker command.

To define tasks and assign them to a broker, use register_task method.

import asyncio

from taskiq_redis import ListQueueBroker


async def main() -> None:
    # Here we define a broker.
    dyn_broker = ListQueueBroker("redis://localhost")
    await dyn_broker.startup()

    # Now we register lambda as a task.
    dyn_task = dyn_broker.register_task(
        lambda x: print("A", x),
        task_name="dyn_task",
    )

    # now we can send it.
    await dyn_task.kiq(x=1)

    await dyn_broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

In this example, the task is defined using a lambda within the main function. As the lambda is not visible outside of the main function scope, the task is not executable by taskiq worker command.

To overcome this issue, you can:

  • Create a dynamic worker task within the current event loop.
  • Implement your own broker listener with the information about all of your tasks.

Here's an example of a dynamic worker task creation:

import asyncio

from taskiq_redis import ListQueueBroker

from taskiq.api import run_receiver_task


async def main() -> None:
    # Here we define a broker.
    dyn_broker = ListQueueBroker("redis://localhost")
    await dyn_broker.startup()
    worker_task = asyncio.create_task(run_receiver_task(dyn_broker))

    # Now we register lambda as a task.
    dyn_task = dyn_broker.register_task(
        lambda x: print("A", x),
        task_name="dyn_task",
    )

    # Now we can send it.
    await dyn_task.kiq(x=1)

    await asyncio.sleep(2)

    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        print("Worker successfully exited.")

    await dyn_broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

In this example, a named dynamic lambda task is created and registered in a broker, similar to the previous example. The difference is the creation of a new receiver coroutine for the worker task. It will listen to the new messages and execute them. The worker task will be executed in the current event loop. After exiting the scope, the worker task will get cancelled. For illustration purposes it is cancelled explicitly.

It's possible to run a scheduler in the current event loop as well:

import asyncio
import datetime

from taskiq_redis import ListQueueBroker

from taskiq import TaskiqScheduler
from taskiq.api import run_receiver_task, run_scheduler_task
from taskiq.schedule_sources import LabelScheduleSource


async def main() -> None:
    # Here we define a broker.
    dyn_broker = ListQueueBroker("redis://localhost")
    dyn_scheduler = TaskiqScheduler(dyn_broker, [LabelScheduleSource(dyn_broker)])

    await dyn_broker.startup()

    # Now we register lambda as a task.
    dyn_task = dyn_broker.register_task(
        lambda x: print("A", x),
        task_name="dyn_task",
        # We add a schedule when to run task.
        schedule=[
            {
                # Here we also can specify cron instead of time.
                "time": datetime.datetime.utcnow() + datetime.timedelta(seconds=2),
                "args": [22],
            },
        ],
    )

    # We create scheduler after the task declaration,
    # so we don't have to wait a minute before it gets to the task.
    # However, defining a scheduler before the task declaration is also possible.
    # But we have to wait till it gets to task execution for the second time.
    worker_task = asyncio.create_task(run_receiver_task(dyn_broker))
    scheduler_task = asyncio.create_task(run_scheduler_task(dyn_scheduler))

    # We still able to send the task.
    await dyn_task.kiq(x=1)

    await asyncio.sleep(10)

    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        print("Worker successfully exited.")

    scheduler_task.cancel()
    try:
        await scheduler_task
    except asyncio.CancelledError:
        print("Scheduler successfully exited.")

    await dyn_broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())