storage.mjs is now a single interface: loadJSON() and saveJSON() route to either local disk or S3 based on settings.storage.type. The app never touches disk/S3 directly. - All queue/log functions are now async (saveQueue, appendLog, etc.) - All callers updated with await - Data validation prevents saving corrupt types (strings, nulls) - S3 versioned bucket preserves every write - Config: storage.type = "local" (disk) or "s3" (S3 primary) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
272 lines
7.8 KiB
JavaScript
272 lines
7.8 KiB
JavaScript
/**
|
|
* queue.mjs — Job queue management
|
|
* Handles jobs_queue.json read/write/update
|
|
*
|
|
* Storage is pluggable via settings.storage:
|
|
* { type: "local" } — reads/writes to local disk (default)
|
|
* { type: "s3", bucket: "...", region: "..." } — S3 is primary store
|
|
*
|
|
* In-memory cache avoids redundant I/O within a run.
|
|
* Call initQueue() once at startup before any queue operations.
|
|
*/
|
|
import { readFileSync, writeFileSync, appendFileSync, renameSync, unlinkSync, existsSync, mkdirSync } from 'fs';
|
|
import { dirname, resolve } from 'path';
|
|
import { fileURLToPath } from 'url';
|
|
import { initStorage as _initStorage, loadJSON, saveJSON, storageType } from './storage.mjs';
|
|
|
|
// Directory containing this module; the data files live in the sibling ../data directory.
const __dir = dirname(fileURLToPath(import.meta.url));

// Main job queue (an array of job objects), read and written through storage.mjs.
const QUEUE_PATH = `${__dir}/../data/jobs_queue.json`;

// Application log (an array of log entries), read and written through storage.mjs.
const LOG_PATH = `${__dir}/../data/applications_log.json`;

// Sidecar JSONL file of pending queue updates; accessed directly on local disk.
const UPDATES_PATH = `${__dir}/../data/queue_updates.jsonl`;
|
|
|
|
/**
 * Load and parse a JSON config file. Throws with a clear message on failure.
 *
 * @param {string} filePath - Path to the config file; resolved against the
 *   current working directory.
 * @returns {*} The parsed JSON value.
 * @throws {Error} If the file does not exist, cannot be read, or does not
 *   contain valid JSON. For read and parse failures the underlying error is
 *   attached as `cause`.
 */
export function loadConfig(filePath) {
  const resolved = resolve(filePath);
  if (!existsSync(resolved)) {
    throw new Error(`Config file not found: ${resolved}\nCopy the matching .example.json and fill in your values.`);
  }

  let raw;
  try {
    raw = readFileSync(resolved, 'utf8');
  } catch (e) {
    // Keep the original error (code, stack) reachable for debugging.
    throw new Error(`Cannot read config file ${resolved}: ${e.message}`, { cause: e });
  }

  try {
    return JSON.parse(raw);
  } catch (e) {
    throw new Error(`Invalid JSON in ${resolved}: ${e.message}`, { cause: e });
  }
}
|
|
|
|
/**
 * Make sure the parent directory of `path` exists, creating it (and any
 * missing ancestors) when it does not.
 *
 * @param {string} path - Path of a file whose containing directory is needed.
 */
function ensureDir(path) {
  const dir = dirname(path);
  if (!existsSync(dir)) {
    mkdirSync(dir, { recursive: true });
  }
}
|
|
|
|
// --- In-memory caches (populated on first read, replaced on every save) ---

// Current job queue array, or null until it has been loaded.
let _queueCache = null;

// Current application log array, or null until it has been loaded.
let _logCache = null;

// Set to true once initQueue() has completed.
let _initialized = false;
|
|
|
|
/**
 * Initialize storage and warm the in-memory caches. Must be called (and
 * awaited) once at startup before any queue operations.
 *
 * Steps: configure the storage backend, load the queue and the application
 * log, merge any pending sidecar updates into the queue (persisting the
 * result when something changed), then mark the module as initialized.
 *
 * @param {object} settings - App settings, passed through to the storage
 *   layer. `settings.storage.bucket` is read for the startup log line when
 *   the storage type is "s3".
 * @returns {Promise<void>}
 */
