Getting started

Installation

You can install taskiq from PyPI or directly from Git using pip:

pypi
pip install taskiq

After installing the core library, you need to pick a broker that fits your needs. You can find one by searching PyPI.

Cool tip!

We highly recommend taskiq-aio-pika or taskiq-nats as the broker and taskiq-redis as the result backend for production use.

Running tasks

Now you need to create a Python module with a broker declaration. It's just a plain Python file with a variable that holds your broker. This example uses the InMemoryBroker.

Important note

The InMemoryBroker doesn't send any data over the network, so you cannot use it in a real-world scenario. It is still useful for local development if you do not want to set up a taskiq worker.

# broker.py
from taskiq import InMemoryBroker

broker = InMemoryBroker()

And that's it. Now let's add some tasks and the main function. You can also define tasks in separate modules; this is covered later in the documentation. Note that we call the startup method at the beginning of the main function.

# broker.py
import asyncio

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    # Never forget to call startup in the beginning.
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Cool warning!

Calling the startup method is necessary. If you don't call it, you may get undefined behavior.

If you run this code, you will get this in your terminal:

$ python broker.py
Task execution took: 7.3909759521484375e-06 seconds.
Returned value: 2

The task-execution code is a little fancier than an ordinary function call, but it is still relatively simple to understand. To send a task to the broker, call the .kiq method on the function. It returns a TaskiqTask object that can check whether the result is ready and has methods to wait for the result to become available.

You can get more information about taskiq types, CLI and internal structure in the "Architecture overview" section.

Distributed run

Now let's replace the InMemoryBroker with a distributed broker. In this example we are going to use a broker that works with RabbitMQ.

First, we must install the taskiq-aio-pika library.

pip install taskiq-aio-pika

After the installation, replace the broker we defined earlier with the broker from taskiq-aio-pika.

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker('amqp://guest:guest@localhost:5672')

AioPikaBroker also requires startup to be called before it is used. Make sure this line is at the beginning of the main function.

await broker.startup()

That's all you need to do.

Complete code
# broker.py
import asyncio

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672")


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Let's run the worker process. First of all, we need RabbitMQ up and running. I highly recommend using Docker.

linux|macos
docker run --rm -d \
    -p "5672:5672" \
    -p "15672:15672" \
    --env "RABBITMQ_DEFAULT_USER=guest" \
    --env "RABBITMQ_DEFAULT_PASS=guest" \
    --env "RABBITMQ_DEFAULT_VHOST=/" \
    rabbitmq:3.8.27-management-alpine

Now we need to start the worker process by running the taskiq CLI command. You can get more info about the CLI in the CLI section.

taskiq worker broker:broker

After the worker is up, we can run our script as an ordinary python file and see how the worker executes tasks.

$ python broker.py
Task execution took: 0.0 seconds.
Returned value: None

But the printed return value is not correct. That happens because we didn't provide a result backend to store the results of task execution. To store results, we can use the taskiq-redis library.

pip install taskiq-redis

After the installation, add a new result backend to the broker.

from taskiq_redis import RedisAsyncResultBackend

broker = AioPikaBroker(
    "amqp://guest:guest@localhost:5672",
).with_result_backend(RedisAsyncResultBackend("redis://localhost"))

Now we need to start Redis.

linux|macos
docker run --rm -d \
    -p "6379:6379" \
    redis

Complete code
# broker.py
import asyncio

from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

broker = AioPikaBroker(
    "amqp://guest:guest@localhost:5672",
).with_result_backend(RedisAsyncResultBackend("redis://localhost"))


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Let's run the taskiq worker once again. The command is the same.

taskiq worker broker:broker

Now, if we run this file with python, we can get the correct results with a valid execution time.

$ python broker.py
Task execution took: 1.0013580322265625e-05 seconds.
Returned value: 2

Continue reading to get more information about taskiq internals.

Timeouts

If you want to restrict how long a task is allowed to run, just add the timeout label to the task.

You can do this either in the decorator or when calling the task.

decorator
@broker.task(timeout=0.1)
async def mytask():
    await asyncio.sleep(2)

Cool alert

We use the run_in_executor method to run sync functions. Timeouts will raise a TimeoutException, but a synchronous function may not stop executing. This is a constraint of Python.
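The constraint can be reproduced with the standard library alone. The sketch below does not use taskiq. It runs a blocking function through run_in_executor, applies a timeout with asyncio.wait_for, and shows that the function keeps running in its thread after the timeout fires. The function name blocking_job and the durations are hypothetical and were chosen only for this illustration.

```python
import asyncio
import time

finished = []


def blocking_job() -> None:
    # A synchronous function that cannot be interrupted from the outside.
    time.sleep(0.5)
    finished.append("done")


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, blocking_job)
    timed_out = False
    try:
        await asyncio.wait_for(future, timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    # The timeout fired, but the thread has not finished its work yet.
    still_running = not finished
    # Give the thread time to complete on its own.
    await asyncio.sleep(0.7)
    return timed_out, still_running


timed_out, still_running = asyncio.run(main())
print(timed_out, still_running, finished)
```

If a synchronous task must be stoppable, it has to check for a stop condition itself, for example by doing its work in small steps.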