19 changes: 4 additions & 15 deletions examples/kfto_feast_rag/README.md
@@ -20,9 +20,9 @@ This example notebook provides a step-by-step demonstration of building and usin

4. RAG System Implementation
- **Embedding Model**: `all-MiniLM-L6-v2` (configurable)
- **Generator Model**: `granite-3.2-2b-instruct` (configurable)
- **Vector Store**: Custom implementation with Feast integration
- **Retriever**: Custom implementation with Feast integration extending HuggingFace's RagRetriever
- **Generator Model**: `granite-3.2-2b-instruct` (configurable)
- **Vector Store**: Feast’s built-in `FeastVectorStore`, backed by Milvus
- **Retriever**: Feast’s native RAG retriever, `FeastRAGRetriever`

5. Query Demonstration
- Perform inference with retrieved context
@@ -37,17 +37,6 @@ This example notebook provides a step-by-step demonstration of building and usin
From the workbench, clone this repository: https://github.com/opendatahub-io/distributed-workloads.git
Navigate to the distributed-workloads/examples/kfto-feast-rag directory. Here you will find the following files:

* **feast_rag_retriever.py**
This module implements a custom RAG retriever by combining Feast feature store capabilities with HuggingFace transformer-based models. The implementation provides:

- A flexible vector store interface with Feast integration (`FeastVectorStore`)
- A custom RAG retriever (`FeastRAGRetriever`) that supports three search modes:
- Text-based search
- Vector-based search
- Hybrid search
- Seamless integration with HuggingFace transformers library and sentence-transformers
- Configurable document formatting and retrieval options

* **feature_repo/feature_store.yaml**
This is the core configuration file for the RAG project's feature store, configuring a Milvus online store with the local provider.
* To configure Milvus, set the following:
@@ -56,7 +45,7 @@
- port (default: 19530)
- credentials (if required)

* **__feature_repo/rag_project_repo.py__**
* **__feature_repo/ragproject_repo.py__**
This is the Feast feature repository configuration that defines the schema and data source for the Wikipedia passage embeddings (see the sketch after this README diff).

* **__rag_feast_kfto.ipynb__**
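For orientation, here is a minimal sketch of what a repository file like `feature_repo/ragproject_repo.py` can contain. It is illustrative only: the entity, the timestamp column, and the Milvus vector-index arguments are assumptions based on Feast's file-source and vector-search APIs, not a copy of the real file.

```python
# Hypothetical sketch of a Feast repo file for Wikipedia passage embeddings.
# The real definitions live in feature_repo/ragproject_repo.py.
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Array, Float32, String

passage = Entity(name="passage_id", join_keys=["passage_id"])

wiki_source = FileSource(
    path="data/wiki_dpr.parquet",       # Parquet file produced by the notebook
    timestamp_field="event_timestamp",  # assumed timestamp column
)

wiki_passage_feature_view = FeatureView(
    name="wiki_passages",
    entities=[passage],
    schema=[
        Field(name="passage_text", dtype=String),
        Field(
            name="embedding",
            dtype=Array(Float32),
            vector_index=True,              # enable Milvus vector search
            vector_search_metric="COSINE",  # assumed similarity metric
        ),
    ],
    source=wiki_source,
)
```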
204 changes: 0 additions & 204 deletions examples/kfto_feast_rag/feast_rag_retriever.py

This file was deleted.

2 changes: 1 addition & 1 deletion examples/kfto_feast_rag/feature_repo/feature_store.yaml
@@ -3,7 +3,7 @@ provider: local
registry: data/registry.db
online_store:
type: milvus
host: # Insert Milvus route host
host: http:// # Insert Milvus route host
username: # Insert Milvus username if required
password: # Insert Milvus password if required
port: 19530
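Before running the notebook, it can help to sanity-check the Milvus endpoint that goes into this file. A minimal sketch, assuming pymilvus is available (feast[milvus] pulls it in) and using placeholder connection values:

```python
# Sketch: verify Milvus connectivity with the values destined for
# feature_store.yaml. Host, port, and credentials below are placeholders.
from pymilvus import connections, utility

connections.connect(
    alias="default",
    host="my-milvus-route.example.com",  # placeholder route host
    port="19530",
    # user="...", password="...",       # only if the instance requires them
)
print(utility.get_server_version())  # fails fast if the endpoint is wrong
```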
75 changes: 52 additions & 23 deletions examples/kfto_feast_rag/rag_feast_kfto.ipynb
@@ -20,9 +20,7 @@
"metadata": {},
"outputs": [],
"source": [
"# Note: `faiss-cpu` is included here due to an assumption in the Hugging Face `RagRetriever` class\n",
"# that the FAISS library is required, even though it's not directly used in this example.\n",
"%pip install --quiet feast[milvus] sentence-transformers datasets faiss-cpu\n",
"%pip install --quiet feast[milvus] sentence-transformers datasets\n",
"%pip install bigtree==0.19.2\n",
"%pip install marshmallow==3.10.0 "
]
@@ -51,31 +49,50 @@
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The dataset is chunked into pieces of at most a preset number of characters, the maximum supported by Feast. Each chunk contains only whole words, so the retrieved context forms complete sentences rather than cut-off words."
]
},
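The next cell rewrites `chunk_dataset` to greedily pack whole words into chunks of at most `max_chars` characters. Once defined, it is typically applied in batches; a minimal usage sketch, assuming the Hugging Face `datasets` API and a loaded `dataset` with `id`, `title`, and `text` columns:

```python
# Sketch only: apply the chunk_dataset function (defined in the next cell)
# over a Hugging Face dataset in batches.
chunked = dataset.map(
    chunk_dataset,
    batched=True,                         # chunk_dataset expects batches of rows
    remove_columns=dataset.column_names,  # keep only the returned chunk columns
)
print(chunked[0]["text"])  # first whole-word chunk, at most 380 characters
```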
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def chunk_dataset(examples, chunk_size=100, overlap=20, max_chars=500):\n",
"def chunk_dataset(examples, max_chars=380):\n",
" all_chunks = []\n",
" all_ids = []\n",
" all_titles = []\n",
"\n",
" for i, text in enumerate(examples['text']): # Iterate over texts in the batch\n",
" for i, text in enumerate(examples['text']): # Iterate over texts in the batch\n",
" words = text.split()\n",
" chunks = []\n",
" for j in range(0, len(words), chunk_size - overlap):\n",
" chunk_words = words[j:j + chunk_size]\n",
" if len(chunk_words) < 20:\n",
" continue\n",
" chunk_text_value = ' '.join(chunk_words) # Store the chunk text\n",
" chunk_text_value = chunk_text_value[:max_chars]\n",
" chunks.append(chunk_text_value)\n",
" all_ids.append(f\"{examples['id'][i]}_{j}\") # Unique ID for the chunk\n",
" all_titles.append(examples['title'][i])\n",
" if not words:\n",
" continue\n",
"\n",
" all_chunks.extend(chunks)\n",
" current_chunk_words = []\n",
" for word in words:\n",
" # Check if adding the next word exceeds the character limit\n",
" if len(' '.join(current_chunk_words + [word])) > max_chars:\n",
" # If the current chunk is valid, save it\n",
" if current_chunk_words:\n",
" chunk_text = ' '.join(current_chunk_words)\n",
" all_chunks.append(chunk_text)\n",
" all_ids.append(f\"{examples['id'][i]}_{len(all_chunks)}\") # Unique ID for the chunk\n",
⚠️ Potential issue

Potential issue with chunk ID generation.

The chunk ID generation uses len(all_chunks) which includes the current chunk being added, potentially causing inconsistent or duplicate IDs.

Apply this diff to fix the ID generation:

-                    all_ids.append(f"{examples['id'][i]}_{len(all_chunks)}")  # Unique ID for the chunk
+                    all_ids.append(f"{examples['id'][i]}_{len(all_chunks) + 1}")  # Unique ID for the chunk
-            all_ids.append(f"{examples['id'][i]}_{len(all_chunks)}")  # Unique ID for the chunk
+            all_ids.append(f"{examples['id'][i]}_{len(all_chunks) + 1}")  # Unique ID for the chunk

Also applies to: 94-94

🤖 Prompt for AI Agents
In examples/kfto_feast_rag/rag_feast_kfto.ipynb at lines 83 and 94, the chunk ID
generation uses len(all_chunks) which counts the current chunk being added,
risking duplicate or inconsistent IDs. To fix this, replace len(all_chunks) with
the index of the chunk before appending it, ensuring each chunk ID is unique and
consistent by using the correct chunk index value.
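Beyond the reviewer's +1 offset, an alternative sketched here (not part of the PR) is to number chunks with a per-passage counter, so an ID never depends on how many chunks earlier passages contributed:

```python
# Sketch: per-passage chunk numbering yields IDs that stay unique and
# stable regardless of global list state, e.g. "42_0", "42_1", ...
def ids_for_chunks(passage_id, chunks):
    return [f"{passage_id}_{k}" for k in range(len(chunks))]

print(ids_for_chunks("42", ["alpha beta", "gamma delta"]))  # ['42_0', '42_1']
```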

" all_titles.append(examples['title'][i])\n",
" # Start a new chunk with the current word\n",
" current_chunk_words = [word]\n",
" else:\n",
" current_chunk_words.append(word)\n",
"\n",
" # Add the last remaining chunk\n",
" if current_chunk_words:\n",
" chunk_text = ' '.join(current_chunk_words)\n",
" all_chunks.append(chunk_text)\n",
" all_ids.append(f\"{examples['id'][i]}_{len(all_chunks)}\") # Unique ID for the chunk\n",
" all_titles.append(examples['title'][i])\n",
"\n",
" return {'id': all_ids, 'title': all_titles, 'text': all_chunks}\n",
"\n",
@@ -120,6 +137,15 @@
"#### Create parquet file as historical data source"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%mkdir feature_repo/data"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -145,7 +171,7 @@
"print(df[\"embedding\"].apply(lambda x: len(x) if isinstance(x, list) else str(type(x))).value_counts()) # Check lengths\n",
"\n",
"# Save to Parquet\n",
"df.to_parquet(\"wiki_dpr.parquet\", index=False)\n",
"df.to_parquet(\"feature_repo/data/wiki_dpr.parquet\", index=False)\n",
"print(\"Saved to wiki_dpr.parquet\")"
]
},
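The cells that register these definitions with Feast and load the embeddings into Milvus are collapsed in this diff. As a rough sketch of that step, assuming the standard Feast workflow, the paths used above, and the `wiki_passages` feature view name from the repo file:

```python
# Sketch of the collapsed steps: open the repo and push embeddings to the
# online store. Assumes `feast apply` has been run from feature_repo/.
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo")
df = pd.read_parquet("feature_repo/data/wiki_dpr.parquet")
store.write_to_online_store(feature_view_name="wiki_passages", df=df)
```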
@@ -231,8 +257,9 @@
"source": [
"import sys\n",
"sys.path.append(\"..\")\n",
"from feast_rag_retriever import FeastVectorStore, FeastRAGRetriever, FeastIndex\n",
"from rag_project_repo import wiki_passage_feature_view\n",
"from ragproject_repo import wiki_passage_feature_view\n",
"from feast.vector_store import FeastVectorStore\n",
"from feast.rag_retriever import FeastIndex, FeastRAGRetriever\n",
"\n",
"generator_config=generator_model.config\n",
"question_encoder = AutoModel.from_pretrained(\"sentence-transformers/all-MiniLM-L6-v2\")\n",
@@ -245,12 +272,12 @@
"}\n",
"\n",
"vector_store = FeastVectorStore(\n",
" store=store,\n",
" repo_path=\".\",\n",
" rag_view=wiki_passage_feature_view,\n",
" features=[\"wiki_passages:passage_text\", \"wiki_passages:embedding\"]\n",
" features=[\"wiki_passages:passage_text\", \"wiki_passages:embedding\", \"wiki_passages:passage_id\"]\n",
")\n",
"\n",
"feast_index = FeastIndex(vector_store=vector_store)\n",
"feast_index = FeastIndex()\n",
"\n",
"config = RagConfig(\n",
" question_encoder=query_encoder_config,\n",
@@ -262,10 +289,12 @@
" question_encoder_tokenizer=question_encoder_tokenizer,\n",
" generator_tokenizer=generator_tokenizer,\n",
" feast_repo_path=\".\",\n",
" vector_store=vector_store,\n",
" feature_view=vector_store.rag_view,\n",
" features=vector_store.features,\n",
" generator_model=generator_model, \n",
" search_type=\"vector\",\n",
" id_field=\"passage_id\",\n",
" text_field=\"passage_text\",\n",
" config=config,\n",
" index=feast_index,\n",
")"
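The query demonstration cells are not shown in this diff. One hedged way to exercise the retriever, assuming the constructed object is bound to a variable named `retriever` and that `FeastRAGRetriever` honors Hugging Face's `RagRetriever.retrieve(question_hidden_states, n_docs)` contract:

```python
import torch

# Sketch only: embed a question and retrieve supporting passages.
# Mean pooling mirrors common all-MiniLM-L6-v2 usage; the notebook's
# actual pooling and retriever call may differ.
question = "Who painted the Mona Lisa?"
inputs = question_encoder_tokenizer(question, return_tensors="pt")
with torch.no_grad():
    q_hidden = question_encoder(**inputs).last_hidden_state.mean(dim=1)

retrieved_embeds, doc_ids, docs = retriever.retrieve(q_hidden.numpy(), n_docs=5)
print(docs[0]["text"])  # texts of the top retrieved passages
```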