Skip to content

Commit 14263ee

Browse files
committed
Merge pull request #1493 from dhermes/bigtable-complete-partial-rows-data
Implementing consume_*() methods on Bigtable PartialRowsData.
2 parents 0891906 + 3d98eb7 commit 14263ee

File tree

2 files changed

+139
-0
lines changed

2 files changed

+139
-0
lines changed

gcloud/bigtable/row_data.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,3 +279,45 @@ def rows(self):
279279
# NOTE: To avoid duplicating large objects, this is just the
280280
# mutable private data.
281281
return self._rows
282+
283+
def cancel(self):
    """Cancel the underlying response stream.

    Delegates to the ``cancel()`` method of the response iterator
    held by this object.
    """
    stream = self._response_iterator
    stream.cancel()
286+
287+
def consume_next(self):
    """Pull one ``ReadRowsResponse`` off the stream and record it.

    The response is merged into the :class:`PartialRowData` stored
    under the response's row key in the dictionary owned by this
    object; a new :class:`PartialRowData` is created first when the
    row key has not been seen yet.

    :raises: :class:`StopIteration <exceptions.StopIteration>` if the
             response iterator has no more responses to stream.
    """
    # NOTE: This calls the iterator's ``next()`` method directly (the
    #       Python 2 spelling) rather than the ``next()`` builtin.
    response = self._response_iterator.next()
    key = response.row_key
    row = self._rows.get(key)
    if row is None:
        row = PartialRowData(key)
        self._rows[key] = row
    # NOTE: This is not atomic in the case of failures: a newly created
    #       row is already stored before the update below is attempted.
    row.update_from_read_rows(response)
303+
304+
def consume_all(self, max_loops=None):
    """Drain the response stream, optionally bounding the attempts.

    Repeatedly invokes :meth:`consume_next` and stops as soon as that
    method raises ``StopIteration`` or once ``max_loops`` attempts
    have been made, whichever comes first.

    :type max_loops: int
    :param max_loops: (Optional) Upper bound on the number of calls made
                      to :meth:`consume_next`. When omitted, the loop
                      only ends when the stream is exhausted. You can
                      use this to avoid long wait times.
    """
    # An infinite limit makes the loop condition below always hold.
    limit = float('inf') if max_loops is None else max_loops
    attempts = 0
    while attempts < limit:
        attempts += 1
        try:
            self.consume_next()
        except StopIteration:
            return

gcloud/bigtable/test_row_data.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,22 @@ def _getTargetClass(self):
386386
from gcloud.bigtable.row_data import PartialRowsData
387387
return PartialRowsData
388388

389+
def _getDoNothingClass(self):
390+
klass = self._getTargetClass()
391+
392+
class FakePartialRowsData(klass):
393+
394+
def __init__(self, *args, **kwargs):
395+
super(FakePartialRowsData, self).__init__(*args, **kwargs)
396+
self._consumed = []
397+
398+
def consume_next(self):
399+
value = self._response_iterator.next()
400+
self._consumed.append(value)
401+
return value
402+
403+
return FakePartialRowsData
404+
389405
def _makeOne(self, *args, **kwargs):
390406
return self._getTargetClass()(*args, **kwargs)
391407

@@ -425,3 +441,84 @@ def test_rows_getter(self):
425441
partial_rows_data = self._makeOne(None)
426442
partial_rows_data._rows = value = object()
427443
self.assertTrue(partial_rows_data.rows is value)
444+
445+
def test_cancel(self):
    # ``cancel()`` must forward exactly one cancel call to the stream.
    stream = _MockCancellableIterator()
    rows_data = self._makeOne(stream)
    self.assertEqual(stream.cancel_calls, 0)
    rows_data.cancel()
    self.assertEqual(stream.cancel_calls, 1)
451+
452+
def test_consume_next(self):
    # A response for an unseen row key results in a new, empty
    # ``PartialRowData`` stored under that key.
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)
    from gcloud.bigtable.row_data import PartialRowData

    key = b'row-key'
    response_pb = messages_pb2.ReadRowsResponse(row_key=key)
    stream = _MockCancellableIterator(response_pb)
    rows_data = self._makeOne(stream)
    self.assertEqual(rows_data.rows, {})
    rows_data.consume_next()
    self.assertEqual(rows_data.rows, {key: PartialRowData(key)})
465+
466+
def test_consume_next_row_exists(self):
    # A response for an already-tracked row key must update the
    # existing ``PartialRowData`` object rather than replace it.
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)
    from gcloud.bigtable.row_data import PartialRowData

    key = b'row-key'
    commit_chunk = messages_pb2.ReadRowsResponse.Chunk(commit_row=True)
    response_pb = messages_pb2.ReadRowsResponse(row_key=key,
                                                chunks=[commit_chunk])
    stream = _MockCancellableIterator(response_pb)
    rows_data = self._makeOne(stream)
    tracked_row = PartialRowData(key)
    rows_data._rows[key] = tracked_row
    self.assertFalse(tracked_row.committed)
    rows_data.consume_next()
    # The same object we stored is now committed and still has no cells.
    self.assertTrue(tracked_row.committed)
    self.assertEqual(tracked_row.cells, {})
483+
484+
def test_consume_next_empty_iter(self):
    # With nothing left on the stream, ``consume_next`` lets the
    # iterator's ``StopIteration`` propagate to the caller.
    stream = _MockCancellableIterator()
    rows_data = self._makeOne(stream)
    with self.assertRaises(StopIteration):
        rows_data.consume_next()
489+
490+
def test_consume_all(self):
    # Without a loop limit, ``consume_all`` drains every value from the
    # stream, in order.
    fake_class = self._getDoNothingClass()

    first, second, third = object(), object(), object()
    stream = _MockCancellableIterator(first, second, third)
    rows_data = fake_class(stream)
    self.assertEqual(rows_data._consumed, [])
    rows_data.consume_all()
    self.assertEqual(rows_data._consumed, [first, second, third])
499+
500+
def test_consume_all_with_max_loops(self):
    # ``max_loops=1`` must consume exactly one value and leave the rest
    # on the stream.
    fake_class = self._getDoNothingClass()

    first, second, third = object(), object(), object()
    stream = _MockCancellableIterator(first, second, third)
    rows_data = fake_class(stream)
    self.assertEqual(rows_data._consumed, [])
    rows_data.consume_all(max_loops=1)
    self.assertEqual(rows_data._consumed, [first])
    # Make sure the iterator still has the remaining values.
    leftover = list(stream.iter_values)
    self.assertEqual(leftover, [second, third])
511+
512+
513+
class _MockCancellableIterator(object):
514+
515+
cancel_calls = 0
516+
517+
def __init__(self, *values):
518+
self.iter_values = iter(values)
519+
520+
def cancel(self):
521+
self.cancel_calls += 1
522+
523+
def next(self):
524+
return next(self.iter_values)

0 commit comments

Comments
 (0)