feat: rewrite filter to use Anthropic Batch API
- Batch API = 50% cost savings vs synchronous calls
- Prompt caching on system prompt (profile + criteria shared across all jobs)
- One request per job with custom_id = job ID for result matching
- Two-phase state machine: submit → poll/collect (hourly cron safe)
- filter_state.json tracks pending batch ID between runs
- Model configurable via settings.filter.model (default: claude-sonnet-4-6)
- Telegram notifications on submit + collect
- Errors pass through — never block applications due to filter failure
- --stats flag for queue overview
This commit is contained in:
272
job_filter.mjs
272
job_filter.mjs
@@ -3,121 +3,231 @@ import { loadEnv } from './lib/env.mjs';
|
||||
loadEnv();
|
||||
|
||||
/**
 * job_filter.mjs — claw-apply AI Job Filter (Anthropic Batch API)
 *
 * Runs in two phases on each invocation:
 *
 * Phase 1 — COLLECT: if a batch is in flight, check status + download results
 * Phase 2 — SUBMIT: if no batch pending, find unscored jobs + submit a new batch
 *
 * Designed to run hourly via cron. Safe to run anytime — idempotent.
 *
 * Usage:
 *   node job_filter.mjs          — normal run (collect if pending, else submit)
 *   node job_filter.mjs --stats  — show filter stats only
 */
|
||||
|
||||
import { dirname, resolve } from 'path';
import { fileURLToPath } from 'url';
import { readFileSync, writeFileSync, existsSync, unlinkSync } from 'fs';

const __dir = dirname(fileURLToPath(import.meta.url));

import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue } from './lib/queue.mjs';
import { acquireLock } from './lib/lock.mjs';
import { loadProfile, submitBatch, checkBatch, downloadResults } from './lib/filter.mjs';
import { sendTelegram } from './lib/notify.mjs';

// NOTE(review): --dry-run is not referenced by the two-phase flow below —
// confirm whether it should be removed or re-wired before relying on it.
const isDryRun = process.argv.includes('--dry-run');
const isStats = process.argv.includes('--stats');
||||
// Persistent pointer to the in-flight batch (batch_id, submitted_at,
// job_count, model). Present only while a batch is awaiting collection.
const STATE_PATH = resolve(__dir, 'data/filter_state.json');

// Fallback model for batch scoring; override via settings.filter.model.
const DEFAULT_MODEL = 'claude-sonnet-4-6-20251101';

// ---------------------------------------------------------------------------
// State helpers
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
 * Read the persisted batch state from disk.
 *
 * @returns {object|null} Parsed state, or null when the file is absent
 *   or contains invalid JSON (treated the same: no batch pending).
 */
function readState() {
  if (!existsSync(STATE_PATH)) {
    return null;
  }
  try {
    const raw = readFileSync(STATE_PATH, 'utf8');
    return JSON.parse(raw);
  } catch {
    // Corrupt state file — behave as if no batch is pending.
    return null;
  }
}
|
||||
|
||||
/**
 * Persist batch state to disk as pretty-printed JSON.
 *
 * @param {object} state - Batch tracking record to serialize.
 */
function writeState(state) {
  const serialized = JSON.stringify(state, null, 2);
  writeFileSync(STATE_PATH, serialized);
}
|
||||
|
||||
/**
 * Delete the state file once a batch has been fully collected.
 * No-op when the file does not exist.
 */
function clearState() {
  if (!existsSync(STATE_PATH)) {
    return;
  }
  unlinkSync(STATE_PATH);
}
|
||||
|
||||
// ---------------------------------------------------------------------------
// Stats
// ---------------------------------------------------------------------------

/**
 * Print queue/filter statistics plus any pending-batch info. Read-only:
 * never mutates the queue or the batch state.
 */
