Files
claw-apply/job_filter.mjs
Matthew Jackson 73887318b2 Fix saveQueue() called without argument in job_filter.mjs
Iterate over the full queue array instead of getJobsByStatus() results,
and pass it to saveQueue(). The previous code passed no argument, which
would corrupt or silently fail the save.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 19:50:27 -08:00

332 lines
12 KiB
JavaScript

#!/usr/bin/env node
import { loadEnv } from './lib/env.mjs';
loadEnv();
/**
* job_filter.mjs — claw-apply AI Job Filter (Anthropic Batch API)
*
* Runs in two phases on each invocation:
*
* Phase 1 — COLLECT: if batches are in flight, check their status and, once ALL are done, download + apply results
* Phase 2 — SUBMIT: if no batch is pending (including right after a completed collect), find unscored jobs + submit new batches
*
* Designed to run hourly via cron. Safe to run anytime — idempotent.
*
* Usage:
* node job_filter.mjs — normal run (collect if pending, then submit if nothing is still pending)
* node job_filter.mjs --stats — show filter stats only
*/
import { dirname, resolve } from 'path';
import { fileURLToPath } from 'url';
import { readFileSync, writeFileSync, existsSync, unlinkSync, createWriteStream } from 'fs';
// Directory containing this script; all data/ and config/ paths are resolved against it.
const __dir = dirname(fileURLToPath(import.meta.url));

// Mirror everything written to stdout and stderr into data/filter.log so a run's
// output is available however the process was launched. The file is opened with
// flags 'w', i.e. it is truncated at the start of every run.
const logStream = createWriteStream(resolve(__dir, 'data/filter.log'), { flags: 'w' });
const origStdoutWrite = process.stdout.write.bind(process.stdout);
const origStderrWrite = process.stderr.write.bind(process.stderr);

process.stdout.write = function teeStdout(chunk, ...rest) {
  logStream.write(chunk);
  return origStdoutWrite(chunk, ...rest);
};

process.stderr.write = function teeStderr(chunk, ...rest) {
  logStream.write(chunk);
  return origStderrWrite(chunk, ...rest);
};
import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue, dedupeAfterFilter } from './lib/queue.mjs';
import { loadProfile, submitBatches, checkBatch, downloadResults } from './lib/filter.mjs';
import { sendTelegram, formatFilterSummary } from './lib/notify.mjs';
import { DEFAULT_FILTER_MODEL, DEFAULT_FILTER_MIN_SCORE } from './lib/constants.mjs';
// True when the script is invoked with --stats: main() then only prints statistics and returns.
const isStats = process.argv.includes('--stats');
// Location of the state file describing the batches currently in flight.
// It is written by writeState() after a submit and removed by clearState() after a collect.
const STATE_PATH = resolve(__dir, 'data/filter_state.json');
// ---------------------------------------------------------------------------
// State helpers
// ---------------------------------------------------------------------------
/**
 * Load the pending-batch state from STATE_PATH.
 *
 * @returns {object|null} The parsed state, or null when the file does not exist
 *   or cannot be read/parsed. An unreadable file is reported on stderr instead of
 *   being ignored silently, because callers treat null as "no batch pending".
 */
function readState() {
  if (!existsSync(STATE_PATH)) return null;
  try {
    return JSON.parse(readFileSync(STATE_PATH, 'utf8'));
  } catch (err) {
    console.warn(`⚠️ Could not read filter state at ${STATE_PATH} (${err?.message ?? err}) — treating as no pending batch`);
    return null;
  }
}
/**
 * Persist the pending-batch state to STATE_PATH as pretty-printed JSON,
 * replacing whatever the file held before.
 *
 * @param {object} state - State object to serialize.
 */
function writeState(state) {
  const serialized = JSON.stringify(state, null, 2);
  writeFileSync(STATE_PATH, serialized);
}
/**
 * Delete the pending-batch state file. Does nothing when the file is already absent.
 */
function clearState() {
  if (!existsSync(STATE_PATH)) {
    return;
  }
  unlinkSync(STATE_PATH);
}
// ---------------------------------------------------------------------------
// Stats
// ---------------------------------------------------------------------------
/**
 * Print filter statistics for the current queue to stdout.
 *
 * Reports the number of new and filtered jobs, how many jobs carry a score, the
 * share of scored jobs that were not filtered, the pending batches (when a state
 * file exists) and up to ten sample filtered jobs. Reads only; nothing is modified.
 */
