Add Telegram answer learning: poller + applier safety net
- New lib/telegram_answers.mjs: shared module that polls Telegram getUpdates, matches replies to needs_answer jobs, saves to answers.json, flips job to new - telegram_poller.mjs: lightweight cron script (every minute via OpenClaw) - Applier also processes replies at start of each run as safety net - sendTelegram now returns message_id, stored on job for reply matching - User replies "ACCEPT" to use AI answer, or types their own - Answers persist in answers.json and apply to ALL future jobs - Also includes: selectOptionFuzzy, multiple dialog handling, browser recovery, answers reload, per-job timeout bump to 10min Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -18,6 +18,7 @@ import { createBrowser } from './lib/browser.mjs';
|
|||||||
import { FormFiller } from './lib/form_filler.mjs';
|
import { FormFiller } from './lib/form_filler.mjs';
|
||||||
import { applyToJob, supportedTypes } from './lib/apply/index.mjs';
|
import { applyToJob, supportedTypes } from './lib/apply/index.mjs';
|
||||||
import { sendTelegram, formatApplySummary } from './lib/notify.mjs';
|
import { sendTelegram, formatApplySummary } from './lib/notify.mjs';
|
||||||
|
import { processTelegramReplies } from './lib/telegram_answers.mjs';
|
||||||
import { generateAnswer } from './lib/ai_answer.mjs';
|
import { generateAnswer } from './lib/ai_answer.mjs';
|
||||||
import {
|
import {
|
||||||
APPLY_BETWEEN_DELAY_BASE, APPLY_BETWEEN_DELAY_JITTER, DEFAULT_MAX_RETRIES,
|
APPLY_BETWEEN_DELAY_BASE, APPLY_BETWEEN_DELAY_JITTER, DEFAULT_MAX_RETRIES,
|
||||||
@@ -58,6 +59,11 @@ async function main() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
console.log('🚀 claw-apply: Job Applier starting\n');
|
console.log('🚀 claw-apply: Job Applier starting\n');
|
||||||
|
|
||||||
|
// Process any Telegram replies before fetching jobs — saves answers and flips jobs back to "new"
|
||||||
|
const answered = await processTelegramReplies(settings, answersPath);
|
||||||
|
if (answered > 0) console.log(`📬 Processed ${answered} Telegram answer(s)\n`);
|
||||||
|
|
||||||
console.log(`Supported apply types: ${supportedTypes().join(', ')}\n`);
|
console.log(`Supported apply types: ${supportedTypes().join(', ')}\n`);
|
||||||
|
|
||||||
// Preview mode
|
// Preview mode
|
||||||
@@ -237,12 +243,6 @@ async function handleResult(job, result, results, settings, profile, apiKey) {
|
|||||||
|
|
||||||
const aiAnswer = await generateAnswer(questionText, profile, apiKey, { title, company });
|
const aiAnswer = await generateAnswer(questionText, profile, apiKey, { title, company });
|
||||||
|
|
||||||
updateJobStatus(job.id, 'needs_answer', {
|
|
||||||
title, company, pending_question,
|
|
||||||
ai_suggested_answer: aiAnswer || null,
|
|
||||||
});
|
|
||||||
appendLog({ ...job, title, company, status: 'needs_answer', pending_question, ai_suggested_answer: aiAnswer });
|
|
||||||
|
|
||||||
const msg = [
|
const msg = [
|
||||||
`❓ *New question* — ${company} / ${title}`,
|
`❓ *New question* — ${company} / ${title}`,
|
||||||
``,
|
``,
|
||||||
@@ -256,7 +256,14 @@ async function handleResult(job, result, results, settings, profile, apiKey) {
|
|||||||
`Reply with your answer to store it, or reply *ACCEPT* to use the AI answer.`,
|
`Reply with your answer to store it, or reply *ACCEPT* to use the AI answer.`,
|
||||||
].filter(Boolean).join('\n');
|
].filter(Boolean).join('\n');
|
||||||
|
|
||||||
await sendTelegram(settings, msg);
|
const telegramMsgId = await sendTelegram(settings, msg);
|
||||||
|
|
||||||
|
updateJobStatus(job.id, 'needs_answer', {
|
||||||
|
title, company, pending_question,
|
||||||
|
ai_suggested_answer: aiAnswer || null,
|
||||||
|
telegram_message_id: telegramMsgId,
|
||||||
|
});
|
||||||
|
appendLog({ ...job, title, company, status: 'needs_answer', pending_question, ai_suggested_answer: aiAnswer });
|
||||||
results.needs_answer++;
|
results.needs_answer++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,11 +6,14 @@ import { TELEGRAM_API_BASE, NOTIFY_RATE_LIMIT_MS } from './constants.mjs';
|
|||||||
|
|
||||||
let lastSentAt = 0;
|
let lastSentAt = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a Telegram message. Returns the message_id on success (useful for tracking replies).
|
||||||
|
*/
|
||||||
export async function sendTelegram(settings, message) {
|
export async function sendTelegram(settings, message) {
|
||||||
const { bot_token, telegram_user_id } = settings.notifications;
|
const { bot_token, telegram_user_id } = settings.notifications || {};
|
||||||
if (!bot_token || !telegram_user_id) {
|
if (!bot_token || !telegram_user_id) {
|
||||||
console.log(`[notify] No Telegram config — would send: ${message.substring(0, 80)}`);
|
console.log(`[notify] No Telegram config — would send: ${message.substring(0, 80)}`);
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rate limit to avoid Telegram API throttling
|
// Rate limit to avoid Telegram API throttling
|
||||||
@@ -32,14 +35,58 @@ export async function sendTelegram(settings, message) {
|
|||||||
}),
|
}),
|
||||||
});
|
});
|
||||||
lastSentAt = Date.now();
|
lastSentAt = Date.now();
|
||||||
if (!res.ok) { console.error(`[notify] Telegram HTTP error: ${res.status}`); return; }
|
if (!res.ok) { console.error(`[notify] Telegram HTTP error: ${res.status}`); return null; }
|
||||||
const data = await res.json();
|
const data = await res.json();
|
||||||
if (!data.ok) console.error('[notify] Telegram error:', data.description);
|
if (!data.ok) { console.error('[notify] Telegram error:', data.description); return null; }
|
||||||
|
return data.result?.message_id || null;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error('[notify] Failed to send Telegram message:', e.message);
|
console.error('[notify] Failed to send Telegram message:', e.message);
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get updates from Telegram Bot API (long polling).
|
||||||
|
* @param {string} botToken
|
||||||
|
* @param {number} offset - Update ID offset (pass last_update_id + 1)
|
||||||
|
* @param {number} timeout - Long poll timeout in seconds
|
||||||
|
* @returns {Array} Array of update objects
|
||||||
|
*/
|
||||||
|
export async function getTelegramUpdates(botToken, offset = 0, timeout = 5) {
|
||||||
|
const url = `${TELEGRAM_API_BASE}${botToken}/getUpdates`;
|
||||||
|
try {
|
||||||
|
const res = await fetch(url, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({ offset, timeout }),
|
||||||
|
});
|
||||||
|
if (!res.ok) return [];
|
||||||
|
const data = await res.json();
|
||||||
|
return data.ok ? (data.result || []) : [];
|
||||||
|
} catch {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reply to a specific Telegram message.
|
||||||
|
*/
|
||||||
|
export async function replyTelegram(botToken, chatId, replyToMessageId, text) {
|
||||||
|
const url = `${TELEGRAM_API_BASE}${botToken}/sendMessage`;
|
||||||
|
try {
|
||||||
|
await fetch(url, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({
|
||||||
|
chat_id: chatId,
|
||||||
|
text,
|
||||||
|
reply_to_message_id: replyToMessageId,
|
||||||
|
parse_mode: 'Markdown',
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
} catch { /* best effort */ }
|
||||||
|
}
|
||||||
|
|
||||||
export function formatSearchSummary(added, skipped, platforms) {
|
export function formatSearchSummary(added, skipped, platforms) {
|
||||||
if (added === 0) return `🔍 *Job Search Complete*\nNo new jobs found this run.`;
|
if (added === 0) return `🔍 *Job Search Complete*\nNo new jobs found this run.`;
|
||||||
return `🔍 *Job Search Complete*\n${added} new job${added !== 1 ? 's' : ''} added to queue (${skipped} already seen)\nPlatforms: ${platforms.join(', ')}`;
|
return `🔍 *Job Search Complete*\n${added} new job${added !== 1 ? 's' : ''} added to queue (${skipped} already seen)\nPlatforms: ${platforms.join(', ')}`;
|
||||||
|
|||||||
164
lib/telegram_answers.mjs
Normal file
164
lib/telegram_answers.mjs
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
/**
|
||||||
|
* telegram_answers.mjs — Process Telegram replies to question messages
|
||||||
|
*
|
||||||
|
* Shared by telegram_poller.mjs (cron) and job_applier.mjs (safety net).
|
||||||
|
*
|
||||||
|
* Flow:
|
||||||
|
* 1. Poll Telegram getUpdates for new messages
|
||||||
|
* 2. Match replies to needs_answer jobs via reply_to_message_id
|
||||||
|
* 3. Save answer to answers.json (reused for ALL future jobs)
|
||||||
|
* 4. Flip job back to "new" for retry
|
||||||
|
* 5. Send confirmation reply
|
||||||
|
*/
|
||||||
|
import { existsSync, readFileSync, writeFileSync, renameSync } from 'fs';
|
||||||
|
import { dirname, resolve } from 'path';
|
||||||
|
import { fileURLToPath } from 'url';
|
||||||
|
|
||||||
|
import { loadQueue, saveQueue } from './queue.mjs';
|
||||||
|
import { getTelegramUpdates, replyTelegram } from './notify.mjs';
|
||||||
|
|
||||||
|
const __dir = dirname(fileURLToPath(import.meta.url));
|
||||||
|
const OFFSET_PATH = resolve(__dir, '../data/telegram_offset.json');
|
||||||
|
|
||||||
|
function loadOffset() {
|
||||||
|
if (!existsSync(OFFSET_PATH)) return 0;
|
||||||
|
try {
|
||||||
|
return JSON.parse(readFileSync(OFFSET_PATH, 'utf8')).offset || 0;
|
||||||
|
} catch { return 0; }
|
||||||
|
}
|
||||||
|
|
||||||
|
function saveOffset(offset) {
|
||||||
|
writeFileSync(OFFSET_PATH, JSON.stringify({ offset }));
|
||||||
|
}
|
||||||
|
|
||||||
|
function loadAnswers(path) {
|
||||||
|
if (!existsSync(path)) return [];
|
||||||
|
return JSON.parse(readFileSync(path, 'utf8'));
|
||||||
|
}
|
||||||
|
|
||||||
|
function saveAnswers(path, answers) {
|
||||||
|
const tmp = path + '.tmp';
|
||||||
|
writeFileSync(tmp, JSON.stringify(answers, null, 2));
|
||||||
|
renameSync(tmp, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process pending Telegram replies. Returns number of answers processed.
|
||||||
|
* @param {object} settings - settings.json contents
|
||||||
|
* @param {string} answersPath - absolute path to answers.json
|
||||||
|
* @returns {number} count of answers saved
|
||||||
|
*/
|
||||||
|
export async function processTelegramReplies(settings, answersPath) {
|
||||||
|
const botToken = settings.notifications?.bot_token;
|
||||||
|
const chatId = settings.notifications?.telegram_user_id;
|
||||||
|
if (!botToken || !chatId) return 0;
|
||||||
|
|
||||||
|
const offset = loadOffset();
|
||||||
|
const updates = await getTelegramUpdates(botToken, offset, 1);
|
||||||
|
if (updates.length === 0) return 0;
|
||||||
|
|
||||||
|
// Build lookup: telegram_message_id → job
|
||||||
|
const queue = loadQueue();
|
||||||
|
const jobsByMsgId = new Map();
|
||||||
|
for (const job of queue) {
|
||||||
|
if (job.status === 'needs_answer' && job.telegram_message_id) {
|
||||||
|
jobsByMsgId.set(job.telegram_message_id, job);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let queueDirty = false;
|
||||||
|
let answersDirty = false;
|
||||||
|
const answers = loadAnswers(answersPath);
|
||||||
|
let maxUpdateId = offset;
|
||||||
|
let processed = 0;
|
||||||
|
|
||||||
|
for (const update of updates) {
|
||||||
|
maxUpdateId = Math.max(maxUpdateId, update.update_id);
|
||||||
|
|
||||||
|
const msg = update.message;
|
||||||
|
if (!msg || !msg.text) continue;
|
||||||
|
if (String(msg.chat?.id) !== String(chatId)) continue;
|
||||||
|
|
||||||
|
const replyTo = msg.reply_to_message?.message_id;
|
||||||
|
const text = msg.text.trim();
|
||||||
|
|
||||||
|
// Match reply to a pending question
|
||||||
|
let matchedJob = null;
|
||||||
|
if (replyTo && jobsByMsgId.has(replyTo)) {
|
||||||
|
matchedJob = jobsByMsgId.get(replyTo);
|
||||||
|
} else if (!replyTo) {
|
||||||
|
// Not a reply — match to the single pending question if only one exists
|
||||||
|
const pending = queue
|
||||||
|
.filter(j => j.status === 'needs_answer' && j.telegram_message_id)
|
||||||
|
.sort((a, b) => (b.status_updated_at || '').localeCompare(a.status_updated_at || ''));
|
||||||
|
if (pending.length === 1) {
|
||||||
|
matchedJob = pending[0];
|
||||||
|
} else if (pending.length > 1) {
|
||||||
|
await replyTelegram(botToken, chatId, msg.message_id,
|
||||||
|
`I have ${pending.length} questions waiting. Please *reply* to the specific question message so I know which one you're answering.`
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!matchedJob) continue;
|
||||||
|
|
||||||
|
const questionLabel = matchedJob.pending_question?.label || matchedJob.pending_question || '';
|
||||||
|
const questionOptions = matchedJob.pending_question?.options || [];
|
||||||
|
let answer;
|
||||||
|
|
||||||
|
if (/^accept$/i.test(text)) {
|
||||||
|
answer = matchedJob.ai_suggested_answer;
|
||||||
|
if (!answer) {
|
||||||
|
await replyTelegram(botToken, chatId, msg.message_id,
|
||||||
|
`No AI answer available for this question. Please type your answer directly.`
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
answer = text;
|
||||||
|
}
|
||||||
|
|
||||||
|
// For select fields, match reply to exact option text
|
||||||
|
if (questionOptions.length > 0) {
|
||||||
|
const matched = questionOptions.find(o => o.toLowerCase() === answer.toLowerCase());
|
||||||
|
if (matched) answer = matched;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save to answers.json
|
||||||
|
if (questionLabel) {
|
||||||
|
const existing = answers.findIndex(a => a.pattern === questionLabel);
|
||||||
|
if (existing >= 0) {
|
||||||
|
answers[existing].answer = answer;
|
||||||
|
} else {
|
||||||
|
answers.push({ pattern: questionLabel, answer });
|
||||||
|
}
|
||||||
|
answersDirty = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flip job back to "new"
|
||||||
|
const idx = queue.findIndex(j => j.id === matchedJob.id);
|
||||||
|
if (idx >= 0) {
|
||||||
|
queue[idx] = {
|
||||||
|
...queue[idx],
|
||||||
|
status: 'new',
|
||||||
|
status_updated_at: new Date().toISOString(),
|
||||||
|
telegram_message_id: null,
|
||||||
|
};
|
||||||
|
queueDirty = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
await replyTelegram(botToken, chatId, msg.message_id,
|
||||||
|
`Saved "${answer}" for "${questionLabel.slice(0, 60)}"\n\n${matchedJob.title} @ ${matchedJob.company} will be retried next run.`
|
||||||
|
);
|
||||||
|
|
||||||
|
console.log(`[telegram] Saved answer for "${questionLabel}": "${answer.slice(0, 50)}"`);
|
||||||
|
processed++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (answersDirty) saveAnswers(answersPath, answers);
|
||||||
|
if (queueDirty) saveQueue(queue);
|
||||||
|
saveOffset(maxUpdateId + 1);
|
||||||
|
|
||||||
|
return processed;
|
||||||
|
}
|
||||||
21
telegram_poller.mjs
Normal file
21
telegram_poller.mjs
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
#!/usr/bin/env node
|
||||||
|
/**
|
||||||
|
* telegram_poller.mjs — Polls Telegram for replies to question messages
|
||||||
|
*
|
||||||
|
* Run via OpenClaw cron: * * * * * (every minute)
|
||||||
|
* Lightweight — single HTTP call, exits immediately if no updates.
|
||||||
|
*/
|
||||||
|
import { loadEnv } from './lib/env.mjs';
|
||||||
|
loadEnv();
|
||||||
|
|
||||||
|
import { resolve, dirname } from 'path';
|
||||||
|
import { fileURLToPath } from 'url';
|
||||||
|
import { loadConfig } from './lib/queue.mjs';
|
||||||
|
import { processTelegramReplies } from './lib/telegram_answers.mjs';
|
||||||
|
|
||||||
|
const __dir = dirname(fileURLToPath(import.meta.url));
|
||||||
|
const settings = loadConfig(resolve(__dir, 'config/settings.json'));
|
||||||
|
const answersPath = resolve(__dir, 'config/answers.json');
|
||||||
|
|
||||||
|
const processed = await processTelegramReplies(settings, answersPath);
|
||||||
|
if (processed > 0) console.log(`Processed ${processed} answer(s)`);
|
||||||
Reference in New Issue
Block a user