fix(ingestion): preserve article workflow data and skip closed items on re-import

This commit is contained in:
Oliver 2026-02-21 14:51:36 +01:00
parent b0f995d5c9
commit 93f52f72b9
No known key found for this signature in database
3 changed files with 206 additions and 31 deletions

View file

@ -16,6 +16,7 @@ from .repositories import (
ArticleUpsert,
RunCreate,
create_run,
find_existing_article_for_upsert,
finish_run,
get_feed_by_id,
list_enabled_feeds,
@ -135,6 +136,20 @@ def _select_relevant_images(source_url: str, title: str, images: list[str], max_
return kept, primary, ranked
def _merge_ingestion_meta(existing_meta_json: str | None, attribution: dict[str, Any], extraction_meta: dict[str, Any]) -> str:
meta: dict[str, Any] = {}
if existing_meta_json:
try:
parsed = json.loads(existing_meta_json)
if isinstance(parsed, dict):
meta = parsed
except Exception:
meta = {}
meta["attribution"] = attribution
meta["extraction"] = extraction_meta
return json.dumps(meta, ensure_ascii=False)
def run_ingestion(feed_id: int | None = None) -> IngestionStats:
run_id = create_run(RunCreate(run_type="ingestion", status="running", details="started"))
feeds_processed = 0
@ -268,37 +283,73 @@ def run_ingestion(feed_id: int | None = None) -> IngestionStats:
"total_candidates": len(extracted.images),
"ranked": ranked_images,
}
article_id = upsert_article(
ArticleUpsert(
feed_id=int(feed["id"]),
source_article_id=entry.get("id") or entry.get("guid"),
source_hash=source_hash,
title=final_title,
source_url=link,
canonical_url=final_canonical,
published_at=_entry_published_iso(entry),
author=final_author,
summary=final_summary,
content_raw=final_content_raw,
content_rewritten=None,
image_urls_json=json.dumps(selected_images, ensure_ascii=False) if selected_images else None,
press_contact=extracted.press_contact,
source_name_snapshot=feed.get("source_name"),
source_terms_url_snapshot=feed.get("source_terms_url"),
source_license_name_snapshot=feed.get("source_license_name"),
legal_checked=False,
legal_checked_at=None,
legal_note=None,
wp_post_id=None,
wp_post_url=None,
publish_attempts=0,
publish_last_error=None,
published_to_wp_at=None,
word_count=len((final_content_raw or "").split()),
status="new",
meta_json=json.dumps({"attribution": attribution, "extraction": extraction_meta}, ensure_ascii=False),
)
base_payload = ArticleUpsert(
feed_id=int(feed["id"]),
source_article_id=entry.get("id") or entry.get("guid"),
source_hash=source_hash,
title=final_title,
source_url=link,
canonical_url=final_canonical,
published_at=_entry_published_iso(entry),
author=final_author,
summary=final_summary,
content_raw=final_content_raw,
content_rewritten=None,
image_urls_json=json.dumps(selected_images, ensure_ascii=False) if selected_images else None,
press_contact=extracted.press_contact,
source_name_snapshot=feed.get("source_name"),
source_terms_url_snapshot=feed.get("source_terms_url"),
source_license_name_snapshot=feed.get("source_license_name"),
legal_checked=False,
legal_checked_at=None,
legal_note=None,
wp_post_id=None,
wp_post_url=None,
publish_attempts=0,
publish_last_error=None,
published_to_wp_at=None,
word_count=len((final_content_raw or "").split()),
status="new",
meta_json=json.dumps({"attribution": attribution, "extraction": extraction_meta}, ensure_ascii=False),
)
existing = find_existing_article_for_upsert(base_payload)
if existing and existing.get("status") == "error":
# Explicitly closed article: ignore on subsequent ingestion runs.
continue
payload = base_payload
if existing:
payload = ArticleUpsert(
feed_id=base_payload.feed_id,
source_article_id=base_payload.source_article_id,
source_hash=base_payload.source_hash,
title=base_payload.title,
source_url=base_payload.source_url,
canonical_url=base_payload.canonical_url,
published_at=base_payload.published_at,
author=base_payload.author,
summary=base_payload.summary,
content_raw=base_payload.content_raw,
content_rewritten=existing.get("content_rewritten"),
image_urls_json=base_payload.image_urls_json,
press_contact=base_payload.press_contact or existing.get("press_contact"),
source_name_snapshot=base_payload.source_name_snapshot,
source_terms_url_snapshot=base_payload.source_terms_url_snapshot,
source_license_name_snapshot=base_payload.source_license_name_snapshot,
legal_checked=bool(int(existing.get("legal_checked", 0))),
legal_checked_at=existing.get("legal_checked_at"),
legal_note=existing.get("legal_note"),
wp_post_id=existing.get("wp_post_id"),
wp_post_url=existing.get("wp_post_url"),
publish_attempts=int(existing.get("publish_attempts", 0)),
publish_last_error=existing.get("publish_last_error"),
published_to_wp_at=existing.get("published_to_wp_at"),
word_count=base_payload.word_count,
status=existing.get("status") or "new",
meta_json=_merge_ingestion_meta(existing.get("meta_json"), attribution, extraction_meta),
)
article_id = upsert_article(payload)
if article_id:
articles_upserted += 1
feed_upserts += 1

View file

@ -633,6 +633,13 @@ def _resolve_existing_article_id(payload: ArticleUpsert) -> int | None:
return None
def find_existing_article_for_upsert(payload: ArticleUpsert) -> dict[str, Any] | None:
article_id = _resolve_existing_article_id(payload)
if article_id is None:
return None
return get_article_by_id(article_id)
def upsert_article(payload: ArticleUpsert) -> int:
existing_id = _resolve_existing_article_id(payload)
with get_conn() as conn: