Files
claw-apply/job_filter.mjs
Matthew Jackson 534d318953 Make S3 the primary storage layer (not backup)
storage.mjs is now a single interface: loadJSON() and saveJSON()
route to either local disk or S3 based on settings.storage.type.
The app never touches disk/S3 directly.

- All queue/log functions are now async (saveQueue, appendLog, etc.)
- All callers updated with await
- Data validation prevents saving corrupt types (strings, nulls)
- S3 versioned bucket preserves every write
- Config: storage.type = "local" (disk) or "s3" (S3 primary)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 22:03:16 -08:00

333 lines
12 KiB
JavaScript

#!/usr/bin/env node
import { loadEnv } from './lib/env.mjs';
loadEnv();
/**
* job_filter.mjs — claw-apply AI Job Filter (Anthropic Batch API)
*
* Runs in two phases on each invocation:
*
* Phase 1 — COLLECT: if a batch is in flight, check status + download results
* Phase 2 — SUBMIT: if no batch is pending (including right after a completed collect), find unscored jobs + submit a new batch
*
* Designed to run hourly via cron. Safe to run anytime — idempotent.
*
* Usage:
* node job_filter.mjs — normal run (collect if pending, then submit if nothing is left in flight)
* node job_filter.mjs --stats — show filter stats only
*/
import { dirname, resolve } from 'path';
import { fileURLToPath } from 'url';
import { readFileSync, writeFileSync, existsSync, unlinkSync, createWriteStream } from 'fs';
// Directory containing this script; data/ and config/ paths are resolved against it.
const __dir = dirname(fileURLToPath(import.meta.url));

// Mirror everything written to stdout/stderr into data/filter.log so the output is
// available no matter how the process was launched. The file is opened with
// flags 'w', so each run starts from an empty log.
const logStream = createWriteStream(resolve(__dir, 'data/filter.log'), { flags: 'w' });
const origStdoutWrite = process.stdout.write.bind(process.stdout);
const origStderrWrite = process.stderr.write.bind(process.stderr);

// Build a replacement writer: copy the chunk to the log file, then hand the call
// (with all of its original arguments) to the untouched writer and return its result.
const teeToLog = (passThrough) => function teeWrite(chunk, ...rest) {
  logStream.write(chunk);
  return passThrough(chunk, ...rest);
};

process.stdout.write = teeToLog(origStdoutWrite);
process.stderr.write = teeToLog(origStderrWrite);
import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue, dedupeAfterFilter, initQueue } from './lib/queue.mjs';
import { loadProfile, submitBatches, checkBatch, downloadResults } from './lib/filter.mjs';
import { sendTelegram, formatFilterSummary } from './lib/notify.mjs';
import { DEFAULT_FILTER_MODEL, DEFAULT_FILTER_MIN_SCORE } from './lib/constants.mjs';
// True when the script was invoked with `--stats` (print statistics only).
const isStats = process.argv.some((arg) => arg === '--stats');
// File that records the batches currently in flight; main() only submits new
// work when no readable state is stored here.
const STATE_PATH = resolve(__dir, 'data', 'filter_state.json');
// ---------------------------------------------------------------------------
// State helpers
// ---------------------------------------------------------------------------
/**
 * Read the pending-batch state from STATE_PATH.
 *
 * @returns {*} The parsed JSON content, or null when the file does not exist
 *   or cannot be read/parsed.
 */
function readState() {
  if (!existsSync(STATE_PATH)) {
    return null;
  }

  let parsed;
  try {
    const raw = readFileSync(STATE_PATH, 'utf8');
    parsed = JSON.parse(raw);
  } catch {
    // Unreadable or malformed state is reported the same way as "no state".
    parsed = null;
  }
  return parsed;
}
/**
 * Persist the pending-batch state to STATE_PATH as pretty-printed JSON
 * (2-space indentation), replacing any previous content.
 *
 * @param {*} state - JSON-serializable state to store.
 */
