Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/examples/dynamics/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async def main() -> None:
task_name="dyn_task",
)

# now we can send it.
# Now we can send it.
await dyn_task.kiq(x=1)

await asyncio.sleep(2)
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/dynamics/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def main() -> None:
# We create scheduler after the task declaration,
# so we don't have to wait a minute before it gets to the task.
# However, defining a scheduler before the task declaration is also possible.
# but we have to wait till it gets to task execution for the second time.
# But we have to wait till it gets to task execution for the second time.
worker_task = asyncio.create_task(run_receiver_task(dyn_broker))
scheduler_task = asyncio.create_task(run_scheduler_task(dyn_scheduler))

Expand Down
34 changes: 15 additions & 19 deletions docs/guide/dynamic-brokers.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,31 @@ title: Dynamic Environments
order: 9
---

This article is for all the people who want to dynamically create brokers, register tasks, and run them inside their code. Or maybe implement more complex logic.
This article is for people who want to:

The Taskiq allows you to create broker instances in all parts of your application. You
can register tasks dynamically and run them. But when tasks are created dynamically,
the `taskiq worker` command won't be able to find them.
* Create brokers dynamically.
* Register tasks, and run them inside their code.
* Implement more complex logic.

To define tasks and assign them to broker, use `register_task` method.
Taskiq allows you to set up broker instances throughout your application and register tasks for dynamic execution. However, tasks created this way won't be found by the `taskiq worker` command.

To define tasks and assign them to a broker, use `register_task` method.

@[code python](../examples/dynamics/broker.py)

The problem with this code is that if we run the `taskiq worker` command, it won't be able
to execute our tasks. Because lambdas are created within the `main` function and they
are not visible outside of it.
In this example, the task is defined using a lambda within the `main` function. As the lambda is not visible outside of the `main` function scope, the task is not executable by `taskiq worker` command.

To overcome this issue, you can:

To surpass this issue, we need to create a dynamic worker task within the current loop.
Or, we can create a code that can listen to our brokers and have all information about dynamic
functions.
* Create a dynamic worker task within the current event loop.
* Implement your own broker listener with the information about all of your tasks.

Here I won't be showing how to create your own CLI command, but I'll show you how to create
a dynamic worker within the current loop.
Here's an example of a dynamic worker task creation:

@[code python](../examples/dynamics/receiver.py)

Here we define a dynamic lambda task with some name, assign it to broker, as we did before.
The only difference is that we start our receiver coroutine, that will listen to the new
messages and execute them. Receiver task will be executed in the current loop, and when main function
exits, the receriver task is canceled. But for illustration purpose, I canceled it manually.
In this example, a named dynamic lambda task is created and registered in a broker, similar to the previous example. The difference is the creation of a new receiver coroutine for the worker task. It will listen to the new messages and execute them. The worker task will be executed in the current event loop. After exiting the scope, the worker task will get cancelled. For illustration purposes it is cancelled explicitly.

Sometimes you need to run not only receiver, but a scheduler as well. You can do it, by using
another function that also can work within the current loop.
It's possible to run a scheduler in the current event loop as well:

@[code python](../examples/dynamics/scheduler.py)