diff --git a/pyproject.toml b/pyproject.toml index 43a8811c..d9b10551 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ packaging = ">=19" # For prometheus metrics prometheus_client = { version = "^0", optional = true } # For ZMQBroker -pyzmq = { version = "^23.2.0", optional = true } +pyzmq = { version = "^23.2.0", optional = true, markers = "python_version < '3.12'"} # For speed uvloop = { version = ">=0.16.0,<1", optional = true, markers = "sys_platform != 'win32'" } # For hot-reload. diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index 55c525cf..1760c2ab 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -113,8 +113,10 @@ def interrupt_handler(signum: int, _frame: Any) -> None: event.set() if uvloop is not None: - logger.debug("UVLOOP found. Installing policy.") - uvloop.install() + logger.debug("UVLOOP found. Using it as async runner") + loop = uvloop.new_event_loop() # type: ignore + else: + loop = asyncio.new_event_loop() # This option signals that current # broker is running as a worker. # We must set this field before importing tasks, @@ -128,8 +130,6 @@ def interrupt_handler(signum: int, _frame: Any) -> None: receiver_type = get_receiver_type(args) receiver_kwargs = dict(args.receiver_arg) - loop = asyncio.get_event_loop() - try: logger.debug("Initialize receiver.") with ThreadPoolExecutor(args.max_threadpool_threads) as pool: