Database Module

The database module provides an interface for storing and retrieving conference papers using SQLAlchemy, supporting both SQLite and PostgreSQL backends.

Overview

The DatabaseManager class handles all database operations including:

  • Creating and managing database schema

  • Adding papers and authors

  • Searching and filtering papers

  • Managing paper-author relationships

Class Reference

This module provides functionality to load JSON data into a SQL database. Supports both SQLite and PostgreSQL backends via SQLAlchemy.

exception abstracts_explorer.database.DatabaseError[source]

Bases: Exception

Exception raised for database operations.

exception abstracts_explorer.database.EmbeddingModelConflictError(local_model, remote_model)[source]

Bases: DatabaseError

Raised when the embedding model in imported data differs from the local database.

Variables:
  • local_model (str) – Embedding model currently in the local database.

  • remote_model (str) – Embedding model in the data being imported.

__init__(local_model, remote_model)[source]
abstracts_explorer.database.normalize_model_name(name)[source]

Normalize an embedding model name for comparison.

Strips a leading alias- prefix (case-insensitive) so that, e.g., alias-qwen3-embeddings-8b is considered identical to qwen3-embeddings-8b.

Parameters:

name (str) – Embedding model name.

Returns:

Normalized model name.

Return type:

str

class abstracts_explorer.database.DatabaseManager[source]

Bases: object

Manager for SQL database operations using SQLAlchemy.

Supports SQLite and PostgreSQL backends through SQLAlchemy connection URLs. Database configuration is read from the config file (PAPER_DB variable).

Variables:
  • database_url (str) – SQLAlchemy database URL from configuration.

  • engine (Engine or None) – SQLAlchemy engine instance.

  • SessionLocal (sessionmaker or None) – SQLAlchemy session factory.

  • _session (Session or None) – Active database session if connected.

Examples

>>> # Database configuration comes from config file
>>> db = DatabaseManager()
>>> db.connect()
>>> db.create_tables()
>>> db.close()
__init__()[source]

Initialize the DatabaseManager.

Reads database configuration from the config file.

connect()[source]

Connect to the database.

Creates the database file if it doesn’t exist (SQLite only).

Raises:

DatabaseError – If connection fails.

Return type:

None

close()[source]

Close the database connection.

Does nothing if not connected.

Return type:

None

__enter__()[source]

Context manager entry.

__exit__(exc_type, exc_val, exc_tb)[source]

Context manager exit.

static compute_uid(title, original_id, conference, year)[source]

Compute a deterministic paper UID from its identifying fields.

The UID is a 16-character hex string derived from a SHA-256 hash of the concatenated title, original_id, conference and year.

Parameters:
  • title (str) – Paper title.

  • original_id (str, int, or None) – Original paper ID from the source (e.g., OpenReview ID).

  • conference (str) – Conference name (e.g., “NeurIPS”).

  • year (int) – Conference year.

Returns:

16-character hex UID.

Return type:

str

Examples

>>> DatabaseManager.compute_uid("My Paper", "abc123", "NeurIPS", 2025)
'a1b2c3d4e5f67890'
create_tables()[source]

Create database tables for papers and embeddings metadata.

Creates the following tables: - papers: Main table for paper information with lightweight ML4PS schema - embeddings_metadata: Metadata about embeddings (model used, creation date) - clustering_cache: Cache for clustering results

This method is idempotent - it can be called multiple times without error. Tables are only created if they don’t already exist.

Raises:

DatabaseError – If table creation fails.

Return type:

None

add_paper(paper)[source]

Add a single paper to the database.

Parameters:

paper (LightweightPaper) – Validated paper object to insert.

Returns:

The UID of the inserted paper, or None if paper was skipped (duplicate).

Return type:

str or None

Raises:

DatabaseError – If insertion fails.

Examples

>>> from abstracts_explorer.plugin import LightweightPaper
>>> db = DatabaseManager()
>>> with db:
...     db.create_tables()
...     paper = LightweightPaper(
...         title="Test Paper",
...         authors=["John Doe"],
...         abstract="Test abstract",
...         session="Session 1",
...         poster_position="P1",
...         year=2025,
...         conference="NeurIPS"
...     )
...     paper_uid = db.add_paper(paper)
>>> print(f"Inserted paper with UID: {paper_uid}")
add_papers(papers)[source]

Add multiple papers to the database in a batch.

Parameters:

papers (list of LightweightPaper) – List of validated paper objects to insert.

Returns:

Number of papers successfully inserted (excludes duplicates).

Return type:

int

Raises:

DatabaseError – If batch insertion fails.

Examples

