# utils/wordpress_uploader.py import requests import json import os import logging import base64 from datetime import datetime from typing import Dict, List, Optional, Tuple from dotenv import load_dotenv load_dotenv() # WordPress API Konfiguration – ausschließlich aus .env WP_BASE_URL = os.getenv("WP_BASE_URL") WP_USERNAME = os.getenv("WP_USERNAME") WP_PASSWORD = os.getenv("WP_PASSWORD") WP_AUTH_BASE64 = os.getenv("WP_AUTH_BASE64") # Request-Konfiguration REQUEST_TIMEOUT = 30 MAX_RETRIES = 3 USER_AGENT = 'RSS-Feed-Manager/1.7.x' class WordPressUploader: """ Klasse für den Upload von Artikeln zu WordPress über die REST API mit Base64-Authentifizierung """ def __init__(self): # Basis-URL validieren und Endpunkt bauen if not WP_BASE_URL: raise ValueError("WP_BASE_URL nicht gesetzt. Bitte .env konfigurieren.") self.base_url = WP_BASE_URL.rstrip('/') self.api_endpoint = f"{self.base_url}/wp-json/wp/v2" # Zugangsdaten (aus .env) self.username = WP_USERNAME self.password = WP_PASSWORD self.auth_base64 = WP_AUTH_BASE64 if not self.auth_base64 and not (self.username and self.password): raise ValueError("WordPress-Authentifizierung nicht konfiguriert. WP_AUTH_BASE64 oder WP_USERNAME + WP_PASSWORD setzen.") # Session für bessere Performance self.session = requests.Session() # Authentifizierung über Authorization Header mit Base64 if self.auth_base64: self.session.headers.update({ 'Authorization': f'Basic {self.auth_base64}', 'User-Agent': USER_AGENT, 'Content-Type': 'application/json', 'Accept': 'application/json' }) logging.info("✅ WordPress-Authentifizierung: Base64-String verwendet") else: credentials = f"{self.username}:{self.password}" encoded_credentials = base64.b64encode(credentials.encode('utf-8')).decode('utf-8') self.session.headers.update({ 'Authorization': f'Basic {encoded_credentials}', 'User-Agent': USER_AGENT, 'Content-Type': 'application/json', 'Accept': 'application/json' }) logging.info("✅ WordPress-Authentifizierung: Base64 aus Username/Password generiert") # Standard-Kategorie ID ermitteln self.default_category_id = self._get_default_category_id() def _get_default_category_id(self) -> int: """ Ermittelt die ID der Standard-Kategorie 'Allgemein' """ try: response = self.session.get( f"{self.api_endpoint}/categories", params={'search': 'Allgemein', 'per_page': 10}, timeout=REQUEST_TIMEOUT ) response.raise_for_status() categories = response.json() for category in categories: if category['name'].lower() == 'allgemein': logging.info(f"✅ Standard-Kategorie 'Allgemein' gefunden: ID {category['id']}") return category['id'] # Fallback: Erste Kategorie oder Standard-ID if categories: logging.warning(f"⚠️ Kategorie 'Allgemein' nicht gefunden, verwende '{categories[0]['name']}' (ID: {categories[0]['id']})") return categories[0]['id'] else: logging.warning("⚠️ Keine Kategorien gefunden, verwende Standard-ID 1") return 1 except Exception as e: logging.error(f"❌ Fehler beim Ermitteln der Standard-Kategorie: {e}") return 1 # WordPress Standard-Kategorie def _get_or_create_tags(self, tag_names: List[str]) -> List[int]: """ Ermittelt oder erstellt Tags und gibt deren IDs zurück """ tag_ids = [] if not tag_names: return tag_ids try: # Bestehende Tags abrufen for tag_name in tag_names: tag_name = tag_name.strip() if not tag_name: continue try: # Suche nach existierendem Tag response = self.session.get( f"{self.api_endpoint}/tags", params={'search': tag_name, 'per_page': 10}, timeout=REQUEST_TIMEOUT ) response.raise_for_status() existing_tags = response.json() tag_found = False # Exakte Übereinstimmung suchen for tag in existing_tags: if tag['name'].lower() == tag_name.lower(): tag_ids.append(tag['id']) tag_found = True logging.info(f"✅ Existierender Tag gefunden: '{tag_name}' (ID: {tag['id']})") break # Tag erstellen falls nicht gefunden if not tag_found: create_response = self.session.post( f"{self.api_endpoint}/tags", json={'name': tag_name}, timeout=REQUEST_TIMEOUT ) if create_response.status_code == 201: new_tag = create_response.json() tag_ids.append(new_tag['id']) logging.info(f"✅ Neuer Tag erstellt: '{tag_name}' (ID: {new_tag['id']})") else: logging.warning(f"⚠️ Tag '{tag_name}' konnte nicht erstellt werden: {create_response.status_code}") continue except Exception as e: logging.error(f"❌ Fehler beim Verarbeiten von Tag '{tag_name}': {e}") continue logging.info(f"🏷️ Tags verarbeitet: {len(tag_ids)} Tag-IDs erstellt") return tag_ids except Exception as e: logging.error(f"❌ Allgemeiner Fehler bei Tag-Verarbeitung: {e}") return [] def _prepare_post_data(self, article: Dict) -> Dict: """ Bereitet die Artikel-Daten für WordPress vor """ # Tags verarbeiten - WordPress benötigt Tag-IDs, nicht Namen tag_names = article.get('tags', []) tag_ids = self._get_or_create_tags(tag_names) # Basis Post-Daten post_data = { 'title': article.get('title', 'Kein Titel'), 'content': article.get('text', ''), 'status': 'pending', # Artikel als "Ausstehend" markieren 'categories': [self.default_category_id], 'excerpt': article.get('summary', '')[:300], # WordPress Excerpt 'meta': { 'rss_source': article.get('source', ''), 'rss_original_link': article.get('link', ''), 'rss_import_date': datetime.now().isoformat(), 'rss_article_id': article.get('id', '') } } # Tags nur hinzufügen wenn vorhanden if tag_ids: post_data['tags'] = tag_ids # Optional: Author setzen (falls unterschiedliche Autoren gewünscht) # post_data['author'] = 1 # WordPress User ID logging.info(f"📝 Post-Daten vorbereitet: Titel='{post_data['title']}', Tags={len(tag_ids)}, Kategorie={self.default_category_id}") return post_data def _check_duplicate(self, article: Dict) -> Optional[int]: """ Prüft, ob ein Artikel bereits in WordPress existiert """ try: # Suche nach Titel title = article.get('title', '') if not title: return None response = self.session.get( f"{self.api_endpoint}/posts", params={ 'search': title, 'per_page': 5, 'status': 'any' # Alle Status durchsuchen }, timeout=REQUEST_TIMEOUT ) response.raise_for_status() posts = response.json() for post in posts: # Exakte Titel-Übereinstimmung if post['title']['rendered'].strip() == title.strip(): logging.info(f"🔄 Duplikat gefunden: '{title}' (WordPress ID: {post['id']})") return post['id'] # Prüfe auch Custom Meta Fields (RSS Article ID) article_id = article.get('id') if article_id: # Meta-Felder würden eine separate API-Abfrage erfordern # Für jetzt: Nur Titel-basierte Duplikatserkennung pass return None except Exception as e: logging.error(f"❌ Fehler bei Duplikatsprüfung für '{article.get('title', 'Unbekannt')}': {e}") return None def upload_article(self, article: Dict) -> Tuple[bool, str, Optional[int]]: """ Lädt einen einzelnen Artikel zu WordPress hoch Returns: Tuple[bool, str, Optional[int]]: (Erfolg, Nachricht, WordPress Post ID) """ title = article.get('title', 'Unbekannt') try: logging.info(f"📤 Starte WordPress-Upload: {title}") # Duplikatsprüfung existing_post_id = self._check_duplicate(article) if existing_post_id: return False, f"Artikel '{title}' existiert bereits in WordPress (ID: {existing_post_id})", existing_post_id # Post-Daten vorbereiten post_data = self._prepare_post_data(article) # Upload mit Retry-Logik for attempt in range(MAX_RETRIES): try: response = self.session.post( f"{self.api_endpoint}/posts", json=post_data, timeout=REQUEST_TIMEOUT ) if response.status_code == 201: # Erfolgreich erstellt wp_post = response.json() wp_post_id = wp_post['id'] wp_url = wp_post['link'] logging.info(f"✅ WordPress-Upload erfolgreich: '{title}' (ID: {wp_post_id})") logging.info(f"🔗 WordPress-URL: {wp_url}") return True, f"Erfolgreich hochgeladen: {wp_url}", wp_post_id elif response.status_code == 400: # Client Error - nicht wiederholen error_data = response.json() error_msg = error_data.get('message', 'Unbekannter Fehler') error_code = error_data.get('code', 'unknown') # Detaillierte Fehleranalyse if 'parameter' in error_msg.lower() and 'tags' in error_msg.lower(): logging.error(f"❌ WordPress-Tag-Fehler für '{title}': {error_msg}") logging.error(f"📋 Post-Daten: {json.dumps(post_data, indent=2, ensure_ascii=False)}") return False, f"Tag-Fehler: {error_msg} (Artikel-Tags: {article.get('tags', [])})", None else: logging.error(f"❌ WordPress-Fehler 400 für '{title}': {error_msg} (Code: {error_code})") logging.error(f"📋 Post-Daten: {json.dumps(post_data, indent=2, ensure_ascii=False)}") return False, f"WordPress-Fehler: {error_msg}", None elif response.status_code == 401: # Authentifizierungsfehler logging.error(f"❌ WordPress-Authentifizierungsfehler für '{title}'") return False, "Authentifizierungsfehler - bitte Zugangsdaten prüfen", None elif response.status_code == 403: # Berechtigungsfehler logging.error(f"❌ WordPress-Berechtigungsfehler für '{title}'") return False, "Keine Berechtigung zum Erstellen von Posts", None else: # Server Error - Retry möglich if attempt < MAX_RETRIES - 1: logging.warning(f"⚠️ WordPress-Upload Versuch {attempt + 1} fehlgeschlagen für '{title}' (Status: {response.status_code}), versuche erneut...") continue else: logging.error(f"❌ WordPress-Upload nach {MAX_RETRIES} Versuchen fehlgeschlagen für '{title}' (Status: {response.status_code})") return False, f"Upload fehlgeschlagen nach {MAX_RETRIES} Versuchen (HTTP {response.status_code})", None except requests.exceptions.Timeout: if attempt < MAX_RETRIES - 1: logging.warning(f"⏱️ Timeout bei WordPress-Upload für '{title}' (Versuch {attempt + 1}), versuche erneut...") continue else: logging.error(f"❌ Timeout bei WordPress-Upload für '{title}' nach {MAX_RETRIES} Versuchen") return False, f"Timeout nach {MAX_RETRIES} Versuchen", None except requests.exceptions.ConnectionError as e: if attempt < MAX_RETRIES - 1: logging.warning(f"🌐 Verbindungsfehler bei WordPress-Upload für '{title}' (Versuch {attempt + 1}): {e}") continue else: logging.error(f"❌ Verbindungsfehler bei WordPress-Upload für '{title}' nach {MAX_RETRIES} Versuchen: {e}") return False, f"Verbindungsfehler nach {MAX_RETRIES} Versuchen", None except Exception as e: logging.error(f"❌ Unerwarteter Fehler bei WordPress-Upload für '{title}': {e}") return False, f"Unerwarteter Fehler: {str(e)}", None def test_connection(self) -> Tuple[bool, str]: """ Testet die Verbindung zur WordPress API mit Base64-Authentifizierung """ try: logging.info("🔧 Teste WordPress-API-Verbindung...") # Einfache Abfrage der Kategorien als Test response = self.session.get( f"{self.api_endpoint}/categories", params={'per_page': 1}, timeout=REQUEST_TIMEOUT ) logging.info(f"📡 API-Response Status: {response.status_code}") if response.status_code == 200: logging.info("✅ WordPress-API-Verbindung erfolgreich") return True, "Verbindung zur WordPress API erfolgreich" elif response.status_code == 401: logging.error("❌ WordPress-API-Authentifizierung fehlgeschlagen") logging.error(f"Response Body: {response.text}") return False, "Authentifizierung fehlgeschlagen - bitte Base64-String oder Zugangsdaten prüfen" elif response.status_code == 403: logging.error("❌ WordPress-API-Berechtigung fehlgeschlagen") logging.error(f"Response Body: {response.text}") return False, "Keine Berechtigung - bitte Benutzerrechte prüfen" else: logging.error(f"❌ WordPress-API-Test fehlgeschlagen (Status: {response.status_code})") logging.error(f"Response Body: {response.text}") return False, f"API-Test fehlgeschlagen (HTTP {response.status_code}): {response.text[:100]}" except requests.exceptions.ConnectionError as e: logging.error(f"❌ Verbindungsfehler zur WordPress API: {e}") return False, f"Verbindungsfehler: {str(e)}" except Exception as e: logging.error(f"❌ Unerwarteter Fehler beim WordPress-API-Test: {e}") return False, f"Unerwarteter Fehler: {str(e)}" def upload_multiple_articles(self, articles: List[Dict]) -> Dict: """ Lädt mehrere Artikel zu WordPress hoch Returns: Dict mit Statistiken über erfolgreiche und fehlgeschlagene Uploads """ results = { 'total': len(articles), 'successful': 0, 'failed': 0, 'duplicates': 0, 'details': [] } logging.info(f"📦 Starte Batch-Upload von {len(articles)} Artikeln zu WordPress") for i, article in enumerate(articles, 1): title = article.get('title', f'Artikel {i}') logging.info(f"📤 Upload {i}/{len(articles)}: {title}") success, message, wp_post_id = self.upload_article(article) result_detail = { 'article_id': article.get('id'), 'title': title, 'success': success, 'message': message, 'wp_post_id': wp_post_id } results['details'].append(result_detail) if success: results['successful'] += 1 elif 'existiert bereits' in message: results['duplicates'] += 1 else: results['failed'] += 1 # Kurze Pause zwischen Uploads if i < len(articles): import time time.sleep(1) logging.info(f"📊 Batch-Upload abgeschlossen: {results['successful']} erfolgreich, {results['failed']} fehlgeschlagen, {results['duplicates']} Duplikate") return results def __del__(self): """ Session sauber schließen """ if hasattr(self, 'session'): self.session.close() def upload_articles_to_wordpress(articles: List[Dict]) -> Dict: """ Convenience-Funktion für den Upload von Artikeln zu WordPress """ uploader = WordPressUploader() # Verbindung testen connection_ok, connection_msg = uploader.test_connection() if not connection_ok: logging.error(f"❌ WordPress-Verbindung fehlgeschlagen: {connection_msg}") return { 'total': len(articles), 'successful': 0, 'failed': len(articles), 'duplicates': 0, 'error': connection_msg, 'details': [] } # Artikel hochladen return uploader.upload_multiple_articles(articles) def upload_single_article_to_wordpress(article: Dict) -> Tuple[bool, str, Optional[int]]: """ Convenience-Funktion für den Upload eines einzelnen Artikels """ uploader = WordPressUploader() # Verbindung testen connection_ok, connection_msg = uploader.test_connection() if not connection_ok: return False, connection_msg, None # Artikel hochladen return uploader.upload_article(article)