diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 4d88844df..03ed12997 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -165,7 +165,7 @@ def __init__(self, config: PolarDBGraphDBConfig): # Create connection pool self.connection_pool = psycopg2.pool.ThreadedConnectionPool( - minconn=5, + minconn=2, maxconn=maxconn, host=host, port=port, @@ -176,6 +176,7 @@ def __init__(self, config: PolarDBGraphDBConfig): keepalives_idle=120, # Seconds of inactivity before sending keepalive (should be < server idle timeout) keepalives_interval=15, # Seconds between keepalive retries keepalives_count=5, # Number of keepalive retries before considering connection dead + keepalives=1, options=f"-c search_path={self.db_name}_graph,ag_catalog,$user,public", ) @@ -250,39 +251,40 @@ def _get_connection(self): import psycopg2 timeout = self._connection_wait_timeout - if not self._semaphore.acquire(timeout=max(timeout, 0)): + if timeout is None or timeout <= 0: + self._semaphore.acquire() + elif not self._semaphore.acquire(timeout=timeout): logger.warning(f"Timeout waiting for connection slot ({timeout}s)") raise RuntimeError("Connection pool busy") - logger.info( - "Connection pool usage: %s/%s", - self.connection_pool.maxconn - self._semaphore._value, - self.connection_pool.maxconn, - ) + conn = None broken = False try: - conn = self.connection_pool.getconn() - conn.autocommit = True for attempt in range(2): + conn = self.connection_pool.getconn() + conn.autocommit = True try: with conn.cursor() as cur: cur.execute("SELECT 1") break except psycopg2.Error: - logger.warning("Dead connection detected, recreating (attempt %d)", attempt + 1) + logger.warning(f"Dead connection detected, recreating (attempt {attempt + 1})") self.connection_pool.putconn(conn, close=True) - conn = self.connection_pool.getconn() - conn.autocommit = True + conn = None else: raise RuntimeError("Cannot obtain valid DB connection after 2 attempts") with conn.cursor() as cur: cur.execute(f'SET search_path = {self.db_name}_graph, ag_catalog, "$user", public;') yield conn - except Exception: + except (psycopg2.Error, psycopg2.OperationalError) as e: broken = True + logger.exception(f"Database connection busy : {e}") + raise + except Exception as e: + logger.exception(f"Unexpected error: {e}") raise finally: - if conn: + if conn is not None: try: self.connection_pool.putconn(conn, close=broken) logger.debug(f"Returned connection {id(conn)} to pool (broken={broken})") @@ -2194,7 +2196,7 @@ def get_grouped_counts( SELECT {", ".join(cte_select_list)} FROM "{self.db_name}_graph"."Memory" {where_clause} - LIMIT 1000 + LIMIT 100 ) SELECT {outer_select}, count(*) AS count FROM t