diff --git a/lib/queue.mjs b/lib/queue.mjs
index e5abad0..787d453 100644
--- a/lib/queue.mjs
+++ b/lib/queue.mjs
@@ -3,13 +3,14 @@
  * Handles jobs_queue.json read/write/update
  * Uses in-memory cache to avoid redundant disk I/O within a run.
  */
-import { readFileSync, writeFileSync, renameSync, existsSync, mkdirSync } from 'fs';
+import { readFileSync, writeFileSync, appendFileSync, renameSync, unlinkSync, existsSync, mkdirSync } from 'fs';
 import { dirname, resolve } from 'path';
 import { fileURLToPath } from 'url';
 
 const __dir = dirname(fileURLToPath(import.meta.url));
 const QUEUE_PATH = `${__dir}/../data/jobs_queue.json`;
 const LOG_PATH = `${__dir}/../data/applications_log.json`;
+const UPDATES_PATH = `${__dir}/../data/queue_updates.jsonl`;
 
 /**
  * Load and validate a JSON config file. Throws with a clear message on failure.
@@ -52,13 +53,64 @@ function atomicWriteJSON(filePath, data) {
 let _queueCache = null;
 let _logCache = null;
 
+/**
+ * Apply pending updates from the sidecar JSONL file.
+ * Secondary processes (e.g. standalone classifier) write updates here
+ * instead of modifying jobs_queue.json directly, avoiding write conflicts.
+ *
+ * Uses rename-then-read to avoid losing updates appended between read and delete.
+ * Each line is JSON: { id, ...fields }
+ */
+function applyPendingUpdates(queue) {
+  if (!existsSync(UPDATES_PATH)) return false;
+
+  // Atomically claim the file by renaming it — new appends go to a fresh file
+  const claimedPath = UPDATES_PATH + '.applying';
+  try { renameSync(UPDATES_PATH, claimedPath); } catch { return false; }
+
+  let lines;
+  try {
+    lines = readFileSync(claimedPath, 'utf8').trim().split('\n').filter(Boolean);
+  } catch { return false; }
+  finally { try { unlinkSync(claimedPath); } catch {} }
+
+  if (lines.length === 0) return false;
+
+  const byId = new Map(queue.map((j, i) => [j.id, i]));
+  let applied = 0;
+  for (const line of lines) {
+    try {
+      const { id, ...fields } = JSON.parse(line);
+      const idx = byId.get(id);
+      if (idx == null) continue;
+      queue[idx] = { ...queue[idx], ...fields, status_updated_at: new Date().toISOString() };
+      applied++;
+    } catch {}
+  }
+
+  return applied > 0;
+}
+
 export function loadQueue() {
   if (_queueCache) return _queueCache;
   ensureDir(QUEUE_PATH);
   _queueCache = existsSync(QUEUE_PATH) ? JSON.parse(readFileSync(QUEUE_PATH, 'utf8')) : [];
+  if (applyPendingUpdates(_queueCache)) {
+    saveQueue(_queueCache);
+  }
   return _queueCache;
 }
 
+/**
+ * Force a fresh read from disk + apply pending updates.
+ * Call this between iterations in long-running processes to pick up
+ * sidecar updates from secondary processes.
+ */
+export function reloadQueue() {
+  _queueCache = null;
+  return loadQueue();
+}
+
 export function saveQueue(jobs) {
   ensureDir(QUEUE_PATH);
   atomicWriteJSON(QUEUE_PATH, jobs);
@@ -152,6 +204,20 @@ export function updateJobStatus(id, status, extra = {}) {
   return queue[idx];
 }
 
+/**
+ * Write a pending update to the sidecar JSONL file.
+ * Use this from secondary processes (standalone scripts) instead of
+ * calling updateJobStatus/saveQueue directly. The primary process
+ * (searcher/applier) will pick up these updates on next loadQueue().
+ *
+ * @param {string} id — job id
+ * @param {object} fields — fields to merge (e.g. { status: 'new', apply_type: 'lever', apply_url: '...' })
+ */
+export function writePendingUpdate(id, fields) {
+  ensureDir(UPDATES_PATH);
+  appendFileSync(UPDATES_PATH, JSON.stringify({ id, ...fields }) + '\n');
+}
+
 export function addJobs(newJobs) {
   // Always read fresh from disk to avoid clobbering concurrent writes (e.g. filter scoring)
   _queueCache = null;