"""
Automatic Evaluation
====================
This module implements automatic evaluation of the RAG system via the
:class:`Evaluator` class. It provides methods for:

* Generating evaluation Q/A pairs from the paper database and MCP tools
  using an LLM.
* Running the RAG pipeline on stored Q/A pairs and scoring the output
  with an LLM-as-judge approach.
* Computing summary statistics and formatting results for display.
"""
import json
import logging
import random
import re
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from abstracts_explorer.config import get_config
from abstracts_explorer.database import DatabaseManager
from abstracts_explorer.embeddings import EmbeddingsManager
from abstracts_explorer.mcp_tools import execute_mcp_tool, format_tool_result_for_llm
logger = logging.getLogger(__name__)
class EvaluationError(Exception):
    """
    Error type for the evaluation module.

    Raised when required collaborators are missing, when LLM output cannot
    be parsed, when an unknown tool is requested, or when there is nothing
    to evaluate.
    """
# ---------------------------------------------------------------------------
# Prompt templates
# ---------------------------------------------------------------------------
_TOOL_DESCRIPTIONS: Dict[str, str] = {
"search_papers": "Search for papers on a specific topic. Returns relevant papers from the database.",
"get_conference_topics": "Get the main research topics of a conference.",
"analyze_topic_relevance": (
"Analyze the relevance and popularity of a research topic by counting "
"papers within a specified distance in embedding space."
),
"get_topic_evolution": "Analyze how a specific topic has evolved over the years.",
"get_cluster_visualization": "Generate visualization data for clustered paper embeddings.",
}
_TOOL_ARGUMENT_EXAMPLES: Dict[str, str] = {
"search_papers": '{"topic_keywords": "deep learning", "n_results": 5}',
"get_conference_topics": '{"conferences": ["NeurIPS"]}',
"analyze_topic_relevance": '{"topic": "transformers", "distance_threshold": 1.1}',
"get_topic_evolution": '{"topic_keywords": "reinforcement learning"}',
"get_cluster_visualization": '{"n_clusters": 8, "reduction_method": "pca"}',
}
# System prompt shared by both generation requests in
# ``Evaluator.generate_qa_pairs`` (initial queries and follow-ups).
# It is sent verbatim -- it contains no ``str.format`` placeholders.
# The trailing backslashes join physical lines so each paragraph reaches the
# model as a single line.
_QA_GENERATION_SYSTEM_PROMPT = """\
You are an expert evaluation dataset creator for a conference paper search and \
analysis system. The system allows users to search papers, analyze research \
topics, track topic evolution, and visualize clusters.
Your task is to generate realistic evaluation queries that test the system's \
capabilities. Each entry consists of a user *query* and the *tool_arguments* \
needed to call the corresponding MCP tool. The actual expected answer will be \
produced by running the tool.
IMPORTANT RULES:
1. Queries must be natural questions a researcher would ask.
2. Each query must clearly map to the specified MCP tool.
3. tool_arguments must be a valid JSON object matching the tool's parameter schema.
4. Base queries on the provided paper information (topics, titles, conferences, years).
"""
# User prompt template for generating the initial (turn 0) queries of one tool.
# Filled with ``str.format`` using the placeholders ``n_pairs``, ``tool_name``,
# ``tool_description``, ``tool_arguments_example`` and ``papers_context``.
# Literal JSON braces in the example are therefore escaped as ``{{`` / ``}}``.
_QA_GENERATION_USER_PROMPT = """\
Generate {n_pairs} evaluation query(ies) for the MCP tool "{tool_name}".
Tool description: {tool_description}
Example tool arguments: {tool_arguments_example}
Here are some papers from the database to base your queries on:
{papers_context}
Respond with a JSON array. Each element must have these keys:
- "query": the user's natural-language question
- "tool_arguments": a JSON object of arguments to call the "{tool_name}" tool
Example format:
[
{{
"query": "What are the main topics in NeurIPS 2025?",
"tool_arguments": {{"n_clusters": 8}}
}}
]
Return ONLY the JSON array, no other text.
"""
# User prompt template for generating follow-up turns of an existing
# conversation. Filled with ``str.format`` using the placeholders
# ``n_followups``, ``initial_query``, ``initial_answer``, ``tool_name``,
# ``tool_arguments_example`` and ``papers_context``. Follow-ups target the
# same tool as the initial query.
_FOLLOWUP_GENERATION_PROMPT = """\
Given the following initial query and answer, generate {n_followups} natural \
follow-up question(s). The follow-ups should explore the topic deeper or ask \
for clarification.
Initial query: {initial_query}
Initial answer: {initial_answer}
Tool to use: {tool_name}
Example tool arguments: {tool_arguments_example}
Available papers for context:
{papers_context}
Respond with a JSON array. Each element must have these keys:
- "query": the follow-up question
- "tool_arguments": a JSON object of arguments to call the "{tool_name}" tool
Return ONLY the JSON array, no other text.
"""
_JUDGE_SYSTEM_PROMPT = """\
You are an impartial judge evaluating the quality of an AI assistant's answer \
about conference papers. Compare the actual answer to the expected reference \
answer.
Score the answer on a scale of 1-5:
5 = Excellent: fully correct, comprehensive, well-structured
4 = Good: mostly correct with minor omissions
3 = Adequate: partially correct but missing important details
2 = Poor: significant inaccuracies or missing key information
1 = Bad: incorrect, irrelevant, or empty response
Respond with ONLY a JSON object:
{{"score": <1-5>, "reasoning": "<brief explanation>"}}
"""
# User prompt template for the LLM-as-judge call. Filled with ``str.format``
# using the placeholders ``query``, ``expected_answer`` and ``actual_answer``.
_JUDGE_USER_PROMPT = """\
Query: {query}
Expected answer:
{expected_answer}
Actual answer:
{actual_answer}
Score the actual answer (1-5) and explain briefly.
"""
# ---------------------------------------------------------------------------
# Helper utilities (module-level, no Evaluator instance required)
# ---------------------------------------------------------------------------
def _parse_json_array(text: str) -> List[Dict[str, Any]]:
"""
Parse a JSON array from LLM output, tolerating markdown fences.
Parameters
----------
text : str
Raw LLM output that should contain a JSON array.
Returns
-------
list of dict
Parsed array elements.
Raises
------
EvaluationError
If parsing fails.
"""
# Strip <think> blocks
cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()
# Strip markdown code fences
cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
cleaned = re.sub(r"\s*```$", "", cleaned)
cleaned = cleaned.strip()
try:
parsed = json.loads(cleaned)
except json.JSONDecodeError as exc:
raise EvaluationError(f"Failed to parse LLM JSON output: {exc}\nRaw output:\n{text[:500]}") from exc
if not isinstance(parsed, list):
raise EvaluationError(f"Expected JSON array, got {type(parsed).__name__}")
return parsed
# ---------------------------------------------------------------------------
# Evaluator class
# ---------------------------------------------------------------------------
class Evaluator:
    """
    Automatic evaluation of the RAG system.

    Wraps all evaluation operations — Q/A pair generation, evaluation
    execution, and result storage — sharing a single
    :class:`~abstracts_explorer.embeddings.EmbeddingsManager` and its
    ``openai_client`` for LLM calls.

    Parameters
    ----------
    embeddings_manager : EmbeddingsManager
        A connected embeddings manager. Its ``openai_client`` property is
        used for all LLM calls (generation and judging).
    db : DatabaseManager
        A connected database manager.
    model : str, optional
        Chat model name. Falls back to the ``chat_model`` value of the
        application config when omitted or empty.

    Raises
    ------
    EvaluationError
        If ``embeddings_manager`` or ``db`` is ``None``.

    Notes
    -----
    ``run_evaluation`` defaults to ``verified_only=True``. Whether freshly
    stored pairs count as verified is decided by the database layer, which
    is not visible in this module, so the example below may need a
    verification step (or ``verified_only=False``) between storing and
    running.

    Examples
    --------
    >>> em = EmbeddingsManager()
    >>> em.connect()
    >>> with DatabaseManager() as db:
    ...     evaluator = Evaluator(em, db)
    ...     pairs = evaluator.generate_qa_pairs(n_pairs_per_tool=2)
    ...     evaluator.store_qa_pairs(pairs)
    ...     run_id = evaluator.run_evaluation()
    ...     print(evaluator.format_run_summary(run_id))
    """
