-
-
Notifications
You must be signed in to change notification settings - Fork 590
Description
The Pipeline
class has several callbacks that appear not actually be callbacks in the sense that they are all called more than once during the process of sending a prompt. What I mean by that is, when I prompt a pipeline from the UI, both the inlet
and pipe
method are called more than once, which seems to defeat the purpose of a callback in the sense that a callback should be called exactly once at a deterministic point in the process.
For example, I am doing some testing with the pipeline found at https://github.com/open-webui/pipelines/blob/main/examples/pipelines/events_pipeline.py.
from typing import List, Union, Generator, Iterator, Optional
from pprint import pprint
import time
# Uncomment to disable SSL verification warnings if needed.
# warnings.filterwarnings('ignore', message='Unverified HTTPS request')
class Pipeline:
def __init__(self):
self.name = "Pipeline with Status Event"
self.description = (
"This is a pipeline that demonstrates how to use the status event."
)
self.debug = False
self.version = "0.1.0"
self.author = "Anthony Durussel"
self.inlet_count = 0
self.pipe_count = 0
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup: {__name__}")
pass
async def on_shutdown(self):
# This function is called when the server is shutdown.
print(f"on_shutdown: {__name__}")
pass
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
"""
This function is called before the OpenAI API request is made. You
can modify the form data before it is sent to the OpenAI API.
"""
self.inlet_count += 1
print()
print(f"inlet {self.inlet_count}: {__name__}")
print()
if self.debug:
print(f"inlet: {__name__} - body:")
pprint(body)
print(f"inlet: {__name__} - user:")
pprint(user)
return body
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
"""
This function is called after the OpenAI API response is completed.
You can modify the messages after they are received from the OpenAI API.
"""
print()
print(f"outlet: {__name__}")
print()
if self.debug:
print(f"outlet: {__name__} - body:")
pprint(body)
print(f"outlet: {__name__} - user:")
pprint(user)
return body
def pipe(
self,
user_message: str,
model_id: str,
messages: List[dict],
body: dict,
) -> Union[str, Generator, Iterator]:
self.pipe_count += 1
print()
print(f"pipe {self.pipe_count}: {__name__}")
print()
if self.debug:
print(f"pipe: {__name__} - received message from user: {user_message}")
print(f"pipe: {__name__} - messages:")
pprint(messages)
print(f"pipe: {__name__} - body:")
pprint(body)
yield {
"event": {
"type": "status",
"data": {
"description": "Fake Status",
"done": False,
},
}
}
time.sleep(0.5) # Sleep for 5 seconds
yield f"user_message -- {user_message}"
yield {
"event": {
"type": "status",
"data": {
"description": "",
"done": True,
},
}
}
With this pipeline, if I prompt the pipeline with any message, I can view the following in the docker logs for the pipeline
PIPELINES_REQUIREMENTS_PATH not specified. Skipping installation of requirements.
Download + install Executed in mode: full
RESET_PIPELINES_DIR is not set to true. No action taken.
PIPELINES_REQUIREMENTS_PATH not specified. Skipping installation of requirements.
PIPELINES_URLS not specified. Skipping pipelines download and installation.
Running via Mode: full
Loaded module: test_pipeline
on_startup: test_pipeline
INFO: 172.18.0.1:54296 - "GET /models HTTP/1.1" 200 OK
inlet 1: test_pipeline
INFO: 172.18.0.1:49898 - "POST /test_pipeline/filter/inlet HTTP/1.1" 200 OK
INFO: 172.18.0.1:49900 - "GET /models HTTP/1.1" 200 OK
test_pipeline
test_pipeline
INFO: 172.18.0.1:49916 - "POST /chat/completions HTTP/1.1" 200 OK
pipe 1: test_pipeline
inlet 2: test_pipeline
INFO: 172.18.0.1:49922 - "POST /test_pipeline/filter/inlet HTTP/1.1" 200 OK
test_pipeline
test_pipeline
pipe 2: test_pipeline
outlet: test_pipeline
INFO: 172.18.0.1:49940 - "POST /test_pipeline/filter/outlet HTTP/1.1" 200 OK
INFO: 172.18.0.1:49934 - "POST /chat/completions HTTP/1.1" 200 OK
inlet 3: test_pipeline
INFO: 172.18.0.1:49950 - "POST /test_pipeline/filter/inlet HTTP/1.1" 200 OK
INFO: 172.18.0.1:49964 - "GET /models HTTP/1.1" 200 OK
test_pipeline
test_pipeline
pipe 3: test_pipeline
INFO: 172.18.0.1:49972 - "POST /chat/completions HTTP/1.1" 200 OK
inlet 4: test_pipeline
INFO: 172.18.0.1:49982 - "POST /test_pipeline/filter/inlet HTTP/1.1" 200 OK
test_pipeline
test_pipeline
pipe 4: test_pipeline
INFO: 172.18.0.1:49986 - "POST /chat/completions HTTP/1.1" 200 OK
The above docker logs are from one prompt from the UI to the pipeline. From these logs and the above pipeline, you can see that the inlet
method is called, then the pipe
method, then the inlet
method again, then the outlet
method, and then again the inlet
method, and again the pipe
method, and then inlet
and then pipe
.
Is there documentation explaining why these callbacks are all called more than once?
Thank you!