Files
claw-apply/lib/telegram_answers.mjs
Matthew Jackson f14af48905 Route all file I/O through storage layer (S3 or disk)
- filter.mjs: loadProfile now async, uses loadJSON
- telegram_answers.mjs: answers read/write through storage layer
- status.mjs: uses initQueue + loadQueue for S3 support
- setup.mjs: await all loadConfig calls
- storage.mjs: more robust getS3Key using URL parsing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 08:25:22 -08:00

155 lines
5.0 KiB
JavaScript

/**
* telegram_answers.mjs — Process Telegram replies to question messages
*
* Shared by telegram_poller.mjs (cron) and job_applier.mjs (safety net).
*
* Flow:
* 1. Poll Telegram getUpdates for new messages
* 2. Match replies to needs_answer jobs via reply_to_message_id
* 3. Save answer to answers.json (reused for ALL future jobs)
* 4. Flip job back to "new" for retry
* 5. Send confirmation reply
*/
import { existsSync, readFileSync, writeFileSync } from 'fs';
import { dirname, resolve } from 'path';
import { fileURLToPath } from 'url';
import { loadQueue, saveQueue } from './queue.mjs';
import { loadJSON, saveJSON } from './storage.mjs';
import { getTelegramUpdates, replyTelegram } from './notify.mjs';
const __dir = dirname(fileURLToPath(import.meta.url));
const OFFSET_PATH = resolve(__dir, '../data/telegram_offset.json');
function loadOffset() {
if (!existsSync(OFFSET_PATH)) return 0;
try {
return JSON.parse(readFileSync(OFFSET_PATH, 'utf8')).offset || 0;
} catch { return 0; }
}
function saveOffset(offset) {
writeFileSync(OFFSET_PATH, JSON.stringify({ offset }));
}
/**
* Process pending Telegram replies. Returns number of answers processed.
* @param {object} settings - settings.json contents
* @param {string} answersPath - path to answers.json (used as storage key)
* @returns {number} count of answers saved
*/
export async function processTelegramReplies(settings, answersPath) {
const botToken = settings.notifications?.bot_token;
const chatId = settings.notifications?.telegram_user_id;
if (!botToken || !chatId) return 0;
const offset = loadOffset();
const updates = await getTelegramUpdates(botToken, offset, 1);
if (updates.length === 0) return 0;
// Build lookup: telegram_message_id -> job
const queue = loadQueue();
const jobsByMsgId = new Map();
for (const job of queue) {
if (job.status === 'needs_answer' && job.telegram_message_id) {
jobsByMsgId.set(job.telegram_message_id, job);
}
}
let queueDirty = false;
let answersDirty = false;
const answers = await loadJSON(answersPath, []);
let maxUpdateId = offset;
let processed = 0;
for (const update of updates) {
maxUpdateId = Math.max(maxUpdateId, update.update_id);
const msg = update.message;
if (!msg || !msg.text) continue;
if (String(msg.chat?.id) !== String(chatId)) continue;
const replyTo = msg.reply_to_message?.message_id;
const text = msg.text.trim();
// Match reply to a pending question
let matchedJob = null;
if (replyTo && jobsByMsgId.has(replyTo)) {
matchedJob = jobsByMsgId.get(replyTo);
} else if (!replyTo) {
// Not a reply — match to the single pending question if only one exists
const pending = queue
.filter(j => j.status === 'needs_answer' && j.telegram_message_id)
.sort((a, b) => (b.status_updated_at || '').localeCompare(a.status_updated_at || ''));
if (pending.length === 1) {
matchedJob = pending[0];
} else if (pending.length > 1) {
await replyTelegram(botToken, chatId, msg.message_id,
`I have ${pending.length} questions waiting. Please *reply* to the specific question message so I know which one you're answering.`
);
continue;
}
}
if (!matchedJob) continue;
const questionLabel = matchedJob.pending_question?.label || matchedJob.pending_question || '';
const questionOptions = matchedJob.pending_question?.options || [];
let answer;
if (/^accept$/i.test(text)) {
answer = matchedJob.ai_suggested_answer;
if (!answer) {
await replyTelegram(botToken, chatId, msg.message_id,
`No AI answer available for this question. Please type your answer directly.`
);
continue;
}
} else {
answer = text;
}
// For select fields, match reply to exact option text
if (questionOptions.length > 0) {
const matched = questionOptions.find(o => o.toLowerCase() === answer.toLowerCase());
if (matched) answer = matched;
}
// Save to answers.json
if (questionLabel) {
const existing = answers.findIndex(a => a.pattern === questionLabel);
if (existing >= 0) {
answers[existing].answer = answer;
} else {
answers.push({ pattern: questionLabel, answer });
}
answersDirty = true;
}
// Flip job back to "new"
const idx = queue.findIndex(j => j.id === matchedJob.id);
if (idx >= 0) {
queue[idx] = {
...queue[idx],
status: 'new',
status_updated_at: new Date().toISOString(),
telegram_message_id: null,
};
queueDirty = true;
}
await replyTelegram(botToken, chatId, msg.message_id,
`Saved "${answer}" for "${questionLabel.slice(0, 60)}"\n\n${matchedJob.title} @ ${matchedJob.company} will be retried next run.`
);
console.log(`[telegram] Saved answer for "${questionLabel}": "${answer.slice(0, 50)}"`);
processed++;
}
if (answersDirty) await saveJSON(answersPath, answers);
if (queueDirty) await saveQueue(queue);
saveOffset(maxUpdateId + 1);
return processed;
}