Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions src/memos/graph_dbs/polardb.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def __init__(self, config: PolarDBGraphDBConfig):

# Create connection pool
self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=5,
minconn=2,
maxconn=maxconn,
host=host,
port=port,
Expand All @@ -176,6 +176,7 @@ def __init__(self, config: PolarDBGraphDBConfig):
keepalives_idle=120, # Seconds of inactivity before sending keepalive (should be < server idle timeout)
keepalives_interval=15, # Seconds between keepalive retries
keepalives_count=5, # Number of keepalive retries before considering connection dead
keepalives=1,
options=f"-c search_path={self.db_name}_graph,ag_catalog,$user,public",
)

Expand Down Expand Up @@ -250,39 +251,40 @@ def _get_connection(self):
import psycopg2

timeout = self._connection_wait_timeout
if not self._semaphore.acquire(timeout=max(timeout, 0)):
if timeout is None or timeout <= 0:
self._semaphore.acquire()
elif not self._semaphore.acquire(timeout=timeout):
logger.warning(f"Timeout waiting for connection slot ({timeout}s)")
raise RuntimeError("Connection pool busy")
logger.info(
"Connection pool usage: %s/%s",
self.connection_pool.maxconn - self._semaphore._value,
self.connection_pool.maxconn,
)

conn = None
broken = False
try:
conn = self.connection_pool.getconn()
conn.autocommit = True
for attempt in range(2):
conn = self.connection_pool.getconn()
conn.autocommit = True
try:
with conn.cursor() as cur:
cur.execute("SELECT 1")
break
except psycopg2.Error:
logger.warning("Dead connection detected, recreating (attempt %d)", attempt + 1)
logger.warning(f"Dead connection detected, recreating (attempt {attempt + 1})")
self.connection_pool.putconn(conn, close=True)
conn = self.connection_pool.getconn()
conn.autocommit = True
conn = None
else:
raise RuntimeError("Cannot obtain valid DB connection after 2 attempts")
with conn.cursor() as cur:
cur.execute(f'SET search_path = {self.db_name}_graph, ag_catalog, "$user", public;')
yield conn
except Exception:
except (psycopg2.Error, psycopg2.OperationalError) as e:
broken = True
logger.exception(f"Database connection busy : {e}")
raise
except Exception as e:
logger.exception(f"Unexpected error: {e}")
raise
finally:
if conn:
if conn is not None:
try:
self.connection_pool.putconn(conn, close=broken)
logger.debug(f"Returned connection {id(conn)} to pool (broken={broken})")
Expand Down Expand Up @@ -2194,7 +2196,7 @@ def get_grouped_counts(
SELECT {", ".join(cte_select_list)}
FROM "{self.db_name}_graph"."Memory"
{where_clause}
LIMIT 1000
LIMIT 100
)
SELECT {outer_select}, count(*) AS count
FROM t
Expand Down
Loading