From f14af48905f079b2c769fa785ea1e70c57323c91 Mon Sep 17 00:00:00 2001 From: Matthew Jackson Date: Sat, 7 Mar 2026 08:25:22 -0800 Subject: [PATCH] Route all file I/O through storage layer (S3 or disk) - filter.mjs: loadProfile now async, uses loadJSON - telegram_answers.mjs: answers read/write through storage layer - status.mjs: uses initQueue + loadQueue for S3 support - setup.mjs: await all loadConfig calls - storage.mjs: more robust getS3Key using URL parsing Co-Authored-By: Claude Opus 4.6 --- job_filter.mjs | 2 +- lib/filter.mjs | 8 ++--- lib/storage.mjs | 8 ++--- lib/telegram_answers.mjs | 28 ++++------------ setup.mjs | 6 ++-- status.mjs | 69 +++++++++++++++++++++------------------- 6 files changed, 55 insertions(+), 66 deletions(-) diff --git a/job_filter.mjs b/job_filter.mjs index bc7f0ef..eca36c5 100644 --- a/job_filter.mjs +++ b/job_filter.mjs @@ -238,7 +238,7 @@ async function submit(settings, searchConfig, candidateProfile) { const profilePaths = settings.filter?.job_profiles || {}; const jobProfilesByTrack = {}; for (const [track, path] of Object.entries(profilePaths)) { - const profile = loadProfile(path); + const profile = await loadProfile(path); if (profile) jobProfilesByTrack[track] = profile; else console.warn(`⚠️ Could not load job profile for track "${track}" at ${path}`); } diff --git a/lib/filter.mjs b/lib/filter.mjs index 77cb064..05fee9a 100644 --- a/lib/filter.mjs +++ b/lib/filter.mjs @@ -4,18 +4,18 @@ * Uses Batch API for 50% cost savings + prompt caching for shared context */ -import { readFileSync, existsSync } from 'fs'; import { ANTHROPIC_BATCH_API_URL, FILTER_DESC_MAX_CHARS, FILTER_BATCH_MAX_TOKENS } from './constants.mjs'; +import { loadJSON } from './storage.mjs'; // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- -export function loadProfile(profilePath) { - if (!profilePath || !existsSync(profilePath)) return 
null; - try { return JSON.parse(readFileSync(profilePath, 'utf8')); } catch { return null; } +export async function loadProfile(profilePath) { + if (!profilePath) return null; + try { return await loadJSON(profilePath, null); } catch { return null; } } function buildSystemPrompt(jobProfile, candidateProfile) { diff --git a/lib/storage.mjs b/lib/storage.mjs index 8a7ff73..223a8aa 100644 --- a/lib/storage.mjs +++ b/lib/storage.mjs @@ -28,11 +28,11 @@ export function storageType() { function getS3Key(filePath) { // Extract relative path from project root (e.g. config/foo.json or data/bar.json) - const projectRoot = dirname(dirname(import.meta.url.replace('file://', ''))); + const storageUrl = new URL(import.meta.url); + const projectRoot = dirname(dirname(storageUrl.pathname)); const abs = filePath.startsWith('/') ? filePath : join(projectRoot, filePath); - if (abs.startsWith(projectRoot)) { - const rel = abs.slice(projectRoot.length + 1); - return rel; + if (abs.startsWith(projectRoot + '/')) { + return abs.slice(projectRoot.length + 1); } // Fallback: use last two path segments (e.g. data/jobs_queue.json) const parts = filePath.split('/'); diff --git a/lib/telegram_answers.mjs b/lib/telegram_answers.mjs index c92f922..a5ed4c0 100644 --- a/lib/telegram_answers.mjs +++ b/lib/telegram_answers.mjs @@ -10,11 +10,12 @@ * 4. Flip job back to "new" for retry * 5. 
Send confirmation reply */ -import { existsSync, readFileSync, writeFileSync, renameSync } from 'fs'; +import { existsSync, readFileSync, writeFileSync } from 'fs'; import { dirname, resolve } from 'path'; import { fileURLToPath } from 'url'; import { loadQueue, saveQueue } from './queue.mjs'; +import { loadJSON, saveJSON } from './storage.mjs'; import { getTelegramUpdates, replyTelegram } from './notify.mjs'; const __dir = dirname(fileURLToPath(import.meta.url)); @@ -31,27 +32,10 @@ function saveOffset(offset) { writeFileSync(OFFSET_PATH, JSON.stringify({ offset })); } -function loadAnswers(path) { - if (!existsSync(path)) return []; - const raw = JSON.parse(readFileSync(path, 'utf8')); - // Normalize: support both object {"q":"a"} and array [{pattern,answer}] formats - if (Array.isArray(raw)) return raw; - if (raw && typeof raw === 'object') { - return Object.entries(raw).map(([pattern, answer]) => ({ pattern, answer: String(answer) })); - } - return []; -} - -function saveAnswers(path, answers) { - const tmp = path + '.tmp'; - writeFileSync(tmp, JSON.stringify(answers, null, 2)); - renameSync(tmp, path); -} - /** * Process pending Telegram replies. Returns number of answers processed. 
* @param {object} settings - settings.json contents - * @param {string} answersPath - absolute path to answers.json + * @param {string} answersPath - path to answers.json (used as storage key) * @returns {number} count of answers saved */ export async function processTelegramReplies(settings, answersPath) { @@ -63,7 +47,7 @@ const updates = await getTelegramUpdates(botToken, offset, 1); if (updates.length === 0) return 0; - // Build lookup: telegram_message_id → job (loadQueue returns cached in-memory queue) + // Build lookup: telegram_message_id -> job const queue = loadQueue(); const jobsByMsgId = new Map(); for (const job of queue) { @@ -74,7 +58,7 @@ let queueDirty = false; let answersDirty = false; - const answers = loadAnswers(answersPath); + const answers = await loadJSON(answersPath, []); let maxUpdateId = offset; let processed = 0; @@ -162,7 +146,7 @@ processed++; } - if (answersDirty) saveAnswers(answersPath, answers); + if (answersDirty) await saveJSON(answersPath, answers); if (queueDirty) await saveQueue(queue); saveOffset(maxUpdateId + 1); diff --git a/setup.mjs b/setup.mjs index 0746207..9f9f93e 100644 --- a/setup.mjs +++ b/setup.mjs @@ -18,9 +18,9 @@ async function main() { // Check configs console.log('Checking config files...'); - const settings = loadConfig(resolve(__dir, 'config/settings.json')); - const profile = loadConfig(resolve(__dir, 'config/profile.json')); - const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json')); + const settings = await loadConfig(resolve(__dir, 'config/settings.json')); + const profile = await loadConfig(resolve(__dir, 'config/profile.json')); + const searchConfig = await loadConfig(resolve(__dir, 'config/search_config.json')); const checks = [ [profile.name?.first && profile.name?.last,
'profile.json: name'], diff --git a/status.mjs b/status.mjs index 544add6..6de96b7 100644 --- a/status.mjs +++ b/status.mjs @@ -9,6 +9,12 @@ import { readFileSync, existsSync } from 'fs'; import { dirname, resolve } from 'path'; import { fileURLToPath } from 'url'; +import { loadEnv } from './lib/env.mjs'; +loadEnv(); + +import { loadConfig, initQueue, loadQueue } from './lib/queue.mjs'; +import { sendTelegram } from './lib/notify.mjs'; + const __dir = dirname(fileURLToPath(import.meta.url)); const jsonMode = process.argv.includes('--json'); @@ -32,7 +38,6 @@ function buildSearchProgress() { const sp = readJson(resolve(__dir, 'data/search_progress.json')); if (!sp) return null; - // Build unique track list from completed + keyword_progress, prefer platform-specific key const seen = new Set(); const tracks = []; @@ -62,10 +67,7 @@ return tracks; } -function buildStatus() { - const queue = readJson(resolve(__dir, 'data/jobs_queue.json')) || []; - const log = readJson(resolve(__dir, 'data/applications_log.json')) || []; - +function buildStatus(queue, log) { // Queue breakdown const byStatus = {}; const byPlatform = {}; @@ -162,27 +164,27 @@ function formatReport(s) { // Searcher section const sr = s.searcher; const searcherLine = sr.running - ? `🔄 Running now — ${q.total} jobs found so far` + ? `Running now — ${q.total} jobs found so far` : sr.last_run?.finished === false - ? `⚠️ Last run interrupted ${timeAgo(sr.last_run?.started_at)} (partial results saved)` - : `⏸️ Last ran ${timeAgo(sr.last_run?.finished_at)}`; + ? `Last run interrupted ${timeAgo(sr.last_run?.started_at)} (partial results saved)` + : `Last ran ${timeAgo(sr.last_run?.finished_at)}`; const lastRunDetail = sr.last_run && !sr.running - ? `→ Found ${sr.last_run.added} new jobs (${sr.last_run.seen} seen, ${sr.last_run.skipped_dupes || 0} dupes)` + ?
`Found ${sr.last_run.added} new jobs (${sr.last_run.seen} seen, ${sr.last_run.skipped_dupes || 0} dupes)` : null; // Applier section const ar = s.applier; const applierLine = ar.running - ? `🔄 Running now` - : `⏸️ Last ran ${timeAgo(ar.last_run?.finished_at)}`; + ? `Running now` + : `Last ran ${timeAgo(ar.last_run?.finished_at)}`; const lastApplierDetail = ar.last_run && !ar.running - ? `→ Applied ${ar.last_run.submitted} jobs in that run` + ? `Applied ${ar.last_run.submitted} jobs in that run` : null; const lines = [ - `📊 *claw-apply Status*`, + `*claw-apply Status*`, ``, - `🔍 *Searcher:* ${searcherLine}`, + `*Searcher:* ${searcherLine}`, ]; if (lastRunDetail) lines.push(lastRunDetail); @@ -198,8 +200,8 @@ lines.push(`${platform.charAt(0).toUpperCase() + platform.slice(1)}:`); for (const t of tracks) { const pct = t.total > 0 ? Math.round((t.done / t.total) * 100) : 0; - const bar = t.complete ? '✅ done' : `${t.done}/${t.total} keywords (${pct}%)`; - lines.push(`• ${t.track}: ${bar}`); + const bar = t.complete ? 'done' : `${t.done}/${t.total} keywords (${pct}%)`; + lines.push(` ${t.track}: ${bar}`); } } } @@ -208,23 +210,23 @@ const fr = s.filter; let filterLine; if (fr.batch_pending) { - filterLine = `⏳ Batch in flight — ${fr.batch_job_count} jobs, submitted ${timeAgo(new Date(fr.submitted_at).getTime())}`; + filterLine = `Batch in flight — ${fr.batch_job_count} jobs, submitted ${timeAgo(new Date(fr.submitted_at).getTime())}`; } else if (fr.last_run) { const lr = fr.last_run; - filterLine = `⏸️ Last ran ${timeAgo(new Date(lr.collected_at).getTime())} — ✅ ${lr.passed} passed, 🚫 ${lr.filtered} filtered`; + filterLine = `Last ran ${timeAgo(new Date(lr.collected_at).getTime())} — ${lr.passed} passed, ${lr.filtered} filtered`; } else { - filterLine = fr.unscored > 0 ? `🟡 ${fr.unscored} jobs awaiting filter` : `⏸️ Never run`; + filterLine = fr.unscored > 0 ?
`${fr.unscored} jobs awaiting filter` : `Never run`; } if (fr.model) filterLine += ` (${fr.model.replace('claude-', '').replace(/-\d{8}$/, '')})`; - if (fr.unscored > 0 && !fr.batch_pending) filterLine += ` · ${fr.unscored} unscored`; + if (fr.unscored > 0 && !fr.batch_pending) filterLine += ` | ${fr.unscored} unscored`; - lines.push(`🔬 *Filter:* ${filterLine}`); - lines.push(`🚀 *Applier:* ${applierLine}`); + lines.push(`*Filter:* ${filterLine}`); + lines.push(`*Applier:* ${applierLine}`); if (lastApplierDetail) lines.push(lastApplierDetail); - // Queue summary — only show non-zero counts + // Queue summary const unique = q.total - (q.duplicate || 0); - lines.push('', `📋 *Queue* — ${unique} unique jobs (${q.duplicate || 0} dupes)`); + lines.push('', `*Queue* — ${unique} unique jobs (${q.duplicate || 0} dupes)`); const queueLines = [ [q.applied, 'Applied'], @@ -254,17 +256,17 @@ unknown: 'Unknown', }; if (sorted.length > 0) { - lines.push(`• Ready to apply: ${q.new}`); + lines.push(`Ready to apply: ${q.new}`); for (const [type, count] of sorted) { - lines.push(`→ ${typeNames[type] || type}: ${count}`); + lines.push(` ${typeNames[type] || type}: ${count}`); } } else { - lines.push(`• Ready to apply: ${q.new}`); + lines.push(`Ready to apply: ${q.new}`); } } for (const [count, label] of queueLines) { - if (count > 0) lines.push(`• ${label}: ${count}`); + if (count > 0) lines.push(`${label}: ${count}`); } // Last applied @@ -276,16 +278,19 @@ return lines.join('\n'); } -import { loadConfig } from './lib/queue.mjs'; -import { sendTelegram } from './lib/notify.mjs'; +// Main +const settings = await loadConfig(resolve(__dir, 'config/settings.json')); +await initQueue(settings); +const queue = loadQueue(); +const { loadJSON } = await import('./lib/storage.mjs'); +const log = await loadJSON(resolve(__dir, 'data/applications_log.json'), []); -const status = buildStatus(); +const status =
buildStatus(queue, log); if (jsonMode) { console.log(JSON.stringify(status, null, 2)); } else { const report = formatReport(status); - const settings = loadConfig(resolve(__dir, 'config/settings.json')); if (settings.notifications?.bot_token && settings.notifications?.telegram_user_id) { await sendTelegram(settings, report); } else {