def __init__(
    self,
    embeddings_manager: EmbeddingsManager,
    db: DatabaseManager,
    model: Optional[str] = None,
):
    """
    Store the collaborators and resolve the chat model to use.

    Parameters
    ----------
    embeddings_manager : EmbeddingsManager
        Provides the shared ``openai_client``; must not be ``None``.
    db : DatabaseManager
        Database access object; must not be ``None``.
    model : str, optional
        Chat model name. When omitted (or empty) the config's
        ``chat_model`` is used.

    Raises
    ------
    EvaluationError
        If ``embeddings_manager`` or ``db`` is ``None``.
    """
    # Validate in declaration order so the first missing argument is reported.
    required = (("embeddings_manager", embeddings_manager), ("db", db))
    for label, dependency in required:
        if dependency is None:
            raise EvaluationError(f"{label} is required.")

    settings = get_config()
    self.embeddings_manager = embeddings_manager
    self.db = db
    self.model = model if model else settings.chat_model
@property
def openai_client(self):
"""
OpenAI client shared with the embeddings manager.
Returns
-------
OpenAI
The lazily-initialised client from ``EmbeddingsManager``.
"""
return self.embeddings_manager.openai_client
# ------------------------------------------------------------------
# Q/A pair generation
# ------------------------------------------------------------------
def _sample_papers_context(self, n_papers: int = 10) -> str:
"""
Sample random papers from the database and format them as context.
Parameters
----------
n_papers : int
Number of papers to sample.
Returns
-------
str
Formatted paper context string.
"""
total = self.db.get_paper_count()
if total == 0:
return "(no papers in database)"
# Fetch a random page of papers
all_papers = self.db.search_papers(limit=min(total, 200))
sampled = random.sample(all_papers, min(n_papers, len(all_papers)))
lines: List[str] = []
for i, p in enumerate(sampled, 1):
title = p.get("title", "Untitled")
abstract = (p.get("abstract") or "")[:200]
year = p.get("year", "")
conf = p.get("conference", "")
keywords = p.get("keywords", "")
lines.append(f"Paper {i}: {title} ({conf} {year})\n Keywords: {keywords}\n Abstract: {abstract}...")
return "\n\n".join(lines)
def generate_qa_pairs(
    self,
    n_pairs_per_tool: int = 2,
    tools: Optional[List[str]] = None,
    generate_followups: bool = True,
    n_followups: int = 1,
) -> List[Dict[str, Any]]:
    """
    Generate evaluation Q/A pairs using the LLM.

    For each requested MCP tool the LLM proposes queries plus tool
    arguments, based on papers sampled from the database. The expected
    answer of every pair is obtained by actually executing the tool with
    those arguments. Optionally, follow-up questions are generated for
    each initial pair.

    Parameters
    ----------
    n_pairs_per_tool : int
        Number of initial Q/A pairs to generate per tool. Values ``<= 0``
        yield an empty result without contacting the LLM.
    tools : list of str, optional
        MCP tool names to generate pairs for. Defaults to all tools.
    generate_followups : bool
        Whether to generate follow-up questions.
    n_followups : int
        Number of follow-up turns per initial pair.

    Returns
    -------
    list of dict
        Generated pairs, each with keys: ``conversation_id``,
        ``turn_number``, ``query``, ``expected_answer``,
        ``tool_name``, ``source_info``. Items for which the LLM output is
        malformed, the query is blank, or the tool fails are skipped.

    Raises
    ------
    EvaluationError
        If an unknown tool is requested, or if the LLM call / response
        parsing for the initial queries of a tool fails. Failures while
        generating follow-ups are logged and do not abort generation.
    """
    available_tools = list(_TOOL_DESCRIPTIONS)
    target_tools = tools if tools else available_tools
    invalid = set(target_tools) - set(available_tools)
    if invalid:
        raise EvaluationError(f"Unknown tool(s): {', '.join(sorted(invalid))}")

    if n_pairs_per_tool <= 0:
        # Nothing to generate. Returning early also prevents a negative
        # value from being used as a slice bound below, which would keep
        # "all but the last k" items instead of none.
        return []

    papers_context = self._sample_papers_context()
    all_pairs: List[Dict[str, Any]] = []

    for tool_name in target_tools:
        logger.info(f"Generating {n_pairs_per_tool} Q/A pair(s) for tool: {tool_name}")
        tool_args_example = _TOOL_ARGUMENT_EXAMPLES[tool_name]
        user_prompt = _QA_GENERATION_USER_PROMPT.format(
            n_pairs=n_pairs_per_tool,
            tool_name=tool_name,
            tool_description=_TOOL_DESCRIPTIONS[tool_name],
            tool_arguments_example=tool_args_example,
            papers_context=papers_context,
        )
        try:
            pairs = self._request_generated_items(user_prompt)
        except EvaluationError:
            raise
        except Exception as exc:
            raise EvaluationError(f"LLM call failed for tool {tool_name}: {exc}") from exc

        # The LLM may return more items than requested; keep the first N.
        for pair in pairs[:n_pairs_per_tool]:
            conv_id = uuid.uuid4().hex[:12]
            entry = self._build_qa_entry(pair, tool_name, conv_id, turn=0)
            if entry is None:
                continue  # unusable item or tool error; skip the whole conversation
            all_pairs.append(entry)

            if not (generate_followups and n_followups > 0):
                continue

            followup_prompt = _FOLLOWUP_GENERATION_PROMPT.format(
                n_followups=n_followups,
                initial_query=entry["query"],
                initial_answer=entry["expected_answer"],
                tool_name=tool_name,
                tool_arguments_example=tool_args_example,
                papers_context=papers_context,
            )
            try:
                followups = self._request_generated_items(followup_prompt)
            except Exception as exc:
                # Follow-ups are best-effort: keep the initial pair.
                logger.warning(f"Failed to generate follow-ups for conv {conv_id}: {exc}")
                followups = []

            for idx, followup in enumerate(followups[:n_followups], 1):
                fu_entry = self._build_qa_entry(followup, tool_name, conv_id, turn=idx)
                if fu_entry is not None:
                    all_pairs.append(fu_entry)

    logger.info(f"Generated {len(all_pairs)} Q/A pairs total")
    return all_pairs


