Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 0bd4e7b

Browse files
committed
feat: Multiplexed sessions - Remove handling of MethodNotImplemented exception from DatabaseSessionManager and add unit tests.
Signed-off-by: Taylor Curran <taylor.curran@improving.com>
1 parent c8e7a1c commit 0bd4e7b

File tree

3 files changed

+355
-69
lines changed

3 files changed

+355
-69
lines changed

google/cloud/spanner_v1/database_sessions_manager.py

Lines changed: 46 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -11,38 +11,40 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import datetime
15-
import threading
16-
import time
17-
import weakref
18-
19-
from google.api_core.exceptions import MethodNotImplemented
14+
from datetime import timedelta
15+
from threading import Event, Lock, Thread
16+
from time import sleep, time
17+
from weakref import ref
2018

19+
from google.cloud.spanner_v1.session import Session
20+
from google.cloud.spanner_v1.session_options import TransactionType
2121
from google.cloud.spanner_v1._opentelemetry_tracing import (
2222
get_current_span,
2323
add_span_event,
2424
)
25-
from google.cloud.spanner_v1.session import Session
26-
from google.cloud.spanner_v1.session_options import TransactionType
2725

2826

2927
class DatabaseSessionsManager(object):
3028
"""Manages sessions for a Cloud Spanner database.
29+
3130
Sessions can be checked out from the database session manager for a specific
3231
transaction type using :meth:`get_session`, and returned to the session manager
3332
using :meth:`put_session`.
34-
The sessions returned by the session manager depend on the client's session options (see
35-
:class:`~google.cloud.spanner_v1.session_options.SessionOptions`) and the provided session
36-
pool (see :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`).
33+
34+
The sessions returned by the session manager depend on the client's session options
35+
(see :class:`~google.cloud.spanner_v1.session_options.SessionOptions`) and the
36+
provided session pool (see :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`).
37+
3738
:type database: :class:`~google.cloud.spanner_v1.database.Database`
3839
:param database: The database to manage sessions for.
40+
3941
:type pool: :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`
4042
:param pool: The pool to get non-multiplexed sessions from.
4143
"""
4244

4345
# Intervals for the maintenance thread to check and refresh the multiplexed session.
44-
_MAINTENANCE_THREAD_POLLING_INTERVAL = datetime.timedelta(minutes=10)
45-
_MAINTENANCE_THREAD_REFRESH_INTERVAL = datetime.timedelta(days=7)
46+
_MAINTENANCE_THREAD_POLLING_INTERVAL = timedelta(minutes=10)
47+
_MAINTENANCE_THREAD_REFRESH_INTERVAL = timedelta(days=7)
4648

4749
def __init__(self, database, pool):
4850
self._database = database
@@ -56,45 +58,29 @@ def __init__(self, database, pool):
5658
# so that the thread can terminate if the use of multiplexed session has been
5759
# disabled for all transactions.
5860
self._multiplexed_session = None
59-
self._multiplexed_session_maintenance_thread = None
60-
self._multiplexed_session_lock = threading.Lock()
61-
self._is_multiplexed_sessions_disabled_event = threading.Event()
62-
63-
@property
64-
def _logger(self):
65-
"""The logger used by this database session manager.
66-
67-
:rtype: :class:`logging.Logger`
68-
:returns: The logger.
69-
"""
70-
return self._database.logger
61+
self._multiplexed_session_thread = None
62+
self._multiplexed_session_lock = Lock()
63+
self._multiplexed_session_disabled_event = Event()
7164

7265
def get_session(self, transaction_type: TransactionType) -> Session:
7366
"""Returns a session for the given transaction type from the database session manager.
67+
7468
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
7569
:returns: a session for the given transaction type.
7670
"""
7771

7872
session_options = self._database.session_options
7973
use_multiplexed = session_options.use_multiplexed(transaction_type)
8074

75+
# TODO multiplexed: enable for read/write transactions
8176
if use_multiplexed and transaction_type == TransactionType.READ_WRITE:
8277
raise NotImplementedError(
8378
f"Multiplexed sessions are not yet supported for {transaction_type} transactions."
8479
)
8580

86-
if use_multiplexed:
87-
try:
88-
session = self._get_multiplexed_session()
89-
90-
# If multiplexed sessions are not supported, disable
91-
# them for all transactions and return a non-multiplexed session.
92-
except MethodNotImplemented:
93-
self._disable_multiplexed_sessions()
94-
session = self._pool.get()
95-
96-
else:
97-
session = self._pool.get()
81+
session = (
82+
self._get_multiplexed_session() if use_multiplexed else self._pool.get()
83+
)
9884