>>> from abstracts_explorer.plugin import LightweightPaper
>>> db = DatabaseManager()
>>> with db:
...     db.create_tables()
...     papers = [
...         LightweightPaper(
...             title="Paper 1",
...             authors=["Author 1"],
...             abstract="Abstract 1",
...             session="Session 1",
...             poster_position="P1",
...             year=2025,
...             conference="NeurIPS"
...         ),
...         LightweightPaper(
...             title="Paper 2",
...             authors=["Author 2"],
...             abstract="Abstract 2",
...             session="Session 2",
...             poster_position="P2",
...             year=2025,
...             conference="NeurIPS"
...         )
...     ]
...     count = db.add_papers(papers)
>>> print(f"Inserted {count} papers")
donate_validation_data(paper_priorities)[source]

Store donated paper rating data for validation purposes.

This method accepts anonymized paper ratings from users and stores them in the validation_data table for improving the service.

Parameters:

paper_priorities (Dict[str, Dict[str, Any]]) – Dictionary mapping paper UIDs to priority data. Each priority data dict must contain: - priority (int): Rating value - searchTerm (str, optional): Search term associated with the rating

Returns:

Number of papers successfully donated

Return type:

int

Raises:
  • ValueError – If paper_priorities is empty or contains invalid data format

  • DatabaseError – If database operation fails

Examples

>>> db = DatabaseManager()
>>> with db:
...     priorities = {
...         "abc123": {"priority": 5, "searchTerm": "machine learning"},
...         "def456": {"priority": 4, "searchTerm": "deep learning"}
...     }
...     count = db.donate_validation_data(priorities)
...     print(f"Donated {count} papers")
donate_chat_transcript(rating, transcript)[source]

Store a donated chat transcript with thumbs up/down feedback.

This method accepts an anonymized chat transcript and a rating from the user and stores it for improving the chat system.

Parameters:
  • rating (str) – User feedback rating, must be ‘up’ or ‘down’.

  • transcript (List[Dict[str, str]]) – List of message dicts, each with ‘role’ and ‘text’ keys.

Returns:

ID of the stored donation entry.

Return type:

int

Raises:
  • ValueError – If rating is invalid or transcript is empty/malformed.

  • DatabaseError – If database operation fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     transcript = [
...         {"role": "user", "text": "What papers discuss transformers?"},
...         {"role": "assistant", "text": "Here are some relevant papers..."}
...     ]
...     donation_id = db.donate_chat_transcript("up", transcript)
get_chat_donations(limit=None, rating=None, offset=0)[source]

Retrieve donated chat transcripts from the database.

Parameters:
  • limit (int, optional) – Maximum number of entries to return. Returns all if None.

  • rating (str, optional) – Filter by rating (‘up’ or ‘down’). Returns all ratings if None.

  • offset (int, optional) – Number of entries to skip for pagination (default: 0).

Returns:

List of donation dicts, each containing: - id (int): Entry ID. - rating (str): ‘up’ or ‘down’. - transcript (list): Parsed list of message dicts. - donated_at (datetime): Donation timestamp.

Return type:

list of dict

Raises:

DatabaseError – If the database operation fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     donations = db.get_chat_donations(limit=10, rating='up')
get_chat_donation_stats()[source]

Get summary statistics for donated chat transcripts.

Returns:

Statistics dict containing: - total (int): Total number of donations. - up (int): Number of thumbs-up donations. - down (int): Number of thumbs-down donations. - avg_turns (float): Average number of turns per transcript.

Return type:

dict

Raises:

DatabaseError – If the database operation fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     stats = db.get_chat_donation_stats()
...     print(f"Total donations: {stats['total']}")
get_validation_data(limit=None, offset=0)[source]

Retrieve donated validation (interesting-paper) data from the database.

Parameters:
  • limit (int, optional) – Maximum number of entries to return. Returns all if None.

  • offset (int, optional) – Number of entries to skip for pagination (default: 0).

Returns:

List of validation data dicts, each containing: - id (int): Entry ID. - paper_uid (str): Paper UID. - priority (int): Priority rating. - search_term (str or None): Associated search term. - donated_at (datetime): Donation timestamp.

Return type:

list of dict

Raises:

DatabaseError – If the database operation fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     data = db.get_validation_data(limit=20)
get_validation_data_stats()[source]

Get summary statistics for donated validation (interesting-paper) data.

Returns:

Statistics dict containing: - total (int): Total number of donated paper ratings. - unique_papers (int): Number of distinct paper UIDs. - avg_priority (float): Average priority rating. - priority_distribution (dict): Count per priority value.

Return type:

dict

Raises:

DatabaseError – If the database operation fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     stats = db.get_validation_data_stats()
...     print(f"Total data donations: {stats['total']}")
delete_chat_donations(ids=None)[source]

