From 253d1888e911b0df7f28d2ef118367a16c1d43ef Mon Sep 17 00:00:00 2001 From: Matthew Jackson Date: Fri, 6 Mar 2026 21:56:37 -0800 Subject: [PATCH] Add S3-backed storage to prevent data loss - New lib/storage.mjs: async S3 backup on every queue/log save - Versioned S3 bucket (claw-apply-data) keeps every revision - Auto-restore from S3 if local file is missing or corrupt - saveQueue/saveLog now validate data type before writing (prevents the exact bug that corrupted the queue) - IAM role attached to EC2 instance for credential-free S3 access - Config: storage.type = "local" (default) or "s3" Co-Authored-By: Claude Opus 4.6 --- config/settings.example.json | 6 ++ job_applier.mjs | 3 +- job_filter.mjs | 3 +- job_searcher.mjs | 3 +- lib/queue.mjs | 61 +++++++++++++- lib/storage.mjs | 155 +++++++++++++++++++++++++++++++++++ package.json | 1 + 7 files changed, 228 insertions(+), 4 deletions(-) create mode 100644 lib/storage.mjs diff --git a/config/settings.example.json b/config/settings.example.json index cb9e727..b55b1a0 100644 --- a/config/settings.example.json +++ b/config/settings.example.json @@ -25,5 +25,11 @@ "browser": { "provider": "kernel", "playwright_path": null + }, + "storage": { + "type": "local", + "_note": "Set type to 's3' for S3-backed storage. 
Requires @aws-sdk/client-s3 and IAM permissions.", + "bucket": "claw-apply-data", + "region": "us-west-2" } } diff --git a/job_applier.mjs b/job_applier.mjs index bb5d07d..50b6aa7 100644 --- a/job_applier.mjs +++ b/job_applier.mjs @@ -19,7 +19,7 @@ const origStderrWrite = process.stderr.write.bind(process.stderr); process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); }; process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); }; -import { getJobsByStatus, updateJobStatus, appendLog, loadConfig, isAlreadyApplied } from './lib/queue.mjs'; +import { getJobsByStatus, updateJobStatus, appendLog, loadConfig, isAlreadyApplied, initQueueFromS3 } from './lib/queue.mjs'; import { acquireLock } from './lib/lock.mjs'; import { createBrowser } from './lib/browser.mjs'; import { ensureAuth } from './lib/session.mjs'; @@ -43,6 +43,7 @@ async function main() { const lock = acquireLock('applier', resolve(__dir, 'data')); const settings = loadConfig(resolve(__dir, 'config/settings.json')); + await initQueueFromS3(settings); const profile = loadConfig(resolve(__dir, 'config/profile.json')); const answersPath = resolve(__dir, 'config/answers.json'); const answers = existsSync(answersPath) ? 
loadConfig(answersPath) : []; diff --git a/job_filter.mjs b/job_filter.mjs index e84bd17..5593371 100644 --- a/job_filter.mjs +++ b/job_filter.mjs @@ -30,7 +30,7 @@ const origStderrWrite = process.stderr.write.bind(process.stderr); process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); }; process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); }; -import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue, dedupeAfterFilter } from './lib/queue.mjs'; +import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue, dedupeAfterFilter, initQueueFromS3 } from './lib/queue.mjs'; import { loadProfile, submitBatches, checkBatch, downloadResults } from './lib/filter.mjs'; import { sendTelegram, formatFilterSummary } from './lib/notify.mjs'; import { DEFAULT_FILTER_MODEL, DEFAULT_FILTER_MIN_SCORE } from './lib/constants.mjs'; @@ -305,6 +305,7 @@ async function main() { } const settings = loadConfig(resolve(__dir, 'config/settings.json')); + await initQueueFromS3(settings); const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json')); const candidateProfile = loadConfig(resolve(__dir, 'config/profile.json')); diff --git a/job_searcher.mjs b/job_searcher.mjs index a35f70f..ee01bc5 100644 --- a/job_searcher.mjs +++ b/job_searcher.mjs @@ -20,7 +20,7 @@ const origStderrWrite = process.stderr.write.bind(process.stderr); process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); }; process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); }; -import { addJobs, loadQueue, loadConfig, getJobsByStatus, updateJobStatus } from './lib/queue.mjs'; +import { addJobs, loadQueue, loadConfig, getJobsByStatus, updateJobStatus, initQueueFromS3 } from './lib/queue.mjs'; import { writeFileSync, readFileSync, existsSync } from 'fs'; import { 
acquireLock } from './lib/lock.mjs'; import { createBrowser } from './lib/browser.mjs'; @@ -60,6 +60,7 @@ async function main() { const startedAt = Date.now(); const settings = loadConfig(resolve(__dir, 'config/settings.json')); + await initQueueFromS3(settings); const writeLastRun = (finished = false) => { const entry = { diff --git a/lib/queue.mjs b/lib/queue.mjs index 787d453..3516417 100644 --- a/lib/queue.mjs +++ b/lib/queue.mjs @@ -2,10 +2,12 @@ * queue.mjs — Job queue management * Handles jobs_queue.json read/write/update * Uses in-memory cache to avoid redundant disk I/O within a run. + * S3-backed: every write syncs to versioned S3 bucket (never lose data). */ import { readFileSync, writeFileSync, appendFileSync, renameSync, unlinkSync, existsSync, mkdirSync } from 'fs'; import { dirname, resolve } from 'path'; import { fileURLToPath } from 'url'; +import { initStorage, backupToS3, loadJSONSafe } from './storage.mjs'; const __dir = dirname(fileURLToPath(import.meta.url)); const QUEUE_PATH = `${__dir}/../data/jobs_queue.json`; @@ -94,13 +96,62 @@ function applyPendingUpdates(queue) { export function loadQueue() { if (_queueCache) return _queueCache; ensureDir(QUEUE_PATH); - _queueCache = existsSync(QUEUE_PATH) ? JSON.parse(readFileSync(QUEUE_PATH, 'utf8')) : []; + + let data = null; + if (existsSync(QUEUE_PATH)) { + try { + const raw = readFileSync(QUEUE_PATH, 'utf8'); + const parsed = JSON.parse(raw); + if (!Array.isArray(parsed)) throw new Error(`Expected array, got ${typeof parsed}`); + data = parsed; + } catch (err) { + console.warn(`⚠️ Queue file corrupt: ${err.message} — will attempt S3 restore`); + } + } + + // S3 restore handled async at startup via initQueueFromS3() — for sync path, use empty array + _queueCache = data || []; if (applyPendingUpdates(_queueCache)) { saveQueue(_queueCache); } return _queueCache; } +/** + * Async queue initialization with S3 fallback. + * Call once at startup before processing jobs. 
+ */ +export async function initQueueFromS3(settings) { + initStorage(settings); + + // If local queue is missing or corrupt (unparseable or not an array), try S3 + let needsRestore = false; + if (!existsSync(QUEUE_PATH)) { + needsRestore = true; + } else { + try { + const raw = readFileSync(QUEUE_PATH, 'utf8'); + const parsed = JSON.parse(raw); + if (!Array.isArray(parsed)) needsRestore = true; + } catch { + needsRestore = true; + } + } + + if (needsRestore) { + const restored = await loadJSONSafe(QUEUE_PATH, []); + if (Array.isArray(restored) && restored.length > 0) { + _queueCache = restored; + console.log(`✅ Queue restored from S3: ${restored.length} jobs`); + } + } + + // Also ensure applications log is backed up + if (existsSync(LOG_PATH)) { + backupToS3(LOG_PATH); + } +} + /** * Force a fresh read from disk + apply pending updates. * Call this between iterations in long-running processes to pick up @@ -112,9 +163,13 @@ export function reloadQueue() { } export function saveQueue(jobs) { + if (!Array.isArray(jobs)) { + throw new Error(`saveQueue: expected array, got ${typeof jobs} — refusing to write corrupt data`); + } ensureDir(QUEUE_PATH); atomicWriteJSON(QUEUE_PATH, jobs); _queueCache = jobs; + backupToS3(QUEUE_PATH); } function loadLog() { @@ -125,9 +180,13 @@ function loadLog() { } function saveLog(log) { + if (!Array.isArray(log)) { + throw new Error(`saveLog: expected array, got ${typeof log} — refusing to write corrupt data`); + } ensureDir(LOG_PATH); atomicWriteJSON(LOG_PATH, log); _logCache = log; + backupToS3(LOG_PATH); } export function appendLog(entry) { diff --git a/lib/storage.mjs b/lib/storage.mjs new file mode 100644 index 0000000..5e347e8 --- /dev/null +++ b/lib/storage.mjs @@ -0,0 +1,155 @@ +/** + * storage.mjs — S3-backed data storage + * + * Local files remain the primary read/write path (fast). + * Every write syncs to S3 asynchronously (versioned bucket — never lose data). + * On load, if local file is missing or corrupt, auto-restores from S3. 
+ * + * Config in settings.json: + * storage: { type: "s3", bucket: "claw-apply-data", region: "us-west-2" } + * or + * storage: { type: "local" } (default, no backup) + */ +import { readFileSync, writeFileSync, existsSync } from 'fs'; +import { basename } from 'path'; + +let _s3Client = null; +let _config = null; + +/** + * Initialize storage with settings. Call once at startup. + */ +export function initStorage(settings) { + _config = settings?.storage || { type: 'local' }; +} + +function getS3Key(filePath) { + return `data/${basename(filePath)}`; +} + +async function getS3Client() { + if (_s3Client) return _s3Client; + if (!_config || _config.type !== 's3') return null; + + try { + const { S3Client: Client, PutObjectCommand, GetObjectCommand } = await import('@aws-sdk/client-s3'); + _s3Client = { + client: new Client({ region: _config.region || 'us-west-2' }), + PutObjectCommand, + GetObjectCommand, + }; + return _s3Client; + } catch { + console.warn('⚠️ @aws-sdk/client-s3 not installed — S3 backup disabled'); + return null; + } +} + +/** + * Upload a local file to S3 (async, non-blocking). + * Versioned bucket keeps every revision. + */ +export function backupToS3(filePath) { + if (!_config || _config.type !== 's3') return; + + // Fire and forget — don't block the caller + _doBackup(filePath).catch(err => { + console.warn(`⚠️ S3 backup failed for ${basename(filePath)}: ${err.message}`); + }); +} + +async function _doBackup(filePath) { + const s3 = await getS3Client(); + if (!s3) return; + + const body = readFileSync(filePath, 'utf8'); + + // Safety: don't upload obviously corrupt data + if (body.length < 3) { + console.warn(`⚠️ Refusing to backup ${basename(filePath)} — file too small (${body.length} bytes)`); + return; + } + + await s3.client.send(new s3.PutObjectCommand({ + Bucket: _config.bucket, + Key: getS3Key(filePath), + Body: body, + ContentType: 'application/json', + })); +} + +/** + * Restore a file from S3 to local disk. 
+ * Returns true if restored; false if S3 storage is not configured, no S3 copy exists, or the download/validation fails. + */ +export async function restoreFromS3(filePath) { + if (!_config || _config.type !== 's3') return false; + + const s3 = await getS3Client(); + if (!s3) return false; + + try { + const response = await s3.client.send(new s3.GetObjectCommand({ + Bucket: _config.bucket, + Key: getS3Key(filePath), + })); + const body = await response.Body.transformToString(); + + // Validate it's real JSON before writing + JSON.parse(body); + writeFileSync(filePath, body); + console.log(`✅ Restored ${basename(filePath)} from S3 (${body.length} bytes)`); + return true; + } catch (err) { + if (err.name === 'NoSuchKey') return false; + console.warn(`⚠️ S3 restore failed for ${basename(filePath)}: ${err.message}`); + return false; + } +} + +/** + * Safe JSON file loader with S3 fallback. + * If local file is missing or corrupt, tries S3 restore. + */ +export async function loadJSONSafe(filePath, defaultValue = []) { + // Try local first + if (existsSync(filePath)) { + try { + const raw = readFileSync(filePath, 'utf8'); + const parsed = JSON.parse(raw); + // Validate it's the expected type (array for queue/log) + if (Array.isArray(defaultValue) && !Array.isArray(parsed)) { + throw new Error(`Expected array, got ${typeof parsed}`); + } + return parsed; + } catch (err) { + console.warn(`⚠️ Local ${basename(filePath)} is corrupt: ${err.message}`); + console.warn(` Attempting S3 restore...`); + } + } + + // Local missing or corrupt — try S3 + const restored = await restoreFromS3(filePath); + if (restored) { + return JSON.parse(readFileSync(filePath, 'utf8')); + } + + return defaultValue; +} + +/** + * Safe JSON file writer with validation + S3 backup. + * Validates data type before writing to prevent corruption. 
+ */ +export function saveJSONSafe(filePath, data) { + // Validate: refuse strings, null, and undefined (numbers and booleans are not rejected here) + if (typeof data === 'string') { + throw new Error(`Refusing to save string to ${basename(filePath)} — data corruption prevented`); + } + if (data === null || data === undefined) { + throw new Error(`Refusing to save ${data} to ${basename(filePath)} — data corruption prevented`); + } + + writeFileSync(filePath, JSON.stringify(data, null, 2)); + backupToS3(filePath); +} diff --git a/package.json b/package.json index 36bf17a..8fee8bf 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ "apply": "node job_applier.mjs" }, "dependencies": { + "@aws-sdk/client-s3": "^3.700.0", "@onkernel/sdk": "^0.15.0", "playwright": "^1.40.0" },