Getting started
Installation
You can install taskiq from PyPI or directly from Git using pip:
pip install taskiq
pip install git+https://github.com/taskiq-python/taskiq.git
After installing the core library, you need to pick a broker that fits your needs. You can find one by searching PyPI.
Cool tip!
We highly recommend taskiq-aio-pika or taskiq-nats as the broker and taskiq-redis as the result backend for production use.
Running tasks
Now you need to create a Python module with a broker declaration. It's just a plain Python file with a variable that holds your broker. For this particular example, I'm going to use the InMemoryBroker.
Important note
The InMemoryBroker doesn't send any data over the network, so you cannot use it in a real-world scenario. It's still useful for local development if you do not want to set up a taskiq worker.
# broker.py
from taskiq import InMemoryBroker
broker = InMemoryBroker()
And that's it. Now let's add some tasks and the main function. You can also define tasks in separate modules; more information about that comes later. Note that we call the startup method at the beginning of the main function.
# broker.py
import asyncio

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    # Never forget to call startup in the beginning.
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Cool warning!
Calling the startup method is necessary. If you don't call it, you may get undefined behaviour.
If you run this code, you will get this in your terminal:
$ python broker.py
Task execution took: 7.3909759521484375e-06 seconds.
Returned value: 2
OK, the code for executing a task is a little fancier than an ordinary function call, but it's still relatively simple to understand. To send a task to the broker, call the .kiq method on the function. It returns a TaskiqTask object that can check whether the result is ready, and it also has methods to wait for the result to become available.
You can get more information about taskiq types, CLI and internal structure in the "Architecture overview" section.
Distributed run
Now let's replace the InMemoryBroker with a distributed broker. In this example we are going to use a broker that works with RabbitMQ.
First, we must install the taskiq-aio-pika library.
pip install taskiq-aio-pika
After the installation, replace the broker we defined earlier with the broker from taskiq-aio-pika.
from taskiq_aio_pika import AioPikaBroker
broker = AioPikaBroker('amqp://guest:guest@localhost:5672')
The AioPika broker also requires a call to startup before you use it. Add this line at the beginning of the main function.
await broker.startup()
That's all you need to do.
Complete code
# broker.py
import asyncio

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672")


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Let's run the worker process. First of all, we need RabbitMQ up and running. I highly recommend using Docker.
On Linux or macOS (bash):
docker run --rm -d \
    -p "5672:5672" \
    -p "15672:15672" \
    --env "RABBITMQ_DEFAULT_USER=guest" \
    --env "RABBITMQ_DEFAULT_PASS=guest" \
    --env "RABBITMQ_DEFAULT_VHOST=/" \
    rabbitmq:3.8.27-management-alpine
On Windows (cmd):
docker run --rm -d ^
    -p "5672:5672" ^
    -p "15672:15672" ^
    --env "RABBITMQ_DEFAULT_USER=guest" ^
    --env "RABBITMQ_DEFAULT_PASS=guest" ^
    --env "RABBITMQ_DEFAULT_VHOST=/" ^
    rabbitmq:3.8.27-management-alpine
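The broker URL used earlier has to agree with the settings passed to the container: the user, password and published port. As a rough illustration, the sketch below uses only the standard library to split that URL into its parts. It is not part of taskiq or taskiq-aio-pika, and the helper name describe_amqp_url is a hypothetical one chosen for this example.

```python
from urllib.parse import urlsplit


def describe_amqp_url(url: str) -> dict:
    """Split a broker URL into user, password, host and port.

    Hypothetical helper for illustration only; taskiq does not provide it.
    """
    parts = urlsplit(url)
    return {
        "user": parts.username,      # compare with RABBITMQ_DEFAULT_USER
        "password": parts.password,  # compare with RABBITMQ_DEFAULT_PASS
        "host": parts.hostname,
        "port": parts.port,          # compare with the published port 5672
    }


info = describe_amqp_url("amqp://guest:guest@localhost:5672")
print(info)
```

If you change the credentials or the port mapping in the docker command, the URL passed to the broker needs the same change.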
Now we need to start the worker process by running the taskiq CLI command. You can get more info about the CLI in the CLI section.
taskiq worker broker:broker
After the worker is up, we can run our script as an ordinary Python file and see how the worker executes tasks.
$ python broker.py
Task execution took: 0.0 seconds.
Returned value: None
However, the printed return value is not correct. That happens because we didn't provide a result backend that can store the results of task execution. To store results, we can use the taskiq-redis library.
pip install taskiq-redis
After the installation, add a new result backend to the broker.
from taskiq_redis import RedisAsyncResultBackend

broker = AioPikaBroker(
    "amqp://guest:guest@localhost:5672",
).with_result_backend(RedisAsyncResultBackend("redis://localhost"))
Now we need to start Redis.
On Linux or macOS (bash):
docker run --rm -d \
    -p "6379:6379" \
    redis
On Windows (cmd):
docker run --rm -d ^
    -p "6379:6379" ^
    redis
Complete code
# broker.py
import asyncio

from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

broker = AioPikaBroker(
    "amqp://guest:guest@localhost:5672",
).with_result_backend(RedisAsyncResultBackend("redis://localhost"))


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Let's run the taskiq worker once again. The command is the same.
taskiq worker broker:broker
Now, if we run this file with Python, we get the correct result with a valid execution time.
$ python broker.py
Task execution took: 1.0013580322265625e-05 seconds.
Returned value: 2
Continue reading to get more information about taskiq internals.
Timeouts
If you want to limit how long a task may run, add the timeout label to the task.
You can do it either in the decorator or when calling the task.
# Option 1: set the timeout in the decorator.
@broker.task(timeout=0.1)
async def my_task():
    await asyncio.sleep(2)

# Option 2: set the timeout when calling the task.
await my_task.kicker().with_labels(timeout=0.3).kiq()
Cool alert
We use the run_in_executor method to run sync functions. Timeouts will raise a TimeoutException, but a synchronous function may keep running after the timeout fires. This is a limitation of Python.
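To make that limitation concrete, here is a minimal sketch that uses only the standard library. It does not reproduce taskiq's internals; it only assumes the general pattern of awaiting a run_in_executor future with a timeout. The names blocking_job and run_with_timeout are hypothetical.

```python
import asyncio
import threading
import time

# Set by the worker thread once the blocking function has run to completion.
finished = threading.Event()


def blocking_job() -> None:
    """A synchronous function that cannot be interrupted from outside."""
    time.sleep(0.3)
    finished.set()


async def run_with_timeout() -> "tuple[bool, bool]":
    """Return (timed_out, job_had_finished_when_we_stopped_waiting)."""
    loop = asyncio.get_running_loop()
    # Run the sync function in the default thread pool executor.
    future = loop.run_in_executor(None, blocking_job)
    try:
        await asyncio.wait_for(future, timeout=0.05)
    except asyncio.TimeoutError:
        # The awaiting side gives up, but the thread is not stopped.
        return True, finished.is_set()
    return False, finished.is_set()


timed_out, done_at_timeout = asyncio.run(run_with_timeout())
# Wait for the thread explicitly: it keeps running despite the timeout.
finished.wait(timeout=2)
print(f"timed out: {timed_out}")
print(f"finished at timeout: {done_at_timeout}")
print(f"finished later: {finished.is_set()}")
```

The caller sees a timeout after 0.05 seconds, yet the function still runs to the end in its thread. If a sync task must be stoppable, it has to check for a stop condition itself.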