Commit 2439b83

feat: Added kuberay support

Signed-off-by: ntkathole <[email protected]>
1 parent da6257c commit 2439b83

File tree

12 files changed (+1000 −145 lines)

docs/reference/compute-engine/ray.md

Lines changed: 14 additions & 3 deletions
@@ -62,11 +62,22 @@ batch_engine:
 | `max_parallelism_multiplier` | int | 2 | Parallelism as multiple of CPU cores |
 | `target_partition_size_mb` | int | 64 | Target partition size (MB) |
 | `window_size_for_joins` | string | "1H" | Time window for distributed joins |
-| `ray_address` | string | None | Ray cluster address (None = local Ray) |
+| `ray_address` | string | None | Ray cluster address (triggers REMOTE mode) |
+| `use_kuberay` | boolean | None | Enable KubeRay mode (overrides ray_address) |
+| `kuberay_conf` | dict | None | **KubeRay configuration dict** with keys: `cluster_name` (required), `namespace` (default: "default"), `auth_token`, `auth_server`, `skip_tls` (default: false) |
+| `enable_ray_logging` | boolean | false | Enable Ray progress bars and logging |
 | `enable_distributed_joins` | boolean | true | Enable distributed joins for large datasets |
 | `staging_location` | string | None | Remote path for batch materialization jobs |
-| `ray_conf` | dict | None | Ray configuration parameters |
-| `execution_timeout_seconds` | int | None | Timeout for job execution in seconds |
+| `ray_conf` | dict | None | Ray configuration parameters (memory, CPU limits) |
+
+### Mode Detection Precedence
+
+The Ray compute engine automatically detects the execution mode:
+
+1. **Environment Variables** → KubeRay mode (if `FEAST_RAY_USE_KUBERAY=true`)
+2. **Config `kuberay_conf`** → KubeRay mode
+3. **Config `ray_address`** → Remote mode
+4. **Default** → Local mode
 
 ## Usage Examples
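The four-step precedence added above is easiest to read as a single cascade. The helper below is a hypothetical sketch of that logic (`detect_ray_mode` is not a real Feast function; it only restates the documented rules):

```python
import os


def detect_ray_mode(config) -> str:
    """Hypothetical restatement of the documented mode-detection precedence."""
    # 1. Environment variables take highest priority
    if os.getenv("FEAST_RAY_USE_KUBERAY", "").lower() == "true":
        return "KUBERAY"
    # 2. A kuberay_conf block in feature_store.yaml also selects KubeRay
    if getattr(config, "kuberay_conf", None):
        return "KUBERAY"
    # 3. An explicit ray_address selects remote mode
    if getattr(config, "ray_address", None):
        return "REMOTE"
    # 4. Otherwise Ray starts locally
    return "LOCAL"
```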
docs/reference/offline-stores/ray.md

Lines changed: 119 additions & 5 deletions
@@ -9,10 +9,11 @@ The Ray offline store is a data I/O implementation that leverages [Ray](https://
 
 The Ray offline store provides:
 - Ray-based data reading from file sources (Parquet, CSV, etc.)
-- Support for both local and distributed Ray clusters
+- Support for local, remote, and KubeRay (Kubernetes-managed) clusters
 - Integration with various storage backends (local files, S3, GCS, HDFS)
 - Efficient data filtering and column selection
 - Timestamp-based data processing with timezone awareness
+- Enterprise-ready KubeRay cluster support via CodeFlare SDK
 
 
 ## Functionality Matrix
@@ -59,9 +60,15 @@ For complex feature processing, historical feature retrieval, and distributed jo
 
 ## Configuration
 
-The Ray offline store can be configured in your `feature_store.yaml` file. Below are two main configuration patterns:
+The Ray offline store can be configured in your `feature_store.yaml` file. It supports **three execution modes**:
 
-### Basic Ray Offline Store
+1. **LOCAL**: Ray runs locally on the same machine (default)
+2. **REMOTE**: Connects to a remote Ray cluster via `ray_address`
+3. **KUBERAY**: Connects to Ray clusters on Kubernetes via CodeFlare SDK
+
+### Execution Modes
+
+#### Local Mode (Default)
 
 For simple data I/O operations without distributed processing:
 
@@ -72,7 +79,44 @@ provider: local
 offline_store:
   type: ray
   storage_path: data/ray_storage # Optional: Path for storing datasets
-  ray_address: localhost:10001 # Optional: Ray cluster address
+```
+
+#### Remote Ray Cluster
+
+Connect to an existing Ray cluster:
+
+```yaml
+offline_store:
+  type: ray
+  storage_path: s3://my-bucket/feast-data
+  ray_address: "ray://my-cluster.example.com:10001"
+```
+
+#### KubeRay Cluster (Kubernetes)
+
+Connect to Ray clusters on Kubernetes using CodeFlare SDK:
+
+```yaml
+offline_store:
+  type: ray
+  storage_path: s3://my-bucket/feast-data
+  use_kuberay: true
+  kuberay_conf:
+    cluster_name: "feast-ray-cluster"
+    namespace: "feast-system"
+    auth_token: "${RAY_AUTH_TOKEN}"
+    auth_server: "https://api.openshift.com:6443"
+    skip_tls: false
+  enable_ray_logging: false
+```
+
+**Environment Variables** (alternative to config file):
+```bash
+export FEAST_RAY_USE_KUBERAY=true
+export FEAST_RAY_CLUSTER_NAME=feast-ray-cluster
+export FEAST_RAY_AUTH_TOKEN=your-token
+export FEAST_RAY_AUTH_SERVER=https://api.openshift.com:6443
+export FEAST_RAY_NAMESPACE=feast-system
 ```
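Whichever of the three modes is configured, the calling code does not change; mode selection happens inside the store when Feast initializes Ray. A minimal sketch (the feature reference and entity dataframe are placeholders):

```python
import pandas as pd
from feast import FeatureStore

# Mode (local / remote / KubeRay) is resolved from feature_store.yaml
# or FEAST_RAY_* environment variables; nothing mode-specific appears here.
store = FeatureStore(repo_path=".")
entity_df = pd.DataFrame(
    {"driver_id": [1001], "event_timestamp": [pd.Timestamp.now(tz="UTC")]}
)  # placeholder entity dataframe
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],  # placeholder feature reference
).to_df()
```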
 
 ### Ray Offline Store + Compute Engine
@@ -175,8 +219,29 @@ batch_engine:
 |--------|------|---------|-------------|
 | `type` | string | Required | Must be `feast.offline_stores.contrib.ray_offline_store.ray.RayOfflineStore` or `ray` |
 | `storage_path` | string | None | Path for storing temporary files and datasets |
-| `ray_address` | string | None | Address of the Ray cluster (e.g., "localhost:10001") |
+| `ray_address` | string | None | Ray cluster address (triggers REMOTE mode, e.g., "ray://host:10001") |
+| `use_kuberay` | boolean | None | Enable KubeRay mode (overrides ray_address) |
+| `kuberay_conf` | dict | None | **KubeRay configuration dict** with keys: `cluster_name` (required), `namespace` (default: "default"), `auth_token`, `auth_server`, `skip_tls` (default: false) |
+| `enable_ray_logging` | boolean | false | Enable Ray progress bars and verbose logging |
 | `ray_conf` | dict | None | Ray initialization parameters for resource management (e.g., memory, CPU limits) |
+| `broadcast_join_threshold_mb` | int | 100 | Size threshold for broadcast joins (MB) |
+| `enable_distributed_joins` | boolean | true | Enable distributed joins for large datasets |
+| `max_parallelism_multiplier` | int | 2 | Parallelism as multiple of CPU cores |
+| `target_partition_size_mb` | int | 64 | Target partition size (MB) |
+| `window_size_for_joins` | string | "1H" | Time window for distributed joins |
+
+#### Mode Detection Precedence
+
+The Ray offline store automatically detects the execution mode using the following precedence:
+
+1. **Environment Variables** (highest priority)
+   - `FEAST_RAY_USE_KUBERAY`, `FEAST_RAY_CLUSTER_NAME`, etc.
+2. **Config `kuberay_conf`**
+   - If present → KubeRay mode
+3. **Config `ray_address`**
+   - If present → Remote mode
+4. **Default**
+   - Local mode (lowest priority)
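Spelled out as a Python dict, the `kuberay_conf` keys from the table above look like this (values are illustrative; `cluster_name` is the only required key):

```python
import os

kuberay_conf = {
    "cluster_name": "feast-ray-cluster",              # required
    "namespace": "feast-system",                      # default: "default"
    "auth_token": os.environ.get("RAY_AUTH_TOKEN"),   # optional, for secure clusters
    "auth_server": "https://api.openshift.com:6443",  # optional auth endpoint
    "skip_tls": False,                                # default: False
}
```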
 
 #### Ray Compute Engine Options
 
@@ -385,6 +450,8 @@ job.persist(hdfs_storage, allow_overwrite=True)
 
 ### Using Ray Cluster
 
+#### Standard Ray Cluster
+
 To use Ray in cluster mode for distributed data access:
 
 1. Start a Ray cluster:
@@ -406,6 +473,53 @@
 ray start --address='head-node-ip:10001'
 ```
 
+#### KubeRay Cluster (Kubernetes)
+
+To use Feast with Ray clusters on Kubernetes via CodeFlare SDK:
+
+**Prerequisites:**
+- KubeRay cluster deployed on Kubernetes
+- CodeFlare SDK installed: `pip install codeflare-sdk`
+- Access credentials for the Kubernetes cluster
+
+**Configuration:**
+
+1. Using configuration file:
+```yaml
+offline_store:
+  type: ray
+  use_kuberay: true
+  storage_path: s3://my-bucket/feast-data
+  kuberay_conf:
+    cluster_name: "feast-ray-cluster"
+    namespace: "feast-system"
+    auth_token: "${RAY_AUTH_TOKEN}"
+    auth_server: "https://api.openshift.com:6443"
+    skip_tls: false
+  enable_ray_logging: false
+```
+
+2. Using environment variables:
+```bash
+export FEAST_RAY_USE_KUBERAY=true
+export FEAST_RAY_CLUSTER_NAME=feast-ray-cluster
+export FEAST_RAY_AUTH_TOKEN=your-k8s-token
+export FEAST_RAY_AUTH_SERVER=https://api.openshift.com:6443
+export FEAST_RAY_NAMESPACE=feast-system
+export FEAST_RAY_SKIP_TLS=false

+# Then use standard Feast code
+python your_feast_script.py
+```
+
+**Features:**
+- The CodeFlare SDK handles cluster connection and authentication
+- Automatic TLS certificate management
+- Authentication with Kubernetes clusters
+- Namespace isolation
+- Secure communication between client and Ray cluster
+- Automatic cluster discovery
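For orientation, the `auth_token`/`auth_server`/`skip_tls` settings line up with the CodeFlare SDK's token-based login. The sketch below is an assumption about what the KubeRay path does under the hood — Feast drives this automatically, and the exact calls may differ:

```python
from codeflare_sdk import TokenAuthentication

# Assumed mapping: kuberay_conf.auth_token / auth_server / skip_tls
# feed a token-based login against the Kubernetes API.
auth = TokenAuthentication(
    token="your-k8s-token",
    server="https://api.openshift.com:6443",
    skip_tls=False,
)
auth.login()  # authenticate before connecting to the Ray cluster
```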
 
 ### Data Source Validation
 
 The Ray offline store validates data sources to ensure compatibility:

sdk/python/feast/infra/compute_engines/ray/compute.py

Lines changed: 7 additions & 34 deletions
@@ -2,8 +2,6 @@
 from datetime import datetime
 from typing import Sequence, Union
 
-import ray
-
 from feast import (
     BatchFeatureView,
     Entity,
@@ -26,6 +24,10 @@
 )
 from feast.infra.compute_engines.ray.utils import write_to_online_store
 from feast.infra.offline_stores.offline_store import RetrievalJob
+from feast.infra.ray_initializer import (
+    ensure_ray_initialized,
+    get_ray_wrapper,
+)
 from feast.infra.registry.base_registry import BaseRegistry
 
 logger = logging.getLogger(__name__)
@@ -58,37 +60,7 @@ def __init__(
 
     def _ensure_ray_initialized(self):
         """Ensure Ray is initialized with proper configuration."""
-        if not ray.is_initialized():
-            if self.config.ray_address:
-                ray.init(
-                    address=self.config.ray_address,
-                    ignore_reinit_error=True,
-                    include_dashboard=False,
-                )
-            else:
-                ray_init_args = {
-                    "ignore_reinit_error": True,
-                    "include_dashboard": False,
-                }
-
-                # Add configuration from ray_conf if provided
-                if self.config.ray_conf:
-                    ray_init_args.update(self.config.ray_conf)
-
-                ray.init(**ray_init_args)
-
-            # Configure Ray context for optimal performance
-            from ray.data.context import DatasetContext
-
-            ctx = DatasetContext.get_current()
-            ctx.enable_tensor_extension_casting = False
-
-            # Log Ray cluster information
-            cluster_resources = ray.cluster_resources()
-            logger.info(
-                f"Ray cluster initialized with {cluster_resources.get('CPU', 0)} CPUs, "
-                f"{cluster_resources.get('memory', 0) / (1024**3):.1f}GB memory"
-            )
+        ensure_ray_initialized(self.config)
 
     def update(
         self,
@@ -230,7 +202,8 @@ def _materialize_from_offline_store(
 
         # Write to sink_source using Ray data
         try:
-            ray_dataset = ray.data.from_arrow(arrow_table)
+            ray_wrapper = get_ray_wrapper()
+            ray_dataset = ray_wrapper.from_arrow(arrow_table)
             ray_dataset.write_parquet(sink_source.path)
         except Exception as e:
             logger.error(
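The net effect of this refactor: every direct `ray.init`/`ray.data` call is routed through two entry points in the new `feast.infra.ray_initializer` module. Pieced together from the diffs, the intended call pattern is roughly as follows (`config` stands for the engine's config object):

```python
import pyarrow as pa

from feast.infra.ray_initializer import ensure_ray_initialized, get_ray_wrapper

arrow_table = pa.table({"driver_id": [1, 2], "conv_rate": [0.1, 0.9]})  # toy data

ensure_ray_initialized(config)   # resolves LOCAL / REMOTE / KUBERAY once, then inits Ray
ray_wrapper = get_ray_wrapper()  # mode-aware stand-in for the ray.data module
dataset = ray_wrapper.from_arrow(arrow_table)  # mirrors ray.data.from_arrow
```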

sdk/python/feast/infra/compute_engines/ray/config.py

Lines changed: 16 additions & 3 deletions
@@ -46,9 +46,6 @@ class RayComputeEngineConfig(FeastConfigBaseModel):
     enable_optimization: bool = True
     """Enable automatic performance optimizations."""
 
-    execution_timeout_seconds: Optional[int] = None
-    """Timeout for job execution in seconds."""
-
     @property
     def window_size_timedelta(self) -> timedelta:
         """Convert window size string to timedelta."""
@@ -64,3 +61,19 @@ def window_size_timedelta(self) -> timedelta:
         else:
             # Default to 1 hour
             return timedelta(hours=1)
+
+    # KubeRay/CodeFlare SDK configurations
+    use_kuberay: Optional[bool] = None
+    """Whether to use KubeRay/CodeFlare SDK for Ray cluster management"""
+
+    cluster_name: Optional[str] = None
+    """Name of the KubeRay cluster to connect to (required for KubeRay mode)"""
+
+    auth_token: Optional[str] = None
+    """Authentication token for Ray cluster connection (for secure clusters)"""
+
+    kuberay_conf: Optional[Dict[str, Any]] = None
+    """KubeRay/CodeFlare configuration parameters (passed to CodeFlare SDK)"""
+
+    enable_ray_logging: bool = False
+    """Enable Ray progress bars and verbose logging"""

sdk/python/feast/infra/compute_engines/ray/job.py

Lines changed: 4 additions & 3 deletions
@@ -5,7 +5,6 @@
 
 import pandas as pd
 import pyarrow as pa
-import ray
 from ray.data import Dataset
 
 from feast import OnDemandFeatureView
@@ -21,6 +20,7 @@
 from feast.infra.compute_engines.dag.value import DAGValue
 from feast.infra.offline_stores.file_source import SavedDatasetFileStorage
 from feast.infra.offline_stores.offline_store import RetrievalJob, RetrievalMetadata
+from feast.infra.ray_initializer import get_ray_wrapper
 from feast.repo_config import RepoConfig
 from feast.saved_dataset import SavedDatasetStorage
 
@@ -69,10 +69,11 @@ def _ensure_executed(self) -> DAGValue:
             self._result_dataset = result.data
         else:
             # If result is not a Ray Dataset, convert it
+            ray_wrapper = get_ray_wrapper()
             if isinstance(result.data, pd.DataFrame):
-                self._result_dataset = ray.data.from_pandas(result.data)
+                self._result_dataset = ray_wrapper.from_pandas(result.data)
             elif isinstance(result.data, pa.Table):
-                self._result_dataset = ray.data.from_arrow(result.data)
+                self._result_dataset = ray_wrapper.from_arrow(result.data)
             else:
                 raise ValueError(
                     f"Unsupported result type: {type(result.data)}"

sdk/python/feast/infra/compute_engines/ray/nodes.py

Lines changed: 9 additions & 4 deletions
@@ -23,6 +23,7 @@
     write_to_online_store,
 )
 from feast.infra.compute_engines.utils import create_offline_store_retrieval_job
+from feast.infra.ray_initializer import get_ray_wrapper
 from feast.infra.ray_shared_utils import (
     apply_field_mapping,
     broadcast_join,
@@ -72,10 +73,12 @@ def execute(self, context: ExecutionContext) -> DAGValue:
         else:
             try:
                 arrow_table = retrieval_job.to_arrow()
-                ray_dataset = ray.data.from_arrow(arrow_table)
+                ray_wrapper = get_ray_wrapper()
+                ray_dataset = ray_wrapper.from_arrow(arrow_table)
             except Exception:
                 df = retrieval_job.to_df()
-                ray_dataset = ray.data.from_pandas(df)
+                ray_wrapper = get_ray_wrapper()
+                ray_dataset = ray_wrapper.from_pandas(df)
 
         field_mapping = getattr(self.source, "field_mapping", None)
         if field_mapping:
@@ -130,7 +133,8 @@ def execute(self, context: ExecutionContext) -> DAGValue:
 
         entity_df = context.entity_df
         if isinstance(entity_df, pd.DataFrame):
-            entity_dataset = ray.data.from_pandas(entity_df)
+            ray_wrapper = get_ray_wrapper()
+            entity_dataset = ray_wrapper.from_pandas(entity_df)
         else:
             entity_dataset = entity_df
 
@@ -423,7 +427,8 @@ def _fallback_pandas_aggregation(self, dataset: Dataset, agg_dict: dict) -> Data
             result_df = result_df.reset_index()
 
             # Convert back to Ray Dataset
-            return ray.data.from_pandas(result_df)
+            ray_wrapper = get_ray_wrapper()
+            return ray_wrapper.from_pandas(result_df)
         else:
             return dataset
