
Commit 3d0e871

Merge pull request #1038 from tseaver/bigquery-tabledata_api_methods

Add tabledata API methods

2 parents: deaab19 + 12d9ee1

2 files changed: +662 -2 lines changed

gcloud/bigquery/table.py

Lines changed: 208 additions & 0 deletions
@@ -312,6 +312,30 @@ def _require_client(self, client):
             client = self._dataset._client
         return client
 
+    def _parse_schema_resource(self, info):
+        """Parse a resource fragment into a schema field.
+
+        :type info: mapping
+        :param info: should contain a "fields" key to be parsed
+
+        :rtype: list of :class:`SchemaField`, or ``NoneType``
+        :returns: a list of parsed fields, or ``None`` if no "fields" key is
+                  present in ``info``.
+        """
+        if 'fields' not in info:
+            return None
+
+        schema = []
+        for r_field in info['fields']:
+            name = r_field['name']
+            field_type = r_field['type']
+            mode = r_field['mode']
+            description = r_field.get('description')
+            sub_fields = self._parse_schema_resource(r_field)
+            schema.append(
+                SchemaField(name, field_type, mode, description, sub_fields))
+        return schema
+
     def _set_properties(self, api_response):
         """Update properties from resource in body of ``api_response``
 
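For orientation, here is a small sketch (not taken from the commit) of the kind of "schema" resource fragment ``_parse_schema_resource`` expects, using invented field names; the recursion on a nested "fields" key is what populates the ``sub_fields`` of a RECORD column:

# Invented "schema" fragment, shaped like the schema portion of a
# tables.get response (field names are hypothetical).
schema_resource = {
    'fields': [
        {'name': 'full_name', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'phone', 'type': 'RECORD', 'mode': 'REPEATED',
         'fields': [
             {'name': 'area_code', 'type': 'STRING', 'mode': 'REQUIRED'},
             {'name': 'number', 'type': 'STRING', 'mode': 'REQUIRED'},
         ]},
    ],
}

# _parse_schema_resource(schema_resource) recurses into the RECORD entry's
# nested 'fields' and would yield roughly:
#   [SchemaField('full_name', 'STRING', 'REQUIRED', None, None),
#    SchemaField('phone', 'RECORD', 'REPEATED', None,
#                [SchemaField('area_code', 'STRING', 'REQUIRED', None, None),
#                 SchemaField('number', 'STRING', 'REQUIRED', None, None)])]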
@@ -320,6 +344,8 @@ def _set_properties(self, api_response):
         """
         self._properties.clear()
         cleaned = api_response.copy()
+        schema = cleaned.pop('schema', {})
+        self.schema = self._parse_schema_resource(schema)
         if 'creationTime' in cleaned:
            cleaned['creationTime'] = float(cleaned['creationTime'])
         if 'lastModifiedTime' in cleaned:
@@ -525,3 +551,185 @@ def delete(self, client=None):
         """
         client = self._require_client(client)
         client.connection.api_request(method='DELETE', path=self.path)
+
+    def fetch_data(self, max_results=None, page_token=None, client=None):
+        """API call: fetch the table data via a GET request
+
+        See:
+        https://cloud.google.com/bigquery/reference/rest/v2/tabledata/list
+
+        .. note::
+
+           This method assumes that its instance's ``schema`` attribute is
+           up-to-date with the schema as defined on the back-end: if the
+           two schemas are not identical, the values returned may be
+           incomplete. To ensure that the local copy of the schema is
+           up-to-date, call the table's ``reload`` method.
+
+        :type max_results: integer or ``NoneType``
+        :param max_results: maximum number of rows to return.
+
+        :type page_token: string or ``NoneType``
+        :param page_token: token representing a cursor into the table's rows.
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use. If not passed, falls back to the
+                       ``client`` stored on the current dataset.
+
+        :rtype: tuple
+        :returns: ``(row_data, total_rows, page_token)``, where ``row_data``
+                  is a list of tuples, one per result row, containing only
+                  the values; ``total_rows`` is a count of the total number
+                  of rows in the table; and ``page_token`` is an opaque
+                  string which can be used to fetch the next batch of rows
+                  (``None`` if no further batches can be fetched).
+        """
+        client = self._require_client(client)
+        params = {}
+
+        if max_results is not None:
+            params['maxResults'] = max_results
+
+        if page_token is not None:
+            params['pageToken'] = page_token
+
+        response = client.connection.api_request(method='GET',
+                                                  path='%s/data' % self.path,
+                                                  query_params=params)
+        total_rows = response.get('totalRows')
+        page_token = response.get('pageToken')
+        rows_data = []
+
+        for row in response.get('rows', ()):
+            row_data = []
+            for field, cell in zip(self._schema, row['f']):
+                converter = _CELLDATA_FROM_JSON[field.field_type]
+                if field.mode == 'REPEATED':
+                    row_data.append([converter(item, field)
+                                     for item in cell['v']])
+                else:
+                    row_data.append(converter(cell['v'], field))
+            rows_data.append(tuple(row_data))
+
+        return rows_data, total_rows, page_token
+
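A hedged usage sketch of the new ``fetch_data`` method (not part of the commit); it assumes ``table`` is an already-constructed ``gcloud.bigquery`` Table bound to a dataset whose client is configured:

# Assumed setup: ``table`` comes from elsewhere in the package, not this diff.
table.reload()   # refresh the local schema so the converters match the back-end

rows = []
token = None
while True:
    # Each call returns (row_data, total_rows, page_token).
    fetched, total_rows, token = table.fetch_data(max_results=1000,
                                                  page_token=token)
    rows.extend(fetched)
    if token is None:      # no further batches to fetch
        break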
+    def insert_data(self,
+                    rows,
+                    row_ids=None,
+                    skip_invalid_rows=None,
+                    ignore_unknown_values=None,
+                    client=None):
+        """API call: insert table data via a POST request
+
+        See:
+        https://cloud.google.com/bigquery/reference/rest/v2/tabledata/insertAll
+
+        :type rows: list of tuples
+        :param rows: row data to be inserted
+
+        :type row_ids: list of string
+        :param row_ids: Unique ids, one per row being inserted. If not
+                        passed, no de-duplication occurs.
+
+        :type skip_invalid_rows: boolean or ``NoneType``
+        :param skip_invalid_rows: skip rows w/ invalid data?
+
+        :type ignore_unknown_values: boolean or ``NoneType``
+        :param ignore_unknown_values: ignore columns beyond schema?
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use. If not passed, falls back to the
+                       ``client`` stored on the current dataset.
+
+        :rtype: list of mappings
+        :returns: One mapping per row with insert errors: the "index" key
+                  identifies the row, and the "errors" key contains a list
+                  of the mappings describing one or more problems with the
+                  row.
+        """
+        client = self._require_client(client)
+        rows_info = []
+        data = {'rows': rows_info}
+
+        for index, row in enumerate(rows):
+            row_info = {}
+
+            for field, value in zip(self._schema, row):
+                if field.field_type == 'TIMESTAMP':
+                    value = _prop_from_datetime(value)
+                row_info[field.name] = value
+
+            info = {'json': row_info}
+            if row_ids is not None:
+                info['insertId'] = row_ids[index]
+
+            rows_info.append(info)
+
+        if skip_invalid_rows is not None:
+            data['skipInvalidRows'] = skip_invalid_rows
+
+        if ignore_unknown_values is not None:
+            data['ignoreUnknownValues'] = ignore_unknown_values
+
+        response = client.connection.api_request(
+            method='POST',
+            path='%s/insertAll' % self.path,
+            data=data)
+        errors = []
+
+        for error in response.get('insertErrors', ()):
+            errors.append({'index': int(error['index']),
+                           'errors': error['errors']})
+
+        return errors
+
+
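Likewise, a hedged sketch of calling ``insert_data`` (not part of the commit), assuming the same ``table`` as above with a two-column schema of ``full_name`` (STRING) and ``age`` (INTEGER):

# Assumed: ``table`` has schema [full_name STRING, age INTEGER]; rows follow it.
rows_to_insert = [
    ('Phred Phlyntstone', 32),
    ('Wylma Phlyntstone', 29),
]

errors = table.insert_data(
    rows_to_insert,
    row_ids=['phred', 'wylma'],     # optional: lets the back-end de-duplicate
    skip_invalid_rows=True,
    ignore_unknown_values=True)

# insert_data builds a payload of the form
#   {'rows': [{'json': {...}, 'insertId': ...}, ...]}
# and returns one mapping per failed row.
for problem in errors:
    print('row %d: %s' % (problem['index'], problem['errors']))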
+def _not_null(value, field):
+    return value is not None or field.mode != 'NULLABLE'
+
+
+def _int_from_json(value, field):
+    if _not_null(value, field):
+        return int(value)
+
+
+def _float_from_json(value, field):
+    if _not_null(value, field):
+        return float(value)
+
+
+def _bool_from_json(value, field):
+    if _not_null(value, field):
+        return value.lower() in ['t', 'true', '1']
+
+
+def _datetime_from_json(value, field):
+    if _not_null(value, field):
+        return _datetime_from_prop(float(value))
+
+
+def _record_from_json(value, field):
+    if _not_null(value, field):
+        record = {}
+        for subfield, cell in zip(field.fields, value['f']):
+            converter = _CELLDATA_FROM_JSON[subfield.field_type]
+            if field.mode == 'REPEATED':
+                value = [converter(item, field) for item in cell['v']]
+            else:
+                value = converter(cell['v'], field)
+            record[subfield.name] = value
+        return record
+
+
+def _string_from_json(value, _):
+    return value
+
+
+_CELLDATA_FROM_JSON = {
+    'INTEGER': _int_from_json,
+    'FLOAT': _float_from_json,
+    'BOOLEAN': _bool_from_json,
+    'TIMESTAMP': _datetime_from_json,
+    'RECORD': _record_from_json,
+    'STRING': _string_from_json,
+}
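To make the converter table concrete, here is a sketch (invented, not from the commit) of the cell shapes these helpers consume from a ``tabledata.list`` response:

# Invented sample of one row as returned by tabledata.list: each row is
# {'f': [cells]}, and each cell is {'v': value}.
raw_row = {'f': [{'v': 'Phred Phlyntstone'},    # STRING
                 {'v': '32'},                   # INTEGER arrives as a string
                 {'v': 'true'},                 # BOOLEAN arrives as a string
                 {'v': '1438818564.0'}]}        # TIMESTAMP: epoch seconds, as a string

# fetch_data pairs each cell with the matching SchemaField and applies the
# converter keyed by the field's type, e.g.:
#   _CELLDATA_FROM_JSON['INTEGER']('32', age_field)      -> 32
#   _CELLDATA_FROM_JSON['BOOLEAN']('true', voter_field)  -> True
# NULLABLE cells whose value is None come back as None via _not_null.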
