Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion gcloud/bigquery/test_table.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# pylint: disable=too-many-lines
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion gcloud/bigtable/happybase/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _get_cluster(timeout=None):
:rtype: :class:`gcloud.bigtable.cluster.Cluster`
:returns: The unique cluster owned by the project inferred from
the environment.
:raises: :class:`ValueError <exceptions.ValueError>` if their is a failed
:raises: :class:`ValueError <exceptions.ValueError>` if there is a failed
zone or any number of clusters other than one.
"""
client_kwargs = {'admin': True}
Expand Down
108 changes: 108 additions & 0 deletions gcloud/bigtable/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
from gcloud._helpers import _microseconds_from_datetime
from gcloud._helpers import _to_bytes
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)


_MAX_MUTATIONS = 100000
_PACK_I64 = struct.Struct('>q').pack


Expand Down Expand Up @@ -341,6 +344,111 @@ def delete_cells(self, column_family_id, columns, time_range=None,
# processed without error.
mutations_list.extend(to_append)

def _commit_mutate(self):
"""Makes a ``MutateRow`` API request.

Assumes no filter is set on the :class:`Row` and is meant to be called
by :meth:`commit`.

:raises: :class:`ValueError <exceptions.ValueError>` if the number of
mutations exceeds the ``_MAX_MUTATIONS``.
"""
mutations_list = self._get_mutations()
num_mutations = len(mutations_list)
if num_mutations == 0:
return
if num_mutations > _MAX_MUTATIONS:
raise ValueError('%d total mutations exceed the maximum allowable '
'%d.' % (num_mutations, _MAX_MUTATIONS))
request_pb = messages_pb2.MutateRowRequest(
table_name=self._table.name,
row_key=self._row_key,
mutations=mutations_list,
)
# We expect a `google.protobuf.empty_pb2.Empty`
client = self._table._cluster._client
client._data_stub.MutateRow(request_pb, client.timeout_seconds)

def _commit_check_and_mutate(self):
"""Makes a ``CheckAndMutateRow`` API request.

Assumes a filter is set on the :class:`Row` and is meant to be called
by :meth:`commit`.

:rtype: bool
:returns: Flag indicating if the filter was matched (which also
indicates which set of mutations were applied by the server).
:raises: :class:`ValueError <exceptions.ValueError>` if the number of
mutations exceeds the ``_MAX_MUTATIONS``.
"""
true_mutations = self._get_mutations(state=True)
false_mutations = self._get_mutations(state=False)
num_true_mutations = len(true_mutations)
num_false_mutations = len(false_mutations)
if num_true_mutations == 0 and num_false_mutations == 0:
return
if (num_true_mutations > _MAX_MUTATIONS or
num_false_mutations > _MAX_MUTATIONS):
raise ValueError(
'Exceed the maximum allowable mutations (%d). Had %s true '
'mutations and %d false mutations.' % (
_MAX_MUTATIONS, num_true_mutations, num_false_mutations))

request_pb = messages_pb2.CheckAndMutateRowRequest(
table_name=self._table.name,
row_key=self._row_key,
predicate_filter=self._filter.to_pb(),
true_mutations=true_mutations,
false_mutations=false_mutations,
)
# We expect a `.messages_pb2.CheckAndMutateRowResponse`
client = self._table._cluster._client
resp = client._data_stub.CheckAndMutateRow(
request_pb, client.timeout_seconds)
return resp.predicate_matched

def clear_mutations(self):
"""Removes all currently accumulated mutations on the current row."""
if self._filter is None:
del self._pb_mutations[:]
else:
del self._true_pb_mutations[:]
del self._false_pb_mutations[:]

def commit(self):
"""Makes a ``MutateRow`` or ``CheckAndMutateRow`` API request.

If no mutations have been created in the row, no request is made.

Mutations are applied atomically and in order, meaning that earlier
mutations can be masked / negated by later ones. Cells already present
in the row are left unchanged unless explicitly changed by a mutation.

After committing the accumulated mutations, resets the local
mutations to an empty list.

In the case that a filter is set on the :class:`Row`, the mutations
will be applied conditionally, based on whether the filter matches
any cells in the :class:`Row` or not. (Each method which adds a
mutation has a ``state`` parameter for this purpose.)

:rtype: :class:`bool` or :data:`NoneType <types.NoneType>`
:returns: :data:`None` if there is no filter, otherwise a flag
indicating if the filter was matched (which also
indicates which set of mutations were applied by the server).
:raises: :class:`ValueError <exceptions.ValueError>` if the number of
mutations exceeds the ``_MAX_MUTATIONS``.
"""
if self._filter is None:
result = self._commit_mutate()
else:
result = self._commit_check_and_mutate()

# Reset mutations after commit-ing request.
self.clear_mutations()

return result


class RowFilter(object):
"""Basic filter to apply to cells in a row.
Expand Down
209 changes: 209 additions & 0 deletions gcloud/bigtable/test_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,194 @@ def test_delete_cells_with_string_columns(self):
)
self.assertEqual(row._pb_mutations, [expected_pb1, expected_pb2])

def test_commit(self):
from google.protobuf import empty_pb2
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)
from gcloud.bigtable._testing import _FakeStub

row_key = b'row_key'
table_name = 'projects/more-stuff'
column_family_id = u'column_family_id'
column = b'column'
timeout_seconds = 711
client = _Client(timeout_seconds=timeout_seconds)
table = _Table(table_name, client=client)
row = self._makeOne(row_key, table)

