Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def offline_write_batch(
assert isinstance(feature_view.batch_source, BigQuerySource)

pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source(
config, feature_view.batch_source
config, feature_view.batch_source, timestamp_unit="ns"
)
if column_names != table.column_names:
raise ValueError(
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def get_offline_store_from_config(offline_store_config: Any) -> OfflineStore:


def get_pyarrow_schema_from_batch_source(
config: RepoConfig, batch_source: DataSource
config: RepoConfig, batch_source: DataSource, timestamp_unit: str = "us"
) -> Tuple[pa.Schema, List[str]]:
"""Returns the pyarrow schema and column names for the given batch source."""
column_names_and_types = batch_source.get_table_column_names_and_types(config)
Expand All @@ -244,7 +244,8 @@ def get_pyarrow_schema_from_batch_source(
(
column_name,
feast_value_type_to_pa(
batch_source.source_datatype_to_feast_value_type()(column_type)
batch_source.source_datatype_to_feast_value_type()(column_type),
timestamp_unit=timestamp_unit,
),
)
)
Expand Down
8 changes: 5 additions & 3 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,9 @@ def pg_type_to_feast_value_type(type_str: str) -> ValueType:
return value


def feast_value_type_to_pa(feast_type: ValueType) -> "pyarrow.DataType":
def feast_value_type_to_pa(
feast_type: ValueType, timestamp_unit: str = "us"
) -> "pyarrow.DataType":
import pyarrow

type_map = {
Expand All @@ -855,15 +857,15 @@ def feast_value_type_to_pa(feast_type: ValueType) -> "pyarrow.DataType":
ValueType.STRING: pyarrow.string(),
ValueType.BYTES: pyarrow.binary(),
ValueType.BOOL: pyarrow.bool_(),
ValueType.UNIX_TIMESTAMP: pyarrow.timestamp("us"),
ValueType.UNIX_TIMESTAMP: pyarrow.timestamp(timestamp_unit),
ValueType.INT32_LIST: pyarrow.list_(pyarrow.int32()),
ValueType.INT64_LIST: pyarrow.list_(pyarrow.int64()),
ValueType.DOUBLE_LIST: pyarrow.list_(pyarrow.float64()),
ValueType.FLOAT_LIST: pyarrow.list_(pyarrow.float32()),
ValueType.STRING_LIST: pyarrow.list_(pyarrow.string()),
ValueType.BYTES_LIST: pyarrow.list_(pyarrow.binary()),
ValueType.BOOL_LIST: pyarrow.list_(pyarrow.bool_()),
ValueType.UNIX_TIMESTAMP_LIST: pyarrow.list_(pyarrow.timestamp("us")),
ValueType.UNIX_TIMESTAMP_LIST: pyarrow.list_(pyarrow.timestamp(timestamp_unit)),
ValueType.NULL: pyarrow.null(),
}
return type_map[feast_type]
Expand Down