Delete donated chat transcripts.

Parameters:

ids (list of int, optional) – List of donation IDs to delete. If None, deletes all donations.

Returns:

Number of donations deleted.

Return type:

int

Raises:

DatabaseError – If the database operation fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     deleted = db.delete_chat_donations()
...     print(f"Deleted {deleted} donations")
delete_validation_data(ids=None)[source]

Delete donated validation (interesting-paper) data.

Parameters:

ids (list of int, optional) – List of entry IDs to delete. If None, deletes all validation data.

Returns:

Number of entries deleted.

Return type:

int

Raises:

DatabaseError – If the database operation fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     deleted = db.delete_validation_data()
...     print(f"Deleted {deleted} entries")
query(sql, parameters=())[source]

Execute a SQL query and return results.

Note: This method provides backward compatibility with raw SQL queries. For new code, prefer using SQLAlchemy ORM methods.

Parameters:
  • sql (str) – SQL query to execute (use named parameters like :param1, :param2).

  • parameters (tuple, optional) – Query parameters for parameterized queries.

Returns:

Query results as list of dictionaries.

Return type:

list of dict

Raises:

DatabaseError – If query execution fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     results = db.query("SELECT * FROM papers WHERE session = ?", ("Poster",))
>>> for row in results:
...     print(row['title'])
get_paper_count()[source]

Get the total number of papers in the database.

Returns:

Number of papers.

Return type:

int

Raises:

DatabaseError – If query fails.

get_paper_by_uid(uid)[source]

Retrieve a paper by its UID.

Parameters:

uid (str) – UID of the paper to retrieve.

Returns:

Paper data as a dictionary, or None if not found.

Return type:

dict or None

Raises:

DatabaseError – If query fails.

get_paper_by_original_id_or_uid(paper_id)[source]

Retrieve a paper by its UID or original_id (whichever matches first).

Tries uid first, then falls back to original_id. All formatting (e.g. author deserialization) is performed inside this method so that callers always receive a fully formatted paper dictionary.

Parameters:

paper_id (str) – Value to match against the uid or original_id column.

Returns:

Fully formatted paper data dictionary, or None if not found.

Return type:

dict or None

Raises:

DatabaseError – If the database query fails.

SEARCHABLE_FIELDS: set = {'abstract', 'authors', 'award', 'conference', 'endtime', 'keywords', 'original_id', 'paper_pdf_url', 'poster_image_url', 'poster_position', 'room_name', 'session', 'starttime', 'title', 'url', 'year'}

Paper model column names that can be used as field:"value" filters in search queries. Internal columns (uid, created_at) are excluded because they are not meaningful search targets for users.

FIELD_ALIASES: Dict[str, str] = {'author': 'authors'}

Aliases for field names in search queries. An alias is transparently resolved to the canonical column name before applying the filter.

search_papers(keyword=None, field_filters=None, session=None, sessions=None, year=None, years=None, conference=None, conferences=None, limit=100)[source]

Search for papers by various criteria (lightweight schema).

Parameters:
  • keyword (str, optional) – Keyword to search in title, abstract, or keywords fields.

  • field_filters (dict of str to str, optional) – Mapping of Paper column names to search values. Each entry adds a case-insensitive ILIKE %value% condition on the corresponding column (e.g. {"authors": "Smith", "award": "Best Paper"}).

  • session (str, optional) – Single session to filter by (deprecated, use sessions instead).

  • sessions (list[str], optional) – List of sessions to filter by (matches ANY).

  • year (int, optional) – Single year to filter by (deprecated, use years instead).

  • years (list[int], optional) – List of years to filter by (matches ANY).

  • conference (str, optional) – Single conference to filter by (deprecated, use conferences instead).

  • conferences (list[str], optional) – List of conferences to filter by (matches ANY).

  • limit (int, default=100) – Maximum number of results to return.

Returns:

Matching papers as dictionaries.

Return type:

list of dict

Raises:

DatabaseError – If search fails.

Examples

>>> db = DatabaseManager("neurips.db")
>>> with db:
...     papers = db.search_papers(keyword="neural network", limit=10)
>>> for paper in papers:
...     print(paper['title'])
>>> # Search with multiple sessions
>>> papers = db.search_papers(sessions=["Session 1", "Session 2"])
>>> # Search with years
>>> papers = db.search_papers(years=[2024, 2025])
>>> # Search by field filter
>>> papers = db.search_papers(field_filters={"authors": "John Smith"})
static parse_field_filters(query)[source]

Parse field-specific filters from a search query string.

