Add S3-backed storage to prevent data loss

- New lib/storage.mjs: async S3 backup on every queue/log save
- Versioned S3 bucket (claw-apply-data) keeps every revision
- Auto-restore from S3 if local file is missing or corrupt
- saveQueue/saveLog now validate data type before writing
  (prevents the exact bug that corrupted the queue)
- IAM role attached to EC2 instance for credential-free S3 access
- Config: storage.type = "local" (default) or "s3"

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-06 21:56:37 -08:00
parent c78586926a
commit 253d1888e9
7 changed files with 228 additions and 4 deletions

View File

@@ -25,5 +25,11 @@
"browser": { "browser": {
"provider": "kernel", "provider": "kernel",
"playwright_path": null "playwright_path": null
},
"storage": {
"type": "local",
"_note": "Set type to 's3' for S3-backed storage. Requires @aws-sdk/client-s3 and IAM permissions.",
"bucket": "claw-apply-data",
"region": "us-west-2"
} }
} }

View File

@@ -19,7 +19,7 @@ const origStderrWrite = process.stderr.write.bind(process.stderr);
process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); }; process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); };
process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); }; process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); };
import { getJobsByStatus, updateJobStatus, appendLog, loadConfig, isAlreadyApplied } from './lib/queue.mjs'; import { getJobsByStatus, updateJobStatus, appendLog, loadConfig, isAlreadyApplied, initQueueFromS3 } from './lib/queue.mjs';
import { acquireLock } from './lib/lock.mjs'; import { acquireLock } from './lib/lock.mjs';
import { createBrowser } from './lib/browser.mjs'; import { createBrowser } from './lib/browser.mjs';
import { ensureAuth } from './lib/session.mjs'; import { ensureAuth } from './lib/session.mjs';
@@ -43,6 +43,7 @@ async function main() {
const lock = acquireLock('applier', resolve(__dir, 'data')); const lock = acquireLock('applier', resolve(__dir, 'data'));
const settings = loadConfig(resolve(__dir, 'config/settings.json')); const settings = loadConfig(resolve(__dir, 'config/settings.json'));
await initQueueFromS3(settings);
const profile = loadConfig(resolve(__dir, 'config/profile.json')); const profile = loadConfig(resolve(__dir, 'config/profile.json'));
const answersPath = resolve(__dir, 'config/answers.json'); const answersPath = resolve(__dir, 'config/answers.json');
const answers = existsSync(answersPath) ? loadConfig(answersPath) : []; const answers = existsSync(answersPath) ? loadConfig(answersPath) : [];

View File

@@ -30,7 +30,7 @@ const origStderrWrite = process.stderr.write.bind(process.stderr);
process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); }; process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); };
process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); }; process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); };
import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue, dedupeAfterFilter } from './lib/queue.mjs'; import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue, dedupeAfterFilter, initQueueFromS3 } from './lib/queue.mjs';
import { loadProfile, submitBatches, checkBatch, downloadResults } from './lib/filter.mjs'; import { loadProfile, submitBatches, checkBatch, downloadResults } from './lib/filter.mjs';
import { sendTelegram, formatFilterSummary } from './lib/notify.mjs'; import { sendTelegram, formatFilterSummary } from './lib/notify.mjs';
import { DEFAULT_FILTER_MODEL, DEFAULT_FILTER_MIN_SCORE } from './lib/constants.mjs'; import { DEFAULT_FILTER_MODEL, DEFAULT_FILTER_MIN_SCORE } from './lib/constants.mjs';
@@ -305,6 +305,7 @@ async function main() {
} }
const settings = loadConfig(resolve(__dir, 'config/settings.json')); const settings = loadConfig(resolve(__dir, 'config/settings.json'));
await initQueueFromS3(settings);
const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json')); const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json'));
const candidateProfile = loadConfig(resolve(__dir, 'config/profile.json')); const candidateProfile = loadConfig(resolve(__dir, 'config/profile.json'));

View File