# Create request_pb
value = b'bytes-value'
mutation = data_pb2.Mutation(
set_cell=data_pb2.Mutation.SetCell(
family_name=column_family_id,
column_qualifier=column,
timestamp_micros=-1, # Default value.
value=value,
),
)
request_pb = messages_pb2.MutateRowRequest(
table_name=table_name,
row_key=row_key,
mutations=[mutation],
)

# Create response_pb
response_pb = empty_pb2.Empty()

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub(response_pb)

# Create expected_result.
expected_result = None # commit() has no return value when no filter.

# Perform the method and check the result.
row.set_cell(column_family_id, column, value)
result = row.commit()
self.assertEqual(result, expected_result)
self.assertEqual(stub.method_calls, [(
'MutateRow',
(request_pb, timeout_seconds),
{},
)])
self.assertEqual(row._pb_mutations, [])
self.assertEqual(row._true_pb_mutations, None)
self.assertEqual(row._false_pb_mutations, None)

def test_commit_too_many_mutations(self):
from gcloud._testing import _Monkey
from gcloud.bigtable import row as MUT

row_key = b'row_key'
table = object()
row = self._makeOne(row_key, table)
row._pb_mutations = [1, 2, 3]
num_mutations = len(row._pb_mutations)
with _Monkey(MUT, _MAX_MUTATIONS=num_mutations - 1):
with self.assertRaises(ValueError):
row.commit()

def test_commit_no_mutations(self):
from gcloud.bigtable._testing import _FakeStub

row_key = b'row_key'
client = _Client()
table = _Table(None, client=client)
row = self._makeOne(row_key, table)
self.assertEqual(row._pb_mutations, [])

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub()

# Perform the method and check the result.
result = row.commit()
self.assertEqual(result, None)
# Make sure no request was sent.
self.assertEqual(stub.method_calls, [])

def test_commit_with_filter(self):
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)
from gcloud.bigtable._testing import _FakeStub
from gcloud.bigtable.row import RowSampleFilter

row_key = b'row_key'
table_name = 'projects/more-stuff'
column_family_id = u'column_family_id'
column = b'column'
timeout_seconds = 262
client = _Client(timeout_seconds=timeout_seconds)
table = _Table(table_name, client=client)
row_filter = RowSampleFilter(0.33)
row = self._makeOne(row_key, table, filter_=row_filter)

# Create request_pb
value1 = b'bytes-value'
mutation1 = data_pb2.Mutation(
set_cell=data_pb2.Mutation.SetCell(
family_name=column_family_id,
column_qualifier=column,
timestamp_micros=-1, # Default value.
value=value1,
),
)
value2 = b'other-bytes'
mutation2 = data_pb2.Mutation(
set_cell=data_pb2.Mutation.SetCell(
family_name=column_family_id,
column_qualifier=column,
timestamp_micros=-1, # Default value.
value=value2,
),
)
request_pb = messages_pb2.CheckAndMutateRowRequest(
table_name=table_name,
row_key=row_key,
predicate_filter=row_filter.to_pb(),
true_mutations=[mutation1],
false_mutations=[mutation2],
)

# Create response_pb
predicate_matched = True
response_pb = messages_pb2.CheckAndMutateRowResponse(
predicate_matched=predicate_matched)

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub(response_pb)

# Create expected_result.
expected_result = predicate_matched

# Perform the method and check the result.
row.set_cell(column_family_id, column, value1, state=True)
row.set_cell(column_family_id, column, value2, state=False)
result = row.commit()
self.assertEqual(result, expected_result)
self.assertEqual(stub.method_calls, [(
'CheckAndMutateRow',
(request_pb, timeout_seconds),
{},
)])
self.assertEqual(row._pb_mutations, None)
self.assertEqual(row._true_pb_mutations, [])
self.assertEqual(row._false_pb_mutations, [])

def test_commit_with_filter_too_many_mutations(self):
from gcloud._testing import _Monkey
from gcloud.bigtable import row as MUT

row_key = b'row_key'
table = object()
filter_ = object()
row = self._makeOne(row_key, table, filter_=filter_)
row._true_pb_mutations = [1, 2, 3]
num_mutations = len(row._true_pb_mutations)
with _Monkey(MUT, _MAX_MUTATIONS=num_mutations - 1):
with self.assertRaises(ValueError):
row.commit()

def test_commit_with_filter_no_mutations(self):
from gcloud.bigtable._testing import _FakeStub

row_key = b'row_key'
client = _Client()
table = _Table(None, client=client)
filter_ = object()
row = self._makeOne(row_key, table, filter_=filter_)
self.assertEqual(row._true_pb_mutations, [])
self.assertEqual(row._false_pb_mutations, [])

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub()

# Perform the method and check the result.
result = row.commit()
self.assertEqual(result, None)
# Make sure no request was sent.
self.assertEqual(stub.method_calls, [])


class Test_BoolFilter(unittest2.TestCase):

Expand Down Expand Up @@ -1345,3 +1533,24 @@ def test_to_pb_false_only(self):
),
)
self.assertEqual(filter_pb, expected_pb)


class _Client(object):

data_stub = None

def __init__(self, timeout_seconds=None):
self.timeout_seconds = timeout_seconds


class _Cluster(object):

def __init__(self, client=None):
self._client = client


class _Table(object):

def __init__(self, name, client=None):
self.name = name
self._cluster = _Cluster(client)
Loading