Extracts all field:"value" patterns from the query where field is a column name of the Paper model (or a recognised alias; see FIELD_ALIASES). Unrecognised field names are left in the query text as-is.

Parameters:

query (str) – The raw search query, e.g. 'authors:"John Smith" award:"Best Paper" transformers'.

Returns:

A tuple (field_filters, remaining_query) where field_filters maps canonical column names to their search values and remaining_query is the query with recognised filters removed.

Return type:

tuple of (dict, str)

Examples

>>> DatabaseManager.parse_field_filters('authors:"John Smith" transformers')
({'authors': 'John Smith'}, 'transformers')
>>> DatabaseManager.parse_field_filters('author:"John Smith" transformers')
({'authors': 'John Smith'}, 'transformers')
>>> DatabaseManager.parse_field_filters('Author:"John Smith" transformers')
({'authors': 'John Smith'}, 'transformers')
>>> DatabaseManager.parse_field_filters('transformers')
({}, 'transformers')
>>> DatabaseManager.parse_field_filters('award:"Best Paper" authors:"Doe"')
({'award': 'Best Paper', 'authors': 'Doe'}, '')
search_papers_keyword(query, limit=10, sessions=None, years=None, conferences=None)[source]

Perform keyword-based search with filtering and field filter parsing.

This is a convenience method that wraps search_papers and formats the results for web API consumption, including author parsing.

Supports field:"value" syntax for any Paper model column, e.g. 'authors:"John Smith" transformers' or 'award:"Best Paper"'.

Parameters:
  • query (str) – Keyword to search in title, abstract, or keywords fields. May include field:"value" filters for any Paper column.

  • limit (int, optional) – Maximum number of results, by default 10

  • sessions (list of str, optional) – Filter by paper sessions

  • years (list of int, optional) – Filter by publication years

  • conferences (list of str, optional) – Filter by conference names

Returns:

List of paper dictionaries with parsed authors

Return type:

list of dict

Examples

>>> papers = db.search_papers_keyword(
...     "neural networks",
...     limit=5,
...     years=[2024, 2025]
... )
>>> papers = db.search_papers_keyword('authors:"John Smith"')
>>> papers = db.search_papers_keyword('award:"Best Paper" transformers')
get_stats(year=None, conference=None)[source]

Get database statistics, optionally filtered by year and conference.

Parameters:
  • year (int, optional) – Filter by specific year

  • conference (str, optional) – Filter by specific conference

Returns:

Statistics dictionary with: - total_papers: int - Number of papers matching filters - year: int or None - Filter year if provided - conference: str or None - Filter conference if provided

Return type:

dict

Examples

>>> stats = db.get_stats()
>>> print(f"Total papers: {stats['total_papers']}")
>>> stats_2024 = db.get_stats(year=2024)
>>> print(f"Papers in 2024: {stats_2024['total_papers']}")
get_author_count()[source]

Get the approximate number of unique authors in the database.

Note: This provides an estimate by counting unique author names across all papers. The actual count may vary.

Returns:

Approximate number of unique authors.

Return type:

int

Raises:

DatabaseError – If query fails.

get_years_for_conference(conference)[source]

Get distinct years available for a specific conference.

Parameters:

conference (str) – Conference name to query.

Returns:

Sorted list of distinct years for the conference.

Return type:

list of int

Raises:

DatabaseError – If query fails.

get_conference_years_from_db()[source]

Return a mapping of each conference to the years that have papers in the database.

Years are sorted in descending order (most recent first).

Returns:

Mapping of conference name to list of years (descending) that have at least one paper in the database. Returns an empty dict when the database is empty or not connected.

Return type:

dict[str, list[int]]

Raises:

DatabaseError – If the query fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     mapping = db.get_conference_years_from_db()
>>> print(mapping)
{'NeurIPS': [2025, 2024], 'ICLR': [2024]}
resolve_default_conference_year(configured_conference, configured_year)[source]

Resolve the effective default conference and year, guaranteeing they have data.

The configured values are used when they point at a conference/year that actually has papers in the database. When they do not, the method falls back to the most recent conference/year combination present in the database.

Parameters:
  • configured_conference (str) – Conference name from the application configuration. May be empty or may not match any downloaded conference.

  • configured_year (int or None) – Year from the application configuration. May be None or may not have data for the matched conference.

Returns:

A (conference, year) pair that is guaranteed to have data in the database, or the original configured values if the database is empty.

Return type:

tuple[str, int | None]

Examples

>>> db = DatabaseManager()
>>> with db:
...     conf, year = db.resolve_default_conference_year("NeurIPS", 2024)
>>> print(conf, year)
NeurIPS 2024
resolve_conference_name(conference)[source]