9985
add_span_event(
10086
get_current_span(),
@@ -106,6 +92,7 @@ def get_session(self, transaction_type: TransactionType) -> Session:
10692

10793
def put_session(self, session: Session) -> None:
10894
"""Returns the session to the database session manager.
95+
10996
:type session: :class:`~google.cloud.spanner_v1.session.Session`
11097
:param session: The session to return to the database session manager.
11198
"""
@@ -124,12 +111,12 @@ def put_session(self, session: Session) -> None:
124111

125112
def _get_multiplexed_session(self) -> Session:
126113
"""Returns a multiplexed session from the database session manager.
114+
127115
If the multiplexed session is not defined, creates a new multiplexed
128116
session and starts a maintenance thread to periodically delete and
129117
recreate it so that it remains valid. Otherwise, simply returns the
130118
current multiplexed session.
131-
:raises MethodNotImplemented:
132-
if multiplexed sessions are not supported.
119+
133120
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
134121
:returns: a multiplexed session.
135122
"""
@@ -138,18 +125,14 @@ def _get_multiplexed_session(self) -> Session:
138125
if self._multiplexed_session is None:
139126
self._multiplexed_session = self._build_multiplexed_session()
140127

141-
# Build and start a thread to maintain the multiplexed session.
142-
self._multiplexed_session_maintenance_thread = (
143-
self._build_maintenance_thread()
144-
)
145-
self._multiplexed_session_maintenance_thread.start()
128+
self._multiplexed_session_thread = self._build_maintenance_thread()
129+
self._multiplexed_session_thread.start()
146130

147131
return self._multiplexed_session
148132

149133
def _build_multiplexed_session(self) -> Session:
150134
"""Builds and returns a new multiplexed session for the database session manager.
151-
:raises MethodNotImplemented:
152-
if multiplexed sessions are not supported.
135+
153136
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
154137
:returns: a new multiplexed session.
155138
"""
@@ -159,34 +142,34 @@ def _build_multiplexed_session(self) -> Session:
159142
database_role=self._database.database_role,
160143
is_multiplexed=True,
161144
)
162-
163145
session.create()
164146

165-
self._logger.info("Created multiplexed session.")
147+
self._database.logger.info("Created multiplexed session.")
166148

167149
return session
168150

169151
def _disable_multiplexed_sessions(self) -> None:
170152
"""Disables multiplexed sessions for all transactions."""
171153

172154
self._multiplexed_session = None
173-
self._is_multiplexed_sessions_disabled_event.set()
174-
self._database.session_options.disable_multiplexed(self._logger)
155+
self._multiplexed_session_disabled_event.set()
156+
self._database.session_options.disable_multiplexed(self._database.logger)
175157

176-
def _build_maintenance_thread(self) -> threading.Thread:
158+
def _build_maintenance_thread(self) -> Thread:
177159
"""Builds and returns a multiplexed session maintenance thread for
178160
the database session manager. This thread will periodically delete
179161
and recreate the multiplexed session to ensure that it is always valid.
162+
180163
:rtype: :class:`threading.Thread`
181164
:returns: a multiplexed session maintenance thread.
182165
"""
183166

184167
# Use a weak reference to the database session manager to avoid
185168
# creating a circular reference that would prevent the database
186169
# session manager from being garbage collected.
187-
session_manager_ref = weakref.ref(self)
170+
session_manager_ref = ref(self)
188171

