feat(publisher): add wordpress draft queue with retry and admin controls

This commit is contained in:
Oliver 2026-02-18 10:49:43 +01:00
parent dcdf4d954a
commit 1cee56205e
13 changed files with 719 additions and 3 deletions

View file

@ -56,11 +56,22 @@ class ArticleUpsert:
legal_checked: bool
legal_checked_at: str | None
legal_note: str | None
wp_post_id: int | None
wp_post_url: str | None
publish_attempts: int
publish_last_error: str | None
published_to_wp_at: str | None
word_count: int
status: str
meta_json: str | None
@dataclass(frozen=True)
class PublishJobCreate:
    """Immutable request describing a publish job to enqueue for an article."""

    # Id of the article the job is created for (written to publish_jobs.article_id).
    article_id: int
    # Attempt limit written to publish_jobs.max_attempts; defaults to 3.
    max_attempts: int = 3
def create_source(payload: SourceCreate) -> int:
with get_conn() as conn:
cur = conn.execute(
@ -235,6 +246,7 @@ def get_article_by_id(article_id: int) -> dict[str, Any] | None:
a.summary, a.content_raw, a.content_rewritten, a.image_urls_json, a.press_contact,
a.source_name_snapshot, a.source_terms_url_snapshot, a.source_license_name_snapshot,
a.legal_checked, a.legal_checked_at, a.legal_note,
a.wp_post_id, a.wp_post_url, a.publish_attempts, a.publish_last_error, a.published_to_wp_at,
a.word_count, a.status, a.meta_json, a.created_at, a.updated_at
FROM articles a
WHERE a.id = ?
@ -375,6 +387,147 @@ def set_article_image_decision(article_id: int, image_url: str, action: str, act
return True
def create_publish_job(payload: PublishJobCreate) -> int:
    """Enqueue a publish job for an article, reusing one that is still active.

    When the article already has a job in status 'queued' or 'running', the
    id of the newest such job is returned and no row is inserted. Otherwise a
    new 'queued' job with zero attempts is inserted and its id is returned.
    A ``payload.max_attempts`` below 1 is stored as 1.
    """
    find_active_sql = """
        SELECT id FROM publish_jobs
        WHERE article_id = ? AND status IN ('queued', 'running')
        ORDER BY id DESC
        LIMIT 1
        """
    insert_sql = """
        INSERT INTO publish_jobs (article_id, status, attempts, max_attempts)
        VALUES (?, 'queued', 0, ?)
        """
    with get_conn() as conn:
        active_job = conn.execute(find_active_sql, (payload.article_id,)).fetchone()
        if active_job:
            # An active job already covers this article; do not queue a duplicate.
            return int(active_job["id"])

        attempt_limit = max(1, payload.max_attempts)
        inserted = conn.execute(insert_sql, (payload.article_id, attempt_limit))
        return int(inserted.lastrowid)
def list_publish_jobs(limit: int = 100) -> list[dict[str, Any]]:
    """Return publish jobs ordered by id descending, each with its article title.

    ``limit`` is clamped to the range 1..500 before it is used. The title
    comes from a LEFT JOIN on ``articles``, so ``article_title`` is NULL/None
    for a job whose article row does not exist.
    """
    upper_bounded = min(limit, 500)
    safe_limit = max(1, upper_bounded)
    query = """
        SELECT j.id, j.article_id, j.status, j.attempts, j.max_attempts, j.error_message, j.wp_post_id, j.wp_post_url,
               j.created_at, j.started_at, j.finished_at, a.title AS article_title
        FROM publish_jobs j
        LEFT JOIN articles a ON a.id = j.article_id
        ORDER BY j.id DESC
        LIMIT ?
        """
    with get_conn() as conn:
        job_rows = conn.execute(query, (safe_limit,)).fetchall()
    return rows_to_dicts(job_rows)
def claim_next_publish_job() -> dict[str, Any] | None:
    """Claim the oldest runnable publish job and mark it as running.

    The candidate is the lowest-id job whose status is 'queued' and whose
    ``attempts`` is still below ``max_attempts``. Claiming sets its status to
    'running', increments ``attempts``, stamps ``started_at`` and clears
    ``finished_at``.

    Returns:
        The claimed job as a dict reflecting the updated status and attempt
        count, or None when no job is runnable or the candidate was claimed
        by another caller between the lookup and the update.
    """
    with get_conn() as conn:
        candidate = conn.execute(
            """
            SELECT id
            FROM publish_jobs
            WHERE status = 'queued' AND attempts < max_attempts
            ORDER BY id ASC
            LIMIT 1
            """
        ).fetchone()
        if not candidate:
            return None

        job_id = int(candidate["id"])
        # Repeat the claim predicate in the UPDATE instead of trusting the
        # SELECT above: the isolation provided by get_conn() is not visible
        # here, and without the predicate two workers that looked up the same
        # row would both bump `attempts` and both process the job.
        claim = conn.execute(
            """
            UPDATE publish_jobs
            SET status = 'running',
                attempts = attempts + 1,
                started_at = datetime('now'),
                finished_at = NULL
            WHERE id = ? AND status = 'queued' AND attempts < max_attempts
            """,
            (job_id,),
        )
        if claim.rowcount != 1:
            # Someone else claimed (or changed) the job first; the caller can
            # simply poll again.
            return None

        claimed = conn.execute(
            """
            SELECT id, article_id, status, attempts, max_attempts, error_message, wp_post_id, wp_post_url
            FROM publish_jobs
            WHERE id = ?
            """,
            (job_id,),
        ).fetchone()
        return dict(claimed) if claimed else None
def complete_publish_job(job_id: int, wp_post_id: int | None, wp_post_url: str | None) -> None:
    """Mark a publish job as successful and store the resulting post reference.

    Sets status to 'success', writes ``wp_post_id`` and ``wp_post_url`` as
    given (None is stored as NULL), clears ``error_message`` and stamps
    ``finished_at`` with the database's current time.
    """
    success_sql = """
        UPDATE publish_jobs
        SET status = 'success',
            wp_post_id = ?,
            wp_post_url = ?,
            error_message = NULL,
            finished_at = datetime('now')
        WHERE id = ?
        """
    params = (wp_post_id, wp_post_url, job_id)
    with get_conn() as conn:
        conn.execute(success_sql, params)
def fail_publish_job(job_id: int, error_message: str, requeue: bool) -> None:
    """Record a failed attempt on a publish job.

    The job goes back to 'queued' when ``requeue`` is true and to 'failed'
    otherwise. ``error_message`` is truncated to its first 2000 characters
    before it is stored, and ``finished_at`` is stamped with the database's
    current time. The attempt counter is not touched here.
    """
    if requeue:
        next_status = "queued"
    else:
        next_status = "failed"

    failure_sql = """
        UPDATE publish_jobs
        SET status = ?,
            error_message = ?,
            finished_at = datetime('now')
        WHERE id = ?
        """
    with get_conn() as conn:
        conn.execute(failure_sql, (next_status, error_message[:2000], job_id))
def mark_article_publish_result(
    article_id: int,
    *,
    wp_post_id: int | None,
    wp_post_url: str | None,
    error: str | None,
    increment_attempts: bool,
    set_published_status: bool,
) -> None:
    """Write the outcome of a publish attempt onto the article row.

    ``wp_post_id`` and ``wp_post_url`` are always overwritten with the values
    passed in, including None. ``publish_attempts`` is incremented only when
    ``increment_attempts`` is true. ``publish_last_error`` becomes the first
    2000 characters of ``error``, or NULL when ``error`` is empty or None.
    ``published_to_wp_at`` is stamped only when ``wp_post_id`` is not None,
    and ``status`` becomes 'published' only when ``set_published_status`` is
    true.
    """
    update_sql = """
        UPDATE articles
        SET wp_post_id = ?,
            wp_post_url = ?,
            publish_attempts = CASE WHEN ? THEN publish_attempts + 1 ELSE publish_attempts END,
            publish_last_error = ?,
            published_to_wp_at = CASE WHEN ? IS NOT NULL THEN datetime('now') ELSE published_to_wp_at END,
            status = CASE WHEN ? THEN 'published' ELSE status END
        WHERE id = ?
        """
    with get_conn() as conn:
        # Booleans are passed as 0/1 so the CASE WHEN tests see plain integers.
        attempts_flag = 1 if increment_attempts else 0
        published_flag = 1 if set_published_status else 0
        stored_error = error[:2000] if error else None
        params = (
            wp_post_id,
            wp_post_url,
            attempts_flag,
            stored_error,
            wp_post_id,  # bound a second time for the published_to_wp_at check
            published_flag,
            article_id,
        )
        conn.execute(update_sql, params)
def _resolve_existing_article_id(payload: ArticleUpsert) -> int | None:
with get_conn() as conn:
# 1) strongest key: source_url
@ -417,8 +570,9 @@ def upsert_article(payload: ArticleUpsert) -> int:
summary, content_raw, content_rewritten, image_urls_json, press_contact,
source_name_snapshot, source_terms_url_snapshot, source_license_name_snapshot,
legal_checked, legal_checked_at, legal_note,
wp_post_id, wp_post_url, publish_attempts, publish_last_error, published_to_wp_at,
word_count, status, meta_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
payload.feed_id,
@ -440,6 +594,11 @@ def upsert_article(payload: ArticleUpsert) -> int:
1 if payload.legal_checked else 0,
payload.legal_checked_at,
payload.legal_note,
payload.wp_post_id,
payload.wp_post_url,
payload.publish_attempts,
payload.publish_last_error,
payload.published_to_wp_at,
payload.word_count,
payload.status,
payload.meta_json,
@ -469,6 +628,11 @@ def upsert_article(payload: ArticleUpsert) -> int:
legal_checked = ?,
legal_checked_at = ?,
legal_note = ?,
wp_post_id = ?,
wp_post_url = ?,
publish_attempts = ?,
publish_last_error = ?,
published_to_wp_at = ?,
word_count = ?,
status = ?,
meta_json = ?
@ -494,6 +658,11 @@ def upsert_article(payload: ArticleUpsert) -> int:
1 if payload.legal_checked else 0,
payload.legal_checked_at,
payload.legal_note,
payload.wp_post_id,
payload.wp_post_url,
payload.publish_attempts,
payload.publish_last_error,
payload.published_to_wp_at,
payload.word_count,
payload.status,
payload.meta_json,
@ -515,7 +684,8 @@ def list_articles(limit: int = 100, status_filter: str | None = None) -> list[di
SELECT a.id, a.feed_id, a.source_article_id, a.source_hash, a.title, a.source_url, a.canonical_url, a.published_at, a.author,
a.summary, a.content_raw, a.word_count, a.status, a.meta_json, a.created_at, a.updated_at, f.name AS feed_name,
a.image_urls_json, a.press_contact, a.source_name_snapshot, a.source_terms_url_snapshot,
a.source_license_name_snapshot, a.legal_checked, a.legal_checked_at, a.legal_note
a.source_license_name_snapshot, a.legal_checked, a.legal_checked_at, a.legal_note,
a.wp_post_id, a.wp_post_url, a.publish_attempts, a.publish_last_error, a.published_to_wp_at
FROM articles a
LEFT JOIN feeds f ON f.id = a.feed_id
WHERE a.status = ?
@ -530,7 +700,8 @@ def list_articles(limit: int = 100, status_filter: str | None = None) -> list[di
SELECT a.id, a.feed_id, a.source_article_id, a.source_hash, a.title, a.source_url, a.canonical_url, a.published_at, a.author,
a.summary, a.content_raw, a.word_count, a.status, a.meta_json, a.created_at, a.updated_at, f.name AS feed_name,
a.image_urls_json, a.press_contact, a.source_name_snapshot, a.source_terms_url_snapshot,
a.source_license_name_snapshot, a.legal_checked, a.legal_checked_at, a.legal_note
a.source_license_name_snapshot, a.legal_checked, a.legal_checked_at, a.legal_note,
a.wp_post_id, a.wp_post_url, a.publish_attempts, a.publish_last_error, a.published_to_wp_at
FROM articles a
LEFT JOIN feeds f ON f.id = a.feed_id
ORDER BY a.id DESC