|
21 | 21 | import io |
22 | 22 | import operator |
23 | 23 |
|
| 24 | +import google.api_core.retry |
24 | 25 | import pkg_resources |
25 | 26 | import pytest |
26 | 27 | import pytz |
|
37 | 38 | PANDAS_INT64_VERSION = pkg_resources.parse_version("1.0.0") |
38 | 39 |
|
39 | 40 |
|
| 41 | +class MissingDataError(Exception): |
| 42 | + pass |
| 43 | + |
| 44 | + |
40 | 45 | def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_id): |
41 | 46 | """Test that a DataFrame with dtypes that map well to BigQuery types |
42 | 47 | can be uploaded without specifying a schema. |
@@ -657,27 +662,34 @@ def test_insert_rows_from_dataframe(bigquery_client, dataset_id): |
657 | 662 | ) |
658 | 663 | for errors in chunk_errors: |
659 | 664 | assert not errors |
660 | | - |
661 | | - # Use query to fetch rows instead of listing directly from the table so |
662 | | - # that we get values from the streaming buffer. |
663 | | - rows = list( |
664 | | - bigquery_client.query( |
665 | | - "SELECT * FROM `{}.{}.{}`".format( |
666 | | - table.project, table.dataset_id, table.table_id |
667 | | - ) |
668 | | - ) |
669 | | - ) |
670 | | - |
671 | | - sorted_rows = sorted(rows, key=operator.attrgetter("int_col")) |
672 | | - row_tuples = [r.values() for r in sorted_rows] |
673 | 665 | expected = [ |
674 | 666 | # Pandas often represents NULL values as NaN. Convert to None for |
675 | 667 | # easier comparison. |
676 | 668 | tuple(None if col != col else col for col in data_row) |
677 | 669 | for data_row in dataframe.itertuples(index=False) |
678 | 670 | ] |
679 | 671 |
|
680 | | - assert len(row_tuples) == len(expected) |
| 672 | + # Use query to fetch rows instead of listing directly from the table so |
| 673 | + # that we get values from the streaming buffer "within a few seconds". |
| 674 | + # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability |
| 675 | + @google.api_core.retry.Retry( |
| 676 | + predicate=google.api_core.retry.if_exception_type(MissingDataError) |
| 677 | + ) |
| 678 | + def get_rows(): |
| 679 | + rows = list( |
| 680 | + bigquery_client.query( |
| 681 | + "SELECT * FROM `{}.{}.{}`".format( |
| 682 | + table.project, table.dataset_id, table.table_id |
| 683 | + ) |
| 684 | + ) |
| 685 | + ) |
| 686 | + if len(rows) != len(expected): |
| 687 | + raise MissingDataError() |
| 688 | + return rows |
| 689 | + |
| 690 | + rows = get_rows() |
| 691 | + sorted_rows = sorted(rows, key=operator.attrgetter("int_col")) |
| 692 | + row_tuples = [r.values() for r in sorted_rows] |
681 | 693 |
|
682 | 694 | for row, expected_row in zip(row_tuples, expected): |
683 | 695 | assert ( |
|
0 commit comments