function showStats() {
  const queue = loadQueue();

  // Tally jobs per status.
  const byStatus = {};
  for (const j of queue) byStatus[j.status] = (byStatus[j.status] || 0) + 1;

  const filtered = queue.filter(j => j.status === 'filtered');
  const scored = queue.filter(j => j.filter_score != null);
  const passedCount = scored.filter(j => j.status !== 'filtered').length;
  // Guard against dividing by zero when nothing has been scored yet.
  const passRate = scored.length > 0 ? Math.round((passedCount / scored.length) * 100) : 0;

  console.log('📊 Filter Stats\n');
  console.log(` New (unfiltered): ${byStatus['new'] || 0}`);
  console.log(` Filtered (blocked): ${byStatus['filtered'] || 0}`);
  console.log(` Total scored: ${scored.length}`);
  console.log(` Pass rate: ${passRate}%\n`);

  const state = readState();
  if (state) {
    const batchIds = state.batches?.map(b => b.batchId).join(', ') || 'none';
    console.log(` Pending batches: ${batchIds}`);
    console.log(` Submitted: ${state.submitted_at}`);
    console.log(` Job count: ${state.job_count}\n`);
  }

  if (filtered.length > 0) {
    console.log('Sample filtered:');
    // Separate company and reason; previously they were printed run together.
    filtered.slice(0, 10).forEach(j =>
      console.log(` [${j.filter_score}/10] ${j.title} @ ${j.company} — ${j.filter_reason}`)
    );
  }
}
// ---------------------------------------------------------------------------
// Phase 1 — Collect results from all pending batches
// ---------------------------------------------------------------------------
/**
 * Phase 1 — poll every pending batch and, once all of them have finished,
 * write the scores back to the queue.
 *
 * Steps, in order:
 *  1. Check each batch; if any is still `in_progress`, log progress and return
 *     without touching the queue or the state file.
 *  2. Download and merge the results of all batches, summing cost and token usage.
 *  3. Score every not-yet-scored queue job that has a result; jobs below their
 *     track's minimum score get status `filtered`. Results with an error or
 *     without a score are counted as errors and leave the job unscored.
 *  4. Save the queue, dedupe cross-track copies, clear the state file.
 *  5. Append a record to data/filter_runs.json (best effort) and send the summary
 *     via Telegram (best effort).
 *
 * @param {object} state - Parsed filter state; `batches`, `submitted_at` and `model` are read.
 * @param {object} settings - Settings object, forwarded to sendTelegram().
 * @returns {Promise<void>}
 */
