Skip to content

Commit 10879ec

Browse files
authored
fix: Fix remote rbac integration tests (#5473)
* fix: Fix remote rbac integration tests Signed-off-by: ntkathole <[email protected]> * fix: Spin up only one instance of Keycloak Signed-off-by: ntkathole <[email protected]> --------- Signed-off-by: ntkathole <[email protected]>
1 parent dc5b2af commit 10879ec

File tree

3 files changed

+128
-20
lines changed

3 files changed

+128
-20
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ test-python-integration-local: ## Run Python integration tests (local dev mode)
155155
test-python-integration-rbac-remote: ## Run Python remote RBAC integration tests
156156
FEAST_IS_LOCAL_TEST=True \
157157
FEAST_LOCAL_ONLINE_CONTAINER=True \
158-
python -m pytest --tb=short -v -n 4 --color=yes --integration --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \
158+
python -m pytest --tb=short -v -n 8 --color=yes --integration --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \
159159
-k "not test_lambda_materialization and not test_snowflake_materialization" \
160160
-m "rbac_remote_integration_test" \
161161
--log-cli-level=INFO -s \

sdk/python/tests/integration/conftest.py

Lines changed: 113 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
import atexit
2+
import json
13
import logging
24
import random
5+
import tempfile
36
import time
4-
from multiprocessing import Manager
7+
from pathlib import Path
8+
from typing import Optional
59

610
import pytest
711
from testcontainers.keycloak import KeycloakContainer
@@ -14,28 +18,118 @@
1418
logger = logging.getLogger(__name__)
1519
logger.setLevel(logging.INFO)
1620

17-
shared_state = Manager().dict()
21+
# Shared Keycloak state.
# _keycloak_container: the container object started by *this* process
#   (assigned in start_keycloak_server, cleared in _cleanup_keycloak);
#   stays None in processes that only reuse an instance.
# _keycloak_info_file: JSON file in the system temp directory through which
#   the URL of a running instance is published (written by
#   _save_keycloak_info, read by _get_shared_keycloak_url).
22+
_keycloak_container: Optional[KeycloakContainer] = None
23+
_keycloak_info_file = Path(tempfile.gettempdir()) / "feast_keycloak_info.json"
24+
25+
26+
def _is_keycloak_healthy(url: str) -> bool:
27+
"""Health check for Keycloak."""
28+
try:
29+
import requests
30+
31+
response = requests.get(f"{url}/health/ready", timeout=3)
32+
return response.status_code == 200
33+
except Exception:
34+
try:
35+
import requests
36+
37+
response = requests.get(f"{url}/auth/realms/master", timeout=3)
38+
return response.status_code in [200, 404]
39+
except Exception:
40+
return False
41+
42+
43+
def _get_shared_keycloak_url() -> Optional[str]:
    """Return the URL of an already-running Keycloak instance, if any.

    Reads the JSON info file, extracts its ``"url"`` entry and confirms the
    server is reachable via ``_is_keycloak_healthy``. When the file is
    unreadable, malformed, or points at a server that fails the health
    check, the file is removed so later callers do not retry a stale entry.

    Returns:
        The healthy instance's URL, or None when no usable instance is
        published.
    """
    url = None
    try:
        # Open directly instead of exists()+open(): another worker may
        # delete the file between the two calls.
        with open(_keycloak_info_file, "r") as f:
            info = json.load(f)
        if isinstance(info, dict):
            url = info.get("url")
    except FileNotFoundError:
        # Nothing has been published (yet); there is nothing to clean up.
        return None
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and decoding errors.
        logger.debug("Error reading Keycloak info: %s", e)

    if isinstance(url, str) and url and _is_keycloak_healthy(url):
        return url

    # Stale or unusable entry: drop it. Another process may already have
    # removed the file, which is fine.
    try:
        _keycloak_info_file.unlink()
    except OSError:
        pass
    return None
62+
63+
64+
def _save_keycloak_info(url: str):
    """Publish the Keycloak URL in the shared info file.

    The payload (``{"url": ..., "timestamp": ...}``) is first written to a
    temporary file in the same directory and then renamed over the target,
    so a concurrent reader sees either the previous content or the complete
    new content, never a half-written file.

    Failures are logged as a warning and otherwise ignored; the caller can
    still use the instance it started.

    Args:
        url: Base URL of the running Keycloak instance.
    """
    info = {"url": url, "timestamp": time.time()}
    tmp_name = None
    try:
        with tempfile.NamedTemporaryFile(
            "w",
            dir=str(_keycloak_info_file.parent),
            prefix=f"{_keycloak_info_file.name}.",
            suffix=".tmp",
            delete=False,
        ) as f:
            tmp_name = f.name
            json.dump(info, f)
        # Same-directory rename: replaces the target in a single step.
        Path(tmp_name).replace(_keycloak_info_file)
    except Exception as e:
        logger.warning(f"Failed to save Keycloak info: {e}")
        if tmp_name is not None:
            # Do not leave the temporary file behind after a failure.
            try:
                Path(tmp_name).unlink()
            except OSError:
                pass
72+
73+
74+
def _cleanup_keycloak():
    """Stop the Keycloak container owned by this process and drop the info file.

    Does nothing when this process holds no container. Errors raised while
    stopping are logged as warnings; in every case the module-level
    reference is reset to None and removal of the shared info file is
    attempted (failures there are ignored).
    """
    global _keycloak_container

    if not _keycloak_container:
        return

    try:
        logger.info("Stopping Keycloak container")
        _keycloak_container.stop()
    except Exception as stop_error:
        logger.warning(f"Error stopping Keycloak: {stop_error}")
    finally:
        # Runs whether or not stop() succeeded.
        _keycloak_container = None
        try:
            _keycloak_info_file.unlink()
        except Exception:
            # Best effort: the file may already be gone.
            pass
1889

1990

2091
def _acquire_keycloak_start_lock():
    """Take an exclusive advisory lock guarding Keycloak start-up.

    Returns:
        An open file handle holding the lock (closing it releases the
        lock), or None when locking is unavailable (no ``fcntl`` on this
        platform, or the lock file cannot be opened/locked).
    """
    lock_path = _keycloak_info_file.with_name(_keycloak_info_file.name + ".lock")
    try:
        import fcntl

        handle = open(lock_path, "w")
    except (ImportError, OSError) as e:
        logger.debug(f"Keycloak start lock unavailable: {e}")
        return None

    try:
        # Blocks until no other process holds the lock.
        fcntl.flock(handle, fcntl.LOCK_EX)
    except OSError as e:
        logger.debug(f"Could not lock {lock_path}: {e}")
        handle.close()
        return None
    return handle


@pytest.fixture(scope="session")
def start_keycloak_server():
    """Yield the URL of a Keycloak instance shared between test processes.

    If the shared info file already points at a healthy instance, that URL
    is reused. Otherwise the check-and-start sequence runs under an
    exclusive file lock, so that of several workers starting at the same
    time only one launches a container; the others block on the lock and
    then pick up the published URL. Where locking is unavailable the fixture
    falls back to a short random delay before re-checking.

    The container started here is not stopped at fixture teardown; it is
    stopped by ``_cleanup_keycloak`` registered with ``atexit``.

    Yields:
        Base URL of the Keycloak instance.

    Raises:
        Exception: whatever starting or configuring the container raised;
            the partially started container is stopped first.
    """
    global _keycloak_container

    # Fast path: an instance is already published and healthy.
    existing_url = _get_shared_keycloak_url()
    if existing_url:
        logger.info(f"Reusing existing Keycloak at {existing_url}")
        yield existing_url
        return

    lock_handle = _acquire_keycloak_start_lock()
    if lock_handle is None:
        # No lock available: keep the jitter so workers are at least
        # unlikely to check at the same instant.
        time.sleep(random.uniform(0, 0.5))

    try:
        # Re-check: another process may have started and published an
        # instance while this one was waiting.
        keycloak_url = _get_shared_keycloak_url()
        if keycloak_url:
            logger.info(f"Found Keycloak started by another process: {keycloak_url}")
        else:
            try:
                logger.info("Starting new Keycloak instance")
                _keycloak_container = KeycloakContainer(
                    "quay.io/keycloak/keycloak:24.0.1"
                )
                _keycloak_container.start()

                setup_permissions_on_keycloak(_keycloak_container.get_client())

                keycloak_url = _keycloak_container.get_url()

                _save_keycloak_info(keycloak_url)
                atexit.register(_cleanup_keycloak)

                logger.info(f"Keycloak ready at {keycloak_url}")
            except Exception as e:
                logger.error(f"Failed to start Keycloak: {e}")
                if _keycloak_container:
                    try:
                        _keycloak_container.stop()
                    except Exception:
                        pass
                    _keycloak_container = None
                raise
    finally:
        # Release the lock only after the URL has been published (or the
        # start failed), so waiting processes see the final state.
        if lock_handle is not None:
            lock_handle.close()

    yield keycloak_url
39133

40134

41135
@pytest.fixture(scope="session")

sdk/python/tests/integration/feature_repos/universal/data_sources/file.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,20 @@ def setup(self, registry: RegistryConfig):
417417
)
418418
return "grpc+tcp://{}:{}".format(host, self.server_port)
419419

420+
def teardown(self):
    """Run the parent teardown, then kill the server process and wait for its port.

    When ``self.proc`` is None only the parent teardown runs. Otherwise the
    process is killed and ``wait_retry_backoff`` is invoked with a 30 second
    timeout on a predicate reporting whether ``localhost:self.server_port``
    is no longer open.
    """
    super().teardown()
    if self.proc is None:
        return

    self.proc.kill()

    def _port_released():
        # Second element is True once nothing listens on the port any more;
        # presumably wait_retry_backoff retries until it is truthy -- TODO
        # confirm against its definition.
        return None, not check_port_open("localhost", self.server_port)

    # Wait for the server to free the port.
    wait_retry_backoff(_port_released, timeout_secs=30)
433+
420434

421435
class RemoteOfflineTlsStoreDataSourceCreator(FileDataSourceCreator):
422436
def __init__(self, project_name: str, *args, **kwargs):

0 commit comments

Comments
 (0)