DNFWM #519

@xzm9gqzc65-maker

Description

import time
import json
import pymysql
from collections import defaultdict
from datetime import datetime

BATCH_SIZE = 1000        # events fetched per poll
USER_BATCH_LIMIT = 200   # max users updated per cycle
TOP_K = 10               # recent items kept per user
SLEEP_ON_EMPTY = 0.5     # seconds to wait when no new events arrive

DB_CONFIG = {
    'host': 'localhost',
    'user': 'app_user',
    'password': 'secret',
    'db': 'appdb',
    'autocommit': False,
    'cursorclass': pymysql.cursors.DictCursor,
    'charset': 'utf8mb4',
}

def get_conn():
    return pymysql.connect(**DB_CONFIG)
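
# The tables this worker touches are not shown in the issue; a plausible
# schema, inferred from the queries below (column names come from the code,
# the types are assumptions):
#
#   CREATE TABLE item_access_events (
#       id         BIGINT AUTO_INCREMENT PRIMARY KEY,
#       user_id    BIGINT NOT NULL,
#       item_id    BIGINT NOT NULL,
#       item_title VARCHAR(255),
#       item_type  VARCHAR(64),
#       ts         DATETIME(6) NOT NULL
#   );
#   CREATE TABLE worker_progress (
#       worker_name       VARCHAR(64) PRIMARY KEY,
#       last_processed_id BIGINT NOT NULL,
#       updated_at        DATETIME(6) NOT NULL
#   );
#   CREATE TABLE user_recent_items (
#       user_id      BIGINT PRIMARY KEY,
#       recent_items JSON NOT NULL,
#       updated_at   DATETIME(6) NOT NULL
#   );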

def fetch_event_batch(cur, limit, last_processed_id):
    cur.execute("""
        SELECT id, user_id, item_id, item_title, item_type, ts
        FROM item_access_events
        WHERE id > %s
        ORDER BY id ASC
        LIMIT %s
    """, (last_processed_id, limit))
    return cur.fetchall()
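
# Caveat worth noting: this keyset pagination assumes ids become visible in
# increasing order. AUTO_INCREMENT ids are assigned at insert time but rows
# only become visible at commit, so with concurrent writers a row with a
# smaller id can commit after one with a larger id and be skipped forever
# once the checkpoint passes it. If that matters here, poll with a small id
# lag or checkpoint conservatively.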

def get_last_processed_id(cur, worker_name='recent_items'):
    cur.execute("SELECT last_processed_id FROM worker_progress WHERE worker_name=%s", (worker_name,))
    r = cur.fetchone()
    return r['last_processed_id'] if r else 0

def mark_events_processed(cur, last_id, worker_name='recent_items'):
    cur.execute("""
        INSERT INTO worker_progress (worker_name, last_processed_id, updated_at)
        VALUES (%s, %s, NOW(6))
        ON DUPLICATE KEY UPDATE last_processed_id = VALUES(last_processed_id), updated_at = VALUES(updated_at)
    """, (worker_name, last_id))

def group_by_user(events):
    d = defaultdict(list)
    for ev in events:
        # PyMySQL returns DATETIME columns as datetime objects; fall back to
        # parsing only if a driver hands us a string. fromisoformat accepts
        # both 'YYYY-MM-DD HH:MM:SS' and a fractional-seconds suffix, which
        # a fixed '%Y-%m-%d %H:%M:%S.%f' strptime format would reject.
        ts = ev['ts'] if isinstance(ev['ts'], datetime) else datetime.fromisoformat(ev['ts'])
        d[ev['user_id']].append({
            'event_id': ev['id'],
            'item_id': ev['item_id'],
            'title': ev['item_title'],
            'type': ev['item_type'],
            'ts': ts,
        })
    return d

def fetch_existing_recent(cur, user_ids):
    if not user_ids:
        return {}
    placeholders = ','.join(['%s'] * len(user_ids))
    cur.execute(
        f"SELECT user_id, recent_items FROM user_recent_items WHERE user_id IN ({placeholders})",
        tuple(user_ids))
    res = {}
    for r in cur.fetchall():
        # Parse the stored JSON and convert ts back to datetime for a robust merge
        items = json.loads(r['recent_items'])
        for item in items:
            item['ts'] = datetime.fromisoformat(item['ts'])
        res[r['user_id']] = items
    return res

def mysql_get_lock(cur, lock_key, timeout=3):
    cur.execute("SELECT GET_LOCK(%s, %s) AS got", (lock_key, timeout))
    r = cur.fetchone()
    return bool(r['got'])

def mysql_release_lock(cur, lock_key):
    cur.execute("SELECT RELEASE_LOCK(%s) AS released", (lock_key,))
    r = cur.fetchone()
    return bool(r['released'])
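
# Semantics of these advisory locks, for reference: GET_LOCK returns 1 on
# success, 0 on timeout and NULL on error, so bool() maps the last two to
# False. Locks are per-session and released automatically if the connection
# drops, so a crashed worker cannot wedge a user. Holding more than one
# named lock per session requires MySQL >= 5.7.5.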

def merge_and_trim(existing_list, new_events, top_k=TOP_K):
    merged = {}
    # Seed from existing
    for it in existing_list:
        merged[it['item_id']] = {
            'item_id': it['item_id'],
            'ts': it['ts'],
            'title': it.get('title'),
            'type': it.get('type'),
        }
    # Apply new events, keeping the newest timestamp per item
    for ev in new_events:
        existing = merged.get(ev['item_id'])
        if existing is None or ev['ts'] > existing['ts']:
            merged[ev['item_id']] = {
                'item_id': ev['item_id'],
                'ts': ev['ts'],
                'title': ev.get('title'),
                'type': ev.get('type'),
            }
    # Sort by ts desc, limit to top_k
    top = sorted(merged.values(), key=lambda x: x['ts'], reverse=True)[:top_k]
    # Convert datetimes to isoformat strings before storing as JSON
    for t in top:
        t['ts'] = t['ts'].isoformat()
    return top
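
# A quick worked example with made-up values: an existing entry for item 1
# at 10:00 plus new events for item 1 at 10:05 and item 2 at 10:01 yields
# one entry per item with its newest timestamp, newest first:
#
#   existing = [{'item_id': 1, 'ts': datetime(2024, 1, 1, 10, 0),
#                'title': 'A', 'type': 'doc'}]
#   new = [{'item_id': 1, 'ts': datetime(2024, 1, 1, 10, 5), 'title': 'A', 'type': 'doc'},
#          {'item_id': 2, 'ts': datetime(2024, 1, 1, 10, 1), 'title': 'B', 'type': 'doc'}]
#   merge_and_trim(existing, new, top_k=10)
#   # -> [{'item_id': 1, 'ts': '2024-01-01T10:05:00', ...},
#   #     {'item_id': 2, 'ts': '2024-01-01T10:01:00', ...}]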

def upsert_user_recent(cur, user_id, recent_items):
    cur.execute("""
        INSERT INTO user_recent_items (user_id, recent_items, updated_at)
        VALUES (%s, %s, NOW(6))
        ON DUPLICATE KEY UPDATE recent_items = VALUES(recent_items), updated_at = VALUES(updated_at)
    """, (user_id, json.dumps(recent_items)))

def main_loop():
    conn = get_conn()
    try:
        while True:
            with conn.cursor() as cur:
                last_id = get_last_processed_id(cur)
                events = fetch_event_batch(cur, BATCH_SIZE, last_id)
            # End the read-only transaction: with autocommit off, REPEATABLE
            # READ would otherwise pin this connection's snapshot and the
            # poll would never see newly inserted events.
            conn.commit()
            if not events:
                time.sleep(SLEEP_ON_EMPTY)
                continue

            grouped = group_by_user(events)
            user_ids = list(grouped.keys())[:USER_BATCH_LIMIT]

            max_processed_id = last_id
            # Event ids we could not handle this round: users beyond
            # USER_BATCH_LIMIT are deferred to the next iteration.
            deferred_ids = [e['event_id']
                            for uid in list(grouped.keys())[USER_BATCH_LIMIT:]
                            for e in grouped[uid]]

            for uid in user_ids:
                lock_key = f"recent_items_{uid}"
                with conn.cursor() as cur:
                    if not mysql_get_lock(cur, lock_key, timeout=1):
                        deferred_ids.extend(e['event_id'] for e in grouped[uid])
                        continue
                    try:
                        # Read under the lock so the read-merge-write cycle
                        # cannot clobber a concurrent worker's update.
                        existing = fetch_existing_recent(cur, [uid]).get(uid, [])
                        new_events = grouped[uid]
                        merged = merge_and_trim(existing, new_events, TOP_K)
                        upsert_user_recent(cur, uid, merged)
                        conn.commit()
                        max_processed_id = max(max_processed_id,
                                               max(e['event_id'] for e in new_events))
                    finally:
                        mysql_release_lock(cur, lock_key)

            # Never advance the checkpoint past an unprocessed event, or the
            # deferred users would lose those events for good. Re-reading
            # already-handled events on the next pass is harmless because
            # the merge keeps the newest timestamp per item either way.
            if deferred_ids:
                max_processed_id = min(max_processed_id, min(deferred_ids) - 1)
            if max_processed_id > last_id:
                with conn.cursor() as cur:
                    mark_events_processed(cur, max_processed_id)
                    conn.commit()
    finally:
        conn.close()

if __name__ == "__main__":
    main_loop()
