Skip to content

Commit ec663e5

Browse files
committed
spanner/dbapi: retry every statement in transaction stack
When retrying a transaction that failed, we MUST replay every prior statement executed in that transaction, as well as the current one; statements are tracked until .commit() or .rollback() is invoked. Fixes #285. Fixes #296.
1 parent cb51b75 commit ec663e5

File tree

2 files changed

+82
-10
lines changed

2 files changed

+82
-10
lines changed

packages/django-google-spanner/spanner/dbapi/connection.py

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ def __init__(self, db_handle, session, discard_session):
1616
self.__txn = None
1717
self.__dbhandle = db_handle
1818
self.__closed = False
19+
self.__on_transaction_clean_up = None
1920
self.__ddl_statements = []
2021

2122
def __raise_if_already_closed(self):
@@ -56,7 +57,13 @@ def __can_commit_or_rollback(self):
5657
# https://github.com/googleapis/python-spanner/issues/13
5758
return self.__txn and not (self.__txn.committed or self.__txn._rolled_back)
5859

60+
def __clean_up_transaction_state(self):
61+
if self.__on_transaction_clean_up:
62+
self.__on_transaction_clean_up()
63+
5964
def commit(self):
65+
self.__clean_up_transaction_state()
66+
6067
if self.__can_commit_or_rollback():
6168
res = self.__txn.commit()
6269
self.__txn = None
@@ -65,6 +72,8 @@ def commit(self):
6572
self.__txn.stop()
6673

6774
def rollback(self):
75+
self.__clean_up_transaction_state()
76+
6877
if self.__can_commit_or_rollback():
6978
res = self.__txn.rollback()
7079
self.__txn = None
@@ -75,7 +84,9 @@ def rollback(self):
7584
def cursor(self):
7685
self.__raise_if_already_closed()
7786

78-
return Cursor(self)
87+
cur = Cursor(self)
88+
self.__on_transaction_clean_up = cur._clear_transaction_state
89+
return cur
7990

8091
def discard_aborted_txn(self):
8192
# Discard the prior, now bad transaction.

packages/django-google-spanner/spanner/dbapi/cursor.py

Lines changed: 70 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,8 @@
44
# license that can be found in the LICENSE file or at
55
# https://developers.google.com/open-source/licenses/bsd
66

7+
import time
8+
79
import google.api_core.exceptions as grpc_exceptions
810

911
from .exceptions import (
@@ -26,6 +28,7 @@ def __init__(self, db_handle):
2628
self.__connection = db_handle
2729
self.__last_op = None
2830
self.__closed = False
31+
self.__sql_in_same_txn = []
2932

3033
# arraysize is a readable and writable property mandated
3134
# by PEP-0249 https://www.python.org/dev/peps/pep-0249/#arraysize
@@ -64,7 +67,7 @@ def __discard_aborted_txn(self):
6467
def __get_txn(self):
6568
return self.__connection.get_txn()
6669

67-
def execute(self, sql, args=None, current_retry=0):
70+
def execute(self, sql, args=None, already_in_retry=False):
6871
"""
6972
Abstracts and implements execute SQL statements on Cloud Spanner.
7073
If it encounters grpc_exceptions.Aborted error, it optimistically retries
@@ -102,21 +105,79 @@ def execute(self, sql, args=None, current_retry=0):
102105
self.__handle_insert(self.__get_txn(), sql, args or None)
103106
else:
104107
self.__handle_update(self.__get_txn(), sql, args or None)
108+
109+
except grpc_exceptions.InvalidArgument as e: # We can't retry a syntax issue, fail fast.
110+
self.__discard_aborted_txn()
111+
raise ProgrammingError(e.details if hasattr(e, 'details') else e)
112+
105113
except (grpc_exceptions.AlreadyExists, grpc_exceptions.FailedPrecondition) as e:
114+
# We can't retry an integrity error within the same transaction regardless.
115+
self.__discard_aborted_txn()
106116
raise IntegrityError(e.details if hasattr(e, 'details') else e)
117+
118+
except Exception as e:
119+
# Firstly discard the aborted transaction.
120+
self.__discard_aborted_txn()
121+
122+
if already_in_retry: # It is already being retried, so return immediately.
123+
raise e
124+
125+
# Attempt to replay all the prior sql within the same transaction.
126+
sql_args_tuples = self.__sql_in_same_txn[:]
127+
sql_args_tuples.append((sql, args,))
128+
129+
return self.__replay_all_prior_statements_in_transaction(sql_args_tuples)
130+
else: # No error here
131+
self.__sql_in_same_txn.append((sql, args,))
132+
133+
def _clear_transaction_state(self):
134+
"""
135+
Invoked on every Connection.commit() or Connection.rollback()
136+
"""
137+
if self.__sql_in_same_txn:
138+
self.__sql_in_same_txn.clear()
139+
140+
def __replay_all_prior_statements_in_transaction(self, sql_args_tuples):
141+
if not sql_args_tuples:
142+
return
143+
144+
lastException = None
145+
146+
for i in range(5):
147+
# Clean up before attempting the replay.
148+
self.__sql_in_same_txn.clear()
149+
150+
print("\033[31mAttempting transaction replay #%d with elements:\n%s\033[00m" % (i, sql_args_tuples))
151+
152+
for sql, args in sql_args_tuples:
153+
try:
154+
self.execute(sql, args, already_in_retry=True)
155+
except grpc_exceptions.InvalidArgument as e: # We can't retry a syntax issue, fail fast.
156+
raise ProgrammingError(e.details if hasattr(e, 'details') else e)
157+
except (grpc_exceptions.AlreadyExists, grpc_exceptions.FailedPrecondition) as e:
158+
raise IntegrityError(e.details if hasattr(e, 'details') else e)
159+
except Exception as e:
160+
lastException = e
161+
# TODO: Use exponential backoff with jitter, before retrying.
162+
time.sleep(0.57)
163+
break
164+
else:
165+
# All the elements in sql_args_tuples were executed,
166+
# thus we can now break out of the retry loop.
167+
# But first, reset all the executed (sql, args) for future replay.
168+
self.__sql_in_same_txn = sql_args_tuples[:]
169+
break
170+
171+
try:
172+
if lastException:
173+
self.__discard_aborted_txn()
174+
raise lastException
107175
except grpc_exceptions.InvalidArgument as e:
108176
raise ProgrammingError(e.details if hasattr(e, 'details') else e)
109177
except grpc_exceptions.InternalServerError as e:
110178
raise OperationalError(e.details if hasattr(e, 'details') else e)
111179
except grpc_exceptions.Aborted as e:
112-
if current_retry > 2: # Arbitrary limit that should probably be a setting.
113-
raise InternalError(e.details if hasattr(e, 'details') else e)
114-
115-
self.__discard_aborted_txn()
116-
# Otherwise retry it.
117-
print('\033[31mRetrying execution #%d of:\nSQL: %s\nArgs: %s\033[00m' % (
118-
current_retry, sql, args))
119-
return self.execute(sql, args, current_retry=current_retry+1)
180+
raise InternalError(e.details if hasattr(e, 'details') else e)
120181

121182
def __handle_update(self, txn, sql, params, param_types=None):
122183
sql = ensure_where_clause(sql)

0 commit comments

Comments
 (0)