Resolve a conference name to the canonical form stored in the database.

Performs a case-insensitive match against conference names already present in the database. If no database match is found, falls back to a case-insensitive match against the conference_name attribute of every registered downloader plugin. If neither lookup succeeds the original conference string is returned unchanged.

This is the single authoritative place where conference-name normalization must happen. CLI commands should call this method once at the entry point of each command and then work with the returned canonical name for all subsequent operations.

Parameters:

conference (str) – Conference name as supplied by the caller. May differ in case or spelling from the form stored in the database (e.g. ml4ps@neurips vs. ML4PS@Neurips).

Returns:

The conference name exactly as it appears in the database (first match), or exactly as defined by the first matching plugin, or the input string if no match is found.

Return type:

str

Examples

>>> with DatabaseManager() as db:
...     db.create_tables()
...     canonical = db.resolve_conference_name("ml4ps@neurips")
...     # Returns "ML4PS@Neurips" if that form is stored in the DB
resolve_conference_for_url(url_path)[source]

Resolve a URL path segment to a conference, checking data availability.

Combines plugin-based name resolution with a database data check. Returns a result dict describing the outcome:

  • found with data: {"conference": "<name>", "error": None}

  • found without data: {"conference": None, "error": {"message": "...", "available_conferences": [...]}}

  • not found: {"conference": None, "error": {"message": "...", "available_conferences": [...]}}

Parameters:

url_path (str) – URL path segment (e.g. "neurips", "ICLR").

Returns:

Dictionary with keys "conference" (str or None) and "error" (dict or None).

Return type:

dict

Examples

>>> with DatabaseManager() as db:
...     result = db.resolve_conference_for_url("neurips")
...     if result["conference"]:
...         print(f"Found: {result['conference']}")
get_sessions(conference=None, year=None)[source]

Get distinct session names from the database.

Parameters:
  • conference (str, optional) – Filter sessions to only those belonging to this conference.

  • year (int, optional) – Filter sessions to only those belonging to this year.

Returns:

Sorted list of distinct non-empty session names.

Return type:

list of str

Raises:

DatabaseError – If query fails or not connected.

Examples

>>> db = DatabaseManager()
>>> with db:
...     sessions = db.get_sessions()
>>> print(sessions)
['Session 1', 'Session 2', ...]
>>> sessions = db.get_sessions(conference="NeurIPS", year=2025)
get_conferences(year=None)[source]

Get distinct conference names from the database.

Parameters:

year (int, optional) – Filter conferences to only those that have papers for this year.

Returns:

Sorted list of distinct non-empty conference names.

Return type:

list of str

Raises:

DatabaseError – If query fails or not connected.

Examples

>>> db = DatabaseManager()
>>> with db:
...     conferences = db.get_conferences()
>>> print(conferences)
['ICLR', 'NeurIPS']
>>> conferences = db.get_conferences(year=2025)
get_years(conference=None)[source]

Get distinct years from the database, sorted descending (most recent first).

Parameters:

conference (str, optional) – Filter years to only those that have papers for this conference.

Returns:

Sorted list (descending) of distinct years.

Return type:

list of int

Raises:

DatabaseError – If query fails or not connected.

Examples

>>> db = DatabaseManager()
>>> with db:
...     years = db.get_years()
>>> print(years)
[2025, 2024, 2023]
>>> years = db.get_years(conference="NeurIPS")
get_embedding_model()[source]

Get the embedding model used for the current embeddings.

Returns:

Name of the embedding model, or None if not set.

Return type:

str or None

Raises:

DatabaseError – If query fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     model = db.get_embedding_model()
>>> print(model)
'text-embedding-qwen3-embedding-4b'
set_embedding_model(model_name)[source]

Set the embedding model used for embeddings.

This stores or updates the embedding model metadata. If a record exists, it updates the model and timestamp. Otherwise, it creates a new record.

Parameters:

model_name (str) – Name of the embedding model.

Raises:

DatabaseError – If update fails.

Return type:

None

Examples

>>> db = DatabaseManager()
>>> with db:
...     db.set_embedding_model("text-embedding-qwen3-embedding-4b")
get_clustering_cache(embedding_model, clustering_method, n_clusters=None, clustering_params=None, reduction_method=None, n_components=None, conference=None, year=None)[source]

Get cached clustering results matching the parameters.

When reduction_method and n_components are provided, only entries that match exactly (including the reduction method) are returned. When they are omitted (None), the reduction method is ignored and the most recent entry matching the clustering parameters is returned.

