Skip to content

Commit 5209171

Browse files
authored
ci: harden subscriber samples against EC (#520)
Closes #461. Closes #462. Closes #501.
1 parent e6db2e2 commit 5209171

File tree

1 file changed

+129
-91
lines changed

1 file changed

+129
-91
lines changed

packages/google-cloud-pubsub/samples/snippets/subscriber_test.py

Lines changed: 129 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def eventually_consistent_test() -> None:
225225
eventually_consistent_test()
226226

227227

228-
def test_create(
228+
def test_create_subscription(
229229
subscriber_client: pubsub_v1.SubscriberClient,
230230
subscription_admin: str,
231231
capsys: CaptureFixture,
@@ -357,39 +357,50 @@ def test_create_subscription_with_ordering(
357357
assert "enable_message_ordering: true" in out
358358

359359

360-
def test_create_push(
360+
def test_create_push_subscription(
361361
subscriber_client: pubsub_v1.SubscriberClient,
362362
subscription_admin: str,
363363
capsys: CaptureFixture,
364364
) -> None:
365365
# The scope of `subscription_path` is limited to this function.
366-
subscription_path = subscriber_client.subscription_path(
367-
PROJECT_ID, SUBSCRIPTION_ADMIN
368-
)
369-
try:
370-
subscriber_client.delete_subscription(
371-
request={"subscription": subscription_path}
366+
@backoff.on_exception(backoff.expo, AssertionError, max_time=60)
367+
def eventually_consistent_test() -> None:
368+
subscription_path = subscriber_client.subscription_path(
369+
PROJECT_ID, SUBSCRIPTION_ADMIN
372370
)
373-
except NotFound:
374-
pass
371+
try:
372+
subscriber_client.delete_subscription(
373+
request={"subscription": subscription_path}
374+
)
375+
except NotFound:
376+
pass
375377

376-
subscriber.create_push_subscription(PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT)
378+
subscriber.create_push_subscription(PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT)
377379

378-
out, _ = capsys.readouterr()
379-
assert f"{subscription_admin}" in out
380+
out, _ = capsys.readouterr()
381+
assert f"{subscription_admin}" in out
380382

383+
eventually_consistent_test()
381384

382-
def test_update(subscription_admin: str, capsys: CaptureFixture) -> None:
383-
subscriber.update_push_subscription(
384-
PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT
385-
)
386385

387-
out, _ = capsys.readouterr()
388-
assert "Subscription updated" in out
389-
assert f"{subscription_admin}" in out
386+
def test_update_push_suscription(
387+
subscription_admin: str,
388+
capsys: CaptureFixture,
389+
) -> None:
390+
@backoff.on_exception(backoff.expo, AssertionError, max_time=60)
391+
def eventually_consistent_test() -> None:
392+
subscriber.update_push_subscription(
393+
PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT
394+
)
395+
396+
out, _ = capsys.readouterr()
397+
assert "Subscription updated" in out
398+
assert f"{subscription_admin}" in out
399+
400+
eventually_consistent_test()
390401

391402

392-
def test_delete(
403+
def test_delete_subscription(
393404
subscriber_client: pubsub_v1.SubscriberClient, subscription_admin: str
394405
) -> None:
395406
subscriber.delete_subscription(PROJECT_ID, SUBSCRIPTION_ADMIN)
@@ -410,14 +421,19 @@ def test_receive(
410421
subscription_async: str,
411422
capsys: CaptureFixture,
412423
) -> None:
413-
_publish_messages(publisher_client, topic)
414424

415-
subscriber.receive_messages(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)
425+
@backoff.on_exception(backoff.expo, Unknown, max_time=60)
426+
def eventually_consistent_test() -> None:
427+
_publish_messages(publisher_client, topic)
416428

417-
out, _ = capsys.readouterr()
418-
assert "Listening" in out
419-
assert subscription_async in out
420-
assert "message" in out
429+
subscriber.receive_messages(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)
430+
431+
out, _ = capsys.readouterr()
432+
assert "Listening" in out
433+
assert subscription_async in out
434+
assert "message" in out
435+
436+
eventually_consistent_test()
421437

422438

423439
def test_receive_with_custom_attributes(
@@ -427,17 +443,21 @@ def test_receive_with_custom_attributes(
427443
capsys: CaptureFixture,
428444
) -> None:
429445

430-
_publish_messages(publisher_client, topic, origin="python-sample")
446+
@backoff.on_exception(backoff.expo, Unknown, max_time=60)
447+
def eventually_consistent_test() -> None:
448+
_publish_messages(publisher_client, topic, origin="python-sample")
431449

432-
subscriber.receive_messages_with_custom_attributes(
433-
PROJECT_ID, SUBSCRIPTION_ASYNC, 5
434-
)
450+
subscriber.receive_messages_with_custom_attributes(
451+
PROJECT_ID, SUBSCRIPTION_ASYNC, 5
452+
)
435453

436-
out, _ = capsys.readouterr()
437-
assert subscription_async in out
438-
assert "message" in out
439-
assert "origin" in out
440-
assert "python-sample" in out
454+
out, _ = capsys.readouterr()
455+
assert subscription_async in out
456+
assert "message" in out
457+
assert "origin" in out
458+
assert "python-sample" in out
459+
460+
eventually_consistent_test()
441461

442462

443463
def test_receive_with_flow_control(
@@ -447,14 +467,18 @@ def test_receive_with_flow_control(
447467
capsys: CaptureFixture,
448468
) -> None:
449469

450-
_publish_messages(publisher_client, topic)
470+
@backoff.on_exception(backoff.expo, Unknown, max_time=300)
471+
def eventually_consistent_test() -> None:
472+
_publish_messages(publisher_client, topic)
451473

452-
subscriber.receive_messages_with_flow_control(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)
474+
subscriber.receive_messages_with_flow_control(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)
453475

454-
out, _ = capsys.readouterr()
455-
assert "Listening" in out
456-
assert subscription_async in out
457-
assert "message" in out
476+
out, _ = capsys.readouterr()
477+
assert "Listening" in out
478+
assert subscription_async in out
479+
assert "message" in out
480+
481+
eventually_consistent_test()
458482

459483

460484
def test_receive_with_blocking_shutdown(
@@ -463,57 +487,67 @@ def test_receive_with_blocking_shutdown(
463487
subscription_async: str,
464488
capsys: CaptureFixture,
465489
) -> None:
466-
_publish_messages(publisher_client, topic, message_num=3)
467490

468-
subscriber.receive_messages_with_blocking_shutdown(
469-
PROJECT_ID, SUBSCRIPTION_ASYNC, timeout=5.0
470-
)
491+
_received = re.compile(r".*received.*message.*", flags=re.IGNORECASE)
492+
_done = re.compile(r".*done processing.*message.*", flags=re.IGNORECASE)
493+
_canceled = re.compile(r".*streaming pull future canceled.*", flags=re.IGNORECASE)
494+
_shut_down = re.compile(r".*done waiting.*stream shutdown.*", flags=re.IGNORECASE)
471495

472-
out, _ = capsys.readouterr()
473-
out_lines = out.splitlines()
474-
475-
msg_received_lines = [
476-
i
477-
for i, line in enumerate(out_lines)
478-
if re.search(r".*received.*message.*", line, flags=re.IGNORECASE)
479-
]
480-
msg_done_lines = [
481-
i
482-
for i, line in enumerate(out_lines)
483-
if re.search(r".*done processing.*message.*", line, flags=re.IGNORECASE)
484-
]
485-
stream_canceled_lines = [
486-
i
487-
for i, line in enumerate(out_lines)
488-
if re.search(r".*streaming pull future canceled.*", line, flags=re.IGNORECASE)
489-
]
490-
shutdown_done_waiting_lines = [
491-
i
492-
for i, line in enumerate(out_lines)
493-
if re.search(r".*done waiting.*stream shutdown.*", line, flags=re.IGNORECASE)
494-
]
496+
@backoff.on_exception(backoff.expo, Unknown, max_time=300)
497+
def eventually_consistent_test() -> None:
498+
_publish_messages(publisher_client, topic, message_num=3)
495499

496-
try:
497-
assert "Listening" in out
498-
assert subscription_async in out
500+
subscriber.receive_messages_with_blocking_shutdown(
501+
PROJECT_ID, SUBSCRIPTION_ASYNC, timeout=5.0
502+
)
503+
504+
out, _ = capsys.readouterr()
505+
out_lines = out.splitlines()
506+
507+
msg_received_lines = [
508+
i
509+
for i, line in enumerate(out_lines)
510+
if _received.search(line)
511+
]
512+
msg_done_lines = [
513+
i
514+
for i, line in enumerate(out_lines)
515+
if _done.search(line)
516+
]
517+
stream_canceled_lines = [
518+
i
519+
for i, line in enumerate(out_lines)
520+
if _canceled.search(line)
521+
]
522+
shutdown_done_waiting_lines = [
523+
i
524+
for i, line in enumerate(out_lines)
525+
if _shut_down.search(line)
526+
]
527+
528+
try:
529+
assert "Listening" in out
530+
assert subscription_async in out
499531

500-
assert len(stream_canceled_lines) == 1
501-
assert len(shutdown_done_waiting_lines) == 1
502-
assert len(msg_received_lines) == 3
503-
assert len(msg_done_lines) == 3
532+
assert len(stream_canceled_lines) == 1
533+
assert len(shutdown_done_waiting_lines) == 1
534+
assert len(msg_received_lines) == 3
535+
assert len(msg_done_lines) == 3
504536

505-
# The stream should have been canceled *after* receiving messages, but before
506-
# message processing was done.
507-
assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0]
537+
# The stream should have been canceled *after* receiving messages, but before
538+
# message processing was done.
539+
assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0]
508540

509-
# Yet, waiting on the stream shutdown should have completed *after*
510-
# the processing of received messages has ended.
511-
assert msg_done_lines[-1] < shutdown_done_waiting_lines[0]
512-
except AssertionError: # pragma: NO COVER
513-
from pprint import pprint
541+
# Yet, waiting on the stream shutdown should have completed *after*
542+
# the processing of received messages has ended.
543+
assert msg_done_lines[-1] < shutdown_done_waiting_lines[0]
544+
except AssertionError: # pragma: NO COVER
545+
from pprint import pprint
514546

515-
pprint(out_lines) # To make possible flakiness debugging easier.
516-
raise
547+
pprint(out_lines) # To make possible flakiness debugging easier.
548+
raise
549+
550+
eventually_consistent_test()
517551

518552

519553
def test_listen_for_errors(
@@ -523,13 +557,17 @@ def test_listen_for_errors(
523557
capsys: CaptureFixture,
524558
) -> None:
525559

526-
_publish_messages(publisher_client, topic)
560+
@backoff.on_exception(backoff.expo, Unknown, max_time=60)
561+
def eventually_consistent_test() -> None:
562+
_publish_messages(publisher_client, topic)
527563

528-
subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)
564+
subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)
529565

530-
out, _ = capsys.readouterr()
531-
assert subscription_async in out
532-
assert "threw an exception" in out
566+
out, _ = capsys.readouterr()
567+
assert subscription_async in out
568+
assert "threw an exception" in out
569+
570+
eventually_consistent_test()
533571

534572

535573
def test_receive_synchronously(

0 commit comments

Comments
 (0)