Skip to content

Commit c271537

Browse files
committed
Adding HappyBase Batch.put() and helpers.
1 parent 5d79d58 commit c271537

File tree

2 files changed

+337
-0
lines changed

2 files changed

+337
-0
lines changed

gcloud/bigtable/happybase/batch.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import datetime
1919
import warnings
2020

21+
import six
22+
2123
from gcloud._helpers import _datetime_from_microseconds
2224
from gcloud.bigtable.row import TimestampRange
2325

@@ -105,6 +107,62 @@ def send(self):
105107
self._row_map.clear()
106108
self._mutation_count = 0
107109

110+
def _try_send(self):
111+
"""Send / commit the batch if mutations have exceeded batch size."""
112+
if self._batch_size and self._mutation_count >= self._batch_size:
113+
self.send()
114+
115+
def _get_row(self, row_key):
116+
"""Gets a row that will hold mutations.
117+
118+
If the row is not already cached on the current batch, a new row will
119+
be created.
120+
121+
:type row_key: str
122+
:param row_key: The row key for a row stored in the map.
123+
124+
:rtype: :class:`Row <gcloud.bigtable.row.Row>`
125+
:returns: The newly created or stored row that will hold mutations.
126+
"""
127+
if row_key not in self._row_map:
128+
table = self._table._low_level_table
129+
self._row_map[row_key] = table.row(row_key)
130+
131+
return self._row_map[row_key]
132+
133+
def put(self, row, data, wal=_WAL_SENTINEL):
134+
"""Insert data into a row in the table owned by this batch.
135+
136+
:type row: str
137+
:param row: The row key where the mutation will be "put".
138+
139+
:type data: dict
140+
:param data: Dictionary containing the data to be inserted. The keys
141+
are columns names (of the form ``fam:col``) and the values
142+
are strings (bytes) to be stored in those columns.
143+
144+
:type wal: object
145+
:param wal: Unused parameter (to over-ride the default on the
146+
instance). Provided for compatibility with HappyBase, but
147+
irrelevant for Cloud Bigtable since it does not have a
148+
Write Ahead Log.
149+
"""
150+
if wal is not _WAL_SENTINEL:
151+
_WARN(_WAL_WARNING)
152+
153+
row_object = self._get_row(row)
154+
# Make sure all the keys are valid before beginning
155+
# to add mutations.
156+
column_pairs = _get_column_pairs(six.iterkeys(data),
157+
require_qualifier=True)
158+
for column_family_id, column_qualifier in column_pairs:
159+
value = data[column_family_id + ':' + column_qualifier]
160+
row_object.set_cell(column_family_id, column_qualifier,
161+
value, timestamp=self._timestamp)
162+
163+
self._mutation_count += len(data)
164+
self._try_send()
165+
108166
def __enter__(self):
109167
"""Enter context manager, no set-up required."""
110168
return self
@@ -133,3 +191,54 @@ def __exit__(self, exc_type, exc_value, traceback):
133191
# NOTE: For non-transactional batches, this will even commit mutations
134192
# if an error occurred during the context manager.
135193
self.send()
194+
195+
196+
def _get_column_pairs(columns, require_qualifier=False):
197+
"""Turns a list of column or column families into parsed pairs.
198+
199+
Turns a column family (``fam`` or ``fam:``) into a pair such
200+
as ``['fam', None]`` and turns a column (``fam:col``) into
201+
``['fam', 'col']``.
202+
203+
:type columns: list
204+
:param columns: Iterable containing column names (as
205+
strings). Each column name can be either
206+
207+
* an entire column family: ``fam`` or ``fam:``
208+
* an single column: ``fam:col``
209+
210+
:type require_qualifier: bool
211+
:param require_qualifier: Boolean indicating if the columns should
212+
all have a qualifier or not.
213+
214+
:rtype: list
215+
:returns: List of pairs, where the first element in each pair is the
216+
column family and the second is the column qualifier
217+
(or :data:`None`).
218+
:raises: :class:`ValueError <exceptions.ValueError>` if any of the columns
219+
are not of the expected format.
220+
:class:`ValueError <exceptions.ValueError>` if
221+
``require_qualifier`` is :data:`True` and one of the values is
222+
for an entire column family
223+
"""
224+
column_pairs = []
225+
for column in columns:
226+
if isinstance(column, six.binary_type):
227+
column = column.decode('utf-8')
228+
# Remove trailing colons (i.e. for standalone column family).
229+
if column.endswith(u':'):
230+
column = column[:-1]
231+
num_colons = column.count(u':')
232+
if num_colons == 0:
233+
# column is a column family.
234+
if require_qualifier:
235+
raise ValueError('column does not contain a qualifier',
236+
column)
237+
else:
238+
column_pairs.append([column, None])
239+
elif num_colons == 1:
240+
column_pairs.append(column.split(u':'))
241+
else:
242+
raise ValueError('Column contains the : separator more than once')
243+
244+
return column_pairs

