-
Notifications
You must be signed in to change notification settings - Fork 45
chore: Support x-ld-envid in updates #370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: mk/sdk-1408/diagnostics
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,8 @@ | |
| ) | ||
| from ldclient.impl.http import HTTPFactory, _http_factory | ||
| from ldclient.impl.util import ( | ||
| _LD_ENVID_HEADER, | ||
| _LD_FD_FALLBACK_HEADER, | ||
| http_error_message, | ||
| is_http_error_recoverable, | ||
| log | ||
|
|
@@ -58,7 +60,6 @@ | |
|
|
||
| STREAMING_ENDPOINT = "/sdk/stream" | ||
|
|
||
|
|
||
| SseClientBuilder = Callable[[Config, SelectorStore], SSEClient] | ||
|
|
||
|
|
||
|
|
@@ -154,29 +155,35 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| if action.error is None: | ||
| continue | ||
|
|
||
| (update, should_continue) = self._handle_error(action.error) | ||
| envid = action.headers.get(_LD_ENVID_HEADER) if action.headers is not None else None | ||
|
|
||
| (update, should_continue) = self._handle_error(action.error, envid) | ||
| if update is not None: | ||
| yield update | ||
|
|
||
| if not should_continue: | ||
| break | ||
| continue | ||
|
|
||
| envid = None | ||
| if isinstance(action, Start) and action.headers is not None: | ||
| fallback = action.headers.get('X-LD-FD-Fallback') == 'true' | ||
| fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true' | ||
| envid = action.headers.get(_LD_ENVID_HEADER) | ||
|
|
||
| if fallback: | ||
| self._record_stream_init(True) | ||
| yield Update( | ||
| state=DataSourceState.OFF, | ||
| revert_to_fdv1=True | ||
| revert_to_fdv1=True, | ||
| environment_id=envid, | ||
| ) | ||
| break | ||
|
|
||
| if not isinstance(action, Event): | ||
| continue | ||
|
|
||
| try: | ||
| update = self._process_message(action, change_set_builder) | ||
| update = self._process_message(action, change_set_builder, envid) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Environment ID Fails to PersistThe |
||
| if update is not None: | ||
| self._record_stream_init(False) | ||
| self._connection_attempt_start_time = None | ||
|
|
@@ -187,7 +194,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| ) | ||
| self._sse.interrupt() | ||
|
|
||
| (update, should_continue) = self._handle_error(e) | ||
| (update, should_continue) = self._handle_error(e, envid) | ||
| if update is not None: | ||
| yield update | ||
| if not should_continue: | ||
|
|
@@ -204,7 +211,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| DataSourceErrorKind.UNKNOWN, 0, time(), str(e) | ||
| ), | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
|
|
||
| self._sse.close() | ||
|
|
@@ -226,7 +233,7 @@ def _record_stream_init(self, failed: bool): | |
|
|
||
| # pylint: disable=too-many-return-statements | ||
| def _process_message( | ||
| self, msg: Event, change_set_builder: ChangeSetBuilder | ||
| self, msg: Event, change_set_builder: ChangeSetBuilder, envid: Optional[str] | ||
| ) -> Optional[Update]: | ||
| """ | ||
| Processes a single message from the SSE stream and returns an Update | ||
|
|
@@ -247,7 +254,7 @@ def _process_message( | |
| change_set_builder.expect_changes() | ||
| return Update( | ||
| state=DataSourceState.VALID, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
| return None | ||
|
|
||
|
|
@@ -293,13 +300,13 @@ def _process_message( | |
| return Update( | ||
| state=DataSourceState.VALID, | ||
| change_set=change_set, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
|
|
||
| log.info("Unexpected event found in stream: %s", msg.event) | ||
| return None | ||
|
|
||
| def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | ||
| def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optional[Update], bool]: | ||
| """ | ||
| This method handles errors that occur during the streaming process. | ||
|
|
||
|
|
@@ -328,7 +335,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| DataSourceErrorKind.INVALID_DATA, 0, time(), str(error) | ||
| ), | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
| return (update, True) | ||
|
|
||
|
|
@@ -344,11 +351,15 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| str(error), | ||
| ) | ||
|
|
||
| if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true': | ||
| if envid is None and error.headers is not None: | ||
| envid = error.headers.get(_LD_ENVID_HEADER) | ||
|
|
||
| if error.headers is not None and error.headers.get(_LD_FD_FALLBACK_HEADER) == 'true': | ||
| update = Update( | ||
| state=DataSourceState.OFF, | ||
| error=error_info, | ||
| revert_to_fdv1=True | ||
| revert_to_fdv1=True, | ||
| environment_id=envid, | ||
| ) | ||
| return (update, False) | ||
|
|
||
|
|
@@ -364,7 +375,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| ), | ||
| error=error_info, | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
|
|
||
| if not is_recoverable: | ||
|
|
@@ -386,7 +397,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| DataSourceErrorKind.UNKNOWN, 0, time(), str(error) | ||
| ), | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
| # no stacktrace here because, for a typical connection error, it'll | ||
| # just be a lengthy tour of urllib3 internals | ||
|
|
@@ -411,5 +422,4 @@ def __init__(self, config: Config): | |
|
|
||
| def build(self) -> StreamingDataSource: | ||
| """Builds a StreamingDataSource instance with the configured parameters.""" | ||
| # TODO(fdv2): Add in the other controls here. | ||
| return StreamingDataSource(self._config) | ||
Uh oh!
There was an error while loading. Please reload this page.