Skip to content

Commit 659e34c

Browse files
committed
Implementing consume_*() methods on Bigtable PartialRowsData.
These methods read from a stream of ReadRowsResponse messages, parse them, and then store the parsed rows on the PartialRowsData instance.
1 parent d332809 commit 659e34c

File tree

2 files changed

+174
-0
lines changed

2 files changed

+174
-0
lines changed

gcloud/bigtable/row_data.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,19 @@ def clear(self):
147147
self._chunks_encountered = False
148148
self._cells.clear()
149149

150+
def update_from_read_rows(self, read_rows_response_pb):
    """Merge a streamed ``ReadRows`` response into this row.

    This is currently a stub: the response is not inspected and every
    call fails.

    :type read_rows_response_pb:
        :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`
    :param read_rows_response_pb: One response message streamed back for
                                  a ``ReadRows`` request.

    :raises: :class:`NotImplementedError <exceptions.NotImplementedError>`
             on every call.
    """
    raise NotImplementedError()
162+
150163

151164
class PartialRowsData(object):
152165
"""Convenience wrapper for consuming a ``ReadRows`` streaming response.
@@ -180,3 +193,45 @@ def rows(self):
180193
# NOTE: To avoid duplicating large objects, this is just the
181194
# mutable private data.
182195
return self._rows
196+
197+
def cancel(self):
    """Cancel the underlying response iterator.

    Delegates to the ``cancel()`` method of the wrapped response
    iterator.
    """
    stream = self._response_iterator
    stream.cancel()
200+
201+
def consume_next(self):
    """Pull one ``ReadRowsResponse`` off the stream and record it.

    The response is routed to the :class:`PartialRowData` stored under
    its row key in this object's row dictionary; a new
    :class:`PartialRowData` is created and stored first when the key
    has no entry yet.

    :raises: :class:`StopIteration <exceptions.StopIteration>` if the
             response iterator has no more responses to stream.
    """
    # NOTE(review): this relies on the iterator exposing a ``next()``
    #               method rather than going through the ``next()``
    #               built-in.
    response_pb = self._response_iterator.next()
    key = response_pb.row_key
    row = self._rows.get(key)
    if row is None:
        row = PartialRowData(key)
        self._rows[key] = row
    # NOTE: This is not atomic in the case of failures. A newly created
    #       row is already stored by the time the update runs.
    row.update_from_read_rows(response_pb)
217+
218+
def consume_all(self, max_loops=None):
    """Drain the stream by calling :meth:`consume_next` repeatedly.

    Stops as soon as :meth:`consume_next` raises ``StopIteration`` or
    once ``max_loops`` attempts have been made, whichever comes first.

    :type max_loops: int
    :param max_loops: (Optional) Maximum number of times to try to consume
                      an additional ``ReadRowsResponse``. You can use this
                      to avoid long wait times. When omitted, there is
                      no limit.
    """
    unbounded = max_loops is None
    attempts = 0
    while unbounded or attempts < max_loops:
        # Count the attempt before making it, so an attempt that ends
        # the stream still counts toward the limit.
        attempts += 1
        try:
            self.consume_next()
        except StopIteration:
            return

gcloud/bigtable/test_row_data.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,34 @@ def test_clear(self):
207207
self.assertFalse(partial_row_data._chunks_encountered)
208208
self.assertEqual(partial_row_data.cells, {})
209209

210+
def test_update_from_read_rows(self):
    # The method is a stub, so calling it -- even with a ``None``
    # payload -- is expected to raise.
    row = self._makeOne(None)
    self.assertRaises(NotImplementedError,
                      row.update_from_read_rows, None)
214+
210215

211216
class TestPartialRowsData(unittest2.TestCase):
212217

213218
def _getTargetClass(self):
    # Imported inside the helper rather than at module import time.
    from gcloud.bigtable import row_data
    return row_data.PartialRowsData
216221

222+
def _getDoNothingClass(self):
    # Build a subclass of the class under test whose ``consume_next``
    # only records what it pulled from the iterator, skipping all row
    # handling.
    base = self._getTargetClass()

    class FakePartialRowsData(base):

        def __init__(self, *args, **kwargs):
            super(FakePartialRowsData, self).__init__(*args, **kwargs)
            # Every value handed out by ``consume_next``, in order.
            self._consumed = []

        def consume_next(self):
            item = self._response_iterator.next()
            self._consumed.append(item)
            return item

    return FakePartialRowsData
237+
217238
def _makeOne(self, *args, **kwargs):
    # Instantiate the class under test with the given arguments.
    target_class = self._getTargetClass()
    return target_class(*args, **kwargs)
219240

@@ -253,3 +274,101 @@ def test_rows_getter(self):
253274
partial_rows_data = self._makeOne(None)
254275
partial_rows_data._rows = value = object()
255276
self.assertTrue(partial_rows_data.rows is value)
277+
278+
def test_cancel(self):
    stream = _MockCancellableIterator()
    rows_data = self._makeOne(stream)
    # Nothing is cancelled until ``cancel()`` is called explicitly.
    self.assertEqual(stream.cancel_calls, 0)
    rows_data.cancel()
    self.assertEqual(stream.cancel_calls, 1)
284+
285+
def test_consume_next(self):
    from gcloud._testing import _Monkey
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)
    from gcloud.bigtable import row_data as MUT

    key = b'row-key'
    response_pb = messages_pb2.ReadRowsResponse(row_key=key)
    rows_data = self._makeOne(_MockCancellableIterator(response_pb))
    self.assertEqual(rows_data.rows, {})
    # Swap in the mock row class so the update is only recorded.
    with _Monkey(MUT, PartialRowData=_MockPartialRowData):
        rows_data.consume_next()
    self.assertEqual(len(rows_data.rows), 1)
    stored_row = rows_data.rows[key]
    self.assertTrue(isinstance(stored_row, _MockPartialRowData))
    self.assertEqual(stored_row.row_key, key)
    self.assertEqual(stored_row.read_rows_updates, [response_pb])
303+
304+
def test_consume_next_row_exists(self):
    from gcloud._testing import _Monkey
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)
    from gcloud.bigtable import row_data as MUT

    key = b'row-key'
    commit_chunk = messages_pb2.ReadRowsResponse.Chunk(commit_row=True)
    response_pb = messages_pb2.ReadRowsResponse(row_key=key,
                                                chunks=[commit_chunk])
    rows_data = self._makeOne(_MockCancellableIterator(response_pb))
    # Pre-seed the row so ``consume_next`` takes the "row already
    # stored" path and reuses this object.
    seeded_row = _MockPartialRowData(key)
    rows_data._rows[key] = seeded_row
    self.assertFalse(seeded_row.committed)
    with _Monkey(MUT, PartialRowData=_MockPartialRowData):
        rows_data.consume_next()
    self.assertEqual(seeded_row.read_rows_updates, [response_pb])
322+
323+
def test_consume_next_empty_iter(self):
    # An iterator with no values raises ``StopIteration`` on the very
    # first read, and ``consume_next`` lets it propagate.
    rows_data = self._makeOne(_MockCancellableIterator())
    self.assertRaises(StopIteration, rows_data.consume_next)
328+
329+
def test_consume_all(self):
    fake_class = self._getDoNothingClass()

    values = [object(), object(), object()]
    rows_data = fake_class(_MockCancellableIterator(*values))
    self.assertEqual(rows_data._consumed, [])
    # With no ``max_loops`` given, every value is consumed, in order.
    rows_data.consume_all()
    self.assertEqual(rows_data._consumed, values)
338+
339+
def test_consume_all_with_max_loops(self):
    fake_class = self._getDoNothingClass()

    first, second, third = object(), object(), object()
    stream = _MockCancellableIterator(first, second, third)
    rows_data = fake_class(stream)
    self.assertEqual(rows_data._consumed, [])
    # A single loop consumes exactly one value.
    rows_data.consume_all(max_loops=1)
    self.assertEqual(rows_data._consumed, [first])
    # Make sure the iterator still has the remaining values.
    self.assertEqual(list(stream.iter_values), [second, third])
350+
351+
352+
class _MockCancellableIterator(object):
    """Test double for a cancellable stream of responses.

    Hands out the values passed to the constructor, in order, and counts
    how many times :meth:`cancel` has been called.

    Supports both the ``next()`` method and the ``__iter__`` /
    ``__next__`` iterator protocol, so it can be driven by
    ``mock.next()``, by the ``next()`` built-in, or by a ``for`` loop.
    """

    # Class-level default; ``cancel()`` shadows it with a per-instance
    # count on first use.
    cancel_calls = 0

    def __init__(self, *values):
        # Exposed so tests can inspect which values are still unread.
        self.iter_values = iter(values)

    def __iter__(self):
        return self

    def cancel(self):
        self.cancel_calls += 1

    def next(self):
        return next(self.iter_values)

    # Alias so the ``next()`` built-in works on Python 3 as well.
    __next__ = next
364+
365+
366+
class _MockPartialRowData(object):
    """Stand-in for ``PartialRowData`` that only records its updates."""

    def __init__(self, row_key):
        # Responses passed to ``update_from_read_rows``, in call order.
        self.read_rows_updates = []
        self.committed = False
        self.row_key = row_key

    def update_from_read_rows(self, read_rows_response):
        self.read_rows_updates += [read_rows_response]

0 commit comments

Comments
 (0)