def _request_generated_items(self, user_prompt: str) -> List[Dict[str, Any]]:
    """Send one generation prompt to the LLM and parse the JSON array it returns."""
    resp = self.openai_client.chat.completions.create(
        model=self.model,
        messages=[
            {"role": "system", "content": _QA_GENERATION_SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.7,
        max_tokens=2000,
        timeout=120,
    )
    raw = resp.choices[0].message.content or ""
    return _parse_json_array(raw)


@staticmethod
def _coerce_tool_args(tool_args: Any) -> Dict[str, Any]:
    """Normalise LLM-provided tool arguments to a dict (``{}`` when unusable)."""
    if isinstance(tool_args, str):
        # Some models emit the arguments as a JSON-encoded string.
        try:
            tool_args = json.loads(tool_args)
        except json.JSONDecodeError:
            return {}
    return tool_args if isinstance(tool_args, dict) else {}


def _build_qa_entry(
    self, item: Any, tool_name: str, conv_id: str, turn: int
) -> Optional[Dict[str, Any]]:
    """Turn one LLM-generated item into a stored-pair dict, or ``None`` if it must be skipped."""
    if not isinstance(item, dict):
        logger.warning(
            f"Skipping malformed generated item of type {type(item).__name__} "
            f"for tool {tool_name} (conv {conv_id}, turn {turn})"
        )
        return None

    query = item.get("query", "")
    if not isinstance(query, str) or not query.strip():
        logger.warning(f"Skipping generated item without a query for tool {tool_name} (conv {conv_id}, turn {turn})")
        return None

    tool_args = self._coerce_tool_args(item.get("tool_arguments", {}))

    # Execute the MCP tool to obtain the real expected answer.
    expected_answer = self._run_tool_for_answer(tool_name, tool_args, conv_id, turn=turn)
    if expected_answer is None:
        return None  # tool returned an error; skip this item

    source: Dict[str, Any] = {"model": self.model, "tool": tool_name}
    if turn > 0:
        source["followup_of"] = conv_id
    source["tool_args"] = tool_args

    entry = {
        "conversation_id": conv_id,
        "turn_number": turn,
        "query": query,
        "expected_answer": expected_answer,
        "tool_name": tool_name,
        "source_info": json.dumps(source),
    }
    logger.info(
        f"[{tool_name}] turn {turn} (conv {conv_id})\n"
        f" Query: {entry['query']}\n"
        f" Expected: {entry['expected_answer']}"
    )
    return entry
def _run_tool_for_answer(
    self, tool_name: str, tool_args: Dict[str, Any], conv_id: str, turn: int
) -> Optional[str]:
    """
    Run an MCP tool and turn its output into a reference answer.

    Parameters
    ----------
    tool_name : str
        The MCP tool to execute.
    tool_args : Dict[str, Any]
        Arguments for the tool call.
    conv_id : str
        Conversation ID (used only for log messages).
    turn : int
        Turn number (used only for log messages).

    Returns
    -------
    str or None
        The tool output formatted by ``format_tool_result_for_llm``.
        ``None`` when the tool raises, returns a non-string, returns text
        that is not valid JSON, or returns a JSON object with an
        ``"error"`` key; each of these cases is logged as a warning.
    """
    try:
        tool_output = execute_mcp_tool(tool_name, tool_args)

        if not isinstance(tool_output, str):
            kind = type(tool_output).__name__
            logger.warning(
                f"MCP tool {tool_name} returned unexpected type {kind} "
                f"(conv {conv_id}, turn {turn}) — skipping pair"
            )
            return None

        try:
            payload = json.loads(tool_output)
        except json.JSONDecodeError as decode_error:
            logger.warning(
                f"MCP tool {tool_name} returned invalid JSON (conv {conv_id}, turn {turn}): "
                f"{decode_error} — skipping pair"
            )
            return None

        reported_error = isinstance(payload, dict) and "error" in payload
        if not reported_error:
            # The raw string (not the decoded payload) is what gets formatted.
            return format_tool_result_for_llm(tool_name, tool_output)

        logger.warning(
            f"MCP tool {tool_name} returned error (conv {conv_id}, turn {turn}): "
            f"{payload['error']} — skipping pair"
        )
        return None
    except Exception as failure:
        # Best-effort: a failing tool only costs us this pair.
        logger.warning(f"MCP tool {tool_name} failed (conv {conv_id}, turn {turn}): {failure} — skipping pair")
        return None
def store_qa_pairs(self, pairs: List[Dict[str, Any]]) -> int:
"""
Persist generated Q/A pairs into the database.
Parameters
----------
pairs : list of dict
Pairs as returned by :meth:`generate_qa_pairs`.
Returns
-------
int
Number of pairs stored.
"""
count = 0
for p in pairs:
self.db.add_eval_qa_pair(
conversation_id=p["conversation_id"],
turn_number=p["turn_number"],
query=p["query"],
expected_answer=p["expected_answer"],
tool_name=p.get("tool_name"),
source_info=p.get("source_info"),
)
count += 1
return count
# ------------------------------------------------------------------
# Evaluation runner
# ------------------------------------------------------------------
def _judge_answer(
    self,
    query: str,
    expected_answer: str,
    actual_answer: str,
) -> Dict[str, Any]:
    """
    Use LLM-as-judge to score an answer.

    Parameters
    ----------
    query : str
        Original query.
    expected_answer : str
        Reference answer.
    actual_answer : str
        RAG system output.

    Returns
    -------
    dict
        ``{"score": int or None, "reasoning": str}``. The score is clamped
        to 1-5. A reply whose ``score`` is missing/null is scored 3. If
        the LLM call fails or no JSON object can be extracted, ``score``
        is ``None`` and ``reasoning`` carries the error text; this method
        never raises.
    """
    user_prompt = _JUDGE_USER_PROMPT.format(
        query=query,
        expected_answer=expected_answer,
        actual_answer=actual_answer,
    )
    try:
        resp = self.openai_client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": _JUDGE_SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=0.1,
            max_tokens=300,
            timeout=60,
        )
        raw = resp.choices[0].message.content or ""
        logger.debug(f"Judge raw response: {raw[:200]}")

        # Strip <think> blocks
        cleaned = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
        # Strip markdown fences
        cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.MULTILINE)
        cleaned = re.sub(r"\s*```$", "", cleaned, flags=re.MULTILINE)
        cleaned = cleaned.strip()

        # Extract the verdict with raw_decode so preamble/trailing text is
        # tolerated. Every "{" is tried in turn: the first one may belong
        # to prose or to a doubled brace ("{{...}}"), in which case decoding
        # from it fails or yields an object without a score.
        decoder = json.JSONDecoder()
        parsed = None
        first_object = None
        start = cleaned.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(cleaned, start)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                if "score" in candidate:
                    parsed = candidate
                    break
                if first_object is None:
                    first_object = candidate
            start = cleaned.find("{", start + 1)
        if parsed is None:
            # No object carries a score; fall back to the first object
            # found so the "null score" default below still applies.
            parsed = first_object
        if parsed is None:
            raise ValueError(f"No JSON object found in judge response: {cleaned[:200]!r}")

        score_raw = parsed.get("score")
        if score_raw is None:
            logger.warning("Judge returned null score, defaulting to 3")
            score = 3
        else:
            # Clamp out-of-range scores into the documented 1-5 scale.
            score = max(1, min(5, int(score_raw)))
        # ``or ""`` also covers an explicit ``"reasoning": null``.
        return {"score": score, "reasoning": parsed.get("reasoning") or ""}
    except Exception as exc:
        # Judging is best-effort: report the failure instead of aborting the run.
        logger.warning(f"Judge scoring failed: {exc}")
        return {"score": None, "reasoning": f"Judge error: {exc}"}
