from __future__ import annotations import base64 from html import escape import logging import json import mimetypes from pathlib import Path import re from typing import Any from html import unescape as _html_unescape from urllib.parse import quote_plus, urlparse from urllib.request import Request, urlopen from .config import get_settings def _auth_header(username: str, app_password: str) -> str: token = base64.b64encode(f"{username}:{app_password}".encode("utf-8")).decode("ascii") return f"Basic {token}" def _wp_request( *, base_url: str, auth_header: str, method: str, endpoint: str, payload: dict[str, Any] | None = None, ) -> Any: url = f"{base_url.rstrip('/')}/wp-json/wp/v2/{endpoint.lstrip('/')}" data = json.dumps(payload).encode("utf-8") if payload is not None else None req = Request( url=url, data=data, method=method, headers={ "Authorization": auth_header, "Content-Type": "application/json; charset=utf-8", "Accept": "application/json", "User-Agent": "rss-news-publisher/1.0", }, ) with urlopen(req, timeout=20) as resp: raw = resp.read().decode("utf-8", errors="replace") return json.loads(raw) if raw else {} def _selected_image_url_from_meta(meta_json: str | None) -> str | None: if not meta_json: return None try: meta = json.loads(meta_json) except Exception: return None if not isinstance(meta, dict): return None image_review = meta.get("image_review") if not isinstance(image_review, dict): return None selected = image_review.get("selected_url") return selected if isinstance(selected, str) and selected.strip() else None def _selected_tags_from_meta(meta_json: str | None) -> list[str]: if not meta_json: return [] try: meta = json.loads(meta_json) except Exception: return [] if not isinstance(meta, dict): return [] raw_tags = meta.get("generated_tags") if not isinstance(raw_tags, list): return [] tags: list[str] = [] seen: set[str] = set() for item in raw_tags: value = str(item or "").strip() if not value: continue key = value.casefold() if key in seen: continue seen.add(key) tags.append(value) if len(tags) >= 12: break return tags def _resolve_wp_tag_ids(*, base_url: str, auth_header: str, tags: list[str]) -> list[int]: ids: list[int] = [] seen: set[int] = set() for tag in tags: name = tag.strip() if not name: continue try: endpoint = f"tags?search={quote_plus(name)}&per_page=20" result = _wp_request(base_url=base_url, auth_header=auth_header, method="GET", endpoint=endpoint) tag_id: int | None = None if isinstance(result, list): for row in result: if not isinstance(row, dict): continue row_name = str(row.get("name") or "") rid = int(row.get("id", 0) or 0) if rid <= 0: continue if row_name.casefold() == name.casefold(): tag_id = rid break if tag_id is None: for row in result: if isinstance(row, dict) and int(row.get("id", 0) or 0) > 0: tag_id = int(row.get("id", 0)) break if tag_id is None: created = _wp_request( base_url=base_url, auth_header=auth_header, method="POST", endpoint="tags", payload={"name": name}, ) if isinstance(created, dict): rid = int(created.get("id", 0) or 0) if rid > 0: tag_id = rid if tag_id is not None and tag_id > 0 and tag_id not in seen: seen.add(tag_id) ids.append(tag_id) except Exception: continue return ids _BLOCKED_IMAGE_EXTS = {".svg", ".gif", ".ico", ".webp"} _logger = logging.getLogger(__name__) def _sanitize_image_url(url: str) -> str: """Decode HTML entities (e.g. & → &) in image URLs from RSS feeds.""" return _html_unescape(url) _PLACEHOLDER_PATTERNS = ("some-default.jpg", "default-image", "placeholder", "no-image", "noimage") def _is_usable_image_url(url: str) -> bool: """Return False for URLs that are unlikely to work as WP featured images.""" if not url or url.startswith("data:"): return False try: path = urlparse(url).path.lower() _, ext = path.rsplit(".", 1) if "." in path else ("", "") if f".{ext}" in _BLOCKED_IMAGE_EXTS: return False if any(p in path for p in _PLACEHOLDER_PATTERNS): return False except Exception: pass return True def _download_image_bytes(url: str, referer: str | None = None) -> tuple[bytes, str]: url = _sanitize_image_url(url) headers = { "User-Agent": "Mozilla/5.0 (compatible; rss-news-publisher/1.0)", "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8", } if referer: headers["Referer"] = referer req = Request(url=url, headers=headers) with urlopen(req, timeout=20) as resp: raw = resp.read() content_type = resp.headers.get("Content-Type", "application/octet-stream") content_type = content_type.split(";")[0].strip() if content_type else "application/octet-stream" if not content_type.lower().startswith("image/"): raise RuntimeError(f"Ausgewählte Bild-URL liefert kein Bild ({content_type})") return raw, content_type def _guess_filename(image_url: str, content_type: str) -> str: parsed = urlparse(_sanitize_image_url(image_url)) stem = Path(parsed.path).name or "article-image" if "." not in stem: ext = mimetypes.guess_extension(content_type.split(";")[0].strip()) or ".jpg" stem = f"{stem}{ext}" # Sanitize to ASCII-safe characters for the HTTP Content-Disposition header stem = stem.encode("ascii", errors="ignore").decode("ascii") stem = re.sub(r"[^\w.\-]", "_", stem) or "article-image.jpg" return stem def _get_image_meta_for_url(meta_json: str | None, image_url: str) -> dict: """Return the caption/credit dict for a specific image URL from extraction metadata.""" if not meta_json or not image_url: return {} try: from urllib.parse import urlparse meta = json.loads(meta_json) image_metadata = (meta.get("extraction") or {}).get("image_metadata") or {} # Exact match first if image_url in image_metadata: return image_metadata[image_url] # Fuzzy match: compare without query string (handles ?w=1200 variants) base_url = urlparse(image_url)._replace(query="").geturl() for key, val in image_metadata.items(): key_base = urlparse(key)._replace(query="").geturl() if key_base == base_url: return val return {} except Exception: return {} def _build_image_caption(image_meta: dict, source_url: str) -> str: """Build a WP caption string from image metadata and source URL.""" # caption from figcaption typically already contains the credit text caption = (image_meta.get("caption") or "").strip() if caption: return caption return f"Quelle: {source_url}" def _upload_featured_media( *, base_url: str, auth_header: str, image_url: str, article_title: str, source_url: str, image_caption: str = "", ) -> int: image_bytes, content_type = _download_image_bytes(image_url, referer=source_url or None) filename = _guess_filename(image_url, content_type) media_url = f"{base_url.rstrip('/')}/wp-json/wp/v2/media" media_req = Request( url=media_url, data=image_bytes, method="POST", headers={ "Authorization": auth_header, "Content-Type": content_type, "Content-Disposition": f'attachment; filename="{filename}"', "Accept": "application/json", "User-Agent": "rss-news-publisher/1.0", }, ) with urlopen(media_req, timeout=30) as resp: media_raw = resp.read().decode("utf-8", errors="replace") media_payload = json.loads(media_raw) if media_raw else {} media_id = int(media_payload.get("id", 0)) if isinstance(media_payload, dict) else 0 if media_id <= 0: raise RuntimeError(f"WordPress Media-Upload fehlgeschlagen: {media_payload}") _wp_request( base_url=base_url, auth_header=auth_header, method="POST", endpoint=f"media/{media_id}", payload={ "title": f"{article_title[:120]} - Bild", "caption": image_caption or f"Quelle: {source_url}", "alt_text": article_title[:200], }, ) return media_id def _as_paragraph_html(text: str) -> str: chunks = [chunk.strip() for chunk in re.split(r"\n{2,}", text.strip()) if chunk.strip()] if not chunks: return "" lines = [] for chunk in chunks: compact = re.sub(r"\s*\n\s*", " ", chunk) lines.append(f"