189-
return threading.Thread(
172+
return Thread(
190173
target=self._maintain_multiplexed_session,
191174
name=f"maintenance-multiplexed-session-{self._multiplexed_session.name}",
192175
args=[session_manager_ref],
@@ -196,10 +179,12 @@ def _build_maintenance_thread(self) -> threading.Thread:
196179
@staticmethod
197180
def _maintain_multiplexed_session(session_manager_ref) -> None:
198181
"""Maintains the multiplexed session for the database session manager.
182+
199183
This method will delete and recreate the referenced database session manager's
200184
multiplexed session to ensure that it is always valid. The method will run until
201185
the database session manager is deleted, the multiplexed session is deleted, or
202186
building a multiplexed session fails.
187+
203188
:type session_manager_ref: :class:`_weakref.ReferenceType`
204189
:param session_manager_ref: A weak reference to the database session manager.
205190
"""
@@ -215,7 +200,7 @@ def _maintain_multiplexed_session(session_manager_ref) -> None:
215200
session_manager._MAINTENANCE_THREAD_REFRESH_INTERVAL.total_seconds()
216201
)
217202

218-
session_created_time = time.time()
203+
session_created_time = time()
219204

220205
while True:
221206
# Terminate the thread is the database session manager has been deleted.
@@ -224,26 +209,18 @@ def _maintain_multiplexed_session(session_manager_ref) -> None:
224209
return
225210

226211
# Terminate the thread if the use of multiplexed sessions has been disabled.
227-
if session_manager._is_multiplexed_sessions_disabled_event.is_set():
212+
if session_manager._multiplexed_session_disabled_event.is_set():
228213
return
229214

230215
# Wait for until the refresh interval has elapsed.
231-
if time.time() - session_created_time < refresh_interval_seconds:
232-
time.sleep(polling_interval_seconds)
216+
if time() - session_created_time < refresh_interval_seconds:
217+
sleep(polling_interval_seconds)
233218
continue
234219

235220
with session_manager._multiplexed_session_lock:
236221
session_manager._multiplexed_session.delete()
222+
session_manager._multiplexed_session = (
223+
session_manager._build_multiplexed_session()
224+
)
237225

238-
try:
239-
session_manager._multiplexed_session = (
240-
session_manager._build_multiplexed_session()
241-
)
242-
243-
# Disable multiplexed sessions for all transactions and terminate
244-
# the thread if building a multiplexed session fails.
245-
except MethodNotImplemented:
246-
session_manager._disable_multiplexed_sessions()
247-
return
248-
249-
session_created_time = time.time()
226+
session_created_time = time()

tests/_builders.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,75 @@
1414

1515
from mock import create_autospec
1616

17+
# Default values used to populate required or expected attributes.
18+
# Tests should not depend on them: if a test requires a specific
19+
# identifier or name, it should set it explicitly.
20+
_PROJECT_ID = "default-project-id"
21+
_INSTANCE_ID = "default-instance-id"
22+
_DATABASE_ID = "default-database-id"
23+
1724

1825
def build_logger():
1926
"""Builds and returns a logger for testing."""
2027
from logging import Logger
2128

2229
return create_autospec(Logger, instance=True)
30+
31+
32+
# Client objects
33+
# --------------
34+
35+
36+
def build_client(**kwargs):
37+
"""Builds and returns a client for testing using the given arguments.
38+
If a required argument is not provided, a default value will be used."""
39+
from google.cloud.spanner_v1 import Client
40+
41+
if "project" not in kwargs:
42+
kwargs["project"] = _PROJECT_ID
43+
44+
return Client(**kwargs)
45+
46+
47+
def build_database(**kwargs):
48+
"""Builds and returns a database for testing using the given arguments.
49+
If a required argument is not provided, a default value will be used.."""
50+
from google.cloud.spanner_v1.database import Database
51+
52+
if "database_id" not in kwargs:
53+
kwargs["database_id"] = _DATABASE_ID
54+
55+
if "logger" not in kwargs:
56+
kwargs["logger"] = build_logger()
57+
58+
if "instance" not in kwargs or isinstance(kwargs["instance"], dict):
59+
instance_args = kwargs.pop("instance", {})
60+
kwargs["instance"] = build_instance(**instance_args)
61+
62+
database = Database(**kwargs)
63+
database._spanner_api = build_spanner_api()
64+
65+
return database
66+
67+
68+
def build_instance(**kwargs):
69+
"""Builds and returns an instance for testing using the given arguments.
70+
If a required argument is not provided, a default value will be used."""
71+
from google.cloud.spanner_v1.instance import Instance
72+
73+
if "instance_id" not in kwargs:
74+
kwargs["instance_id"] = _INSTANCE_ID
75+
76+
if "client" not in kwargs or isinstance(kwargs["client"], dict):
77+
client_args = kwargs.pop("client", {})
78+
kwargs["client"] = build_client(**client_args)
79+
80+
return Instance(**kwargs)
81+
82+
83+
def build_spanner_api():
84+
"""Builds and returns a mock Spanner Client API for testing using the given arguments.
85+
Commonly used methods are mocked to return default values."""
86+
from google.cloud.spanner_v1 import SpannerClient
87+
88+
return create_autospec(SpannerClient, instance=True)

0 commit comments

Comments
 (0)