"""Paperless-ngx REST API client. We keep this client *thin*: - It knows how to talk to Paperless (URLs, auth header, endpoints). - It does NOT know about OCR or PDFs beyond "upload bytes". This separation makes it easier to test and to swap out behavior later. """ from __future__ import annotations import asyncio import json import logging import re from typing import Any import httpx from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential_jitter logger = logging.getLogger("notebook_tools.paperless") class PaperlessError(RuntimeError): """Raised for non-2xx responses from Paperless.""" def _auth_headers(token: str) -> dict[str, str]: # Paperless uses token auth in the form: Authorization: Token return {"Authorization": f"Token {token}"} def _raise_for_status(resp: httpx.Response) -> None: """Raise a helpful error message. httpx has resp.raise_for_status(), but we include the response body (often JSON) because Paperless will usually tell you exactly what's wrong. """ if 200 <= resp.status_code < 300: return body = resp.text raise PaperlessError(f"Paperless API {resp.status_code}: {body}") def _document_id_from_task_payload(item: dict[str, Any]) -> int | None: """Extract created document id from a Paperless task object. Paperless 2.x often returns: - ``related_document`` as a string ``\"10\"`` (not an int) - ``result`` as a string like ``\"Success. New document id 10 created\"`` (not a dict) We must handle both, or polling never completes. """ rd = item.get("related_document") if rd is not None: if isinstance(rd, int): return rd if isinstance(rd, str) and rd.strip().isdigit(): return int(rd.strip()) for key in ("document_id", "document"): val = item.get(key) if isinstance(val, int): return val if isinstance(val, str) and val.strip().isdigit(): return int(val.strip()) result = item.get("result") if isinstance(result, dict): nested = result.get("document_id") or result.get("document") if isinstance(nested, int): return nested if isinstance(nested, str) and nested.strip().isdigit(): return int(nested.strip()) elif isinstance(result, str): # e.g. "Success. New document id 10 created" m = re.search(r"New document id\s+(\d+)", result, flags=re.IGNORECASE) if m: return int(m.group(1)) return None class PaperlessClient: def __init__( self, *, base_url: str, token: str, timeout_s: float = 60.0, task_timeout_s: int = 600, task_poll_interval_s: float = 5.0, ) -> None: self._base_url = base_url.rstrip("/") self._token = token self._timeout = httpx.Timeout(timeout_s) self._task_timeout_s = task_timeout_s self._task_poll_interval_s = task_poll_interval_s def _url(self, path: str) -> str: return f"{self._base_url}{path}" @retry( retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)), wait=wait_exponential_jitter(initial=0.5, max=5.0), stop=stop_after_attempt(3), reraise=True, ) async def download_document_pdf(self, *, document_id: int) -> bytes: """Download the original PDF bytes for a Paperless document.""" async with httpx.AsyncClient(timeout=self._timeout, headers=_auth_headers(self._token)) as client: # Common Paperless endpoint: # GET /api/documents/{id}/download/ logger.info("Downloading document_id=%s", document_id) resp = await client.get(self._url(f"/api/documents/{document_id}/download/")) _raise_for_status(resp) return resp.content async def upload_pdf(self, *, filename: str, pdf_bytes: bytes) -> int: """Upload a new document (PDF) and return its new document id.""" async with httpx.AsyncClient(timeout=self._timeout, headers=_auth_headers(self._token)) as client: files = { "document": (filename, pdf_bytes, "application/pdf"), } # POST /api/documents/post_document/ returns JSON about the created document/task. logger.info("Uploading PDF filename=%s bytes=%s", filename, len(pdf_bytes)) resp = await client.post(self._url("/api/documents/post_document/"), files=files) _raise_for_status(resp) data: Any = resp.json() # Paperless has had a few response shapes across versions. # We defensively handle the most common ones. if isinstance(data, dict): if "document" in data and isinstance(data["document"], int): return int(data["document"]) if "id" in data and isinstance(data["id"], int): return int(data["id"]) # Some versions return {"task_id": ""}. if "task_id" in data and isinstance(data["task_id"], str): logger.info("Upload returned task_id=%s", data["task_id"]) return await self._wait_for_task_document_id(client=client, task_id=data["task_id"]) # Other versions return the task id directly as a JSON string: "" if isinstance(data, str): logger.info("Upload returned task_id=%s", data) return await self._wait_for_task_document_id(client=client, task_id=data) raise PaperlessError(f"Unexpected upload response: {json.dumps(data)[:500]}") async def _wait_for_task_document_id(self, *, client: httpx.AsyncClient, task_id: str) -> int: """Poll Paperless' tasks endpoint until it yields a created document id. Why polling is needed: - `post_document` triggers async consumption in Paperless. - Many Paperless versions return a Celery task id (UUID) instead of a document id. This method makes the rest of the pipeline "feel" synchronous: upload_pdf() still returns a document id, it just waits for Paperless to finish processing. """ # This endpoint is documented/mentioned in Paperless discussions and commits: # /api/tasks/?task_id= # We'll try a few times with a small backoff. last_payload: Any = None # We poll until a time budget is exceeded, because Paperless ingestion time varies a lot. max_attempts = max(1, int(self._task_timeout_s / max(self._task_poll_interval_s, 0.1))) for attempt in range(max_attempts): # INFO: every 5th poll + first. DEBUG: every poll (no duplicate INFO line). if logger.isEnabledFor(logging.DEBUG): logger.debug( "Polling task_id=%s attempt=%s/%s (interval=%.1fs timeout=%ss)", task_id, attempt + 1, max_attempts, self._task_poll_interval_s, self._task_timeout_s, ) elif attempt == 0 or (attempt + 1) % 5 == 0: logger.info( "Polling task_id=%s attempt=%s/%s (interval=%.1fs timeout=%ss)", task_id, attempt + 1, max_attempts, self._task_poll_interval_s, self._task_timeout_s, ) resp = await client.get(self._url("/api/tasks/"), params={"task_id": task_id}) _raise_for_status(resp) last_payload = resp.json() # We expect a list (paginated or not). Handle both. items: list[dict[str, Any]] = [] if isinstance(last_payload, dict) and "results" in last_payload and isinstance(last_payload["results"], list): items = [x for x in last_payload["results"] if isinstance(x, dict)] elif isinstance(last_payload, list): items = [x for x in last_payload if isinstance(x, dict)] # Find a matching task and extract document id. for item in items: # Match by Celery UUID (not the numeric DB id). if item.get("task_id") != task_id: continue doc_id = _document_id_from_task_payload(item) if doc_id is not None: logger.info("Task task_id=%s produced document_id=%s", task_id, doc_id) return doc_id # If we have a numeric task pk, fetch detail — list view can lag; trailing slash required. numeric_task_pk = item.get("id") if isinstance(numeric_task_pk, int): detail_resp = await client.get(self._url(f"/api/tasks/{numeric_task_pk}/")) _raise_for_status(detail_resp) detail = detail_resp.json() if isinstance(detail, dict): doc_id = _document_id_from_task_payload(detail) if doc_id is not None: logger.info("Task task_id=%s (detail) produced document_id=%s", task_id, doc_id) return doc_id # Not ready yet; sleep a fixed interval (configurable). await asyncio.sleep(self._task_poll_interval_s) raise PaperlessError( f"Paperless upload task did not yield document id in time. task_id={task_id} last={json.dumps(last_payload)[:500]}" ) @retry( retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)), wait=wait_exponential_jitter(initial=0.5, max=5.0), stop=stop_after_attempt(3), reraise=True, ) async def patch_document( self, *, document_id: int, title: str | None = None, content: str | None = None, custom_fields: list[dict[str, Any]] | None = None, document_type: int | None = None, ) -> None: """Update metadata on a document. For our use-case we mainly set: - title: a human-friendly per-page title - content: OCR text (so Paperless search works) - custom_fields: notebook_id + notebook_page - document_type: Paperless document type id (primary key) """ payload: dict[str, Any] = {} if title is not None: payload["title"] = title if content is not None: payload["content"] = content if custom_fields is not None: payload["custom_fields"] = custom_fields if document_type is not None: payload["document_type"] = document_type async with httpx.AsyncClient(timeout=self._timeout, headers=_auth_headers(self._token)) as client: logger.info( "Patching document_id=%s set_title=%s set_content=%s set_custom_fields=%s set_document_type=%s", document_id, title is not None, content is not None, custom_fields is not None, document_type is not None, ) resp = await client.patch(self._url(f"/api/documents/{document_id}/"), json=payload) _raise_for_status(resp)