Add sidecar update pattern to prevent concurrent queue writes
Secondary processes (the standalone classifier, ad-hoc scripts) now write to queue_updates.jsonl via writePendingUpdate() instead of modifying jobs_queue.json directly. Primary processes pick up pending updates on the next loadQueue() call using an atomic rename-then-read.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -3,13 +3,14 @@
|
|||||||
* Handles jobs_queue.json read/write/update
|
* Handles jobs_queue.json read/write/update
|
||||||
* Uses in-memory cache to avoid redundant disk I/O within a run.
|
* Uses in-memory cache to avoid redundant disk I/O within a run.
|
||||||
*/
|
*/
|
||||||
import { readFileSync, writeFileSync, renameSync, existsSync, mkdirSync } from 'fs';
|
import { readFileSync, writeFileSync, appendFileSync, renameSync, unlinkSync, existsSync, mkdirSync } from 'fs';
|
||||||
import { dirname, resolve } from 'path';
|
import { dirname, resolve } from 'path';
|
||||||
import { fileURLToPath } from 'url';
|
import { fileURLToPath } from 'url';
|
||||||
|
|
||||||
const __dir = dirname(fileURLToPath(import.meta.url));
|
const __dir = dirname(fileURLToPath(import.meta.url));
|
||||||
const QUEUE_PATH = `${__dir}/../data/jobs_queue.json`;
|
const QUEUE_PATH = `${__dir}/../data/jobs_queue.json`;
|
||||||
const LOG_PATH = `${__dir}/../data/applications_log.json`;
|
const LOG_PATH = `${__dir}/../data/applications_log.json`;
|
||||||
|
const UPDATES_PATH = `${__dir}/../data/queue_updates.jsonl`;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load and validate a JSON config file. Throws with a clear message on failure.
|
* Load and validate a JSON config file. Throws with a clear message on failure.
|
||||||
@@ -52,13 +53,64 @@ function atomicWriteJSON(filePath, data) {
|
|||||||
let _queueCache = null;
|
let _queueCache = null;
|
||||||
let _logCache = null;
|
let _logCache = null;
|
||||||
|
|
||||||
|
/**
 * Apply pending updates from the sidecar JSONL file.
 *
 * Secondary processes (e.g. standalone classifier) write updates here
 * instead of modifying jobs_queue.json directly, avoiding write conflicts.
 *
 * Claims the live file with an atomic rename so appends racing with this
 * call land in a fresh queue_updates.jsonl and are picked up next time.
 * Also drains a leftover `.applying` file from a run that crashed
 * mid-apply — previously the rename silently overwrote it, permanently
 * losing those updates.
 *
 * Each line is JSON: { id, ...fields }. Malformed lines and unknown ids
 * are skipped (best-effort by design).
 *
 * @param {Array<object>} queue — in-memory queue, mutated in place
 * @returns {boolean} true if at least one update was merged (caller should persist)
 */
function applyPendingUpdates(queue) {
  const claimedPath = UPDATES_PATH + '.applying';
  const lines = [];

  // Read a claimed file's lines, then remove it. Best-effort: an
  // unreadable file is still deleted so it cannot wedge future claims.
  const drain = (path) => {
    try {
      lines.push(...readFileSync(path, 'utf8').trim().split('\n').filter(Boolean));
    } catch {} // unreadable — skip contents, still remove below
    try { unlinkSync(path); } catch {}
  };

  // Recover updates orphaned by a previous run that crashed after
  // claiming but before applying.
  if (existsSync(claimedPath)) drain(claimedPath);

  if (existsSync(UPDATES_PATH)) {
    try {
      // Atomically claim the file — new appends go to a fresh file.
      renameSync(UPDATES_PATH, claimedPath);
      drain(claimedPath);
    } catch {} // lost a claim race or rename failed: retry on next call
  }

  if (lines.length === 0) return false;

  // Index queue entries by id for O(1) lookup while merging.
  const byId = new Map(queue.map((j, i) => [j.id, i]));
  let applied = 0;
  for (const line of lines) {
    try {
      const { id, ...fields } = JSON.parse(line);
      const idx = byId.get(id);
      if (idx == null) continue; // unknown job id — drop the update
      queue[idx] = { ...queue[idx], ...fields, status_updated_at: new Date().toISOString() };
      applied++;
    } catch {} // malformed line — skip
  }
  return applied > 0;
}
|
||||||
|
|
||||||
/**
 * Load the jobs queue, serving from the in-memory cache when available.
 * On a cold read, merges any sidecar updates and persists the result.
 * @returns {Array<object>} the (cached) queue array
 */
export function loadQueue() {
  if (_queueCache) return _queueCache;

  ensureDir(QUEUE_PATH);
  const onDisk = existsSync(QUEUE_PATH)
    ? JSON.parse(readFileSync(QUEUE_PATH, 'utf8'))
    : [];
  _queueCache = onDisk;

  // Fold in sidecar updates from secondary processes; persist if any landed.
  const changed = applyPendingUpdates(_queueCache);
  if (changed) saveQueue(_queueCache);

  return _queueCache;
}
|
||||||
|
|
||||||
|
/**
 * Force a fresh read from disk + apply pending updates.
 * Call this between iterations in long-running processes to pick up
 * sidecar updates written by secondary processes.
 * @returns {Array<object>} the freshly loaded queue
 */
export function reloadQueue() {
  _queueCache = null; // invalidate so the next load hits the disk
  return loadQueue(); // re-read + merge sidecar updates
}
|
||||||
|
|
||||||
export function saveQueue(jobs) {
|
export function saveQueue(jobs) {
|
||||||
ensureDir(QUEUE_PATH);
|
ensureDir(QUEUE_PATH);
|
||||||
atomicWriteJSON(QUEUE_PATH, jobs);
|
atomicWriteJSON(QUEUE_PATH, jobs);
|
||||||
@@ -152,6 +204,20 @@ export function updateJobStatus(id, status, extra = {}) {
|
|||||||
return queue[idx];
|
return queue[idx];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Write a pending update to the sidecar JSONL file.
 *
 * Use this from secondary processes (standalone scripts) instead of
 * calling updateJobStatus/saveQueue directly. The primary process
 * (searcher/applier) will pick up these updates on next loadQueue().
 *
 * Fix: the explicit `id` argument now always wins — previously a stray
 * `id` key inside `fields` would silently override it and route the
 * update to the wrong job.
 *
 * @param {string} id — job id
 * @param {object} fields — fields to merge (e.g. { status: 'new', apply_type: 'lever', apply_url: '...' })
 */
export function writePendingUpdate(id, fields) {
  ensureDir(UPDATES_PATH);
  // Spread `fields` first so the explicit `id` cannot be clobbered.
  appendFileSync(UPDATES_PATH, JSON.stringify({ ...fields, id }) + '\n');
}
|
||||||
|
|
||||||
export function addJobs(newJobs) {
|
export function addJobs(newJobs) {
|
||||||
// Always read fresh from disk to avoid clobbering concurrent writes (e.g. filter scoring)
|
// Always read fresh from disk to avoid clobbering concurrent writes (e.g. filter scoring)
|
||||||
_queueCache = null;
|
_queueCache = null;
|
||||||
|
|||||||
Reference in New Issue
Block a user