function showStats() {
  const queue = loadQueue();

  // Tally jobs by status in a single pass.
  const byStatus = {};
  for (const j of queue) byStatus[j.status] = (byStatus[j.status] || 0) + 1;

  const filtered = queue.filter(j => j.status === 'filtered');
  const scored = queue.filter(j => j.filter_score != null);
  const passRate = scored.length > 0
    ? Math.round((scored.filter(j => j.status !== 'filtered').length / scored.length) * 100)
    : 0;

  console.log('📊 Filter Stats\n');
  console.log(` New (unfiltered): ${byStatus['new'] || 0}`);
  console.log(` Filtered (blocked): ${byStatus['filtered'] || 0}`);
  console.log(` Total scored: ${scored.length}`);
  console.log(` Pass rate: ${passRate}%\n`);

  // Surface an in-flight batch, if any.
  const state = readState();
  if (state) {
    console.log(` Pending batch: ${state.batch_id}`);
    console.log(` Submitted: ${state.submitted_at}`);
    console.log(` Job count: ${state.job_count}\n`);
  }

  if (filtered.length > 0) {
    console.log('Sample filtered:');
    filtered.slice(0, 10).forEach(j =>
      console.log(` [${j.filter_score}/10] ${j.title} @ ${j.company} — ${j.filter_reason}`)
    );
  }
}
|
||||
|
||||
// ---------------------------------------------------------------------------
// Phase 1 — Collect results from a pending batch
// ---------------------------------------------------------------------------

/**
 * Poll the pending batch; when it has ended, download results, apply the
 * per-track score thresholds, update job statuses, clear state, and notify.
 *
 * Errored/unscored results are passed through as 'new' — the filter must
 * never block applications because of its own failures.
 *
 * @param {object} state - Persisted batch state ({ batch_id, ... }).
 * @param {object} settings - Loaded settings.json (used for Telegram).
 */
async function collect(state, settings) {
  const apiKey = process.env.ANTHROPIC_API_KEY;
  console.log(`🔍 Checking batch ${state.batch_id}...`);

  const { status, counts } = await checkBatch(state.batch_id, apiKey);

  if (status === 'in_progress') {
    const total = Object.values(counts).reduce((a, b) => a + b, 0);
    const done = (counts.succeeded || 0) + (counts.errored || 0) + (counts.canceled || 0) + (counts.expired || 0);
    console.log(` Still processing — ${done}/${total} complete. Check back later.`);
    return;
  }

  console.log(` Batch ended. Downloading results...`);
  const results = await downloadResults(state.batch_id, apiKey);

  const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json'));
  const globalMin = searchConfig.filter_min_score ?? 5;

  // Hoisted out of the result loop: the original reloaded the entire queue
  // file on every iteration (accidental O(n·m)). Load it once, index jobs
  // by id, and precompute per-track thresholds (first matching search wins,
  // matching the original find() semantics).
  const queue = loadQueue();
  const jobsById = new Map(queue.map(j => [j.id, j]));
  const minByTrack = new Map();
  for (const s of searchConfig.searches || []) {
    if (!minByTrack.has(s.track)) minByTrack.set(s.track, s.filter_min_score ?? globalMin);
  }

  let passed = 0, filtered = 0, errors = 0;

  for (const { jobId, score, reason, error } of results) {
    if (error || score === null) {
      errors++;
      // Pass through on error — never block applications due to filter failure
      updateJobStatus(jobId, 'new', { filter_score: null, filter_reason: reason || 'filter_error' });
      continue;
    }

    // Per-track threshold; unknown tracks fall back to 'ae', then globalMin.
    const track = jobsById.get(jobId)?.track || 'ae';
    const minScore = minByTrack.get(track) ?? globalMin;

    if (score >= minScore) {
      passed++;
      updateJobStatus(jobId, 'new', { filter_score: score, filter_reason: reason });
    } else {
      filtered++;
      updateJobStatus(jobId, 'filtered', { filter_score: score, filter_reason: reason });
    }
  }

  clearState();

  const summary = `✅ Filter complete — ${passed} passed, ${filtered} filtered, ${errors} errors`;
  console.log(`\n${summary}`);

  // Notify via Telegram
  await sendTelegram(settings,
    `🔍 *AI Filter complete*\n✅ Passed: ${passed}\n🚫 Filtered: ${filtered}\n⚠️ Errors: ${errors}`
  ).catch(() => {}); // non-fatal
}
|
||||
|
||||
// ---------------------------------------------------------------------------
// Phase 2 — Submit a new batch
// ---------------------------------------------------------------------------