{escape(compact)}

") return "\n".join(lines) def _as_block_paragraphs(text: str) -> str: chunks = [chunk.strip() for chunk in re.split(r"\n{2,}", text.strip()) if chunk.strip()] if not chunks: return "" lines = [] for chunk in chunks: compact = re.sub(r"\s*\n\s*", " ", chunk) lines.append(f"

{escape(compact)}

") return "\n".join(lines) def _strip_html_tags(raw: str) -> str: text = re.sub(r"<[^>]+>", " ", raw or "") return re.sub(r"\s+", " ", text).strip() def _html_to_wp_blocks(html: str) -> str: src = (html or "").strip() if not src: return "" pattern = re.compile( r"]*>[\s\S]*?|]*>[\s\S]*?

|]*>[\s\S]*?|]*>[\s\S]*?", re.IGNORECASE, ) blocks: list[str] = [] for match in pattern.finditer(src): block_html = match.group(0).strip() if not block_html: continue tag_match = re.match(r"<([a-z0-9]+)", block_html, re.IGNORECASE) tag = (tag_match.group(1).lower() if tag_match else "") if tag == "p": blocks.append(f"{block_html}") elif tag in {"ul", "ol"}: ordered = tag == "ol" if ordered: blocks.append(f'{block_html}') else: blocks.append(f"{block_html}") elif tag.startswith("h") and len(tag) == 2 and tag[1].isdigit(): level = int(tag[1]) blocks.append(f'{block_html}') if blocks: return "\n".join(blocks) return _as_block_paragraphs(_strip_html_tags(src)) def _as_block_heading(level: int, text: str) -> str: safe_level = min(6, max(1, int(level))) return f'{escape(text)}' def _as_block_list(items: list[str]) -> str: if not items: return "" content = "".join(f"
  • {item}
  • " for item in items) return f"
      {content}
    " def _sanitize_publish_text(text: str) -> str: raw = (text or "").strip() if not raw: return "" lines = [ln.strip() for ln in raw.splitlines() if ln.strip()] if len(lines) > 3: lines = lines[3:] merged = "\n".join(lines) merged = re.sub(r"\n?\s*Pressekontakt[\s\S]*$", "", merged, flags=re.IGNORECASE).strip() return merged def _build_attribution_block(article: dict[str, Any]) -> str: """Build a WP Gutenberg attribution block for the bottom of the article.""" from urllib.parse import urlparse source_url = (article.get("canonical_url") or article.get("source_url") or "").strip() source_name = (article.get("source_name_snapshot") or "").strip() author = (article.get("author") or "").strip() # If the feed name is "Google Alerts" (or similar generic names), derive the # real source name from the hostname of the canonical URL. if not source_name or source_name.lower() in ("google alerts", "google"): try: hostname = urlparse(source_url).hostname or "" source_name = hostname.removeprefix("www.") except Exception: pass # Get image credit from extraction metadata (uses fuzzy URL match) meta_json = article.get("meta_json") credit = "" try: meta = json.loads(meta_json or "{}") selected_url = (meta.get("image_review") or {}).get("selected_url") or "" if selected_url: img_meta = _get_image_meta_for_url(meta_json, selected_url) raw_credit = (img_meta.get("credit") or "").strip() caption_text = (img_meta.get("caption") or "").strip() # If credit is just a bare marker prefix (e.g. "Foto:", "Bild:"), # clear it and extract the full credit from the caption text instead. _BARE_MARKERS = {"foto", "bild", "credit", "fotograf", "fotografie", "photo", "bildnachweis"} if raw_credit.endswith(":") and raw_credit[:-1].strip().lower() in _BARE_MARKERS: raw_credit = "" if raw_credit: credit = raw_credit elif caption_text: # Extract credit markers like "Foto: IMAGO/…", "© Agentur", "Bild: …" import re as _re m = _re.search( r"(©[^\n]{1,120}|(?:Foto|Bild|Credit|Fotograf|Photo)\s*:[^\n]{1,120})", caption_text, ) credit = m.group(1).strip() if m else "" except Exception: pass parts: list[str] = [] if source_url: label = source_name or source_url parts.append(f'Originalartikel: {escape(label)}') if author: parts.append(f"Autor: {escape(author)}") if credit: parts.append(f"Bildnachweis: {escape(credit)}") if not parts: return "" inner = "  |  ".join(parts) return ( "\n" "
    \n" f'' f'

    {inner}

    ' "" ) def _build_post_content(article: dict[str, Any]) -> tuple[str, str | None]: summary = (article.get("summary") or "").strip() body_text = (article.get("content_rewritten") or article.get("content_raw") or "").strip() body_text = _sanitize_publish_text(body_text) if not body_text: body_text = summary has_html = bool(re.search(r"<[a-zA-Z][^>]*>", body_text)) body_html = _html_to_wp_blocks(body_text) if has_html else _as_block_paragraphs(body_text) if not body_html: body_html = "

    Kein Inhalt verfügbar.

    " attribution = _build_attribution_block(article) content = (body_html + attribution).strip() return content, None def publish_article_draft(article: dict[str, Any]) -> tuple[int, str | None]: settings = get_settings() if not settings.wordpress_base_url or not settings.wordpress_username or not settings.wordpress_app_password: raise RuntimeError("WordPress Konfiguration fehlt (base_url, username, app_password)") auth = _auth_header(settings.wordpress_username, settings.wordpress_app_password) title = (article.get("title") or "Ohne Titel").strip() content, excerpt = _build_post_content(article) source_url = article.get("source_url") or "" featured_media_id = None selected_image_url = _selected_image_url_from_meta(article.get("meta_json")) # Build candidate list: primary selected URL + fallbacks from image_urls_json image_candidates: list[str] = [] if selected_image_url and _is_usable_image_url(selected_image_url): image_candidates.append(selected_image_url) try: extra_urls = json.loads(article.get("image_urls_json") or "[]") for u in extra_urls: if u and u not in image_candidates and _is_usable_image_url(u): image_candidates.append(u) except Exception: pass for candidate_url in image_candidates: image_meta = _get_image_meta_for_url(article.get("meta_json"), candidate_url) image_caption = _build_image_caption(image_meta, source_url) try: featured_media_id = _upload_featured_media( base_url=settings.wordpress_base_url, auth_header=auth, image_url=candidate_url, article_title=title, source_url=source_url, image_caption=image_caption, ) break # success — stop trying further candidates except Exception as img_exc: _logger.warning( "Bild-Upload fehlgeschlagen, versuche nächste URL: %s — %s", candidate_url, img_exc ) if not featured_media_id and image_candidates: _logger.warning( "Alle %d Bild-Kandidaten fehlgeschlagen für Artikel #%s (%s)", len(image_candidates), article.get("id"), title[:60], ) payload = { "title": title, "content": content, "status": settings.wordpress_default_status, } if excerpt: payload["excerpt"] = excerpt if featured_media_id: payload["featured_media"] = featured_media_id scheduled_at = article.get("scheduled_publish_at") if scheduled_at: payload["date"] = scheduled_at # e.g. "2026-03-24T09:00:00" # Use status "future" so WP schedules auto-publishing at the given date. # WP ignores date for drafts and shows "Sofort veröffentlichen" instead. try: from datetime import datetime as _dt if _dt.fromisoformat(scheduled_at) > _dt.now(): payload["status"] = "future" except Exception: pass wp_post_id = article.get("wp_post_id") tag_ids = _resolve_wp_tag_ids( base_url=settings.wordpress_base_url, auth_header=auth, tags=_selected_tags_from_meta(article.get("meta_json")), ) if tag_ids: payload["tags"] = tag_ids if wp_post_id: result = _wp_request( base_url=settings.wordpress_base_url, auth_header=auth, method="POST", endpoint=f"posts/{int(wp_post_id)}", payload=payload, ) else: result = _wp_request( base_url=settings.wordpress_base_url, auth_header=auth, method="POST", endpoint="posts", payload=payload, ) if not isinstance(result, dict): raise RuntimeError(f"WordPress Antwort im unerwarteten Format: {result}") post_id = int(result.get("id", 0)) if post_id <= 0: raise RuntimeError(f"WordPress Antwort ohne Post-ID: {result}") post_url = result.get("link") return post_id, post_url if isinstance(post_url, str) else None def selected_image_exists(article: dict[str, Any]) -> bool: return _selected_image_url_from_meta(article.get("meta_json")) is not None def delete_wp_post(wp_post_id: int) -> None: """Permanently delete a WordPress post (moves to trash, then deletes).""" settings = get_settings() if not settings.wordpress_base_url or not settings.wordpress_username or not settings.wordpress_app_password: raise RuntimeError("WordPress Konfiguration fehlt") auth = _auth_header(settings.wordpress_username, settings.wordpress_app_password) # force=true skips trash _wp_request( base_url=settings.wordpress_base_url, auth_header=auth, method="DELETE", endpoint=f"posts/{wp_post_id}?force=true", ) def sync_db_from_wordpress() -> dict[str, Any]: """Sync scheduled_publish_at and wp_post_url in the DB from WordPress. WordPress is treated as the source of truth for scheduling. For each DB article that has a wp_post_id: - If WP post exists as 'future': update scheduled_publish_at to WP date. - If WP post exists as 'draft': clear scheduled_publish_at (not yet scheduled). - If WP post exists as 'publish': mark article as published in DB. - If WP post is trashed/deleted (404 or trash status): clear wp_post_id, wp_post_url, and scheduled_publish_at so the article can be re-processed. Returns a stats dict with counts of each action taken. """ from .db import get_conn settings = get_settings() if not settings.wordpress_base_url or not settings.wordpress_username or not settings.wordpress_app_password: raise RuntimeError("WordPress Konfiguration fehlt") auth = _auth_header(settings.wordpress_username, settings.wordpress_app_password) base_url = settings.wordpress_base_url.rstrip("/") # Fetch all future + draft + published WP posts in one pass (up to 300 per status) wp_posts: dict[int, dict] = {} for status in ("future", "draft", "publish"): for page in range(1, 4): # max 300 per status try: result = _wp_request( base_url=base_url, auth_header=auth, method="GET", endpoint=f"posts?status={status}&per_page=100&page={page}&_fields=id,date,status,link", ) except Exception: break if not isinstance(result, list) or not result: break for post in result: try: wp_posts[int(post["id"])] = post except Exception: pass if len(result) < 100: break # Load all DB articles that have a wp_post_id with get_conn() as conn: rows = conn.execute( """ SELECT id, wp_post_id, wp_post_url, scheduled_publish_at, status FROM articles WHERE wp_post_id IS NOT NULL AND status NOT IN ('no_image') ORDER BY id """ ).fetchall() stats: dict[str, int] = { "total_db_articles": len(rows), "wp_posts_found": len(wp_posts), "slot_updated": 0, "slot_cleared_draft": 0, "marked_published": 0, "wp_reference_cleared": 0, "already_in_sync": 0, } for row in rows: article_id = row["id"] wp_post_id = int(row["wp_post_id"]) wp_post = wp_posts.get(wp_post_id) if wp_post is None: # Post not found in future/draft/publish — likely trashed or deleted # Clear wp reference so article can be re-processed if needed with get_conn() as conn: conn.execute( """UPDATE articles SET wp_post_id = NULL, wp_post_url = NULL, scheduled_publish_at = NULL WHERE id = ?""", (article_id,), ) stats["wp_reference_cleared"] += 1 continue wp_status = wp_post.get("status", "") wp_date = wp_post.get("date", "") # local CET datetime, e.g. "2026-05-05T09:00:00" wp_link = wp_post.get("link") or row["wp_post_url"] if wp_status == "publish": # Already published in WP — mark as published in DB if not already if row["status"] != "published": with get_conn() as conn: conn.execute( "UPDATE articles SET status = 'published', wp_post_url = ? WHERE id = ?", (wp_link, article_id), ) stats["marked_published"] += 1 else: stats["already_in_sync"] += 1 elif wp_status == "future": # Scheduled — sync the date into scheduled_publish_at current_slot = row["scheduled_publish_at"] or "" # WP returns e.g. "2026-05-05T09:00:00" — compare ignoring seconds if current_slot[:16] != wp_date[:16]: with get_conn() as conn: conn.execute( "UPDATE articles SET scheduled_publish_at = ?, wp_post_url = ? WHERE id = ?", (wp_date, wp_link, article_id), ) stats["slot_updated"] += 1 else: stats["already_in_sync"] += 1 elif wp_status == "draft": # Draft without a schedule — clear scheduled_publish_at if set if row["scheduled_publish_at"]: with get_conn() as conn: conn.execute( "UPDATE articles SET scheduled_publish_at = NULL WHERE id = ?", (article_id,), ) stats["slot_cleared_draft"] += 1 else: stats["already_in_sync"] += 1 return stats