Source code for audia.ui.routes.research

"""
/api/research – Search ArXiv and convert selected papers to audio.
The /convert and /enqueue endpoints accept an optional "project" field in the request body (falls back to DEFAULT_PROJECT when omitted or blank).
"""

from __future__ import annotations

import asyncio
import json
import uuid
from pathlib import Path

from fastapi import APIRouter, File, HTTPException, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel

from audia.agents.graph import run_pipeline
from audia.agents.pdf_processor import extract_text
from audia.agents.research import ArxivSearcher
from audia.agents.text_cleaner import heuristic_clean, llm_curate
from audia.agents.tts import synthesize
from audia.config import DEFAULT_PROJECT, get_settings
from audia.storage import AudioFile, Paper, ResearchSession, get_session
from audia.ui.jobs import JOBS

# Router that collects every research endpoint declared in this module.
router = APIRouter()


def _proj(project: str | None) -> str:
    """Resolve an optional project name to a usable one.

    A missing or empty value falls back to ``DEFAULT_PROJECT``; surrounding
    whitespace is stripped, and a name that is blank after stripping also
    falls back to ``DEFAULT_PROJECT``.
    """
    candidate = project if project else DEFAULT_PROJECT
    trimmed = candidate.strip()
    if trimmed:
        return trimmed
    return DEFAULT_PROJECT


def _dl_url(audio_id: int, project: str) -> str:
    """Build the download URL for an audio file.

    Args:
        audio_id: Primary key of the audio file to download.
        project: Project the audio file belongs to.

    Returns:
        ``/api/convert/download/<audio_id>`` for the default project, or the
        same path with a ``?project=<name>`` query string otherwise. The
        project name is percent-encoded so that characters such as spaces,
        ``&``, ``#`` or ``=`` cannot corrupt the query string.
    """
    from urllib.parse import quote

    base = f"/api/convert/download/{audio_id}"
    if project == DEFAULT_PROJECT:
        return base
    encoded_project = quote(project, safe="")
    return f"{base}?project={encoded_project}"


class SearchRequest(BaseModel):
    """Request body for the ``/search`` endpoint.

    Attributes:
        query: Search string handed to the ArXiv searcher.
        max_results: Result limit passed to the ArXiv searcher (default 10).
    """

    query: str
    max_results: int = 10
class NormalizeRequest(BaseModel):
    """Request body for the ``/normalize`` endpoint.

    Attributes:
        query: Natural-language message to distil into a search string.
        llm_provider: Optional LLM provider override (lower-cased before use).
        llm_model: Optional LLM model override.
    """

    query: str
    llm_provider: str | None = None
    llm_model: str | None = None
class ConvertResearchRequest(BaseModel):
    """Request body for the synchronous ``/convert`` endpoint.

    Attributes:
        arxiv_ids: ArXiv identifiers of the papers to convert.
        project: Optional project name; resolved through ``_proj``.
    """

    arxiv_ids: list[str]
    project: str | None = None
class EnqueueRequest(BaseModel):
    """Request body for the asynchronous ``/enqueue`` endpoint.

    Attributes:
        arxiv_ids: ArXiv identifiers; one background job is created per id.
        query: Optional search query; when set, it is stored with the paper
            as a ``ResearchSession`` row.
        llm_provider: Optional LLM provider override for the curation step.
        llm_model: Optional LLM model override for the curation step.
        tts_backend: Optional TTS backend override for synthesis.
        tts_voice: Optional TTS voice override for synthesis.
        project: Optional project name; resolved through ``_proj``.
    """

    arxiv_ids: list[str]
    query: str | None = None
    llm_provider: str | None = None
    llm_model: str | None = None
    tts_backend: str | None = None
    tts_voice: str | None = None
    project: str | None = None
# Strong references to in-flight background jobs. The event loop only keeps
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_BACKGROUND_TASKS: set[asyncio.Task] = set()


def _make_job(pdf_title: str | None = None) -> dict:
    """Return a fresh job-state dict for a newly queued research job.

    Args:
        pdf_title: Initial display title for the job (the enqueue endpoint
            passes the ArXiv id until the real title is known).

    Returns:
        A dict in the ``running`` / ``queued`` state with an empty log and
        empty stats.
    """
    return {
        "status": "running",
        "stage": "queued",
        "stage_label": "Queued",
        "progress": 2,
        "log": [],
        "stats": {},
        "result": None,
        "error": None,
        "cancelled": False,
        "pdf_path": None,
        "pdf_title": pdf_title,
        "paper_id": None,
    }


def _log(job: dict, line: str) -> None:
    """Append *line* to the job's log."""
    job["log"].append(line)


def _abort_if_cancelled(job: dict) -> bool:
    """Mark *job* as cancelled if cancellation was requested.

    Returns:
        True when the job was flagged as cancelled (the caller should stop
        processing), False otherwise.
    """
    if job["cancelled"]:
        job.update(status="cancelled", stage="cancelled", stage_label="Cancelled")
        return True
    return False


# ─────────────────────────────────────────────────── normalize


@router.post(
    "/normalize", summary="Distil a natural-language query into a short ArXiv search string via LLM"
)
async def normalize(body: NormalizeRequest) -> JSONResponse:
    """Turn a free-form user message into a concise ArXiv search string.

    The LLM call runs in a worker thread so the event loop is not blocked.

    Raises:
        HTTPException: 500 with the underlying error message when building
            or invoking the LLM fails.
    """
    from langchain_core.messages import HumanMessage, SystemMessage  # type: ignore

    from audia.agents.text_cleaner import _build_llm

    cfg = get_settings()
    # NOTE(review): these overrides are written into the object returned by
    # get_settings(); if that object is shared/cached they persist beyond this
    # request — confirm against audia.config.
    if body.llm_provider:
        cfg.__dict__["llm_provider"] = body.llm_provider.lower()
    if body.llm_model:
        cfg.__dict__["llm_model"] = body.llm_model

    def _run() -> str:
        llm = _build_llm(cfg)
        messages = [
            SystemMessage(
                content=(
                    "You are a search query assistant. "
                    "Extract a concise ArXiv search query (3-8 words) from the user's message. "
                    "Return ONLY the search query, nothing else."
                )
            ),
            HumanMessage(content=body.query),
        ]
        result = llm.invoke(messages)
        return getattr(result, "content", str(result)).strip()

    try:
        search_string = await asyncio.to_thread(_run)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return JSONResponse({"search_string": search_string})


# ─────────────────────────────────────────────────── search


@router.post("/search", summary="Search ArXiv for papers")
async def search(body: SearchRequest) -> JSONResponse:
    """Search ArXiv and return the matching papers' metadata.

    Raises:
        HTTPException: 500 with the underlying error message when the search
            fails.
    """
    searcher = ArxivSearcher(max_results=body.max_results)
    try:
        # Run the blocking search in a worker thread, as the other handlers
        # in this module do, so the event loop stays responsive.
        papers = await asyncio.to_thread(searcher.search, body.query)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return JSONResponse(
        {
            "query": body.query,
            "results": [
                {
                    "arxiv_id": p.arxiv_id,
                    "title": p.title,
                    "authors": p.authors,
                    "abstract": p.abstract,
                    "pdf_url": p.pdf_url,
                    "published": p.published,
                }
                for p in papers
            ],
        }
    )


# ─────────────────────────────────────────────────── convert (synchronous)


@router.post("/convert", summary="Synchronously convert ArXiv papers to audio")
async def convert_papers(body: ConvertResearchRequest) -> JSONResponse:
    """Download, convert and store each requested paper before responding.

    Per-paper failures are reported inside the ``results`` list (as an
    ``error`` entry) instead of failing the whole request.
    """
    proj = _proj(body.project)
    cfg = get_settings()
    dirs = cfg.get_project_dirs(proj)
    dirs.ensure_dirs()

    results = []
    for arxiv_id in body.arxiv_ids:
        searcher = ArxivSearcher()
        papers = await asyncio.to_thread(searcher.search, f"id:{arxiv_id}")
        if not papers:
            results.append({"arxiv_id": arxiv_id, "error": "Not found on ArXiv"})
            continue
        paper = papers[0]

        try:
            pdf_path: Path = await asyncio.to_thread(searcher.download_pdf, paper, dirs.upload_dir)
        except Exception as exc:
            results.append({"arxiv_id": arxiv_id, "error": f"Download failed: {exc}"})
            continue

        state = await asyncio.to_thread(run_pipeline, str(pdf_path))
        if state.get("error") or not state.get("audio_path"):
            # ``or`` (not a .get default) so an empty/None "error" value still
            # yields a meaningful message when only the audio path is missing.
            results.append({"arxiv_id": arxiv_id, "error": state.get("error") or "Pipeline failed"})
            continue

        audio_path = Path(state["audio_path"])
        with get_session(proj) as session:
            db_paper = Paper(
                title=paper.title,
                authors=json.dumps(paper.authors),
                abstract=paper.abstract,
                arxiv_id=paper.arxiv_id,
                pdf_path=str(pdf_path),
                pdf_url=paper.pdf_url,
            )
            session.add(db_paper)
            session.flush()  # populate db_paper.id before it is referenced
            af = AudioFile(
                paper_id=db_paper.id,
                filename=audio_path.name,
                file_path=str(audio_path),
                tts_backend=state.get("tts_backend", cfg.tts_backend),
                tts_voice=state.get("tts_voice", cfg.tts_voice),
            )
            session.add(af)
            session.flush()
            audio_id = af.id

        results.append(
            {
                "arxiv_id": arxiv_id,
                "title": paper.title,
                "download_url": _dl_url(audio_id, proj),
            }
        )

    return JSONResponse({"results": results})


# ─────────────────────────────────────────────────── enqueue


@router.post("/enqueue", summary="Enqueue ArXiv paper(s) for async conversion")
async def enqueue_research(body: EnqueueRequest) -> JSONResponse:
    """Create one background job per ArXiv id and return the job ids."""
    proj = _proj(body.project)
    cfg = get_settings()
    dirs = cfg.get_project_dirs(proj)
    dirs.ensure_dirs()

    jobs_out = []
    for arxiv_id in body.arxiv_ids:
        job_id = uuid.uuid4().hex
        job = _make_job(pdf_title=arxiv_id)
        JOBS[job_id] = job
        task = asyncio.create_task(
            _run_research_job(
                job_id,
                arxiv_id,
                project=proj,
                query=body.query,
                llm_provider=body.llm_provider,
                llm_model=body.llm_model,
                tts_backend=body.tts_backend,
                tts_voice=body.tts_voice,
            )
        )
        # Hold a reference until the task completes so it cannot be
        # garbage-collected mid-run.
        _BACKGROUND_TASKS.add(task)
        task.add_done_callback(_BACKGROUND_TASKS.discard)
        jobs_out.append({"arxiv_id": arxiv_id, "job_id": job_id})

    return JSONResponse({"jobs": jobs_out})


