Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ For Codex Desktop GUI (non-tmux) use command-mode nudging:
"nudge_mode": "command",
"nudge_command": "/ABSOLUTE/PATH/ide-agent-kit/tools/codex_gui_nudge.sh"
},
"dm_poller": {
"enabled": true,
"seen_file": "/tmp/codex-dm-seen.txt",
"limit": 100,
"human_only": false
},
"tmux": {
"ide_session": "codex",
"nudge_text": "check room and respond only if you have something relevant to say [codex]"
Expand All @@ -143,6 +149,8 @@ Run:
node bin/cli.mjs rooms watch --config /ABSOLUTE/PATH/ide-agent-kit-codex.json
```

When `dm_poller.enabled` is set, the same watcher also polls `/api/v1/messages?limit=100`, keeps a separate DM seen-state file, and nudges on new `type: "dm"` rows addressed to your configured handle. DM notifications are appended to the normal notification file so existing `rooms check` and GUI-nudge flows continue to work.

There is also a ready-to-copy example at:

```bash
Expand Down
6 changes: 6 additions & 0 deletions config/codex.desktop.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,11 @@
"nudge_command": "/ABSOLUTE/PATH/ide-agent-kit/tools/codex_gui_nudge.sh",
"notification_file": "/tmp/codex-room-notifications.txt",
"seen_file": "/tmp/codex-room-seen.txt"
},
"dm_poller": {
"enabled": true,
"seen_file": "/tmp/codex-dm-seen.txt",
"limit": 100,
"human_only": false
}
}
13 changes: 11 additions & 2 deletions src/config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ const DEFAULT_CONFIG = {
nudge_mode: 'tmux',
nudge_command: ''
},
dm_poller: {
enabled: false,
handle: '',
interval_sec: 30,
seen_file: '/tmp/iak-dm-seen-ids.txt',
api_key: '',
human_only: false,
limit: 100
},
github: { webhook_secret: '', event_kinds: ['pull_request', 'issue_comment', 'check_suite', 'workflow_run'] },
outbound: { default_webhook_url: '' },
rate_limit: { message_interval_sec: 30 },
Expand Down Expand Up @@ -63,6 +72,7 @@ export function loadConfig(configPath) {
receipts: { ...DEFAULT_CONFIG.receipts, ...raw.receipts },
tmux: { ...DEFAULT_CONFIG.tmux, ...raw.tmux },
poller: { ...DEFAULT_CONFIG.poller, ...raw.poller },
dm_poller: { ...DEFAULT_CONFIG.dm_poller, ...raw.dm_poller },
github: { ...DEFAULT_CONFIG.github, ...raw.github },
outbound: { ...DEFAULT_CONFIG.outbound, ...raw.outbound },
rate_limit: { ...DEFAULT_CONFIG.rate_limit, ...raw.rate_limit },
Expand All @@ -75,7 +85,6 @@ export function loadConfig(configPath) {
},
discord: { ...DEFAULT_CONFIG.discord, ...raw.discord },
acp: { ...DEFAULT_CONFIG.acp, ...raw.acp },
openclaw: raw.openclaw || {},
poller: raw.poller || {}
openclaw: raw.openclaw || {}
};
}
184 changes: 164 additions & 20 deletions src/team-relay/room-poller.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,47 @@ function saveSeenIds(path, ids) {
writeFileSync(path, arr.join('\n') + '\n');
}

const DM_SEEN_FILE_DEFAULT = '/tmp/iak-dm-seen-ids.txt';

function normalizeHandle(handle) {
if (typeof handle !== 'string') return '';
const trimmed = handle.trim();
if (!trimmed) return '';
return trimmed.startsWith('@') ? trimmed : `@${trimmed}`;
}

