feat: one batch per track — separate GTM/AE batches with their own system prompts

- submitBatch → submitBatches: groups jobs by track, submits one batch each
- filter_state.json now stores batches[] array instead of single batch_id
- Collect waits for all batches to finish before processing
- Each track gets its own cached system prompt = better caching + cleaner scoring
- Idempotent collect: skips already-scored jobs
This commit is contained in:
2026-03-06 11:35:15 +00:00
parent aadec0704b
commit c88a71fc20
2 changed files with 103 additions and 100 deletions

View File

@@ -24,7 +24,7 @@ import { readFileSync, writeFileSync, existsSync, unlinkSync } from 'fs';
const __dir = dirname(fileURLToPath(import.meta.url));
import { getJobsByStatus, updateJobStatus, loadConfig, loadQueue, saveQueue } from './lib/queue.mjs';
import { loadProfile, submitBatch, checkBatch, downloadResults } from './lib/filter.mjs';
import { loadProfile, submitBatches, checkBatch, downloadResults } from './lib/filter.mjs';
import { sendTelegram } from './lib/notify.mjs';
const isStats = process.argv.includes('--stats');
@@ -83,44 +83,53 @@ function showStats() {
}
// ---------------------------------------------------------------------------
// Phase 1 — Collect results from a pending batch
// Phase 1 — Collect results from all pending batches
// ---------------------------------------------------------------------------
async function collect(state, settings) {
const apiKey = process.env.ANTHROPIC_API_KEY;
console.log(`🔍 Checking batch ${state.batch_id}...`);
const batches = state.batches; // array of { track, batchId, idMap, jobCount }
const { status, counts } = await checkBatch(state.batch_id, apiKey);
// Check status of all batches
let allDone = true;
for (const b of batches) {
const { status, counts } = await checkBatch(b.batchId, apiKey);
b._status = status;
b._counts = counts;
if (status === 'in_progress') {
const total = Object.values(counts).reduce((a, b) => a + b, 0);
const done = (counts.succeeded || 0) + (counts.errored || 0) + (counts.canceled || 0) + (counts.expired || 0);
console.log(` [${b.track}] Still processing — ${done}/${total} complete`);
allDone = false;
} else {
console.log(` [${b.track}] Done — ${counts.succeeded || 0} succeeded, ${counts.errored || 0} errors`);
}
}
if (status === 'in_progress') {
const total = Object.values(counts).reduce((a, b) => a + b, 0);
const done = (counts.succeeded || 0) + (counts.errored || 0) + (counts.canceled || 0) + (counts.expired || 0);
console.log(` Still processing — ${done}/${total} complete. Check back later.`);
if (!allDone) {
console.log(' Not all batches done yet. Check back later.');
return;
}
console.log(` Batch ended. Downloading results...`);
const results = await downloadResults(state.batch_id, apiKey, state.id_map || {});
// All done — download and merge all results
console.log('\n All batches complete. Downloading results...');
const resultMap = {};
for (const b of batches) {
const results = await downloadResults(b.batchId, apiKey, b.idMap || {});
for (const r of results) resultMap[r.jobId] = r;
}
const searchConfig = loadConfig(resolve(__dir, 'config/search_config.json'));
const globalMin = searchConfig.filter_min_score ?? 5;
let passed = 0, filtered = 0, errors = 0;
// Build a lookup map from results for O(1) access
const resultMap = {};
for (const r of results) resultMap[r.jobId] = r;
// Load queue once, apply all updates in memory, save once
const queue = loadQueue();
const now = new Date().toISOString();
for (const job of queue) {
const r = resultMap[job.id];
if (!r) continue;
// Skip jobs already scored (idempotent — safe to re-run collect)
if (job.filter_score != null) continue;
if (job.filter_score != null) continue; // already scored — idempotent
if (r.error || r.score === null) {
errors++;
@@ -138,28 +147,20 @@ async function collect(state, settings) {
job.filter_reason = r.reason;
job.status_updated_at = now;
if (r.score >= minScore) {
passed++;
// keep status as 'new'
} else {
filtered++;
job.status = 'filtered';
}
if (r.score >= minScore) { passed++; }
else { filtered++; job.status = 'filtered'; }
}
// Single write for all updates
saveQueue(queue);
clearState();
// Append to filter run history
// Log run
const runsPath = resolve(__dir, 'data/filter_runs.json');
const runs = existsSync(runsPath) ? JSON.parse(readFileSync(runsPath, 'utf8')) : [];
runs.push({
batch_id: state.batch_id,
batches: batches.map(b => ({ track: b.track, batch_id: b.batchId, job_count: b.jobCount })),
submitted_at: state.submitted_at,
collected_at: new Date().toISOString(),
job_count: state.job_count,
model: state.model,
passed,
filtered,
@@ -170,10 +171,9 @@ async function collect(state, settings) {
const summary = `✅ Filter complete — ${passed} passed, ${filtered} filtered, ${errors} errors`;
console.log(`\n${summary}`);
// Notify via Telegram
await sendTelegram(settings,
`🔍 *AI Filter complete*\n✅ Passed: ${passed}\n🚫 Filtered: ${filtered}\n⚠️ Errors: ${errors}`
).catch(() => {}); // non-fatal
).catch(() => {});
}
// ---------------------------------------------------------------------------
@@ -212,37 +212,38 @@ async function submit(settings, searchConfig, candidateProfile) {
}
const model = settings.filter?.model || DEFAULT_MODEL;
console.log(`🚀 Submitting batch — ${filterable.length} jobs, model: ${model}`);
const submittedAt = new Date().toISOString();
console.log(`🚀 Submitting batches — ${filterable.length} jobs across ${Object.keys(jobProfilesByTrack).length} tracks, model: ${model}`);
const { batchId, idMap } = await submitBatch(filterable, jobProfilesByTrack, searchConfig, candidateProfile, model, apiKey);
const submitted = await submitBatches(filterable, jobProfilesByTrack, candidateProfile, model, apiKey);
writeState({
batches: submitted,
submitted_at: submittedAt,
job_count: filterable.length,
model,
});
// Stamp each job with its track's batch ID
const trackToBatchId = {};
for (const b of submitted) trackToBatchId[b.track] = b.batchId;
// Stamp each job with the batch ID so they're excluded from future submissions
const allJobs = loadQueue();
const filteredIds = new Set(filterable.map(j => j.id));
for (const job of allJobs) {
if (filteredIds.has(job.id)) {
const batchId = trackToBatchId[job.track || 'ae'];
if (batchId && !job.filter_batch_id) {
job.filter_batch_id = batchId;
job.filter_submitted_at = submittedAt;
}
}
saveQueue(allJobs);
const submittedAt = new Date().toISOString();
writeState({
batch_id: batchId,
submitted_at: submittedAt,
job_count: filterable.length,
model,
tracks: Object.keys(jobProfilesByTrack),
id_map: idMap,
});
console.log(` Batch submitted: ${batchId}`);
const batchSummary = submitted.map(b => `${b.track}: ${b.jobCount} jobs`).join(', ');
console.log(` ${batchSummary}`);
console.log(` Results typically ready in < 1 hour. Next run will collect.`);
// Notify
await sendTelegram(settings,
`🔍 *AI Filter submitted*\n${filterable.length} jobs queued for scoring\nBatch: \`${batchId}\``
`🔍 *AI Filter submitted*\n${filterable.length} jobs across ${submitted.length} tracks\n${batchSummary}`
).catch(() => {});
}
@@ -270,11 +271,11 @@ async function main() {
const state = readState();
if (state?.batch_id) {
// Phase 1: collect results from pending batch
if (state?.batches?.length > 0) {
// Phase 1: collect results from all pending batches
await collect(state, settings);
} else {
// Phase 2: submit new batch
// Phase 2: submit new batches (one per track)
await submit(settings, searchConfig, candidateProfile);
}
}