async def _run_research_job(
    job_id: str,
    arxiv_id: str,
    project: str = DEFAULT_PROJECT,
    query: str | None = None,
    llm_provider: str | None = None,
    llm_model: str | None = None,
    tts_backend: str | None = None,
    tts_voice: str | None = None,
) -> None:
    """Run the six-stage ArXiv-to-audio pipeline for one paper.

    Progress, log lines, stats and the final result are written into
    ``JOBS[job_id]``. Cancellation is checked after every stage; any exception
    is recorded on the job as an ``error`` status instead of propagating.
    """
    job = JOBS[job_id]
    cfg = get_settings()
    dirs = cfg.get_project_dirs(project)
    upload_id = uuid.uuid4().hex[:8]

    try:
        # Stage 1 – Search ArXiv for metadata
        job.update(stage="searching", stage_label="Step 1/6 \u2500 Searching ArXiv", progress=5)
        _log(job, f"Step 1/6 \u2500 Searching ArXiv for {arxiv_id}")
        searcher = ArxivSearcher()
        papers = await asyncio.to_thread(searcher.search, f"id:{arxiv_id}")
        if _abort_if_cancelled(job):
            return
        if not papers:
            job.update(
                status="error",
                stage="error",
                stage_label="Failed",
                error=f"Paper {arxiv_id} not found on ArXiv.",
            )
            _log(job, f" \u2717 {arxiv_id} not found on ArXiv")
            return
        paper = papers[0]
        job["pdf_title"] = paper.title
        _log(job, f" \u2713 Found: {paper.title[:80]}")

        # Stage 2 – Download PDF
        job.update(stage="downloading", stage_label="Step 2/6 \u2500 Downloading PDF", progress=15)
        _log(job, "Step 2/6 \u2500 Downloading PDF")
        pdf_path: Path = await asyncio.to_thread(searcher.download_pdf, paper, dirs.upload_dir)
        if _abort_if_cancelled(job):
            return
        job["pdf_path"] = str(pdf_path)
        _log(job, f" \u2713 Saved to {pdf_path.name}")

        # Stage 3 – PDF extraction
        job.update(stage="extracting", stage_label="Step 3/6 \u2500 PDF extraction", progress=28)
        _log(job, "Step 3/6 \u2500 PDF extraction")
        pdf_result = await asyncio.to_thread(extract_text, str(pdf_path))
        if _abort_if_cancelled(job):
            return
        raw_chars = len(pdf_result.text)
        job["stats"].update(raw_chars=raw_chars, num_pages=pdf_result.num_pages)
        if pdf_result.title:
            # Prefer the title extracted from the PDF over the ArXiv metadata.
            job["pdf_title"] = pdf_result.title
        _log(job, f" \u2713 {pdf_result.num_pages} pages, {raw_chars:,} chars")

        # Stage 4 – Heuristic pre-pass
        job.update(
            stage="preprocessing", stage_label="Step 4/6 \u2500 Heuristic pre-cleaning", progress=40
        )
        _log(job, "Step 4/6 \u2500 Heuristic pre-cleaning")
        precleaned = await asyncio.to_thread(heuristic_clean, pdf_result.text)
        if _abort_if_cancelled(job):
            return
        job["stats"]["precleaned_chars"] = len(precleaned)
        _log(job, f" \u2713 {raw_chars:,} \u2192 {len(precleaned):,} chars after pre-pass")

        # Stage 5 – LLM curation
        job.update(stage="curating", stage_label="Step 5/6 \u2500 LLM curation", progress=55)
        _log(job, "Step 5/6 \u2500 LLM curation")
        cfg2 = get_settings()
        # NOTE(review): overrides are written into the object returned by
        # get_settings(); if that object is shared/cached they leak into other
        # jobs and requests — confirm against audia.config.
        if llm_provider:
            cfg2.__dict__["llm_provider"] = llm_provider.lower()
        if llm_model:
            cfg2.__dict__["llm_model"] = llm_model
        if tts_backend:
            cfg2.__dict__["tts_backend"] = tts_backend
        if tts_voice:
            cfg2.__dict__["tts_voice"] = tts_voice

        def _cb_llm(msg: str) -> None:
            _log(job, f" {msg}")

        curated = await asyncio.to_thread(llm_curate, precleaned, cfg2, _cb_llm)
        if _abort_if_cancelled(job):
            return
        job["stats"]["curated_chars"] = len(curated)
        _log(job, f" \u2713 Curation complete \u2013 {len(curated):,} chars")

        # Stage 6 – TTS synthesis
        job.update(stage="synthesizing", stage_label="Step 6/6 \u2500 TTS synthesis", progress=72)
        _log(job, "Step 6/6 \u2500 TTS synthesis")
        _log(job, f" Backend: {cfg2.tts_backend} \u00b7 Voice: {cfg2.tts_voice}")

        def _cb_tts(msg: str) -> None:
            _log(job, f" {msg}")
            # Nudge the progress bar per callback, capped below the save stage.
            job["progress"] = min(92, job["progress"] + 1)

        audio_path = await asyncio.to_thread(
            synthesize,
            text=curated,
            output_dir=str(dirs.audio_dir),
            filename=upload_id,
            settings=cfg2,
            progress_cb=_cb_tts,
        )
        if _abort_if_cancelled(job):
            return
        job["stats"]["audio_filename"] = audio_path.name
        _log(job, f" \u2713 Audio saved: {audio_path.name}")

        # Save to DB
        job.update(stage="saving", stage_label="Saving to library", progress=95)
        _log(job, "Saving to library\u2026")
        with get_session(project) as session:
            db_paper = Paper(
                title=job["pdf_title"] or paper.title,
                authors=json.dumps(paper.authors),
                abstract=paper.abstract,
                arxiv_id=paper.arxiv_id,
                pdf_path=str(pdf_path),
                pdf_url=paper.pdf_url,
            )
            session.add(db_paper)
            session.flush()  # populate db_paper.id before it is referenced
            af = AudioFile(
                paper_id=db_paper.id,
                filename=audio_path.name,
                file_path=str(audio_path),
                tts_backend=cfg2.tts_backend,
                tts_voice=cfg2.tts_voice,
            )
            session.add(af)
            session.flush()
            audio_id = af.id
            paper_id = db_paper.id
            if query:
                session.add(
                    ResearchSession(
                        query=query,
                        paper_ids=json.dumps([paper_id]),
                    )
                )

        job["paper_id"] = paper_id
        _log(job, f" \u2713 Saved (paper_id={paper_id}, audio_id={audio_id})")
        job.update(
            status="done",
            stage="done",
            stage_label="Complete",
            progress=100,
            result={
                "audio_id": audio_id,
                "paper_id": paper_id,
                "audio_filename": audio_path.name,
                "download_url": _dl_url(audio_id, project),
                "title": job["pdf_title"] or paper.title,
                "num_pages": pdf_result.num_pages,
            },
        )
    except Exception as exc:
        # Top-level boundary of a background task: record the failure on the
        # job so pollers can see it, instead of losing it with the task.
        job.update(status="error", stage="error", stage_label="Failed", error=str(exc))
        _log(job, f" \u2717 Error: {exc}")