function writeState(state) {
  const serialized = JSON.stringify(state, null, 2);
  writeFileSync(STATE_PATH, serialized);
}
/**
 * Delete the pending-batch state file at STATE_PATH.
 * Does nothing when the file is already absent.
 */
function clearState() {
  const hasState = existsSync(STATE_PATH);
  if (!hasState) {
    return;
  }
  unlinkSync(STATE_PATH);
}
// ---------------------------------------------------------------------------
// Stats
// ---------------------------------------------------------------------------
/**
 * Print filter statistics for the current queue to stdout: counts per status,
 * number of scored jobs, pass rate, any pending batch recorded in the state
 * file, and up to ten sample jobs that were filtered out.
 *
 * NOTE(review): relies on loadQueue() returning the queue array synchronously —
 * confirm against lib/queue.mjs.
 */
function showStats() {
  const queue = loadQueue();

  // Tally jobs per status.
  const byStatus = {};
  for (const j of queue) byStatus[j.status] = (byStatus[j.status] || 0) + 1;

  const filtered = queue.filter(j => j.status === 'filtered');
  const scored = queue.filter(j => j.filter_score != null);
  // Pass rate = share of scored jobs that were not marked as filtered.
  const passedCount = scored.filter(j => j.status !== 'filtered').length;
  const passRate = scored.length > 0 ? Math.round((passedCount / scored.length) * 100) : 0;

  console.log('📊 Filter Stats\n');
  console.log(` New (unfiltered): ${byStatus['new'] || 0}`);
  console.log(` Filtered (blocked): ${byStatus['filtered'] || 0}`);
  console.log(` Total scored: ${scored.length}`);
  console.log(` Pass rate: ${passRate}%\n`);

  const state = readState();
  if (state) {
    const batchIds = state.batches?.map(b => b.batchId).join(', ') || 'none';
    console.log(` Pending batches: ${batchIds}`);
    console.log(` Submitted: ${state.submitted_at}`);
    console.log(` Job count: ${state.job_count}\n`);
  }

  if (filtered.length > 0) {
    console.log('Sample filtered:');
    for (const j of filtered.slice(0, 10)) {
      // Separate the company from the reason, and omit the reason entirely when
      // the job has none (previously the two were fused, or "undefined" was printed).
      const reason = j.filter_reason ? ` — ${j.filter_reason}` : '';
      console.log(` [${j.filter_score}/10] ${j.title} @ ${j.company}${reason}`);
    }
  }
}
// ---------------------------------------------------------------------------
// Phase 1 — Collect results from all pending batches
// ---------------------------------------------------------------------------
/**
 * Phase 1 — collect results for the batches recorded in the state file.
 *
 * Polls every batch. If any is still `in_progress`, logs progress and returns
 * without touching the queue or the state file. Once all batches have finished
 * it downloads the results, writes scores back onto the queue, marks jobs that
 * score below their track's threshold as `filtered`, dedupes, clears the state
 * file, appends an entry to data/filter_runs.json and sends a summary.
 *
 * @param {object} state - Parsed state file: `{ batches, submitted_at, model, ... }`.
 * @param {object} settings - App settings; passed through to sendTelegram().
 * @returns {Promise<void>}
 */
