Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ test-python-integration-local: ## Run Python integration tests (local dev mode)
test-python-integration-rbac-remote: ## Run Python remote RBAC integration tests
FEAST_IS_LOCAL_TEST=True \
FEAST_LOCAL_ONLINE_CONTAINER=True \
python -m pytest --tb=short -v -n 4 --color=yes --integration --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \
python -m pytest --tb=short -v -n 8 --color=yes --integration --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \
-k "not test_lambda_materialization and not test_snowflake_materialization" \
-m "rbac_remote_integration_test" \
--log-cli-level=INFO -s \
Expand Down
132 changes: 113 additions & 19 deletions sdk/python/tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import atexit
import json
import logging
import random
import tempfile
import time
from multiprocessing import Manager
from pathlib import Path
from typing import Optional

import pytest
from testcontainers.keycloak import KeycloakContainer
Expand All @@ -14,28 +18,118 @@
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

shared_state = Manager().dict()
# Shared Keycloak state
_keycloak_container: Optional[KeycloakContainer] = None
_keycloak_info_file = Path(tempfile.gettempdir()) / "feast_keycloak_info.json"


def _is_keycloak_healthy(url: str) -> bool:
"""Health check for Keycloak."""
try:
import requests

response = requests.get(f"{url}/health/ready", timeout=3)
return response.status_code == 200
except Exception:
try:
import requests

response = requests.get(f"{url}/auth/realms/master", timeout=3)
return response.status_code in [200, 404]
except Exception:
return False


def _get_shared_keycloak_url() -> Optional[str]:
"""Get URL of existing Keycloak instance if available."""
try:
if _keycloak_info_file.exists():
with open(_keycloak_info_file, "r") as f:
info = json.load(f)

url = info.get("url")
if url and _is_keycloak_healthy(url):
return url
else:
_keycloak_info_file.unlink()
except Exception as e:
logger.debug(f"Error reading Keycloak info: {e}")
try:
_keycloak_info_file.unlink()
except Exception:
pass
return None


def _save_keycloak_info(url: str):
"""Save Keycloak info to shared file."""
try:
info = {"url": url, "timestamp": time.time()}
with open(_keycloak_info_file, "w") as f:
json.dump(info, f)
except Exception as e:
logger.warning(f"Failed to save Keycloak info: {e}")


def _cleanup_keycloak():
"""Cleanup Keycloak container on exit."""
global _keycloak_container
if _keycloak_container:
try:
logger.info("Stopping Keycloak container")
_keycloak_container.stop()
except Exception as e:
logger.warning(f"Error stopping Keycloak: {e}")
finally:
_keycloak_container = None
try:
_keycloak_info_file.unlink()
except Exception:
pass


@pytest.fixture(scope="session")
def start_keycloak_server():
# Add random sleep between 0 and 2 before checking the state to avoid concurrency issues.
random_sleep_time = random.uniform(0, 2)
time.sleep(random_sleep_time)

# If the Keycloak instance is already started (in any worker), reuse it
if shared_state.get("keycloak_started", False):
return shared_state["keycloak_url"]
logger.info("Starting keycloak instance")
with KeycloakContainer("quay.io/keycloak/keycloak:24.0.1") as keycloak_container:
setup_permissions_on_keycloak(keycloak_container.get_client())
shared_state["keycloak_started"] = True
shared_state["keycloak_url"] = keycloak_container.get_url()
yield shared_state["keycloak_url"]

# After the fixture is done, cleanup the shared state
del shared_state["keycloak_started"]
del shared_state["keycloak_url"]
global _keycloak_container

existing_url = _get_shared_keycloak_url()
if existing_url:
logger.info(f"Reusing existing Keycloak at {existing_url}")
yield existing_url
return

time.sleep(random.uniform(0, 0.5))

existing_url = _get_shared_keycloak_url()
if existing_url:
logger.info(f"Found Keycloak started by another process: {existing_url}")
yield existing_url
return

try:
logger.info("Starting new Keycloak instance")
_keycloak_container = KeycloakContainer("quay.io/keycloak/keycloak:24.0.1")
_keycloak_container.start()

setup_permissions_on_keycloak(_keycloak_container.get_client())

keycloak_url = _keycloak_container.get_url()

_save_keycloak_info(keycloak_url)
atexit.register(_cleanup_keycloak)

logger.info(f"Keycloak ready at {keycloak_url}")
yield keycloak_url

except Exception as e:
logger.error(f"Failed to start Keycloak: {e}")
if _keycloak_container:
try:
_keycloak_container.stop()
except Exception:
pass
_keycloak_container = None
raise


@pytest.fixture(scope="session")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,20 @@ def setup(self, registry: RegistryConfig):
)
return "grpc+tcp://{}:{}".format(host, self.server_port)

def teardown(self):
super().teardown()
if self.proc is not None:
self.proc.kill()

# wait server to free the port
wait_retry_backoff(
lambda: (
None,
not check_port_open("localhost", self.server_port),
),
timeout_secs=30,
)


class RemoteOfflineTlsStoreDataSourceCreator(FileDataSourceCreator):
def __init__(self, project_name: str, *args, **kwargs):
Expand Down
Loading