diff --git a/README.md b/README.md index 0298fb8..5ecc643 100644 --- a/README.md +++ b/README.md @@ -130,6 +130,12 @@ For Codex Desktop GUI (non-tmux) use command-mode nudging: "nudge_mode": "command", "nudge_command": "/ABSOLUTE/PATH/ide-agent-kit/tools/codex_gui_nudge.sh" }, + "dm_poller": { + "enabled": true, + "seen_file": "/tmp/codex-dm-seen.txt", + "limit": 100, + "human_only": false + }, "tmux": { "ide_session": "codex", "nudge_text": "check room and respond only if you have something relevant to say [codex]" @@ -143,6 +149,8 @@ Run: node bin/cli.mjs rooms watch --config /ABSOLUTE/PATH/ide-agent-kit-codex.json ``` +When `dm_poller.enabled` is set, the same watcher also polls `/api/v1/messages?limit=100`, keeps a separate DM seen-state file, and nudges on new `type: "dm"` rows addressed to your configured handle. DM notifications are appended to the normal notification file so existing `rooms check` and GUI-nudge flows continue to work. + There is also a ready-to-copy example at: ```bash diff --git a/config/codex.desktop.example.json b/config/codex.desktop.example.json index 09a4480..fd953da 100644 --- a/config/codex.desktop.example.json +++ b/config/codex.desktop.example.json @@ -39,5 +39,11 @@ "nudge_command": "/ABSOLUTE/PATH/ide-agent-kit/tools/codex_gui_nudge.sh", "notification_file": "/tmp/codex-room-notifications.txt", "seen_file": "/tmp/codex-room-seen.txt" + }, + "dm_poller": { + "enabled": true, + "seen_file": "/tmp/codex-dm-seen.txt", + "limit": 100, + "human_only": false } } diff --git a/src/config.mjs b/src/config.mjs index 1c2a76a..c36d2a9 100644 --- a/src/config.mjs +++ b/src/config.mjs @@ -17,6 +17,15 @@ const DEFAULT_CONFIG = { nudge_mode: 'tmux', nudge_command: '' }, + dm_poller: { + enabled: false, + handle: '', + interval_sec: 30, + seen_file: '/tmp/iak-dm-seen-ids.txt', + api_key: '', + human_only: false, + limit: 100 + }, github: { webhook_secret: '', event_kinds: ['pull_request', 'issue_comment', 'check_suite', 'workflow_run'] }, outbound: { default_webhook_url: '' }, rate_limit: { message_interval_sec: 30 }, @@ -63,6 +72,7 @@ export function loadConfig(configPath) { receipts: { ...DEFAULT_CONFIG.receipts, ...raw.receipts }, tmux: { ...DEFAULT_CONFIG.tmux, ...raw.tmux }, poller: { ...DEFAULT_CONFIG.poller, ...raw.poller }, + dm_poller: { ...DEFAULT_CONFIG.dm_poller, ...raw.dm_poller }, github: { ...DEFAULT_CONFIG.github, ...raw.github }, outbound: { ...DEFAULT_CONFIG.outbound, ...raw.outbound }, rate_limit: { ...DEFAULT_CONFIG.rate_limit, ...raw.rate_limit }, @@ -75,7 +85,6 @@ export function loadConfig(configPath) { }, discord: { ...DEFAULT_CONFIG.discord, ...raw.discord }, acp: { ...DEFAULT_CONFIG.acp, ...raw.acp }, - openclaw: raw.openclaw || {}, - poller: raw.poller || {} + openclaw: raw.openclaw || {} }; } diff --git a/src/team-relay/room-poller.mjs b/src/team-relay/room-poller.mjs index 6b44478..f3c9ad6 100644 --- a/src/team-relay/room-poller.mjs +++ b/src/team-relay/room-poller.mjs @@ -40,6 +40,47 @@ function saveSeenIds(path, ids) { writeFileSync(path, arr.join('\n') + '\n'); } +const DM_SEEN_FILE_DEFAULT = '/tmp/iak-dm-seen-ids.txt'; + +function normalizeHandle(handle) { + if (typeof handle !== 'string') return ''; + const trimmed = handle.trim(); + if (!trimmed) return ''; + return trimmed.startsWith('@') ? trimmed : `@${trimmed}`; +} + +function parsePositiveInt(value, fallback) { + const parsed = Number.parseInt(value, 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; +} + +function appendNotifications(path, lines) { + if (lines.length === 0) return; + appendFileSync(path, lines.join('\n') + '\n'); +} + +function triggerNudge({ nudgeMode, nudgeCommandText, nudgeText, session }) { + if (nudgeMode === 'command') { + return nudgeCommand(nudgeCommandText, { text: nudgeText, session }); + } + if (nudgeMode === 'none') { + return true; + } + return nudgeTmux(session, nudgeText); +} + +export function isRelevantDirectMessage(message, { selfHandle, humanOnly = false } = {}) { + if (!message || message.type !== 'dm' || !message.id) return false; + const sender = normalizeHandle(message.from || message.sender); + const recipient = normalizeHandle(message.to); + const expectedRecipient = normalizeHandle(selfHandle); + if (!sender || !recipient || !expectedRecipient) return false; + if (recipient !== expectedRecipient) return false; + if (sender === expectedRecipient) return false; + if (humanOnly && !message?.metadata?.user) return false; + return true; +} + function nudgeTmux(session, text) { try { execSync(`tmux has-session -t ${JSON.stringify(session)} 2>/dev/null`); @@ -71,6 +112,21 @@ async function fetchRoomMessages(room, apiKey, limit = 10) { } } +async function fetchDirectMessages(apiKey, limit = 100) { + const url = `https://groupmind.one/api/v1/messages?limit=${limit}`; + try { + const result = execSync( + `curl -sS -4 -H "X-API-Key: ${apiKey}" "${url}"`, + { encoding: 'utf8', timeout: 15000 } + ); + const data = JSON.parse(result); + return data.messages || (Array.isArray(data) ? data : []); + } catch (e) { + console.error(` fetch direct messages failed: ${e.message}`); + return []; + } +} + /** * Read and clear the notification file. Returns array of message lines. * This is the primary way the IDE agent retrieves new messages. @@ -96,8 +152,17 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config const nudgeText = config?.tmux?.nudge_text || 'check rooms'; const nudgeMode = config?.poller?.nudge_mode || 'tmux'; const nudgeCommandText = config?.poller?.nudge_command || ''; - const pollInterval = interval || config?.poller?.interval_sec || 30; - const selfHandle = handle || config?.poller?.handle || '@unknown'; + const pollInterval = parsePositiveInt(interval || config?.poller?.interval_sec, 30); + const selfHandle = normalizeHandle(handle || config?.poller?.handle || '@unknown'); + const dmCfg = config?.dm_poller || {}; + const dmEnabled = dmCfg.enabled === true; + const dmHandle = normalizeHandle(dmCfg.handle || selfHandle); + const dmSeenFile = dmCfg.seen_file || DM_SEEN_FILE_DEFAULT; + const dmNotifyFile = dmCfg.notification_file || notifyFile; + const dmPollInterval = parsePositiveInt(dmCfg.interval_sec, pollInterval); + const dmApiKey = dmCfg.api_key || dmCfg.apiKey || config?.poller?.api_key || config?.poller?.apiKey || apiKey; + const dmHumanOnly = dmCfg.human_only === true; + const dmLimit = parsePositiveInt(dmCfg.limit, 100); console.log(`Room poller started`); console.log(` rooms: ${rooms.join(', ')}`); @@ -111,9 +176,19 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config console.log(` nudge command: ${nudgeCommandText || '(missing)'}`); } console.log(` seen file: ${seenFile}`); + if (dmEnabled) { + console.log(` direct messages: enabled`); + console.log(` dm handle: ${dmHandle}`); + console.log(` dm interval: ${dmPollInterval}s`); + console.log(` dm seen file: ${dmSeenFile}`); + console.log(` dm notify file: ${dmNotifyFile}`); + console.log(` dm limit: ${dmLimit}`); + console.log(` dm human only: ${dmHumanOnly}`); + } console.log(` queue: ${queuePath}`); const seen = loadSeenIds(seenFile); + const dmSeen = dmEnabled ? loadSeenIds(dmSeenFile) : new Set(); // Seed: mark current messages as seen on first run if (seen.size === 0) { @@ -128,8 +203,25 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config console.log(` seeded ${seen.size} IDs`); } + if (dmEnabled && dmSeen.size === 0) { + console.log(` seeding seen IDs from current direct messages...`); + const directMessages = await fetchDirectMessages(dmApiKey, dmLimit); + for (const message of directMessages) { + if (isRelevantDirectMessage(message, { selfHandle: dmHandle, humanOnly: dmHumanOnly })) { + dmSeen.add(message.id); + } + } + saveSeenIds(dmSeenFile, dmSeen); + console.log(` seeded ${dmSeen.size} DM IDs`); + } + + let roomPollInFlight = false; + let dmPollInFlight = false; - async function poll() { + async function pollRooms() { + if (roomPollInFlight) return; + roomPollInFlight = true; + try { let newCount = 0; const newMessages = []; for (const room of rooms) { @@ -140,8 +232,9 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config seen.add(mid); const sender = m.from || m.sender || '?'; + const normalizedSender = normalizeHandle(sender); // Skip own messages - if (sender === selfHandle || sender === selfHandle.replace('@', '')) continue; + if (normalizedSender === selfHandle) continue; const body = (m.body || '').slice(0, 500); const ts = m.created_at || new Date().toISOString(); @@ -176,37 +269,88 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config if (newCount > 0) { // Primary: write to notification file (always works) - appendFileSync(notifyFile, newMessages.join('\n') + '\n'); - - // Secondary: try configured nudge mode. - let nudged = false; - if (nudgeMode === 'command') { - nudged = nudgeCommand(nudgeCommandText, { text: nudgeText, session }); - } else if (nudgeMode === 'none') { - nudged = true; - } else { - nudged = nudgeTmux(session, nudgeText); - } + appendNotifications(notifyFile, newMessages); + const nudged = triggerNudge({ nudgeMode, nudgeCommandText, nudgeText, session }); console.log(` ${newCount} new message(s) → notified${nudged ? ' + nudge' : ''}`); } + } finally { + roomPollInFlight = false; + } + } + + async function pollDirectMessages() { + if (!dmEnabled || dmPollInFlight) return; + dmPollInFlight = true; + try { + let newCount = 0; + const newMessages = []; + const directMessages = await fetchDirectMessages(dmApiKey, dmLimit); + for (const message of directMessages) { + if (!isRelevantDirectMessage(message, { selfHandle: dmHandle, humanOnly: dmHumanOnly })) continue; + const mid = message.id; + if (!mid || dmSeen.has(mid)) continue; + dmSeen.add(mid); + + const sender = normalizeHandle(message.from || message.sender) || '?'; + const recipient = normalizeHandle(message.to) || dmHandle; + const body = (message.body || '').slice(0, 500); + const ts = message.created_at || new Date().toISOString(); + + const rawEvent = { + trace_id: randomUUID(), + event_id: mid, + source: 'antfarm', + kind: 'antfarm.dm.created', + timestamp: ts, + room: null, + actor: { login: sender }, + payload: { body, type: 'dm', to: recipient }, + intent: null, + memory_context: null, + enrichment_errors: [] + }; + const event = await enrichEvent(rawEvent, config); + appendFileSync(queuePath, JSON.stringify(event) + '\n'); + + const line = `[${ts.slice(0, 19)}] [dm] ${sender} -> ${recipient}: ${body.replace(/\n/g, ' ').slice(0, 200)}`; + newMessages.push(line); + newCount++; + + console.log(` [${ts.slice(0, 19)}] ${sender} DM -> ${recipient}: ${body.slice(0, 80)}...`); + } + + saveSeenIds(dmSeenFile, dmSeen); + + if (newCount > 0) { + appendNotifications(dmNotifyFile, newMessages); + const nudged = triggerNudge({ nudgeMode, nudgeCommandText, nudgeText, session }); + console.log(` ${newCount} new direct message(s) → notified${nudged ? ' + nudge' : ''}`); + } + } finally { + dmPollInFlight = false; + } } // Initial poll - await poll(); + await pollRooms(); + await pollDirectMessages(); // Start interval - const timer = setInterval(poll, pollInterval * 1000); + const roomTimer = setInterval(pollRooms, pollInterval * 1000); + const dmTimer = dmEnabled ? setInterval(pollDirectMessages, dmPollInterval * 1000) : null; // Handle shutdown process.on('SIGINT', () => { console.log('\nPoller stopped.'); - clearInterval(timer); + clearInterval(roomTimer); + if (dmTimer) clearInterval(dmTimer); process.exit(0); }); process.on('SIGTERM', () => { - clearInterval(timer); + clearInterval(roomTimer); + if (dmTimer) clearInterval(dmTimer); process.exit(0); }); - return timer; + return { roomTimer, dmTimer }; } diff --git a/test/config.test.mjs b/test/config.test.mjs index dec8190..0697a35 100644 --- a/test/config.test.mjs +++ b/test/config.test.mjs @@ -1,9 +1,20 @@ // SPDX-License-Identifier: AGPL-3.0-only -import { describe, it } from 'node:test'; +import { describe, it, afterEach } from 'node:test'; import { strict as assert } from 'node:assert'; +import { mkdtempSync, rmSync, writeFileSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; import { loadConfig } from '../src/config.mjs'; +const tempDirs = []; + +afterEach(() => { + while (tempDirs.length > 0) { + rmSync(tempDirs.pop(), { recursive: true, force: true }); + } +}); + describe('config', () => { it('loadConfig returns defaults when no file exists', () => { const cfg = loadConfig('/tmp/iak-nonexistent-config.json'); @@ -11,6 +22,8 @@ describe('config', () => { assert.ok(cfg.queue); assert.ok(cfg.receipts); assert.ok(cfg.tmux); + assert.ok(cfg.poller); + assert.ok(cfg.dm_poller); assert.ok(cfg.automation); assert.ok(cfg.comments); }); @@ -30,4 +43,30 @@ describe('config', () => { assert.ok(Array.isArray(cfg.comments.github.repos)); assert.equal(cfg.comments.interval_sec, 120); }); + + it('merges partial poller and dm_poller config with defaults', () => { + const dir = mkdtempSync(join(tmpdir(), 'iak-config-')); + tempDirs.push(dir); + const configPath = join(dir, 'config.json'); + writeFileSync(configPath, JSON.stringify({ + poller: { + rooms: ['thinkoff-development'], + handle: '@CodexMB' + }, + dm_poller: { + enabled: true + } + })); + + const cfg = loadConfig(configPath); + + assert.deepEqual(cfg.poller.rooms, ['thinkoff-development']); + assert.equal(cfg.poller.handle, '@CodexMB'); + assert.equal(cfg.poller.interval_sec, 30); + assert.equal(cfg.poller.seen_file, '/tmp/iak-seen-ids.txt'); + assert.equal(cfg.dm_poller.enabled, true); + assert.equal(cfg.dm_poller.interval_sec, 30); + assert.equal(cfg.dm_poller.seen_file, '/tmp/iak-dm-seen-ids.txt'); + assert.equal(cfg.dm_poller.limit, 100); + }); }); diff --git a/test/room-poller.test.mjs b/test/room-poller.test.mjs new file mode 100644 index 0000000..a84884f --- /dev/null +++ b/test/room-poller.test.mjs @@ -0,0 +1,54 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +import { describe, it } from 'node:test'; +import { strict as assert } from 'node:assert'; +import { isRelevantDirectMessage } from '../src/team-relay/room-poller.mjs'; + +describe('room-poller direct messages', () => { + it('accepts a new DM addressed to the configured handle', () => { + const message = { + id: 'dm-1', + from: '@petrus', + to: '@CodexMB', + type: 'dm', + metadata: { user: { id: 'user-1' } } + }; + + assert.equal(isRelevantDirectMessage(message, { selfHandle: '@CodexMB' }), true); + }); + + it('rejects messages sent by self', () => { + const message = { + id: 'dm-2', + from: '@CodexMB', + to: '@CodexMB', + type: 'dm' + }; + + assert.equal(isRelevantDirectMessage(message, { selfHandle: '@CodexMB' }), false); + }); + + it('rejects DMs addressed to another recipient', () => { + const message = { + id: 'dm-3', + from: '@petrus', + to: '@SomeoneElse', + type: 'dm' + }; + + assert.equal(isRelevantDirectMessage(message, { selfHandle: '@CodexMB' }), false); + }); + + it('can require human-authored DMs', () => { + const botMessage = { + id: 'dm-4', + from: '@antigravity', + to: '@CodexMB', + type: 'dm', + metadata: { dm: { to_user_id: 'user-1' } } + }; + + assert.equal(isRelevantDirectMessage(botMessage, { selfHandle: '@CodexMB', humanOnly: true }), false); + assert.equal(isRelevantDirectMessage(botMessage, { selfHandle: '@CodexMB', humanOnly: false }), true); + }); +});