-
Notifications
You must be signed in to change notification settings - Fork 104
feat: support transaction isolation level in dbapi #1327
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -29,7 +29,7 @@ | |||||
| from google.cloud.spanner_dbapi.parsed_statement import ParsedStatement, Statement | ||||||
| from google.cloud.spanner_dbapi.transaction_helper import TransactionRetryHelper | ||||||
| from google.cloud.spanner_dbapi.cursor import Cursor | ||||||
| from google.cloud.spanner_v1 import RequestOptions | ||||||
| from google.cloud.spanner_v1 import RequestOptions, TransactionOptions | ||||||
| from google.cloud.spanner_v1.snapshot import Snapshot | ||||||
|
|
||||||
| from google.cloud.spanner_dbapi.exceptions import ( | ||||||
|
|
@@ -112,6 +112,7 @@ def __init__(self, instance, database=None, read_only=False, **kwargs): | |||||
| self._staleness = None | ||||||
| self.request_priority = None | ||||||
| self._transaction_begin_marked = False | ||||||
| self._transaction_isolation_level = None | ||||||
| # whether transaction started at Spanner. This means that we had | ||||||
| # made at least one call to Spanner. | ||||||
| self._spanner_transaction_started = False | ||||||
|
|
@@ -283,6 +284,33 @@ def transaction_tag(self, value): | |||||
| """ | ||||||
| self._connection_variables["transaction_tag"] = value | ||||||
|
|
||||||
| @property | ||||||
| def isolation_level(self): | ||||||
| """The default isolation level that is used for all read/write | ||||||
| transactions on this `Connection`. | ||||||
|
|
||||||
| Returns: | ||||||
| google.cloud.spanner_v1.types.TransactionOptions.IsolationLevel: | ||||||
| The isolation level that is used for read/write transactions on | ||||||
| this `Connection`. | ||||||
| """ | ||||||
| return self._connection_variables.get( | ||||||
| "isolation_level", | ||||||
| TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, | ||||||
| ) | ||||||
|
|
||||||
| @isolation_level.setter | ||||||
| def isolation_level(self, value: TransactionOptions.IsolationLevel): | ||||||
| """Sets the isolation level that is used for all read/write | ||||||
| transactions on this `Connection`. | ||||||
|
|
||||||
| Args: | ||||||
| value (google.cloud.spanner_v1.types.TransactionOptions.IsolationLevel): | ||||||
| The isolation level for all read/write transactions on this | ||||||
| `Connection`. | ||||||
| """ | ||||||
| self._connection_variables["isolation_level"] = value | ||||||
|
|
||||||
| @property | ||||||
| def staleness(self): | ||||||
| """Current read staleness option value of this `Connection`. | ||||||
|
|
@@ -363,6 +391,12 @@ def transaction_checkout(self): | |||||
| if not self._spanner_transaction_started: | ||||||
| self._transaction = self._session_checkout().transaction() | ||||||
| self._transaction.transaction_tag = self.transaction_tag | ||||||
| if self._transaction_isolation_level: | ||||||
| self._transaction.isolation_level = ( | ||||||
| self._transaction_isolation_level | ||||||
| ) | ||||||
| else: | ||||||
| self._transaction.isolation_level = self.isolation_level | ||||||
| self.transaction_tag = None | ||||||
| self._snapshot = None | ||||||
| self._spanner_transaction_started = True | ||||||
|
|
@@ -405,7 +439,7 @@ def close(self): | |||||
| self.is_closed = True | ||||||
|
|
||||||
| @check_not_closed | ||||||
| def begin(self): | ||||||
| def begin(self, isolation_level=None): | ||||||
| """ | ||||||
| Marks the transaction as started. | ||||||
|
|
||||||
|
|
@@ -421,6 +455,7 @@ def begin(self): | |||||
| "is already running" | ||||||
| ) | ||||||
| self._transaction_begin_marked = True | ||||||
| self._transaction_isolation_level = isolation_level | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but we do that at a slightly later moment when the transaction is actually being created here:
The reason for doing it like this is that this allows us to support a statement like The use of the default from the connection is covered by this test case:
|
||||||
|
|
||||||
| def commit(self): | ||||||
| """Commits any pending transaction to the database. | ||||||
|
|
@@ -465,6 +500,7 @@ def _reset_post_commit_or_rollback(self): | |||||
| self._release_session() | ||||||
| self._transaction_helper.reset() | ||||||
| self._transaction_begin_marked = False | ||||||
| self._transaction_isolation_level = None | ||||||
| self._spanner_transaction_started = False | ||||||
|
|
||||||
| @check_not_closed | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| # Copyright 2025 Google LLC All rights reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from google.cloud.spanner_dbapi import Connection | ||
| from google.cloud.spanner_v1 import ( | ||
| BeginTransactionRequest, | ||
| TransactionOptions, | ||
| ) | ||
| from tests.mockserver_tests.mock_server_test_base import ( | ||
| MockServerTestBase, | ||
| add_update_count, | ||
| ) | ||
|
|
||
|
|
||
| class TestDbapiIsolationLevel(MockServerTestBase): | ||
| @classmethod | ||
| def setup_class(cls): | ||
| super().setup_class() | ||
| add_update_count("insert into singers (id, name) values (1, 'Some Singer')", 1) | ||
|
|
||
| def test_isolation_level_default(self): | ||
| connection = Connection(self.instance, self.database) | ||
| with connection.cursor() as cursor: | ||
| cursor.execute("insert into singers (id, name) values (1, 'Some Singer')") | ||
| self.assertEqual(1, cursor.rowcount) | ||
| connection.commit() | ||
| begin_requests = list( | ||
| filter( | ||
| lambda msg: isinstance(msg, BeginTransactionRequest), | ||
| self.spanner_service.requests, | ||
| ) | ||
| ) | ||
| self.assertEqual(1, len(begin_requests)) | ||
| self.assertEqual( | ||
| begin_requests[0].options.isolation_level, | ||
| TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, | ||
| ) | ||
|
|
||
| def test_custom_isolation_level(self): | ||
| connection = Connection(self.instance, self.database) | ||
| for level in [ | ||
| TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, | ||
| TransactionOptions.IsolationLevel.REPEATABLE_READ, | ||
| TransactionOptions.IsolationLevel.SERIALIZABLE, | ||
| ]: | ||
| connection.isolation_level = level | ||
| with connection.cursor() as cursor: | ||
| cursor.execute( | ||
| "insert into singers (id, name) values (1, 'Some Singer')" | ||
| ) | ||
| self.assertEqual(1, cursor.rowcount) | ||
| connection.commit() | ||
| begin_requests = list( | ||
| filter( | ||
| lambda msg: isinstance(msg, BeginTransactionRequest), | ||
| self.spanner_service.requests, | ||
| ) | ||
| ) | ||
| self.assertEqual(1, len(begin_requests)) | ||
| self.assertEqual(begin_requests[0].options.isolation_level, level) | ||
| MockServerTestBase.spanner_service.clear_requests() | ||
|
|
||
| def test_isolation_level_in_connection_kwargs(self): | ||
| for level in [ | ||
| TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, | ||
| TransactionOptions.IsolationLevel.REPEATABLE_READ, | ||
| TransactionOptions.IsolationLevel.SERIALIZABLE, | ||
| ]: | ||
| connection = Connection(self.instance, self.database, isolation_level=level) | ||
| with connection.cursor() as cursor: | ||
| cursor.execute( | ||
| "insert into singers (id, name) values (1, 'Some Singer')" | ||
| ) | ||
| self.assertEqual(1, cursor.rowcount) | ||
| connection.commit() | ||
| begin_requests = list( | ||
| filter( | ||
| lambda msg: isinstance(msg, BeginTransactionRequest), | ||
| self.spanner_service.requests, | ||
| ) | ||
| ) | ||
| self.assertEqual(1, len(begin_requests)) | ||
| self.assertEqual(begin_requests[0].options.isolation_level, level) | ||
| MockServerTestBase.spanner_service.clear_requests() | ||
|
|
||
| def test_transaction_isolation_level(self): | ||
| connection = Connection(self.instance, self.database) | ||
| for level in [ | ||
| TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, | ||
| TransactionOptions.IsolationLevel.REPEATABLE_READ, | ||
| TransactionOptions.IsolationLevel.SERIALIZABLE, | ||
| ]: | ||
| connection.begin(isolation_level=level) | ||
| with connection.cursor() as cursor: | ||
| cursor.execute( | ||
| "insert into singers (id, name) values (1, 'Some Singer')" | ||
| ) | ||
| self.assertEqual(1, cursor.rowcount) | ||
| connection.commit() | ||
| begin_requests = list( | ||
| filter( | ||
| lambda msg: isinstance(msg, BeginTransactionRequest), | ||
| self.spanner_service.requests, | ||
| ) | ||
| ) | ||
| self.assertEqual(1, len(begin_requests)) | ||
| self.assertEqual(begin_requests[0].options.isolation_level, level) | ||
| MockServerTestBase.spanner_service.clear_requests() |
Uh oh!
There was an error while loading. Please reload this page.