@@ -154,11 +154,13 @@ def _bq_to_arrow_batch_objects(bq_blocks, arrow_schema):
154154 arrays .append (
155155 pyarrow .array (
156156 (row [name ] for row in block ),
157- type = arrow_schema .field_by_name (name ).type ,
157+ type = arrow_schema .field (name ).type ,
158158 size = len (block ),
159159 )
160160 )
161- arrow_batches .append (pyarrow .RecordBatch .from_arrays (arrays , arrow_schema ))
161+ arrow_batches .append (
162+ pyarrow .RecordBatch .from_arrays (arrays , schema = arrow_schema )
163+ )
162164 return arrow_batches
163165
164166
@@ -173,6 +175,22 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
173175 return arrow_batches
174176
175177
178+ def _pages_w_nonresumable_internal_error (avro_blocks ):
179+ for block in avro_blocks :
180+ yield block
181+ raise google .api_core .exceptions .InternalServerError (
182+ "INTERNAL: Got a nonresumable error."
183+ )
184+
185+
186+ def _pages_w_resumable_internal_error (avro_blocks ):
187+ for block in avro_blocks :
188+ yield block
189+ raise google .api_core .exceptions .InternalServerError (
190+ "INTERNAL: Received RST_STREAM with error code 2."
191+ )
192+
193+
176194def _pages_w_unavailable (pages ):
177195 for page in pages :
178196 yield page
@@ -363,6 +381,29 @@ def test_rows_w_timeout(class_under_test, mock_client):
363381 mock_client .read_rows .assert_not_called ()
364382
365383
384+ def test_rows_w_nonresumable_internal_error (class_under_test , mock_client ):
385+ bq_columns = [{"name" : "int_col" , "type" : "int64" }]
386+ avro_schema = _bq_to_avro_schema (bq_columns )
387+ read_session = _generate_avro_read_session (avro_schema )
388+ bq_blocks = [[{"int_col" : 1024 }, {"int_col" : 512 }], [{"int_col" : 256 }]]
389+ avro_blocks = _pages_w_nonresumable_internal_error (
390+ _bq_to_avro_blocks (bq_blocks , avro_schema )
391+ )
392+
393+ stream_position = bigquery_storage_v1beta1 .types .StreamPosition (
394+ stream = {"name" : "test" }
395+ )
396+
397+ reader = class_under_test (avro_blocks , mock_client , stream_position , {})
398+
399+ with pytest .raises (
400+ google .api_core .exceptions .InternalServerError , match = "nonresumable error"
401+ ):
402+ list (reader .rows (read_session ))
403+
404+ mock_client .read_rows .assert_not_called ()
405+
406+
366407def test_rows_w_reconnect (class_under_test , mock_client ):
367408 bq_columns = [{"name" : "int_col" , "type" : "int64" }]
368409 avro_schema = _bq_to_avro_schema (bq_columns )
@@ -372,13 +413,18 @@ def test_rows_w_reconnect(class_under_test, mock_client):
372413 [{"int_col" : 345 }, {"int_col" : 456 }],
373414 ]
374415 avro_blocks_1 = _pages_w_unavailable (_bq_to_avro_blocks (bq_blocks_1 , avro_schema ))
375- bq_blocks_2 = [[{"int_col" : 567 }, {"int_col" : 789 }], [{"int_col" : 890 }]]
416+ bq_blocks_2 = [[{"int_col" : 1024 }, {"int_col" : 512 }], [{"int_col" : 256 }]]
376417 avro_blocks_2 = _bq_to_avro_blocks (bq_blocks_2 , avro_schema )
418+ avro_blocks_2 = _pages_w_resumable_internal_error (
419+ _bq_to_avro_blocks (bq_blocks_2 , avro_schema )
420+ )
421+ bq_blocks_3 = [[{"int_col" : 567 }, {"int_col" : 789 }], [{"int_col" : 890 }]]
422+ avro_blocks_3 = _bq_to_avro_blocks (bq_blocks_3 , avro_schema )
377423
378- for block in avro_blocks_2 :
424+ for block in avro_blocks_3 :
379425 block .status .estimated_row_count = 7
380426
381- mock_client .read_rows .return_value = avro_blocks_2
427+ mock_client .read_rows .side_effect = ( avro_blocks_2 , avro_blocks_3 )
382428 stream_position = bigquery_storage_v1beta1 .types .StreamPosition (
383429 stream = {"name" : "test" }
384430 )
@@ -395,17 +441,24 @@ def test_rows_w_reconnect(class_under_test, mock_client):
395441 itertools .chain (
396442 itertools .chain .from_iterable (bq_blocks_1 ),
397443 itertools .chain .from_iterable (bq_blocks_2 ),
444+ itertools .chain .from_iterable (bq_blocks_3 ),
398445 )
399446 )
400447
401448 assert tuple (got ) == expected
402449 assert got .total_rows == 7
403- mock_client .read_rows .assert_called_once_with (
450+ mock_client .read_rows .assert_any_call (
404451 bigquery_storage_v1beta1 .types .StreamPosition (
405452 stream = {"name" : "test" }, offset = 4
406453 ),
407454 metadata = {"test-key" : "test-value" },
408455 )
456+ mock_client .read_rows .assert_called_with (
457+ bigquery_storage_v1beta1 .types .StreamPosition (
458+ stream = {"name" : "test" }, offset = 7
459+ ),
460+ metadata = {"test-key" : "test-value" },
461+ )
409462
410463
411464def test_rows_w_reconnect_by_page (class_under_test , mock_client ):
0 commit comments