Skip to content

Commit 7e97a2e

Browse files
committed
Adding HappyBase Table.scan().
1 parent ade1724 commit 7e97a2e

File tree

2 files changed

+255
-10
lines changed

2 files changed

+255
-10
lines changed

gcloud/bigtable/happybase/table.py

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717

1818
import struct
19+
import warnings
1920

2021
import six
2122

@@ -40,6 +41,7 @@
4041
from gcloud.bigtable.table import Table as _LowLevelTable
4142

4243

44+
_WARN = warnings.warn
4345
_UNPACK_I64 = struct.Struct('>q').unpack
4446
_SIMPLE_GC_RULES = (MaxAgeGCRule, MaxVersionsGCRule)
4547

@@ -367,15 +369,63 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
367369
:param kwargs: Remaining keyword arguments. Provided for HappyBase
368370
compatibility.
369371
370-
:raises: :class:`ValueError <exceptions.ValueError>` if ``batch_size``
371-
or ``scan_batching`` are used, or if ``limit`` is set but
372-
non-positive, or if row prefix is used with row start/stop,
372+
:raises: :class:`ValueError <exceptions.ValueError>` if ``limit`` is set but non-positive, or if row prefix is
373+
used with row start/stop,
373374
:class:`TypeError <exceptions.TypeError>` if a string
374-
``filter`` is used,
375-
:class:`NotImplementedError <exceptions.NotImplementedError>`
376-
always (until the method is implemented).
375+
``filter`` is used.
377376
"""
378-
raise NotImplementedError
377+
legacy_args = []
378+
for kw_name in ('batch_size', 'scan_batching', 'sorted_columns'):
379+
if kw_name in kwargs:
380+
legacy_args.append(kw_name)
381+
kwargs.pop(kw_name)
382+
if legacy_args:
383+
legacy_args = ', '.join(legacy_args)
384+
message = ('The HappyBase legacy arguments %s were used. These '
385+
'arguments are unused by gcloud.' % (legacy_args,))
386+
_WARN(message)
387+
if kwargs:
388+
raise TypeError('Received unexpected arguments', kwargs.keys())
389+
390+
if limit is not None and limit < 1:
391+
raise ValueError('limit must be positive')
392+
if row_prefix is not None:
393+
if row_start is not None or row_stop is not None:
394+
raise ValueError('row_prefix cannot be combined with '
395+
'row_start or row_stop')
396+
row_start = row_prefix
397+
row_stop = _string_successor(row_prefix)
398+
399+
filters = []
400+
if isinstance(filter, six.string_types):
401+
raise TypeError('Specifying filters as a string is not supported '
402+
'by Cloud Bigtable. Use a '
403+
'gcloud.bigtable.row.RowFilter instead.')
404+
elif filter is not None:
405+
filters.append(filter)
406+
407+
if columns is not None:
408+
filters.append(_columns_filter_helper(columns))
409+
# versions == 1 since we only want the latest.
410+
filter_ = _filter_chain_helper(versions=1, timestamp=timestamp,
411+
filters=filters)
412+
413+
partial_rows_data = self._low_level_table.read_rows(
414+
start_key=row_start, end_key=row_stop,
415+
limit=limit, filter_=filter_)
416+
417+
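# Live, mutable reference to the rows mapping (not a copy): each consume_next() call below is expected to add the next row to it.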
418+
rows_dict = partial_rows_data.rows
419+
while True:
420+
try:
421+
partial_rows_data.consume_next()
422+
row_key, curr_row_data = rows_dict.popitem()
423+
# NOTE: We expect len(rows_dict) == 0, but don't check it.
424+
curr_row_dict = _partial_row_to_dict(
425+
curr_row_data, include_timestamp=include_timestamp)
426+
yield (row_key, curr_row_dict)
427+
except StopIteration:
428+
break
379429

380430
def put(self, row, data, timestamp=None, wal=_WAL_SENTINEL):
381431
"""Insert data into a row in this table.

gcloud/bigtable/happybase/test_table.py

Lines changed: 198 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -507,13 +507,203 @@ def mock_cells_to_pairs(*args, **kwargs):
507507
self.assertEqual(mock_cells,
508508
[((fake_cells,), to_pairs_kwargs)])
509509

510-
def test_scan(self):
510+
def test_scan_with_batch_size(self):
511+
from gcloud._testing import _Monkey
512+
from gcloud.bigtable.happybase import table as MUT
513+
514+
warned = []
515+
516+
def mock_warn(msg):
517+
warned.append(msg)
518+
511519
name = 'table-name'
512520
connection = None
513521
table = self._makeOne(name, connection)
522+
# Use unknown to force a TypeError, so we don't need to
523+
# stub out the rest of the method.
524+
with self.assertRaises(TypeError):
525+
with _Monkey(MUT, _WARN=mock_warn):
526+
list(table.scan(batch_size=object(), unknown=None))
514527

515-
with self.assertRaises(NotImplementedError):
516-
table.scan()
528+
self.assertEqual(len(warned), 1)
529+
self.assertIn('batch_size', warned[0])
530+
531+
def test_scan_with_scan_batching(self):
532+
from gcloud._testing import _Monkey
533+
from gcloud.bigtable.happybase import table as MUT
534+
535+
warned = []
536+
537+
def mock_warn(msg):
538+
warned.append(msg)
539+
540+
name = 'table-name'
541+
connection = None
542+
table = self._makeOne(name, connection)
543+
# Use unknown to force a TypeError, so we don't need to
544+
# stub out the rest of the method.
545+
with self.assertRaises(TypeError):
546+
with _Monkey(MUT, _WARN=mock_warn):
547+
list(table.scan(scan_batching=object(), unknown=None))
548+
549+
self.assertEqual(len(warned), 1)
550+
self.assertIn('scan_batching', warned[0])
551+
552+
def test_scan_with_sorted_columns(self):
553+
from gcloud._testing import _Monkey
554+
from gcloud.bigtable.happybase import table as MUT
555+
556+
warned = []
557+
558+
def mock_warn(msg):
559+
warned.append(msg)
560+
561+
name = 'table-name'
562+
connection = None
563+
table = self._makeOne(name, connection)
564+
# Use unknown to force a TypeError, so we don't need to
565+
# stub out the rest of the method.
566+
with self.assertRaises(TypeError):
567+
with _Monkey(MUT, _WARN=mock_warn):
568+
list(table.scan(sorted_columns=object(), unknown=None))
569+
570+
self.assertEqual(len(warned), 1)
571+
self.assertIn('sorted_columns', warned[0])
572+
573+
def test_scan_with_invalid_limit(self):
574+
name = 'table-name'
575+
connection = None
576+
table = self._makeOne(name, connection)
577+
with self.assertRaises(ValueError):
578+
list(table.scan(limit=-10))
579+
580+
def test_scan_with_row_prefix_and_row_start(self):
581+
name = 'table-name'
582+
connection = None
583+
table = self._makeOne(name, connection)
584+
with self.assertRaises(ValueError):
585+
list(table.scan(row_prefix='a', row_stop='abc'))
586+
587+
def test_scan_with_string_filter(self):
588+
name = 'table-name'
589+
connection = None
590+
table = self._makeOne(name, connection)
591+
with self.assertRaises(TypeError):
592+
list(table.scan(filter='some-string'))
593+
594+
def _scan_test_helper(self, row_limits=(None, None), row_prefix=None,
595+
columns=None, filter_=None, timestamp=None,
596+
include_timestamp=False, limit=None, rr_result=None,
597+
expected_result=None):
598+
import types
599+
from gcloud._testing import _Monkey
600+
from gcloud.bigtable.happybase import table as MUT
601+
602+
name = 'table-name'
603+
row_start, row_stop = row_limits
604+
connection = None
605+
table = self._makeOne(name, connection)
606+
table._low_level_table = _MockLowLevelTable()
607+
rr_result = rr_result or _MockPartialRowsData()
608+
table._low_level_table.read_rows_result = rr_result
609+
self.assertEqual(rr_result.consume_next_calls, 0)
610+
611+
# Set-up mocks.
612+
fake_col_filter = object()
613+
mock_columns = []
614+
615+
def mock_columns_filter_helper(*args):
616+
mock_columns.append(args)
617+
return fake_col_filter
618+
619+
fake_filter = object()
620+
mock_filters = []
621+
622+
def mock_filter_chain_helper(**kwargs):
623+
mock_filters.append(kwargs)
624+
return fake_filter
625+
626+
with _Monkey(MUT, _filter_chain_helper=mock_filter_chain_helper,
627+
_columns_filter_helper=mock_columns_filter_helper):
628+
result = table.scan(row_start=row_start, row_stop=row_stop,
629+
row_prefix=row_prefix, columns=columns,
630+
filter=filter_, timestamp=timestamp,
631+
include_timestamp=include_timestamp,
632+
limit=limit)
633+
self.assertTrue(isinstance(result, types.GeneratorType))
634+
# Need to consume the result while the monkey patch is applied.
635+
# read_rows_result == Empty PartialRowsData --> No results.
636+
expected_result = expected_result or []
637+
self.assertEqual(list(result), expected_result)
638+
639+
read_rows_args = ()
640+
if row_prefix:
641+
row_start = row_prefix
642+
row_stop = MUT._string_successor(row_prefix)
643+
read_rows_kwargs = {
644+
'end_key': row_stop,
645+
'filter_': fake_filter,
646+
'limit': limit,
647+
'start_key': row_start,
648+
}
649+
self.assertEqual(table._low_level_table.read_rows_calls, [
650+
(read_rows_args, read_rows_kwargs),
651+
])
652+
self.assertEqual(rr_result.consume_next_calls,
653+
rr_result.iterations + 1)
654+
655+
if columns is not None:
656+
self.assertEqual(mock_columns, [(columns,)])
657+
else:
658+
self.assertEqual(mock_columns, [])
659+
660+
filters = []
661+
if filter_ is not None:
662+
filters.append(filter_)
663+
if columns:
664+
filters.append(fake_col_filter)
665+
expected_kwargs = {
666+
'filters': filters,
667+
'versions': 1,
668+
'timestamp': timestamp,
669+
}
670+
self.assertEqual(mock_filters, [expected_kwargs])
671+
672+
def test_scan_with_columns(self):
673+
columns = object()
674+
self._scan_test_helper(columns=columns)
675+
676+
def test_scan_with_row_start_and_stop(self):
677+
row_start = 'bar'
678+
row_stop = 'foo'
679+
row_limits = (row_start, row_stop)
680+
self._scan_test_helper(row_limits=row_limits)
681+
682+
def test_scan_with_row_prefix(self):
683+
row_prefix = 'row-prefi'
684+
self._scan_test_helper(row_prefix=row_prefix)
685+
686+
def test_scan_with_filter(self):
687+
mock_filter = object()
688+
self._scan_test_helper(filter_=mock_filter)
689+
690+
def test_scan_with_no_results(self):
691+
limit = 1337
692+
timestamp = object()
693+
self._scan_test_helper(timestamp=timestamp, limit=limit)
694+
695+
def test_scan_with_results(self):
696+
from gcloud.bigtable.row_data import PartialRowData
697+
698+
row_key1 = 'row-key1'
699+
row1 = PartialRowData(row_key1)
700+
rr_result = _MockPartialRowsData(rows={row_key1: row1}, iterations=1)
701+
702+
include_timestamp = object()
703+
expected_result = [(row_key1, {})]
704+
self._scan_test_helper(include_timestamp=include_timestamp,
705+
rr_result=rr_result,
706+
expected_result=expected_result)
517707

518708
def test_put(self):
519709
from gcloud._testing import _Monkey
@@ -1292,3 +1482,8 @@ def __init__(self, rows=None, iterations=0):
12921482

12931483
def consume_all(self):
12941484
self.consume_all_calls += 1
1485+
1486+
def consume_next(self):
1487+
self.consume_next_calls += 1
1488+
if self.consume_next_calls > self.iterations:
1489+
raise StopIteration

0 commit comments

Comments
 (0)