Skip to content

Commit 3674971

Browse files
feat: Enable transformations on PDFs (#5172)
1 parent 7f3e528 commit 3674971

File tree

8 files changed

+425
-9
lines changed

8 files changed

+425
-9
lines changed

sdk/python/feast/feature_store.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,16 +1566,34 @@ def _get_feature_view_and_df_for_online_write(
15661566
if feature_view.singleton
15671567
else df.to_dict(orient="list")
15681568
)
1569-
transformed_data = feature_view.feature_transformation.udf(input_dict)
1569+
if feature_view.singleton:
1570+
transformed_data = df.apply(
1571+
feature_view.feature_transformation.udf, axis=1
1572+
)
1573+
transformed_data = pd.DataFrame(
1574+
transformed_data.to_list()
1575+
).applymap(
1576+
lambda x: x[0] if isinstance(x, list) and len(x) == 1 else x
1577+
)
1578+
else:
1579+
transformed_data = feature_view.feature_transformation.udf(
1580+
input_dict
1581+
)
15701582
if feature_view.write_to_online_store:
15711583
entities = [
15721584
self.get_entity(entity)
15731585
for entity in (feature_view.entities or [])
15741586
]
15751587
join_keys = [entity.join_key for entity in entities if entity]
15761588
join_keys = [k for k in join_keys if k in input_dict.keys()]
1577-
transformed_df = pd.DataFrame(transformed_data)
1578-
input_df = pd.DataFrame(input_dict)
1589+
transformed_df = (
1590+
pd.DataFrame(transformed_data)
1591+
if not isinstance(transformed_data, pd.DataFrame)
1592+
else transformed_data
1593+
)
1594+
input_df = pd.DataFrame(
1595+
[input_dict] if feature_view.singleton else input_dict
1596+
)
15791597
if input_df.shape[0] == transformed_df.shape[0]:
15801598
for k in input_dict:
15811599
if k not in transformed_data:

sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,10 @@ def online_read(
354354
feature_name_feast_primitive_type_map = {
355355
f.name: f.dtype for f in table.features
356356
}
357+
if getattr(table, "write_to_online_store", False):
358+
feature_name_feast_primitive_type_map.update(
359+
{f.name: f.dtype for f in table.schema}
360+
)
357361
# Build a dictionary mapping composite key -> (res_ts, res)
358362
results_dict: Dict[
359363
str, Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
@@ -394,6 +398,7 @@ def online_read(
394398
"int64_val",
395399
"float_val",
396400
"double_val",
401+
"string_val",
397402
]:
398403
setattr(
399404
val,
@@ -420,7 +425,7 @@ def online_read(
420425
setattr(val, proto_attr, field_value)
421426
else:
422427
raise ValueError(
423-
f"Unsupported ValueType: {feature_feast_primitive_type} with feature view value {field_value} for feature {field} with value {field_value}"
428+
f"Unsupported ValueType: {feature_feast_primitive_type} with feature view value {field_value} for feature {field} with value type {proto_attr}"
424429
)
425430
# res[field] = val
426431
key_to_use = field.split(":", 1)[-1] if ":" in field else field

sdk/python/feast/on_demand_feature_view.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,9 @@ def _construct_random_input(
678678
) -> dict[str, Union[list[Any], Any]]:
679679
rand_dict_value: dict[ValueType, Union[list[Any], Any]] = {
680680
ValueType.BYTES: [str.encode("hello world")],
681+
ValueType.PDF_BYTES: [
682+
b"%PDF-1.3\n3 0 obj\n<</Type /Page\n/Parent 1 0 R\n/Resources 2 0 R\n/Contents 4 0 R>>\nendobj\n4 0 obj\n<</Filter /FlateDecode /Length 115>>\nstream\nx\x9c\x15\xcc1\x0e\x820\x18@\xe1\x9dS\xbcM]jk$\xd5\xd5(\x83!\x86\xa1\x17\xf8\xa3\xa5`LIh+\xd7W\xc6\xf7\r\xef\xc0\xbd\xd2\xaa\xb6,\xd5\xc5\xb1o\x0c\xa6VZ\xe3znn%\xf3o\xab\xb1\xe7\xa3:Y\xdc\x8bm\xeb\xf3&1\xc8\xd7\xd3\x97\xc82\xe6\x81\x87\xe42\xcb\x87Vb(\x12<\xdd<=}Jc\x0cL\x91\xee\xda$\xb5\xc3\xbd\xd7\xe9\x0f\x8d\x97 $\nendstream\nendobj\n1 0 obj\n<</Type /Pages\n/Kids [3 0 R ]\n/Count 1\n/MediaBox [0 0 595.28 841.89]\n>>\nendobj\n5 0 obj\n<</Type /Font\n/BaseFont /Helvetica\n/Subtype /Type1\n/Encoding /WinAnsiEncoding\n>>\nendobj\n2 0 obj\n<<\n/ProcSet [/PDF /Text /ImageB /ImageC /ImageI]\n/Font <<\n/F1 5 0 R\n>>\n/XObject <<\n>>\n>>\nendobj\n6 0 obj\n<<\n/Producer (PyFPDF 1.7.2 http://pyfpdf.googlecode.com/)\n/Title (This is a sample title.)\n/Author (Francisco Javier Arceo)\n/CreationDate (D:20250312165548)\n>>\nendobj\n7 0 obj\n<<\n/Type /Catalog\n/Pages 1 0 R\n/OpenAction [3 0 R /FitH null]\n/PageLayout /OneColumn\n>>\nendobj\nxref\n0 8\n0000000000 65535 f \n0000000272 00000 n \n0000000455 00000 n \n0000000009 00000 n \n0000000087 00000 n \n0000000359 00000 n \n0000000559 00000 n \n0000000734 00000 n \ntrailer\n<<\n/Size 8\n/Root 7 0 R\n/Info 6 0 R\n>>\nstartxref\n837\n%%EOF\n"
683+
],
681684
ValueType.STRING: ["hello world"],
682685
ValueType.INT32: [1],
683686
ValueType.INT64: [1],

sdk/python/feast/transformation/python_transformation.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,19 @@ def infer_features(
5858
f"Failed to infer type for feature '{feature_name}' with value "
5959
+ f"'{feature_value}' since no items were returned by the UDF."
6060
)
61-
inferred_type = type(feature_value[0])
6261
inferred_value = feature_value[0]
63-
if singleton:
64-
inferred_value = feature_value
65-
inferred_type = None # type: ignore
62+
if singleton and isinstance(inferred_value, list):
63+
# If we have a nested list like [[0.5, 0.5, ...]]
64+
if len(inferred_value) > 0:
65+
# Get the actual element type from the inner list
66+
inferred_type = type(inferred_value[0])
67+
else:
68+
raise TypeError(
69+
f"Failed to infer type for nested feature '{feature_name}' - inner list is empty"
70+
)
71+
else:
72+
# For non-nested lists or when singleton is False
73+
inferred_type = type(inferred_value)
6674

6775
else:
6876
inferred_type = type(feature_value)

sdk/python/feast/types.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = {
2424
"INVALID": "UNKNOWN",
2525
"BYTES": "BYTES",
26+
"PDF_BYTES": "PDF_BYTES",
2627
"STRING": "STRING",
2728
"INT32": "INT32",
2829
"INT64": "INT64",
@@ -79,6 +80,7 @@ class PrimitiveFeastType(Enum):
7980
FLOAT32 = 6
8081
BOOL = 7
8182
UNIX_TIMESTAMP = 8
83+
PDF_BYTES = 9
8284

8385
def to_value_type(self) -> ValueType:
8486
"""
@@ -102,6 +104,7 @@ def __hash__(self):
102104

103105
Invalid = PrimitiveFeastType.INVALID
104106
Bytes = PrimitiveFeastType.BYTES
107+
PdfBytes = PrimitiveFeastType.PDF_BYTES
105108
String = PrimitiveFeastType.STRING
106109
Bool = PrimitiveFeastType.BOOL
107110
Int32 = PrimitiveFeastType.INT32
@@ -114,6 +117,7 @@ def __hash__(self):
114117
Invalid,
115118
String,
116119
Bytes,
120+
PdfBytes,
117121
Bool,
118122
Int32,
119123
Int64,
@@ -126,6 +130,7 @@ def __hash__(self):
126130
"INVALID": "Invalid",
127131
"STRING": "String",
128132
"BYTES": "Bytes",
133+
"PDF_BYTES": "PdfBytes",
129134
"BOOL": "Bool",
130135
"INT32": "Int32",
131136
"INT64": "Int64",
@@ -168,6 +173,7 @@ def __str__(self):
168173
VALUE_TYPES_TO_FEAST_TYPES: Dict["ValueType", FeastType] = {
169174
ValueType.UNKNOWN: Invalid,
170175
ValueType.BYTES: Bytes,
176+
ValueType.PDF_BYTES: PdfBytes,
171177
ValueType.STRING: String,
172178
ValueType.INT32: Int32,
173179
ValueType.INT64: Int64,

sdk/python/feast/value_type.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class ValueType(enum.Enum):
4848
BOOL_LIST = 17
4949
UNIX_TIMESTAMP_LIST = 18
5050
NULL = 19
51+
PDF_BYTES = 20
5152

5253

5354
ListType = Union[

sdk/python/tests/unit/online_store/test_online_retrieval.py

Lines changed: 195 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sqlite3
55
import sys
66
import time
7+
from typing import Any
78

89
import numpy as np
910
import pandas as pd
@@ -1056,7 +1057,7 @@ def test_local_milvus() -> None:
10561057
client.drop_collection(collection_name=COLLECTION_NAME)
10571058

10581059

1059-
def test_milvus_lite_get_online_documents_v2() -> None:
1060+
def test_milvus_lite_retrieve_online_documents_v2() -> None:
10601061
"""
10611062
Test retrieving documents from the online store in local mode.
10621063
"""
@@ -1226,6 +1227,199 @@ def test_milvus_lite_get_online_documents_v2() -> None:
12261227
assert len(result["distance"]) == len(results[0])
12271228

12281229

1230+
def test_milvus_stored_writes_with_explode() -> None:
    """
    Check that an on-demand feature view whose UDF fans input documents out
    into several chunk rows can be written to the Milvus online store and then
    read back, both by entity key and through vector similarity search.
    """
    from feast import Entity, RequestSource
    from feast.field import Field
    from feast.on_demand_feature_view import on_demand_feature_view
    from feast.types import Array, Bytes, Float32, String, ValueType

    random.seed(42)
    vector_length = 10
    cli = CliRunner()
    with cli.local_repo(
        example_repo_py=get_example_repo("example_rag_feature_repo.py"),
        offline_store="file",
        online_store="milvus",
        apply=False,
        teardown=False,
    ) as store:
        # --- Entities and the request source feeding the transformation ---
        chunk_entity = Entity(
            name="chunk", join_keys=["chunk_id"], value_type=ValueType.STRING
        )
        document_entity = Entity(
            name="document", join_keys=["document_id"], value_type=ValueType.STRING
        )

        request_fields = [
            Field(name="document_id", dtype=String),
            Field(name="document_text", dtype=String),
            Field(name="document_bytes", dtype=Bytes),
        ]
        document_request_source = RequestSource(
            name="document_source",
            schema=request_fields,
        )

        # --- The "explode" view: the UDF returns four fixed chunk rows ---
        @on_demand_feature_view(
            entities=[chunk_entity, document_entity],
            sources=[document_request_source],
            schema=[
                Field(name="document_id", dtype=String),
                Field(name="chunk_id", dtype=String),
                Field(name="chunk_text", dtype=String),
                Field(
                    name="vector",
                    dtype=Array(Float32),
                    vector_index=True,
                    vector_search_metric="COSINE",  # same metric the direct search uses
                ),
            ],
            mode="python",
            write_to_online_store=True,
        )
        def milvus_explode_feature_view(inputs: dict[str, Any]):
            # The output is independent of ``inputs``: it always yields two
            # chunks for each of two documents, with constant-valued vectors.
            exploded_rows: dict[str, Any] = {
                "document_id": ["doc_1", "doc_1", "doc_2", "doc_2"],
                "chunk_id": ["chunk-1", "chunk-2", "chunk-1", "chunk-2"],
                "chunk_text": [
                    "hello friends",
                    "how are you?",
                    "This is a test.",
                    "Document chunking example.",
                ],
                "vector": [
                    [component] * vector_length
                    for component in (0.1, 0.2, 0.3, 0.4)
                ],
            }
            return exploded_rows

        # --- Register everything with the store ---
        store.apply(
            [
                chunk_entity,
                document_entity,
                document_request_source,
                milvus_explode_feature_view,
            ]
        )

        # --- Sanity-check what was registered ---
        registered_view = store.get_on_demand_feature_view(
            "milvus_explode_feature_view"
        )
        assert registered_view.features[1].vector_index
        assert registered_view.entities == [chunk_entity.name, document_entity.name]
        assert registered_view.entity_columns[0].name == document_entity.join_key
        assert registered_view.entity_columns[1].name == chunk_entity.join_key

        # --- Write two input documents through the transformation ---
        rows_to_write = [
            {
                "document_id": "document_1",
                "document_text": "Hello world. How are you?",
            },
            {
                "document_id": "document_2",
                "document_text": "This is a test. Document chunking example.",
            },
        ]
        store.write_to_online_store(
            feature_view_name="milvus_explode_feature_view",
            df=rows_to_write,
        )

        # --- Key-based retrieval of two of the stored chunks ---
        rows_to_read = [
            {"document_id": "doc_1", "chunk_id": "chunk-2"},
            {"document_id": "doc_2", "chunk_id": "chunk-1"},
        ]
        requested_features = [
            "milvus_explode_feature_view:document_id",
            "milvus_explode_feature_view:chunk_id",
            "milvus_explode_feature_view:chunk_text",
        ]

        online_response = store.get_online_features(
            entity_rows=rows_to_read,
            features=list(requested_features),
        ).to_dict()

        expected_columns = ["chunk_id", "chunk_text", "document_id"]
        assert sorted(online_response.keys()) == sorted(expected_columns)

        # --- Vector search: once directly against Milvus, once via Feast ---
        query_embedding = [0.1] * vector_length

        milvus_client = store._provider._online_store.client
        first_collection = milvus_client.list_collections()[0]
        direct_results = milvus_client.search(
            collection_name=first_collection,
            data=[query_embedding],
            anns_field="vector",
            search_params={
                "metric_type": "COSINE",
                "params": {"nprobe": 10},
            },
            limit=2,
            output_fields=["document_id", "chunk_id", "chunk_text"],
        )

        feast_results = store.retrieve_online_documents_v2(
            features=list(requested_features),
            query=query_embedding,
            top_k=2,
        ).to_dict()

        # --- Validate the vector search output ---
        assert "document_id" in feast_results
        assert "chunk_id" in feast_results
        assert "chunk_text" in feast_results
        assert "distance" in feast_results
        assert len(feast_results["distance"]) == 2
        assert len(feast_results["document_id"]) == 2
        # The direct Milvus query must yield two hits as well.
        assert len(direct_results[0]) == 2

        # Distances are not compared exactly; drop them before the equality check.
        del feast_results["distance"]

        assert feast_results == {
            "document_id": ["doc_2", "doc_1"],
            "chunk_id": ["chunk-1", "chunk-2"],
            "chunk_text": ["This is a test.", "how are you?"],
        }
1421+
1422+
12291423
def test_milvus_native_from_feast_data() -> None:
12301424
import random
12311425
from datetime import datetime

0 commit comments

Comments
 (0)