diff --git a/.env.example b/.env.example index 2b5503a..c58d3b2 100644 --- a/.env.example +++ b/.env.example @@ -13,6 +13,9 @@ PAPERLESS_DOCUMENT_TYPE_ID=3 PAPERLESS_TASK_TIMEOUT_S=600 PAPERLESS_TASK_POLL_INTERVAL_S=5.0 +# 0 = unlimited concurrent per-page uploads +PAPERLESS_UPLOAD_CONCURRENCY=0 + RENDER_DPI=200 OCR_MAX_TOKENS=1024 OCR_TEMPERATURE=0.0 diff --git a/README.md b/README.md index aa23324..14b34e9 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ FastAPI service that: - splits them into pages (JPEG) - OCRs each page via your llama.cpp OpenAI-compatible endpoint - converts each page back into a single-page PDF -- uploads **one Paperless document per page** +- uploads **one Paperless document per page** (all uploads run **in parallel**; OCR stays **one page at a time** to limit VRAM usage) - patches each uploaded document with: - `content` = OCR text - custom fields `notebook_id` (field id 1) and `notebook_page` (field id 2) @@ -47,6 +47,9 @@ PAPERLESS_CUSTOM_FIELD_NOTEBOOK_ID=1 PAPERLESS_CUSTOM_FIELD_NOTEBOOK_PAGE=2 PAPERLESS_DOCUMENT_TYPE_ID=3 +# Optional: cap concurrent Paperless uploads (0 = unlimited) +PAPERLESS_UPLOAD_CONCURRENCY=4 + # Rendering / OCR knobs RENDER_DPI=200 OCR_MAX_TOKENS=1024 diff --git a/src/notebook_tools/pipeline.py b/src/notebook_tools/pipeline.py index 8da19d1..84120fb 100644 --- a/src/notebook_tools/pipeline.py +++ b/src/notebook_tools/pipeline.py @@ -6,19 +6,25 @@ Design goals: - Keep the pipeline readable and linear. - Return enough information (created ids) for the job API. - Avoid hidden side-effects (everything is passed in / returned). + +Upload strategy: +- All per-page PDFs are uploaded to Paperless concurrently (each upload still polls until a doc id exists). +- OCR (llama) runs one page at a time to respect VRAM. +- Each page is PATCHed once that page's upload has finished and OCR for that page is done. 
""" from __future__ import annotations -from collections.abc import Awaitable, Callable +import asyncio import io import logging import re +from collections.abc import Awaitable, Callable +from notebook_tools import pdf_utils from notebook_tools.llama_client import LlamaClient from notebook_tools.paperless_client import PaperlessClient from notebook_tools.settings import Settings -from notebook_tools import pdf_utils from PIL import Image logger = logging.getLogger("notebook_tools.pipeline") @@ -124,64 +130,86 @@ async def run_pipeline_for_paperless_document( if on_progress: await on_progress(0, total_pages) + # One small PDF per page (used by upload tasks). + page_pdfs = [pdf_utils.jpeg_to_pdf_bytes(jpeg_bytes=b) for b in jpegs] + + # 3) Start all Paperless uploads in parallel (each task waits for ingest + document id). + conc = settings.paperless_upload_concurrency + upload_sem: asyncio.Semaphore | None = ( + asyncio.Semaphore(conc) if conc and conc > 0 else None + ) + logger.info( + "job_id=%s starting_parallel_uploads pages=%s concurrency=%s", + job_id, + total_pages, + conc if conc and conc > 0 else "unlimited", + ) + + async def _upload_one_page(idx_1based: int) -> int: + filename = f"job_{job_id}_page_{idx_1based}.pdf" + pdf_bytes = page_pdfs[idx_1based - 1] + logger.info("job_id=%s page=%s/%s upload_task_starting", job_id, idx_1based, total_pages) + if upload_sem is not None: + async with upload_sem: + return await paperless.upload_pdf(filename=filename, pdf_bytes=pdf_bytes) + return await paperless.upload_pdf(filename=filename, pdf_bytes=pdf_bytes) + + upload_tasks: list[asyncio.Task[int]] = [ + asyncio.create_task(_upload_one_page(i)) for i in range(1, total_pages + 1) + ] + created_ids: list[int] = [] - # 3) For each page: OCR -> convert to single-page PDF -> upload -> patch metadata. - for idx, jpeg_bytes in enumerate(jpegs, start=1): - logger.info("job_id=%s page=%s/%s starting", job_id, idx, total_pages) - # 3a) Page-number OCR (bottom corners only). 
- page_number = -1 - for corner_jpeg in _crop_bottom_corner_jpegs(full_page_jpeg=jpeg_bytes): - candidate_text = await llama.ocr_jpeg(jpeg_bytes=corner_jpeg, prompt=PAGE_NUMBER_PROMPT) - parsed = _parse_page_number(candidate_text) - if parsed is not None: - # Only accept non-negative numbers, or -1. Anything else becomes unknown. - if parsed == -1 or parsed >= 0: - page_number = parsed - if page_number != -1: - break - logger.info("job_id=%s page=%s detected_page_number=%s", job_id, idx, page_number) + # 4) OCR sequentially (VRAM), then await upload for that page + PATCH. + try: + for idx, jpeg_bytes in enumerate(jpegs, start=1): + logger.info("job_id=%s page=%s/%s ocr_starting", job_id, idx, total_pages) + # 4a) Page-number OCR (bottom corners only). + page_number = -1 + for corner_jpeg in _crop_bottom_corner_jpegs(full_page_jpeg=jpeg_bytes): + candidate_text = await llama.ocr_jpeg(jpeg_bytes=corner_jpeg, prompt=PAGE_NUMBER_PROMPT) + parsed = _parse_page_number(candidate_text) + if parsed is not None: + if parsed == -1 or parsed >= 0: + page_number = parsed + if page_number != -1: + break + logger.info("job_id=%s page=%s detected_page_number=%s", job_id, idx, page_number) - # 3b) Full-page OCR for actual searchable text content. - logger.info("job_id=%s page=%s ocr_full_page", job_id, idx) - ocr_text = await llama.ocr_jpeg(jpeg_bytes=jpeg_bytes, prompt=ocr_prompt_override) - logger.info("job_id=%s page=%s ocr_chars=%s", job_id, idx, len(ocr_text)) + # 4b) Full-page OCR for searchable content. 
+ logger.info("job_id=%s page=%s ocr_full_page", job_id, idx) + ocr_text = await llama.ocr_jpeg(jpeg_bytes=jpeg_bytes, prompt=ocr_prompt_override) + logger.info("job_id=%s page=%s ocr_chars=%s", job_id, idx, len(ocr_text)) - page_pdf = pdf_utils.jpeg_to_pdf_bytes(jpeg_bytes=jpeg_bytes) - logger.info("job_id=%s page=%s pdf_bytes=%s", job_id, idx, len(page_pdf)) + logger.info("job_id=%s page=%s awaiting_upload_then_patch", job_id, idx) + new_id = await upload_tasks[idx - 1] + logger.info("job_id=%s page=%s uploaded_document_id=%s", job_id, idx, new_id) - # Upload the per-page PDF as a new Paperless document. - logger.info("job_id=%s page=%s uploading_to_paperless", job_id, idx) - new_id = await paperless.upload_pdf(filename=f"job_{job_id}_page_{idx}.pdf", pdf_bytes=page_pdf) - logger.info("job_id=%s page=%s uploaded_document_id=%s", job_id, idx, new_id) + custom_fields = [ + {"field": settings.paperless_custom_field_notebook_id, "value": notebook_id}, + {"field": settings.paperless_custom_field_notebook_page, "value": page_number}, + ] - # Patch metadata: - # - content: OCR text so it becomes searchable in Paperless - # - custom_fields: notebook_id + notebook_page - # - document_type: per-page document type (Paperless id) - # - title - custom_fields = [ - {"field": settings.paperless_custom_field_notebook_id, "value": notebook_id}, - {"field": settings.paperless_custom_field_notebook_page, "value": page_number}, - ] + title = f"Notebook {notebook_id} Page {page_number}" - # Per your request, title is always in this exact format. - # (We keep `title_prefix` in the API for now, but it is no longer used.) 
- title = f"Notebook {notebook_id} Page {page_number}" + logger.info("job_id=%s page=%s patching_document_id=%s", job_id, idx, new_id) + await paperless.patch_document( + document_id=new_id, + title=title, + content=ocr_text, + custom_fields=custom_fields, + document_type=settings.paperless_document_type_id, + ) + logger.info("job_id=%s page=%s patched_document_id=%s", job_id, idx, new_id) - logger.info("job_id=%s page=%s patching_document_id=%s", job_id, idx, new_id) - await paperless.patch_document( - document_id=new_id, - title=title, - content=ocr_text, - custom_fields=custom_fields, - document_type=settings.paperless_document_type_id, - ) - logger.info("job_id=%s page=%s patched_document_id=%s", job_id, idx, new_id) - - created_ids.append(new_id) - if on_progress: - await on_progress(idx, total_pages) + created_ids.append(new_id) + if on_progress: + await on_progress(idx, total_pages) + except BaseException: + for t in upload_tasks: + if not t.done(): + t.cancel() + await asyncio.gather(*upload_tasks, return_exceptions=True) + raise return {"created_document_ids": created_ids} - diff --git a/src/notebook_tools/settings.py b/src/notebook_tools/settings.py index de329b1..c45857f 100644 --- a/src/notebook_tools/settings.py +++ b/src/notebook_tools/settings.py @@ -46,6 +46,9 @@ class Settings(BaseSettings): paperless_task_timeout_s: int = Field(600, alias="PAPERLESS_TASK_TIMEOUT_S") paperless_task_poll_interval_s: float = Field(5.0, alias="PAPERLESS_TASK_POLL_INTERVAL_S") + # Max concurrent per-page uploads to Paperless (0 = unlimited). Limits load on the server. + paperless_upload_concurrency: int = Field(0, alias="PAPERLESS_UPLOAD_CONCURRENCY") + # Logging log_level: str = Field("INFO", alias="LOG_LEVEL")