Skip to content

Commit 46744b5

Browse files
committed
✨ feat: Optimize PostgreSQL Queries and Add Optional Detailed Query Logging
- Introduced a new environment variable `DEBUG_PGVECTOR_QUERIES` to enable detailed logging of pgvector operations. - Implemented query logging setup in the ExtendedPgVector class, capturing execution time and parameters for relevant queries. - Updated README.md to document the new environment variable.
1 parent b16e87b commit 46744b5

File tree

2 files changed

+62
-5
lines changed

2 files changed

+62
-5
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ The following environment variables are required to run the application:
6161
- `RAG_UPLOAD_DIR`: (Optional) The directory where uploaded files are stored. Default value is "./uploads/".
6262
- `PDF_EXTRACT_IMAGES`: (Optional) A boolean value indicating whether to extract images from PDF files. Default value is "False".
6363
- `DEBUG_RAG_API`: (Optional) Set to "True" to show more verbose logging output in the server console and to enable the PostgreSQL database routes.
64+
- `DEBUG_PGVECTOR_QUERIES`: (Optional) Set to "True" to enable detailed PostgreSQL query logging for pgvector operations. Useful for debugging performance issues with vector database queries.
6465
- `CONSOLE_JSON`: (Optional) Set to "True" to log as JSON for Cloud Logging aggregations.
6566
- `EMBEDDINGS_PROVIDER`: (Optional) either "openai", "bedrock", "azure", "huggingface", "huggingfacetei", "vertexai", or "ollama", where "huggingface" uses sentence_transformers; defaults to "openai"
6667
- `EMBEDDINGS_MODEL`: (Optional) Set a valid embeddings model to use from the configured provider.

app/services/vector_store/extended_pg_vector.py

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,73 @@
1+
import os
2+
import time
3+
import logging
14
from typing import Optional
2-
5+
from sqlalchemy import event
36
from sqlalchemy import delete
47
from sqlalchemy.orm import Session
8+
from sqlalchemy.engine import Engine
59
from langchain_core.documents import Document
610
from langchain_community.vectorstores.pgvector import PGVector
711

12+
813
class ExtendedPgVector(PGVector):
14+
_query_logging_setup = False
15+
16+
def __init__(self, *args, **kwargs):
    """Build the underlying PGVector store, then set up query logging.

    All positional and keyword arguments are forwarded unchanged to the
    parent constructor.
    """
    super().__init__(*args, **kwargs)
    # setup_query_logging() is called on every construction; it decides
    # for itself whether anything needs to be installed.
    self.setup_query_logging()
19+
20+
def setup_query_logging(self):
    """Install pgvector query logging when DEBUG_PGVECTOR_QUERIES is enabled.

    Does nothing unless the ``DEBUG_PGVECTOR_QUERIES`` environment variable
    is one of "true", "1", "yes" or "on" (case-insensitive, surrounding
    whitespace ignored).

    The listeners are registered on the SQLAlchemy ``Engine`` class, so they
    observe every engine in the process, not just this store's engine; only
    statements whose text contains ``langchain_pg_embedding`` are logged.
    Registration happens at most once, tracked by the class-level
    ``_query_logging_setup`` flag.
    """
    debug_queries = os.getenv("DEBUG_PGVECTOR_QUERIES", "").strip().lower()
    if debug_queries not in ("true", "1", "yes", "on"):
        return

    # The listeners are global, so registering them again for a second
    # instance would log every matching query twice.
    if ExtendedPgVector._query_logging_setup:
        return

    logger = logging.getLogger("pgvector.queries")
    logger.setLevel(logging.INFO)

    # Attach a stream handler only if nothing has configured this logger yet.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - PGVECTOR QUERY - %(message)s")
        )
        logger.addHandler(handler)

    table_marker = "langchain_pg_embedding"
    # The start time is kept in conn.info rather than on `context`, because
    # SQLAlchemy documents that `context` may be None for these events.
    start_key = "pgvector_query_start_time"

    @event.listens_for(Engine, "before_cursor_execute")
    def receive_before_cursor_execute(
        conn, cursor, statement, parameters, context, executemany
    ):
        if table_marker in statement:
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by wall-clock adjustments.
            conn.info[start_key] = time.perf_counter()
            logger.info("STARTING QUERY: %s", statement)
            logger.info("PARAMETERS: %s", parameters)

    @event.listens_for(Engine, "after_cursor_execute")
    def receive_after_cursor_execute(
        conn, cursor, statement, parameters, context, executemany
    ):
        if table_marker in statement:
            started = conn.info.pop(start_key, None)
            if started is None:
                # No matching "before" event was recorded for this
                # connection; report completion without a duration
                # instead of raising inside the event hook.
                logger.info("COMPLETED QUERY (start time not recorded)")
            else:
                logger.info(
                    "COMPLETED QUERY in %.4fs", time.perf_counter() - started
                )
            logger.info("-" * 50)

    ExtendedPgVector._query_logging_setup = True
60+
961
def get_all_ids(self) -> list[str]:
    """Return every non-null ``custom_id`` in the embedding store."""
    with Session(self._bind) as session:
        rows = session.query(self.EmbeddingStore.custom_id).all()
        # Each row is a one-column result; keep the value unless it is NULL.
        found_ids = []
        for row in rows:
            custom_id = row[0]
            if custom_id is not None:
                found_ids.append(custom_id)
        return found_ids
13-
65+
1466
def get_filtered_ids(self, ids: list[str]) -> list[str]:
    """Return the subset of ``ids`` that exist as ``custom_id`` values.

    Args:
        ids: Candidate custom ids to look up.

    Returns:
        The non-null ``custom_id`` values from the embedding store that
        match one of ``ids``. An empty ``ids`` yields an empty list.
    """
    # Nothing can match an empty candidate list, so skip opening a session
    # and issuing an empty IN () query.
    if not ids:
        return []

    with Session(self._bind) as session:
        query = session.query(self.EmbeddingStore.custom_id).filter(
            self.EmbeddingStore.custom_id.in_(ids)
        )
        results = query.all()
        return [result[0] for result in results if result[0] is not None]
1973

@@ -45,7 +99,9 @@ def _delete_multiple(
4599
if not collection:
46100
self.logger.warning("Collection not found")
47101
return
48-
stmt = stmt.where(self.EmbeddingStore.collection_id == collection.uuid)
102+
stmt = stmt.where(
103+
self.EmbeddingStore.collection_id == collection.uuid
104+
)
49105
stmt = stmt.where(self.EmbeddingStore.custom_id.in_(ids))
50106
session.execute(stmt)
51-
session.commit()
107+
session.commit()

0 commit comments

Comments
 (0)