Skip to content

Commit 40e7cf4

Browse files
tseaverlandrito
authored andcommitted
Remap new Gax conflict error code (googleapis#3443)
* Add testing support for 'ALREADY_EXISTS' gRPC error code. * Cover both possible gRPC conflict error codes. Closes googleapis#3175. * Exercise conflict-on-create in systests for topic/sub/snap.
1 parent 3674c5a commit 40e7cf4

File tree

4 files changed

+92
-12
lines changed

4 files changed

+92
-12
lines changed

core/google/cloud/_testing.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ def _make_grpc_failed_precondition(self):
9595
from grpc import StatusCode
9696
return self._make_grpc_error(StatusCode.FAILED_PRECONDITION)
9797

98+
def _make_grpc_already_exists(self):
99+
from grpc import StatusCode
100+
return self._make_grpc_error(StatusCode.ALREADY_EXISTS)
101+
98102
def _make_grpc_deadline_exceeded(self):
99103
from grpc import StatusCode
100104
return self._make_grpc_error(StatusCode.DEADLINE_EXCEEDED)

pubsub/google/cloud/pubsub/_gax.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
from google.cloud.pubsub.subscription import Subscription
4343
from google.cloud.pubsub.topic import Topic
4444

45+
_CONFLICT_ERROR_CODES = (
46+
StatusCode.FAILED_PRECONDITION, StatusCode.ALREADY_EXISTS)
47+
4548

4649
class _PublisherAPI(object):
4750
"""Helper mapping publisher-related APIs.
@@ -105,7 +108,7 @@ def topic_create(self, topic_path):
105108
try:
106109
topic_pb = self._gax_api.create_topic(topic_path)
107110
except GaxError as exc:
108-
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
111+
if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES:
109112
raise Conflict(topic_path)
110113
raise
111114
return {'name': topic_pb.name}
@@ -337,7 +340,7 @@ def subscription_create(self, subscription_path, topic_path,
337340
retain_acked_messages=retain_acked_messages,
338341
message_retention_duration=message_retention_duration)
339342
except GaxError as exc:
340-
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
343+
if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES:
341344
raise Conflict(topic_path)
342345
raise
343346
return MessageToDict(sub_pb)
@@ -584,7 +587,7 @@ def snapshot_create(self, snapshot_path, subscription_path):
584587
snapshot_pb = self._gax_api.create_snapshot(
585588
snapshot_path, subscription_path)
586589
except GaxError as exc:
587-
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
590+
if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES:
588591
raise Conflict(snapshot_path)
589592
elif exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
590593
raise NotFound(subscription_path)

pubsub/tests/system.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import httplib2
2323

2424
from google.cloud.environment_vars import PUBSUB_EMULATOR
25+
from google.cloud.exceptions import Conflict
2526
from google.cloud.pubsub import client
2627

2728
from test_utils.retry import RetryInstanceState
@@ -113,6 +114,9 @@ def test_create_topic(self):
113114
self.assertTrue(topic.exists())
114115
self.assertEqual(topic.name, topic_name)
115116

117+
with self.assertRaises(Conflict):
118+
topic.create()
119+
116120
def test_list_topics(self):
117121
before = _consume_topics(Config.CLIENT)
118122
topics_to_create = [
@@ -152,6 +156,9 @@ def test_create_subscription_defaults(self):
152156
self.assertEqual(subscription.name, SUBSCRIPTION_NAME)
153157
self.assertIs(subscription.topic, topic)
154158

159+
with self.assertRaises(Conflict):
160+
subscription.create()
161+
155162
def test_create_subscription_w_ack_deadline(self):
156163
TOPIC_NAME = 'create-sub-ack' + unique_resource_id('-')
157164
topic = Config.CLIENT.topic(TOPIC_NAME)
@@ -350,6 +357,9 @@ def full_name(obj):
350357
self.assertIn(snapshot.full_name, map(full_name, after_snapshots))
351358
self.assertNotIn(snapshot.full_name, map(full_name, before_snapshots))
352359

360+
with self.assertRaises(Conflict):
361+
snapshot.create()
362+
353363

354364
def test_seek(self):
355365
TOPIC_NAME = 'seek-e2e' + unique_resource_id('-')

pubsub/tests/unit/test__gax.py

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,24 @@ def test_topic_create(self):
141141
self.assertEqual(topic_path, self.TOPIC_PATH)
142142
self.assertIsNone(options)
143143

144+
def test_topic_create_failed_precondition(self):
145+
from google.cloud.exceptions import Conflict
146+
147+
gax_api = _GAXPublisherAPI(_create_topic_failed_precondition=True)
148+
client = _Client(self.PROJECT)
149+
api = self._make_one(gax_api, client)
150+
151+
with self.assertRaises(Conflict):
152+
api.topic_create(self.TOPIC_PATH)
153+
154+
topic_path, options = gax_api._create_topic_called_with
155+
self.assertEqual(topic_path, self.TOPIC_PATH)
156+
self.assertIsNone(options)
157+
144158
def test_topic_create_already_exists(self):
145159
from google.cloud.exceptions import Conflict
146160

147-
gax_api = _GAXPublisherAPI(_create_topic_conflict=True)
161+
gax_api = _GAXPublisherAPI(_create_topic_already_exists=True)
148162
client = _Client(self.PROJECT)
149163
api = self._make_one(gax_api, client)
150164

@@ -597,11 +611,35 @@ def test_subscription_create_optional_params(self):
597611
expected_message_retention_duration.total_seconds())
598612
self.assertIsNone(options)
599613

614+
def test_subscription_create_failed_precondition(self):
615+
from google.cloud.exceptions import Conflict
616+
617+
DEADLINE = 600
618+
gax_api = _GAXSubscriberAPI(
619+
_create_subscription_failed_precondition=True)
620+
client = _Client(self.PROJECT)
621+
api = self._make_one(gax_api, client)
622+
623+
with self.assertRaises(Conflict):
624+
api.subscription_create(
625+
self.SUB_PATH, self.TOPIC_PATH, DEADLINE, self.PUSH_ENDPOINT)
626+
627+
(name, topic, push_config, ack_deadline, retain_acked_messages,
628+
message_retention_duration, options) = (
629+
gax_api._create_subscription_called_with)
630+
self.assertEqual(name, self.SUB_PATH)
631+
self.assertEqual(topic, self.TOPIC_PATH)
632+
self.assertEqual(push_config.push_endpoint, self.PUSH_ENDPOINT)
633+
self.assertEqual(ack_deadline, DEADLINE)
634+
self.assertIsNone(retain_acked_messages)
635+
self.assertIsNone(message_retention_duration)
636+
self.assertIsNone(options)
637+
600638
def test_subscription_create_already_exists(self):
601639
from google.cloud.exceptions import Conflict
602640

603641
DEADLINE = 600
604-
gax_api = _GAXSubscriberAPI(_create_subscription_conflict=True)
642+
gax_api = _GAXSubscriberAPI(_create_subscription_already_exists=True)
605643
client = _Client(self.PROJECT)
606644
api = self._make_one(gax_api, client)
607645

@@ -1121,10 +1159,26 @@ def test_snapshot_create(self):
11211159
self.assertEqual(subscription, self.SUB_PATH)
11221160
self.assertIsNone(options)
11231161

1162+
def test_snapshot_create_failed_precondition(self):
1163+
from google.cloud.exceptions import Conflict
1164+
1165+
gax_api = _GAXSubscriberAPI(_create_snapshot_failed_precondition=True)
1166+
client = _Client(self.PROJECT)
1167+
api = self._make_one(gax_api, client)
1168+
1169+
with self.assertRaises(Conflict):
1170+
api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH)
1171+
1172+
name, subscription, options = (
1173+
gax_api._create_snapshot_called_with)
1174+
self.assertEqual(name, self.SNAPSHOT_PATH)
1175+
self.assertEqual(subscription, self.SUB_PATH)
1176+
self.assertIsNone(options)
1177+
11241178
def test_snapshot_create_already_exists(self):
11251179
from google.cloud.exceptions import Conflict
11261180

1127-
gax_api = _GAXSubscriberAPI(_create_snapshot_conflict=True)
1181+
gax_api = _GAXSubscriberAPI(_create_snapshot_already_exists=True)
11281182
client = _Client(self.PROJECT)
11291183
api = self._make_one(gax_api, client)
11301184

@@ -1371,7 +1425,8 @@ def mock_insecure_channel(host):
13711425

13721426
class _GAXPublisherAPI(_GAXBaseAPI):
13731427

1374-
_create_topic_conflict = False
1428+
_create_topic_failed_precondition = False
1429+
_create_topic_already_exists = False
13751430

13761431
def list_topics(self, name, page_size, options):
13771432
self._list_topics_called_with = name, page_size, options
@@ -1383,8 +1438,10 @@ def create_topic(self, name, options=None):
13831438
self._create_topic_called_with = name, options
13841439
if self._random_gax_error:
13851440
raise GaxError('error')
1386-
if self._create_topic_conflict:
1441+
if self._create_topic_failed_precondition:
13871442
raise GaxError('conflict', self._make_grpc_failed_precondition())
1443+
if self._create_topic_already_exists:
1444+
raise GaxError('conflict', self._make_grpc_already_exists())
13881445
return self._create_topic_response
13891446

13901447
def get_topic(self, name, options=None):
@@ -1432,8 +1489,10 @@ def list_topic_subscriptions(self, topic, page_size, options=None):
14321489

14331490
class _GAXSubscriberAPI(_GAXBaseAPI):
14341491

1435-
_create_snapshot_conflict = False
1436-
_create_subscription_conflict = False
1492+
_create_snapshot_already_exists = False
1493+
_create_snapshot_failed_precondition = False
1494+
_create_subscription_already_exists = False
1495+
_create_subscription_failed_precondition = False
14371496
_modify_push_config_ok = False
14381497
_acknowledge_ok = False
14391498
_modify_ack_deadline_ok = False
@@ -1456,8 +1515,10 @@ def create_subscription(self, name, topic, push_config=None,
14561515
retain_acked_messages, message_retention_duration, options)
14571516
if self._random_gax_error:
14581517
raise GaxError('error')
1459-
if self._create_subscription_conflict:
1518+
if self._create_subscription_failed_precondition:
14601519
raise GaxError('conflict', self._make_grpc_failed_precondition())
1520+
if self._create_subscription_already_exists:
1521+
raise GaxError('conflict', self._make_grpc_already_exists())
14611522
return self._create_subscription_response
14621523

14631524
def get_subscription(self, name, options=None):
@@ -1533,7 +1594,9 @@ def create_snapshot(self, name, subscription, options=None):
15331594
self._create_snapshot_called_with = (name, subscription, options)
15341595
if self._random_gax_error:
15351596
raise GaxError('error')
1536-
if self._create_snapshot_conflict:
1597+
if self._create_snapshot_already_exists:
1598+
raise GaxError('conflict', self._make_grpc_already_exists())
1599+
if self._create_snapshot_failed_precondition:
15371600
raise GaxError('conflict', self._make_grpc_failed_precondition())
15381601
if self._snapshot_create_subscription_miss:
15391602
raise GaxError('miss', self._make_grpc_not_found())

0 commit comments

Comments
 (0)