export async function initQueue(settings) {
  _initStorage(settings);

  // Load queue and log from primary storage (S3 or local)
  _queueCache = await loadJSON(QUEUE_PATH, []);
  _logCache = await loadJSON(LOG_PATH, []);

  // Apply any pending sidecar updates (local-only mechanism)
  if (applyPendingUpdates(_queueCache)) {
    await saveQueue(_queueCache);
  }

  _initialized = true;

  const type = storageType();
  const bucketNote = type === 's3' ? ` (${settings.storage.bucket})` : '';
  console.log(`📦 Storage: ${type}${bucketNote} — ${_queueCache.length} jobs loaded`);
}
|
|
|
|
/**
 * Apply pending updates from the sidecar JSONL file.
 * Secondary processes (e.g. standalone classifier) write updates here
 * instead of modifying jobs_queue.json directly, avoiding write conflicts.
 *
 * The sidecar is renamed to "<sidecar>.applying" before it is read, and that
 * claimed file is deleted once the read attempt finishes (successful or not).
 * Each line is expected to be a JSON object `{ id, ...fields }`; the fields
 * are merged over the queue entry with that id and `status_updated_at` is
 * refreshed. Lines for unknown ids are ignored; unparseable lines are skipped
 * with a warning.
 *
 * @param {Array<object>} queue - Queue array; matching entries are replaced
 *   in place.
 * @returns {boolean} true when at least one update was applied, i.e. the
 *   caller should persist the queue.
 */
function applyPendingUpdates(queue) {
  if (!existsSync(UPDATES_PATH)) return false;

  const claimedPath = `${UPDATES_PATH}.applying`;
  try {
    renameSync(UPDATES_PATH, claimedPath);
  } catch {
    // Could not claim the sidecar (presumably it vanished or another process took it).
    return false;
  }

  let lines;
  try {
    lines = readFileSync(claimedPath, 'utf8').trim().split('\n').filter(Boolean);
  } catch (err) {
    console.warn(`applyPendingUpdates: could not read ${claimedPath}: ${err.message}`);
    return false;
  } finally {
    try {
      unlinkSync(claimedPath);
    } catch {
      // Best effort: a leftover claimed file is overwritten by the next rename.
    }
  }

  if (lines.length === 0) return false;

  const indexById = new Map(queue.map((job, index) => [job.id, index]));
  let applied = 0;
  let malformed = 0;

  for (const line of lines) {
    let update;
    try {
      update = JSON.parse(line);
    } catch {
      malformed++;
      continue;
    }
    if (update === null || typeof update !== 'object') {
      malformed++;
      continue;
    }

    const { id, ...fields } = update;
    const index = indexById.get(id);
    if (index == null) continue; // no queue entry with this id

    queue[index] = { ...queue[index], ...fields, status_updated_at: new Date().toISOString() };
    applied++;
  }

  if (malformed > 0) {
    console.warn(`applyPendingUpdates: skipped ${malformed} malformed line(s) in queue updates file`);
  }

  return applied > 0;
}
|
|
|
|
/**
 * Get the in-memory queue. Must call initQueue() first.
 *
 * When initQueue() has not run yet, falls back to a synchronous read of the
 * local queue file (an empty array if the file is missing or does not hold
 * an array) and caches the result.
 *
 * @returns {Array<object>} The cached queue array (shared, not a copy).
 * @throws {SyntaxError} In the fallback path, if the local file is not valid JSON.
 */
export function loadQueue() {
  if (_initialized) return _queueCache;

  // Fallback for code that hasn't been updated to call initQueue yet
  ensureDir(QUEUE_PATH);
  if (!_queueCache) {
    const parsed = existsSync(QUEUE_PATH) ? JSON.parse(readFileSync(QUEUE_PATH, 'utf8')) : [];
    _queueCache = Array.isArray(parsed) ? parsed : [];
  }
  return _queueCache;
}
|
|
|
|
/**
 * Force a fresh read from storage + apply pending updates.
 * Call this between iterations in long-running processes.
 *
 * The cache is replaced with the freshly loaded queue; when sidecar updates
 * were merged in, the result is saved back to storage.
 *
 * @returns {Promise<Array<object>>} The refreshed queue array.
 */
export async function reloadQueue() {
  _queueCache = await loadJSON(QUEUE_PATH, []);

  const changed = applyPendingUpdates(_queueCache);
  if (changed) {
    await saveQueue(_queueCache);
  }

  return _queueCache;
}
|
|
|
|
/**
 * Save the queue to primary storage and make it the cached queue.
 *
 * @param {Array<object>} jobs - The complete queue to persist.
 * @returns {Promise<void>}
 * @throws {Error} If `jobs` is not an array; nothing is written in that case.
 */
