Add parallel uploading of documents
Signed-off-by: Daniel Henry <iamdanhenry@gmail.com>
This commit is contained in:
@@ -6,19 +6,25 @@ Design goals:
|
||||
- Keep the pipeline readable and linear.
|
||||
- Return enough information (created ids) for the job API.
|
||||
- Avoid hidden side-effects (everything is passed in / returned).
|
||||
|
||||
Upload strategy:
|
||||
- All per-page PDFs are uploaded to Paperless concurrently (each upload still polls until a doc id exists).
|
||||
- OCR (llama) runs one page at a time to respect VRAM.
|
||||
- Each page is PATCHed once that page's upload has finished and OCR for that page is done.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Awaitable, Callable
|
||||
import asyncio
|
||||
import io
|
||||
import logging
|
||||
import re
|
||||
from collections.abc import Awaitable, Callable
|
||||
|
||||
from notebook_tools import pdf_utils
|
||||
from notebook_tools.llama_client import LlamaClient
|
||||
from notebook_tools.paperless_client import PaperlessClient
|
||||
from notebook_tools.settings import Settings
|
||||
from notebook_tools import pdf_utils
|
||||
from PIL import Image
|
||||
|
||||
logger = logging.getLogger("notebook_tools.pipeline")
|
||||
@@ -124,64 +130,86 @@ async def run_pipeline_for_paperless_document(
|
||||
if on_progress:
|
||||
await on_progress(0, total_pages)
|
||||
|
||||
# One small PDF per page (used by upload tasks).
|
||||
page_pdfs = [pdf_utils.jpeg_to_pdf_bytes(jpeg_bytes=b) for b in jpegs]
|
||||
|
||||
# 3) Start all Paperless uploads in parallel (each task waits for ingest + document id).
|
||||
conc = settings.paperless_upload_concurrency
|
||||
upload_sem: asyncio.Semaphore | None = (
|
||||
asyncio.Semaphore(conc) if conc and conc > 0 else None
|
||||
)
|
||||
logger.info(
|
||||
"job_id=%s starting_parallel_uploads pages=%s concurrency=%s",
|
||||
job_id,
|
||||
total_pages,
|
||||
conc if conc and conc > 0 else "unlimited",
|
||||
)
|
||||
|
||||
async def _upload_one_page(idx_1based: int) -> int:
|
||||
filename = f"job_{job_id}_page_{idx_1based}.pdf"
|
||||
pdf_bytes = page_pdfs[idx_1based - 1]
|
||||
logger.info("job_id=%s page=%s/%s upload_task_starting", job_id, idx_1based, total_pages)
|
||||
if upload_sem is not None:
|
||||
async with upload_sem:
|
||||
return await paperless.upload_pdf(filename=filename, pdf_bytes=pdf_bytes)
|
||||
return await paperless.upload_pdf(filename=filename, pdf_bytes=pdf_bytes)
|
||||
|
||||
upload_tasks: list[asyncio.Task[int]] = [
|
||||
asyncio.create_task(_upload_one_page(i)) for i in range(1, total_pages + 1)
|
||||
]
|
||||
|
||||
created_ids: list[int] = []
|
||||
|
||||
# 3) For each page: OCR -> convert to single-page PDF -> upload -> patch metadata.
|
||||
for idx, jpeg_bytes in enumerate(jpegs, start=1):
|
||||
logger.info("job_id=%s page=%s/%s starting", job_id, idx, total_pages)
|
||||
# 3a) Page-number OCR (bottom corners only).
|
||||
page_number = -1
|
||||
for corner_jpeg in _crop_bottom_corner_jpegs(full_page_jpeg=jpeg_bytes):
|
||||
candidate_text = await llama.ocr_jpeg(jpeg_bytes=corner_jpeg, prompt=PAGE_NUMBER_PROMPT)
|
||||
parsed = _parse_page_number(candidate_text)
|
||||
if parsed is not None:
|
||||
# Only accept non-negative numbers, or -1. Anything else becomes unknown.
|
||||
if parsed == -1 or parsed >= 0:
|
||||
page_number = parsed
|
||||
if page_number != -1:
|
||||
break
|
||||
logger.info("job_id=%s page=%s detected_page_number=%s", job_id, idx, page_number)
|
||||
# 4) OCR sequentially (VRAM), then await upload for that page + PATCH.
|
||||
try:
|
||||
for idx, jpeg_bytes in enumerate(jpegs, start=1):
|
||||
logger.info("job_id=%s page=%s/%s ocr_starting", job_id, idx, total_pages)
|
||||
# 4a) Page-number OCR (bottom corners only).
|
||||
page_number = -1
|
||||
for corner_jpeg in _crop_bottom_corner_jpegs(full_page_jpeg=jpeg_bytes):
|
||||
candidate_text = await llama.ocr_jpeg(jpeg_bytes=corner_jpeg, prompt=PAGE_NUMBER_PROMPT)
|
||||
parsed = _parse_page_number(candidate_text)
|
||||
if parsed is not None:
|
||||
if parsed == -1 or parsed >= 0:
|
||||
page_number = parsed
|
||||
if page_number != -1:
|
||||
break
|
||||
logger.info("job_id=%s page=%s detected_page_number=%s", job_id, idx, page_number)
|
||||
|
||||
# 3b) Full-page OCR for actual searchable text content.
|
||||
logger.info("job_id=%s page=%s ocr_full_page", job_id, idx)
|
||||
ocr_text = await llama.ocr_jpeg(jpeg_bytes=jpeg_bytes, prompt=ocr_prompt_override)
|
||||
logger.info("job_id=%s page=%s ocr_chars=%s", job_id, idx, len(ocr_text))
|
||||
# 4b) Full-page OCR for searchable content.
|
||||
logger.info("job_id=%s page=%s ocr_full_page", job_id, idx)
|
||||
ocr_text = await llama.ocr_jpeg(jpeg_bytes=jpeg_bytes, prompt=ocr_prompt_override)
|
||||
logger.info("job_id=%s page=%s ocr_chars=%s", job_id, idx, len(ocr_text))
|
||||
|
||||
page_pdf = pdf_utils.jpeg_to_pdf_bytes(jpeg_bytes=jpeg_bytes)
|
||||
logger.info("job_id=%s page=%s pdf_bytes=%s", job_id, idx, len(page_pdf))
|
||||
logger.info("job_id=%s page=%s awaiting_upload_then_patch", job_id, idx)
|
||||
new_id = await upload_tasks[idx - 1]
|
||||
logger.info("job_id=%s page=%s uploaded_document_id=%s", job_id, idx, new_id)
|
||||
|
||||
# Upload the per-page PDF as a new Paperless document.
|
||||
logger.info("job_id=%s page=%s uploading_to_paperless", job_id, idx)
|
||||
new_id = await paperless.upload_pdf(filename=f"job_{job_id}_page_{idx}.pdf", pdf_bytes=page_pdf)
|
||||
logger.info("job_id=%s page=%s uploaded_document_id=%s", job_id, idx, new_id)
|
||||
custom_fields = [
|
||||
{"field": settings.paperless_custom_field_notebook_id, "value": notebook_id},
|
||||
{"field": settings.paperless_custom_field_notebook_page, "value": page_number},
|
||||
]
|
||||
|
||||
# Patch metadata:
|
||||
# - content: OCR text so it becomes searchable in Paperless
|
||||
# - custom_fields: notebook_id + notebook_page
|
||||
# - document_type: per-page document type (Paperless id)
|
||||
# - title
|
||||
custom_fields = [
|
||||
{"field": settings.paperless_custom_field_notebook_id, "value": notebook_id},
|
||||
{"field": settings.paperless_custom_field_notebook_page, "value": page_number},
|
||||
]
|
||||
title = f"Notebook {notebook_id} Page {page_number}"
|
||||
|
||||
# Per your request, title is always in this exact format.
|
||||
# (We keep `title_prefix` in the API for now, but it is no longer used.)
|
||||
title = f"Notebook {notebook_id} Page {page_number}"
|
||||
logger.info("job_id=%s page=%s patching_document_id=%s", job_id, idx, new_id)
|
||||
await paperless.patch_document(
|
||||
document_id=new_id,
|
||||
title=title,
|
||||
content=ocr_text,
|
||||
custom_fields=custom_fields,
|
||||
document_type=settings.paperless_document_type_id,
|
||||
)
|
||||
logger.info("job_id=%s page=%s patched_document_id=%s", job_id, idx, new_id)
|
||||
|
||||
logger.info("job_id=%s page=%s patching_document_id=%s", job_id, idx, new_id)
|
||||
await paperless.patch_document(
|
||||
document_id=new_id,
|
||||
title=title,
|
||||
content=ocr_text,
|
||||
custom_fields=custom_fields,
|
||||
document_type=settings.paperless_document_type_id,
|
||||
)
|
||||
logger.info("job_id=%s page=%s patched_document_id=%s", job_id, idx, new_id)
|
||||
|
||||
created_ids.append(new_id)
|
||||
if on_progress:
|
||||
await on_progress(idx, total_pages)
|
||||
created_ids.append(new_id)
|
||||
if on_progress:
|
||||
await on_progress(idx, total_pages)
|
||||
except BaseException:
|
||||
for t in upload_tasks:
|
||||
if not t.done():
|
||||
t.cancel()
|
||||
await asyncio.gather(*upload_tasks, return_exceptions=True)
|
||||
raise
|
||||
|
||||
return {"created_document_ids": created_ids}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user