Skip to content

Commit 8333a83

Browse files
authored
Revert "Support async predictors (#2010)" (#2022)
This reverts commit a86adcd.
1 parent a8e138f commit 8333a83

File tree

9 files changed

+37
-407
lines changed

9 files changed

+37
-407
lines changed

python/cog/server/connection.py

Lines changed: 0 additions & 91 deletions
This file was deleted.

python/cog/server/eventtypes.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,6 @@
55

66
# From worker parent process
77
#
8-
@define
9-
class Cancel:
10-
# TODO: identify which prediction!
11-
pass
12-
13-
148
@define
159
class PredictionInput:
1610
payload: Dict[str, Any]

python/cog/server/helpers.py

Lines changed: 1 addition & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import threading
1111
import uuid
1212
from types import TracebackType
13-
from typing import Any, BinaryIO, Callable, Dict, List, Sequence, TextIO, Union
13+
from typing import Any, Callable, Dict, List, Sequence, TextIO, Union
1414

1515
import pydantic
1616
from typing_extensions import Self
@@ -19,45 +19,6 @@
1919
from .errors import CogRuntimeError, CogTimeoutError
2020

2121

22-
class _SimpleStreamWrapper(io.TextIOWrapper):
23-
"""
24-
_SimpleStreamWrapper wraps a binary I/O buffer and provides a TextIOWrapper
25-
interface (primarily write and flush methods) which call a provided
26-
callback function instead of (or, if `tee` is True, in addition to) writing
27-
to the underlying buffer.
28-
"""
29-
30-
def __init__(
31-
self,
32-
buffer: BinaryIO,
33-
callback: Callable[[str, str], None],
34-
tee: bool = False,
35-
) -> None:
36-
super().__init__(buffer, line_buffering=True)
37-
38-
self._callback = callback
39-
self._tee = tee
40-
self._buffer = []
41-
42-
def write(self, s: str) -> int:
43-
length = len(s)
44-
self._buffer.append(s)
45-
if self._tee:
46-
super().write(s)
47-
else:
48-
# If we're not teeing, we have to handle automatic flush on
49-
# newline. When `tee` is true, this is handled by the write method.
50-
if "\n" in s or "\r" in s:
51-
self.flush()
52-
return length
53-
54-
def flush(self) -> None:
55-
self._callback(self.name, "".join(self._buffer))
56-
self._buffer.clear()
57-
if self._tee:
58-
super().flush()
59-
60-
6122
class _StreamWrapper:
6223
def __init__(self, name: str, stream: TextIO) -> None:
6324
self.name = name
@@ -125,66 +86,6 @@ def original(self) -> TextIO:
12586
return self._original_fp
12687

12788

128-
if sys.version_info < (3, 9):
129-
130-
class _AsyncStreamRedirectorBase(contextlib.AbstractContextManager):
131-
pass
132-
else:
133-
134-
class _AsyncStreamRedirectorBase(
135-
contextlib.AbstractContextManager["AsyncStreamRedirector"]
136-
):
137-
pass
138-
139-
140-
class AsyncStreamRedirector(_AsyncStreamRedirectorBase):
141-
"""
142-
AsyncStreamRedirector is a context manager that redirects I/O streams to a
143-
callback function. If `tee` is True, it also writes output to the original
144-
streams.
145-
146-
Unlike StreamRedirector, the underlying stream file descriptors are not
147-
modified, which means that only stream writes from Python code will be
148-
captured. Writes from native code will not be captured.
149-
150-
Unlike StreamRedirector, the streams redirected cannot be configured. The
151-
context manager is only able to redirect STDOUT and STDERR.
152-
"""
153-
154-
def __init__(
155-
self,
156-
callback: Callable[[str, str], None],
157-
tee: bool = False,
158-
) -> None:
159-
self._callback = callback
160-
self._tee = tee
161-
162-
stdout_wrapper = _SimpleStreamWrapper(sys.stdout.buffer, callback, tee)
163-
stderr_wrapper = _SimpleStreamWrapper(sys.stderr.buffer, callback, tee)
164-
self._stdout_ctx = contextlib.redirect_stdout(stdout_wrapper)
165-
self._stderr_ctx = contextlib.redirect_stderr(stderr_wrapper)
166-
167-
def __enter__(self) -> Self:
168-
self._stdout_ctx.__enter__()
169-
self._stderr_ctx.__enter__()
170-
return self
171-
172-
def __exit__(
173-
self,
174-
exc_type: type[BaseException] | None,
175-
exc_value: BaseException | None,
176-
traceback: TracebackType | None,
177-
) -> None:
178-
self._stdout_ctx.__exit__(exc_type, exc_value, traceback)
179-
self._stderr_ctx.__exit__(exc_type, exc_value, traceback)
180-
181-
def drain(self, timeout: float = 0.0) -> None:
182-
# Draining isn't complicated for AsyncStreamRedirector, since we're not
183-
# moving data between threads. We just need to flush the streams.
184-
sys.stdout.flush()
185-
sys.stderr.flush()
186-
187-
18889
if sys.version_info < (3, 9):
18990

19091
class _StreamRedirectorBase(contextlib.AbstractContextManager):

0 commit comments

Comments
 (0)