17 changes: 14 additions & 3 deletions docs/reference/compute-engine/ray.md
@@ -62,11 +62,22 @@ batch_engine:
| `max_parallelism_multiplier` | int | 2 | Parallelism as multiple of CPU cores |
| `target_partition_size_mb` | int | 64 | Target partition size (MB) |
| `window_size_for_joins` | string | "1H" | Time window for distributed joins |
| `ray_address` | string | None | Ray cluster address (None = local Ray) |
| `ray_address` | string | None | Ray cluster address (triggers REMOTE mode) |
| `use_kuberay` | boolean | None | Enable KubeRay mode (overrides ray_address) |
| `kuberay_conf` | dict | None | **KubeRay configuration dict** with keys: `cluster_name` (required), `namespace` (default: "default"), `auth_token`, `auth_server`, `skip_tls` (default: false) |
| `enable_ray_logging` | boolean | false | Enable Ray progress bars and logging |
| `enable_distributed_joins` | boolean | true | Enable distributed joins for large datasets |
| `staging_location` | string | None | Remote path for batch materialization jobs |
| `ray_conf` | dict | None | Ray configuration parameters |
| `execution_timeout_seconds` | int | None | Timeout for job execution in seconds |
| `ray_conf` | dict | None | Ray configuration parameters (memory, CPU limits) |

### Mode Detection Precedence

The Ray compute engine automatically detects the execution mode using the following precedence (an override sketch follows the list):

1. **Environment Variables** → KubeRay mode (if `FEAST_RAY_USE_KUBERAY=true`)
2. **Config `kuberay_conf`** → KubeRay mode
3. **Config `ray_address`** → Remote mode
4. **Default** → Local mode
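
For example, an environment variable set in the deployment overrides anything in `feature_store.yaml`. A minimal sketch of forcing KubeRay mode from Python (the repo path and cluster name are placeholders):

```python
import os

from feast import FeatureStore

# FEAST_RAY_USE_KUBERAY sits at the top of the precedence order, so setting it
# selects KubeRay mode even if feature_store.yaml only sets ray_address
# (which would otherwise select remote mode).
os.environ["FEAST_RAY_USE_KUBERAY"] = "true"
os.environ["FEAST_RAY_CLUSTER_NAME"] = "feast-ray-cluster"  # placeholder

store = FeatureStore(repo_path=".")  # hypothetical feature repo
```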

## Usage Examples

124 changes: 119 additions & 5 deletions docs/reference/offline-stores/ray.md
@@ -9,10 +9,11 @@ The Ray offline store is a data I/O implementation that leverages [Ray](https://

The Ray offline store provides:
- Ray-based data reading from file sources (Parquet, CSV, etc.)
- Support for both local and distributed Ray clusters
- Support for local, remote, and KubeRay (Kubernetes-managed) clusters
- Integration with various storage backends (local files, S3, GCS, HDFS)
- Efficient data filtering and column selection
- Timestamp-based data processing with timezone awareness
- Enterprise-ready KubeRay cluster support via CodeFlare SDK


## Functionality Matrix
@@ -59,9 +60,15 @@ For complex feature processing, historical feature retrieval, and distributed jo

## Configuration

The Ray offline store can be configured in your `feature_store.yaml` file. Below are two main configuration patterns:
The Ray offline store can be configured in your `feature_store.yaml` file. It supports **three execution modes**:

### Basic Ray Offline Store
1. **LOCAL**: Ray runs on the same machine as the Feast client (default)
2. **REMOTE**: Connects to a remote Ray cluster via `ray_address`
3. **KUBERAY**: Connects to Ray clusters on Kubernetes via CodeFlare SDK

### Execution Modes

#### Local Mode (Default)

For simple data I/O operations without distributed processing:

@@ -72,7 +79,44 @@ provider: local
offline_store:
type: ray
storage_path: data/ray_storage # Optional: Path for storing datasets
ray_address: localhost:10001 # Optional: Ray cluster address
```

#### Remote Ray Cluster

Connect to an existing Ray cluster:

```yaml
offline_store:
type: ray
storage_path: s3://my-bucket/feast-data
ray_address: "ray://my-cluster.example.com:10001"
```

#### KubeRay Cluster (Kubernetes)

Connect to Ray clusters on Kubernetes using CodeFlare SDK:

```yaml
offline_store:
type: ray
storage_path: s3://my-bucket/feast-data
use_kuberay: true
kuberay_conf:
cluster_name: "feast-ray-cluster"
namespace: "feast-system"
auth_token: "${RAY_AUTH_TOKEN}"
auth_server: "https://api.openshift.com:6443"
skip_tls: false
enable_ray_logging: false
```

**Environment Variables** (alternative to config file):
```bash
export FEAST_RAY_USE_KUBERAY=true
export FEAST_RAY_CLUSTER_NAME=feast-ray-cluster
export FEAST_RAY_AUTH_TOKEN=your-token
export FEAST_RAY_AUTH_SERVER=https://api.openshift.com:6443
export FEAST_RAY_NAMESPACE=feast-system
```
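
Whichever mode is selected, the client code itself is unchanged. A minimal retrieval sketch against this store (the repo path, entity column, and feature reference are hypothetical):

```python
from datetime import datetime, timedelta, timezone

import pandas as pd

from feast import FeatureStore

store = FeatureStore(repo_path=".")  # hypothetical feature repo

# Entity rows to join features onto; the names below are illustrative.
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": [datetime.now(timezone.utc) - timedelta(days=1)] * 2,
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_stats:avg_trips"],  # hypothetical feature reference
).to_df()
print(training_df.head())
```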

### Ray Offline Store + Compute Engine
@@ -175,8 +219,29 @@ batch_engine:
|--------|------|---------|-------------|
| `type` | string | Required | Must be `feast.offline_stores.contrib.ray_offline_store.ray.RayOfflineStore` or `ray` |
| `storage_path` | string | None | Path for storing temporary files and datasets |
| `ray_address` | string | None | Address of the Ray cluster (e.g., "localhost:10001") |
| `ray_address` | string | None | Ray cluster address (triggers REMOTE mode, e.g., "ray://host:10001") |
| `use_kuberay` | boolean | None | Enable KubeRay mode (overrides ray_address) |
| `kuberay_conf` | dict | None | **KubeRay configuration dict** with keys: `cluster_name` (required), `namespace` (default: "default"), `auth_token`, `auth_server`, `skip_tls` (default: false) |
| `enable_ray_logging` | boolean | false | Enable Ray progress bars and verbose logging |
| `ray_conf` | dict | None | Ray initialization parameters for resource management (e.g., memory, CPU limits) |
| `broadcast_join_threshold_mb` | int | 100 | Size threshold for broadcast joins (MB) |
| `enable_distributed_joins` | boolean | true | Enable distributed joins for large datasets |
| `max_parallelism_multiplier` | int | 2 | Parallelism as multiple of CPU cores |
| `target_partition_size_mb` | int | 64 | Target partition size (MB) |
| `window_size_for_joins` | string | "1H" | Time window for distributed joins |

#### Mode Detection Precedence

The Ray offline store automatically detects the execution mode using the following precedence (sketched in code below):

1. **Environment Variables** (highest priority)
- `FEAST_RAY_USE_KUBERAY`, `FEAST_RAY_CLUSTER_NAME`, etc.
2. **Config `kuberay_conf`**
- If present → KubeRay mode
3. **Config `ray_address`**
- If present → Remote mode
4. **Default**
- Local mode (lowest priority)
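
The order can be pictured as a simple fall-through check. A sketch of the logic (`detect_mode` is a hypothetical helper, not the actual implementation in `feast.infra.ray_initializer`):

```python
import os


def detect_mode(config) -> str:
    """Hypothetical helper mirroring the documented precedence order."""
    # 1. Environment variables win over everything in feature_store.yaml.
    if os.getenv("FEAST_RAY_USE_KUBERAY", "").lower() == "true":
        return "KUBERAY"
    # 2. use_kuberay / kuberay_conf in the config also select KubeRay mode.
    if getattr(config, "use_kuberay", None) or getattr(config, "kuberay_conf", None):
        return "KUBERAY"
    # 3. A configured ray_address selects remote mode.
    if getattr(config, "ray_address", None):
        return "REMOTE"
    # 4. Otherwise fall back to a local Ray runtime.
    return "LOCAL"
```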

#### Ray Compute Engine Options

@@ -385,6 +450,8 @@ job.persist(hdfs_storage, allow_overwrite=True)

### Using Ray Cluster

#### Standard Ray Cluster

To use Ray in cluster mode for distributed data access:

1. Start a Ray cluster:
@@ -406,6 +473,53 @@ offline_store:
ray start --address='head-node-ip:10001'
```

#### KubeRay Cluster (Kubernetes)

To use Feast with Ray clusters on Kubernetes via CodeFlare SDK:

**Prerequisites:**
- KubeRay cluster deployed on Kubernetes
- CodeFlare SDK installed: `pip install codeflare-sdk`
- Access credentials for the Kubernetes cluster

**Configuration:**

1. Using configuration file:
```yaml
offline_store:
type: ray
use_kuberay: true
storage_path: s3://my-bucket/feast-data
kuberay_conf:
cluster_name: "feast-ray-cluster"
namespace: "feast-system"
auth_token: "${RAY_AUTH_TOKEN}"
auth_server: "https://api.openshift.com:6443"
skip_tls: false
enable_ray_logging: false
```

2. Using environment variables:
```bash
export FEAST_RAY_USE_KUBERAY=true
export FEAST_RAY_CLUSTER_NAME=feast-ray-cluster
export FEAST_RAY_AUTH_TOKEN=your-k8s-token
export FEAST_RAY_AUTH_SERVER=https://api.openshift.com:6443
export FEAST_RAY_NAMESPACE=feast-system
export FEAST_RAY_SKIP_TLS=false

# Then use standard Feast code
python your_feast_script.py
```

**Features:**
- Cluster connection and authentication handled by the CodeFlare SDK (see the sketch below)
- Automatic TLS certificate management
- Kubernetes-native authentication
- Namespace isolation
- Secure communication between the client and the Ray cluster
- Automatic cluster discovery
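
For reference, the connection the SDK brokers looks roughly like the manual steps below. Feast performs the equivalent internally; the exact `codeflare_sdk` calls here are an assumption about the SDK's public API, and the token, server, and cluster values are placeholders:

```python
import ray
from codeflare_sdk import TokenAuthentication, get_cluster  # assumed public API

# Authenticate against the Kubernetes API server (placeholder credentials).
auth = TokenAuthentication(
    token="your-k8s-token",
    server="https://api.openshift.com:6443",
    skip_tls=False,
)
auth.login()

# Look up the running RayCluster and connect the Ray client to it.
cluster = get_cluster("feast-ray-cluster", namespace="feast-system")
ray.init(address=cluster.cluster_uri())
```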

### Data Source Validation

The Ray offline store validates data sources to ensure compatibility:
41 changes: 7 additions & 34 deletions sdk/python/feast/infra/compute_engines/ray/compute.py
@@ -2,8 +2,6 @@
from datetime import datetime
from typing import Sequence, Union

import ray

from feast import (
BatchFeatureView,
Entity,
@@ -26,6 +24,10 @@
)
from feast.infra.compute_engines.ray.utils import write_to_online_store
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.ray_initializer import (
ensure_ray_initialized,
get_ray_wrapper,
)
from feast.infra.registry.base_registry import BaseRegistry

logger = logging.getLogger(__name__)
@@ -58,37 +60,7 @@ def __init__(

def _ensure_ray_initialized(self):
"""Ensure Ray is initialized with proper configuration."""
if not ray.is_initialized():
if self.config.ray_address:
ray.init(
address=self.config.ray_address,
ignore_reinit_error=True,
include_dashboard=False,
)
else:
ray_init_args = {
"ignore_reinit_error": True,
"include_dashboard": False,
}

# Add configuration from ray_conf if provided
if self.config.ray_conf:
ray_init_args.update(self.config.ray_conf)

ray.init(**ray_init_args)

# Configure Ray context for optimal performance
from ray.data.context import DatasetContext

ctx = DatasetContext.get_current()
ctx.enable_tensor_extension_casting = False

# Log Ray cluster information
cluster_resources = ray.cluster_resources()
logger.info(
f"Ray cluster initialized with {cluster_resources.get('CPU', 0)} CPUs, "
f"{cluster_resources.get('memory', 0) / (1024**3):.1f}GB memory"
)
ensure_ray_initialized(self.config)

def update(
self,
@@ -230,7 +202,8 @@ def _materialize_from_offline_store(

# Write to sink_source using Ray data
try:
ray_dataset = ray.data.from_arrow(arrow_table)
ray_wrapper = get_ray_wrapper()
ray_dataset = ray_wrapper.from_arrow(arrow_table)
ray_dataset.write_parquet(sink_source.path)
except Exception as e:
logger.error(
19 changes: 16 additions & 3 deletions sdk/python/feast/infra/compute_engines/ray/config.py
@@ -46,9 +46,6 @@ class RayComputeEngineConfig(FeastConfigBaseModel):
enable_optimization: bool = True
"""Enable automatic performance optimizations."""

execution_timeout_seconds: Optional[int] = None
"""Timeout for job execution in seconds."""

@property
def window_size_timedelta(self) -> timedelta:
"""Convert window size string to timedelta."""
@@ -64,3 +61,19 @@
else:
# Default to 1 hour
return timedelta(hours=1)

# KubeRay/CodeFlare SDK configurations
use_kuberay: Optional[bool] = None
"""Whether to use KubeRay/CodeFlare SDK for Ray cluster management"""

cluster_name: Optional[str] = None
"""Name of the KubeRay cluster to connect to (required for KubeRay mode)"""

auth_token: Optional[str] = None
"""Authentication token for Ray cluster connection (for secure clusters)"""

kuberay_conf: Optional[Dict[str, Any]] = None
"""KubeRay/CodeFlare configuration parameters (passed to CodeFlare SDK)"""

enable_ray_logging: bool = False
"""Enable Ray progress bars and verbose logging"""
7 changes: 4 additions & 3 deletions sdk/python/feast/infra/compute_engines/ray/job.py
@@ -5,7 +5,6 @@

import pandas as pd
import pyarrow as pa
import ray
from ray.data import Dataset

from feast import OnDemandFeatureView
@@ -21,6 +20,7 @@
from feast.infra.compute_engines.dag.value import DAGValue
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage
from feast.infra.offline_stores.offline_store import RetrievalJob, RetrievalMetadata
from feast.infra.ray_initializer import get_ray_wrapper
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDatasetStorage

@@ -69,10 +69,11 @@ def _ensure_executed(self) -> DAGValue:
self._result_dataset = result.data
else:
# If result is not a Ray Dataset, convert it
ray_wrapper = get_ray_wrapper()
if isinstance(result.data, pd.DataFrame):
self._result_dataset = ray.data.from_pandas(result.data)
self._result_dataset = ray_wrapper.from_pandas(result.data)
elif isinstance(result.data, pa.Table):
self._result_dataset = ray.data.from_arrow(result.data)
self._result_dataset = ray_wrapper.from_arrow(result.data)
else:
raise ValueError(
f"Unsupported result type: {type(result.data)}"
13 changes: 9 additions & 4 deletions sdk/python/feast/infra/compute_engines/ray/nodes.py
@@ -23,6 +23,7 @@
write_to_online_store,
)
from feast.infra.compute_engines.utils import create_offline_store_retrieval_job
from feast.infra.ray_initializer import get_ray_wrapper
from feast.infra.ray_shared_utils import (
apply_field_mapping,
broadcast_join,
@@ -72,10 +73,12 @@ def execute(self, context: ExecutionContext) -> DAGValue:
else:
try:
arrow_table = retrieval_job.to_arrow()
ray_dataset = ray.data.from_arrow(arrow_table)
ray_wrapper = get_ray_wrapper()
ray_dataset = ray_wrapper.from_arrow(arrow_table)
except Exception:
df = retrieval_job.to_df()
ray_dataset = ray.data.from_pandas(df)
ray_wrapper = get_ray_wrapper()
ray_dataset = ray_wrapper.from_pandas(df)

field_mapping = getattr(self.source, "field_mapping", None)
if field_mapping:
@@ -130,7 +133,8 @@ def execute(self, context: ExecutionContext) -> DAGValue:

entity_df = context.entity_df
if isinstance(entity_df, pd.DataFrame):
entity_dataset = ray.data.from_pandas(entity_df)
ray_wrapper = get_ray_wrapper()
entity_dataset = ray_wrapper.from_pandas(entity_df)
else:
entity_dataset = entity_df

@@ -423,7 +427,8 @@ def _fallback_pandas_aggregation(self, dataset: Dataset, agg_dict: dict) -> Dataset:
result_df = result_df.reset_index()

# Convert back to Ray Dataset
return ray.data.from_pandas(result_df)
ray_wrapper = get_ray_wrapper()
return ray_wrapper.from_pandas(result_df)
else:
return dataset