def run_evaluation(
    self,
    verified_only: bool = True,
    limit: Optional[int] = None,
) -> str:
    """
    Run evaluation on stored Q/A pairs and record results.

    Pairs are grouped by conversation and replayed in turn order through
    a fresh ``RAGChat`` per conversation. Each answer is scored by the LLM
    judge and one result row per pair is written to the database.

    Parameters
    ----------
    verified_only : bool
        If ``True``, only evaluate verified pairs (default).
    limit : int, optional
        Maximum number of pairs to evaluate.

    Returns
    -------
    str
        The ``run_id`` for the evaluation run.

    Raises
    ------
    EvaluationError
        If no Q/A pairs are available. Failures of individual queries are
        recorded in the result row (``error``) instead of being raised;
        exceptions from the database layer propagate unchanged.
    """
    # Imported here rather than at module level (presumably to avoid a
    # circular import -- not verifiable from this file).
    from abstracts_explorer.rag import RAGChat

    pairs = self.db.get_eval_qa_pairs(verified_only=verified_only, limit=limit)
    if not pairs:
        raise EvaluationError("No Q/A pairs found for evaluation. Generate and verify pairs first.")

    run_id = f"eval-{uuid.uuid4().hex[:8]}"
    logger.info(f"Starting evaluation run {run_id} with {len(pairs)} pair(s)")

    # Group pairs by conversation for multi-turn handling
    conversations: Dict[str, List[Dict[str, Any]]] = {}
    for p in pairs:
        conversations.setdefault(p["conversation_id"], []).append(p)
    for turns in conversations.values():
        turns.sort(key=lambda x: x["turn_number"])

    for conv_id, turns in conversations.items():
        # One RAGChat per conversation: follow-up turns go through the same
        # instance as the turns before them.
        rag = RAGChat(
            embeddings_manager=self.embeddings_manager,
            database=self.db,
            model=self.model,
        )
        for pair in turns:
            qa_pair_id = pair["id"]
            query_text = pair["query"]
            expected = pair["expected_answer"]
            expected_tool = pair.get("tool_name")

            actual_answer = None
            actual_tool = None
            error_msg = None
            # perf_counter is monotonic; wall-clock time can jump (NTP, DST)
            # and produce wrong or negative latencies.
            started = time.perf_counter()
            try:
                result = rag.query(query_text)
                actual_answer = (result.get("response") or "").strip()
                # Tolerate ``"metadata": None`` as well as a missing key.
                metadata = result.get("metadata") or {}
                tools_used = metadata.get("tools_executed") or []
                actual_tool = tools_used[0] if tools_used else None
            except Exception as exc:
                # Some exceptions stringify to "" (e.g. ``TimeoutError()``);
                # fall back to the type name so the failure is never
                # mistaken for "no error".
                error_msg = str(exc) or type(exc).__name__
                logger.warning(f"Query failed for pair {qa_pair_id}: {error_msg}")
            elapsed_ms = int((time.perf_counter() - started) * 1000)

            # Tool correctness: only decided when a tool was expected and
            # the RAG system reported at least one executed tool.
            tool_correct = None
            if expected_tool and actual_tool is not None:
                tool_correct = 1 if actual_tool == expected_tool else 0

            # LLM judge scoring (skipped for failed queries and empty answers)
            judge_result: Dict[str, Any] = {"score": None, "reasoning": ""}
            if error_msg is None:
                if actual_answer:
                    judge_result = self._judge_answer(query_text, expected, actual_answer)
                else:
                    logger.warning(f"Skipping judge for pair {qa_pair_id}: empty response from RAG")

            self.db.add_eval_result(
                run_id=run_id,
                qa_pair_id=qa_pair_id,
                actual_answer=actual_answer,
                actual_tool_name=actual_tool,
                answer_score=judge_result.get("score"),
                tool_correct=tool_correct,
                latency_ms=elapsed_ms,
                error=error_msg,
                judge_reasoning=judge_result.get("reasoning", ""),
            )

    logger.info(f"Evaluation run {run_id} complete")
    return run_id
# ------------------------------------------------------------------
# Convenience formatting delegators
# ------------------------------------------------------------------