async function collect(state, settings) {
  const apiKey = process.env.ANTHROPIC_API_KEY;
  const batches = state.batches; // array of { track, batchId, idMap, jobCount }

  // Check status of all batches
  let allDone = true;
  for (const b of batches) {
    const { status, counts } = await checkBatch(b.batchId, apiKey);
    // In-memory annotations only; the state file is not rewritten here.
    b._status = status;
    b._counts = counts;
    if (status === 'in_progress') {
      const total = Object.values(counts).reduce((a, v) => a + v, 0);
      const done = (counts.succeeded || 0) + (counts.errored || 0) + (counts.canceled || 0) + (counts.expired || 0);
      console.log(` [${b.track}] Still processing — ${done}/${total} complete`);
      allDone = false;
    } else {
      console.log(` [${b.track}] Done — ${counts.succeeded || 0} succeeded, ${counts.errored || 0} errors`);
    }
  }
  if (!allDone) {
    console.log(' Not all batches done yet. Check back later.');
    return;
  }

  // All done — download and merge all results
  console.log('\n All batches complete. Downloading results...');
  const resultMap = {};
  let totalCost = 0;
  const totalUsage = { input_tokens: 0, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 };
  for (const b of batches) {
    const { results, usage, cost } = await downloadResults(b.batchId, apiKey, b.idMap || {});
    for (const r of results) resultMap[r.jobId] = r;
    totalCost += cost;
    for (const [k, v] of Object.entries(usage)) totalUsage[k] = (totalUsage[k] || 0) + v;
  }

  const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json'));
  const globalMin = searchConfig.filter_min_score ?? DEFAULT_FILTER_MIN_SCORE;
  // Minimum passing score for a job: its track's own threshold when the search
  // config defines one, otherwise the global threshold. Used both for the
  // pass/filter decision and for the summary so the two cannot disagree.
  const minScoreFor = (job) => {
    const track = job.track || 'ae';
    const searchEntry = (searchConfig.searches || []).find(s => s.track === track);
    return searchEntry?.filter_min_score ?? globalMin;
  };

  let passed = 0;
  let filtered = 0;
  let errors = 0;
  const queue = loadQueue();
  const now = new Date().toISOString();
  for (const job of queue) {
    const r = resultMap[job.id];
    if (!r) continue;
    if (job.filter_score != null) continue; // already scored — idempotent
    // `== null` also covers a result that has no score field at all; with `=== null`
    // such a job would be stored with an undefined score and marked as filtered.
    if (r.error || r.score == null) {
      errors++;
      job.filter_score = null;
      job.filter_reason = r.reason || 'filter_error';
      job.status_updated_at = now;
      continue;
    }
    job.filter_score = r.score;
    job.filter_reason = r.reason;
    job.status_updated_at = now;
    if (r.score >= minScoreFor(job)) {
      passed++;
    } else {
      filtered++;
      job.status = 'filtered';
    }
  }
  saveQueue(queue);

  // Dedup cross-track copies — keep highest-scoring version of each job
  const duped = dedupeAfterFilter();
  if (duped > 0) console.log(` Deduped ${duped} cross-track copies`);
  clearState();

  // Log run. The queue is already saved and the state cleared at this point, so a
  // broken or unwritable run log must not abort the summary/notification below.
  const runsPath = resolve(__dir, 'data/filter_runs.json');
  const runEntry = {
    batches: batches.map(b => ({ track: b.track, batch_id: b.batchId, job_count: b.jobCount })),
    submitted_at: state.submitted_at,
    collected_at: new Date().toISOString(),
    model: state.model,
    passed,
    filtered,
    errors,
    cost_usd: Math.round(totalCost * 100) / 100,
    usage: totalUsage,
  };
  try {
    const runs = existsSync(runsPath) ? JSON.parse(readFileSync(runsPath, 'utf8')) : [];
    if (!Array.isArray(runs)) throw new TypeError('expected a JSON array');
    runs.push(runEntry);
    writeFileSync(runsPath, JSON.stringify(runs, null, 2));
  } catch (err) {
    // Leave the existing file untouched so nothing recoverable is overwritten.
    console.warn(`⚠️ Could not update run log ${runsPath}: ${err?.message ?? err}`);
  }

  // Collect top-scoring jobs for summary (re-load: dedupe may have rewritten the queue)
  const freshQueue = loadQueue();
  const topJobs = freshQueue
    .filter(j => resultMap[j.id] && j.filter_score >= minScoreFor(j))
    .sort((a, b) => (b.filter_score || 0) - (a.filter_score || 0))
    .slice(0, 5)
    .map(j => ({ score: j.filter_score, title: j.title, company: j.company }));

  const summary = formatFilterSummary({ passed, filtered, errors, cost: totalCost, topJobs });
  console.log(`\n${summary.replace(/\*/g, '')}`);
  // Notification is best effort, but a failure is reported rather than swallowed.
  try {
    await sendTelegram(settings, summary);
  } catch (err) {
    console.warn(`⚠️ Telegram notification failed: ${err?.message ?? err}`);
  }
}
// ---------------------------------------------------------------------------
// Phase 2 — Submit a new batch
// ---------------------------------------------------------------------------
/**
 * Phase 2 — submit all unscored `new` jobs for scoring.
 *
 * Steps, in order:
 *  1. When no state file exists, remove leftover batch markers from unscored `new`
 *     jobs so they become eligible again.
 *  2. Select `new` jobs with no score and no batch marker, and keep those whose
 *     track has a loadable job profile (settings.filter.job_profiles).
 *  3. Submit them via submitBatches(), record the returned batches in the state
 *     file, and stamp the submitted jobs with their track's batch id.
 *
 * @param {object} settings - Settings; `filter.job_profiles` and `filter.model` are read.
 * @param {object} searchConfig - Search configuration (accepted for interface compatibility; not read here).
 * @param {object} candidateProfile - Candidate profile, forwarded to submitBatches().
 * @returns {Promise<void>}
 */
