Skip to content

Commit 8749101

Browse files
committed
Merge pull request #1495 from dhermes/bigtable-read-rows
Adding Bigtable Table.read_row().
2 parents bc718d3 + 123eca0 commit 8749101

File tree

2 files changed

+315
-0
lines changed

2 files changed

+315
-0
lines changed

gcloud/bigtable/table.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@
1515
"""User friendly container for Google Cloud Bigtable Table."""
1616

1717

18+
from gcloud._helpers import _to_bytes
19+
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
1820
from gcloud.bigtable._generated import (
1921
bigtable_table_service_messages_pb2 as messages_pb2)
2022
from gcloud.bigtable._generated import (
2123
bigtable_service_messages_pb2 as data_messages_pb2)
2224
from gcloud.bigtable.column_family import _gc_rule_from_pb
2325
from gcloud.bigtable.column_family import ColumnFamily
2426
from gcloud.bigtable.row import Row
27+
from gcloud.bigtable.row_data import PartialRowData
2528

2629

2730
class Table(object):
@@ -218,6 +221,40 @@ def list_column_families(self):
218221
result[column_family_id] = column_family
219222
return result
220223

224+
def read_row(self, row_key, filter_=None):
    """Read a single row from this table.

    :type row_key: bytes
    :param row_key: The key of the row to read from.

    :type filter_: :class:`.row.RowFilter`
    :param filter_: (Optional) The filter to apply to the contents of the
                    row. If unset, returns the entire row.

    :rtype: :class:`.PartialRowData`, :data:`NoneType <types.NoneType>`
    :returns: The contents of the row if any chunks were returned in
              the response, otherwise :data:`None`.
    :raises: :class:`ValueError <exceptions.ValueError>` if a commit row
             chunk is never encountered.
    """
    client = self._cluster._client
    request_pb = _create_row_request(self.name, row_key=row_key,
                                     filter_=filter_)
    # ReadRows streams back `data_messages_pb2.ReadRowsResponse` messages.
    responses = client._data_stub.ReadRows(request_pb,
                                           client.timeout_seconds)
    partial_row = PartialRowData(row_key)
    for response in responses:
        partial_row.update_from_read_rows(response)

    if not partial_row._chunks_encountered:
        # The stream produced no data for this row.
        return None
    if not partial_row.committed:
        # Without a commit-row chunk the accumulated contents are unusable.
        raise ValueError('The row remains partial / is not committed.')
    return partial_row
