diff --git a/job_applier.mjs b/job_applier.mjs index 50b6aa7..050fecf 100644 --- a/job_applier.mjs +++ b/job_applier.mjs @@ -19,7 +19,7 @@ const origStderrWrite = process.stderr.write.bind(process.stderr); process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); }; process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); }; -import { getJobsByStatus, updateJobStatus, appendLog, loadConfig, isAlreadyApplied, initQueueFromS3 } from './lib/queue.mjs'; +import { getJobsByStatus, updateJobStatus, appendLog, loadConfig, isAlreadyApplied, initQueue } from './lib/queue.mjs'; import { acquireLock } from './lib/lock.mjs'; import { createBrowser } from './lib/browser.mjs'; import { ensureAuth } from './lib/session.mjs'; @@ -43,7 +43,7 @@ async function main() { const lock = acquireLock('applier', resolve(__dir, 'data')); const settings = loadConfig(resolve(__dir, 'config/settings.json')); - await initQueueFromS3(settings); + await initQueue(settings); const profile = loadConfig(resolve(__dir, 'config/profile.json')); const answersPath = resolve(__dir, 'config/answers.json'); const answers = existsSync(answersPath) ? 
loadConfig(answersPath) : []; @@ -180,7 +180,7 @@ async function main() { console.error(` ❌ ${platform} auth failed: ${authResult.reason}`); await sendTelegram(settings, `⚠️ *${platform}* auth failed — ${authResult.reason}`).catch(() => {}); for (const job of platformJobs) { - updateJobStatus(job.id, 'new', { retry_reason: 'auth_failed' }); + await updateJobStatus(job.id, 'new', { retry_reason: 'auth_failed' }); } continue; } @@ -190,9 +190,9 @@ async function main() { } for (const job of platformJobs) { - if (isAlreadyApplied(job.id)) { + if (await isAlreadyApplied(job.id)) { console.log(` ⏭️ Already applied — ${job.title} @ ${job.company || '?'}`); - updateJobStatus(job.id, 'already_applied', {}); + await updateJobStatus(job.id, 'already_applied', {}); results.already_applied++; continue; } @@ -261,10 +261,10 @@ async function main() { const retries = (job.retry_count || 0) + 1; if (retries <= maxRetries) { - updateJobStatus(job.id, 'new', { retry_count: retries }); + await updateJobStatus(job.id, 'new', { retry_count: retries }); } else { - updateJobStatus(job.id, 'failed', { error: e.message }); - appendLog({ ...job, status: 'failed', error: e.message }); + await updateJobStatus(job.id, 'failed', { error: e.message }); + await appendLog({ ...job, status: 'failed', error: e.message }); results.failed++; } } @@ -311,8 +311,8 @@ async function handleResult(job, result, results, settings, profile, apiKey) { switch (status) { case 'submitted': console.log(` ✅ Applied!${applyDuration ? 
` (${applyDuration}s)` : ''}`); - updateJobStatus(job.id, 'applied', { title, company, applied_at: Date.now(), apply_started_at: applyStartedAt }); - appendLog({ ...job, title, company, status: 'applied', applied_at: Date.now(), apply_started_at: applyStartedAt }); + await updateJobStatus(job.id, 'applied', { title, company, applied_at: Date.now(), apply_started_at: applyStartedAt }); + await appendLog({ ...job, title, company, status: 'applied', applied_at: Date.now(), apply_started_at: applyStartedAt }); results.submitted++; break; @@ -339,12 +339,12 @@ async function handleResult(job, result, results, settings, profile, apiKey) { const telegramMsgId = await sendTelegram(settings, msg); - updateJobStatus(job.id, 'needs_answer', { + await updateJobStatus(job.id, 'needs_answer', { title, company, pending_question, ai_suggested_answer: aiAnswer || null, telegram_message_id: telegramMsgId, }); - appendLog({ ...job, title, company, status: 'needs_answer', pending_question, ai_suggested_answer: aiAnswer }); + await appendLog({ ...job, title, company, status: 'needs_answer', pending_question, ai_suggested_answer: aiAnswer }); results.needs_answer++; console.log(` ⏸️ Question sent to Telegram. 
Job will retry after you reply.`); break; @@ -352,16 +352,16 @@ async function handleResult(job, result, results, settings, profile, apiKey) { case 'skipped_recruiter_only': console.log(` 🚫 Recruiter-only`); - updateJobStatus(job.id, 'skipped_recruiter_only', { title, company }); - appendLog({ ...job, title, company, status: 'skipped_recruiter_only' }); + await updateJobStatus(job.id, 'skipped_recruiter_only', { title, company }); + await appendLog({ ...job, title, company, status: 'skipped_recruiter_only' }); results.skipped_recruiter++; break; case 'skipped_external_unsupported': { const platform = ats_platform || job.apply_type || 'unknown'; console.log(` ⏭️ External ATS: ${platform}`); - updateJobStatus(job.id, 'skipped_external_unsupported', { title, company, ats_url: externalUrl, ats_platform: platform }); - appendLog({ ...job, title, company, status: 'skipped_external_unsupported', ats_url: externalUrl, ats_platform: platform }); + await updateJobStatus(job.id, 'skipped_external_unsupported', { title, company, ats_url: externalUrl, ats_platform: platform }); + await appendLog({ ...job, title, company, status: 'skipped_external_unsupported', ats_url: externalUrl, ats_platform: platform }); results.skipped_external++; results.atsCounts[platform] = (results.atsCounts[platform] || 0) + 1; break; @@ -369,14 +369,14 @@ async function handleResult(job, result, results, settings, profile, apiKey) { case 'rate_limited': console.log(` ⚠️ LinkedIn Easy Apply daily limit reached — stopping run`); - updateJobStatus(job.id, 'new', { title, company, retry_reason: 'rate_limited' }); + await updateJobStatus(job.id, 'new', { title, company, retry_reason: 'rate_limited' }); results.rate_limited = true; break; case 'closed': console.log(` 🚫 Closed — no longer accepting applications`); - updateJobStatus(job.id, 'closed', { title, company }); - appendLog({ ...job, title, company, status: 'closed' }); + await updateJobStatus(job.id, 'closed', { title, company }); + await 
appendLog({ ...job, title, company, status: 'closed' }); results.closed = (results.closed || 0) + 1; break; @@ -384,8 +384,8 @@ async function handleResult(job, result, results, settings, profile, apiKey) { case 'skipped_no_apply': case 'skipped_easy_apply_unsupported': console.log(` ⏭️ Skipped — ${status}`); - updateJobStatus(job.id, status, { title, company }); - appendLog({ ...job, title, company, status }); + await updateJobStatus(job.id, status, { title, company }); + await appendLog({ ...job, title, company, status }); results.skipped_no_apply++; break; @@ -393,8 +393,8 @@ async function handleResult(job, result, results, settings, profile, apiKey) { case 'skipped_login_required': case 'skipped_captcha': console.log(` ⏭️ Skipped — ${status.replace('skipped_', '')}`); - updateJobStatus(job.id, status, { title, company }); - appendLog({ ...job, title, company, status }); + await updateJobStatus(job.id, status, { title, company }); + await appendLog({ ...job, title, company, status }); results.skipped_other++; break; @@ -407,11 +407,11 @@ async function handleResult(job, result, results, settings, profile, apiKey) { const maxRetry = settings.max_retries ?? 
DEFAULT_MAX_RETRIES; if (retries <= maxRetry) { console.log(` ⏭️ ${status} — will retry (attempt ${retries}/${maxRetry})`); - updateJobStatus(job.id, 'new', { title, company, retry_count: retries }); + await updateJobStatus(job.id, 'new', { title, company, retry_count: retries }); } else { console.log(` ⏭️ ${status} — max retries reached`); - updateJobStatus(job.id, status, { title, company }); - appendLog({ ...job, title, company, status }); + await updateJobStatus(job.id, status, { title, company }); + await appendLog({ ...job, title, company, status }); } results.skipped_other++; break; @@ -419,8 +419,8 @@ async function handleResult(job, result, results, settings, profile, apiKey) { default: console.warn(` ⚠️ Unhandled status: ${status}`); - updateJobStatus(job.id, status, { title, company }); - appendLog({ ...job, title, company, status }); + await updateJobStatus(job.id, status, { title, company }); + await appendLog({ ...job, title, company, status }); results.skipped_other++; } } diff --git a/job_filter.mjs b/job_filter.mjs index 5593371..f6587d1 100644 --- a/job_filter.mjs +++ b/job_filter.mjs @@ -30,7 +30,7 @@ const origStderrWrite = process.stderr.write.bind(process.stderr); process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); }; process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); }; -import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue, dedupeAfterFilter, initQueueFromS3 } from './lib/queue.mjs'; +import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue, dedupeAfterFilter, initQueue } from './lib/queue.mjs'; import { loadProfile, submitBatches, checkBatch, downloadResults } from './lib/filter.mjs'; import { sendTelegram, formatFilterSummary } from './lib/notify.mjs'; import { DEFAULT_FILTER_MODEL, DEFAULT_FILTER_MIN_SCORE } from './lib/constants.mjs'; @@ -163,10 +163,10 @@ async function 
collect(state, settings) { else { filtered++; job.status = 'filtered'; } } - saveQueue(queue); + await saveQueue(queue); // Dedup cross-track copies — keep highest-scoring version of each job - const duped = dedupeAfterFilter(); + const duped = await dedupeAfterFilter(); if (duped > 0) console.log(` Deduped ${duped} cross-track copies`); clearState(); @@ -221,7 +221,7 @@ async function submit(settings, searchConfig, candidateProfile) { } } if (cleared > 0) { - saveQueue(queue); + await saveQueue(queue); console.log(`🔄 Cleared ${cleared} stale batch markers (batch completed without scoring)`); } } @@ -279,7 +279,7 @@ async function submit(settings, searchConfig, candidateProfile) { job.filter_submitted_at = submittedAt; } } - saveQueue(allJobs); + await saveQueue(allJobs); const batchSummary = submitted.map(b => `${b.track}: ${b.jobCount} jobs`).join(', '); console.log(` ${batchSummary}`); @@ -305,7 +305,7 @@ async function main() { } const settings = loadConfig(resolve(__dir, 'config/settings.json')); - await initQueueFromS3(settings); + await initQueue(settings); const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json')); const candidateProfile = loadConfig(resolve(__dir, 'config/profile.json')); diff --git a/job_searcher.mjs b/job_searcher.mjs index ee01bc5..674cba6 100644 --- a/job_searcher.mjs +++ b/job_searcher.mjs @@ -20,7 +20,7 @@ const origStderrWrite = process.stderr.write.bind(process.stderr); process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); }; process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); }; -import { addJobs, loadQueue, loadConfig, getJobsByStatus, updateJobStatus, initQueueFromS3 } from './lib/queue.mjs'; +import { addJobs, loadQueue, loadConfig, getJobsByStatus, updateJobStatus, initQueue } from './lib/queue.mjs'; import { writeFileSync, readFileSync, existsSync } from 'fs'; import { acquireLock } from 
'./lib/lock.mjs'; import { createBrowser } from './lib/browser.mjs'; @@ -60,7 +60,7 @@ async function main() { const startedAt = Date.now(); const settings = loadConfig(resolve(__dir, 'config/settings.json')); - await initQueueFromS3(settings); + await initQueue(settings); const writeLastRun = (finished = false) => { const entry = { @@ -184,8 +184,8 @@ async function main() { const effectiveSearch = { ...search, keywords: search.keywords.slice(keywordStart), keywordOffset: keywordStart, filters: { ...search.filters, posted_within_days: lookbackDays } }; let queryFound = 0, queryAdded = 0; await searchLinkedIn(liBrowser.page, effectiveSearch, { - onPage: (pageJobs) => { - const added = addJobs(pageJobs); + onPage: async (pageJobs) => { + const added = await addJobs(pageJobs); totalAdded += added; totalSeen += pageJobs.length; queryFound += pageJobs.length; @@ -209,7 +209,7 @@ async function main() { if (unclassified.length > 0) { console.log(`\n🔍 Classifying ${unclassified.length} external jobs...`); - const { classified, remaining } = await classifyExternalJobs(liBrowser.page, unclassified, (job, applyType, applyUrl) => { + const { classified, remaining } = await classifyExternalJobs(liBrowser.page, unclassified, async (job, applyType, applyUrl) => { - updateJobStatus(job.id, 'new', { apply_type: applyType, apply_url: applyUrl }); + await updateJobStatus(job.id, 'new', { apply_type: applyType, apply_url: applyUrl }); }); console.log(` ✅ Classified ${classified}, ${remaining} still unknown`); } @@ -241,8 +241,8 @@ async function main() { const effectiveSearch = { ...search, filters: { ...search.filters, posted_within_days: lookbackDays } }; let queryFound = 0, queryAdded = 0; await searchWellfound(wfBrowser.page, effectiveSearch, { - onPage: (pageJobs) => { - const added = addJobs(pageJobs); + onPage: async (pageJobs) => { + const added = await addJobs(pageJobs); totalAdded += added; totalSeen += pageJobs.length; queryFound += pageJobs.length; diff --git a/lib/queue.mjs b/lib/queue.mjs index 3516417..faef2f2 100644 --- a/lib/queue.mjs +++ b/lib/queue.mjs @@ -1,13 +1,18 @@ /** * queue.mjs — Job 
queue management * Handles jobs_queue.json read/write/update - * Uses in-memory cache to avoid redundant disk I/O within a run. - * S3-backed: every write syncs to versioned S3 bucket (never lose data). + * + * Storage is pluggable via settings.storage: + * { type: "local" } — reads/writes to local disk (default) + * { type: "s3", bucket: "...", region: "..." } — S3 is primary store + * + * In-memory cache avoids redundant I/O within a run. + * Call initStorage() once at startup before any queue operations. */ import { readFileSync, writeFileSync, appendFileSync, renameSync, unlinkSync, existsSync, mkdirSync } from 'fs'; import { dirname, resolve } from 'path'; import { fileURLToPath } from 'url'; -import { initStorage, backupToS3, loadJSONSafe } from './storage.mjs'; +import { initStorage as _initStorage, loadJSON, saveJSON, storageType } from './storage.mjs'; const __dir = dirname(fileURLToPath(import.meta.url)); const QUEUE_PATH = `${__dir}/../data/jobs_queue.json`; @@ -40,33 +45,40 @@ function ensureDir(path) { if (!existsSync(dir)) mkdirSync(dir, { recursive: true }); } -/** - * Atomic write — writes to a temp file then renames. - * Prevents corruption if two processes write simultaneously or the process - * crashes mid-write. rename() is atomic on POSIX filesystems. - */ -function atomicWriteJSON(filePath, data) { - const tmp = filePath + '.tmp'; - writeFileSync(tmp, JSON.stringify(data, null, 2)); - renameSync(tmp, filePath); -} - // --- In-memory caches (populated on first read, invalidated on write) --- let _queueCache = null; let _logCache = null; +let _initialized = false; + +/** + * Initialize storage. Must be called (and awaited) once at startup + * before any queue operations. 
+ */ +export async function initQueue(settings) { + _initStorage(settings); + + // Load queue and log from primary storage (S3 or local) + _queueCache = await loadJSON(QUEUE_PATH, []); + _logCache = await loadJSON(LOG_PATH, []); + + // Apply any pending sidecar updates (local-only mechanism) + if (applyPendingUpdates(_queueCache)) { + await saveQueue(_queueCache); + } + + _initialized = true; + const type = storageType(); + console.log(`📦 Storage: ${type}${type === 's3' ? ` (${settings.storage.bucket})` : ''} — ${_queueCache.length} jobs loaded`); +} /** * Apply pending updates from the sidecar JSONL file. * Secondary processes (e.g. standalone classifier) write updates here * instead of modifying jobs_queue.json directly, avoiding write conflicts. - * - * Uses rename-then-read to avoid losing updates appended between read and delete. - * Each line is JSON: { id, ...fields } */ function applyPendingUpdates(queue) { if (!existsSync(UPDATES_PATH)) return false; - // Atomically claim the file by renaming it — new appends go to a fresh file const claimedPath = UPDATES_PATH + '.applying'; try { renameSync(UPDATES_PATH, claimedPath); } catch { return false; } @@ -93,118 +105,72 @@ function applyPendingUpdates(queue) { return applied > 0; } +/** + * Get the in-memory queue. Must call initQueue() first. + */ export function loadQueue() { - if (_queueCache) return _queueCache; - ensureDir(QUEUE_PATH); - - let data = null; - if (existsSync(QUEUE_PATH)) { - try { - const raw = readFileSync(QUEUE_PATH, 'utf8'); - const parsed = JSON.parse(raw); - if (!Array.isArray(parsed)) throw new Error(`Expected array, got ${typeof parsed}`); - data = parsed; - } catch (err) { - console.warn(`⚠️ Queue file corrupt: ${err.message} — will attempt S3 restore`); + if (!_initialized) { + // Fallback for code that hasn't been updated to call initQueue yet + ensureDir(QUEUE_PATH); + if (!_queueCache) { + _queueCache = existsSync(QUEUE_PATH) ? 
JSON.parse(readFileSync(QUEUE_PATH, 'utf8')) : []; + if (!Array.isArray(_queueCache)) _queueCache = []; } } - - // S3 restore handled async at startup via initQueueFromS3() — for sync path, use empty array - _queueCache = data || []; - if (applyPendingUpdates(_queueCache)) { - saveQueue(_queueCache); - } return _queueCache; } /** - * Async queue initialization with S3 fallback. - * Call once at startup before processing jobs. + * Force a fresh read from storage + apply pending updates. + * Call this between iterations in long-running processes. */ -export async function initQueueFromS3(settings) { - initStorage(settings); - - // If local queue is missing or empty, try S3 - let needsRestore = false; - if (!existsSync(QUEUE_PATH)) { - needsRestore = true; - } else { - try { - const raw = readFileSync(QUEUE_PATH, 'utf8'); - const parsed = JSON.parse(raw); - if (!Array.isArray(parsed)) needsRestore = true; - } catch { - needsRestore = true; - } - } - - if (needsRestore) { - const restored = await loadJSONSafe(QUEUE_PATH, []); - if (Array.isArray(restored) && restored.length > 0) { - _queueCache = restored; - console.log(`✅ Queue restored from S3: ${restored.length} jobs`); - } - } - - // Also ensure applications log is backed up - if (existsSync(LOG_PATH)) { - backupToS3(LOG_PATH); +export async function reloadQueue() { + _queueCache = await loadJSON(QUEUE_PATH, []); + if (applyPendingUpdates(_queueCache)) { + await saveQueue(_queueCache); } + return _queueCache; } /** - * Force a fresh read from disk + apply pending updates. - * Call this between iterations in long-running processes to pick up - * sidecar updates from secondary processes. + * Save the queue to primary storage. 
*/ -export function reloadQueue() { - _queueCache = null; - return loadQueue(); -} - -export function saveQueue(jobs) { +export async function saveQueue(jobs) { if (!Array.isArray(jobs)) { throw new Error(`saveQueue: expected array, got ${typeof jobs} — refusing to write corrupt data`); } - ensureDir(QUEUE_PATH); - atomicWriteJSON(QUEUE_PATH, jobs); + await saveJSON(QUEUE_PATH, jobs); _queueCache = jobs; - backupToS3(QUEUE_PATH); } -function loadLog() { +async function loadLog() { if (_logCache) return _logCache; - ensureDir(LOG_PATH); - _logCache = existsSync(LOG_PATH) ? JSON.parse(readFileSync(LOG_PATH, 'utf8')) : []; + _logCache = await loadJSON(LOG_PATH, []); return _logCache; } -function saveLog(log) { +async function saveLog(log) { if (!Array.isArray(log)) { throw new Error(`saveLog: expected array, got ${typeof log} — refusing to write corrupt data`); } - ensureDir(LOG_PATH); - atomicWriteJSON(LOG_PATH, log); + await saveJSON(LOG_PATH, log); _logCache = log; - backupToS3(LOG_PATH); } -export function appendLog(entry) { - const log = loadLog(); +export async function appendLog(entry) { + const log = await loadLog(); log.push({ ...entry, logged_at: new Date().toISOString() }); - saveLog(log); + await saveLog(log); } /** * After AI filtering, deduplicate jobs that exist on multiple tracks. - * For each group sharing the same original job URL, keep the highest-scoring copy. - * Marks losers as status='duplicate'. Call this after collect completes. 
*/ -export function dedupeAfterFilter() { +export async function dedupeAfterFilter() { _queueCache = null; - const queue = loadQueue(); + const queue = await loadJSON(QUEUE_PATH, []); + _queueCache = queue; - // Group by URL (canonical dedup key) const byUrl = {}; for (const job of queue) { if (!job.url) continue; @@ -215,10 +181,8 @@ export function dedupeAfterFilter() { let deduped = 0; for (const jobs of Object.values(byUrl)) { if (jobs.length < 2) continue; - // Only dedup if ALL copies have been scored — skip groups with unscored members if (jobs.some(j => j.filter_score == null && j.status !== 'filtered')) continue; - // Keep the one with highest filter_score; if tied, prefer 'new' over 'filtered' jobs.sort((a, b) => { const sa = a.filter_score ?? -1; const sb = b.filter_score ?? -1; @@ -226,7 +190,6 @@ export function dedupeAfterFilter() { if (a.status === 'new' && b.status !== 'new') return -1; return 1; }); - // Mark losers as duplicate for (const loser of jobs.slice(1)) { loser.status = 'duplicate'; loser.status_updated_at = new Date().toISOString(); @@ -234,12 +197,12 @@ export function dedupeAfterFilter() { } } - if (deduped > 0) saveQueue(queue); + if (deduped > 0) await saveQueue(queue); return deduped; } -export function isAlreadyApplied(jobId) { - const log = loadLog(); +export async function isAlreadyApplied(jobId) { + const log = await loadLog(); return log.some(e => e.id === jobId && e.status === 'applied'); } @@ -249,7 +212,7 @@ export function getJobsByStatus(status) { return queue.filter(j => j.status === status); } -export function updateJobStatus(id, status, extra = {}) { +export async function updateJobStatus(id, status, extra = {}) { const queue = loadQueue(); const idx = queue.findIndex(j => j.id === id); if (idx === -1) return; @@ -259,31 +222,24 @@ export function updateJobStatus(id, status, extra = {}) { status, status_updated_at: new Date().toISOString(), }; - saveQueue(queue); + await saveQueue(queue); return queue[idx]; } /** * 
Write a pending update to the sidecar JSONL file. - * Use this from secondary processes (standalone scripts) instead of - * calling updateJobStatus/saveQueue directly. The primary process - * (searcher/applier) will pick up these updates on next loadQueue(). - * - * @param {string} id — job id - * @param {object} fields — fields to merge (e.g. { status: 'new', apply_type: 'lever', apply_url: '...' }) + * Use this from secondary processes instead of calling updateJobStatus directly. */ export function writePendingUpdate(id, fields) { ensureDir(UPDATES_PATH); appendFileSync(UPDATES_PATH, JSON.stringify({ id, ...fields }) + '\n'); } -export function addJobs(newJobs) { - // Always read fresh from disk to avoid clobbering concurrent writes (e.g. filter scoring) - _queueCache = null; - const queue = loadQueue(); +export async function addJobs(newJobs) { + // Reload fresh to avoid clobbering concurrent writes + _queueCache = await loadJSON(QUEUE_PATH, []); + const queue = _queueCache; - // Dedup key: same job.id + same track = skip (duplicate search hit for same track) - // Same job.id but different track = allow (will be deduped after AI filter, keeping best score) const existingKeys = new Set(queue.map(j => `${j.track || 'ae'}::${j.id}`)); let added = 0; @@ -293,8 +249,6 @@ export function addJobs(newJobs) { if (existingKeys.has(key)) continue; existingKeys.add(key); - // If this job.id already exists on a different track, give this copy a composite id - // so filter batch custom_ids don't collide const idConflict = queue.some(j => j.id === job.id && (j.track || 'ae') !== track); const queueId = idConflict ? 
`${job.id}_${track}` : job.id; @@ -312,6 +266,6 @@ export function addJobs(newJobs) { added++; } - saveQueue(queue); + await saveQueue(queue); return added; } diff --git a/lib/storage.mjs b/lib/storage.mjs index 5e347e8..81dc4fa 100644 --- a/lib/storage.mjs +++ b/lib/storage.mjs @@ -1,155 +1,108 @@ /** - * storage.mjs — S3-backed data storage + * storage.mjs — Pluggable data storage (local disk or S3) * - * Local files remain the primary read/write path (fast). - * Every write syncs to S3 asynchronously (versioned bucket — never lose data). - * On load, if local file is missing or corrupt, auto-restores from S3. + * When type is "local": reads/writes go to local disk (default). + * When type is "s3": S3 is the primary store. No local files for data. + * - Versioned bucket means every write is recoverable. + * - In-memory cache in queue.mjs handles read performance. * * Config in settings.json: * storage: { type: "s3", bucket: "claw-apply-data", region: "us-west-2" } - * or - * storage: { type: "local" } (default, no backup) + * storage: { type: "local" } (default) */ import { readFileSync, writeFileSync, existsSync } from 'fs'; import { basename } from 'path'; let _s3Client = null; -let _config = null; +let _config = { type: 'local' }; -/** - * Initialize storage with settings. Call once at startup. 
- */ export function initStorage(settings) { _config = settings?.storage || { type: 'local' }; } +export function storageType() { + return _config?.type || 'local'; +} + function getS3Key(filePath) { return `data/${basename(filePath)}`; } async function getS3Client() { if (_s3Client) return _s3Client; - if (!_config || _config.type !== 's3') return null; - - try { - const { S3Client: Client, PutObjectCommand, GetObjectCommand } = await import('@aws-sdk/client-s3'); - _s3Client = { - client: new Client({ region: _config.region || 'us-west-2' }), - PutObjectCommand, - GetObjectCommand, - }; - return _s3Client; - } catch { - console.warn('⚠️ @aws-sdk/client-s3 not installed — S3 backup disabled'); - return null; - } + const { S3Client: Client, PutObjectCommand, GetObjectCommand } = await import('@aws-sdk/client-s3'); + _s3Client = { + client: new Client({ region: _config.region || 'us-west-2' }), + PutObjectCommand, + GetObjectCommand, + }; + return _s3Client; } /** - * Upload a local file to S3 (async, non-blocking). - * Versioned bucket keeps every revision. + * Load JSON data. Source depends on storage type. */ -export function backupToS3(filePath) { - if (!_config || _config.type !== 's3') return; - - // Fire and forget — don't block the caller - _doBackup(filePath).catch(err => { - console.warn(`⚠️ S3 backup failed for ${basename(filePath)}: ${err.message}`); - }); -} - -async function _doBackup(filePath) { - const s3 = await getS3Client(); - if (!s3) return; - - const body = readFileSync(filePath, 'utf8'); - - // Safety: don't upload obviously corrupt data - if (body.length < 3) { - console.warn(`⚠️ Refusing to backup ${basename(filePath)} — file too small (${body.length} bytes)`); - return; - } - - await s3.client.send(new s3.PutObjectCommand({ - Bucket: _config.bucket, - Key: getS3Key(filePath), - Body: body, - ContentType: 'application/json', - })); -} - -/** - * Restore a file from S3 to local disk. - * Returns true if restored, false if no S3 copy exists. 
- */ -export async function restoreFromS3(filePath) { - if (!_config || _config.type !== 's3') return false; - - const s3 = await getS3Client(); - if (!s3) return false; - - try { - const response = await s3.client.send(new s3.GetObjectCommand({ - Bucket: _config.bucket, - Key: getS3Key(filePath), - })); - const body = await response.Body.transformToString(); - - // Validate it's real JSON before writing - JSON.parse(body); - writeFileSync(filePath, body); - console.log(`✅ Restored ${basename(filePath)} from S3 (${body.length} bytes)`); - return true; - } catch (err) { - if (err.name === 'NoSuchKey') return false; - console.warn(`⚠️ S3 restore failed for ${basename(filePath)}: ${err.message}`); - return false; - } -} - -/** - * Safe JSON file loader with S3 fallback. - * If local file is missing or corrupt, tries S3 restore. - */ -export async function loadJSONSafe(filePath, defaultValue = []) { - // Try local first - if (existsSync(filePath)) { +export async function loadJSON(filePath, defaultValue = []) { + if (_config.type === 's3') { try { - const raw = readFileSync(filePath, 'utf8'); - const parsed = JSON.parse(raw); - // Validate it's the expected type (array for queue/log) + const s3 = await getS3Client(); + const response = await s3.client.send(new s3.GetObjectCommand({ + Bucket: _config.bucket, + Key: getS3Key(filePath), + })); + const body = await response.Body.transformToString(); + const parsed = JSON.parse(body); if (Array.isArray(defaultValue) && !Array.isArray(parsed)) { throw new Error(`Expected array, got ${typeof parsed}`); } return parsed; } catch (err) { - console.warn(`⚠️ Local ${basename(filePath)} is corrupt: ${err.message}`); - console.warn(` Attempting S3 restore...`); + if (err.name === 'NoSuchKey') return defaultValue; + console.warn(`⚠️ S3 load failed for ${basename(filePath)}: ${err.message}`); + return defaultValue; } } - // Local missing or corrupt — try S3 - const restored = await restoreFromS3(filePath); - if (restored) { - return 
JSON.parse(readFileSync(filePath, 'utf8')); + // Local storage + if (!existsSync(filePath)) return defaultValue; + try { + const raw = readFileSync(filePath, 'utf8'); + const parsed = JSON.parse(raw); + if (Array.isArray(defaultValue) && !Array.isArray(parsed)) { + throw new Error(`Expected array, got ${typeof parsed}`); + } + return parsed; + } catch (err) { + console.warn(`⚠️ Local ${basename(filePath)} is corrupt: ${err.message}`); + return defaultValue; } - - return defaultValue; } /** - * Safe JSON file writer with validation + S3 backup. - * Validates data type before writing to prevent corruption. + * Save JSON data. Destination depends on storage type. + * Validates data before writing to prevent corruption. */ -export function saveJSONSafe(filePath, data) { - // Validate: never write non-object/non-array data to queue/log files - if (typeof data === 'string') { - throw new Error(`Refusing to save string to ${basename(filePath)} — data corruption prevented`); - } - if (data === null || data === undefined) { - throw new Error(`Refusing to save ${data} to ${basename(filePath)} — data corruption prevented`); +export async function saveJSON(filePath, data) { + if (typeof data === 'string' || data === null || data === undefined) { + throw new Error(`Refusing to save ${typeof data} to ${basename(filePath)} — data corruption prevented`); } - writeFileSync(filePath, JSON.stringify(data, null, 2)); - backupToS3(filePath); + const body = JSON.stringify(data, null, 2); + + if (_config.type === 's3') { + const s3 = await getS3Client(); + await s3.client.send(new s3.PutObjectCommand({ + Bucket: _config.bucket, + Key: getS3Key(filePath), + Body: body, + ContentType: 'application/json', + })); + return; + } + + // Local storage — atomic write + const tmp = filePath + '.tmp'; + writeFileSync(tmp, body); + const { renameSync } = await import('fs'); + renameSync(tmp, filePath); } diff --git a/lib/telegram_answers.mjs b/lib/telegram_answers.mjs index 54dec57..9c0a72a 100644 
--- a/lib/telegram_answers.mjs +++ b/lib/telegram_answers.mjs @@ -163,7 +163,7 @@ export async function processTelegramReplies(settings, answersPath) { } if (answersDirty) saveAnswers(answersPath, answers); - if (queueDirty) saveQueue(queue); + if (queueDirty) await saveQueue(queue); saveOffset(maxUpdateId + 1); return processed;