Parameters:
  • embedding_model (str) – Name of the embedding model.

  • clustering_method (str) – Clustering algorithm used.

  • n_clusters (int, optional) – Number of clusters to match. When None the query does not filter by this column.

  • clustering_params (dict, optional) – Additional clustering parameters (e.g., distance_threshold, eps).

  • reduction_method (str, optional) – Dimensionality reduction method. When provided, the query requires an exact match on this column.

  • n_components (int, optional) – Number of components after reduction. When provided, the query requires an exact match on this column.

  • conference (str, optional) – Conference name to filter by. None matches entries that have no conference set.

  • year (int, optional) – Conference year to filter by. None matches entries that have no year set.

Returns:

Cached clustering results as dictionary, or None if not found.

Return type:

dict or None

Raises:

DatabaseError – If query fails.

save_clustering_cache(embedding_model, reduction_method, n_components, clustering_method, results, n_clusters=None, clustering_params=None, conference=None, year=None)[source]

Save clustering results to cache.

The full results including visualization coordinates are stored. The reduction_method and n_components are stored so that an exact-match lookup can return cached points directly. When only the reduction method changes, the clustering results are reused and only the reduction is re-applied.

Parameters:
  • embedding_model (str) – Name of the embedding model.

  • reduction_method (str) – Dimensionality reduction method.

  • n_components (int) – Number of components after reduction.

  • clustering_method (str) – Clustering algorithm used.

  • results (dict) – Clustering results to cache (full results including points).

  • n_clusters (int, optional) – Number of clusters. When None, the actual count is extracted from results["statistics"]["n_clusters"].

  • clustering_params (dict, optional) – Additional clustering parameters.

  • conference (str, optional) – Conference name this entry is scoped to.

  • year (int, optional) – Conference year this entry is scoped to.

Raises:

DatabaseError – If save fails.

Return type:

None

delete_papers_by_conference_year(conference, year)[source]

Delete all papers for a specific conference and year combination.

Parameters:
  • conference (str) – Conference name (exact, as stored in the database).

  • year (int) – Conference year.

Returns:

Number of papers deleted.

Return type:

int

Raises:

DatabaseError – If deletion fails.

delete_clustering_cache_by_conference_year(conference, year)[source]

Delete all clustering cache entries for a specific conference and year.

Parameters:
  • conference (str) – Conference name (exact, as stored in the database).

  • year (int) – Conference year.

Returns:

Number of cache entries deleted.

Return type:

int

Raises:

DatabaseError – If deletion fails.

count_clustering_cache_by_conference_year(conference, year)[source]

Count clustering cache entries for a specific conference and year.

Parameters:
  • conference (str) – Conference name (exact, as stored in the database).

  • year (int) – Conference year.

Returns:

Number of cache entries found.

Return type:

int

Raises:

DatabaseError – If the query fails.

clear_clustering_cache(embedding_model=None)[source]

Clear clustering cache, optionally filtered by embedding model.

This is useful when embeddings change or cache becomes stale.

Parameters:

embedding_model (str, optional) – If provided, only clear cache for this embedding model. If None, clear all cache entries.

Returns:

Number of cache entries deleted.

Return type:

int

Raises:

DatabaseError – If deletion fails.

update_clustering_cache_embedding_model(old_model, new_model)[source]

Update the embedding model name in all clustering cache entries.

Renames every ClusteringCache row whose embedding_model matches old_model so that it refers to new_model instead. This keeps cached clustering results usable after the embedding model metadata is renamed (e.g. via update_embedding_model_metadata.py).

Parameters:
  • old_model (str) – Current embedding model name stored in the cache.

  • new_model (str) – New embedding model name to write.

Returns:

Number of cache entries updated.

Return type:

int

Raises:

DatabaseError – If the update fails.

Examples

>>> db = DatabaseManager()
>>> with db:
...     count = db.update_clustering_cache_embedding_model(
...         "old-model", "new-model"
...     )
get_hierarchical_label_cache(embedding_model, linkage='ward')[source]

Get cached hierarchical labels for agglomerative clustering.

Hierarchical labels are independent of the number of clusters and the distance threshold, so they are reused for all agglomerative clustering settings that share the same embedding model and linkage.

Parameters:
  • embedding_model (str) – Name of the embedding model.

  • linkage (str, optional) – Agglomerative linkage method (default: "ward").

Returns:

Mapping of {node_id: label} (integer keys), or None if no entry is found.

Return type:

dict or None

Raises:

DatabaseError – If query fails.

save_hierarchical_label_cache(embedding_model, labels, linkage='ward')[source]

Save hierarchical cluster labels to cache.

Parameters:
  • embedding_model (str) – Name of the embedding model.

  • labels (dict) – Mapping of {node_id: label} to store.

  • linkage (str, optional) – Agglomerative linkage method (default: "ward").