async function submit(settings, searchConfig, candidateProfile) {
  const apiKey = process.env.ANTHROPIC_API_KEY;

  // Clear stale batch markers — jobs marked as submitted but no filter_state.json means
  // the batch completed (or was lost) without writing scores back. Reset so they get re-submitted.
  if (!existsSync(STATE_PATH)) {
    let cleared = 0;
    const queue = loadQueue();
    for (const j of queue) {
      if (j.status === 'new' && j.filter_score == null && j.filter_batch_id) {
        delete j.filter_batch_id;
        delete j.filter_submitted_at;
        cleared++;
      }
    }
    if (cleared > 0) {
      saveQueue(queue);
      console.log(`🔄 Cleared ${cleared} stale batch markers (batch completed without scoring)`);
    }
  }

  // Get all new jobs that haven't been scored yet (no score AND not already in a pending batch)
  const jobs = getJobsByStatus('new').filter(j => j.filter_score == null && !j.filter_batch_id && !j.filter_submitted_at);
  if (jobs.length === 0) {
    console.log('✅ Nothing to filter — all new jobs already scored.');
    return;
  }

  // Build job profiles map by track
  const profilePaths = settings.filter?.job_profiles || {};
  const jobProfilesByTrack = {};
  for (const [track, path] of Object.entries(profilePaths)) {
    const profile = loadProfile(path);
    if (profile) jobProfilesByTrack[track] = profile;
    else console.warn(`⚠️ Could not load job profile for track "${track}" at ${path}`);
  }

  // Filter out jobs with no profile (will pass through unscored)
  const filterable = jobs.filter(j => jobProfilesByTrack[j.track || 'ae']);
  const noProfile = jobs.length - filterable.length;
  if (noProfile > 0) console.warn(`⚠️ ${noProfile} jobs skipped — no profile for their track`);
  if (filterable.length === 0) {
    console.log('Nothing filterable — no job profiles configured for any track.');
    return;
  }

  const model = settings.filter?.model || DEFAULT_FILTER_MODEL;
  const submittedAt = new Date().toISOString();
  console.log(`🚀 Submitting batches — ${filterable.length} jobs across ${Object.keys(jobProfilesByTrack).length} tracks, model: ${model}`);
  const submitted = await submitBatches(filterable, jobProfilesByTrack, candidateProfile, model, apiKey);

  // A state file without batches can never be collected and would block later
  // submissions, so record nothing when no batch was actually created.
  if (!Array.isArray(submitted) || submitted.length === 0) {
    console.warn('⚠️ No batches were created — nothing recorded, jobs stay eligible for the next run.');
    return;
  }

  writeState({
    batches: submitted,
    submitted_at: submittedAt,
    job_count: filterable.length,
    model,
  });

  // Stamp each submitted job with its track's batch ID. Only jobs that were part
  // of this submission are stamped; other queue entries on the same track
  // (already processed, or added since the selection above) are left alone.
  const trackToBatchId = {};
  for (const b of submitted) trackToBatchId[b.track] = b.batchId;
  const submittedIds = new Set(filterable.map(j => j.id));
  const allJobs = loadQueue();
  for (const job of allJobs) {
    if (!submittedIds.has(job.id)) continue;
    const batchId = trackToBatchId[job.track || 'ae'];
    if (batchId && !job.filter_batch_id) {
      job.filter_batch_id = batchId;
      job.filter_submitted_at = submittedAt;
    }
  }
  saveQueue(allJobs);

  const batchSummary = submitted.map(b => `${b.track}: ${b.jobCount} jobs`).join(', ');
  console.log(` ${batchSummary}`);
  console.log(` Results typically ready in < 1 hour. Next run will collect.`);
  // No Telegram on submit — only notify on collect when results are ready
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------
/**
 * Entry point: run the stats report, or the collect and submit phases.
 *
 * With --stats only the statistics are printed. Otherwise ANTHROPIC_API_KEY must
 * be set (the process exits with code 1 if it is not). Pending batches are
 * collected first; the submit phase then runs whenever no state file remains,
 * which includes the case where collect has just finished and cleared it.
 *
 * @returns {Promise<void>}
 */
async function main() {
  if (isStats) {
    showStats();
    return;
  }

  const apiKey = process.env.ANTHROPIC_API_KEY;
  if (!apiKey) {
    console.error('❌ ANTHROPIC_API_KEY not set');
    process.exit(1);
  }

  const settings = loadConfig(resolve(__dir, 'config/settings.json'));
  const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json'));
  const candidateProfile = loadConfig(resolve(__dir, 'config/profile.json'));
  console.log('🔍 claw-apply: AI Job Filter\n');

  const state = readState();
  if (state?.batches?.length > 0) {
    // Phase 1: collect results from pending batches
    await collect(state, settings);
  } else if (state) {
    // A state file that lists no batches has nothing to collect, yet its mere
    // presence would skip the submit phase on every run. Discard it.
    console.warn('⚠️ Filter state lists no batches — discarding it so jobs can be submitted.');
    clearState();
  }

  // Phase 2: submit any remaining unscored jobs (runs after collect too)
  if (!readState()) {
    await submit(settings, searchConfig, candidateProfile);
  }
}
/**
 * Exit with the given code once the tee'd log stream has flushed.
 * process.exit() does not wait for pending asynchronous writes, so exiting
 * directly could cut off the end of data/filter.log.
 *
 * @param {number} code - Process exit code.
 */
function exitAfterLogFlush(code) {
  logStream.end(() => process.exit(code));
}

main().then(() => {
  exitAfterLogFlush(0);
}).catch(err => {
  console.error('Fatal:', err?.message ?? err);
  exitAfterLogFlush(1);
});