From 0920554dad12eb1f77ab4d770128fd4101b00591 Mon Sep 17 00:00:00 2001 From: Matthew Jackson Date: Fri, 6 Mar 2026 11:38:37 -0800 Subject: [PATCH] Add Telegram answer learning: poller + applier safety net - New lib/telegram_answers.mjs: shared module that polls Telegram getUpdates, matches replies to needs_answer jobs, saves to answers.json, flips job to new - telegram_poller.mjs: lightweight cron script (every minute via OpenClaw) - Applier also processes replies at start of each run as safety net - sendTelegram now returns message_id, stored on job for reply matching - User replies "ACCEPT" to use AI answer, or types their own - Answers persist in answers.json and apply to ALL future jobs - Also includes: selectOptionFuzzy, multiple dialog handling, browser recovery, answers reload, per-job timeout bump to 10min Co-Authored-By: Claude Opus 4.6 --- job_applier.mjs | 21 +++-- lib/notify.mjs | 55 ++++++++++++- lib/telegram_answers.mjs | 164 +++++++++++++++++++++++++++++++++++++++ telegram_poller.mjs | 21 +++++ 4 files changed, 250 insertions(+), 11 deletions(-) create mode 100644 lib/telegram_answers.mjs create mode 100644 telegram_poller.mjs diff --git a/job_applier.mjs b/job_applier.mjs index fee1704..683991a 100644 --- a/job_applier.mjs +++ b/job_applier.mjs @@ -18,6 +18,7 @@ import { createBrowser } from './lib/browser.mjs'; import { FormFiller } from './lib/form_filler.mjs'; import { applyToJob, supportedTypes } from './lib/apply/index.mjs'; import { sendTelegram, formatApplySummary } from './lib/notify.mjs'; +import { processTelegramReplies } from './lib/telegram_answers.mjs'; import { generateAnswer } from './lib/ai_answer.mjs'; import { APPLY_BETWEEN_DELAY_BASE, APPLY_BETWEEN_DELAY_JITTER, DEFAULT_MAX_RETRIES, @@ -58,6 +59,11 @@ async function main() { }); console.log('🚀 claw-apply: Job Applier starting\n'); + + // Process any Telegram replies before fetching jobs — saves answers and flips jobs back to "new" + const answered = await processTelegramReplies(settings, answersPath); + if (answered > 0) console.log(`📬 Processed ${answered} Telegram answer(s)\n`); + console.log(`Supported apply types: ${supportedTypes().join(', ')}\n`); // Preview mode @@ -237,12 +243,6 @@ async function handleResult(job, result, results, settings, profile, apiKey) { const aiAnswer = await generateAnswer(questionText, profile, apiKey, { title, company }); - updateJobStatus(job.id, 'needs_answer', { - title, company, pending_question, - ai_suggested_answer: aiAnswer || null, - }); - appendLog({ ...job, title, company, status: 'needs_answer', pending_question, ai_suggested_answer: aiAnswer }); - const msg = [ `❓ *New question* — ${company} / ${title}`, ``, @@ -256,7 +256,14 @@ async function handleResult(job, result, results, settings, profile, apiKey) { `Reply with your answer to store it, or reply *ACCEPT* to use the AI answer.`, ].filter(Boolean).join('\n'); - await sendTelegram(settings, msg); + const telegramMsgId = await sendTelegram(settings, msg); + + updateJobStatus(job.id, 'needs_answer', { + title, company, pending_question, + ai_suggested_answer: aiAnswer || null, + telegram_message_id: telegramMsgId, + }); + appendLog({ ...job, title, company, status: 'needs_answer', pending_question, ai_suggested_answer: aiAnswer }); results.needs_answer++; break; } diff --git a/lib/notify.mjs b/lib/notify.mjs index 9df779e..8e123ac 100644 --- a/lib/notify.mjs +++ b/lib/notify.mjs @@ -6,11 +6,14 @@ import { TELEGRAM_API_BASE, NOTIFY_RATE_LIMIT_MS } from './constants.mjs'; let lastSentAt = 0; +/** + * Send a Telegram message. Returns the message_id on success (useful for tracking replies). + */ export async function sendTelegram(settings, message) { - const { bot_token, telegram_user_id } = settings.notifications; + const { bot_token, telegram_user_id } = settings.notifications || {}; if (!bot_token || !telegram_user_id) { console.log(`[notify] No Telegram config — would send: ${message.substring(0, 80)}`); - return; + return null; } // Rate limit to avoid Telegram API throttling @@ -32,14 +35,58 @@ export async function sendTelegram(settings, message) { }), }); lastSentAt = Date.now(); - if (!res.ok) { console.error(`[notify] Telegram HTTP error: ${res.status}`); return; } + if (!res.ok) { console.error(`[notify] Telegram HTTP error: ${res.status}`); return null; } const data = await res.json(); - if (!data.ok) console.error('[notify] Telegram error:', data.description); + if (!data.ok) { console.error('[notify] Telegram error:', data.description); return null; } + return data.result?.message_id || null; } catch (e) { console.error('[notify] Failed to send Telegram message:', e.message); + return null; } } +/** + * Get updates from Telegram Bot API (long polling). + * @param {string} botToken + * @param {number} offset - Update ID offset (pass last_update_id + 1) + * @param {number} timeout - Long poll timeout in seconds + * @returns {Array} Array of update objects + */ +export async function getTelegramUpdates(botToken, offset = 0, timeout = 5) { + const url = `${TELEGRAM_API_BASE}${botToken}/getUpdates`; + try { + const res = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ offset, timeout }), + }); + if (!res.ok) return []; + const data = await res.json(); + return data.ok ? (data.result || []) : []; + } catch { + return []; + } +} + +/** + * Reply to a specific Telegram message. + */ +export async function replyTelegram(botToken, chatId, replyToMessageId, text) { + const url = `${TELEGRAM_API_BASE}${botToken}/sendMessage`; + try { + await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + chat_id: chatId, + text, + reply_to_message_id: replyToMessageId, + parse_mode: 'Markdown', + }), + }); + } catch { /* best effort */ } +} + export function formatSearchSummary(added, skipped, platforms) { if (added === 0) return `🔍 *Job Search Complete*\nNo new jobs found this run.`; return `🔍 *Job Search Complete*\n${added} new job${added !== 1 ? 's' : ''} added to queue (${skipped} already seen)\nPlatforms: ${platforms.join(', ')}`; diff --git a/lib/telegram_answers.mjs b/lib/telegram_answers.mjs new file mode 100644 index 0000000..d3755a7 --- /dev/null +++ b/lib/telegram_answers.mjs @@ -0,0 +1,164 @@ +/** + * telegram_answers.mjs — Process Telegram replies to question messages + * + * Shared by telegram_poller.mjs (cron) and job_applier.mjs (safety net). + * + * Flow: + * 1. Poll Telegram getUpdates for new messages + * 2. Match replies to needs_answer jobs via reply_to_message_id + * 3. Save answer to answers.json (reused for ALL future jobs) + * 4. Flip job back to "new" for retry + * 5. Send confirmation reply + */ +import { existsSync, readFileSync, writeFileSync, renameSync } from 'fs'; +import { dirname, resolve } from 'path'; +import { fileURLToPath } from 'url'; + +import { loadQueue, saveQueue } from './queue.mjs'; +import { getTelegramUpdates, replyTelegram } from './notify.mjs'; + +const __dir = dirname(fileURLToPath(import.meta.url)); +const OFFSET_PATH = resolve(__dir, '../data/telegram_offset.json'); + +function loadOffset() { + if (!existsSync(OFFSET_PATH)) return 0; + try { + return JSON.parse(readFileSync(OFFSET_PATH, 'utf8')).offset || 0; + } catch { return 0; } +} + +function saveOffset(offset) { + writeFileSync(OFFSET_PATH, JSON.stringify({ offset })); +} + +function loadAnswers(path) { + if (!existsSync(path)) return []; + return JSON.parse(readFileSync(path, 'utf8')); +} + +function saveAnswers(path, answers) { + const tmp = path + '.tmp'; + writeFileSync(tmp, JSON.stringify(answers, null, 2)); + renameSync(tmp, path); +} + +/** + * Process pending Telegram replies. Returns number of answers processed. + * @param {object} settings - settings.json contents + * @param {string} answersPath - absolute path to answers.json + * @returns {number} count of answers saved + */ +export async function processTelegramReplies(settings, answersPath) { + const botToken = settings.notifications?.bot_token; + const chatId = settings.notifications?.telegram_user_id; + if (!botToken || !chatId) return 0; + + const offset = loadOffset(); + const updates = await getTelegramUpdates(botToken, offset, 1); + if (updates.length === 0) return 0; + + // Build lookup: telegram_message_id → job + const queue = loadQueue(); + const jobsByMsgId = new Map(); + for (const job of queue) { + if (job.status === 'needs_answer' && job.telegram_message_id) { + jobsByMsgId.set(job.telegram_message_id, job); + } + } + + let queueDirty = false; + let answersDirty = false; + const answers = loadAnswers(answersPath); + let maxUpdateId = offset; + let processed = 0; + + for (const update of updates) { + maxUpdateId = Math.max(maxUpdateId, update.update_id); + + const msg = update.message; + if (!msg || !msg.text) continue; + if (String(msg.chat?.id) !== String(chatId)) continue; + + const replyTo = msg.reply_to_message?.message_id; + const text = msg.text.trim(); + + // Match reply to a pending question + let matchedJob = null; + if (replyTo && jobsByMsgId.has(replyTo)) { + matchedJob = jobsByMsgId.get(replyTo); + } else if (!replyTo) { + // Not a reply — match to the single pending question if only one exists + const pending = queue + .filter(j => j.status === 'needs_answer' && j.telegram_message_id) + .sort((a, b) => (b.status_updated_at || '').localeCompare(a.status_updated_at || '')); + if (pending.length === 1) { + matchedJob = pending[0]; + } else if (pending.length > 1) { + await replyTelegram(botToken, chatId, msg.message_id, + `I have ${pending.length} questions waiting. Please *reply* to the specific question message so I know which one you're answering.` + ); + continue; + } + } + + if (!matchedJob) continue; + + const questionLabel = matchedJob.pending_question?.label || matchedJob.pending_question || ''; + const questionOptions = matchedJob.pending_question?.options || []; + let answer; + + if (/^accept$/i.test(text)) { + answer = matchedJob.ai_suggested_answer; + if (!answer) { + await replyTelegram(botToken, chatId, msg.message_id, + `No AI answer available for this question. Please type your answer directly.` + ); + continue; + } + } else { + answer = text; + } + + // For select fields, match reply to exact option text + if (questionOptions.length > 0) { + const matched = questionOptions.find(o => o.toLowerCase() === answer.toLowerCase()); + if (matched) answer = matched; + } + + // Save to answers.json + if (questionLabel) { + const existing = answers.findIndex(a => a.pattern === questionLabel); + if (existing >= 0) { + answers[existing].answer = answer; + } else { + answers.push({ pattern: questionLabel, answer }); + } + answersDirty = true; + } + + // Flip job back to "new" + const idx = queue.findIndex(j => j.id === matchedJob.id); + if (idx >= 0) { + queue[idx] = { + ...queue[idx], + status: 'new', + status_updated_at: new Date().toISOString(), + telegram_message_id: null, + }; + queueDirty = true; + } + + await replyTelegram(botToken, chatId, msg.message_id, + `Saved "${answer}" for "${questionLabel.slice(0, 60)}"\n\n${matchedJob.title} @ ${matchedJob.company} will be retried next run.` + ); + + console.log(`[telegram] Saved answer for "${questionLabel}": "${answer.slice(0, 50)}"`); + processed++; + } + + if (answersDirty) saveAnswers(answersPath, answers); + if (queueDirty) saveQueue(queue); + saveOffset(maxUpdateId + 1); + + return processed; +} diff --git a/telegram_poller.mjs b/telegram_poller.mjs new file mode 100644 index 0000000..8942f3d --- /dev/null +++ b/telegram_poller.mjs @@ -0,0 +1,21 @@ +#!/usr/bin/env node +/** + * telegram_poller.mjs — Polls Telegram for replies to question messages + * + * Run via OpenClaw cron: * * * * * (every minute) + * Lightweight — single HTTP call, exits immediately if no updates. + */ +import { loadEnv } from './lib/env.mjs'; +loadEnv(); + +import { resolve, dirname } from 'path'; +import { fileURLToPath } from 'url'; +import { loadConfig } from './lib/queue.mjs'; +import { processTelegramReplies } from './lib/telegram_answers.mjs'; + +const __dir = dirname(fileURLToPath(import.meta.url)); +const settings = loadConfig(resolve(__dir, 'config/settings.json')); +const answersPath = resolve(__dir, 'config/answers.json'); + +const processed = await processTelegramReplies(settings, answersPath); +if (processed > 0) console.log(`Processed ${processed} answer(s)`);