function parsePositiveInt(value, fallback) {
const parsed = Number.parseInt(value, 10);
return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

function appendNotifications(path, lines) {
if (lines.length === 0) return;
appendFileSync(path, lines.join('\n') + '\n');
}

function triggerNudge({ nudgeMode, nudgeCommandText, nudgeText, session }) {
if (nudgeMode === 'command') {
return nudgeCommand(nudgeCommandText, { text: nudgeText, session });
}
if (nudgeMode === 'none') {
return true;
}
return nudgeTmux(session, nudgeText);
}

export function isRelevantDirectMessage(message, { selfHandle, humanOnly = false } = {}) {
if (!message || message.type !== 'dm' || !message.id) return false;
const sender = normalizeHandle(message.from || message.sender);
const recipient = normalizeHandle(message.to);
const expectedRecipient = normalizeHandle(selfHandle);
if (!sender || !recipient || !expectedRecipient) return false;
if (recipient !== expectedRecipient) return false;
if (sender === expectedRecipient) return false;
if (humanOnly && !message?.metadata?.user) return false;
return true;
}

function nudgeTmux(session, text) {
try {
execSync(`tmux has-session -t ${JSON.stringify(session)} 2>/dev/null`);
Expand Down Expand Up @@ -71,6 +112,21 @@ async function fetchRoomMessages(room, apiKey, limit = 10) {
}
}

async function fetchDirectMessages(apiKey, limit = 100) {
const url = `https://groupmind.one/api/v1/messages?limit=${limit}`;
try {
const result = execSync(
`curl -sS -4 -H "X-API-Key: ${apiKey}" "${url}"`,
{ encoding: 'utf8', timeout: 15000 }
);
const data = JSON.parse(result);
return data.messages || (Array.isArray(data) ? data : []);
} catch (e) {
console.error(` fetch direct messages failed: ${e.message}`);
return [];
}
}

/**
* Read and clear the notification file. Returns array of message lines.
* This is the primary way the IDE agent retrieves new messages.
Expand All @@ -96,8 +152,17 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config
const nudgeText = config?.tmux?.nudge_text || 'check rooms';
const nudgeMode = config?.poller?.nudge_mode || 'tmux';
const nudgeCommandText = config?.poller?.nudge_command || '';
const pollInterval = interval || config?.poller?.interval_sec || 30;
const selfHandle = handle || config?.poller?.handle || '@unknown';
const pollInterval = parsePositiveInt(interval || config?.poller?.interval_sec, 30);
const selfHandle = normalizeHandle(handle || config?.poller?.handle || '@unknown');
const dmCfg = config?.dm_poller || {};
const dmEnabled = dmCfg.enabled === true;
const dmHandle = normalizeHandle(dmCfg.handle || selfHandle);
const dmSeenFile = dmCfg.seen_file || DM_SEEN_FILE_DEFAULT;
const dmNotifyFile = dmCfg.notification_file || notifyFile;
const dmPollInterval = parsePositiveInt(dmCfg.interval_sec, pollInterval);
const dmApiKey = dmCfg.api_key || dmCfg.apiKey || config?.poller?.api_key || config?.poller?.apiKey || apiKey;
const dmHumanOnly = dmCfg.human_only === true;
const dmLimit = parsePositiveInt(dmCfg.limit, 100);

console.log(`Room poller started`);
console.log(` rooms: ${rooms.join(', ')}`);
Expand All @@ -111,9 +176,19 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config
console.log(` nudge command: ${nudgeCommandText || '(missing)'}`);
}
console.log(` seen file: ${seenFile}`);
if (dmEnabled) {
console.log(` direct messages: enabled`);
console.log(` dm handle: ${dmHandle}`);
console.log(` dm interval: ${dmPollInterval}s`);
console.log(` dm seen file: ${dmSeenFile}`);
console.log(` dm notify file: ${dmNotifyFile}`);
console.log(` dm limit: ${dmLimit}`);
console.log(` dm human only: ${dmHumanOnly}`);
}
console.log(` queue: ${queuePath}`);

const seen = loadSeenIds(seenFile);
const dmSeen = dmEnabled ? loadSeenIds(dmSeenFile) : new Set();

// Seed: mark current messages as seen on first run
if (seen.size === 0) {
Expand All @@ -128,8 +203,25 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config
console.log(` seeded ${seen.size} IDs`);
}

if (dmEnabled && dmSeen.size === 0) {
console.log(` seeding seen IDs from current direct messages...`);
const directMessages = await fetchDirectMessages(dmApiKey, dmLimit);
for (const message of directMessages) {
if (isRelevantDirectMessage(message, { selfHandle: dmHandle, humanOnly: dmHumanOnly })) {
dmSeen.add(message.id);
}
}
saveSeenIds(dmSeenFile, dmSeen);
console.log(` seeded ${dmSeen.size} DM IDs`);
}

let roomPollInFlight = false;
let dmPollInFlight = false;

async function poll() {
async function pollRooms() {
if (roomPollInFlight) return;
roomPollInFlight = true;
try {
let newCount = 0;
const newMessages = [];
for (const room of rooms) {
Expand All @@ -140,8 +232,9 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config
seen.add(mid);

const sender = m.from || m.sender || '?';
const normalizedSender = normalizeHandle(sender);
// Skip own messages
if (sender === selfHandle || sender === selfHandle.replace('@', '')) continue;
if (normalizedSender === selfHandle) continue;

const body = (m.body || '').slice(0, 500);
const ts = m.created_at || new Date().toISOString();
Expand Down Expand Up @@ -176,37 +269,88 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config

if (newCount > 0) {
// Primary: write to notification file (always works)
appendFileSync(notifyFile, newMessages.join('\n') + '\n');

// Secondary: try configured nudge mode.
let nudged = false;
if (nudgeMode === 'command') {
nudged = nudgeCommand(nudgeCommandText, { text: nudgeText, session });
} else if (nudgeMode === 'none') {
nudged = true;
} else {
nudged = nudgeTmux(session, nudgeText);
}
appendNotifications(notifyFile, newMessages);
const nudged = triggerNudge({ nudgeMode, nudgeCommandText, nudgeText, session });
console.log(` ${newCount} new message(s) → notified${nudged ? ' + nudge' : ''}`);
}
} finally {
roomPollInFlight = false;
}
}

async function pollDirectMessages() {
if (!dmEnabled || dmPollInFlight) return;
dmPollInFlight = true;
try {
let newCount = 0;
const newMessages = [];
const directMessages = await fetchDirectMessages(dmApiKey, dmLimit);
for (const message of directMessages) {
if (!isRelevantDirectMessage(message, { selfHandle: dmHandle, humanOnly: dmHumanOnly })) continue;
const mid = message.id;
if (!mid || dmSeen.has(mid)) continue;
dmSeen.add(mid);

const sender = normalizeHandle(message.from || message.sender) || '?';
const recipient = normalizeHandle(message.to) || dmHandle;
const body = (message.body || '').slice(0, 500);
const ts = message.created_at || new Date().toISOString();

const rawEvent = {
trace_id: randomUUID(),
event_id: mid,
source: 'antfarm',
kind: 'antfarm.dm.created',
timestamp: ts,
room: null,
actor: { login: sender },
payload: { body, type: 'dm', to: recipient },
intent: null,
memory_context: null,
enrichment_errors: []
};
const event = await enrichEvent(rawEvent, config);
appendFileSync(queuePath, JSON.stringify(event) + '\n');

const line = `[${ts.slice(0, 19)}] [dm] ${sender} -> ${recipient}: ${body.replace(/\n/g, ' ').slice(0, 200)}`;
newMessages.push(line);
newCount++;

console.log(` [${ts.slice(0, 19)}] ${sender} DM -> ${recipient}: ${body.slice(0, 80)}...`);
}

saveSeenIds(dmSeenFile, dmSeen);

if (newCount > 0) {
appendNotifications(dmNotifyFile, newMessages);
const nudged = triggerNudge({ nudgeMode, nudgeCommandText, nudgeText, session });
console.log(` ${newCount} new direct message(s) → notified${nudged ? ' + nudge' : ''}`);
}
} finally {
dmPollInFlight = false;
}
}

// Initial poll
await poll();
await pollRooms();
await pollDirectMessages();

// Start interval
const timer = setInterval(poll, pollInterval * 1000);
const roomTimer = setInterval(pollRooms, pollInterval * 1000);
const dmTimer = dmEnabled ? setInterval(pollDirectMessages, dmPollInterval * 1000) : null;

// Handle shutdown
process.on('SIGINT', () => {
console.log('\nPoller stopped.');
clearInterval(timer);
clearInterval(roomTimer);
if (dmTimer) clearInterval(dmTimer);
process.exit(0);
});
process.on('SIGTERM', () => {
clearInterval(timer);
clearInterval(roomTimer);
if (dmTimer) clearInterval(dmTimer);
process.exit(0);
});

return timer;
return { roomTimer, dmTimer };
}
41 changes: 40 additions & 1 deletion test/config.test.mjs
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
// SPDX-License-Identifier: AGPL-3.0-only

import { describe, it } from 'node:test';
import { describe, it, afterEach } from 'node:test';
import { strict as assert } from 'node:assert';
import { mkdtempSync, rmSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { loadConfig } from '../src/config.mjs';

const tempDirs = [];

afterEach(() => {
while (tempDirs.length > 0) {
rmSync(tempDirs.pop(), { recursive: true, force: true });
}
});

describe('config', () => {
it('loadConfig returns defaults when no file exists', () => {
const cfg = loadConfig('/tmp/iak-nonexistent-config.json');
assert.ok(cfg.listen);
assert.ok(cfg.queue);
assert.ok(cfg.receipts);
assert.ok(cfg.tmux);
assert.ok(cfg.poller);
assert.ok(cfg.dm_poller);
assert.ok(cfg.automation);
assert.ok(cfg.comments);
});
Expand All @@ -30,4 +43,30 @@ describe('config', () => {
assert.ok(Array.isArray(cfg.comments.github.repos));
assert.equal(cfg.comments.interval_sec, 120);
});

it('merges partial poller and dm_poller config with defaults', () => {
const dir = mkdtempSync(join(tmpdir(), 'iak-config-'));
tempDirs.push(dir);
const configPath = join(dir, 'config.json');
writeFileSync(configPath, JSON.stringify({
poller: {
rooms: ['thinkoff-development'],
handle: '@CodexMB'
},
dm_poller: {
enabled: true
}
}));

const cfg = loadConfig(configPath);

assert.deepEqual(cfg.poller.rooms, ['thinkoff-development']);
assert.equal(cfg.poller.handle, '@CodexMB');
assert.equal(cfg.poller.interval_sec, 30);
assert.equal(cfg.poller.seen_file, '/tmp/iak-seen-ids.txt');
assert.equal(cfg.dm_poller.enabled, true);
assert.equal(cfg.dm_poller.interval_sec, 30);
assert.equal(cfg.dm_poller.seen_file, '/tmp/iak-dm-seen-ids.txt');
assert.equal(cfg.dm_poller.limit, 100);
});
});
Loading
Loading