Commit d714a70

8W9aG and nevillelyh authored
Add ability to wait for an environment (#1957)
* Send in app threads directly from args
* This allows us not to read the config for the app threads until absolutely necessary.
* Load config right before necessary
* Delay loading the cog yaml until it is read from in the code.
* Add waiting for a wait file
* Allow environment variables to control whether we wait for a file to appear before further processing
* This allows the python interpreter to boot up while we wait for other files to become available
* Add wait_for_imports ability
* Allow waiting for a general environment by importing select python packages while the system boots up around cog.
* Fix lint on src_path
* Fix watchdog version
* Remove load_config in openapi_schema cmd
* Do not access root files on GHA workers
* Set recursive to true
* I shouldn’t need to do this but want to see if it relieves the errors on linux.
* Watch the directory instead
* Add code_xforms test
* A small test to make sense of what the code stripper is doing.
* Add http server to test to let it respond
* Abstract away cog config
* Create a class for accessing cog config
* Only access variables from the config by properties
* Gate those properties with environment variable function decorators to allow fetching the config from the environment rather than a file.
* This allows the environment to begin running without needing the /src
* Wait for environment before executing setup
* While waiting for the environment to boot, load the designated imports to speed up interpreter time.
* Fix Type on lower python versions
* Skip test_strip_model_source_code if < 3.9
* In strip model source code we use an AST function that isn’t available prior to 3.9
* Change COG_WAIT_IMPORTS to COG_EAGER_IMPORTS
* We aren’t waiting for these imports, we are loading them eagerly while we wait.
* Bump integration test timeout to 20 mins
* We have so many that we need to increase this.
* Add tests for Config class
* Fix an issue with env_property where it could not handle Optional or Union
* Fix get_args and get_origin in python 3.7
* Create these functions because they don’t exist in python 3.7
* Add more tests for config
* Check wait flag has fallen before eager import
* Add watch handler tests
* Remove watchdog and use SIGUSR2 for signalling
* Fix no torch import in tests
* Do naive waiting for file
* Do not wait for a signal, just use a while loop with a 10 ms interval to check for presence of file.
* Consolidate code_xforms tests
* test_code_xforms actually lived in the server directory and not in the cog directory, move these tests into the cog directory.
* Add a test and code for removing function decorators from the target functions, these can remove the ability to load the slimmed predictor.
* Add logic to keep referenced globals
* Keep a global if it is referenced as a default from a function's arguments.
* Convert set of globals to list
* Add logging for file waiting
* Fix lint issue by checking that tree is ast.Module
* Use typing.List instead of list for older python
* Fix more List issues
* Add further logging if the module fast loader failed
* Change tuple to Tuple for older pythons
* Add setup logs to mirror prediction logs
* Allow the logs to show when certain setup steps are happening
* Add 3.12 to torch 2.3.0
* Add insertion of python path
* Add support for handling subclasses in slim predict
* Fix listing issues
* Wrap in str
* Remove extraneous print
* Fix python path race condition
* Remove predictor import
* Fix pydantic 2 errors
* More pydantic 2 fixes
* Support multiple class names and method names
* This allows consumers to strip source code but maintain multiple classes and methods
* This is important if they want to maintain both predict and train types
* Use lists instead of single instances for stripping
* Fix types imports
* Revert "Add 3.12 to torch 2.3.0" This reverts commit 096c45b.
* Fix imports after merge
* Add COG_TRAIN_PREDICTOR
* Ensure that train/predictor stripped code is kept separate in the event a user uses separate files to refer to these different modes.
* Rename env vars to be sensical
* Old ones used to be many overloaded concepts.
* Add PYTHONPATH environment variables
* We can’t just update the system path, we also need to change the PYTHONPATH environment variable
* This allows forked processes to see this environment variable and act accordingly in user code.
* Add consistent debug logging in config
* Some tests previously checked the debug logging
* Make sure we conform to the same debug logging
* Add test_strip_model_source_code_keeps_referenced_class_from_function
* Confirm that this behaviour is consistent with functions.
* Explicitly check return code in test train
* Allows better debugging of stderr et al
* Handle response types in _predict
* Currently predict assumes that PredictionResponse is the serialisation target.
* This isn’t the case if we are calling _predict from a training endpoint
* Allow response_type to be fed into the _predict method to inform it of what kind of response it should expect.
* Make cog train a first class CLI function
* Use the proper endpoints to call training functions.
* Log the command properly to the user.
* Add back missing imports from merge
* Remove connection import
* Fix lint
* Add R8_ prefix to PYTHON_VERSION (#2058)

---------

Signed-off-by: Will Sackfield <[email protected]>
Co-authored-by: Neville Li <[email protected]>
1 parent 465afe1 commit d714a70
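
The commit message notes that updating `sys.path` alone is not enough and that `PYTHONPATH` must be exported as well. The sketch below illustrates why, under stated assumptions: it starts a fresh interpreter with `subprocess` (the commit message speaks of forked processes; a newly started interpreter is the case where the difference is visible), and the helper names `child_sees_path` and `demo` are hypothetical and not part of cog.

```python
import os
import subprocess
import sys
import tempfile


def child_sees_path(path: str, env: dict) -> bool:
    """Start a fresh interpreter and report whether `path` is on its sys.path."""
    code = f"import sys; print({path!r} in sys.path)"
    result = subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip() == "True"


def demo() -> tuple:
    extra = tempfile.mkdtemp()  # stand-in for a site-packages directory
    base_env = {k: v for k, v in os.environ.items() if k != "PYTHONPATH"}

    # Changing sys.path only affects the current interpreter.
    sys.path.append(extra)
    seen_without_env = child_sees_path(extra, base_env)

    # Exporting PYTHONPATH makes the path visible to new interpreters too.
    env_with_path = dict(base_env, PYTHONPATH=extra)
    seen_with_env = child_sees_path(extra, env_with_path)

    sys.path.remove(extra)
    os.rmdir(extra)
    return seen_without_env, seen_with_env
```

Calling `demo()` returns a pair of booleans: whether the child saw the directory with only the parent's `sys.path` changed, and whether it saw it once `PYTHONPATH` was set.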

6 files changed: +213 -0 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -19,3 +19,4 @@ docs/README.md
 docs/CONTRIBUTING.md
 venv
 base-image
+flag_file

python/cog/config.py

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,7 @@
     load_full_predictor_from_file,
 )
 from .types import CogConfig
+from .wait import wait_for_env

 COG_YAML_FILE = "cog.yaml"
 COG_PREDICT_TYPE_STUB_ENV_VAR = "COG_PREDICT_TYPE_STUB"
@@ -67,6 +68,7 @@ def _cog_config(self) -> CogConfig:
         """
         config = self._config
         if config is None:
+            wait_for_env(include_imports=False)
             config_path = os.path.abspath(COG_YAML_FILE)
             try:
                 with open(config_path, encoding="utf-8") as handle:
@@ -108,6 +110,7 @@ def _predictor_code(
         if source_code is not None:
             return source_code
         if sys.version_info >= (3, 9):
+            wait_for_env(include_imports=False)
             with open(module_path, encoding="utf-8") as file:
                 return strip_model_source_code(file.read(), [class_name], [method_name])
         else:
@@ -130,6 +133,7 @@ def _load_predictor_for_types(
                 log.info(f"[{module_name}] fast loader failed: {e}")
         if module is None:
             log.debug(f"[{module_name}] falling back to slow loader")
+            wait_for_env(include_imports=False)
             module = load_full_predictor_from_file(module_path, module_name)
         return get_predictor(module, class_name)
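
Each hunk in `config.py` places `wait_for_env(include_imports=False)` immediately before the first file read, so nothing under the source directory is touched until the environment is ready. A minimal sketch of that "wait, then read once, then cache" pattern follows. The class `LazyConfig`, the `wait` callback, and the use of JSON instead of YAML (to stay within the standard library) are all assumptions made for illustration, not cog's implementation.

```python
import json
import os
import tempfile
from typing import Any, Callable, Dict, Optional


class LazyConfig:
    """Hypothetical stand-in for a config object that reads its file on first use."""

    def __init__(self, path: str, wait: Callable[[], bool]) -> None:
        self._path = path
        self._wait = wait  # plays the role of wait_for_env(include_imports=False)
        self._config: Optional[Dict[str, Any]] = None

    @property
    def data(self) -> Dict[str, Any]:
        if self._config is None:
            # Block until the environment is ready, then read and cache.
            self._wait()
            with open(self._path, encoding="utf-8") as handle:
                self._config = json.load(handle)
        return self._config


def demo() -> list:
    calls = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "config.json")
        with open(path, "w", encoding="utf-8") as handle:
            json.dump({"predict": "predict.py:Predictor"}, handle)
        # list.append returns None, so the lambda records the call and returns True.
        config = LazyConfig(path, wait=lambda: calls.append("wait") or True)
        calls.append("constructed")
        first = config.data["predict"]
        second = config.data["predict"]  # served from the cache, no second wait
        calls.append(first == second)
    return calls
```

In this sketch the wait hook runs only on the first access, after construction, and never again once the file has been cached.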

python/cog/server/worker.py

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,7 @@
 from ..json import make_encodeable
 from ..predictor import get_predict, load_predictor_from_ref, run_setup
 from ..types import PYDANTIC_V2, URLPath
+from ..wait import wait_for_env
 from .connection import AsyncConnection, LockedConnection
 from .eventtypes import (
     Cancel,
@@ -371,6 +372,7 @@ def _current_tag(self) -> Optional[str]:

     def _setup(self, redirector: AsyncStreamRedirector) -> None:
         done = Done()
+        wait_for_env()
         try:
             self._predictor = load_predictor_from_ref(self._predictor_ref)
             # Could be a function or a class
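
Unlike the calls in `config.py`, `_setup` calls `wait_for_env()` with its default `include_imports=True`, so this is the point where the modules named in `COG_EAGER_IMPORTS` get imported while the wait file is still absent. The commit's `eagerly_import_modules` splits the value on commas and lets any import error propagate. The sketch below shows a more forgiving variant as an alternative, not as cog's behaviour: the function names `parse_eager_imports` and `import_eagerly` are hypothetical, and collecting failures instead of raising is a design assumption.

```python
import importlib
from typing import List, Tuple


def parse_eager_imports(raw: str) -> List[str]:
    """Split a comma-separated module list, ignoring blanks and stray spaces."""
    return [name.strip() for name in raw.split(",") if name.strip()]


def import_eagerly(raw: str) -> Tuple[List[str], List[str]]:
    """Import each named module; return (imported, failed) instead of raising."""
    imported: List[str] = []
    failed: List[str] = []
    for name in parse_eager_imports(raw):
        try:
            importlib.import_module(name)
            imported.append(name)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            failed.append(name)
    return imported, failed
```

Whether a failed eager import should abort setup or merely be logged is a policy choice. Since eager importing is only a warm-up step, tolerating failures keeps a typo in the variable from blocking the worker.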

python/cog/wait.py

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+import importlib
+import os
+import sys
+import time
+
+import structlog
+
+COG_WAIT_FILE_ENV_VAR = "COG_WAIT_FILE"
+COG_EAGER_IMPORTS_ENV_VAR = "COG_EAGER_IMPORTS"
+COG_PYENV_PATH_ENV_VAR = "COG_PYENV_PATH"
+PYTHONPATH_ENV_VAR = "PYTHONPATH"
+PYTHON_VERSION_ENV_VAR = "R8_PYTHON_VERSION"
+
+log = structlog.get_logger("cog.wait")
+
+
+def _wait_flag_fallen() -> bool:
+    wait_file = os.environ.get(COG_WAIT_FILE_ENV_VAR)
+    if wait_file is None:
+        return True
+    return os.path.exists(wait_file)
+
+
+def _insert_pythonpath() -> None:
+    pyenv_path = os.environ.get(COG_PYENV_PATH_ENV_VAR)
+    if pyenv_path is None:
+        return
+    full_module_path = os.path.join(
+        pyenv_path,
+        "lib",
+        "python" + os.environ[PYTHON_VERSION_ENV_VAR],
+        "site-packages",
+    )
+    if full_module_path not in sys.path:
+        sys.path.append(full_module_path)
+        os.environ[PYTHONPATH_ENV_VAR] = ":".join(sys.path)
+
+
+def wait_for_file(timeout: float = 60.0) -> bool:
+    """Wait for a file in the environment variables."""
+    wait_file = os.environ.get(COG_WAIT_FILE_ENV_VAR)
+    if wait_file is None:
+        return True
+    if os.path.exists(wait_file):
+        log.info(f"Wait file found {wait_file}...")
+        return True
+    log.info(f"Waiting for file {wait_file}...")
+    time_taken = 0.0
+    while time_taken < timeout:
+        sleep_time = 0.01
+        time.sleep(sleep_time)
+        time_taken += sleep_time
+        if os.path.exists(wait_file):
+            return True
+    log.info(f"Waiting for file {wait_file} timed out.")
+    return False
+
+
+def eagerly_import_modules() -> int:
+    """Wait for python to import big modules."""
+    wait_imports = os.environ.get(COG_EAGER_IMPORTS_ENV_VAR)
+    import_count = 0
+    if wait_imports is None:
+        return import_count
+    log.info(f"Eagerly importing {wait_imports}.")
+    for import_statement in wait_imports.split(","):
+        importlib.import_module(import_statement)
+        import_count += 1
+    return import_count
+
+
+def wait_for_env(file_timeout: float = 60.0, include_imports: bool = True) -> bool:
+    """Wait for the environment to load."""
+    if _wait_flag_fallen():
+        _insert_pythonpath()
+        return True
+    if include_imports:
+        eagerly_import_modules()
+    waited = wait_for_file(timeout=file_timeout)
+    _insert_pythonpath()
+    return waited
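
The loop in `wait_for_file` adds a fixed 0.01 s to `time_taken` on every iteration, so time spent in `os.path.exists` and any oversleep is not counted, and the real wall-clock wait can exceed `timeout`. One possible alternative, sketched here, measures against a monotonic deadline. The name `wait_for_path` and the `interval` parameter are hypothetical, and the sketch takes the path as an argument instead of reading `COG_WAIT_FILE` so that it stays self-contained.

```python
import os
import tempfile
import threading
import time


def wait_for_path(path: str, timeout: float = 60.0, interval: float = 0.01) -> bool:
    """Poll until `path` exists or `timeout` seconds of wall time have passed."""
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))


def demo() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        flag = os.path.join(tmp, "flag_file")
        # Nothing creates the file yet, so this short wait times out.
        missing = wait_for_path(flag, timeout=0.05)
        # Create the file from another thread after 100 ms.
        timer = threading.Timer(0.1, lambda: open(flag, "w").close())
        timer.start()
        found = wait_for_path(flag, timeout=5.0)
        timer.join()
    return missing, found
```

`time.monotonic` is unaffected by system clock adjustments, which makes it a safer basis for timeouts than accumulating nominal sleep durations.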

python/tests/test_config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,12 @@ def test_get_predictor_types_with_env_var():
130130
from cog import BasePredictor, Path
131131
from typing import Optional
132132
from pydantic import BaseModel
133+
133134
class ModelOutput(BaseModel):
134135
success: bool
135136
error: Optional[str]
136137
segmentedImage: Optional[Path]
138+
137139
class Predictor(BasePredictor):
138140
def predict(self, msg: str) -> ModelOutput:
139141
return None

python/tests/test_wait.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import os
2+
import sys
3+
import tempfile
4+
import threading
5+
import time
6+
from pathlib import Path
7+
8+
from cog.wait import (
9+
COG_EAGER_IMPORTS_ENV_VAR,
10+
COG_PYENV_PATH_ENV_VAR,
11+
COG_WAIT_FILE_ENV_VAR,
12+
PYTHON_VERSION_ENV_VAR,
13+
PYTHONPATH_ENV_VAR,
14+
eagerly_import_modules,
15+
wait_for_env,
16+
wait_for_file,
17+
)
18+
19+
20+
def test_wait_for_file_no_env_var():
21+
if COG_WAIT_FILE_ENV_VAR in os.environ:
22+
del os.environ[COG_WAIT_FILE_ENV_VAR]
23+
result = wait_for_file()
24+
assert result, "We should immediately return when no wait file is specified."
25+
26+
27+
def test_wait_for_file_exists():
28+
with tempfile.NamedTemporaryFile() as tmpfile:
29+
os.environ[COG_WAIT_FILE_ENV_VAR] = tmpfile.name
30+
result = wait_for_file(timeout=5.0)
31+
del os.environ[COG_WAIT_FILE_ENV_VAR]
32+
assert result, "We should immediately return when the file already exists."
33+
34+
35+
def test_wait_for_file_waits_for_file():
36+
wait_file = os.path.join(os.path.dirname(__file__), "flag_file")
37+
if os.path.exists(wait_file):
38+
os.remove(wait_file)
39+
os.environ[COG_WAIT_FILE_ENV_VAR] = wait_file
40+
41+
def create_file():
42+
time.sleep(2.0)
43+
Path(wait_file).touch()
44+
45+
thread = threading.Thread(target=create_file)
46+
thread.start()
47+
result = wait_for_file(timeout=5.0)
48+
del os.environ[COG_WAIT_FILE_ENV_VAR]
49+
os.remove(wait_file)
50+
assert result, "We should return when the file is touched."
51+
52+
53+
def test_wait_for_file_timeout():
54+
os.environ[COG_WAIT_FILE_ENV_VAR] = os.path.join(
55+
os.path.dirname(__file__), "a_file_unknown"
56+
)
57+
result = wait_for_file(timeout=5.0)
58+
del os.environ[COG_WAIT_FILE_ENV_VAR]
59+
assert not result, "We should return false when the timeout triggers."
60+
61+
62+
def test_eagerly_import_modules_no_env_var():
63+
if COG_EAGER_IMPORTS_ENV_VAR in os.environ:
64+
del os.environ[COG_EAGER_IMPORTS_ENV_VAR]
65+
eagerly_import_modules()
66+
67+
68+
def test_eagerly_import_modules():
69+
os.environ[COG_EAGER_IMPORTS_ENV_VAR] = "pytest,pathlib,time"
70+
import_count = eagerly_import_modules()
71+
del os.environ[COG_EAGER_IMPORTS_ENV_VAR]
72+
assert import_count == 3, "There should be 3 imports performed"
73+
74+
75+
def test_wait_for_env_no_env_vars():
76+
if COG_WAIT_FILE_ENV_VAR in os.environ:
77+
del os.environ[COG_WAIT_FILE_ENV_VAR]
78+
if COG_EAGER_IMPORTS_ENV_VAR in os.environ:
79+
del os.environ[COG_EAGER_IMPORTS_ENV_VAR]
80+
result = wait_for_env()
81+
assert (
82+
result
83+
), "We should return true if we have no env vars associated with the wait."
84+
85+
86+
def test_wait_for_env():
87+
with tempfile.NamedTemporaryFile() as tmpfile:
88+
os.environ[COG_WAIT_FILE_ENV_VAR] = tmpfile.name
89+
os.environ[COG_EAGER_IMPORTS_ENV_VAR] = "pytest,pathlib,time"
90+
result = wait_for_env()
91+
assert (
92+
result
93+
), "We should return true if we have waited for the right environment."
94+
del os.environ[COG_EAGER_IMPORTS_ENV_VAR]
95+
del os.environ[COG_WAIT_FILE_ENV_VAR]
96+
97+
98+
def test_wait_inserts_pythonpath():
99+
with tempfile.NamedTemporaryFile() as tmpfile:
100+
original_sys_path = sys.path.copy()
101+
original_python_path = os.environ.get(PYTHONPATH_ENV_VAR)
102+
pyenv_path = os.path.dirname(tmpfile.name)
103+
os.environ[COG_WAIT_FILE_ENV_VAR] = tmpfile.name
104+
os.environ[COG_EAGER_IMPORTS_ENV_VAR] = "pytest,pathlib,time"
105+
os.environ[COG_PYENV_PATH_ENV_VAR] = pyenv_path
106+
os.environ[PYTHON_VERSION_ENV_VAR] = "3.11"
107+
wait_for_env()
108+
del os.environ[PYTHON_VERSION_ENV_VAR]
109+
del os.environ[COG_PYENV_PATH_ENV_VAR]
110+
del os.environ[COG_EAGER_IMPORTS_ENV_VAR]
111+
del os.environ[COG_WAIT_FILE_ENV_VAR]
112+
current_python_path = os.environ[PYTHONPATH_ENV_VAR]
113+
if original_python_path is None:
114+
del os.environ[PYTHONPATH_ENV_VAR]
115+
else:
116+
os.environ[PYTHONPATH_ENV_VAR] = original_python_path
117+
sys.path = original_sys_path
118+
expected_path = ":".join(
119+
original_sys_path + [pyenv_path + "/lib/python3.11/site-packages"]
120+
)
121+
assert (
122+
expected_path == current_python_path
123+
), "Our python path should be updated with the pyenv path."
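
These tests set and delete entries in `os.environ` by hand, so a variable stays set for later tests if the call between the assignment and the `del` raises. A hedged alternative is to scope the change with `unittest.mock.patch.dict`, which restores the mapping when the block exits, even on an exception. The sketch below is self-contained: `wait_file_ready` is a hypothetical stand-in that mirrors the check in `_wait_flag_fallen`, not an import from cog.

```python
import os
import tempfile
from unittest import mock

WAIT_FILE_ENV_VAR = "COG_WAIT_FILE"  # same value as COG_WAIT_FILE_ENV_VAR in wait.py


def wait_file_ready() -> bool:
    """Stand-in for the check under test: true if unset or the file exists."""
    wait_file = os.environ.get(WAIT_FILE_ENV_VAR)
    return wait_file is None or os.path.exists(wait_file)


def env_restored_after_failure() -> bool:
    before = os.environ.get(WAIT_FILE_ENV_VAR)
    with tempfile.TemporaryDirectory() as tmp:
        missing = os.path.join(tmp, "a_file_unknown")
        try:
            with mock.patch.dict(os.environ, {WAIT_FILE_ENV_VAR: missing}):
                assert not wait_file_ready()
                raise RuntimeError("simulated test failure")
        except RuntimeError:
            pass
    # patch.dict put the variable back exactly as it was before the block.
    return os.environ.get(WAIT_FILE_ENV_VAR) == before
```

In a pytest suite the `monkeypatch` fixture's `setenv` and `delenv` methods give the same guarantee with less ceremony.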