@@ -20,7 +20,7 @@ const origStderrWrite = process.stderr.write.bind(process.stderr);
process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); }; process.stdout.write = (chunk, ...args) => { logStream.write(chunk); return origStdoutWrite(chunk, ...args); };
process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); }; process.stderr.write = (chunk, ...args) => { logStream.write(chunk); return origStderrWrite(chunk, ...args); };
import { addJobs, loadQueue, loadConfig, getJobsByStatus, updateJobStatus } from './lib/queue.mjs'; import { addJobs, loadQueue, loadConfig, getJobsByStatus, updateJobStatus, initQueueFromS3 } from './lib/queue.mjs';
import { writeFileSync, readFileSync, existsSync } from 'fs'; import { writeFileSync, readFileSync, existsSync } from 'fs';
import { acquireLock } from './lib/lock.mjs'; import { acquireLock } from './lib/lock.mjs';
import { createBrowser } from './lib/browser.mjs'; import { createBrowser } from './lib/browser.mjs';
@@ -60,6 +60,7 @@ async function main() {
const startedAt = Date.now(); const startedAt = Date.now();
const settings = loadConfig(resolve(__dir, 'config/settings.json')); const settings = loadConfig(resolve(__dir, 'config/settings.json'));
await initQueueFromS3(settings);
const writeLastRun = (finished = false) => { const writeLastRun = (finished = false) => {
const entry = { const entry = {

View File

@@ -2,10 +2,12 @@
* queue.mjs — Job queue management * queue.mjs — Job queue management
* Handles jobs_queue.json read/write/update * Handles jobs_queue.json read/write/update
* Uses in-memory cache to avoid redundant disk I/O within a run. * Uses in-memory cache to avoid redundant disk I/O within a run.
* S3-backed: every write syncs to versioned S3 bucket (never lose data).
*/ */
import { readFileSync, writeFileSync, appendFileSync, renameSync, unlinkSync, existsSync, mkdirSync } from 'fs'; import { readFileSync, writeFileSync, appendFileSync, renameSync, unlinkSync, existsSync, mkdirSync } from 'fs';
import { dirname, resolve } from 'path'; import { dirname, resolve } from 'path';
import { fileURLToPath } from 'url'; import { fileURLToPath } from 'url';
import { initStorage, backupToS3, loadJSONSafe } from './storage.mjs';
const __dir = dirname(fileURLToPath(import.meta.url)); const __dir = dirname(fileURLToPath(import.meta.url));
const QUEUE_PATH = `${__dir}/../data/jobs_queue.json`; const QUEUE_PATH = `${__dir}/../data/jobs_queue.json`;
@@ -94,13 +96,62 @@ function applyPendingUpdates(queue) {
export function loadQueue() { export function loadQueue() {
if (_queueCache) return _queueCache; if (_queueCache) return _queueCache;
ensureDir(QUEUE_PATH); ensureDir(QUEUE_PATH);
_queueCache = existsSync(QUEUE_PATH) ? JSON.parse(readFileSync(QUEUE_PATH, 'utf8')) : [];
let data = null;
if (existsSync(QUEUE_PATH)) {
try {
const raw = readFileSync(QUEUE_PATH, 'utf8');
const parsed = JSON.parse(raw);
if (!Array.isArray(parsed)) throw new Error(`Expected array, got ${typeof parsed}`);
data = parsed;
} catch (err) {
console.warn(`⚠️ Queue file corrupt: ${err.message} — will attempt S3 restore`);
}
}
// S3 restore handled async at startup via initQueueFromS3() — for sync path, use empty array
_queueCache = data || [];
if (applyPendingUpdates(_queueCache)) { if (applyPendingUpdates(_queueCache)) {
saveQueue(_queueCache); saveQueue(_queueCache);
} }
return _queueCache; return _queueCache;
} }
/**
 * Async queue initialization with S3 fallback.
 * Call once at startup (after loading settings, before processing jobs).
 * Also opportunistically re-backs-up the applications log if present.
 * @param {object} settings - parsed settings.json; `settings.storage` config
 *   is forwarded to initStorage() (type "local" disables all S3 activity)
 */
export async function initQueueFromS3(settings) {
  initStorage(settings);
  // Restore from S3 only when the local queue is missing, unparseable,
  // or not an array (a valid-but-empty local queue is left alone).
  let needsRestore = false;
  if (!existsSync(QUEUE_PATH)) {
    needsRestore = true;
  } else {
    try {
      const raw = readFileSync(QUEUE_PATH, 'utf8');
      const parsed = JSON.parse(raw);
      if (!Array.isArray(parsed)) needsRestore = true;
    } catch {
      needsRestore = true;
    }
  }
  if (needsRestore) {
    // loadJSONSafe writes the restored copy back to QUEUE_PATH on success.
    const restored = await loadJSONSafe(QUEUE_PATH, []);
    if (Array.isArray(restored) && restored.length > 0) {
      // Prime the in-memory cache so the first loadQueue() sees restored data.
      _queueCache = restored;
      console.log(`✅ Queue restored from S3: ${restored.length} jobs`);
    }
  }
  // Also ensure the applications log has an S3 copy (fire-and-forget).
  if (existsSync(LOG_PATH)) {
    backupToS3(LOG_PATH);
  }
}
/** /**
* Force a fresh read from disk + apply pending updates. * Force a fresh read from disk + apply pending updates.
* Call this between iterations in long-running processes to pick up * Call this between iterations in long-running processes to pick up
@@ -112,9 +163,13 @@ export function reloadQueue() {
} }
export function saveQueue(jobs) { export function saveQueue(jobs) {
if (!Array.isArray(jobs)) {
throw new Error(`saveQueue: expected array, got ${typeof jobs} — refusing to write corrupt data`);
}
ensureDir(QUEUE_PATH); ensureDir(QUEUE_PATH);
atomicWriteJSON(QUEUE_PATH, jobs); atomicWriteJSON(QUEUE_PATH, jobs);
_queueCache = jobs; _queueCache = jobs;
backupToS3(QUEUE_PATH);
} }
function loadLog() { function loadLog() {
@@ -125,9 +180,13 @@ function loadLog() {
} }
function saveLog(log) { function saveLog(log) {
if (!Array.isArray(log)) {
throw new Error(`saveLog: expected array, got ${typeof log} — refusing to write corrupt data`);
}
ensureDir(LOG_PATH); ensureDir(LOG_PATH);
atomicWriteJSON(LOG_PATH, log); atomicWriteJSON(LOG_PATH, log);
_logCache = log; _logCache = log;
backupToS3(LOG_PATH);
} }
export function appendLog(entry) { export function appendLog(entry) {

155
lib/storage.mjs Normal file
View File

@@ -0,0 +1,155 @@
/**
* storage.mjs — S3-backed data storage
*
* Local files remain the primary read/write path (fast).
* Every write syncs to S3 asynchronously (versioned bucket — never lose data).
* On load, if local file is missing or corrupt, auto-restores from S3.
*
* Config in settings.json:
* storage: { type: "s3", bucket: "claw-apply-data", region: "us-west-2" }
* or
* storage: { type: "local" } (default, no backup)
*/
import { readFileSync, writeFileSync, existsSync } from 'fs';
import { basename } from 'path';
// Module-level state: lazily-built S3 client wrapper and active storage config.
let _s3Client = null;
let _config = null;

/**
 * Initialize storage with settings. Call once at startup.
 * When settings carry no storage section, falls back to local-only mode
 * (no S3 backup or restore).
 * @param {object|null|undefined} settings - parsed settings.json
 */
export function initStorage(settings) {
  const configured = settings?.storage;
  _config = configured || { type: 'local' };
}
/**
 * Map a local file path to its S3 object key (flat layout under data/).
 * @param {string} filePath - local file path
 * @returns {string} S3 key, e.g. "data/jobs_queue.json"
 */
function getS3Key(filePath) {
  const fileName = basename(filePath);
  return `data/${fileName}`;
}
/**
 * Lazily import the AWS SDK and build the S3 client wrapper.
 * Cached after first success. On failure, S3 is disabled for the rest of the
 * process so we don't re-attempt the import (and re-warn) on every write.
 * @returns {Promise<{client: object, PutObjectCommand: Function, GetObjectCommand: Function}|null>}
 *   the wrapper, or null when storage is local-only or the SDK is unavailable
 */
async function getS3Client() {
  if (_s3Client) return _s3Client;
  if (!_config || _config.type !== 's3') return null;
  try {
    const { S3Client: Client, PutObjectCommand, GetObjectCommand } = await import('@aws-sdk/client-s3');
    _s3Client = {
      client: new Client({ region: _config.region || 'us-west-2' }),
      PutObjectCommand,
      GetObjectCommand,
    };
    return _s3Client;
  } catch (err) {
    // Distinguish "SDK not installed" from unexpected import/construction errors
    // instead of reporting every failure as a missing dependency.
    if (err.code === 'ERR_MODULE_NOT_FOUND') {
      console.warn('⚠️ @aws-sdk/client-s3 not installed — S3 backup disabled');
    } else {
      console.warn(`⚠️ S3 client init failed: ${err.message} — S3 backup disabled`);
    }
    // Drop to local-only so subsequent backup/restore calls short-circuit.
    _config = { type: 'local' };
    return null;
  }
}
/**
 * Upload a local file to S3 (async, non-blocking).
 * Versioned bucket keeps every revision. No-op in local-only mode.
 * @param {string} filePath - local file to back up
 */
export function backupToS3(filePath) {
  if (_config?.type !== 's3') return;
  // Fire-and-forget: callers never wait on the upload; failures only warn.
  _doBackup(filePath).catch((err) => {
    console.warn(`⚠️ S3 backup failed for ${basename(filePath)}: ${err.message}`);
  });
}
/**
 * Perform the actual S3 upload for backupToS3().
 * Skips silently when S3 is unavailable; refuses to upload a file so small
 * it is obviously truncated (would clobber a good S3 revision).
 * @param {string} filePath - local file to upload
 */
async function _doBackup(filePath) {
  const s3 = await getS3Client();
  if (!s3) return;
  const body = readFileSync(filePath, 'utf8');
  // Safety: a valid JSON array/object is at least "[]" / "{}" plus content.
  if (body.length < 3) {
    console.warn(`⚠️ Refusing to backup ${basename(filePath)} — file too small (${body.length} bytes)`);
    return;
  }
  const command = new s3.PutObjectCommand({
    Bucket: _config.bucket,
    Key: getS3Key(filePath),
    Body: body,
    ContentType: 'application/json',
  });
  await s3.client.send(command);
}
/**
 * Restore a file from S3 to local disk.
 * Downloads the object, verifies it parses as JSON, then writes it locally.
 * @param {string} filePath - local destination path
 * @returns {Promise<boolean>} true if restored; false when no S3 copy exists,
 *   S3 is disabled/unavailable, or the restore failed
 */
export async function restoreFromS3(filePath) {
  if (!_config || _config.type !== 's3') return false;
  const s3 = await getS3Client();
  if (!s3) return false;
  try {
    const response = await s3.client.send(new s3.GetObjectCommand({
      Bucket: _config.bucket,
      Key: getS3Key(filePath),
    }));
    const body = await response.Body.transformToString();
    // Validate it's real JSON before overwriting the local file.
    JSON.parse(body);
    writeFileSync(filePath, body);
    console.log(`✅ Restored ${basename(filePath)} from S3 (${body.length} bytes)`);
    return true;
  } catch (err) {
    // A missing object is the expected "nothing to restore" case, not an error.
    // SDK v3 raises NoSuchKey, but some stacks surface a generic 404 instead.
    if (err.name === 'NoSuchKey' || err.$metadata?.httpStatusCode === 404) return false;
    console.warn(`⚠️ S3 restore failed for ${basename(filePath)}: ${err.message}`);
    return false;
  }
}
/**
 * Safe JSON file loader with S3 fallback.
 * Tries the local file first; if it is missing, unparseable, or the wrong
 * shape, attempts an S3 restore. The restored copy is validated with the
 * same rule before being returned (a bad S3 revision must not come back
 * as "restored" data).
 * @param {string} filePath - local JSON file path
 * @param {*} [defaultValue=[]] - returned when no valid copy exists; when it
 *   is an array, loaded data must also be an array
 * @returns {Promise<*>} parsed JSON, or defaultValue
 */
export async function loadJSONSafe(filePath, defaultValue = []) {
  const expectArray = Array.isArray(defaultValue);
  // Shared parse+shape check for both the local and the restored copy.
  const parseValid = (raw) => {
    const parsed = JSON.parse(raw);
    if (expectArray && !Array.isArray(parsed)) {
      throw new Error(`Expected array, got ${typeof parsed}`);
    }
    return parsed;
  };
  // Try local first (fast path).
  if (existsSync(filePath)) {
    try {
      return parseValid(readFileSync(filePath, 'utf8'));
    } catch (err) {
      console.warn(`⚠️ Local ${basename(filePath)} is corrupt: ${err.message}`);
      console.warn(`   Attempting S3 restore...`);
    }
  }
  // Local missing or corrupt — try S3 (restoreFromS3 rewrites the local file).
  if (await restoreFromS3(filePath)) {
    try {
      return parseValid(readFileSync(filePath, 'utf8'));
    } catch (err) {
      console.warn(`⚠️ Restored ${basename(filePath)} is also invalid: ${err.message}`);
    }
  }
  return defaultValue;
}
/**
 * Safe JSON file writer with validation + S3 backup.
 * Queue/log files must hold JSON objects or arrays; anything else (string,
 * number, boolean, null, undefined) indicates caller corruption — exactly
 * the failure mode that previously destroyed the queue — so refuse to write.
 * @param {string} filePath - local destination path
 * @param {object|Array} data - JSON-serializable object or array
 * @throws {Error} when data is not a non-null object/array
 */
export function saveJSONSafe(filePath, data) {
  // typeof null === 'object', so the null check must be explicit.
  if (typeof data !== 'object' || data === null) {
    throw new Error(`Refusing to save ${data === null ? 'null' : typeof data} to ${basename(filePath)} — data corruption prevented`);
  }
  writeFileSync(filePath, JSON.stringify(data, null, 2));
  backupToS3(filePath);
}

View File

@@ -9,6 +9,7 @@
"apply": "node job_applier.mjs" "apply": "node job_applier.mjs"
}, },
"dependencies": { "dependencies": {
"@aws-sdk/client-s3": "^3.700.0",
"@onkernel/sdk": "^0.15.0", "@onkernel/sdk": "^0.15.0",
"playwright": "^1.40.0" "playwright": "^1.40.0"
}, },