Source code for abstracts_explorer.evaluation

"""
Automatic Evaluation
====================

This module implements automatic evaluation of the RAG system via the
:class:`Evaluator` class.  It provides methods for:

* Generating evaluation Q/A pairs from the paper database and MCP tools
  using an LLM.
* Running the RAG pipeline on stored Q/A pairs and scoring the output
  with an LLM-as-judge approach.
* Computing summary statistics and formatting results for display.
"""

import json
import logging
import random
import re
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from abstracts_explorer.config import get_config
from abstracts_explorer.database import DatabaseManager
from abstracts_explorer.embeddings import EmbeddingsManager
from abstracts_explorer.mcp_tools import execute_mcp_tool, format_tool_result_for_llm

logger = logging.getLogger(__name__)


[docs] class EvaluationError(Exception): """Exception raised for evaluation-related errors.""" pass
# --------------------------------------------------------------------------- # Prompt templates # --------------------------------------------------------------------------- _TOOL_DESCRIPTIONS: Dict[str, str] = { "search_papers": "Search for papers on a specific topic. Returns relevant papers from the database.", "get_conference_topics": "Get the main research topics of a conference.", "analyze_topic_relevance": ( "Analyze the relevance and popularity of a research topic by counting " "papers within a specified distance in embedding space." ), "get_topic_evolution": "Analyze how a specific topic has evolved over the years.", "get_cluster_visualization": "Generate visualization data for clustered paper embeddings.", } _TOOL_ARGUMENT_EXAMPLES: Dict[str, str] = { "search_papers": '{"topic_keywords": "deep learning", "n_results": 5}', "get_conference_topics": '{"conferences": ["NeurIPS"]}', "analyze_topic_relevance": '{"topic": "transformers", "distance_threshold": 1.1}', "get_topic_evolution": '{"topic_keywords": "reinforcement learning"}', "get_cluster_visualization": '{"n_clusters": 8, "reduction_method": "pca"}', } _QA_GENERATION_SYSTEM_PROMPT = """\ You are an expert evaluation dataset creator for a conference paper search and \ analysis system. The system allows users to search papers, analyze research \ topics, track topic evolution, and visualize clusters. Your task is to generate realistic evaluation queries that test the system's \ capabilities. Each entry consists of a user *query* and the *tool_arguments* \ needed to call the corresponding MCP tool. The actual expected answer will be \ produced by running the tool. IMPORTANT RULES: 1. Queries must be natural questions a researcher would ask. 2. Each query must clearly map to the specified MCP tool. 3. tool_arguments must be a valid JSON object matching the tool's parameter schema. 4. Base queries on the provided paper information (topics, titles, conferences, years). """ _QA_GENERATION_USER_PROMPT = """\ Generate {n_pairs} evaluation query(ies) for the MCP tool "{tool_name}". Tool description: {tool_description} Example tool arguments: {tool_arguments_example} Here are some papers from the database to base your queries on: {papers_context} Respond with a JSON array. Each element must have these keys: - "query": the user's natural-language question - "tool_arguments": a JSON object of arguments to call the "{tool_name}" tool Example format: [ {{ "query": "What are the main topics in NeurIPS 2025?", "tool_arguments": {{"n_clusters": 8}} }} ] Return ONLY the JSON array, no other text. """ _FOLLOWUP_GENERATION_PROMPT = """\ Given the following initial query and answer, generate {n_followups} natural \ follow-up question(s). The follow-ups should explore the topic deeper or ask \ for clarification. Initial query: {initial_query} Initial answer: {initial_answer} Tool to use: {tool_name} Example tool arguments: {tool_arguments_example} Available papers for context: {papers_context} Respond with a JSON array. Each element must have these keys: - "query": the follow-up question - "tool_arguments": a JSON object of arguments to call the "{tool_name}" tool Return ONLY the JSON array, no other text. """ _JUDGE_SYSTEM_PROMPT = """\ You are an impartial judge evaluating the quality of an AI assistant's answer \ about conference papers. Compare the actual answer to the expected reference \ answer. Score the answer on a scale of 1-5: 5 = Excellent: fully correct, comprehensive, well-structured 4 = Good: mostly correct with minor omissions 3 = Adequate: partially correct but missing important details 2 = Poor: significant inaccuracies or missing key information 1 = Bad: incorrect, irrelevant, or empty response Respond with ONLY a JSON object: {{"score": <1-5>, "reasoning": "<brief explanation>"}} """ _JUDGE_USER_PROMPT = """\ Query: {query} Expected answer: {expected_answer} Actual answer: {actual_answer} Score the actual answer (1-5) and explain briefly. """ # --------------------------------------------------------------------------- # Helper utilities (module-level, no Evaluator instance required) # --------------------------------------------------------------------------- def _parse_json_array(text: str) -> List[Dict[str, Any]]: """ Parse a JSON array from LLM output, tolerating markdown fences. Parameters ---------- text : str Raw LLM output that should contain a JSON array. Returns ------- list of dict Parsed array elements. Raises ------ EvaluationError If parsing fails. """ # Strip <think> blocks cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() # Strip markdown code fences cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned) cleaned = re.sub(r"\s*```$", "", cleaned) cleaned = cleaned.strip() try: parsed = json.loads(cleaned) except json.JSONDecodeError as exc: raise EvaluationError(f"Failed to parse LLM JSON output: {exc}\nRaw output:\n{text[:500]}") from exc if not isinstance(parsed, list): raise EvaluationError(f"Expected JSON array, got {type(parsed).__name__}") return parsed
[docs] def format_eval_summary(summary: Dict[str, Any], run_id: str) -> str: """ Format an evaluation run summary for display. Parameters ---------- summary : dict Summary from :meth:`DatabaseManager.get_eval_run_summary`. run_id : str The evaluation run identifier. Returns ------- str Human-readable summary string. """ lines = [ f"Evaluation Run: {run_id}", "=" * 50, ] run_date = summary.get("run_date") if run_date is not None: date_str = run_date.strftime("%Y-%m-%d %H:%M:%S") if isinstance(run_date, datetime) else str(run_date) lines.append(f"Run date: {date_str}") lines.append(f"Total pairs evaluated: {summary['total']}") avg = summary.get("avg_score") if avg is not None: lines.append(f"Average answer score: {avg:.2f} / 5.00") else: lines.append("Average answer score: N/A") acc = summary.get("tool_accuracy") if acc is not None: lines.append(f"Tool selection accuracy: {acc * 100:.1f}%") else: lines.append("Tool selection accuracy: N/A") lat = summary.get("avg_latency_ms") if lat is not None: lines.append(f"Average latency: {lat:.0f} ms") else: lines.append("Average latency: N/A") lines.append(f"Errors: {summary.get('error_count', 0)}") return "\n".join(lines)
[docs] def format_eval_result_detail(result: Dict[str, Any], qa_pair: Optional[Dict[str, Any]] = None) -> str: """ Format a single evaluation result for display. Parameters ---------- result : dict Result row from :meth:`DatabaseManager.get_eval_results`. qa_pair : dict, optional Corresponding Q/A pair for additional context. Returns ------- str Human-readable detail string. """ lines = [f"Result #{result['id']} (run: {result['run_id']})"] lines.append("-" * 50) if qa_pair: lines.append(f"Query: {qa_pair['query']}") lines.append(f"Expected: {qa_pair['expected_answer']}") else: lines.append(f"QA Pair ID: {result['qa_pair_id']}") actual = result.get("actual_answer") or "(no answer)" lines.append(f"Actual: {actual}") score = result.get("answer_score") lines.append(f"Score: {score}/5" if score else "Score: N/A") tc = result.get("tool_correct") if tc is not None: expected_tool = qa_pair.get("tool_name", "?") if qa_pair else "?" lines.append( f"Tool: {'✅ correct' if tc else '❌ wrong'}" f" (expected: {expected_tool}, got: {result.get('actual_tool_name', '?')})" ) if result.get("latency_ms"): lines.append(f"Latency: {result['latency_ms']} ms") if result.get("error"): lines.append(f"Error: {result['error']}") reasoning = result.get("judge_reasoning") if reasoning: lines.append(f"Judge: {reasoning}") return "\n".join(lines)
# --------------------------------------------------------------------------- # Evaluator class # ---------------------------------------------------------------------------
[docs] class Evaluator: """ Automatic evaluation of the RAG system. Wraps all evaluation operations — Q/A pair generation, evaluation execution, and result storage — sharing a single :class:`~abstracts_explorer.embeddings.EmbeddingsManager` and its ``openai_client`` for LLM calls. Parameters ---------- embeddings_manager : EmbeddingsManager A connected embeddings manager. Its ``openai_client`` property is used for all LLM calls (generation and judging). db : DatabaseManager A connected database manager. model : str, optional Chat model name. Falls back to config default. Examples -------- >>> em = EmbeddingsManager() >>> em.connect() >>> with DatabaseManager() as db: ... evaluator = Evaluator(em, db) ... pairs = evaluator.generate_qa_pairs(n_pairs_per_tool=2) ... evaluator.store_qa_pairs(pairs) ... run_id = evaluator.run_evaluation() ... print(evaluator.format_run_summary(run_id)) """
[docs] def __init__( self, embeddings_manager: EmbeddingsManager, db: DatabaseManager, model: Optional[str] = None, ): if embeddings_manager is None: raise EvaluationError("embeddings_manager is required.") if db is None: raise EvaluationError("db is required.") config = get_config() self.embeddings_manager = embeddings_manager self.db = db self.model = model or config.chat_model
@property def openai_client(self): """ OpenAI client shared with the embeddings manager. Returns ------- OpenAI The lazily-initialised client from ``EmbeddingsManager``. """ return self.embeddings_manager.openai_client # ------------------------------------------------------------------ # Q/A pair generation # ------------------------------------------------------------------ def _sample_papers_context(self, n_papers: int = 10) -> str: """ Sample random papers from the database and format them as context. Parameters ---------- n_papers : int Number of papers to sample. Returns ------- str Formatted paper context string. """ total = self.db.get_paper_count() if total == 0: return "(no papers in database)" # Fetch a random page of papers all_papers = self.db.search_papers(limit=min(total, 200)) sampled = random.sample(all_papers, min(n_papers, len(all_papers))) lines: List[str] = [] for i, p in enumerate(sampled, 1): title = p.get("title", "Untitled") abstract = (p.get("abstract") or "")[:200] year = p.get("year", "") conf = p.get("conference", "") keywords = p.get("keywords", "") lines.append(f"Paper {i}: {title} ({conf} {year})\n Keywords: {keywords}\n Abstract: {abstract}...") return "\n\n".join(lines)
[docs] def generate_qa_pairs( self, n_pairs_per_tool: int = 2, tools: Optional[List[str]] = None, generate_followups: bool = True, n_followups: int = 1, ) -> List[Dict[str, Any]]: """ Generate evaluation Q/A pairs using the LLM. For each requested MCP tool a set of query/answer pairs is generated based on papers sampled from the database. Optionally, follow-up questions are generated for each initial pair. Parameters ---------- n_pairs_per_tool : int Number of initial Q/A pairs to generate per tool. tools : list of str, optional MCP tool names to generate pairs for. Defaults to all tools. generate_followups : bool Whether to generate follow-up questions. n_followups : int Number of follow-up turns per initial pair. Returns ------- list of dict Generated pairs, each with keys: ``conversation_id``, ``turn_number``, ``query``, ``expected_answer``, ``tool_name``, ``source_info``. Raises ------ EvaluationError If generation fails. """ available_tools = list(_TOOL_DESCRIPTIONS.keys()) target_tools = tools if tools else available_tools invalid = set(target_tools) - set(available_tools) if invalid: raise EvaluationError(f"Unknown tool(s): {', '.join(sorted(invalid))}") papers_context = self._sample_papers_context() all_pairs: List[Dict[str, Any]] = [] for tool_name in target_tools: logger.info(f"Generating {n_pairs_per_tool} Q/A pair(s) for tool: {tool_name}") tool_desc = _TOOL_DESCRIPTIONS[tool_name] tool_args_example = _TOOL_ARGUMENT_EXAMPLES[tool_name] user_prompt = _QA_GENERATION_USER_PROMPT.format( n_pairs=n_pairs_per_tool, tool_name=tool_name, tool_description=tool_desc, tool_arguments_example=tool_args_example, papers_context=papers_context, ) try: resp = self.openai_client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": _QA_GENERATION_SYSTEM_PROMPT}, {"role": "user", "content": user_prompt}, ], temperature=0.7, max_tokens=2000, timeout=120, ) raw = resp.choices[0].message.content or "" pairs = _parse_json_array(raw) except EvaluationError: raise except Exception as exc: raise EvaluationError(f"LLM call failed for tool {tool_name}: {exc}") from exc for pair in pairs[:n_pairs_per_tool]: conv_id = uuid.uuid4().hex[:12] query = pair.get("query", "") tool_args = pair.get("tool_arguments", {}) if isinstance(tool_args, str): try: tool_args = json.loads(tool_args) except json.JSONDecodeError: tool_args = {} # Execute the MCP tool to obtain the real expected answer expected_answer = self._run_tool_for_answer(tool_name, tool_args, conv_id, turn=0) if expected_answer is None: continue # tool returned an error; skip this pair entry = { "conversation_id": conv_id, "turn_number": 0, "query": query, "expected_answer": expected_answer, "tool_name": tool_name, "source_info": json.dumps({"model": self.model, "tool": tool_name, "tool_args": tool_args}), } all_pairs.append(entry) logger.info( f"[{tool_name}] turn 0 (conv {conv_id})\n" f" Query: {entry['query']}\n" f" Expected: {entry['expected_answer']}" ) # Generate follow-ups if generate_followups and n_followups > 0: followup_prompt = _FOLLOWUP_GENERATION_PROMPT.format( n_followups=n_followups, initial_query=entry["query"], initial_answer=entry["expected_answer"], tool_name=tool_name, tool_arguments_example=tool_args_example, papers_context=papers_context, ) try: fu_resp = self.openai_client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": _QA_GENERATION_SYSTEM_PROMPT}, {"role": "user", "content": followup_prompt}, ], temperature=0.7, max_tokens=2000, timeout=120, ) fu_raw = fu_resp.choices[0].message.content or "" followups = _parse_json_array(fu_raw) except Exception as exc: logger.warning(f"Failed to generate follow-ups for conv {conv_id}: {exc}") followups = [] for idx, fu in enumerate(followups[:n_followups], 1): fu_query = fu.get("query", "") fu_tool_args = fu.get("tool_arguments", {}) if isinstance(fu_tool_args, str): try: fu_tool_args = json.loads(fu_tool_args) except json.JSONDecodeError: fu_tool_args = {} fu_answer = self._run_tool_for_answer(tool_name, fu_tool_args, conv_id, turn=idx) if fu_answer is None: continue # tool returned an error; skip this follow-up fu_entry = { "conversation_id": conv_id, "turn_number": idx, "query": fu_query, "expected_answer": fu_answer, "tool_name": tool_name, "source_info": json.dumps( { "model": self.model, "tool": tool_name, "followup_of": conv_id, "tool_args": fu_tool_args, } ), } all_pairs.append(fu_entry) logger.info( f"[{tool_name}] turn {idx} (conv {conv_id})\n" f" Query: {fu_entry['query']}\n" f" Expected: {fu_entry['expected_answer']}" ) logger.info(f"Generated {len(all_pairs)} Q/A pairs total") return all_pairs
def _run_tool_for_answer( self, tool_name: str, tool_args: Dict[str, Any], conv_id: str, turn: int ) -> Optional[str]: """ Execute an MCP tool and return its formatted output as an expected answer. Parameters ---------- tool_name : str The MCP tool to execute. tool_args : Dict[str, Any] Arguments for the tool call. conv_id : str Conversation ID (used only for log messages). turn : int Turn number (used only for log messages). Returns ------- str or None Formatted tool output, or ``None`` if the tool returned an error or the result is not a valid JSON string. """ try: raw_result = execute_mcp_tool(tool_name, tool_args) if not isinstance(raw_result, str): logger.warning( f"MCP tool {tool_name} returned unexpected type {type(raw_result).__name__} " f"(conv {conv_id}, turn {turn}) — skipping pair" ) return None try: result_data = json.loads(raw_result) except json.JSONDecodeError as exc: logger.warning( f"MCP tool {tool_name} returned invalid JSON (conv {conv_id}, turn {turn}): {exc} — skipping pair" ) return None if isinstance(result_data, dict) and "error" in result_data: logger.warning( f"MCP tool {tool_name} returned error (conv {conv_id}, turn {turn}): " f"{result_data['error']} — skipping pair" ) return None return format_tool_result_for_llm(tool_name, raw_result) except Exception as exc: logger.warning(f"MCP tool {tool_name} failed (conv {conv_id}, turn {turn}): {exc} — skipping pair") return None
[docs] def store_qa_pairs(self, pairs: List[Dict[str, Any]]) -> int: """ Persist generated Q/A pairs into the database. Parameters ---------- pairs : list of dict Pairs as returned by :meth:`generate_qa_pairs`. Returns ------- int Number of pairs stored. """ count = 0 for p in pairs: self.db.add_eval_qa_pair( conversation_id=p["conversation_id"], turn_number=p["turn_number"], query=p["query"], expected_answer=p["expected_answer"], tool_name=p.get("tool_name"), source_info=p.get("source_info"), ) count += 1 return count
# ------------------------------------------------------------------ # Evaluation runner # ------------------------------------------------------------------ def _judge_answer( self, query: str, expected_answer: str, actual_answer: str, ) -> Dict[str, Any]: """ Use LLM-as-judge to score an answer. Parameters ---------- query : str Original query. expected_answer : str Reference answer. actual_answer : str RAG system output. Returns ------- dict ``{"score": int, "reasoning": str}`` """ user_prompt = _JUDGE_USER_PROMPT.format( query=query, expected_answer=expected_answer, actual_answer=actual_answer, ) try: resp = self.openai_client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": _JUDGE_SYSTEM_PROMPT}, {"role": "user", "content": user_prompt}, ], temperature=0.1, max_tokens=300, timeout=60, ) raw = resp.choices[0].message.content or "" logger.debug(f"Judge raw response: {raw[:200]}") # Strip <think> blocks cleaned = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip() # Strip markdown fences cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.MULTILINE) cleaned = re.sub(r"\s*```$", "", cleaned, flags=re.MULTILINE) cleaned = cleaned.strip() # Use raw_decode to extract the first JSON object even if the model # adds preamble text (e.g. "Here is my evaluation: {…}") decoder = json.JSONDecoder() start = cleaned.find("{") if start == -1: raise ValueError(f"No JSON object found in judge response: {cleaned[:200]!r}") parsed, _ = decoder.raw_decode(cleaned, start) score_raw = parsed.get("score") if score_raw is None: logger.warning("Judge returned null score, defaulting to 3") score = 3 else: score = max(1, min(5, int(score_raw))) return {"score": score, "reasoning": parsed.get("reasoning", "")} except Exception as exc: logger.warning(f"Judge scoring failed: {exc}") return {"score": None, "reasoning": f"Judge error: {exc}"}
[docs] def run_evaluation( self, verified_only: bool = True, limit: Optional[int] = None, ) -> str: """ Run evaluation on stored Q/A pairs and record results. Executes each stored query through the RAG system, scores the output with an LLM judge, and stores the results in the database. Parameters ---------- verified_only : bool If ``True``, only evaluate verified pairs (default). limit : int, optional Maximum number of pairs to evaluate. Returns ------- str The ``run_id`` for the evaluation run. Raises ------ EvaluationError If evaluation fails. """ from abstracts_explorer.rag import RAGChat pairs = self.db.get_eval_qa_pairs(verified_only=verified_only, limit=limit) if not pairs: raise EvaluationError("No Q/A pairs found for evaluation. Generate and verify pairs first.") run_id = f"eval-{uuid.uuid4().hex[:8]}" logger.info(f"Starting evaluation run {run_id} with {len(pairs)} pair(s)") # Group pairs by conversation for multi-turn handling conversations: Dict[str, List[Dict[str, Any]]] = {} for p in pairs: conv_id = p["conversation_id"] conversations.setdefault(conv_id, []).append(p) for turns in conversations.values(): turns.sort(key=lambda x: x["turn_number"]) # Initialise a RAGChat instance for each conversation for conv_id, turns in conversations.items(): rag = RAGChat( embeddings_manager=self.embeddings_manager, database=self.db, model=self.model, ) for pair in turns: qa_pair_id = pair["id"] query_text = pair["query"] expected = pair["expected_answer"] expected_tool = pair.get("tool_name") start = time.time() actual_answer = None actual_tool = None error_msg = None try: result = rag.query(query_text) actual_answer = (result.get("response") or "").strip() tools_used = result.get("metadata", {}).get("tools_executed", []) actual_tool = tools_used[0] if tools_used else None except Exception as exc: error_msg = str(exc) logger.warning(f"Query failed for pair {qa_pair_id}: {exc}") elapsed_ms = int((time.time() - start) * 1000) # Tool correctness tool_correct = None if expected_tool and actual_tool is not None: tool_correct = 1 if actual_tool == expected_tool else 0 # LLM judge scoring judge_result: Dict[str, Any] = {"score": None, "reasoning": ""} if actual_answer and not error_msg: judge_result = self._judge_answer(query_text, expected, actual_answer) elif not error_msg: logger.warning(f"Skipping judge for pair {qa_pair_id}: empty response from RAG") self.db.add_eval_result( run_id=run_id, qa_pair_id=qa_pair_id, actual_answer=actual_answer, actual_tool_name=actual_tool, answer_score=judge_result.get("score"), tool_correct=tool_correct, latency_ms=elapsed_ms, error=error_msg, judge_reasoning=judge_result.get("reasoning", ""), ) logger.info(f"Evaluation run {run_id} complete") return run_id
# ------------------------------------------------------------------ # Convenience formatting delegators # ------------------------------------------------------------------
[docs] def format_run_summary(self, run_id: str) -> str: """ Compute and format the summary for *run_id*. Parameters ---------- run_id : str Evaluation run identifier. Returns ------- str Human-readable summary. """ summary = self.db.get_eval_run_summary(run_id) return format_eval_summary(summary, run_id)