63 changes: 40 additions & 23 deletions bigquery/google/cloud/bigquery/table.py
@@ -1519,6 +1519,17 @@ def to_arrow(
         if pyarrow is None:
             raise ValueError(_NO_PYARROW_ERROR)
 
+        if (
+            bqstorage_client or create_bqstorage_client
+        ) and self.max_results is not None:
+            warnings.warn(
+                "Cannot use bqstorage_client if max_results is set, "
+                "reverting to fetching data with the tabledata.list endpoint.",
+                stacklevel=2,
+            )
+            create_bqstorage_client = False
+            bqstorage_client = None
+
         owns_bqstorage_client = False
         if not bqstorage_client and create_bqstorage_client:
             owns_bqstorage_client = True
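A minimal sketch of the fallback added above, assuming valid application default credentials; the public table name is only a placeholder, and the final assert simply checks that the warning fired. With max_results set on the row iterator, to_arrow() now skips the BigQuery Storage API path and fetches the rows with the tabledata.list endpoint instead:

import warnings

from google.cloud import bigquery

client = bigquery.Client()
table_ref = bigquery.TableReference.from_string(
    "bigquery-public-data.usa_names.usa_1910_2013"  # placeholder table
)
rows = client.list_rows(table_ref, max_results=100)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # max_results is set, so the new guard warns, drops the (would-be)
    # BigQuery Storage client, and downloads over tabledata.list.
    arrow_table = rows.to_arrow(create_bqstorage_client=True)

assert any("max_results" in str(warning.message) for warning in caught)
print(arrow_table.num_rows)  # at most 100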
@@ -1707,33 +1718,39 @@ def to_dataframe(
             create_bqstorage_client = False
             bqstorage_client = None
 
-        owns_bqstorage_client = False
-        if not bqstorage_client and create_bqstorage_client:
-            owns_bqstorage_client = True
-            bqstorage_client = self.client._create_bqstorage_client()
-
-        try:
-            progress_bar = self._get_progress_bar(progress_bar_type)
+        if pyarrow is not None:
+            # If pyarrow is available, calling to_arrow, then converting to a
+            # pandas dataframe is about 2x faster. This is because pandas.concat is
+            # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
+            # usually no-copy.
+            record_batch = self.to_arrow(
+                progress_bar_type=progress_bar_type,
+                bqstorage_client=bqstorage_client,
+                create_bqstorage_client=create_bqstorage_client,
+            )
+            df = record_batch.to_pandas()
+            for column in dtypes:
+                df[column] = pandas.Series(df[column], dtype=dtypes[column])
+            return df
 
-            frames = []
-            for frame in self.to_dataframe_iterable(
-                bqstorage_client=bqstorage_client, dtypes=dtypes
-            ):
-                frames.append(frame)
+        # The bqstorage_client is only used if pyarrow is available, so the
+        # rest of this method only needs to account for tabledata.list.
+        progress_bar = self._get_progress_bar(progress_bar_type)
 
-                if progress_bar is not None:
-                    # In some cases, the number of total rows is not populated
-                    # until the first page of rows is fetched. Update the
-                    # progress bar's total to keep an accurate count.
-                    progress_bar.total = progress_bar.total or self.total_rows
-                    progress_bar.update(len(frame))
+        frames = []
+        for frame in self.to_dataframe_iterable(dtypes=dtypes):
+            frames.append(frame)
 
             if progress_bar is not None:
-                # Indicate that the download has finished.
-                progress_bar.close()
-        finally:
-            if owns_bqstorage_client:
-                bqstorage_client.transport.channel.close()
+                # In some cases, the number of total rows is not populated
+                # until the first page of rows is fetched. Update the
+                # progress bar's total to keep an accurate count.
+                progress_bar.total = progress_bar.total or self.total_rows
+                progress_bar.update(len(frame))
+
+        if progress_bar is not None:
+            # Indicate that the download has finished.
+            progress_bar.close()
 
         # Avoid concatting an empty list.
         if not frames:
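For context, a small usage sketch of the two code paths the rewritten to_dataframe() now dispatches between, assuming credentials, pandas, and optionally pyarrow and tqdm are installed; the table name and dtype override are placeholders, not taken from this PR:

from google.cloud import bigquery

client = bigquery.Client()
table_ref = bigquery.TableReference.from_string(
    "bigquery-public-data.usa_names.usa_1910_2013"  # placeholder table
)
rows = client.list_rows(table_ref, max_results=1000)

# With pyarrow installed, this call goes through to_arrow() and then
# to_pandas(), which the inline comment above measures at about 2x faster
# than concatenating per-page DataFrames. Without pyarrow, it falls back to
# to_dataframe_iterable() over tabledata.list pages and drives the optional
# tqdm progress bar shown in the diff.
df = rows.to_dataframe(
    progress_bar_type="tqdm",
    dtypes={"number": "int32"},  # placeholder dtype override
    create_bqstorage_client=False,  # rows are capped by max_results anyway
)
print(df.dtypes)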
7 changes: 6 additions & 1 deletion bigquery/tests/system.py
@@ -2369,7 +2369,12 @@ def test_nested_table_to_dataframe(self):
         row = df.iloc[0]
         # verify the row content
         self.assertEqual(row["string_col"], "Some value")
-        self.assertEqual(row["record_col"], record)
+        expected_keys = tuple(sorted(record.keys()))
+        row_keys = tuple(sorted(row["record_col"].keys()))
+        self.assertEqual(row_keys, expected_keys)
+        # Can't compare numpy arrays, which pyarrow encodes the embedded
+        # repeated column to, so convert to list.
+        self.assertEqual(list(row["record_col"]["nested_repeated"]), [0, 1, 2])
         # verify that nested data can be accessed with indices/keys
         self.assertEqual(row["record_col"]["nested_repeated"][0], 0)
         self.assertEqual(
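A standalone sketch (numpy only, no BigQuery access) of why the system test above stops comparing the whole record dict: pyarrow hands the repeated column back as a numpy array, and dict equality against the original list-valued record raises the ambiguous-truth-value error, so the test compares the keys and a list() copy instead:

import numpy as np

record = {"nested_repeated": [0, 1, 2]}                # what the test inserted
row_record = {"nested_repeated": np.array([0, 1, 2])}  # what pyarrow hands back

try:
    _ = record == row_record  # dict equality coerces the elementwise result to bool
except ValueError as exc:
    # "The truth value of an array with more than one element is ambiguous. ..."
    print(exc)

# The comparisons the test switched to side-step the problem.
assert sorted(record.keys()) == sorted(row_record.keys())
assert list(row_record["nested_repeated"]) == [0, 1, 2]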