gcloud/bigtable/happybase/test_batch.py

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,172 @@ def test_send(self):
122122
self.assertEqual(batch._mutation_count, 0)
123123
self.assertEqual(row_map, {})
124124

125+
def test__try_send_no_batch_size(self):
126+
klass = self._getTargetClass()
127+
128+
class BatchWithSend(_SendMixin, klass):
129+
pass
130+
131+
table = object()
132+
batch = BatchWithSend(table)
133+
134+
self.assertEqual(batch._batch_size, None)
135+
self.assertFalse(batch._send_called)
136+
batch._try_send()
137+
self.assertFalse(batch._send_called)
138+
139+
def test__try_send_too_few_mutations(self):
140+
klass = self._getTargetClass()
141+
142+
class BatchWithSend(_SendMixin, klass):
143+
pass
144+
145+
table = object()
146+
batch_size = 10
147+
batch = BatchWithSend(table, batch_size=batch_size)
148+
149+
self.assertEqual(batch._batch_size, batch_size)
150+
self.assertFalse(batch._send_called)
151+
mutation_count = 2
152+
batch._mutation_count = mutation_count
153+
self.assertTrue(mutation_count < batch_size)
154+
batch._try_send()
155+
self.assertFalse(batch._send_called)
156+
157+
def test__try_send_actual_send(self):
158+
klass = self._getTargetClass()
159+
160+
class BatchWithSend(_SendMixin, klass):
161+
pass
162+
163+
table = object()
164+
batch_size = 10
165+
batch = BatchWithSend(table, batch_size=batch_size)
166+
167+
self.assertEqual(batch._batch_size, batch_size)
168+
self.assertFalse(batch._send_called)
169+
mutation_count = 12
170+
batch._mutation_count = mutation_count
171+
self.assertTrue(mutation_count > batch_size)
172+
batch._try_send()
173+
self.assertTrue(batch._send_called)
174+
175+
def test__get_row_exists(self):
176+
table = object()
177+
batch = self._makeOne(table)
178+
179+
row_key = 'row-key'
180+
row_obj = object()
181+
batch._row_map[row_key] = row_obj
182+
result = batch._get_row(row_key)
183+
self.assertEqual(result, row_obj)
184+
185+
def test__get_row_create_new(self):
186+
# Make mock batch and make sure we can create a low-level table.
187+
low_level_table = _MockLowLevelTable()
188+
table = _MockTable(low_level_table)
189+
batch = self._makeOne(table)
190+
191+
# Make sure row map is empty.
192+
self.assertEqual(batch._row_map, {})
193+
194+
# Customize/capture mock table creation.
195+
low_level_table.mock_row = mock_row = object()
196+
197+
# Actually get the row (which creates a row via a low-level table).
198+
row_key = 'row-key'
199+
result = batch._get_row(row_key)
200+
self.assertEqual(result, mock_row)
201+
202+
# Check all the things that were constructed.
203+
self.assertEqual(low_level_table.rows_made, [row_key])
204+
# Check how the batch was updated.
205+
self.assertEqual(batch._row_map, {row_key: mock_row})
206+
207+
def test_put_bad_wal(self):
208+
from gcloud._testing import _Monkey
209+
from gcloud.bigtable.happybase import batch as MUT
210+
211+
warned = []
212+
213+
def mock_warn(message):
214+
warned.append(message)
215+
# Raise an exception so we don't
216+
raise RuntimeError('No need to execute the rest.')
217+
218+
table = object()
219+
batch = self._makeOne(table)
220+
221+
row = 'row-key'
222+
data = {}
223+
wal = None
224+
225+
self.assertNotEqual(wal, MUT._WAL_SENTINEL)
226+
with _Monkey(MUT, _WARN=mock_warn):
227+
with self.assertRaises(RuntimeError):
228+
batch.put(row, data, wal=wal)
229+
230+
self.assertEqual(warned, [MUT._WAL_WARNING])
231+
232+
def test_put(self):
233+
import operator
234+
235+
table = object()
236+
batch = self._makeOne(table)
237+
batch._timestamp = timestamp = object()
238+
row_key = 'row-key'
239+
batch._row_map[row_key] = row = _MockRow()
240+
241+
col1_fam = 'cf1'
242+
col1_qual = 'qual1'
243+
value1 = 'value1'
244+
col2_fam = 'cf2'
245+
col2_qual = 'qual2'
246+
value2 = 'value2'
247+
data = {col1_fam + ':' + col1_qual: value1,
248+
col2_fam + ':' + col2_qual: value2}
249+
250+
self.assertEqual(batch._mutation_count, 0)
251+
self.assertEqual(row.set_cell_calls, [])
252+
batch.put(row_key, data)
253+
self.assertEqual(batch._mutation_count, 2)
254+
# Since the calls depend on data.keys(), the order
255+
# is non-deterministic.
256+
first_elt = operator.itemgetter(0)
257+
ordered_calls = sorted(row.set_cell_calls, key=first_elt)
258+
259+
cell1_args = (col1_fam, col1_qual, value1)
260+
cell1_kwargs = {'timestamp': timestamp}
261+
cell2_args = (col2_fam, col2_qual, value2)
262+
cell2_kwargs = {'timestamp': timestamp}
263+
self.assertEqual(ordered_calls, [
264+
(cell1_args, cell1_kwargs),
265+
(cell2_args, cell2_kwargs),
266+
])
267+
268+
def test_put_call_try_send(self):
269+
klass = self._getTargetClass()
270+
271+
class CallTrySend(klass):
272+
273+
try_send_calls = 0
274+
275+
def _try_send(self):
276+
self.try_send_calls += 1
277+
278+
table = object()
279+
batch = CallTrySend(table)
280+
281+
row_key = 'row-key'
282+
batch._row_map[row_key] = _MockRow()
283+
284+
self.assertEqual(batch._mutation_count, 0)
285+
self.assertEqual(batch.try_send_calls, 0)
286+
# No data so that nothing happens
287+
batch.put(row_key, data={})
288+
self.assertEqual(batch._mutation_count, 0)
289+
self.assertEqual(batch.try_send_calls, 1)
290+
125291
def test_context_manager(self):
126292
klass = self._getTargetClass()
127293