export async function saveQueue(jobs) {
  if (!Array.isArray(jobs)) {
    // typeof null is "object", which would make the message misleading.
    const actual = jobs === null ? 'null' : typeof jobs;
    throw new Error(`saveQueue: expected array, got ${actual} — refusing to write corrupt data`);
  }

  await saveJSON(QUEUE_PATH, jobs);
  // Only update the cache once the write has succeeded.
  _queueCache = jobs;
}
|
|
|
|
/**
 * Get the application log, loading it from storage on first use.
 *
 * @returns {Promise<Array<object>>} The cached log array (shared, not a copy).
 */
async function loadLog() {
  if (!_logCache) {
    _logCache = await loadJSON(LOG_PATH, []);
  }
  return _logCache;
}
|
|
|
|
/**
 * Save the application log to primary storage and make it the cached log.
 *
 * @param {Array<object>} log - The complete log to persist.
 * @returns {Promise<void>}
 * @throws {Error} If `log` is not an array; nothing is written in that case.
 */
async function saveLog(log) {
  if (!Array.isArray(log)) {
    // typeof null is "object", which would make the message misleading.
    const actual = log === null ? 'null' : typeof log;
    throw new Error(`saveLog: expected array, got ${actual} — refusing to write corrupt data`);
  }

  await saveJSON(LOG_PATH, log);
  // Only update the cache once the write has succeeded.
  _logCache = log;
}
|
|
|
|
/**
 * Append an entry to the application log and persist the log.
 *
 * The entry is copied and stamped with a `logged_at` ISO timestamp. It is
 * pushed onto the shared cached array (rather than a copy) so that
 * overlapping calls all end up in the array that gets saved.
 *
 * @param {object} entry - Fields to record.
 * @returns {Promise<void>}
 */
export async function appendLog(entry) {
  const log = await loadLog();
  const stamped = { ...entry, logged_at: new Date().toISOString() };
  log.push(stamped);
  await saveLog(log);
}
|
|
|
|
/**
 * After AI filtering, deduplicate jobs that exist on multiple tracks.
 *
 * Reloads the queue from storage, groups jobs by `url`, and within each group
 * of two or more keeps the best job and marks the rest as `duplicate`.
 * "Best" means highest `filter_score` (missing scores count as -1); on equal
 * scores a job with status `new` is preferred, otherwise queue order decides.
 * Groups that still contain an unscored job whose status is not `filtered`
 * are left untouched. Jobs that are already `duplicate` are not re-stamped or
 * re-counted, so running this again without new data changes nothing.
 *
 * @returns {Promise<number>} Number of jobs newly marked as `duplicate`.
 */
export async function dedupeAfterFilter() {
  _queueCache = null;
  const queue = await loadJSON(QUEUE_PATH, []);
  _queueCache = queue;

  // A Map (not a plain object) so that any URL string is a safe key.
  const groupsByUrl = new Map();
  for (const job of queue) {
    if (!job.url) continue;
    const group = groupsByUrl.get(job.url);
    if (group) {
      group.push(job);
    } else {
      groupsByUrl.set(job.url, [job]);
    }
  }

  let deduped = 0;
  for (const group of groupsByUrl.values()) {
    if (group.length < 2) continue;
    // Wait until every job in the group has been scored (or filtered out).
    if (group.some((j) => j.filter_score == null && j.status !== 'filtered')) continue;

    group.sort((a, b) => {
      const scoreA = a.filter_score ?? -1;
      const scoreB = b.filter_score ?? -1;
      if (scoreB !== scoreA) return scoreB - scoreA;
      // Tie-break: 'new' first. Returns 0 for a true tie so the comparator is
      // consistent and the stable sort keeps queue order.
      return Number(b.status === 'new') - Number(a.status === 'new');
    });

    for (const loser of group.slice(1)) {
      if (loser.status === 'duplicate') continue; // already handled on an earlier run
      loser.status = 'duplicate';
      loser.status_updated_at = new Date().toISOString();
      deduped++;
    }
  }

  if (deduped > 0) await saveQueue(queue);
  return deduped;
}
|
|
|
|
/**
 * Check whether the application log has an `applied` entry for a job.
 *
 * @param {*} jobId - Job id, compared to each log entry's `id` with `===`.
 * @returns {Promise<boolean>} true if a log entry with that id has status `applied`.
 */
