"""OCR pipeline: Paperless PDF -> per-page OCR -> per-page PDFs -> Paperless uploads. This module is where the "business logic" lives. Design goals: - Keep the pipeline readable and linear. - Return enough information (created ids) for the job API. - Avoid hidden side-effects (everything is passed in / returned). Upload strategy: - All per-page PDFs are uploaded to Paperless concurrently (each upload still polls until a doc id exists). - OCR (llama) runs one page at a time to respect VRAM. - Each page is PATCHed once that page's upload has finished and OCR for that page is done. """ from __future__ import annotations import asyncio import io import logging import re from collections.abc import Awaitable, Callable from notebook_tools import pdf_utils from notebook_tools.llama_client import LlamaClient from notebook_tools.paperless_client import PaperlessClient from notebook_tools.settings import Settings from PIL import Image logger = logging.getLogger("notebook_tools.pipeline") PAGE_NUMBER_PROMPT = ( "You are reading a handwritten page number in the bottom corner of a notebook page. " "Return ONLY the page number as an integer. If you cannot determine it, return -1. " "Do not output any other words." ) def _crop_bottom_corner_jpegs(*, full_page_jpeg: bytes) -> list[bytes]: """Return small JPEG crops from bottom-left and bottom-right corners. Why crop? - It reduces visual clutter so the model focuses on the handwritten page number. - It reduces payload size, making OCR faster. The crop is based on percentages so it works across different page sizes. """ img = Image.open(io.BytesIO(full_page_jpeg)).convert("RGB") w, h = img.size # Bottom band (e.g. last 20% of page height) band_h = int(h * 0.22) y0 = max(0, h - band_h) # Left/right corner width (e.g. 35% of page width) corner_w = int(w * 0.35) crops = [ img.crop((0, y0, corner_w, h)), # bottom-left img.crop((w - corner_w, y0, w, h)), # bottom-right ] out: list[bytes] = [] for c in crops: buf = io.BytesIO() c.save(buf, format="JPEG", quality=90, optimize=True) out.append(buf.getvalue()) return out def _parse_page_number(text: str) -> int | None: """Try to parse an integer page number from a model response. We accept: - '12' - 'Page 12' (if the model disobeys slightly) - '-1' """ m = re.search(r"-?\d+", text) if not m: return None try: return int(m.group(0)) except ValueError: return None async def run_pipeline_for_paperless_document( *, settings: Settings, paperless_document_id: int, notebook_id: str, job_id: str, on_progress: Callable[[int, int], Awaitable[None]] | None, ocr_prompt_override: str | None, title_prefix: str | None, ) -> dict[str, list[int]]: """Run the full OCR pipeline for one Paperless document id. Returns: {"created_document_ids": [...]} where each id is a NEW Paperless document (one per page). """ paperless = PaperlessClient( base_url=str(settings.paperless_base_url), token=settings.paperless_token, task_timeout_s=settings.paperless_task_timeout_s, task_poll_interval_s=settings.paperless_task_poll_interval_s, ) llama = LlamaClient( base_url=str(settings.llama_base_url), model=settings.llama_model, temperature=settings.ocr_temperature, max_tokens=settings.ocr_max_tokens, ) # 1) Download the source PDF. logger.info("job_id=%s downloading paperless_document_id=%s", job_id, paperless_document_id) pdf_bytes = await paperless.download_document_pdf(document_id=paperless_document_id) logger.info("job_id=%s downloaded_pdf_bytes=%s", job_id, len(pdf_bytes)) # 2) Render the PDF pages as JPEG images. logger.info("job_id=%s rendering_pages dpi=%s", job_id, settings.render_dpi) jpegs = pdf_utils.render_pdf_to_jpegs(pdf_bytes=pdf_bytes, dpi=settings.render_dpi) total_pages = len(jpegs) logger.info("job_id=%s rendered_pages=%s", job_id, total_pages) if on_progress: await on_progress(0, total_pages) # One small PDF per page (used by upload tasks). page_pdfs = [pdf_utils.jpeg_to_pdf_bytes(jpeg_bytes=b) for b in jpegs] # 3) Start all Paperless uploads in parallel (each task waits for ingest + document id). conc = settings.paperless_upload_concurrency upload_sem: asyncio.Semaphore | None = ( asyncio.Semaphore(conc) if conc and conc > 0 else None ) logger.info( "job_id=%s starting_parallel_uploads pages=%s concurrency=%s", job_id, total_pages, conc if conc and conc > 0 else "unlimited", ) async def _upload_one_page(idx_1based: int) -> int: filename = f"job_{job_id}_page_{idx_1based}.pdf" pdf_bytes = page_pdfs[idx_1based - 1] logger.info("job_id=%s page=%s/%s upload_task_starting", job_id, idx_1based, total_pages) if upload_sem is not None: async with upload_sem: return await paperless.upload_pdf(filename=filename, pdf_bytes=pdf_bytes) return await paperless.upload_pdf(filename=filename, pdf_bytes=pdf_bytes) upload_tasks: list[asyncio.Task[int]] = [ asyncio.create_task(_upload_one_page(i)) for i in range(1, total_pages + 1) ] created_ids: list[int] = [] # 4) OCR sequentially (VRAM), then await upload for that page + PATCH. try: for idx, jpeg_bytes in enumerate(jpegs, start=1): logger.info("job_id=%s page=%s/%s ocr_starting", job_id, idx, total_pages) # 4a) Page-number OCR (bottom corners only). page_number = -1 for corner_jpeg in _crop_bottom_corner_jpegs(full_page_jpeg=jpeg_bytes): candidate_text = await llama.ocr_jpeg(jpeg_bytes=corner_jpeg, prompt=PAGE_NUMBER_PROMPT) parsed = _parse_page_number(candidate_text) if parsed is not None: if parsed == -1 or parsed >= 0: page_number = parsed if page_number != -1: break logger.info("job_id=%s page=%s detected_page_number=%s", job_id, idx, page_number) # 4b) Full-page OCR for searchable content. logger.info("job_id=%s page=%s ocr_full_page", job_id, idx) ocr_text = await llama.ocr_jpeg(jpeg_bytes=jpeg_bytes, prompt=ocr_prompt_override) logger.info("job_id=%s page=%s ocr_chars=%s", job_id, idx, len(ocr_text)) logger.info("job_id=%s page=%s awaiting_upload_then_patch", job_id, idx) new_id = await upload_tasks[idx - 1] logger.info("job_id=%s page=%s uploaded_document_id=%s", job_id, idx, new_id) custom_fields = [ {"field": settings.paperless_custom_field_notebook_id, "value": notebook_id}, {"field": settings.paperless_custom_field_notebook_page, "value": page_number}, ] title = f"Notebook {notebook_id} Page {page_number}" logger.info("job_id=%s page=%s patching_document_id=%s", job_id, idx, new_id) await paperless.patch_document( document_id=new_id, title=title, content=ocr_text, custom_fields=custom_fields, document_type=settings.paperless_document_type_id, ) logger.info("job_id=%s page=%s patched_document_id=%s", job_id, idx, new_id) created_ids.append(new_id) if on_progress: await on_progress(idx, total_pages) except BaseException: for t in upload_tasks: if not t.done(): t.cancel() await asyncio.gather(*upload_tasks, return_exceptions=True) raise return {"created_document_ids": created_ids}