Make S3 the primary storage layer (not backup)
storage.mjs is now a single interface: loadJSON() and saveJSON() route to either local disk or S3 based on settings.storage.type. The app never touches disk/S3 directly. - All queue/log functions are now async (saveQueue, appendLog, etc.) - All callers updated with await - Data validation prevents saving corrupt types (strings, nulls) - S3 versioned bucket preserves every write - Config: storage.type = "local" (disk) or "s3" (S3 primary) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
186
lib/queue.mjs
186
lib/queue.mjs
@@ -1,13 +1,18 @@
|
||||
/**
|
||||
* queue.mjs — Job queue management
|
||||
* Handles jobs_queue.json read/write/update
|
||||
* Uses in-memory cache to avoid redundant disk I/O within a run.
|
||||
* S3-backed: every write syncs to versioned S3 bucket (never lose data).
|
||||
*
|
||||
* Storage is pluggable via settings.storage:
|
||||
* { type: "local" } — reads/writes to local disk (default)
|
||||
* { type: "s3", bucket: "...", region: "..." } — S3 is primary store
|
||||
*
|
||||
* In-memory cache avoids redundant I/O within a run.
|
||||
* Call initStorage() once at startup before any queue operations.
|
||||
*/
|
||||
import { readFileSync, writeFileSync, appendFileSync, renameSync, unlinkSync, existsSync, mkdirSync } from 'fs';
|
||||
import { dirname, resolve } from 'path';
|
||||
import { fileURLToPath } from 'url';
|
||||
import { initStorage, backupToS3, loadJSONSafe } from './storage.mjs';
|
||||
import { initStorage as _initStorage, loadJSON, saveJSON, storageType } from './storage.mjs';
|
||||
|
||||
const __dir = dirname(fileURLToPath(import.meta.url));
|
||||
const QUEUE_PATH = `${__dir}/../data/jobs_queue.json`;
|
||||
@@ -40,33 +45,40 @@ function ensureDir(path) {
|
||||
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomic write — writes to a temp file then renames.
|
||||
* Prevents corruption if two processes write simultaneously or the process
|
||||
* crashes mid-write. rename() is atomic on POSIX filesystems.
|
||||
*/
|
||||
function atomicWriteJSON(filePath, data) {
|
||||
const tmp = filePath + '.tmp';
|
||||
writeFileSync(tmp, JSON.stringify(data, null, 2));
|
||||
renameSync(tmp, filePath);
|
||||
}
|
||||
|
||||
// --- In-memory caches (populated on first read, invalidated on write) ---
|
||||
let _queueCache = null;
|
||||
let _logCache = null;
|
||||
let _initialized = false;
|
||||
|
||||
/**
|
||||
* Initialize storage. Must be called (and awaited) once at startup
|
||||
* before any queue operations.
|
||||
*/
|
||||
export async function initQueue(settings) {
|
||||
_initStorage(settings);
|
||||
|
||||
// Load queue and log from primary storage (S3 or local)
|
||||
_queueCache = await loadJSON(QUEUE_PATH, []);
|
||||
_logCache = await loadJSON(LOG_PATH, []);
|
||||
|
||||
// Apply any pending sidecar updates (local-only mechanism)
|
||||
if (applyPendingUpdates(_queueCache)) {
|
||||
await saveQueue(_queueCache);
|
||||
}
|
||||
|
||||
_initialized = true;
|
||||
const type = storageType();
|
||||
console.log(`📦 Storage: ${type}${type === 's3' ? ` (${settings.storage.bucket})` : ''} — ${_queueCache.length} jobs loaded`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply pending updates from the sidecar JSONL file.
|
||||
* Secondary processes (e.g. standalone classifier) write updates here
|
||||
* instead of modifying jobs_queue.json directly, avoiding write conflicts.
|
||||
*
|
||||
* Uses rename-then-read to avoid losing updates appended between read and delete.
|
||||
* Each line is JSON: { id, ...fields }
|
||||
*/
|
||||
function applyPendingUpdates(queue) {
|
||||
if (!existsSync(UPDATES_PATH)) return false;
|
||||
|
||||
// Atomically claim the file by renaming it — new appends go to a fresh file
|
||||
const claimedPath = UPDATES_PATH + '.applying';
|
||||
try { renameSync(UPDATES_PATH, claimedPath); } catch { return false; }
|
||||
|
||||
@@ -93,118 +105,72 @@ function applyPendingUpdates(queue) {
|
||||
return applied > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the in-memory queue. Must call initQueue() first.
|
||||
*/
|
||||
export function loadQueue() {
|
||||
if (_queueCache) return _queueCache;
|
||||
ensureDir(QUEUE_PATH);
|
||||
|
||||
let data = null;
|
||||
if (existsSync(QUEUE_PATH)) {
|
||||
try {
|
||||
const raw = readFileSync(QUEUE_PATH, 'utf8');
|
||||
const parsed = JSON.parse(raw);
|
||||
if (!Array.isArray(parsed)) throw new Error(`Expected array, got ${typeof parsed}`);
|
||||
data = parsed;
|
||||
} catch (err) {
|
||||
console.warn(`⚠️ Queue file corrupt: ${err.message} — will attempt S3 restore`);
|
||||
if (!_initialized) {
|
||||
// Fallback for code that hasn't been updated to call initQueue yet
|
||||
ensureDir(QUEUE_PATH);
|
||||
if (!_queueCache) {
|
||||
_queueCache = existsSync(QUEUE_PATH) ? JSON.parse(readFileSync(QUEUE_PATH, 'utf8')) : [];
|
||||
if (!Array.isArray(_queueCache)) _queueCache = [];
|
||||
}
|
||||
}
|
||||
|
||||
// S3 restore handled async at startup via initQueueFromS3() — for sync path, use empty array
|
||||
_queueCache = data || [];
|
||||
if (applyPendingUpdates(_queueCache)) {
|
||||
saveQueue(_queueCache);
|
||||
}
|
||||
return _queueCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* Async queue initialization with S3 fallback.
|
||||
* Call once at startup before processing jobs.
|
||||
* Force a fresh read from storage + apply pending updates.
|
||||
* Call this between iterations in long-running processes.
|
||||
*/
|
||||
export async function initQueueFromS3(settings) {
|
||||
initStorage(settings);
|
||||
|
||||
// If local queue is missing or empty, try S3
|
||||
let needsRestore = false;
|
||||
if (!existsSync(QUEUE_PATH)) {
|
||||
needsRestore = true;
|
||||
} else {
|
||||
try {
|
||||
const raw = readFileSync(QUEUE_PATH, 'utf8');
|
||||
const parsed = JSON.parse(raw);
|
||||
if (!Array.isArray(parsed)) needsRestore = true;
|
||||
} catch {
|
||||
needsRestore = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (needsRestore) {
|
||||
const restored = await loadJSONSafe(QUEUE_PATH, []);
|
||||
if (Array.isArray(restored) && restored.length > 0) {
|
||||
_queueCache = restored;
|
||||
console.log(`✅ Queue restored from S3: ${restored.length} jobs`);
|
||||
}
|
||||
}
|
||||
|
||||
// Also ensure applications log is backed up
|
||||
if (existsSync(LOG_PATH)) {
|
||||
backupToS3(LOG_PATH);
|
||||
export async function reloadQueue() {
|
||||
_queueCache = await loadJSON(QUEUE_PATH, []);
|
||||
if (applyPendingUpdates(_queueCache)) {
|
||||
await saveQueue(_queueCache);
|
||||
}
|
||||
return _queueCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* Force a fresh read from disk + apply pending updates.
|
||||
* Call this between iterations in long-running processes to pick up
|
||||
* sidecar updates from secondary processes.
|
||||
* Save the queue to primary storage.
|
||||
*/
|
||||
export function reloadQueue() {
|
||||
_queueCache = null;
|
||||
return loadQueue();
|
||||
}
|
||||
|
||||
export function saveQueue(jobs) {
|
||||
export async function saveQueue(jobs) {
|
||||
if (!Array.isArray(jobs)) {
|
||||
throw new Error(`saveQueue: expected array, got ${typeof jobs} — refusing to write corrupt data`);
|
||||
}
|
||||
ensureDir(QUEUE_PATH);
|
||||
atomicWriteJSON(QUEUE_PATH, jobs);
|
||||
await saveJSON(QUEUE_PATH, jobs);
|
||||
_queueCache = jobs;
|
||||
backupToS3(QUEUE_PATH);
|
||||
}
|
||||
|
||||
function loadLog() {
|
||||
async function loadLog() {
|
||||
if (_logCache) return _logCache;
|
||||
ensureDir(LOG_PATH);
|
||||
_logCache = existsSync(LOG_PATH) ? JSON.parse(readFileSync(LOG_PATH, 'utf8')) : [];
|
||||
_logCache = await loadJSON(LOG_PATH, []);
|
||||
return _logCache;
|
||||
}
|
||||
|
||||
function saveLog(log) {
|
||||
async function saveLog(log) {
|
||||
if (!Array.isArray(log)) {
|
||||
throw new Error(`saveLog: expected array, got ${typeof log} — refusing to write corrupt data`);
|
||||
}
|
||||
ensureDir(LOG_PATH);
|
||||
atomicWriteJSON(LOG_PATH, log);
|
||||
await saveJSON(LOG_PATH, log);
|
||||
_logCache = log;
|
||||
backupToS3(LOG_PATH);
|
||||
}
|
||||
|
||||
export function appendLog(entry) {
|
||||
const log = loadLog();
|
||||
export async function appendLog(entry) {
|
||||
const log = await loadLog();
|
||||
log.push({ ...entry, logged_at: new Date().toISOString() });
|
||||
saveLog(log);
|
||||
await saveLog(log);
|
||||
}
|
||||
|
||||
/**
|
||||
* After AI filtering, deduplicate jobs that exist on multiple tracks.
|
||||
* For each group sharing the same original job URL, keep the highest-scoring copy.
|
||||
* Marks losers as status='duplicate'. Call this after collect completes.
|
||||
*/
|
||||
export function dedupeAfterFilter() {
|
||||
export async function dedupeAfterFilter() {
|
||||
_queueCache = null;
|
||||
const queue = loadQueue();
|
||||
const queue = await loadJSON(QUEUE_PATH, []);
|
||||
_queueCache = queue;
|
||||
|
||||
// Group by URL (canonical dedup key)
|
||||
const byUrl = {};
|
||||
for (const job of queue) {
|
||||
if (!job.url) continue;
|
||||
@@ -215,10 +181,8 @@ export function dedupeAfterFilter() {
|
||||
let deduped = 0;
|
||||
for (const jobs of Object.values(byUrl)) {
|
||||
if (jobs.length < 2) continue;
|
||||
// Only dedup if ALL copies have been scored — skip groups with unscored members
|
||||
if (jobs.some(j => j.filter_score == null && j.status !== 'filtered')) continue;
|
||||
|
||||
// Keep the one with highest filter_score; if tied, prefer 'new' over 'filtered'
|
||||
jobs.sort((a, b) => {
|
||||
const sa = a.filter_score ?? -1;
|
||||
const sb = b.filter_score ?? -1;
|
||||
@@ -226,7 +190,6 @@ export function dedupeAfterFilter() {
|
||||
if (a.status === 'new' && b.status !== 'new') return -1;
|
||||
return 1;
|
||||
});
|
||||
// Mark losers as duplicate
|
||||
for (const loser of jobs.slice(1)) {
|
||||
loser.status = 'duplicate';
|
||||
loser.status_updated_at = new Date().toISOString();
|
||||
@@ -234,12 +197,12 @@ export function dedupeAfterFilter() {
|
||||
}
|
||||
}
|
||||
|
||||
if (deduped > 0) saveQueue(queue);
|
||||
if (deduped > 0) await saveQueue(queue);
|
||||
return deduped;
|
||||
}
|
||||
|
||||
export function isAlreadyApplied(jobId) {
|
||||
const log = loadLog();
|
||||
export async function isAlreadyApplied(jobId) {
|
||||
const log = await loadLog();
|
||||
return log.some(e => e.id === jobId && e.status === 'applied');
|
||||
}
|
||||
|
||||
@@ -249,7 +212,7 @@ export function getJobsByStatus(status) {
|
||||
return queue.filter(j => j.status === status);
|
||||
}
|
||||
|
||||
export function updateJobStatus(id, status, extra = {}) {
|
||||
export async function updateJobStatus(id, status, extra = {}) {
|
||||
const queue = loadQueue();
|
||||
const idx = queue.findIndex(j => j.id === id);
|
||||
if (idx === -1) return;
|
||||
@@ -259,31 +222,24 @@ export function updateJobStatus(id, status, extra = {}) {
|
||||
status,
|
||||
status_updated_at: new Date().toISOString(),
|
||||
};
|
||||
saveQueue(queue);
|
||||
await saveQueue(queue);
|
||||
return queue[idx];
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a pending update to the sidecar JSONL file.
|
||||
* Use this from secondary processes (standalone scripts) instead of
|
||||
* calling updateJobStatus/saveQueue directly. The primary process
|
||||
* (searcher/applier) will pick up these updates on next loadQueue().
|
||||
*
|
||||
* @param {string} id — job id
|
||||
* @param {object} fields — fields to merge (e.g. { status: 'new', apply_type: 'lever', apply_url: '...' })
|
||||
* Use this from secondary processes instead of calling updateJobStatus directly.
|
||||
*/
|
||||
export function writePendingUpdate(id, fields) {
|
||||
ensureDir(UPDATES_PATH);
|
||||
appendFileSync(UPDATES_PATH, JSON.stringify({ id, ...fields }) + '\n');
|
||||
}
|
||||
|
||||
export function addJobs(newJobs) {
|
||||
// Always read fresh from disk to avoid clobbering concurrent writes (e.g. filter scoring)
|
||||
_queueCache = null;
|
||||
const queue = loadQueue();
|
||||
export async function addJobs(newJobs) {
|
||||
// Reload fresh to avoid clobbering concurrent writes
|
||||
_queueCache = await loadJSON(QUEUE_PATH, []);
|
||||
const queue = _queueCache;
|
||||
|
||||
// Dedup key: same job.id + same track = skip (duplicate search hit for same track)
|
||||
// Same job.id but different track = allow (will be deduped after AI filter, keeping best score)
|
||||
const existingKeys = new Set(queue.map(j => `${j.track || 'ae'}::${j.id}`));
|
||||
let added = 0;
|
||||
|
||||
@@ -293,8 +249,6 @@ export function addJobs(newJobs) {
|
||||
if (existingKeys.has(key)) continue;
|
||||
existingKeys.add(key);
|
||||
|
||||
// If this job.id already exists on a different track, give this copy a composite id
|
||||
// so filter batch custom_ids don't collide
|
||||
const idConflict = queue.some(j => j.id === job.id && (j.track || 'ae') !== track);
|
||||
const queueId = idConflict ? `${job.id}_${track}` : job.id;
|
||||
|
||||
@@ -312,6 +266,6 @@ export function addJobs(newJobs) {
|
||||
added++;
|
||||
}
|
||||
|
||||
saveQueue(queue);
|
||||
await saveQueue(queue);
|
||||
return added;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user