export async function isAlreadyApplied(jobId) {
  const log = await loadLog();
  return log.some((entry) => entry.id === jobId && entry.status === 'applied');
}
|
|
|
|
/**
 * Return the queued jobs whose status matches.
 *
 * @param {string|string[]} status - A single status, or an array of statuses
 *   of which any may match.
 * @returns {Array<object>} A new array with the matching job objects.
 */
export function getJobsByStatus(status) {
  const queue = loadQueue();
  const matches = Array.isArray(status)
    ? (job) => status.includes(job.status)
    : (job) => job.status === status;
  return queue.filter(matches);
}
|
|
|
|
/**
 * Set the status of a queued job and persist the queue.
 *
 * The entry is replaced by a copy with `extra` merged in; `status` and a
 * fresh `status_updated_at` always take precedence over keys in `extra`.
 *
 * @param {*} id - Id of the job to update.
 * @param {string} status - New status value.
 * @param {object} [extra={}] - Additional fields to merge into the job.
 * @returns {Promise<object|undefined>} The updated job, or undefined when no
 *   job with that id is in the queue (nothing is saved in that case).
 */
export async function updateJobStatus(id, status, extra = {}) {
  const queue = loadQueue();
  const index = queue.findIndex((job) => job.id === id);
  if (index === -1) return;

  const updated = {
    ...queue[index],
    ...extra,
    status,
    status_updated_at: new Date().toISOString(),
  };
  queue[index] = updated;

  await saveQueue(queue);
  return queue[index];
}
|
|
|
|
/**
 * Write a pending update to the sidecar JSONL file.
 * Use this from secondary processes instead of calling updateJobStatus directly.
 *
 * Appends one line of the form `{"id": ..., ...fields}`.
 *
 * @param {*} id - Id of the job the update applies to.
 * @param {object} fields - Fields to merge into that job. An `id` key in
 *   here is ignored so it cannot redirect the update to a different job.
 */
export function writePendingUpdate(id, fields) {
  ensureDir(UPDATES_PATH);
  // Drop any `id` inside `fields`: the explicit argument is authoritative.
  const { id: _ignoredId, ...rest } = fields ?? {};
  appendFileSync(UPDATES_PATH, `${JSON.stringify({ id, ...rest })}\n`);
}
|
|
|
|
/**
 * Add newly found jobs to the queue and persist it.
 *
 * A job is identified by its track (default 'ae') plus its source id. Jobs
 * already queued on the same track are skipped; this also recognises entries
 * stored under a suffixed queue id through their `original_id`. When the
 * source id is already taken in the queue, the new entry is stored under
 * `<id>_<track>` and keeps the source id in `original_id`.
 *
 * @param {Iterable<object>} newJobs - Jobs to add; each needs an `id` and may
 *   carry a `track`.
 * @returns {Promise<number>} Number of jobs actually added.
 */
export async function addJobs(newJobs) {
  // Reload fresh to avoid clobbering concurrent writes
  _queueCache = await loadJSON(QUEUE_PATH, []);
  const queue = _queueCache;

  const existingKeys = new Set(); // "<track>::<id>" for every job already known
  const takenIds = new Set(); // queue ids currently in use
  for (const queued of queue) {
    const queuedTrack = queued.track || 'ae';
    existingKeys.add(`${queuedTrack}::${queued.id}`);
    // Entries stored under a suffixed id are still the same source job.
    if (queued.original_id != null) {
      existingKeys.add(`${queuedTrack}::${queued.original_id}`);
    }
    takenIds.add(queued.id);
  }

  let added = 0;
  for (const job of newJobs) {
    const track = job.track || 'ae';
    const key = `${track}::${job.id}`;
    if (existingKeys.has(key)) continue;
    existingKeys.add(key);

    // Same-track matches were skipped above, so a taken id means a job from
    // another track already uses it.
    const queueId = takenIds.has(job.id) ? `${job.id}_${track}` : job.id;
    takenIds.add(queueId);

    const now = new Date().toISOString();
    queue.push({
      ...job,
      id: queueId,
      original_id: job.id,
      status: 'new',
      found_at: now,
      status_updated_at: now,
      pending_question: null,
      applied_at: null,
      notes: null,
    });
    added++;
  }

  await saveQueue(queue);
  return added;
}
|