This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Sometimes an AssertionError is raised in the _on_response function.
Code example
import functools
import logging
from datetime import datetime
from threading import Event

from google.cloud import pubsub_v1


def on_subscribe(subscription, until=None):
    """Decorator factory that provides subscribed messages to a function.

    The decorated function is used as the callback, so each message should
    be acked/nacked inside the decorated function.

    Args:
        subscription (str): Subscription ID. Should be
            `projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_ID}`
        until (datetime.datetime): This function will subscribe messages
            published until this timestamp.
    """

    def _callback_factory(func, finished, subscribe_until, **kwargs):
        def _callback(message):
            """Callback function.

            It sends a signal once all messages have been subscribed.
            """
            publish_time = datetime.fromtimestamp(
                message.publish_time.timestamp())
            if subscribe_until and publish_time <= subscribe_until:
                return func(message, **kwargs)
            if subscribe_until and not finished.is_set():
                logging.info('Subscribed all messages published until %s',
                             subscribe_until)
                finished.set()
            message.nack()

        return _callback

    def _wrapper(func):
        @functools.wraps(func)
        def _inner_wrapper(**kwargs):
            # Event that is set when all messages are subscribed.
            all_subscribed = Event()
            # `until` is the decorator argument; the original snippet passed
            # an undefined name `subscribe_until` here.
            callback = _callback_factory(func=func,
                                         finished=all_subscribed,
                                         subscribe_until=until,
                                         **kwargs)
            # Ensure the subscriber is closed to prevent memory leaks.
            with pubsub_v1.SubscriberClient() as subscriber:
                future = subscriber.subscribe(
                    subscription=subscription,
                    callback=callback,
                    await_callbacks_on_shutdown=True,
                    flow_control=pubsub_v1.types.FlowControl(max_messages=5000),
                )
                all_subscribed.wait(timeout=60)
                # Wait until the future is finished once it is cancelled.
                # If it is cancelled by timeout or keyboard interrupt, ignore it.
                try:
                    future.cancel()
                    future.result(timeout=60)
                except (KeyboardInterrupt, TimeoutError):
                    pass
                except Exception as e:
                    logging.error("Error occurs during subscription to %s: %s",
                                  subscription, str(e))

        return _inner_wrapper

    return _wrapper


@on_subscribe(subscription="SUBSCRIPTION")
def callback(message):
    # Do something with message, then ack it (see the docstring above).
    message.ack()
Stack trace
Traceback (most recent call last):
File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/api_core/bidi.py", line 657, in _thread_main
self._on_response(response)
File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 1107, in _on_response
assert self._scheduler is not None
Explanation
This happens because future.cancel() executes manager.close(), which sets _scheduler to None. If _on_response then runs for a response that is still in flight, its assertion fails and it raises AssertionError.
Maybe the shutdown path has to be protected by a threading lock somehow.
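To make the suspected race concrete, here is a minimal, self-contained sketch. It does not use google-cloud-pubsub and is not the library's code: UnguardedManager, GuardedManager and late_response_outcome are hypothetical names that only imitate the pattern described above (a close() that clears _scheduler, and a response handler that requires it). The guarded variant shows one possible way a shared lock plus an explicit check could turn the late response into a no-op instead of an error.

```python
import threading


class UnguardedManager:
    """Hypothetical stand-in: close() clears the scheduler with no coordination."""

    def __init__(self):
        self._scheduler = object()

    def close(self):
        self._scheduler = None

    def on_response(self, response):
        # Same condition as the assert in the stack trace, written as an
        # explicit raise so it also triggers when Python runs with -O.
        if self._scheduler is None:
            raise AssertionError("scheduler is gone")
        return True


class GuardedManager:
    """Same shape, but close() and on_response() share one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._scheduler = object()

    def close(self):
        with self._lock:
            self._scheduler = None

    def on_response(self, response):
        with self._lock:
            if self._scheduler is None:
                # Shutdown already happened: drop the late response.
                return False
            return True


def late_response_outcome(manager):
    """Close the manager, then deliver one late response from another thread."""
    outcome = []

    def deliver():
        try:
            outcome.append(manager.on_response("late"))
        except AssertionError:
            outcome.append("AssertionError")

    manager.close()
    worker = threading.Thread(target=deliver)
    worker.start()
    worker.join()
    return outcome[0]


if __name__ == "__main__":
    print(late_response_outcome(UnguardedManager()))
    print(late_response_outcome(GuardedManager()))
```

In this sketch the unguarded manager reports the AssertionError while the guarded one quietly drops the late response. Whether the real manager can take a lock at this point without deadlocking its own shutdown is an open question that the sketch does not answer.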
Environment details
google-cloud-pubsub version: 2.18.4