Compare commits
7 Commits
facf6b26f0
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 4820888ff9 | |||
| bcb4714778 | |||
| 7fec4bc575 | |||
| 612fbe2055 | |||
| 29c790fdfd | |||
| ece1b037d1 | |||
| 9b1705d82b |
16
.dockerignore
Normal file
16
.dockerignore
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
.git
|
||||||
|
.github
|
||||||
|
.gitea
|
||||||
|
.venv
|
||||||
|
__pycache__
|
||||||
|
*.py[cod]
|
||||||
|
.pytest_cache
|
||||||
|
.ruff_cache
|
||||||
|
.coverage
|
||||||
|
htmlcov
|
||||||
|
.env
|
||||||
|
.env.*
|
||||||
|
!.env.example
|
||||||
|
tests
|
||||||
|
.cursor
|
||||||
|
*.plan.md
|
||||||
22
.env.example
Normal file
22
.env.example
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
PAPERLESS_BASE_URL="https://paperless.example.com"
|
||||||
|
PAPERLESS_TOKEN="paste-token-here"
|
||||||
|
|
||||||
|
LLAMA_BASE_URL="http://127.0.0.1:9292"
|
||||||
|
LLAMA_MODEL="ggml-model-q4_k_m"
|
||||||
|
|
||||||
|
PAPERLESS_CUSTOM_FIELD_NOTEBOOK_ID=1
|
||||||
|
PAPERLESS_CUSTOM_FIELD_NOTEBOOK_PAGE=2
|
||||||
|
|
||||||
|
# Document type id for each uploaded per-page document
|
||||||
|
PAPERLESS_DOCUMENT_TYPE_ID=3
|
||||||
|
|
||||||
|
PAPERLESS_TASK_TIMEOUT_S=600
|
||||||
|
PAPERLESS_TASK_POLL_INTERVAL_S=5.0
|
||||||
|
|
||||||
|
# 0 = unlimited concurrent per-page uploads
|
||||||
|
PAPERLESS_UPLOAD_CONCURRENCY=0
|
||||||
|
|
||||||
|
RENDER_DPI=200
|
||||||
|
OCR_MAX_TOKENS=1024
|
||||||
|
OCR_TEMPERATURE=0.0
|
||||||
|
|
||||||
38
.github/workflows/build-docker.yml
vendored
Normal file
38
.github/workflows/build-docker.yml
vendored
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
name: Build and Publish Docker Image
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- main
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build-and-push:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout code
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Set up Docker Buildx
|
||||||
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
|
- name: Login to Docker Registry
|
||||||
|
uses: docker/login-action@v3
|
||||||
|
with:
|
||||||
|
registry: ${{ secrets.DOCKER_REGISTRY }}
|
||||||
|
username: ${{ secrets.DOCKER_USERNAME }}
|
||||||
|
password: ${{ secrets.DOCKER_PASSWORD }}
|
||||||
|
|
||||||
|
- name: Build and push
|
||||||
|
uses: docker/build-push-action@v5
|
||||||
|
with:
|
||||||
|
context: .
|
||||||
|
file: Dockerfile
|
||||||
|
push: true
|
||||||
|
tags: |
|
||||||
|
${{ secrets.DOCKER_REGISTRY }}/${{ secrets.DOCKER_USERNAME }}/notebook-tools:${{ gitea.sha }}
|
||||||
|
${{ secrets.DOCKER_REGISTRY }}/${{ secrets.DOCKER_USERNAME }}/notebook-tools:latest
|
||||||
|
labels: |
|
||||||
|
org.opencontainers.image.source=${{ gitea.server_url }}/${{ gitea.repository }}
|
||||||
|
org.opencontainers.image.description=Notebook tools — Paperless + llama.cpp OCR API
|
||||||
|
cache-from: type=gha
|
||||||
|
cache-to: type=gha,mode=max
|
||||||
19
Dockerfile
Normal file
19
Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
# syntax=docker/dockerfile:1
|
||||||
|
# Production image: uv sync (frozen lockfile), run FastAPI with uvicorn.
|
||||||
|
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
ENV UV_COMPILE_BYTECODE=1 \
|
||||||
|
UV_LINK_MODE=copy
|
||||||
|
|
||||||
|
COPY pyproject.toml uv.lock README.md ./
|
||||||
|
COPY src ./src
|
||||||
|
|
||||||
|
RUN uv sync --frozen --no-dev
|
||||||
|
|
||||||
|
ENV PATH="/app/.venv/bin:$PATH"
|
||||||
|
|
||||||
|
EXPOSE 8080
|
||||||
|
|
||||||
|
CMD ["uvicorn", "notebook_tools.api:app", "--host", "0.0.0.0", "--port", "8080"]
|
||||||
102
README.md
102
README.md
@@ -1,2 +1,104 @@
|
|||||||
# notebook-tools
|
# notebook-tools
|
||||||
|
|
||||||
|
FastAPI service that:
|
||||||
|
- downloads PDFs from Paperless-ngx
|
||||||
|
- splits them into pages (JPEG)
|
||||||
|
- OCRs each page via your llama.cpp OpenAI-compatible endpoint
|
||||||
|
- converts each page back into a single-page PDF
|
||||||
|
- uploads **one Paperless document per page** (all uploads run **in parallel**; OCR stays **one page at a time** for VRAM)
|
||||||
|
- patches each uploaded document with:
|
||||||
|
- `content` = OCR text
|
||||||
|
- custom fields `notebook_id` (field id 1) and `notebook_page` (field id 2)
|
||||||
|
- `document_type` = Paperless document type id (default **3**, configurable)
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
|
||||||
|
Install deps:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
uv sync
|
||||||
|
```
|
||||||
|
|
||||||
|
Create a `.env` file (example below) and **do not commit it**.
|
||||||
|
|
||||||
|
## Run locally
|
||||||
|
|
||||||
|
```bash
|
||||||
|
uv run uvicorn notebook_tools.api:app --reload --host 0.0.0.0 --port 8080
|
||||||
|
```
|
||||||
|
|
||||||
|
Then open the docs at:
|
||||||
|
- `http://127.0.0.1:8080/docs` (same machine)
|
||||||
|
- `http://<your-lan-ip>:8080/docs` (other machines on your network)
|
||||||
|
|
||||||
|
If other machines still can’t connect, check your macOS firewall and any router/network rules.
|
||||||
|
|
||||||
|
## Docker
|
||||||
|
|
||||||
|
Build and run (pass env via file or `-e`; the app reads `.env` only if you mount it):
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker build -t notebook-tools:local .
|
||||||
|
docker run --rm -p 8080:8080 --env-file .env notebook-tools:local
|
||||||
|
```
|
||||||
|
|
||||||
|
`LLAMA_BASE_URL` / `PAPERLESS_BASE_URL` must be reachable **from inside the container** (use `host.docker.internal` on Docker Desktop, or your LAN IP, not `127.0.0.1` for services on the host).
|
||||||
|
|
||||||
|
### Docker Compose
|
||||||
|
|
||||||
|
Save as `compose.yaml` (any directory with your `.env`):
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
services:
|
||||||
|
notebook-tools:
|
||||||
|
image: git.danhenry.dev/daniel/notebook-tools:latest
|
||||||
|
ports:
|
||||||
|
- "8080:8080"
|
||||||
|
env_file:
|
||||||
|
- .env
|
||||||
|
# Lets the container reach services bound on the host (e.g. llama on :9292).
|
||||||
|
# Linux: requires Docker 20.10+ / Compose v2; omit on Docker Desktop if already available.
|
||||||
|
extra_hosts:
|
||||||
|
- "host.docker.internal:host-gateway"
|
||||||
|
```
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker compose pull && docker compose up
|
||||||
|
```
|
||||||
|
|
||||||
|
Log in to `git.danhenry.dev` first if the registry requires auth: `docker login git.danhenry.dev`.
|
||||||
|
|
||||||
|
For llama running **on the host**, set in `.env`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
LLAMA_BASE_URL="http://host.docker.internal:9292"
|
||||||
|
```
|
||||||
|
|
||||||
|
`PAPERLESS_BASE_URL` can stay a normal `https://…` URL if the container has network access to it.
|
||||||
|
|
||||||
|
CI: on push to `main`, [.github/workflows/build-docker.yml](.github/workflows/build-docker.yml) builds and pushes using the same secrets pattern as your other Gitea repos (`DOCKER_REGISTRY`, `DOCKER_USERNAME`, `DOCKER_PASSWORD`). For Docker Hub, set `DOCKER_REGISTRY` to `docker.io`; otherwise set it to your registry's hostname (check your runner's documentation for whether a default registry applies).
|
||||||
|
|
||||||
|
## Example `.env`
|
||||||
|
|
||||||
|
```bash
|
||||||
|
PAPERLESS_BASE_URL="https://paperless.example.com"
|
||||||
|
PAPERLESS_TOKEN="paste-token-here"
|
||||||
|
|
||||||
|
LLAMA_BASE_URL="http://127.0.0.1:9292"
|
||||||
|
LLAMA_MODEL="ggml-model-q4_k_m"
|
||||||
|
|
||||||
|
# Custom field ids in Paperless
|
||||||
|
PAPERLESS_CUSTOM_FIELD_NOTEBOOK_ID=1
|
||||||
|
PAPERLESS_CUSTOM_FIELD_NOTEBOOK_PAGE=2
|
||||||
|
PAPERLESS_DOCUMENT_TYPE_ID=3
|
||||||
|
|
||||||
|
# Optional: cap concurrent Paperless uploads (0 = unlimited)
|
||||||
|
PAPERLESS_UPLOAD_CONCURRENCY=4
|
||||||
|
|
||||||
|
# Rendering / OCR knobs
|
||||||
|
RENDER_DPI=200
|
||||||
|
OCR_MAX_TOKENS=1024
|
||||||
|
OCR_TEMPERATURE=0.0
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
41
pyproject.toml
Normal file
41
pyproject.toml
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
[project]
|
||||||
|
name = "notebook-tools"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "FastAPI service to OCR Paperless-ngx PDFs via llama.cpp"
|
||||||
|
readme = "README.md"
|
||||||
|
requires-python = ">=3.11"
|
||||||
|
dependencies = [
|
||||||
|
"fastapi>=0.115",
|
||||||
|
"uvicorn[standard]>=0.30",
|
||||||
|
"httpx>=0.27",
|
||||||
|
"pydantic>=2.7",
|
||||||
|
"pydantic-settings>=2.3",
|
||||||
|
"pymupdf>=1.24",
|
||||||
|
"pillow>=10.4",
|
||||||
|
"img2pdf>=0.5",
|
||||||
|
"tenacity>=8.3",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.optional-dependencies]
|
||||||
|
test = [
|
||||||
|
"pytest>=8.2",
|
||||||
|
"pytest-asyncio>=0.24",
|
||||||
|
"respx>=0.21",
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.pytest.ini_options]
|
||||||
|
asyncio_mode = "auto"
|
||||||
|
testpaths = ["tests"]
|
||||||
|
|
||||||
|
[tool.ruff]
|
||||||
|
line-length = 100
|
||||||
|
|
||||||
|
[tool.ruff.lint]
|
||||||
|
select = ["E", "F", "I", "UP", "B"]
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["hatchling"]
|
||||||
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
|
[tool.hatch.build.targets.wheel]
|
||||||
|
packages = ["src/notebook_tools"]
|
||||||
6
src/notebook_tools/__init__.py
Normal file
6
src/notebook_tools/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""notebook_tools package.
|
||||||
|
|
||||||
|
This repository is intentionally written to be easy to read/modify if you're new to Python.
|
||||||
|
Most modules include short docstrings and type hints, and we keep functions small.
|
||||||
|
"""
|
||||||
|
|
||||||
74
src/notebook_tools/api.py
Normal file
74
src/notebook_tools/api.py
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
"""FastAPI application entrypoint.
|
||||||
|
|
||||||
|
This file is intentionally small:
|
||||||
|
- Routes call into a job manager.
|
||||||
|
- The job manager calls the pipeline.
|
||||||
|
|
||||||
|
Keeping the web layer thin makes the business logic easier to test and maintain.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException
|
||||||
|
|
||||||
|
from notebook_tools.jobs import JobManager, get_job_manager
|
||||||
|
from notebook_tools.logging_utils import configure_logging
|
||||||
|
from notebook_tools.models import JobStartRequest, JobStatusResponse
|
||||||
|
from notebook_tools.settings import Settings, get_settings
|
||||||
|
|
||||||
|
# Single ASGI application instance; uvicorn loads this as `notebook_tools.api:app`.
app = FastAPI(title="notebook-tools", version="0.1.0")
logger = logging.getLogger("notebook_tools.api")
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE(review): `@app.on_event` is deprecated in recent FastAPI releases in
# favor of lifespan handlers; migrating would also touch the FastAPI(...) call
# above, so it is left as-is here.
@app.on_event("startup")
async def _startup() -> None:
    """Initialize settings and logging when the server process starts."""
    # Load settings once at startup so we fail fast if env vars are missing.
    settings = get_settings()
    configure_logging(level=settings.log_level)
    logger.info("Service starting up")
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe: reports OK whenever the process is serving requests."""
    payload = {"status": "ok"}
    return payload
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/jobs/paperless/{document_id}", response_model=JobStatusResponse)
async def start_job_for_paperless_document(
    document_id: int,
    req: JobStartRequest,
    background: BackgroundTasks,
    settings: Settings = Depends(get_settings),
    manager: JobManager = Depends(get_job_manager),
) -> JobStatusResponse:
    """Start an OCR job for an existing Paperless document id."""

    # Guard clause: Paperless ids are positive integers, reject anything else early.
    if document_id <= 0:
        raise HTTPException(status_code=422, detail="document_id must be a positive integer")

    status = manager.create_job(document_id=document_id, notebook_id=req.notebook_id)
    logger.info(
        "Job created job_id=%s paperless_document_id=%s notebook_id=%s",
        status.job_id,
        document_id,
        req.notebook_id,
    )
    # The pipeline runs after the response is sent; clients poll GET /jobs/{job_id}.
    background.add_task(
        manager.run_job,
        job_id=status.job_id,
        settings=settings,
        ocr_prompt_override=req.ocr_prompt,
        title_prefix=req.title_prefix,
    )
    return status
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/jobs/{job_id}", response_model=JobStatusResponse)
async def get_job(job_id: str, manager: JobManager = Depends(get_job_manager)) -> JobStatusResponse:
    """Return the current status for a job; 404 when the id is unknown."""
    found = manager.get_job(job_id)
    if found is None:
        raise HTTPException(status_code=404, detail="job not found")
    return found
|
||||||
|
|
||||||
122
src/notebook_tools/jobs.py
Normal file
122
src/notebook_tools/jobs.py
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
"""In-memory job tracking.
|
||||||
|
|
||||||
|
We store job status in memory because it's the simplest way to get started.
|
||||||
|
Trade-off:
|
||||||
|
- If the server restarts, in-progress jobs are lost.
|
||||||
|
|
||||||
|
If you later want durability, we can swap this for SQLite without changing the API shape.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import traceback
|
||||||
|
import uuid
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
from notebook_tools.models import JobStatusResponse
|
||||||
|
from notebook_tools.pipeline import run_pipeline_for_paperless_document
|
||||||
|
from notebook_tools.settings import Settings
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class _JobRecord:
    """Internal bookkeeping for one job; never exposed through the API directly."""

    document_id: int  # source Paperless document id to OCR
    notebook_id: str  # caller-supplied logical notebook identifier
    status: JobStatusResponse  # live status object; mutated in place as the job runs
    # Serializes mutations of `status`; one fresh lock per record.
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
|
||||||
|
|
||||||
|
|
||||||
|
class JobManager:
    """Create, query, and execute OCR jobs held entirely in process memory.

    Status objects are mutated in place under a per-job asyncio.Lock so the
    API can read a consistent snapshot while the pipeline reports progress.
    Jobs are lost on restart (documented trade-off in the module docstring).
    """

    def __init__(self) -> None:
        # job_id (uuid hex) -> record
        self._jobs: dict[str, _JobRecord] = {}

    def create_job(self, *, document_id: int, notebook_id: str) -> JobStatusResponse:
        """Register a new queued job and return its (live) status object."""
        job_id = uuid.uuid4().hex
        status = JobStatusResponse(job_id=job_id, state="queued", completed_pages=0)
        self._jobs[job_id] = _JobRecord(
            document_id=document_id,
            notebook_id=notebook_id,
            status=status,
        )
        return status

    def get_job(self, job_id: str) -> JobStatusResponse | None:
        """Return the status for *job_id*, or None when the id is unknown."""
        rec = self._jobs.get(job_id)
        return rec.status if rec else None

    async def _set_status(self, job_id: str, updater) -> None:
        # `updater` is a callable taking the JobStatusResponse to mutate in place.
        # All status mutations funnel through here so they happen under the job's lock.
        rec = self._jobs[job_id]
        async with rec.lock:
            updater(rec.status)

    async def _mark_running(self, job_id: str) -> None:
        """Transition queued -> running and clear any stale message."""
        def _update(s: JobStatusResponse) -> None:
            s.state = "running"
            s.message = None

        await self._set_status(job_id, _update)

    async def _update_progress(self, job_id: str, *, completed: int, total: int) -> None:
        """Record page-level progress reported by the pipeline."""
        def _update(s: JobStatusResponse) -> None:
            s.completed_pages = completed
            s.total_pages = total

        await self._set_status(job_id, _update)

    async def _mark_succeeded(self, job_id: str, *, created_document_ids: list[int]) -> None:
        """Finish the job successfully, recording the ids of uploaded documents."""
        def _update(s: JobStatusResponse) -> None:
            s.state = "succeeded"
            s.created_document_ids = created_document_ids
            s.message = "done"

        await self._set_status(job_id, _update)

    async def _mark_failed(self, job_id: str, *, error_message: str, traceback_text: str) -> None:
        """Finish the job as failed, keeping the traceback in `debug` for inspection."""
        def _update(s: JobStatusResponse) -> None:
            s.state = "failed"
            s.message = error_message
            s.errors.append(error_message)
            s.debug["traceback"] = traceback_text

        await self._set_status(job_id, _update)

    async def run_job(
        self,
        *,
        job_id: str,
        settings: Settings,
        ocr_prompt_override: str | None,
        title_prefix: str | None,
    ) -> None:
        """Execute the pipeline for an already-created job.

        Designed to run as a FastAPI background task: it never raises;
        failures are recorded on the job status instead.
        """
        rec = self._jobs[job_id]

        await self._mark_running(job_id)

        try:
            result = await run_pipeline_for_paperless_document(
                settings=settings,
                paperless_document_id=rec.document_id,
                notebook_id=rec.notebook_id,
                job_id=job_id,
                # The lambda returns a coroutine; presumably the pipeline awaits
                # it — TODO(review): confirm against on_progress usage in pipeline.
                on_progress=lambda completed, total: self._update_progress(
                    job_id, completed=completed, total=total
                ),
                ocr_prompt_override=ocr_prompt_override,
                title_prefix=title_prefix,
            )

            await self._mark_succeeded(job_id, created_document_ids=result["created_document_ids"])
        except Exception as e:  # noqa: BLE001 - we want to catch and report job failures
            tb = traceback.format_exc()
            await self._mark_failed(job_id, error_message=str(e), traceback_text=tb)
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level singleton: one shared manager per worker process.
_manager = JobManager()


def get_job_manager() -> JobManager:
    """FastAPI dependency for a singleton in-memory manager."""

    return _manager
|
||||||
|
|
||||||
101
src/notebook_tools/llama_client.py
Normal file
101
src/notebook_tools/llama_client.py
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
"""llama.cpp OCR client (OpenAI-compatible chat/completions).
|
||||||
|
|
||||||
|
Your working prototype uses:
|
||||||
|
POST /v1/chat/completions
|
||||||
|
with:
|
||||||
|
- a strict OCR instruction (text)
|
||||||
|
- an inline base64 image_url: data:image/jpeg;base64,...
|
||||||
|
|
||||||
|
We wrap that here so the rest of the app can simply call `ocr_jpeg(...)`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential_jitter
|
||||||
|
|
||||||
|
logger = logging.getLogger("notebook_tools.llama")
|
||||||
|
|
||||||
|
class LlamaError(RuntimeError):
    """Raised for non-2xx responses or unexpected payloads from llama."""


# Strict transcription prompt: forbids commentary so the model's reply can be
# stored verbatim as the page's OCR text. Overridable per job via the API.
DEFAULT_OCR_PROMPT = (
    "You are a highly accurate OCR system. Extract all handwritten text from this image exactly as it appears. "
    "Preserve line breaks. Do not add any commentary. Output ONLY the transcribed text."
)
|
||||||
|
|
||||||
|
|
||||||
|
def _raise_for_status(resp: httpx.Response) -> None:
    """Raise LlamaError unless *resp* carries a 2xx status code."""
    if not (200 <= resp.status_code < 300):
        raise LlamaError(f"llama API {resp.status_code}: {resp.text}")
|
||||||
|
|
||||||
|
|
||||||
|
class LlamaClient:
    """Minimal OpenAI-compatible chat client for a llama.cpp server.

    Only one operation is exposed: OCR of a single JPEG via an inline
    base64 data URL sent to POST /v1/chat/completions.
    """

    def __init__(
        self,
        *,
        base_url: str,
        model: str,
        timeout_s: float = 120.0,
        temperature: float = 0.0,
        max_tokens: int = 1024,
    ) -> None:
        # Strip trailing slash so _url() concatenation can't double it.
        self._base_url = base_url.rstrip("/")
        self._model = model
        self._timeout = httpx.Timeout(timeout_s)
        self._temperature = temperature
        self._max_tokens = max_tokens

    def _url(self, path: str) -> str:
        # All endpoints are rooted at the configured base URL.
        return f"{self._base_url}{path}"

    # Retry only transient transport failures; HTTP-level errors (LlamaError)
    # are not retried because a 4xx/5xx is unlikely to heal by itself.
    @retry(
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)),
        wait=wait_exponential_jitter(initial=0.5, max=8.0),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    async def ocr_jpeg(self, *, jpeg_bytes: bytes, prompt: str | None = None) -> str:
        """Return extracted text for a single JPEG image.

        Raises LlamaError on non-2xx responses or an unexpected payload shape.
        """

        prompt_text = prompt or DEFAULT_OCR_PROMPT
        logger.info("OCR request image_bytes=%s max_tokens=%s temperature=%s", len(jpeg_bytes), self._max_tokens, self._temperature)

        # Image travels inline as a data URL — no separate upload step needed.
        b64 = base64.b64encode(jpeg_bytes).decode("utf-8")
        payload: dict[str, Any] = {
            "model": self._model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt_text},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
                    ],
                }
            ],
            "temperature": self._temperature,
            "max_tokens": self._max_tokens,
        }

        async with httpx.AsyncClient(timeout=self._timeout) as client:
            resp = await client.post(self._url("/v1/chat/completions"), json=payload)
            _raise_for_status(resp)
            data = resp.json()

        try:
            text = data["choices"][0]["message"]["content"]
        except Exception as e:  # noqa: BLE001
            raise LlamaError(f"Unexpected llama response shape: {data}") from e

        if not isinstance(text, str):
            raise LlamaError(f"Expected OCR text to be a string, got: {type(text)}")

        # We intentionally do NOT log the OCR text itself by default because it can include sensitive content.
        return text.strip()
|
||||||
|
|
||||||
41
src/notebook_tools/logging_utils.py
Normal file
41
src/notebook_tools/logging_utils.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
"""Logging setup for the service.
|
||||||
|
|
||||||
|
We use the standard library `logging` module so it works everywhere:
|
||||||
|
- local dev
|
||||||
|
- Docker
|
||||||
|
- systemd
|
||||||
|
- Kubernetes
|
||||||
|
|
||||||
|
Uvicorn has its own logging, but application logs should still be configured so
|
||||||
|
our modules emit useful messages too.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def configure_logging(*, level: str) -> None:
|
||||||
|
"""Configure root logging once.
|
||||||
|
|
||||||
|
We keep the format readable in terminals but also structured enough for log collectors.
|
||||||
|
"""
|
||||||
|
|
||||||
|
level_norm = (level or "INFO").upper()
|
||||||
|
# If logging was already configured (common with uvicorn), basicConfig will be a no-op.
|
||||||
|
logging.basicConfig(
|
||||||
|
level=getattr(logging, level_norm, logging.INFO),
|
||||||
|
format="%(asctime)s %(levelname)s %(name)s - %(message)s",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Keep noisy libraries at INFO unless you explicitly crank LOG_LEVEL to DEBUG.
|
||||||
|
if level_norm != "DEBUG":
|
||||||
|
logging.getLogger("httpx").setLevel(logging.WARNING)
|
||||||
|
logging.getLogger("httpcore").setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
# Helpful banner once.
|
||||||
|
logging.getLogger("notebook_tools").info(
|
||||||
|
"Logging configured (level=%s, pid=%s)", level_norm, os.getpid()
|
||||||
|
)
|
||||||
|
|
||||||
46
src/notebook_tools/models.py
Normal file
46
src/notebook_tools/models.py
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
"""Pydantic models for request/response payloads.
|
||||||
|
|
||||||
|
Why bother with models?
|
||||||
|
- They validate incoming JSON (so we can give clear errors early).
|
||||||
|
- They produce automatic OpenAPI docs in FastAPI.
|
||||||
|
- They double as "living documentation" for how to call the API.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, Literal
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class JobStartRequest(BaseModel):
    """Parameters for starting an OCR job."""

    # Written into Paperless custom fields on every uploaded per-page document.
    notebook_id: str = Field(..., description="Your logical notebook identifier to store in custom fields.")
    # If you want to tweak OCR prompt later, we allow overriding it per job.
    ocr_prompt: str | None = Field(
        None,
        description="Optional override for the OCR prompt sent to llama.cpp.",
    )
    # Optional: metadata you might want to set on each uploaded per-page document.
    title_prefix: str | None = Field(
        None,
        description="Optional prefix for per-page document titles (e.g. 'Notebook 123').",
    )
|
||||||
|
|
||||||
|
|
||||||
|
# Lifecycle: queued -> running -> (succeeded | failed).
JobState = Literal["queued", "running", "failed", "succeeded"]


class JobStatusResponse(BaseModel):
    """Snapshot of a job's state, progress, and output as returned by the API."""

    job_id: str
    state: JobState
    message: str | None = None  # human-readable status note ("done", error text, ...)
    # Progress
    total_pages: int | None = None  # unknown until the PDF has been inspected
    completed_pages: int = 0
    # Output
    created_document_ids: list[int] = Field(default_factory=list)
    errors: list[str] = Field(default_factory=list)
    debug: dict[str, Any] = Field(default_factory=dict)  # e.g. "traceback" on failure
|
||||||
|
|
||||||
278
src/notebook_tools/paperless_client.py
Normal file
278
src/notebook_tools/paperless_client.py
Normal file
@@ -0,0 +1,278 @@
|
|||||||
|
"""Paperless-ngx REST API client.
|
||||||
|
|
||||||
|
We keep this client *thin*:
|
||||||
|
- It knows how to talk to Paperless (URLs, auth header, endpoints).
|
||||||
|
- It does NOT know about OCR or PDFs beyond "upload bytes".
|
||||||
|
|
||||||
|
This separation makes it easier to test and to swap out behavior later.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential_jitter
|
||||||
|
|
||||||
|
logger = logging.getLogger("notebook_tools.paperless")
|
||||||
|
|
||||||
|
class PaperlessError(RuntimeError):
|
||||||
|
"""Raised for non-2xx responses from Paperless."""
|
||||||
|
|
||||||
|
|
||||||
|
def _auth_headers(token: str) -> dict[str, str]:
|
||||||
|
# Paperless uses token auth in the form: Authorization: Token <token>
|
||||||
|
return {"Authorization": f"Token {token}"}
|
||||||
|
|
||||||
|
|
||||||
|
def _raise_for_status(resp: httpx.Response) -> None:
    """Raise PaperlessError for any non-2xx response.

    Unlike ``resp.raise_for_status()``, the error message includes the response
    body (often JSON), because Paperless will usually tell you exactly what's
    wrong.
    """

    if not (200 <= resp.status_code < 300):
        body = resp.text
        raise PaperlessError(f"Paperless API {resp.status_code}: {body}")
|
||||||
|
|
||||||
|
|
||||||
|
def _document_id_from_task_payload(item: dict[str, Any]) -> int | None:
|
||||||
|
"""Extract created document id from a Paperless task object.
|
||||||
|
|
||||||
|
Paperless 2.x often returns:
|
||||||
|
- ``related_document`` as a string ``\"10\"`` (not an int)
|
||||||
|
- ``result`` as a string like ``\"Success. New document id 10 created\"`` (not a dict)
|
||||||
|
|
||||||
|
We must handle both, or polling never completes.
|
||||||
|
"""
|
||||||
|
|
||||||
|
rd = item.get("related_document")
|
||||||
|
if rd is not None:
|
||||||
|
if isinstance(rd, int):
|
||||||
|
return rd
|
||||||
|
if isinstance(rd, str) and rd.strip().isdigit():
|
||||||
|
return int(rd.strip())
|
||||||
|
|
||||||
|
for key in ("document_id", "document"):
|
||||||
|
val = item.get(key)
|
||||||
|
if isinstance(val, int):
|
||||||
|
return val
|
||||||
|
if isinstance(val, str) and val.strip().isdigit():
|
||||||
|
return int(val.strip())
|
||||||
|
|
||||||
|
result = item.get("result")
|
||||||
|
if isinstance(result, dict):
|
||||||
|
nested = result.get("document_id") or result.get("document")
|
||||||
|
if isinstance(nested, int):
|
||||||
|
return nested
|
||||||
|
if isinstance(nested, str) and nested.strip().isdigit():
|
||||||
|
return int(nested.strip())
|
||||||
|
elif isinstance(result, str):
|
||||||
|
# e.g. "Success. New document id 10 created"
|
||||||
|
m = re.search(r"New document id\s+(\d+)", result, flags=re.IGNORECASE)
|
||||||
|
if m:
|
||||||
|
return int(m.group(1))
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class PaperlessClient:
    def __init__(
        self,
        *,
        base_url: str,
        token: str,
        timeout_s: float = 60.0,
        task_timeout_s: int = 600,
        task_poll_interval_s: float = 5.0,
    ) -> None:
        """Configure the client.

        Args:
            base_url: Paperless root URL; trailing slash is stripped.
            token: API token, sent as ``Authorization: Token <token>``.
            timeout_s: Per-request HTTP timeout.
            task_timeout_s: Total time budget when polling an ingestion task.
            task_poll_interval_s: Delay between task polls.
        """
        self._base_url = base_url.rstrip("/")
        self._token = token
        self._timeout = httpx.Timeout(timeout_s)
        self._task_timeout_s = task_timeout_s
        self._task_poll_interval_s = task_poll_interval_s
|
||||||
|
|
||||||
|
def _url(self, path: str) -> str:
|
||||||
|
return f"{self._base_url}{path}"
|
||||||
|
|
||||||
|
    # Retry only transient transport failures; HTTP-level errors raise
    # PaperlessError immediately and are not retried.
    @retry(
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)),
        wait=wait_exponential_jitter(initial=0.5, max=5.0),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    async def download_document_pdf(self, *, document_id: int) -> bytes:
        """Download the original PDF bytes for a Paperless document."""

        async with httpx.AsyncClient(timeout=self._timeout, headers=_auth_headers(self._token)) as client:
            # Common Paperless endpoint:
            # GET /api/documents/{id}/download/
            logger.info("Downloading document_id=%s", document_id)
            resp = await client.get(self._url(f"/api/documents/{document_id}/download/"))
            _raise_for_status(resp)
            return resp.content
|
||||||
|
|
||||||
|
    async def upload_pdf(self, *, filename: str, pdf_bytes: bytes) -> int:
        """Upload a new document (PDF) and return its new document id.

        When Paperless answers with a task id instead of a document id, this
        waits (via polling) until ingestion finishes, so callers always get a
        concrete document id back.

        Raises PaperlessError for HTTP errors or an unrecognized response shape.
        """

        async with httpx.AsyncClient(timeout=self._timeout, headers=_auth_headers(self._token)) as client:
            files = {
                "document": (filename, pdf_bytes, "application/pdf"),
            }
            # POST /api/documents/post_document/ returns JSON about the created document/task.
            logger.info("Uploading PDF filename=%s bytes=%s", filename, len(pdf_bytes))
            resp = await client.post(self._url("/api/documents/post_document/"), files=files)
            _raise_for_status(resp)

            data: Any = resp.json()

            # Paperless has had a few response shapes across versions.
            # We defensively handle the most common ones.
            if isinstance(data, dict):
                # Direct document id in the payload.
                if "document" in data and isinstance(data["document"], int):
                    return int(data["document"])
                if "id" in data and isinstance(data["id"], int):
                    return int(data["id"])
                # Some versions return {"task_id": "<uuid>"}.
                if "task_id" in data and isinstance(data["task_id"], str):
                    logger.info("Upload returned task_id=%s", data["task_id"])
                    return await self._wait_for_task_document_id(client=client, task_id=data["task_id"])

            # Other versions return the task id directly as a JSON string: "<uuid>"
            if isinstance(data, str):
                logger.info("Upload returned task_id=%s", data)
                return await self._wait_for_task_document_id(client=client, task_id=data)

            # Truncate the payload so a pathological response can't flood the log.
            raise PaperlessError(f"Unexpected upload response: {json.dumps(data)[:500]}")
|
||||||
|
|
||||||
|
async def _wait_for_task_document_id(self, *, client: httpx.AsyncClient, task_id: str) -> int:
    """Poll Paperless' tasks endpoint until it yields a created document id.

    Why polling is needed:
    - `post_document` triggers async consumption in Paperless.
    - Many Paperless versions return a Celery task id (UUID) instead of a document id.

    This method makes the rest of the pipeline "feel" synchronous: upload_pdf()
    still returns a document id, it just waits for Paperless to finish processing.

    Args:
        client: An already-open (authenticated) httpx client to reuse.
        task_id: The Celery task UUID returned by post_document.

    Returns:
        The document id produced by the completed consume task.

    Raises:
        PaperlessError: if the time budget (task_timeout_s) is exhausted
            before a document id appears.
    """

    # This endpoint is documented/mentioned in Paperless discussions and commits:
    # /api/tasks/?task_id=<uuid>
    # Polling uses a FIXED interval (task_poll_interval_s), not a backoff.
    last_payload: Any = None
    # We poll until a time budget is exceeded, because Paperless ingestion time varies a lot.
    # max(interval, 0.1) guards against a zero/negative interval causing ZeroDivisionError.
    max_attempts = max(1, int(self._task_timeout_s / max(self._task_poll_interval_s, 0.1)))

    for attempt in range(max_attempts):
        # INFO: every 5th poll + first. DEBUG: every poll (no duplicate INFO line).
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "Polling task_id=%s attempt=%s/%s (interval=%.1fs timeout=%ss)",
                task_id,
                attempt + 1,
                max_attempts,
                self._task_poll_interval_s,
                self._task_timeout_s,
            )
        elif attempt == 0 or (attempt + 1) % 5 == 0:
            logger.info(
                "Polling task_id=%s attempt=%s/%s (interval=%.1fs timeout=%ss)",
                task_id,
                attempt + 1,
                max_attempts,
                self._task_poll_interval_s,
                self._task_timeout_s,
            )
        resp = await client.get(self._url("/api/tasks/"), params={"task_id": task_id})
        _raise_for_status(resp)
        last_payload = resp.json()

        # We expect a list (paginated or not). Handle both.
        items: list[dict[str, Any]] = []
        if isinstance(last_payload, dict) and "results" in last_payload and isinstance(last_payload["results"], list):
            items = [x for x in last_payload["results"] if isinstance(x, dict)]
        elif isinstance(last_payload, list):
            items = [x for x in last_payload if isinstance(x, dict)]

        # Find a matching task and extract document id.
        for item in items:
            # Match by Celery UUID (not the numeric DB id).
            if item.get("task_id") != task_id:
                continue

            doc_id = _document_id_from_task_payload(item)
            if doc_id is not None:
                logger.info("Task task_id=%s produced document_id=%s", task_id, doc_id)
                return doc_id

            # If we have a numeric task pk, fetch detail — list view can lag; trailing slash required.
            numeric_task_pk = item.get("id")
            if isinstance(numeric_task_pk, int):
                detail_resp = await client.get(self._url(f"/api/tasks/{numeric_task_pk}/"))
                _raise_for_status(detail_resp)
                detail = detail_resp.json()
                if isinstance(detail, dict):
                    doc_id = _document_id_from_task_payload(detail)
                    if doc_id is not None:
                        logger.info("Task task_id=%s (detail) produced document_id=%s", task_id, doc_id)
                        return doc_id

        # Not ready yet; sleep a fixed interval (configurable).
        # NOTE(review): this also sleeps after the final attempt, adding one
        # extra interval before the timeout error below.
        await asyncio.sleep(self._task_poll_interval_s)

    raise PaperlessError(
        f"Paperless upload task did not yield document id in time. task_id={task_id} last={json.dumps(last_payload)[:500]}"
    )
|
||||||
|
|
||||||
|
@retry(
    retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)),
    wait=wait_exponential_jitter(initial=0.5, max=5.0),
    stop=stop_after_attempt(3),
    reraise=True,
)
async def patch_document(
    self,
    *,
    document_id: int,
    title: str | None = None,
    content: str | None = None,
    custom_fields: list[dict[str, Any]] | None = None,
    document_type: int | None = None,
) -> None:
    """Update metadata on an existing Paperless document via PATCH.

    Only arguments that are not None are included in the request body, so
    omitted fields are left untouched on the server. In this pipeline we set:
    - title: a human-friendly per-page title
    - content: OCR text (so Paperless full-text search works)
    - custom_fields: notebook_id + notebook_page
    - document_type: Paperless document type id (primary key)

    Transient network errors/timeouts are retried up to 3 times with jittered
    exponential backoff (see the @retry decorator).
    """

    # Collect the optional fields, then keep only the ones that were provided.
    candidates: dict[str, Any] = {
        "title": title,
        "content": content,
        "custom_fields": custom_fields,
        "document_type": document_type,
    }
    payload: dict[str, Any] = {name: value for name, value in candidates.items() if value is not None}

    async with httpx.AsyncClient(timeout=self._timeout, headers=_auth_headers(self._token)) as client:
        logger.info(
            "Patching document_id=%s set_title=%s set_content=%s set_custom_fields=%s set_document_type=%s",
            document_id,
            title is not None,
            content is not None,
            custom_fields is not None,
            document_type is not None,
        )
        resp = await client.patch(self._url(f"/api/documents/{document_id}/"), json=payload)
        _raise_for_status(resp)
|
||||||
|
|
||||||
50
src/notebook_tools/pdf_utils.py
Normal file
50
src/notebook_tools/pdf_utils.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
"""PDF/image helpers for the OCR pipeline."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import io
|
||||||
|
|
||||||
|
import fitz # PyMuPDF
|
||||||
|
import img2pdf
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
|
||||||
|
def render_pdf_to_jpegs(*, pdf_bytes: bytes, dpi: int) -> list[bytes]:
    """Render every page of a PDF into a JPEG image (one JPEG per page).

    Rationale for rasterizing:
    - llama vision models consume images, not PDFs.
    - Rendering normalizes vector text, scans, rotations, etc. into pixels.

    DPI trade-off:
    - Higher DPI makes handwriting more legible but costs latency and bytes.
    - 200 DPI is a sensible default for notebook pages.
    """

    document = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        # PDF coordinates are in points (72 per inch); scale to the requested DPI.
        scale = dpi / 72.0
        matrix = fitz.Matrix(scale, scale)

        rendered: list[bytes] = []
        for page in document:
            pixmap = page.get_pixmap(matrix=matrix, alpha=False)
            # Wrap the raw pixel buffer in a PIL image so we can re-encode it as JPEG.
            image = Image.frombytes("RGB", [pixmap.width, pixmap.height], pixmap.samples)

            encoded = io.BytesIO()
            # quality=90 keeps text crisp without huge files; optimize shaves a bit more.
            image.save(encoded, format="JPEG", quality=90, optimize=True)
            rendered.append(encoded.getvalue())

        return rendered
    finally:
        document.close()
|
||||||
|
|
||||||
|
|
||||||
|
def jpeg_to_pdf_bytes(*, jpeg_bytes: bytes) -> bytes:
    """Wrap one JPEG image into a single-page PDF and return the PDF bytes."""

    # img2pdf embeds the JPEG losslessly (no re-encode) and returns PDF bytes.
    pdf_data: bytes = img2pdf.convert(jpeg_bytes)
    return pdf_data
|
||||||
|
|
||||||
215
src/notebook_tools/pipeline.py
Normal file
215
src/notebook_tools/pipeline.py
Normal file
@@ -0,0 +1,215 @@
|
|||||||
|
"""OCR pipeline: Paperless PDF -> per-page OCR -> per-page PDFs -> Paperless uploads.
|
||||||
|
|
||||||
|
This module is where the "business logic" lives.
|
||||||
|
|
||||||
|
Design goals:
|
||||||
|
- Keep the pipeline readable and linear.
|
||||||
|
- Return enough information (created ids) for the job API.
|
||||||
|
- Avoid hidden side-effects (everything is passed in / returned).
|
||||||
|
|
||||||
|
Upload strategy:
|
||||||
|
- All per-page PDFs are uploaded to Paperless concurrently (each upload still polls until a doc id exists).
|
||||||
|
- OCR (llama) runs one page at a time to respect VRAM.
|
||||||
|
- Each page is PATCHed once that page's upload has finished and OCR for that page is done.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import io
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
|
||||||
|
from notebook_tools import pdf_utils
|
||||||
|
from notebook_tools.llama_client import LlamaClient
|
||||||
|
from notebook_tools.paperless_client import PaperlessClient
|
||||||
|
from notebook_tools.settings import Settings
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
logger = logging.getLogger("notebook_tools.pipeline")
|
||||||
|
|
||||||
|
# Prompt for the page-number OCR pass over the cropped bottom corners.
# The model is instructed to answer with a bare integer so _parse_page_number()
# can extract it; -1 is the sentinel for "could not determine".
PAGE_NUMBER_PROMPT = (
    "You are reading a handwritten page number in the bottom corner of a notebook page. "
    "Return ONLY the page number as an integer. If you cannot determine it, return -1. "
    "Do not output any other words."
)
|
||||||
|
|
||||||
|
|
||||||
|
def _crop_bottom_corner_jpegs(*, full_page_jpeg: bytes) -> list[bytes]:
    """Crop the bottom-left and bottom-right corners of a page JPEG.

    Cropping serves two purposes:
    - Less visual clutter, so the model can focus on the handwritten page number.
    - Smaller payloads, so the OCR round-trip is faster.

    Percentages (not absolute pixels) are used so the crops adapt to any page size.

    Returns:
        Two JPEG byte strings: [bottom-left crop, bottom-right crop].
    """

    page = Image.open(io.BytesIO(full_page_jpeg)).convert("RGB")
    width, height = page.size

    # Bottom band: last ~22% of the page height.
    band_top = max(0, height - int(height * 0.22))
    # Corner width: ~35% of the page width on each side.
    corner_width = int(width * 0.35)

    corner_boxes = (
        (0, band_top, corner_width, height),  # bottom-left
        (width - corner_width, band_top, width, height),  # bottom-right
    )

    encoded: list[bytes] = []
    for box in corner_boxes:
        buffer = io.BytesIO()
        page.crop(box).save(buffer, format="JPEG", quality=90, optimize=True)
        encoded.append(buffer.getvalue())
    return encoded
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_page_number(text: str) -> int | None:
|
||||||
|
"""Try to parse an integer page number from a model response.
|
||||||
|
|
||||||
|
We accept:
|
||||||
|
- '12'
|
||||||
|
- 'Page 12' (if the model disobeys slightly)
|
||||||
|
- '-1'
|
||||||
|
"""
|
||||||
|
|
||||||
|
m = re.search(r"-?\d+", text)
|
||||||
|
if not m:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return int(m.group(0))
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def run_pipeline_for_paperless_document(
    *,
    settings: Settings,
    paperless_document_id: int,
    notebook_id: str,
    job_id: str,
    on_progress: Callable[[int, int], Awaitable[None]] | None,
    ocr_prompt_override: str | None,
    title_prefix: str | None,
) -> dict[str, list[int]]:
    """Run the full OCR pipeline for one Paperless document id.

    Steps: download source PDF -> render pages to JPEG -> upload per-page PDFs
    concurrently -> OCR each page sequentially -> PATCH metadata per page.

    Args:
        settings: Runtime configuration (endpoints, ids, knobs).
        paperless_document_id: Source multi-page document to split.
        notebook_id: Stored in the notebook_id custom field and in each title.
        job_id: Correlation id used in filenames and log lines.
        on_progress: Optional async callback invoked as (done_pages, total_pages).
        ocr_prompt_override: Optional prompt for the full-page OCR pass.
        title_prefix: NOTE(review): accepted but currently unused — titles are
            always built as f"Notebook {notebook_id} Page {page_number}".

    Returns:
        {"created_document_ids": [...]} where each id is a NEW Paperless document
        (one per page).
    """

    paperless = PaperlessClient(
        base_url=str(settings.paperless_base_url),
        token=settings.paperless_token,
        task_timeout_s=settings.paperless_task_timeout_s,
        task_poll_interval_s=settings.paperless_task_poll_interval_s,
    )
    llama = LlamaClient(
        base_url=str(settings.llama_base_url),
        model=settings.llama_model,
        temperature=settings.ocr_temperature,
        max_tokens=settings.ocr_max_tokens,
    )

    # 1) Download the source PDF.
    logger.info("job_id=%s downloading paperless_document_id=%s", job_id, paperless_document_id)
    pdf_bytes = await paperless.download_document_pdf(document_id=paperless_document_id)
    logger.info("job_id=%s downloaded_pdf_bytes=%s", job_id, len(pdf_bytes))

    # 2) Render the PDF pages as JPEG images.
    logger.info("job_id=%s rendering_pages dpi=%s", job_id, settings.render_dpi)
    jpegs = pdf_utils.render_pdf_to_jpegs(pdf_bytes=pdf_bytes, dpi=settings.render_dpi)
    total_pages = len(jpegs)
    logger.info("job_id=%s rendered_pages=%s", job_id, total_pages)
    if on_progress:
        # Report 0/N before any work so callers can show a progress bar early.
        await on_progress(0, total_pages)

    # One small PDF per page (used by upload tasks).
    page_pdfs = [pdf_utils.jpeg_to_pdf_bytes(jpeg_bytes=b) for b in jpegs]

    # 3) Start all Paperless uploads in parallel (each task waits for ingest + document id).
    conc = settings.paperless_upload_concurrency
    # A semaphore only exists when a positive concurrency limit is configured;
    # 0 (the default) means unlimited concurrent uploads.
    upload_sem: asyncio.Semaphore | None = (
        asyncio.Semaphore(conc) if conc and conc > 0 else None
    )
    logger.info(
        "job_id=%s starting_parallel_uploads pages=%s concurrency=%s",
        job_id,
        total_pages,
        conc if conc and conc > 0 else "unlimited",
    )

    async def _upload_one_page(idx_1based: int) -> int:
        # Upload one page's PDF; returns the new Paperless document id.
        filename = f"job_{job_id}_page_{idx_1based}.pdf"
        pdf_bytes = page_pdfs[idx_1based - 1]
        logger.info("job_id=%s page=%s/%s upload_task_starting", job_id, idx_1based, total_pages)
        if upload_sem is not None:
            async with upload_sem:
                return await paperless.upload_pdf(filename=filename, pdf_bytes=pdf_bytes)
        return await paperless.upload_pdf(filename=filename, pdf_bytes=pdf_bytes)

    upload_tasks: list[asyncio.Task[int]] = [
        asyncio.create_task(_upload_one_page(i)) for i in range(1, total_pages + 1)
    ]

    created_ids: list[int] = []

    # 4) OCR sequentially (VRAM), then await upload for that page + PATCH.
    try:
        for idx, jpeg_bytes in enumerate(jpegs, start=1):
            logger.info("job_id=%s page=%s/%s ocr_starting", job_id, idx, total_pages)
            # 4a) Page-number OCR (bottom corners only). -1 means "unknown".
            page_number = -1
            for corner_jpeg in _crop_bottom_corner_jpegs(full_page_jpeg=jpeg_bytes):
                candidate_text = await llama.ocr_jpeg(jpeg_bytes=corner_jpeg, prompt=PAGE_NUMBER_PROMPT)
                parsed = _parse_page_number(candidate_text)
                if parsed is not None:
                    # NOTE(review): this condition accepts -1 and any value >= 0,
                    # i.e. it only filters out values <= -2 — confirm intent.
                    if parsed == -1 or parsed >= 0:
                        page_number = parsed
                # Stop at the first corner that yields a usable number.
                if page_number != -1:
                    break
            logger.info("job_id=%s page=%s detected_page_number=%s", job_id, idx, page_number)

            # 4b) Full-page OCR for searchable content.
            logger.info("job_id=%s page=%s ocr_full_page", job_id, idx)
            ocr_text = await llama.ocr_jpeg(jpeg_bytes=jpeg_bytes, prompt=ocr_prompt_override)
            logger.info("job_id=%s page=%s ocr_chars=%s", job_id, idx, len(ocr_text))

            # Wait for this page's upload task (started earlier) to yield a doc id.
            logger.info("job_id=%s page=%s awaiting_upload_then_patch", job_id, idx)
            new_id = await upload_tasks[idx - 1]
            logger.info("job_id=%s page=%s uploaded_document_id=%s", job_id, idx, new_id)

            custom_fields = [
                {"field": settings.paperless_custom_field_notebook_id, "value": notebook_id},
                {"field": settings.paperless_custom_field_notebook_page, "value": page_number},
            ]

            title = f"Notebook {notebook_id} Page {page_number}"

            logger.info("job_id=%s page=%s patching_document_id=%s", job_id, idx, new_id)
            await paperless.patch_document(
                document_id=new_id,
                title=title,
                content=ocr_text,
                custom_fields=custom_fields,
                document_type=settings.paperless_document_type_id,
            )
            logger.info("job_id=%s page=%s patched_document_id=%s", job_id, idx, new_id)

            created_ids.append(new_id)
            if on_progress:
                await on_progress(idx, total_pages)
    except BaseException:
        # On any failure (including cancellation), cancel outstanding uploads
        # and drain them so no task is left dangling, then re-raise.
        for t in upload_tasks:
            if not t.done():
                t.cancel()
        await asyncio.gather(*upload_tasks, return_exceptions=True)
        raise

    return {"created_document_ids": created_ids}
|
||||||
63
src/notebook_tools/settings.py
Normal file
63
src/notebook_tools/settings.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
"""App configuration loaded from environment variables.
|
||||||
|
|
||||||
|
Why this module exists:
|
||||||
|
- It's common to keep secrets (tokens) and instance-specific URLs out of code.
|
||||||
|
- Using one Settings object makes it easy to pass config to clients/pipeline.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pydantic import AnyHttpUrl, Field
|
||||||
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
|
|
||||||
|
class Settings(BaseSettings):
    """Runtime settings for the service, loaded from the environment / .env.

    Each field maps to an environment variable via its alias; `extra="ignore"`
    lets unrelated environment variables coexist without validation errors.

    Tip: Put these in a local `.env` file while developing, but never commit secrets.
    """

    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")

    # Paperless-ngx instance (required — no defaults, so startup fails fast if unset).
    paperless_base_url: AnyHttpUrl = Field(..., alias="PAPERLESS_BASE_URL")
    paperless_token: str = Field(..., alias="PAPERLESS_TOKEN")

    # llama.cpp server (OpenAI-compatible chat-completions endpoint).
    llama_base_url: AnyHttpUrl = Field(..., alias="LLAMA_BASE_URL")
    llama_model: str = Field("ggml-model-q4_k_m", alias="LLAMA_MODEL")

    # Custom field ids you already created in Paperless (primary keys, not names).
    paperless_custom_field_notebook_id: int = Field(1, alias="PAPERLESS_CUSTOM_FIELD_NOTEBOOK_ID")
    paperless_custom_field_notebook_page: int = Field(2, alias="PAPERLESS_CUSTOM_FIELD_NOTEBOOK_PAGE")

    # Document type id applied to each per-page upload (PATCH document).
    paperless_document_type_id: int = Field(3, alias="PAPERLESS_DOCUMENT_TYPE_ID")

    # Rendering / OCR knobs.
    render_dpi: int = Field(200, alias="RENDER_DPI")
    ocr_max_tokens: int = Field(1024, alias="OCR_MAX_TOKENS")
    ocr_temperature: float = Field(0.0, alias="OCR_TEMPERATURE")

    # Paperless async consumption polling.
    #
    # Paperless may take minutes to ingest uploads depending on server load and OCR settings.
    # These settings control how long we wait for `/api/tasks/?task_id=...` to produce a document id.
    paperless_task_timeout_s: int = Field(600, alias="PAPERLESS_TASK_TIMEOUT_S")
    paperless_task_poll_interval_s: float = Field(5.0, alias="PAPERLESS_TASK_POLL_INTERVAL_S")

    # Max concurrent per-page uploads to Paperless (0 = unlimited). Limits load on the server.
    paperless_upload_concurrency: int = Field(0, alias="PAPERLESS_UPLOAD_CONCURRENCY")

    # Logging level name (e.g. "INFO", "DEBUG").
    log_level: str = Field("INFO", alias="LOG_LEVEL")
|
||||||
|
|
||||||
|
|
||||||
|
def get_settings() -> Settings:
    """Build a fresh Settings instance from the environment / `.env` file.

    FastAPI apps typically call this once during startup, so a missing or
    malformed environment variable fails fast rather than mid-request.
    """

    return Settings()
|
||||||
|
|
||||||
35
tests/test_llama_payload.py
Normal file
35
tests/test_llama_payload.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
|
||||||
|
import respx
|
||||||
|
from httpx import Response
|
||||||
|
|
||||||
|
from notebook_tools.llama_client import LlamaClient
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
async def test_llama_client_parses_openai_style_response() -> None:
    """LlamaClient.ocr_jpeg returns choices[0].message.content and sends a base64 data URL."""
    # Arrange: mock llama endpoint with an OpenAI-style chat-completions body.
    route = respx.post("http://llama.local/v1/chat/completions").mock(
        return_value=Response(200, json={"choices": [{"message": {"content": "Hello\nWorld"}}]})
    )

    client = LlamaClient(base_url="http://llama.local", model="m")

    # Act: the bytes start with a JPEG SOI marker but are otherwise fake —
    # the client never decodes the image, it only base64-encodes it.
    out = await client.ocr_jpeg(jpeg_bytes=b"\xff\xd8\xff\xe0fakejpeg")

    # Assert
    assert out == "Hello\nWorld"
    assert route.called

    # Optional: sanity-check that we really sent a base64 data URL
    sent = json.loads(route.calls[0].request.content.decode("utf-8"))
    url = sent["messages"][0]["content"][1]["image_url"]["url"]
    assert url.startswith("data:image/jpeg;base64,")
    b64 = url.split(",", 1)[1]
    # If this fails, our payload construction changed.
    base64.b64decode(b64)
|
||||||
|
|
||||||
28
tests/test_paperless_task_parse.py
Normal file
28
tests/test_paperless_task_parse.py
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from notebook_tools.paperless_client import _document_id_from_task_payload
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_document_string() -> None:
    """A string-typed related_document field is coerced to an int id."""
    payload = {
        "related_document": "10",
        "result": "Success. New document id 10 created",
    }
    assert _document_id_from_task_payload(payload) == 10
|
||||||
|
|
||||||
|
|
||||||
|
def test_result_string_only() -> None:
    """With related_document missing (None), the id is parsed out of the result text."""
    payload = {"related_document": None, "result": "Success. New document id 42 created"}
    assert _document_id_from_task_payload(payload) == 42
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_document_int() -> None:
    """An integer related_document field is returned as-is."""
    payload = {"related_document": 7}
    assert _document_id_from_task_payload(payload) == 7
|
||||||
73
tests/test_pipeline_smoke.py
Normal file
73
tests/test_pipeline_smoke.py
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
import respx
|
||||||
|
from httpx import Response
|
||||||
|
|
||||||
|
from notebook_tools.pipeline import run_pipeline_for_paperless_document
|
||||||
|
from notebook_tools.settings import Settings
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
async def test_pipeline_smoke_single_page(monkeypatch) -> None:
    """End-to-end smoke test of the pipeline with one page, all HTTP mocked."""
    # We don't want to depend on real PDF rendering in this test.
    # Instead we monkeypatch the helpers to produce a controlled "one page" output.
    from notebook_tools import pdf_utils

    monkeypatch.setattr(pdf_utils, "render_pdf_to_jpegs", lambda *, pdf_bytes, dpi: [b"jpegbytes"])
    monkeypatch.setattr(pdf_utils, "jpeg_to_pdf_bytes", lambda *, jpeg_bytes: b"%PDF-1.4 fake")
    # Page-number OCR now crops the JPEG; since our fake jpeg bytes aren't a real image,
    # we bypass cropping in this smoke test and rely on the default "-1" fallback.
    import notebook_tools.pipeline as pipeline

    monkeypatch.setattr(pipeline, "_crop_bottom_corner_jpegs", lambda *, full_page_jpeg: [])

    # Mock Paperless download/upload/patch endpoints.
    respx.get("https://paperless.local/api/documents/123/download/").mock(
        return_value=Response(200, content=b"%PDF-1.4 source")
    )
    # post_document returns a bare task-id string — exercises the polling branch.
    respx.post("https://paperless.local/api/documents/post_document/").mock(
        return_value=Response(200, json="34de1527-aade-499b-8a06-dd0174c9f233")
    )
    # When post_document returns a task id, the client polls /api/tasks/?task_id=<uuid>;
    # related_document=999 lets the first poll resolve immediately.
    respx.get("https://paperless.local/api/tasks/").mock(
        return_value=Response(200, json=[{"id": 31, "task_id": "34de1527-aade-499b-8a06-dd0174c9f233", "related_document": 999}])
    )
    patch_route = respx.patch("https://paperless.local/api/documents/999/").mock(
        return_value=Response(200, json={})
    )

    # Mock llama OCR (full-page pass only — corner cropping is bypassed above).
    respx.post("http://llama.local/v1/chat/completions").mock(
        return_value=Response(200, json={"choices": [{"message": {"content": "OCR TEXT"}}]})
    )

    # Settings built from explicit kwargs (env aliases), so no .env is needed;
    # custom-field / document-type ids fall back to defaults 1, 2 and 3.
    settings = Settings(
        PAPERLESS_BASE_URL="https://paperless.local",
        PAPERLESS_TOKEN="t",
        LLAMA_BASE_URL="http://llama.local",
        LLAMA_MODEL="m",
    )

    out = await run_pipeline_for_paperless_document(
        settings=settings,
        paperless_document_id=123,
        notebook_id="nb1",
        job_id="job1",
        on_progress=None,
        ocr_prompt_override=None,
        title_prefix="Notebook nb1",
    )

    # One page -> one new document, patched with OCR text and custom fields.
    assert out["created_document_ids"] == [999]
    assert patch_route.called
    sent_patch = json.loads(patch_route.calls[0].request.content.decode("utf-8"))
    assert sent_patch["content"] == "OCR TEXT"
    assert sent_patch["title"] == "Notebook nb1 Page -1"
    assert sent_patch["custom_fields"][0]["field"] == 1
    assert sent_patch["custom_fields"][0]["value"] == "nb1"
    assert sent_patch["custom_fields"][1]["field"] == 2
    assert sent_patch["custom_fields"][1]["value"] == -1
    assert sent_patch["document_type"] == 3
|
||||||
|
|
||||||
Reference in New Issue
Block a user