async function collect(state, settings) {
  const apiKey = process.env.ANTHROPIC_API_KEY;
  const batches = state.batches; // array of { track, batchId, idMap, jobCount }

  // Check status of all batches
  let allDone = true;
  for (const b of batches) {
    const { status, counts } = await checkBatch(b.batchId, apiKey);
    b._status = status;
    b._counts = counts;
    if (status === 'in_progress') {
      const total = Object.values(counts).reduce((a, v) => a + v, 0);
      const done = (counts.succeeded || 0) + (counts.errored || 0) + (counts.canceled || 0) + (counts.expired || 0);
      console.log(` [${b.track}] Still processing — ${done}/${total} complete`);
      allDone = false;
    } else {
      console.log(` [${b.track}] Done — ${counts.succeeded || 0} succeeded, ${counts.errored || 0} errors`);
    }
  }
  if (!allDone) {
    console.log(' Not all batches done yet. Check back later.');
    return;
  }

  // All done — download and merge all results
  console.log('\n All batches complete. Downloading results...');
  const resultMap = {};
  let totalCost = 0;
  const totalUsage = { input_tokens: 0, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 };
  for (const b of batches) {
    const { results, usage, cost } = await downloadResults(b.batchId, apiKey, b.idMap || {});
    for (const r of results) resultMap[r.jobId] = r;
    totalCost += cost ?? 0;
    for (const [k, v] of Object.entries(usage ?? {})) totalUsage[k] = (totalUsage[k] || 0) + v;
  }

  const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json'));
  const globalMin = searchConfig.filter_min_score ?? DEFAULT_FILTER_MIN_SCORE;

  // Per-track pass threshold. The first search entry for a track wins; tracks
  // without an entry (or without their own threshold) fall back to the global one.
  const minScoreByTrack = new Map();
  for (const s of searchConfig.searches || []) {
    if (!minScoreByTrack.has(s.track)) minScoreByTrack.set(s.track, s.filter_min_score ?? globalMin);
  }
  const minScoreFor = (job) => minScoreByTrack.get(job.track || 'ae') ?? globalMin;

  let passed = 0, filtered = 0, errors = 0;
  const queue = loadQueue();
  const now = new Date().toISOString();
  for (const job of queue) {
    const r = resultMap[job.id];
    if (!r) continue;
    if (job.filter_score != null) continue; // already scored — idempotent
    // A missing score (null OR undefined) is an error, not a low score: leave the
    // job unscored instead of marking it filtered.
    if (r.error || r.score == null) {
      errors++;
      job.filter_score = null;
      job.filter_reason = r.reason || 'filter_error';
      job.status_updated_at = now;
      continue;
    }
    job.filter_score = r.score;
    job.filter_reason = r.reason;
    job.status_updated_at = now;
    if (r.score >= minScoreFor(job)) { passed++; }
    else { filtered++; job.status = 'filtered'; }
  }
  await saveQueue(queue);

  // Dedup cross-track copies — keep highest-scoring version of each job
  const duped = await dedupeAfterFilter();
  if (duped > 0) console.log(` Deduped ${duped} cross-track copies`);
  clearState();

  // Log run. The queue is already saved and the state cleared, so a damaged run
  // log must not abort the summary below: warn, leave the file untouched, move on.
  const runsPath = resolve(__dir, 'data/filter_runs.json');
  let runs = [];
  let runLogUsable = true;
  if (existsSync(runsPath)) {
    try {
      const parsed = JSON.parse(readFileSync(runsPath, 'utf8'));
      if (Array.isArray(parsed)) {
        runs = parsed;
      } else {
        runLogUsable = false;
        console.warn(`⚠️ ${runsPath} does not contain a JSON array — skipping run log entry`);
      }
    } catch (err) {
      runLogUsable = false;
      console.warn(`⚠️ Could not read ${runsPath} (${err.message}) — skipping run log entry`);
    }
  }
  if (runLogUsable) {
    runs.push({
      batches: batches.map(b => ({ track: b.track, batch_id: b.batchId, job_count: b.jobCount })),
      submitted_at: state.submitted_at,
      collected_at: new Date().toISOString(),
      model: state.model,
      passed,
      filtered,
      errors,
      cost_usd: Math.round(totalCost * 100) / 100,
      usage: totalUsage,
    });
    writeFileSync(runsPath, JSON.stringify(runs, null, 2));
  }

  // Collect top-scoring jobs for summary. Use the same per-track threshold as the
  // scoring loop so a job filtered by a stricter track is never listed as a top
  // job, and skip unscored jobs explicitly (null >= 0 is true in JavaScript).
  const freshQueue = loadQueue();
  const topJobs = freshQueue
    .filter(j => resultMap[j.id] && j.filter_score != null && j.filter_score >= minScoreFor(j))
    .sort((a, b) => (b.filter_score || 0) - (a.filter_score || 0))
    .slice(0, 5)
    .map(j => ({ score: j.filter_score, title: j.title, company: j.company }));

  const summary = formatFilterSummary({ passed, filtered, errors, cost: totalCost, topJobs });
  console.log(`\n${summary.replace(/\*/g, '')}`);

  // Notification is best-effort, but a failure should be visible in the log.
  try {
    await sendTelegram(settings, summary);
  } catch (err) {
    console.warn(`⚠️ Telegram notification failed: ${err?.message ?? err}`);
  }
}
// ---------------------------------------------------------------------------
// Phase 2 — Submit a new batch
// ---------------------------------------------------------------------------
/**
 * Phase 2 — submit every unscored `new` job for AI filtering.
 *
 * Steps: reset stale batch markers when no state file exists, select the new
 * jobs that have neither a score nor a batch marker, load the job profile for
 * each configured track, submit the batches, record them in the state file and
 * stamp the submitted jobs with their batch ID.
 *
 * @param {object} settings - App settings; reads `settings.filter.job_profiles`
 *   (track → profile path) and `settings.filter.model`.
 * @param {object} searchConfig - Search configuration (currently unused here;
 *   kept for interface compatibility).
 * @param {object} candidateProfile - Candidate profile passed through to submitBatches().
 * @returns {Promise<void>}
 */
