From c88a71fc2003059d4432c59073d1e4eff90c664c Mon Sep 17 00:00:00 2001 From: Claw Date: Fri, 6 Mar 2026 11:35:15 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20one=20batch=20per=20track=20=E2=80=94?= =?UTF-8?q?=20separate=20GTM/AE=20batches=20with=20their=20own=20system=20?= =?UTF-8?q?prompts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - submitBatch → submitBatches: groups jobs by track, submits one batch each - filter_state.json now stores batches[] array instead of single batch_id - Collect waits for all batches to finish before processing - Each track gets its own cached system prompt = better caching + cleaner scoring - Idempotent collect: skips already-scored jobs --- job_filter.mjs | 109 +++++++++++++++++++++++++------------------------ lib/filter.mjs | 94 +++++++++++++++++++++--------------------- 2 files changed, 103 insertions(+), 100 deletions(-) diff --git a/job_filter.mjs b/job_filter.mjs index 8a9d6fa..c6e4a72 100644 --- a/job_filter.mjs +++ b/job_filter.mjs @@ -24,7 +24,7 @@ import { readFileSync, writeFileSync, existsSync, unlinkSync } from 'fs'; const __dir = dirname(fileURLToPath(import.meta.url)); import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue } from './lib/queue.mjs'; -import { loadProfile, submitBatch, checkBatch, downloadResults } from './lib/filter.mjs'; +import { loadProfile, submitBatches, checkBatch, downloadResults } from './lib/filter.mjs'; import { sendTelegram } from './lib/notify.mjs'; const isStats = process.argv.includes('--stats'); @@ -83,44 +83,53 @@ function showStats() { } // --------------------------------------------------------------------------- -// Phase 1 — Collect results from a pending batch +// Phase 1 — Collect results from all pending batches // --------------------------------------------------------------------------- async function collect(state, settings) { const apiKey = process.env.ANTHROPIC_API_KEY; - console.log(`šŸ” Checking batch ${state.batch_id}...`); + const batches = state.batches; // array of { track, batchId, idMap, jobCount } - const { status, counts } = await checkBatch(state.batch_id, apiKey); + // Check status of all batches + let allDone = true; + for (const b of batches) { + const { status, counts } = await checkBatch(b.batchId, apiKey); + b._status = status; + b._counts = counts; + if (status === 'in_progress') { + const total = Object.values(counts).reduce((a, b) => a + b, 0); + const done = (counts.succeeded || 0) + (counts.errored || 0) + (counts.canceled || 0) + (counts.expired || 0); + console.log(` [${b.track}] Still processing — ${done}/${total} complete`); + allDone = false; + } else { + console.log(` [${b.track}] Done — ${counts.succeeded || 0} succeeded, ${counts.errored || 0} errors`); + } + } - if (status === 'in_progress') { - const total = Object.values(counts).reduce((a, b) => a + b, 0); - const done = (counts.succeeded || 0) + (counts.errored || 0) + (counts.canceled || 0) + (counts.expired || 0); - console.log(` Still processing — ${done}/${total} complete. Check back later.`); + if (!allDone) { + console.log(' Not all batches done yet. Check back later.'); return; } - console.log(` Batch ended. Downloading results...`); - const results = await downloadResults(state.batch_id, apiKey, state.id_map || {}); + // All done — download and merge all results + console.log('\n All batches complete. Downloading results...'); + const resultMap = {}; + for (const b of batches) { + const results = await downloadResults(b.batchId, apiKey, b.idMap || {}); + for (const r of results) resultMap[r.jobId] = r; + } const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json')); const globalMin = searchConfig.filter_min_score ?? 5; let passed = 0, filtered = 0, errors = 0; - - // Build a lookup map from results for O(1) access - const resultMap = {}; - for (const r of results) resultMap[r.jobId] = r; - - // Load queue once, apply all updates in memory, save once const queue = loadQueue(); const now = new Date().toISOString(); for (const job of queue) { const r = resultMap[job.id]; if (!r) continue; - - // Skip jobs already scored (idempotent — safe to re-run collect) - if (job.filter_score != null) continue; + if (job.filter_score != null) continue; // already scored — idempotent if (r.error || r.score === null) { errors++; @@ -138,28 +147,20 @@ async function collect(state, settings) { job.filter_reason = r.reason; job.status_updated_at = now; - if (r.score >= minScore) { - passed++; - // keep status as 'new' - } else { - filtered++; - job.status = 'filtered'; - } + if (r.score >= minScore) { passed++; } + else { filtered++; job.status = 'filtered'; } } - // Single write for all updates saveQueue(queue); - clearState(); - // Append to filter run history + // Log run const runsPath = resolve(__dir, 'data/filter_runs.json'); const runs = existsSync(runsPath) ? JSON.parse(readFileSync(runsPath, 'utf8')) : []; runs.push({ - batch_id: state.batch_id, + batches: batches.map(b => ({ track: b.track, batch_id: b.batchId, job_count: b.jobCount })), submitted_at: state.submitted_at, collected_at: new Date().toISOString(), - job_count: state.job_count, model: state.model, passed, filtered, @@ -170,10 +171,9 @@ async function collect(state, settings) { const summary = `āœ… Filter complete — ${passed} passed, ${filtered} filtered, ${errors} errors`; console.log(`\n${summary}`); - // Notify via Telegram await sendTelegram(settings, `šŸ” *AI Filter complete*\nāœ… Passed: ${passed}\n🚫 Filtered: ${filtered}\nāš ļø Errors: ${errors}` - ).catch(() => {}); // non-fatal + ).catch(() => {}); } // --------------------------------------------------------------------------- @@ -212,37 +212,38 @@ async function submit(settings, searchConfig, candidateProfile) { } const model = settings.filter?.model || DEFAULT_MODEL; - console.log(`šŸš€ Submitting batch — ${filterable.length} jobs, model: ${model}`); + const submittedAt = new Date().toISOString(); + console.log(`šŸš€ Submitting batches — ${filterable.length} jobs across ${Object.keys(jobProfilesByTrack).length} tracks, model: ${model}`); - const { batchId, idMap } = await submitBatch(filterable, jobProfilesByTrack, searchConfig, candidateProfile, model, apiKey); + const submitted = await submitBatches(filterable, jobProfilesByTrack, candidateProfile, model, apiKey); + + writeState({ + batches: submitted, + submitted_at: submittedAt, + job_count: filterable.length, + model, + }); + + // Stamp each job with its track's batch ID + const trackToBatchId = {}; + for (const b of submitted) trackToBatchId[b.track] = b.batchId; - // Stamp each job with the batch ID so they're excluded from future submissions const allJobs = loadQueue(); - const filteredIds = new Set(filterable.map(j => j.id)); for (const job of allJobs) { - if (filteredIds.has(job.id)) { + const batchId = trackToBatchId[job.track || 'ae']; + if (batchId && !job.filter_batch_id) { job.filter_batch_id = batchId; job.filter_submitted_at = submittedAt; } } saveQueue(allJobs); - const submittedAt = new Date().toISOString(); - writeState({ - batch_id: batchId, - submitted_at: submittedAt, - job_count: filterable.length, - model, - tracks: Object.keys(jobProfilesByTrack), - id_map: idMap, - }); - - console.log(` Batch submitted: ${batchId}`); + const batchSummary = submitted.map(b => `${b.track}: ${b.jobCount} jobs`).join(', '); + console.log(` ${batchSummary}`); console.log(` Results typically ready in < 1 hour. Next run will collect.`); - // Notify await sendTelegram(settings, - `šŸ” *AI Filter submitted*\n${filterable.length} jobs queued for scoring\nBatch: \`${batchId}\`` + `šŸ” *AI Filter submitted*\n${filterable.length} jobs across ${submitted.length} tracks\n${batchSummary}` ).catch(() => {}); } @@ -270,11 +271,11 @@ async function main() { const state = readState(); - if (state?.batch_id) { - // Phase 1: collect results from pending batch + if (state?.batches?.length > 0) { + // Phase 1: collect results from all pending batches await collect(state, settings); } else { - // Phase 2: submit new batch + // Phase 2: submit new batches (one per track) await submit(settings, searchConfig, candidateProfile); } } diff --git a/lib/filter.mjs b/lib/filter.mjs index b88b744..76c4346 100644 --- a/lib/filter.mjs +++ b/lib/filter.mjs @@ -97,61 +97,63 @@ function apiHeaders(apiKey) { } /** - * Submit all jobs as a single Anthropic batch. - * System prompt is marked cache_control=ephemeral so it's cached across requests. - * Returns the batch ID. + * Submit one batch per track (one per job profile/search description). + * Each batch uses the system prompt for that track only — maximizes prompt caching. + * Returns array of { track, batchId, idMap, jobCount } */ -export async function submitBatch(jobs, jobProfilesByTrack, searchConfig, candidateProfile, model, apiKey) { - const globalMin = searchConfig.filter_min_score ?? 5; - - const requests = []; - const idMap = {}; // custom_id → job.id (handles truncation edge cases) - +export async function submitBatches(jobs, jobProfilesByTrack, candidateProfile, model, apiKey) { + // Group jobs by track + const byTrack = {}; for (const job of jobs) { const track = job.track || 'ae'; + if (!jobProfilesByTrack[track]) continue; // no profile → skip + if (!byTrack[track]) byTrack[track] = []; + byTrack[track].push(job); + } + + if (Object.keys(byTrack).length === 0) throw new Error('No jobs to submit — check job profiles are configured'); + + const submitted = []; + + for (const [track, trackJobs] of Object.entries(byTrack)) { const jobProfile = jobProfilesByTrack[track]; - if (!jobProfile) continue; // no profile → skip (caller handles this) - const systemPrompt = buildSystemPrompt(jobProfile, candidateProfile); - // Anthropic custom_id max 64 chars - const customId = job.id.length <= 64 ? job.id : job.id.substring(0, 64); - idMap[customId] = job.id; + const idMap = {}; + const requests = []; - requests.push({ - custom_id: customId, - params: { - model, - max_tokens: 1024, - system: [ - { - type: 'text', - text: systemPrompt, - cache_control: { type: 'ephemeral' }, // cache the shared context - } - ], - messages: [ - { role: 'user', content: buildJobMessage(job) } - ], - } + for (const job of trackJobs) { + // Anthropic custom_id max 64 chars + const customId = job.id.length <= 64 ? job.id : job.id.substring(0, 64); + idMap[customId] = job.id; + + requests.push({ + custom_id: customId, + params: { + model, + max_tokens: 1024, + system: [{ type: 'text', text: systemPrompt, cache_control: { type: 'ephemeral' } }], + messages: [{ role: 'user', content: buildJobMessage(job) }], + } + }); + } + + const res = await fetch(BATCH_API, { + method: 'POST', + headers: apiHeaders(apiKey), + body: JSON.stringify({ requests }), }); + + if (!res.ok) { + const err = await res.text(); + throw new Error(`Batch submit failed for track "${track}" ${res.status}: ${err}`); + } + + const data = await res.json(); + submitted.push({ track, batchId: data.id, idMap, jobCount: trackJobs.length }); + console.log(` [${track}] ${trackJobs.length} jobs → batch ${data.id}`); } - if (requests.length === 0) throw new Error('No requests to submit — check job profiles are configured'); - - // Return idMap alongside batch ID so collector can resolve truncated IDs - const res = await fetch(BATCH_API, { - method: 'POST', - headers: apiHeaders(apiKey), - body: JSON.stringify({ requests }), - }); - - if (!res.ok) { - const err = await res.text(); - throw new Error(`Batch submit failed ${res.status}: ${err}`); - } - - const data = await res.json(); - return { batchId: data.id, idMap }; + return submitted; } /**