# ─────────────────────────────────────────────────── transcribe


@router.post("/transcribe", summary="Transcribe uploaded audio to text")
async def transcribe_audio(file: UploadFile = File(...)) -> JSONResponse:
    """Accept a browser audio recording and return the Whisper transcription."""
    import tempfile

    cfg = get_settings()
    suffix = Path(file.filename or "audio.webm").suffix or ".webm"
    # Read the upload before creating the temp file so a failed read cannot
    # leave a stray file behind.
    content = await file.read()
    tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    tmp_path = tmp.name
    try:
        with tmp:
            tmp.write(content)
        from audia.agents.stt import transcribe_file

        text = await asyncio.to_thread(transcribe_file, tmp_path, cfg.stt_model, cfg.stt_device)
    finally:
        # delete=False above, so the file must be removed explicitly.
        Path(tmp_path).unlink(missing_ok=True)
    return JSONResponse({"text": text})


# ─────────────────────────────────────────────────── status / cancel / pdf


@router.get("/status/{job_id}", summary="Poll research job status")
async def get_job_status(job_id: str) -> JSONResponse:
    """Return the full state dict of a job.

    Raises:
        HTTPException: 404 when no job with *job_id* exists.
    """
    job = JOBS.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found.")
    return JSONResponse(job)


@router.delete("/jobs/{job_id}", summary="Cancel a running research job")
async def cancel_job(job_id: str) -> JSONResponse:
    """Request cancellation of a job.

    Only a job whose status is ``running`` is flagged; the pipeline notices
    the flag after its current stage completes. The response is the same
    whether or not the job was still running.

    Raises:
        HTTPException: 404 when no job with *job_id* exists.
    """
    job = JOBS.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found.")
    if job["status"] == "running":
        job["cancelled"] = True
        job["stage_label"] = "Cancelling\u2026"
        _log(job, "Cancellation requested\u2026")
    return JSONResponse({"status": "cancel_requested", "job_id": job_id})


@router.get(
    "/jobs/{job_id}/pdf", summary="Serve the PDF for an in-progress or completed research job"
)
async def serve_job_pdf(job_id: str) -> FileResponse:
    """Serve the downloaded PDF of a job inline.

    Raises:
        HTTPException: 404 when the job does not exist, or when its PDF path
            is not set yet or no longer exists on disk.
    """
    job = JOBS.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found.")
    pdf_path = job.get("pdf_path")
    if not pdf_path or not Path(pdf_path).exists():
        raise HTTPException(status_code=404, detail="PDF not yet available.")
    return FileResponse(
        path=pdf_path,
        media_type="application/pdf",
        headers={"Content-Disposition": "inline"},
    )