
Commit 4c65872

feat: Add HDFS as a feature registry (#5655)
* feature(): support hdfs as registry
* fix(): add type for HDFSRegistryStore
* fix(): reformat code of registry.py file
* fix(): change hdfs remove api and fix hdfs registry test
* ci: install hadoop dependencies for pyarrow.fs.HDFSFileSystem
* ci: fix install-hadoop-dependencies-ci
* ci: typo in install-hadoop-dependencies-ci
* ci: add HADOOP_USER_NAME env var
* docs: add document for HDFS registry
* fix: change wait logs of hdfs_registry test to ensure containers are running
* ci: fix typo in install-hadoop-dependencies
* docs(): Add pre-requisites for hdfs registry
* ci(): cache hadoop tarball
* ci(): rename hadoop-3.4.2 to hadoop
* ci(): readd install-hadoop-dependencies-ci

Signed-off-by: Chimey Rock <[email protected]>
1 parent 3aec5d5 commit 4c65872

File tree

8 files changed: +255 -2 lines changed


.github/workflows/pr_local_integration_tests.yml

Lines changed: 7 additions & 0 deletions
@@ -42,6 +42,13 @@ jobs:
           enable-cache: true
       - name: Install dependencies
         run: make install-python-dependencies-ci
+      - name: Cache Hadoop tarball
+        uses: actions/cache@v4
+        with:
+          path: ~/hadoop-3.4.2.tar.gz
+          key: hadoop-3.4.2
+      - name: Install Hadoop dependencies
+        run: make install-hadoop-dependencies-ci
       - name: Test local integration tests
         if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak
         run: make test-python-integration-local

Makefile

Lines changed: 16 additions & 1 deletion
@@ -87,7 +87,19 @@ install-python-dependencies-ci: ## Install Python CI dependencies in system envi
 	uv pip sync --system sdk/python/requirements/py$(PYTHON_VERSION)-ci-requirements.txt
 	uv pip install --system --no-deps -e .
 
-# Used by multicloud/Dockerfile.dev
+# Used in github actions/ci
+install-hadoop-dependencies-ci: ## Install Hadoop dependencies
+	@if [ ! -f $$HOME/hadoop-3.4.2.tar.gz ]; then \
+		echo "Downloading Hadoop tarball..."; \
+		wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.2/hadoop-3.4.2.tar.gz -O $$HOME/hadoop-3.4.2.tar.gz; \
+	else \
+		echo "Using cached Hadoop tarball"; \
+	fi
+	@if [ ! -d $$HOME/hadoop ]; then \
+		echo "Extracting Hadoop tarball..."; \
+		tar -xzf $$HOME/hadoop-3.4.2.tar.gz -C $$HOME; \
+		mv $$HOME/hadoop-3.4.2 $$HOME/hadoop; \
+	fi
 install-python-ci-dependencies: ## Install Python CI dependencies in system environment using piptools
 	python -m piptools sync sdk/python/requirements/py$(PYTHON_VERSION)-ci-requirements.txt
 	pip install --no-deps -e .

@@ -146,6 +158,9 @@ test-python-integration: ## Run Python integration tests (CI)
 test-python-integration-local: ## Run Python integration tests (local dev mode)
 	FEAST_IS_LOCAL_TEST=True \
 	FEAST_LOCAL_ONLINE_CONTAINER=True \
+	HADOOP_HOME=$$HOME/hadoop \
+	CLASSPATH="$$( $$HADOOP_HOME/bin/hadoop classpath --glob ):$$CLASSPATH" \
+	HADOOP_USER_NAME=root \
 	python -m pytest --tb=short -v -n 8 --color=yes --integration --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \
 	-k "not test_lambda_materialization and not test_snowflake_materialization" \
 	-m "not rbac_remote_integration_test" \

docs/reference/registries/README.md

Lines changed: 4 additions & 0 deletions
@@ -26,6 +26,10 @@ Please see [Registry](../../getting-started/components/registry.md) for a concep
 [snowflake.md](snowflake.md)
 {% endcontent-ref %}
 
+{% content-ref url="hdfs.md" %}
+[hdfs.md](hdfs.md)
+{% endcontent-ref %}
+
 {% content-ref url="remote.md" %}
 [remote.md](remote.md)
 {% endcontent-ref %}

docs/reference/registries/hdfs.md

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+# HDFS Registry
+
+## Description
+
+The HDFS registry provides support for storing the protobuf representation of your feature store objects (data sources, feature views, feature services, etc.) in the Hadoop Distributed File System (HDFS).
+
+While it can be used in production, there are still inherent limitations with file-based registries, since changing a single field in the registry requires rewriting the whole registry file. With multiple concurrent writers, this presents a risk of data loss, or it bottlenecks writes to the registry since all changes have to be serialized (e.g. when running materialization for multiple feature views or time ranges concurrently).
+
+### Pre-requisites
+
+The HDFS registry requires Hadoop 3.3+ to be installed and the `HADOOP_HOME` environment variable to be set.
+
+### Authentication and User Configuration
+
+The HDFS registry uses `pyarrow.fs.HadoopFileSystem` and **does not** support specifying HDFS users or Kerberos credentials directly in the `feature_store.yaml` configuration. It relies entirely on the Hadoop and system environment configuration available to the process running Feast.
+
+By default, `pyarrow.fs.HadoopFileSystem` inherits authentication from the underlying Hadoop client libraries and environment variables, such as:
+
+- `HADOOP_USER_NAME`
+- `KRB5CCNAME`
+- `hadoop.security.authentication`
+- Any other relevant properties in `core-site.xml` and `hdfs-site.xml`
+
+For more information, refer to:
+- [pyarrow.fs.HadoopFileSystem API Reference](https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html)
+- [Hadoop Security: Simple & Kerberos Authentication](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SecureMode.html)
+
+## Example
+
+An example of how to configure this would be:
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: feast_hdfs
+registry:
+  path: hdfs://[YOUR NAMENODE HOST]:[YOUR NAMENODE PORT]/[PATH TO REGISTRY]/registry.pb
+  cache_ttl_seconds: 60
+online_store: null
+offline_store: null
+```
+{% endcode %}
+
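
Once a repo is configured this way, the usual Feast workflow applies: applying objects serializes the registry protobuf to the HDFS path, and subsequent SDK calls read it back through `pyarrow.fs.HadoopFileSystem`. The snippet below is a minimal sketch of that flow, assuming a feature repo directory containing the `feature_store.yaml` above and a reachable namenode; the repo path is illustrative, not part of this commit.

```python
from feast import FeatureStore

# Point Feast at the repo that contains the feature_store.yaml shown above.
store = FeatureStore(repo_path=".")

# Applying objects (the programmatic equivalent of `feast apply`) writes the
# registry protobuf to the HDFS path configured under `registry.path`.
store.apply([])

# Reads go through the same HDFS-backed registry store.
print(store.registry.list_feature_views(project="feast_hdfs"))
```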

sdk/python/feast/infra/registry/contrib/hdfs/__init__.py

Whitespace-only changes.

sdk/python/feast/infra/registry/contrib/hdfs/hdfs_registry_store.py

Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
+import json
+import uuid
+from pathlib import Path, PurePosixPath
+from typing import Optional
+from urllib.parse import urlparse
+
+from pyarrow import fs
+
+from feast.infra.registry.registry_store import RegistryStore
+from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
+from feast.repo_config import RegistryConfig
+from feast.utils import _utc_now
+
+
+class HDFSRegistryStore(RegistryStore):
+    """HDFS implementation of RegistryStore.
+    registry_config.path should be an HDFS path like hdfs://namenode:8020/path/to/registry.db
+    """
+
+    def __init__(self, registry_config: RegistryConfig, repo_path: Path):
+        try:
+            from pyarrow.fs import HadoopFileSystem
+        except ImportError as e:
+            from feast.errors import FeastExtrasDependencyImportError
+
+            raise FeastExtrasDependencyImportError(
+                "pyarrow.fs.HadoopFileSystem", str(e)
+            )
+        uri = registry_config.path
+        self._uri = urlparse(uri)
+        if self._uri.scheme != "hdfs":
+            raise ValueError(
+                f"Unsupported scheme {self._uri.scheme} in HDFS path {uri}"
+            )
+        self._hdfs = HadoopFileSystem(self._uri.hostname, self._uri.port or 8020)
+        self._path = PurePosixPath(self._uri.path)
+
+    def get_registry_proto(self):
+        registry_proto = RegistryProto()
+        if _check_hdfs_path_exists(self._hdfs, str(self._path)):
+            with self._hdfs.open_input_file(str(self._path)) as f:
+                registry_proto.ParseFromString(f.read())
+            return registry_proto
+        raise FileNotFoundError(
+            f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
+        )
+
+    def update_registry_proto(self, registry_proto: RegistryProto):
+        self._write_registry(registry_proto)
+
+    def teardown(self):
+        if _check_hdfs_path_exists(self._hdfs, str(self._path)):
+            self._hdfs.delete_file(str(self._path))
+        else:
+            # Nothing to do
+            pass
+
+    def _write_registry(self, registry_proto: RegistryProto):
+        """Write registry protobuf to HDFS."""
+        registry_proto.version_id = str(uuid.uuid4())
+        registry_proto.last_updated.FromDatetime(_utc_now())
+
+        dir_path = self._path.parent
+        if not _check_hdfs_path_exists(self._hdfs, str(dir_path)):
+            self._hdfs.create_dir(str(dir_path), recursive=True)
+
+        with self._hdfs.open_output_stream(str(self._path)) as f:
+            f.write(registry_proto.SerializeToString())
+
+    def set_project_metadata(self, project: str, key: str, value: str):
+        """Set a custom project metadata key-value pair in the registry (HDFS backend)."""
+        registry_proto = self.get_registry_proto()
+        found = False
+
+        for pm in registry_proto.project_metadata:
+            if pm.project == project:
+                # Load JSON metadata from project_uuid
+                try:
+                    meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
+                except Exception:
+                    meta = {}
+
+                if not isinstance(meta, dict):
+                    meta = {}
+
+                meta[key] = value
+                pm.project_uuid = json.dumps(meta)
+                found = True
+                break
+
+        if not found:
+            # Create new ProjectMetadata entry
+            from feast.project_metadata import ProjectMetadata
+
+            pm = ProjectMetadata(project_name=project)
+            pm.project_uuid = json.dumps({key: value})
+            registry_proto.project_metadata.append(pm.to_proto())
+
+        # Write back
+        self.update_registry_proto(registry_proto)
+
+    def get_project_metadata(self, project: str, key: str) -> Optional[str]:
+        """Get custom project metadata key from registry (HDFS backend)."""
+        registry_proto = self.get_registry_proto()
+
+        for pm in registry_proto.project_metadata:
+            if pm.project == project:
+                try:
+                    meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
+                except Exception:
+                    meta = {}
+
+                if not isinstance(meta, dict):
+                    return None
+                return meta.get(key, None)
+        return None
+
+
+def _check_hdfs_path_exists(hdfs, path: str) -> bool:
+    info = hdfs.get_file_info([path])[0]
+    return info.type != fs.FileType.NotFound
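
For reference, the store can also be exercised directly. The following is a hypothetical round-trip sketch using only the methods added above, assuming a reachable namenode and the Hadoop environment described in the docs; the `namenode.example.com` URI is a placeholder, not a value from this commit.

```python
from pathlib import Path

from feast.infra.registry.contrib.hdfs.hdfs_registry_store import HDFSRegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig

config = RegistryConfig(
    path="hdfs://namenode.example.com:8020/feast/registry.db",  # placeholder URI
    cache_ttl_seconds=60,
)
store = HDFSRegistryStore(config, Path("."))

store.update_registry_proto(RegistryProto())  # writes an empty registry file to HDFS
proto = store.get_registry_proto()            # reads it back and parses the protobuf
print(proto.version_id)                       # set to a fresh UUID on every write
store.teardown()                              # deletes the registry file if it exists
```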

sdk/python/feast/infra/registry/registry.py

Lines changed: 3 additions & 1 deletion
@@ -62,12 +62,14 @@
     "S3RegistryStore": "feast.infra.registry.s3.S3RegistryStore",
     "FileRegistryStore": "feast.infra.registry.file.FileRegistryStore",
     "AzureRegistryStore": "feast.infra.registry.contrib.azure.azure_registry_store.AzBlobRegistryStore",
+    "HDFSRegistryStore": "feast.infra.registry.contrib.hdfs.hdfs_registry_store.HDFSRegistryStore",
 }
 
 REGISTRY_STORE_CLASS_FOR_SCHEME = {
     "gs": "GCSRegistryStore",
     "s3": "S3RegistryStore",
     "file": "FileRegistryStore",
+    "hdfs": "HDFSRegistryStore",
     "": "FileRegistryStore",
 }
 

@@ -143,7 +145,7 @@ def get_registry_store_class_from_scheme(registry_path: str):
     if uri.scheme not in REGISTRY_STORE_CLASS_FOR_SCHEME:
         raise Exception(
             f"Registry path {registry_path} has unsupported scheme {uri.scheme}. "
-            f"Supported schemes are file, s3 and gs."
+            f"Supported schemes are file, s3, gs and hdfs."
         )
     else:
        registry_store_type = REGISTRY_STORE_CLASS_FOR_SCHEME[uri.scheme]
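
The scheme routing can be checked with the helper touched in the second hunk; a small sketch, where the HDFS path below is only a placeholder:

```python
from feast.infra.registry.registry import get_registry_store_class_from_scheme

# "hdfs://..." now resolves through REGISTRY_STORE_CLASS_FOR_SCHEME.
cls = get_registry_store_class_from_scheme("hdfs://namenode:8020/feast/registry.db")
print(cls.__name__)  # expected: "HDFSRegistryStore"
```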

sdk/python/tests/integration/registration/test_universal_registry.py

Lines changed: 62 additions & 0 deletions
@@ -22,8 +22,12 @@
 
 import grpc_testing
 import pandas as pd
+import pyarrow.fs as fs
 import pytest
 from pytest_lazyfixture import lazy_fixture
+from testcontainers.core.container import DockerContainer
+from testcontainers.core.network import Network
+from testcontainers.core.waiting_utils import wait_for_logs
 from testcontainers.mysql import MySqlContainer
 from testcontainers.postgres import PostgresContainer
 

@@ -280,6 +284,60 @@ def sqlite_registry():
     yield SqlRegistry(registry_config, "project", None)
 
 
+@pytest.fixture(scope="function")
+def hdfs_registry():
+    HADOOP_NAMENODE_IMAGE = "bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8"
+    HADOOP_DATANODE_IMAGE = "bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8"
+    HDFS_CLUSTER_NAME = "feast-hdfs-cluster"
+    HADOOP_NAMENODE_WAIT_LOG = "namenode.NameNode: NameNode RPC up"
+    HADOOP_DATANODE_WAIT_LOG = "datanode.DataNode: .*successfully registered with NN"
+    with Network() as network:
+        namenode = None
+        datanode = None
+
+        try:
+            namenode = (
+                DockerContainer(HADOOP_NAMENODE_IMAGE)
+                .with_network(network)
+                .with_env("CLUSTER_NAME", HDFS_CLUSTER_NAME)
+                .with_exposed_ports(8020)
+                .with_network_aliases("namenode")
+                .with_kwargs(hostname="namenode")
+                .start()
+            )
+            wait_for_logs(namenode, HADOOP_NAMENODE_WAIT_LOG, timeout=120)
+            namenode_ip = namenode.get_container_host_ip()
+            namenode_port = int(namenode.get_exposed_port(8020))
+
+            datanode = (
+                DockerContainer(HADOOP_DATANODE_IMAGE)
+                .with_network(network)
+                .with_exposed_ports(9867)
+                .with_env("CLUSTER_NAME", HDFS_CLUSTER_NAME)
+                .with_env("CORE_CONF_fs_defaultFS", "hdfs://namenode:8020")
+                .with_network_aliases("datanode")
+                .with_kwargs(hostname="datanode")
+                .start()
+            )
+
+            wait_for_logs(datanode, HADOOP_DATANODE_WAIT_LOG, timeout=120)
+
+            hdfs = fs.HadoopFileSystem(host=namenode_ip, port=namenode_port)
+            hdfs.create_dir("/feast")
+            registry_path = f"hdfs://{namenode_ip}:{namenode_port}/feast/registry.db"
+            with hdfs.open_output_stream(registry_path) as f:
+                f.write(b"")
+
+            registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
+            reg = Registry("project", registry_config, None)
+            yield reg
+        finally:
+            if datanode:
+                datanode.stop()
+            if namenode:
+                namenode.stop()
+
+
 class GrpcMockChannel:
     def __init__(self, service, servicer):
         self.service = service

@@ -350,6 +408,10 @@ def mock_remote_registry():
         lazy_fixture("mock_remote_registry"),
         marks=pytest.mark.rbac_remote_integration_test,
     ),
+    pytest.param(
+        lazy_fixture("hdfs_registry"),
+        marks=pytest.mark.xdist_group(name="hdfs_registry"),
+    ),
 ]
 
 sql_fixtures = [
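
For context, the fixture added above is consumed through the shared parametrized registry tests in this file; the snippet below is an illustrative smoke test in the same style, not a test added by this commit.

```python
import pytest

from feast.entity import Entity


@pytest.mark.integration
@pytest.mark.xdist_group(name="hdfs_registry")
def test_hdfs_registry_smoke(hdfs_registry):
    # Apply a single entity and read it back through the HDFS-backed registry.
    entity = Entity(name="driver", join_keys=["driver_id"])
    hdfs_registry.apply_entity(entity, project="project")
    assert [e.name for e in hdfs_registry.list_entities(project="project")] == ["driver"]
```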