async function submit(settings, searchConfig, candidateProfile) {
  const apiKey = process.env.ANTHROPIC_API_KEY;

  // Clear stale batch markers — jobs marked as submitted but no filter_state.json means
  // the batch completed (or was lost) without writing scores back. Reset so they get re-submitted.
  if (!existsSync(STATE_PATH)) {
    let cleared = 0;
    const queue = loadQueue();
    for (const j of queue) {
      if (j.status === 'new' && j.filter_score == null && j.filter_batch_id) {
        delete j.filter_batch_id;
        delete j.filter_submitted_at;
        cleared++;
      }
    }
    if (cleared > 0) {
      await saveQueue(queue);
      console.log(`🔄 Cleared ${cleared} stale batch markers (batch completed without scoring)`);
    }
  }

  // Get all new jobs that haven't been scored yet (no score AND not already in a pending batch)
  const jobs = getJobsByStatus('new').filter(j => j.filter_score == null && !j.filter_batch_id && !j.filter_submitted_at);
  if (jobs.length === 0) {
    console.log('✅ Nothing to filter — all new jobs already scored.');
    return;
  }

  // Build job profiles map by track
  const profilePaths = settings.filter?.job_profiles || {};
  const jobProfilesByTrack = {};
  for (const [track, path] of Object.entries(profilePaths)) {
    const profile = loadProfile(path);
    if (profile) jobProfilesByTrack[track] = profile;
    else console.warn(`⚠️ Could not load job profile for track "${track}" at ${path}`);
  }

  // Filter out jobs with no profile (will pass through unscored)
  const filterable = jobs.filter(j => jobProfilesByTrack[j.track || 'ae']);
  const noProfile = jobs.length - filterable.length;
  if (noProfile > 0) console.warn(`⚠️ ${noProfile} jobs skipped — no profile for their track`);
  if (filterable.length === 0) {
    console.log('Nothing filterable — no job profiles configured for any track.');
    return;
  }

  const model = settings.filter?.model || DEFAULT_FILTER_MODEL;
  const submittedAt = new Date().toISOString();
  console.log(`🚀 Submitting batches — ${filterable.length} jobs across ${Object.keys(jobProfilesByTrack).length} tracks, model: ${model}`);
  const submitted = await submitBatches(filterable, jobProfilesByTrack, candidateProfile, model, apiKey);

  // Never persist a state file without batches: main() would find nothing to
  // collect, yet the file's existence would block every future submit.
  if (!Array.isArray(submitted) || submitted.length === 0) {
    console.error('❌ No batches were created — nothing recorded, will retry on the next run.');
    return;
  }

  writeState({
    batches: submitted,
    submitted_at: submittedAt,
    job_count: filterable.length,
    model,
  });

  // Stamp each submitted job with its track's batch ID. Only jobs that were part
  // of this submission are stamped; other queue entries on the same track (for
  // example jobs that are no longer `new`) were never sent and stay unmarked.
  const trackToBatchId = {};
  for (const b of submitted) trackToBatchId[b.track] = b.batchId;
  const submittedIds = new Set(filterable.map(j => j.id));
  const allJobs = loadQueue();
  for (const job of allJobs) {
    if (!submittedIds.has(job.id)) continue;
    const batchId = trackToBatchId[job.track || 'ae'];
    if (batchId && !job.filter_batch_id) {
      job.filter_batch_id = batchId;
      job.filter_submitted_at = submittedAt;
    }
  }
  await saveQueue(allJobs);

  const batchSummary = submitted.map(b => `${b.track}: ${b.jobCount} jobs`).join(', ');
  console.log(` ${batchSummary}`);
  console.log(` Results typically ready in < 1 hour. Next run will collect.`);
  // No Telegram on submit — only notify on collect when results are ready
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------
/**
 * Entry point: print stats (`--stats`), or run the collect/submit cycle.
 *
 * Normal run: if the state file lists pending batches, collect them. Afterwards,
 * if no state remains (nothing pending, or collect just finished and cleared
 * it), submit a new batch for any unscored jobs.
 *
 * Exits the process with code 1 when ANTHROPIC_API_KEY is not set.
 *
 * @returns {Promise<void>}
 */
async function main() {
  if (isStats) {
    // NOTE(review): stats are printed before initQueue(settings) runs — confirm
    // loadQueue() works without initialization for every storage type.
    showStats();
    return;
  }

  const apiKey = process.env.ANTHROPIC_API_KEY;
  if (!apiKey) {
    console.error('❌ ANTHROPIC_API_KEY not set');
    process.exit(1);
  }

  const settings = loadConfig(resolve(__dir, 'config/settings.json'));
  await initQueue(settings);
  const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json'));
  const candidateProfile = loadConfig(resolve(__dir, 'config/profile.json'));
  console.log('🔍 claw-apply: AI Job Filter\n');

  const state = readState();
  if (state?.batches?.length > 0) {
    // Phase 1: collect results from pending batches
    await collect(state, settings);
  } else if (state) {
    // A state file without batches has nothing to collect, yet its presence would
    // block Phase 2 on every run. Treat it as stale and remove it.
    console.warn('⚠️ State file lists no pending batches — discarding stale state.');
    clearState();
  }

  // Phase 2: submit any remaining unscored jobs (runs after collect too)
  if (!readState()) {
    await submit(settings, searchConfig, candidateProfile);
  }
}
/**
 * Flush the tee'd log file, then exit with `code`.
 *
 * process.exit() does not wait for pending asynchronous writes, so exiting
 * straight away can cut off the end of data/filter.log. The original
 * stdout/stderr writers are restored first so nothing writes to the log stream
 * after it has been ended.
 *
 * @param {number} code - Process exit code.
 */
function exitAfterFlush(code) {
  process.stdout.write = origStdoutWrite;
  process.stderr.write = origStderrWrite;
  // Safety net: exit anyway if the stream never reports completion.
  const fallback = setTimeout(() => process.exit(code), 1000);
  logStream.end(() => {
    clearTimeout(fallback);
    process.exit(code);
  });
}

main().then(() => {
  exitAfterFlush(0);
}).catch(err => {
  console.error('Fatal:', err?.message ?? err);
  exitAfterFlush(1);
});