Raises:

DatabaseError – If save fails.

Return type:

None

add_eval_qa_pair(conversation_id, turn_number, query, expected_answer, tool_name=None, source_info=None)[source]

Insert a single evaluation Q/A pair.

Parameters:
  • conversation_id (str) – Identifier grouping turns in a conversation.

  • turn_number (int) – Position within the conversation (0 = first).

  • query (str) – The user query text.

  • expected_answer (str) – The expected/reference answer.

  • tool_name (str, optional) – MCP tool expected to be invoked.

  • source_info (str, optional) – JSON metadata about how the pair was generated.

Returns:

Primary key of the inserted row.

Return type:

int

Raises:

DatabaseError – If insertion fails.

get_eval_qa_pairs(verified_only=False, tool_name=None, conversation_id=None, limit=None, offset=0)[source]

Retrieve evaluation Q/A pairs with optional filters.

Parameters:
  • verified_only (bool) – If True, return only pairs with verified == 1.

  • tool_name (str, optional) – Filter by expected MCP tool name.

  • conversation_id (str, optional) – Filter by conversation.

  • limit (int, optional) – Maximum number of pairs to return.

  • offset (int) – Number of rows to skip (for pagination).

Returns:

Matching Q/A pairs as dictionaries.

Return type:

list of dict

Raises:

DatabaseError – If query fails.

get_eval_qa_pair_count(verified_only=False)[source]

Count evaluation Q/A pairs.

Parameters:

verified_only (bool) – If True, count only verified pairs.

Returns:

Number of matching pairs.

Return type:

int

Raises:

DatabaseError – If query fails.

update_eval_qa_pair(pair_id, **fields)[source]

Update fields on an existing Q/A pair.

Parameters:
  • pair_id (int) – Primary key of the pair to update.

  • **fields – Keyword arguments mapping column names to new values. Supported keys: query, expected_answer, tool_name, verified, source_info.

Returns:

True if a row was updated, False if the pair was not found.

Return type:

bool

Raises:

DatabaseError – If update fails.

delete_eval_qa_pair(pair_id)[source]

Delete an evaluation Q/A pair by ID.

Parameters:

pair_id (int) – Primary key of the pair to delete.

Returns:

True if a row was deleted, False if pair was not found.

Return type:

bool

Raises:

DatabaseError – If deletion fails.

delete_verified_eval_qa_pairs()[source]

Delete all verified (accepted) evaluation Q/A pairs.

Returns:

Number of pairs deleted.

Return type:

int

Raises:

DatabaseError – If deletion fails.

delete_eval_results(run_id=None)[source]

Delete stored evaluation results, optionally filtered to a single run.

Parameters:

run_id (str, optional) – If supplied, only results for this run are deleted. If None, all stored results are deleted.

Returns:

Number of rows deleted.

Return type:

int

Raises:

DatabaseError – If deletion fails.

add_eval_result(run_id, qa_pair_id, actual_answer=None, actual_tool_name=None, answer_score=None, tool_correct=None, latency_ms=None, error=None, judge_reasoning=None)[source]

Insert a single evaluation result.

Parameters:
  • run_id (str) – Identifier for the evaluation run.

  • qa_pair_id (int) – ID of the evaluated Q/A pair.

  • actual_answer (str, optional) – Answer produced by the RAG system.

  • actual_tool_name (str, optional) – MCP tool actually invoked.

  • answer_score (float, optional) – LLM-judged quality score (1–5).

  • tool_correct (int, optional) – 1 if the correct tool was used, 0 otherwise.

  • latency_ms (int, optional) – Query latency in milliseconds.

  • error (str, optional) – Error message if the query failed.

  • judge_reasoning (str, optional) – LLM judge’s reasoning for the score.

Returns:

Primary key of the inserted row.

Return type:

int

Raises:

DatabaseError – If insertion fails.

get_eval_results(run_id=None, limit=None, offset=0)[source]

Retrieve evaluation results with optional run filter.

Parameters:
  • run_id (str, optional) – Filter by evaluation run. If None, return results from all runs.

  • limit (int, optional) – Maximum number of results to return.

  • offset (int) – Number of rows to skip.

Returns:

Evaluation results as dictionaries.

Return type:

list of dict

Raises:

DatabaseError – If query fails.

get_eval_run_ids()[source]

Return distinct evaluation run IDs ordered by run time, oldest first.

The ordering is determined by the minimum created_at timestamp of all results in each run, so the most recent run appears last.

Returns:

Distinct run IDs ordered chronologically (oldest to newest).

Return type:

list of str

Raises:

DatabaseError – If query fails.

get_eval_run_summary(run_id)[source]