221258
def sample_row_keys(self):
222259
"""Read a sample of row keys in the table.
223260
@@ -255,3 +292,80 @@ def sample_row_keys(self):
255292
response_iterator = client._data_stub.SampleRowKeys(
256293
request_pb, client.timeout_seconds)
257294
return response_iterator
295+
296+
297+
def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
298+
filter_=None, allow_row_interleaving=None, limit=None):
299+
"""Creates a request to read rows in a table.
300+
301+
:type table_name: str
302+
:param table_name: The name of the table to read from.
303+
304+
:type row_key: bytes
305+
:param row_key: (Optional) The key of a specific row to read from.
306+
307+
:type start_key: bytes
308+
:param start_key: (Optional) The beginning of a range of row keys to
309+
read from. The range will include ``start_key``. If
310+
left empty, will be interpreted as the empty string.
311+
312+
:type end_key: bytes
313+
:param end_key: (Optional) The end of a range of row keys to read from.
314+
The range will not include ``end_key``. If left empty,
315+
will be interpreted as an infinite string.
316+
317+
:type filter_: :class:`.row.RowFilter`, :class:`.row.RowFilterChain`,
318+
:class:`.row.RowFilterUnion` or
319+
:class:`.row.ConditionalRowFilter`
320+
:param filter_: (Optional) The filter to apply to the contents of the
321+
specified row(s). If unset, reads the entire table.
322+
323+
:type allow_row_interleaving: bool
324+
:param allow_row_interleaving: (Optional) By default, rows are read
325+
sequentially, producing results which are
326+
guaranteed to arrive in increasing row
327+
order. Setting
328+
``allow_row_interleaving`` to
329+
:data:`True` allows multiple rows to be
330+
interleaved in the response stream,
331+
which increases throughput but breaks
332+
this guarantee, and may force the
333+
client to use more memory to buffer
334+
partially-received rows.
335+
336+
:type limit: int
337+
:param limit: (Optional) The read will terminate after committing to N
338+
rows' worth of results. The default (zero) is to return
339+
all results. Note that if ``allow_row_interleaving`` is
340+
set to :data:`True`, partial results may be returned for
341+
more than N rows. However, only N ``commit_row`` chunks
342+
will be sent.
343+
344+
:rtype: :class:`data_messages_pb2.ReadRowsRequest`
345+
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
346+
:raises: :class:`ValueError <exceptions.ValueError>` if both
347+
``row_key`` and one of ``start_key`` and ``end_key`` are set
348+
"""
349+
request_kwargs = {'table_name': table_name}
350+
if (row_key is not None and
351+
(start_key is not None or end_key is not None)):
352+
raise ValueError('Row key and row range cannot be '
353+
'set simultaneously')
354+
if row_key is not None:
355+
request_kwargs['row_key'] = _to_bytes(row_key)
356+
if start_key is not None or end_key is not None:
357+
range_kwargs = {}
358+
if start_key is not None:
359+
range_kwargs['start_key'] = _to_bytes(start_key)
360+
if end_key is not None:
361+
range_kwargs['end_key'] = _to_bytes(end_key)
362+
row_range = data_pb2.RowRange(**range_kwargs)
363+
request_kwargs['row_range'] = row_range
364+
if filter_ is not None:
365+
request_kwargs['filter'] = filter_.to_pb()
366+
if allow_row_interleaving is not None:
367+
request_kwargs['allow_row_interleaving'] = allow_row_interleaving
368+
if limit is not None:
369+
request_kwargs['num_rows_limit'] = limit
370+
371+
return data_messages_pb2.ReadRowsRequest(**request_kwargs)

gcloud/bigtable/test_table.py

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,85 @@ def test_delete(self):
291291
{},
292292
)])
293293

294+
def _read_row_helper(self, chunks):
    # Shared driver for the read_row tests: stubs out the gRPC layer and
    # the request factory, then checks read_row()'s result and side effects.
    from gcloud._testing import _Monkey
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)
    from gcloud.bigtable._testing import _FakeStub
    from gcloud.bigtable.row_data import PartialRowData
    from gcloud.bigtable import table as MUT

    project_id = 'project-id'
    zone = 'zone'
    cluster_id = 'cluster-id'
    table_id = 'table-id'
    timeout_seconds = 596
    client = _Client(timeout_seconds=timeout_seconds)
    cluster_name = 'projects/{}/zones/{}/clusters/{}'.format(
        project_id, zone, cluster_id)
    cluster = _Cluster(cluster_name, client=client)
    table = self._makeOne(table_id, cluster)

    # The mocked request factory records its arguments and hands back
    # a sentinel request object.
    request_pb = object()
    factory_calls = []

    def mock_create_row_request(table_name, row_key, filter_):
        factory_calls.append((table_name, row_key, filter_))
        return request_pb

    # Fake the server's streaming response.
    row_key = b'row-key'
    response_pb = messages_pb2.ReadRowsResponse(row_key=row_key,
                                                chunks=chunks)
    client._data_stub = stub = _FakeStub([response_pb])

    # With chunks present we expect a committed PartialRowData;
    # with none, read_row() should return None.
    if chunks:
        expected = PartialRowData(row_key)
        expected._committed = True
        expected._chunks_encountered = True
    else:
        expected = None

    filter_obj = object()
    with _Monkey(MUT, _create_row_request=mock_create_row_request):
        actual = table.read_row(row_key, filter_=filter_obj)

    self.assertEqual(actual, expected)
    self.assertEqual(stub.method_calls, [(
        'ReadRows',
        (request_pb, timeout_seconds),
        {},
    )])
    self.assertEqual(factory_calls, [(table.name, row_key, filter_obj)])
351+
def test_read_row(self):
    # A stream containing a commit-row chunk yields a committed row.
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)

    self._read_row_helper(
        [messages_pb2.ReadRowsResponse.Chunk(commit_row=True)])
359+
def test_read_empty_row(self):
    # An empty chunk stream means read_row() returns None.
    self._read_row_helper([])
362+
363+
def test_read_row_still_partial(self):
    # A stream that never sends a "commit row" chunk must raise.
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)

    reset_only = [messages_pb2.ReadRowsResponse.Chunk(reset_row=True)]
    with self.assertRaises(ValueError):
        self._read_row_helper(reset_only)
372+
294373
def test_sample_row_keys(self):
295374
from gcloud.bigtable._generated import (
296375
bigtable_service_messages_pb2 as messages_pb2)
@@ -331,6 +410,128 @@ def test_sample_row_keys(self):
331410
)])
332411

333412

413+
class Test__create_row_request(unittest2.TestCase):
    """Unit tests for the module-level ``_create_row_request`` factory."""

    def _callFUT(self, table_name, row_key=None, start_key=None, end_key=None,
                 filter_=None, allow_row_interleaving=None, limit=None):
        from gcloud.bigtable.table import _create_row_request
        return _create_row_request(
            table_name, row_key=row_key, start_key=start_key, end_key=end_key,
            filter_=filter_, allow_row_interleaving=allow_row_interleaving,
            limit=limit)

    def test_table_name_only(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as msgs_pb2)

        table_name = 'table_name'
        actual = self._callFUT(table_name)
        self.assertEqual(actual,
                         msgs_pb2.ReadRowsRequest(table_name=table_name))

    def test_row_key_row_range_conflict(self):
        # A single key and a key range are mutually exclusive.
        with self.assertRaises(ValueError):
            self._callFUT(None, row_key=object(), end_key=object())

    def test_row_key(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as msgs_pb2)

        table_name = 'table_name'
        row_key = b'row_key'
        actual = self._callFUT(table_name, row_key=row_key)
        expected = msgs_pb2.ReadRowsRequest(
            table_name=table_name,
            row_key=row_key,
        )
        self.assertEqual(actual, expected)

    def test_row_range_start_key(self):
        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as msgs_pb2)

        table_name = 'table_name'
        start_key = b'start_key'
        actual = self._callFUT(table_name, start_key=start_key)
        expected = msgs_pb2.ReadRowsRequest(
            table_name=table_name,
            row_range=data_pb2.RowRange(start_key=start_key),
        )
        self.assertEqual(actual, expected)

    def test_row_range_end_key(self):
        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as msgs_pb2)

        table_name = 'table_name'
        end_key = b'end_key'
        actual = self._callFUT(table_name, end_key=end_key)
        expected = msgs_pb2.ReadRowsRequest(
            table_name=table_name,
            row_range=data_pb2.RowRange(end_key=end_key),
        )
        self.assertEqual(actual, expected)

    def test_row_range_both_keys(self):
        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as msgs_pb2)

        table_name = 'table_name'
        start_key = b'start_key'
        end_key = b'end_key'
        actual = self._callFUT(table_name, start_key=start_key,
                               end_key=end_key)
        expected = msgs_pb2.ReadRowsRequest(
            table_name=table_name,
            row_range=data_pb2.RowRange(start_key=start_key,
                                        end_key=end_key),
        )
        self.assertEqual(actual, expected)

    def test_with_filter(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as msgs_pb2)
        from gcloud.bigtable.row import RowSampleFilter

        table_name = 'table_name'
        row_filter = RowSampleFilter(0.33)
        actual = self._callFUT(table_name, filter_=row_filter)
        expected = msgs_pb2.ReadRowsRequest(
            table_name=table_name,
            filter=row_filter.to_pb(),
        )
        self.assertEqual(actual, expected)

    def test_with_allow_row_interleaving(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as msgs_pb2)

        table_name = 'table_name'
        allow_row_interleaving = True
        actual = self._callFUT(
            table_name, allow_row_interleaving=allow_row_interleaving)
        expected = msgs_pb2.ReadRowsRequest(
            table_name=table_name,
            allow_row_interleaving=allow_row_interleaving,
        )
        self.assertEqual(actual, expected)

    def test_with_limit(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as msgs_pb2)

        table_name = 'table_name'
        limit = 1337
        actual = self._callFUT(table_name, limit=limit)
        expected = msgs_pb2.ReadRowsRequest(
            table_name=table_name,
            num_rows_limit=limit,
        )
        self.assertEqual(actual, expected)
533+
534+
334535
class _Client(object):
335536

336537
data_stub = None

0 commit comments

Comments
 (0)