# recent-items worker: consumes rows from item_access_events and maintains
# a per-user top-K "recently accessed items" JSON list in user_recent_items.
import time
import json
import pymysql
from collections import defaultdict
from datetime import datetime
# Maximum number of events pulled from item_access_events per polling cycle.
BATCH_SIZE = 1000
# Maximum number of distinct users whose recent-items lists are updated per cycle.
USER_BATCH_LIMIT = 200
# Size of each user's stored "recent items" list.
TOP_K = 10
# Seconds to sleep when a poll returns no new events.
SLEEP_ON_EMPTY = 0.5
# PyMySQL connection settings.
# NOTE(review): credentials are hard-coded; presumably these should come from
# environment/config in production — confirm before deploying.
DB_CONFIG = {
    'host': 'localhost',
    'user': 'app_user',
    'password': 'secret',
    'db': 'appdb',
    'autocommit': False,  # commits are issued explicitly in main_loop
    'cursorclass': pymysql.cursors.DictCursor,  # rows come back as dicts
    'charset': 'utf8mb4',
}
def get_conn():
    """Open a new MySQL connection using the module-level DB_CONFIG.

    autocommit is disabled in DB_CONFIG, so callers are responsible for
    calling commit()/rollback() themselves.
    """
    return pymysql.connect(**DB_CONFIG)
def fetch_event_batch(cur, limit, last_processed_id):
    """Fetch up to *limit* unprocessed access events in ascending id order.

    Only events with id strictly greater than *last_processed_id* are
    returned, so the caller can use the highest id seen as its checkpoint.
    """
    query = (
        "SELECT id, user_id, item_id, item_title, item_type, ts "
        "FROM item_access_events "
        "WHERE id > %s "
        "ORDER BY id ASC "
        "LIMIT %s"
    )
    cur.execute(query, (last_processed_id, limit))
    return cur.fetchall()
def get_last_processed_id(cur, worker_name='recent_items'):
    """Return the stored checkpoint (highest processed event id) for *worker_name*.

    Falls back to 0 when the worker has no progress row yet.
    """
    cur.execute(
        "SELECT last_processed_id FROM worker_progress WHERE worker_name=%s",
        (worker_name,),
    )
    row = cur.fetchone()
    if not row:
        return 0
    return row['last_processed_id']
def mark_events_processed(cur, last_id, worker_name='recent_items'):
    """Persist *last_id* as the checkpoint for *worker_name* (insert-or-update)."""
    sql = (
        "INSERT INTO worker_progress (worker_name, last_processed_id, updated_at) "
        "VALUES (%s, %s, NOW(6)) "
        "ON DUPLICATE KEY UPDATE last_processed_id = VALUES(last_processed_id), "
        "updated_at = VALUES(updated_at)"
    )
    cur.execute(sql, (worker_name, last_id))
def group_by_user(events):
    """Group raw event rows by user into lightweight per-event dicts.

    Parameters:
        events: iterable of rows with keys id, user_id, item_id, item_title,
            item_type, ts.  ts may be a native datetime or a
            'YYYY-MM-DD HH:MM:SS[.ffffff]' string.

    Returns:
        defaultdict mapping user_id -> list of dicts with keys
        event_id, item_id, title, type, ts (ts always a native datetime).
    """
    d = defaultdict(list)
    for ev in events:
        ts = ev['ts']
        if not isinstance(ts, datetime):
            # Bug fix: the previous strptime('%Y-%m-%d %H:%M:%S.%f') raised
            # ValueError on timestamps WITHOUT fractional seconds;
            # fromisoformat accepts both forms.
            ts = datetime.fromisoformat(str(ts))
        d[ev['user_id']].append({
            'event_id': ev['id'],
            'item_id': ev['item_id'],
            'title': ev['item_title'],
            'type': ev['item_type'],
            'ts': ts,
        })
    return d
def fetch_existing_recent(cur, user_ids):
    """Load the stored recent-items lists for the given users.

    Returns a dict user_id -> list of item dicts, with each item's 'ts'
    parsed back from its ISO-8601 string into a datetime so the merge can
    compare timestamps.  Users without a stored row are absent from the
    result; an empty *user_ids* yields {}.
    """
    if not user_ids:
        return {}
    # Placeholder list is generated, values are still bound as parameters.
    placeholders = ','.join(['%s'] * len(user_ids))
    cur.execute(
        f"SELECT user_id, recent_items FROM user_recent_items WHERE user_id IN ({placeholders})",
        tuple(user_ids),
    )
    result = {}
    for row in cur.fetchall():
        items = json.loads(row['recent_items'])
        for item in items:
            item['ts'] = datetime.fromisoformat(item['ts'])
        result[row['user_id']] = items
    return result
def mysql_get_lock(cur, lock_key, timeout=3):
    """Try to acquire the MySQL advisory lock *lock_key*.

    Returns True when acquired within *timeout* seconds, False otherwise
    (GET_LOCK returns 0 on timeout and NULL on error; both map to False).
    """
    cur.execute("SELECT GET_LOCK(%s, %s) AS got", (lock_key, timeout))
    return bool(cur.fetchone()['got'])