/**
 * Gather all unscored 'new' jobs, build one job profile per configured
 * track, and submit a single Batch API request covering every filterable
 * job. Persists the batch id so the next run can collect results.
 *
 * @param {object} settings - Loaded settings.json.
 * @param {object} searchConfig - Loaded search_config.json.
 * @param {object} candidateProfile - Loaded profile.json.
 */
async function submit(settings, searchConfig, candidateProfile) {
  const apiKey = process.env.ANTHROPIC_API_KEY;

  // Only jobs that have never been scored.
  const unscored = getJobsByStatus('new').filter(j => j.filter_score == null);
  if (unscored.length === 0) {
    console.log('✅ Nothing to filter — all new jobs already scored.');
    return;
  }

  // One job profile per track, keyed by track name.
  const profilePaths = settings.filter?.job_profiles || {};
  const jobProfilesByTrack = {};
  for (const [track, path] of Object.entries(profilePaths)) {
    const profile = loadProfile(path);
    if (profile) {
      jobProfilesByTrack[track] = profile;
    } else {
      console.warn(`⚠️ Could not load job profile for track "${track}" at ${path}`);
    }
  }

  // Jobs whose track has no profile pass through unscored.
  const filterable = unscored.filter(j => jobProfilesByTrack[j.track || 'ae']);
  const skipped = unscored.length - filterable.length;
  if (skipped > 0) console.warn(`⚠️ ${skipped} jobs skipped — no profile for their track`);

  if (filterable.length === 0) {
    console.log('Nothing filterable — no job profiles configured for any track.');
    return;
  }

  const model = settings.filter?.model || DEFAULT_MODEL;
  console.log(`🚀 Submitting batch — ${filterable.length} jobs, model: ${model}`);

  const batchId = await submitBatch(filterable, jobProfilesByTrack, searchConfig, candidateProfile, model, apiKey);

  // Record the in-flight batch so the next cron run collects it.
  writeState({
    batch_id: batchId,
    submitted_at: new Date().toISOString(),
    job_count: filterable.length,
    model,
  });

  console.log(` Batch submitted: ${batchId}`);
  console.log(` Results typically ready in < 1 hour. Next run will collect.`);

  // Best-effort notification — failures are non-fatal.
  await sendTelegram(settings,
    `🔍 *AI Filter submitted*\n${filterable.length} jobs queued for scoring\nBatch: \`${batchId}\``
  ).catch(() => {});
}
|
||||
|
||||
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------

/**
 * Entry point: stats short-circuit, API-key check, lock acquisition,
 * config load, then dispatch to Phase 1 (collect) when a batch is
 * pending, otherwise Phase 2 (submit).
 */
async function main() {
  if (isStats) {
    showStats();
    return;
  }

  const apiKey = process.env.ANTHROPIC_API_KEY;
  if (!apiKey) {
    console.error('❌ ANTHROPIC_API_KEY not set');
    process.exit(1);
  }

  // Prevent overlapping cron runs from double-submitting a batch.
  // NOTE(review): no explicit release of this lock handle is visible in
  // this file — confirm acquireLock self-releases (e.g. on process exit).
  const lock = acquireLock('filter', resolve(__dir, 'data'));

  const settings = loadConfig(resolve(__dir, 'config/settings.json'));
  const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json'));
  const candidateProfile = loadConfig(resolve(__dir, 'config/profile.json'));

  console.log('🔍 claw-apply: AI Job Filter\n');

  const state = readState();

  if (state?.batch_id) {
    // Phase 1: collect results from pending batch
    await collect(state, settings);
  } else {
    // Phase 2: submit new batch
    await submit(settings, searchConfig, candidateProfile);
  }
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user