@@ -174,6 +340,45 @@ class BatchWithSend(_SendMixin, klass):
174340
self.assertTrue(batch._send_called)
175341

176342

343+
class Test__get_column_pairs(unittest2.TestCase):
344+
345+
def _callFUT(self, *args, **kwargs):
346+
from gcloud.bigtable.happybase.batch import _get_column_pairs
347+
return _get_column_pairs(*args, **kwargs)
348+
349+
def test_it(self):
350+
columns = [b'cf1', u'cf2:', 'cf3::', 'cf3:name1', 'cf3:name2']
351+
result = self._callFUT(columns)
352+
expected_result = [
353+
['cf1', None],
354+
['cf2', None],
355+
['cf3', ''],
356+
['cf3', 'name1'],
357+
['cf3', 'name2'],
358+
]
359+
self.assertEqual(result, expected_result)
360+
361+
def test_bad_column(self):
362+
columns = ['a:b:c']
363+
with self.assertRaises(ValueError):
364+
self._callFUT(columns)
365+
366+
def test_bad_column_type(self):
367+
columns = [None]
368+
with self.assertRaises(AttributeError):
369+
self._callFUT(columns)
370+
371+
def test_bad_columns_var(self):
372+
columns = None
373+
with self.assertRaises(TypeError):
374+
self._callFUT(columns)
375+
376+
def test_column_family_with_require_qualifier(self):
377+
columns = ['a:']
378+
with self.assertRaises(ValueError):
379+
self._callFUT(columns, require_qualifier=True)
380+
381+
177382
class _MockRowMap(dict):
178383

179384
clear_count = 0
@@ -187,6 +392,29 @@ class _MockRow(object):
187392

188393
def __init__(self):
189394
self.commits = 0
395+
self.set_cell_calls = []
190396

191397
def commit(self):
192398
self.commits += 1
399+
400+
def set_cell(self, *args, **kwargs):
401+
self.set_cell_calls.append((args, kwargs))
402+
403+
404+
class _MockTable(object):
405+
406+
def __init__(self, low_level_table):
407+
self._low_level_table = low_level_table
408+
409+
410+
class _MockLowLevelTable(object):
411+
412+
def __init__(self, *args, **kwargs):
413+
self.args = args
414+
self.kwargs = kwargs
415+
self.rows_made = []
416+
self.mock_row = None
417+
418+
def row(self, row_key):
419+
self.rows_made.append(row_key)
420+
return self.mock_row

0 commit comments

Comments
 (0)