def mysql_release_lock(cur, lock_key):
    """Release the MySQL advisory lock *lock_key*; True when it was released."""
    cur.execute("SELECT RELEASE_LOCK(%s) AS released", (lock_key,))
    return bool(cur.fetchone()['released'])
def merge_and_trim(existing_list, new_events, top_k=TOP_K):
    """Merge stored items with new events, keeping the newest ts per item.

    Existing entries are kept unless a new event for the same item_id has a
    strictly newer ts.  The result is the *top_k* most recent entries sorted
    newest-first, with each 'ts' serialized to an ISO-8601 string so the list
    is JSON-ready.
    """
    # Seed with the stored list, keyed by item_id.
    by_item = {
        it['item_id']: {
            'item_id': it['item_id'],
            'ts': it['ts'],
            'title': it.get('title'),
            'type': it.get('type'),
        }
        for it in existing_list
    }
    # Fold in new events, newest timestamp wins.
    for ev in new_events:
        current = by_item.get(ev['item_id'])
        if current is None or ev['ts'] > current['ts']:
            by_item[ev['item_id']] = {
                'item_id': ev['item_id'],
                'ts': ev['ts'],
                'title': ev.get('title'),
                'type': ev.get('type'),
            }
    # Rank newest-first and keep only the top_k entries.
    newest = sorted(by_item.values(), key=lambda e: e['ts'], reverse=True)[:top_k]
    # Serialize timestamps for JSON storage.
    for entry in newest:
        entry['ts'] = entry['ts'].isoformat()
    return newest
def upsert_user_recent(cur, user_id, recent_items):
    """Insert or replace the stored recent-items JSON for *user_id*."""
    sql = (
        "INSERT INTO user_recent_items (user_id, recent_items, updated_at) "
        "VALUES (%s, %s, NOW(6)) "
        "ON DUPLICATE KEY UPDATE recent_items = VALUES(recent_items), "
        "updated_at = VALUES(updated_at)"
    )
    cur.execute(sql, (user_id, json.dumps(recent_items)))
def main_loop():
    """Poll item_access_events forever and maintain user_recent_items.

    Each cycle: read the checkpoint, pull a batch of new events, group them
    by user, merge each user's events into their stored top-K list under a
    per-user MySQL advisory lock, then advance the checkpoint.  Runs until
    the process is killed; sleeps briefly when idle.
    """
    conn = get_conn()
    try:
        while True:
            with conn.cursor() as cur:
                last_id = get_last_processed_id(cur)
                events = fetch_event_batch(cur, BATCH_SIZE, last_id)
                if not events:
                    # Nothing new: back off briefly to avoid a busy loop.
                    time.sleep(SLEEP_ON_EMPTY)
                    continue
                grouped = group_by_user(events)
                # NOTE(review): only the first USER_BATCH_LIMIT users are
                # handled this cycle.  Events belonging to the remaining
                # users are never merged, yet the checkpoint below can still
                # advance past their ids — those events may be lost. Verify.
                user_ids = list(grouped.keys())[:USER_BATCH_LIMIT]
            with conn.cursor() as cur:
                # NOTE(review): stored lists are read BEFORE the per-user
                # lock is taken, so a concurrent writer could make this
                # snapshot stale by merge time — confirm single-writer
                # deployment or move this read under the lock.
                existing_map = fetch_existing_recent(cur, user_ids)
            max_processed_id = last_id
            for uid in user_ids:
                lock_key = f"recent_items_{uid}"
                with conn.cursor() as cur:
                    got = mysql_get_lock(cur, lock_key, timeout=1)
                    if not got:
                        # Lock busy: skip this user for now.  NOTE(review):
                        # if other users' events push max_processed_id past
                        # this user's event ids, those events are never
                        # re-fetched — potential event loss.
                        continue
                    try:
                        existing = existing_map.get(uid, []) or []
                        new_events = grouped[uid]
                        merged = merge_and_trim(existing, new_events, TOP_K)
                        upsert_user_recent(cur, uid, merged)
                        # Commit per user so each update is durable before
                        # the next user's lock is taken.
                        conn.commit()
                        # Mark last processed event for this user.
                        max_processed_id = max(max_processed_id, max(e['event_id'] for e in new_events))
                    finally:
                        # Always release the advisory lock, even on error.
                        mysql_release_lock(cur, lock_key)
            # Mark batch progress if any user was processed.
            if max_processed_id > last_id:
                with conn.cursor() as cur:
                    mark_events_processed(cur, max_processed_id)
                    conn.commit()
    finally:
        conn.close()
# Bug fix: the guard was `if name == "main":`, which raises NameError
# (bare `name` is undefined) and would never be true anyway.
if __name__ == "__main__":
    main_loop()