Compute summary statistics for an evaluation run.

Parameters:

run_id (str) – The evaluation run identifier.

Returns:

Dictionary with keys:

  • total : int – number of evaluated pairs

  • avg_score : float or None – mean answer quality score

  • tool_accuracy : float or None – fraction of correct tool selections

  • avg_latency_ms : float or None – mean latency

  • error_count : int – number of queries that produced errors

  • run_date : datetime or None – timestamp of the first result in the run

Return type:

dict

Raises:

DatabaseError – If query fails.

export_papers_to_sqlite(output_path, conference, year)[source]

Export papers for a given conference and year to a standalone SQLite file.

The exported file includes hierarchical label cache and embeddings metadata rows. Clustering cache is not included — it is exported separately via export_clustering_cache_to_json().

Parameters:
  • output_path (Path) – Destination path for the SQLite file.

  • conference (str) – Conference name to export.

  • year (int) – Year to export.

Returns:

Number of papers exported.

Return type:

int

Raises:

DatabaseError – If the export fails or no papers are found.

import_papers_from_sqlite(sqlite_path, conference, year)[source]

Import papers for a given conference and year from a SQLite file.

Existing papers for the given conference/year are replaced (not merged). Hierarchical label cache entries that match the conference and year are replaced. Embeddings metadata is validated for consistency (the embedding model must match).

Parameters:
  • sqlite_path (Path) – Path to the source SQLite file.

  • conference (str) – Conference name being imported.

  • year (int) – Year being imported.

Returns:

Number of papers imported.

Return type:

int

Raises:

DatabaseError – If the import fails or the embedding model is inconsistent.

export_clustering_cache_to_json(conference, year)[source]

Export clustering cache entries matching conference and year as JSON.

Parameters:
  • conference (str) – Conference name to match.

  • year (int) – Year to match.

Returns:

A JSON-serialisable dictionary with an entries list. Each entry contains all ClusteringCache columns except id (auto-generated on import).

Return type:

dict

Raises:

DatabaseError – If the export fails.

import_clustering_cache_from_json(data, conference, year, overwrite_embedding_model=None)[source]

Import clustering cache entries from a JSON dictionary.

Existing clustering cache entries matching conference and year are deleted before importing the new entries.

Parameters:
  • data (dict) – Dictionary previously returned by export_clustering_cache_to_json().

  • conference (str) – Conference name for scoping the delete.

  • year (int) – Year for scoping the delete.

  • overwrite_embedding_model (str, optional) – When provided, the embedding_model field of every imported entry is replaced with this value. Use this when importing an artifact whose embedding model differs from the locally configured one (i.e. when --ignore-embedding-model-mismatch was passed) so that get_clustering_cache() can find the entries using the local model name.

Returns:

Number of cache entries imported.

Return type:

int

Raises:

DatabaseError – If the import fails.

Usage Examples

Basic Operations

from abstracts_explorer.database import DatabaseManager

# Initialize database
db = DatabaseManager()

# Add a paper
paper_id = db.add_paper({
    'openreview_id': 'abc123',
    'title': 'Example Paper',
    'abstract': 'This is an example abstract.',
    'year': 2025,
    'pdf_url': 'https://example.com/paper.pdf'
})

# Add authors
db.add_author(paper_id, 'John Doe', 0)
db.add_author(paper_id, 'Jane Smith', 1)

Searching Papers

# Search by title
results = db.search_papers(title="transformer")

# Search by abstract content
results = db.search_papers(abstract="attention mechanism")

# Filter by year
results = db.get_papers_by_year(2025)

# Get papers by author
results = db.get_papers_by_author("John Doe")

Retrieving Data

# Get all papers
all_papers = db.get_all_papers()

# Get specific paper
paper = db.get_paper_by_id(paper_id)

# Get authors for a paper
authors = db.get_authors_for_paper(paper_id)

Database Schema

papers Table

Column

Type

Description

id

INTEGER

Primary key (auto-increment)

openreview_id

TEXT

Unique OpenReview ID

title

TEXT

Paper title

abstract

TEXT

Paper abstract

year

INTEGER

Conference year

pdf_url

TEXT

URL to PDF

created_at

TIMESTAMP

Creation timestamp

authors Table

Column

Type

Description

id

INTEGER

Primary key (auto-increment)

paper_id

INTEGER

Foreign key to papers

name

TEXT

Author name

position

INTEGER

Author position in list

Error Handling

The database module raises standard SQLite exceptions. Wrap operations in try-except blocks:

try:
    db.add_paper(paper_data)
except sqlite3.IntegrityError:
    print("Paper already exists")
except Exception as e:
    print(f"Database error: {e}")