import asyncio
import logging

# Serialises pipeline runs. Held from the moment a trigger is accepted until
# the background run has finished (successfully, with an error, or cancelled).
_pipeline_lock = asyncio.Lock()

# Strong references to in-flight background tasks. The event loop keeps only
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it completes.
_pipeline_tasks = set()


@app.post("/api/n8n/pipeline")
async def api_n8n_pipeline(request: Request) -> dict:
    """Trigger the full auto pipeline in the background and return immediately.

    Called by N8N (2x/day or on demand). Results arrive via Telegram.

    Only one pipeline run may be active at a time. If a run is already in
    progress the trigger is ignored and ``{"ok": False, ...}`` is returned;
    otherwise the run is started and ``{"ok": True, ...}`` is returned.

    Raises:
        HTTPException: propagated from ``_require_api_key`` when the request
            is not authorised.
    """
    _require_api_key(request)
    logger = logging.getLogger(__name__)

    if _pipeline_lock.locked():
        logger.warning("Pipeline bereits aktiv – Trigger ignoriert")
        return {"ok": False, "message": "Pipeline läuft bereits – Trigger ignoriert"}

    # Take the lock in the handler, not inside the background task. There is
    # no suspension point between the locked() check above and this acquire
    # (an uncontended acquire returns without yielding), so a second trigger
    # cannot slip in before the first run has actually claimed the lock.
    await _pipeline_lock.acquire()

    async def _run() -> None:
        try:
            loop = asyncio.get_running_loop()
            # run_auto_pipeline is blocking; run it in the default executor
            # so the event loop stays responsive.
            await loop.run_in_executor(None, lambda: run_auto_pipeline(trigger="n8n"))
        except Exception as exc:
            # Same message as before, now with the traceback attached.
            logger.exception("Background pipeline error: %s", exc)

    def _finished(done_task) -> None:
        # Runs for every outcome, including cancellation before the coroutine
        # ever started, so the lock can never be left held.
        _pipeline_tasks.discard(done_task)
        _pipeline_lock.release()

    task = asyncio.create_task(_run())
    _pipeline_tasks.add(task)
    task.add_done_callback(_finished)
    return {"ok": True, "message": "Pipeline gestartet – Ergebnisse kommen per Telegram"}
import threading
from datetime import date, datetime, timedelta

# Ensures that concurrent pipeline runs (two threads) never assign the same slot.
# This is an in-process lock only; it does not coordinate separate processes.
_slot_lock = threading.Lock()

# CET offset (UTC+1 winter / UTC+2 summer – fixed +1 for simplicity)
_CET_OFFSET = timedelta(hours=1)

# Number of consecutive days, starting tomorrow, searched for a free slot.
_SLOT_LOOKAHEAD_DAYS = 61


def _existing_slot_label(conn, article_id: int) -> str | None:
    """Return the formatted slot already stored for *article_id*, or None.

    None is returned when the article row is missing, has no slot, or holds a
    value that ``datetime.fromisoformat`` cannot parse (the caller then
    assigns a fresh slot).
    """
    row = conn.execute(
        "SELECT scheduled_publish_at FROM articles WHERE id = ?",
        (article_id,),
    ).fetchone()
    stored = row["scheduled_publish_at"] if row else None
    if not stored:
        return None
    try:
        dt = datetime.fromisoformat(stored)
    except (TypeError, ValueError):
        return None  # invalid — the caller assigns a fresh slot
    return _format_slot(dt.date(), dt.hour)


def _used_hours_by_day(conn, wp_occupied, first_day: date, days: int) -> dict[str, set[int]]:
    """Map ISO date string -> set of hours already taken within the window.

    Combines slots stored in ``articles`` (read through *conn*) with the
    ``(date_str, hour)`` pairs in *wp_occupied*. The window is the half-open
    range ``[first_day 00:00, first_day + days 00:00)``.
    """
    window_start = first_day.isoformat() + "T00:00:00"
    # Exclusive upper bound at the next midnight, so timestamps in the last
    # second of a day (or with fractional seconds) are not missed.
    window_end = (first_day + timedelta(days=days)).isoformat() + "T00:00:00"

    # NOTE(review): rows whose status is NULL are filtered out by NOT IN and
    # therefore do not count as occupied — confirm this is intended.
    rows = conn.execute(
        """
        SELECT scheduled_publish_at FROM articles
        WHERE scheduled_publish_at >= ? AND scheduled_publish_at < ?
          AND status NOT IN ('error', 'no_image')
        """,
        (window_start, window_end),
    ).fetchall()

    taken: dict[str, set[int]] = {}
    for r in rows:
        try:
            dt = datetime.fromisoformat(r["scheduled_publish_at"] or "")
        except (TypeError, ValueError):
            continue  # unparsable timestamps cannot block a slot
        taken.setdefault(dt.date().isoformat(), set()).add(dt.hour)
    for day_str, hour in wp_occupied:
        taken.setdefault(day_str, set()).add(hour)
    return taken


def reserve_publish_slot(article_id: int) -> str:
    """Reserve the next free publish slot for an article.

    If the article already has a scheduled_publish_at, keep it unchanged.
    Otherwise pick the first preferred hour, starting tomorrow, that is
    occupied neither in the local ``articles`` table nor on WordPress, and
    store it on the article. When no free slot is found within the lookahead
    window (or no preferred hours are configured), fall back to tomorrow at
    the first preferred hour, or 09:00 — this fallback may double-book.

    Uses a module-level lock so that concurrent pipeline runs (two threads)
    cannot read the same "free" slot and assign it twice.

    Returns the formatted publish datetime string.
    """
    # Cheap pre-check: an article that already has a slot needs neither the
    # WordPress round trip nor the lock.
    with get_conn() as conn:
        label = _existing_slot_label(conn, article_id)
    if label is not None:
        return label

    # Fetch WP-occupied slots BEFORE acquiring the lock — the API call can be
    # slow and must not block other threads unnecessarily.
    wp_occupied = _fetch_wp_occupied_slots()

    with _slot_lock:
        # Single DB connection for the entire read-find-write cycle so the
        # slot we pick is still free when we write it.
        with get_conn() as conn:
            # Re-check under the lock: another thread may have scheduled this
            # article between the pre-check and now.
            label = _existing_slot_label(conn, article_id)
            if label is not None:
                return label

            hours = _preferred_hours()
            tomorrow = _today_cet() + timedelta(days=1)

            slot = None
            if hours:  # with no preferred hours nothing can match; skip the search
                taken = _used_hours_by_day(conn, wp_occupied, tomorrow, _SLOT_LOOKAHEAD_DAYS)
                for offset in range(_SLOT_LOOKAHEAD_DAYS):
                    day = tomorrow + timedelta(days=offset)
                    used = taken.get(day.isoformat(), ())
                    free_hour = next((h for h in hours if h not in used), None)
                    if free_hour is not None:
                        slot = (day, free_hour)
                        break

            if slot is None:
                # Nothing free in the window: fall back to tomorrow.
                slot = (tomorrow, hours[0] if hours else 9)

            candidate, chosen_hour = slot
            iso_ts = f"{candidate.isoformat()}T{chosen_hour:02d}:00:00"
            conn.execute(
                "UPDATE articles SET scheduled_publish_at = ? WHERE id = ?",
                (iso_ts, article_id),
            )
    return _format_slot(candidate, chosen_hour)