Taskiq + FastAPI
FastAPI is a highly popular async web framework in Python. It has gained its popularity because of two things:
- It's easy to use;
- Cool dependency injection.
In taskiq, we try to make our libraries easy to use, and we have dependency injection too. So we created the library "taskiq-fastapi" to make integration with FastAPI as smooth as possible.
Let's see what we've got here. The library provides only one public function, called init. It takes a broker and a string path (as in uvicorn) to the FastAPI application (or factory function). Call this function in your main broker file.
from taskiq import ZeroMQBroker
import taskiq_fastapi
broker = ZeroMQBroker()
taskiq_fastapi.init(broker, "my_package.application:app")
There are two rules to make everything work as you expect:
- Add TaskiqDepends as a default value for every parameter with Request or HTTPConnection types in base dependencies. Or, if you use Annotated, please annotate these types with TaskiqDepends.
- Use only TaskiqDepends in tasks.
Cool and important note!
The Request or HTTPConnection that gets injected into your task is not the same request or connection you had in your handler when you sent the task!
Many FastAPI dependency functions depend on fastapi.Request. We provide a mocked request to such dependencies. But taskiq cannot resolve dependencies until you explicitly specify that this parameter must be injected.
For example, suppose you previously had a dependency like this:
from fastapi import Request
from typing import Any
def get_redis_pool(request: Request) -> Any:
    return request.app.state.redis_pool
To make it resolvable in taskiq, mark default FastAPI dependencies (such as Request and HTTPConnection) with TaskiqDepends. Like this:
# Variant 1: using Annotated
from typing import Annotated
from fastapi import Request
from taskiq import TaskiqDepends

async def get_redis_pool(request: Annotated[Request, TaskiqDepends()]):
    return request.app.state.redis_pool

# Variant 2: using a default value
from fastapi import Request
from taskiq import TaskiqDepends

async def get_redis_pool(request: Request = TaskiqDepends()):
    return request.app.state.redis_pool
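The reason a dependency like get_redis_pool keeps working inside a task is that it only reads application state, never data tied to one particular HTTP call. The sketch below illustrates that idea with plain stand-in objects. It is not how taskiq-fastapi builds its request, and the names app, handler_request, and task_request are hypothetical.

```python
from types import SimpleNamespace
from typing import Any


def get_redis_pool(request: Any) -> Any:
    # Only touches application state, never per-request data.
    return request.app.state.redis_pool


# Hypothetical stand-in for the application object shared by handlers and tasks.
app = SimpleNamespace(state=SimpleNamespace(redis_pool="shared-pool"))

# What a handler might see: the app plus data from one specific HTTP call.
handler_request = SimpleNamespace(app=app, headers={"x-user": "alice"})

# What a task might see: a different object that still points at the same app.
task_request = SimpleNamespace(app=app)

# Two different request objects, same result from the dependency.
assert handler_request is not task_request
assert get_redis_pool(handler_request) == get_redis_pool(task_request)
```

A dependency that reads headers or other per-call data from the request should not expect to find the original values inside a task.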
You also need to call your broker's startup and shutdown somewhere, for example in the application's startup and shutdown events.
from fastapi import FastAPI
from your_project.taskiq import broker

app = FastAPI()

@app.on_event("startup")
async def app_startup():
    if not broker.is_worker_process:
        await broker.startup()

@app.on_event("shutdown")
async def app_shutdown():
    if not broker.is_worker_process:
        await broker.shutdown()
And that's it. Now you can use your taskiq tasks with functions and classes that depend on FastAPI dependencies. You can find bigger examples in the examples repo.
Testing
Testing works the same way as general testing advice describes. But if you use InMemoryBroker in your tests, you need to provide it with a custom dependency context, because it doesn't run as a worker process.
Let's imagine that you have a fixture of your application. It returns a new fastapi application to use in tests.
import pytest
from fastapi import FastAPI

@pytest.fixture
def fastapi_app() -> FastAPI:
    # get_app is your own application factory.
    return get_app()
Right after this fixture, we define another one.
import taskiq_fastapi
from your_project.taskiq import broker

@pytest.fixture(autouse=True)
def init_taskiq_deps(fastapi_app: FastAPI):
    # This is the important part. Here we add the dependency context,
    # which helps resolve dependencies for tasks
    # when using the in-memory broker.
    taskiq_fastapi.populate_dependency_context(broker, fastapi_app)
    yield
    # Reset the context so tests don't leak state into each other.
    broker.custom_dependency_context = {}
This fixture has the autouse flag set, which means it runs for every test automatically.