fix(scheduler): prevent duplicate slot assignment from concurrent pipeline runs
Two bugs allowed multiple articles to land on the same publish slot:

1. main.py: the endpoint handed the pipeline to asyncio.create_task() and returned immediately, so a second pipeline trigger (N8N + Telegram /run, or two N8N calls) could start a second, concurrent run. Added an asyncio.Lock (_pipeline_lock) so that any second trigger arriving while the pipeline is running is rejected immediately.

2. scheduler.py: reserve_publish_slot() read the list of occupied slots and wrote the new slot over two separate DB connections, so concurrent threads could both see the same "free" slot before either committed its write. Fixed by wrapping the entire read-find-write cycle in a threading.Lock (_slot_lock) and a single DB connection, so that the slot check and the slot assignment are atomic.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
2456e4aca7
commit
f710141828
2 changed files with 90 additions and 35 deletions
|
|
@ -1,8 +1,10 @@
|
||||||
|
import asyncio
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
import csv
|
import csv
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
import io
|
import io
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
|
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
|
||||||
|
|
@ -637,20 +639,26 @@ def _require_api_key(request: Request) -> None:
|
||||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Ungültiger API-Key")
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Ungültiger API-Key")
|
||||||
|
|
||||||
|
|
||||||
|
_pipeline_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/n8n/pipeline")
|
@app.post("/api/n8n/pipeline")
|
||||||
async def api_n8n_pipeline(request: Request) -> dict:
|
async def api_n8n_pipeline(request: Request) -> dict:
|
||||||
"""Trigger the full auto pipeline in background. Returns immediately.
|
"""Trigger the full auto pipeline in background. Returns immediately.
|
||||||
Called by N8N (2x/day or on demand). Results arrive via Telegram."""
|
Called by N8N (2x/day or on demand). Results arrive via Telegram."""
|
||||||
_require_api_key(request)
|
_require_api_key(request)
|
||||||
import asyncio
|
|
||||||
import logging
|
if _pipeline_lock.locked():
|
||||||
|
logging.getLogger(__name__).warning("Pipeline bereits aktiv – Trigger ignoriert")
|
||||||
|
return {"ok": False, "message": "Pipeline läuft bereits – Trigger ignoriert"}
|
||||||
|
|
||||||
async def _run():
|
async def _run():
|
||||||
loop = asyncio.get_event_loop()
|
async with _pipeline_lock:
|
||||||
try:
|
loop = asyncio.get_event_loop()
|
||||||
await loop.run_in_executor(None, lambda: run_auto_pipeline(trigger="n8n"))
|
try:
|
||||||
except Exception as exc:
|
await loop.run_in_executor(None, lambda: run_auto_pipeline(trigger="n8n"))
|
||||||
logging.getLogger(__name__).error("Background pipeline error: %s", exc)
|
except Exception as exc:
|
||||||
|
logging.getLogger(__name__).error("Background pipeline error: %s", exc)
|
||||||
|
|
||||||
asyncio.create_task(_run())
|
asyncio.create_task(_run())
|
||||||
return {"ok": True, "message": "Pipeline gestartet – Ergebnisse kommen per Telegram"}
|
return {"ok": True, "message": "Pipeline gestartet – Ergebnisse kommen per Telegram"}
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ from __future__ import annotations
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
import json
|
import json
|
||||||
|
import threading
|
||||||
import urllib.request
|
import urllib.request
|
||||||
from datetime import date, datetime, timedelta, timezone
|
from datetime import date, datetime, timedelta, timezone
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
@ -18,6 +19,9 @@ from typing import Any
|
||||||
from .config import get_settings
|
from .config import get_settings
|
||||||
from .db import get_conn
|
from .db import get_conn
|
||||||
|
|
||||||
|
# Ensures that concurrent pipeline runs (two threads) never assign the same slot.
|
||||||
|
_slot_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
# CET offset (UTC+1 winter / UTC+2 summer – fixed +1 for simplicity)
|
# CET offset (UTC+1 winter / UTC+2 summer – fixed +1 for simplicity)
|
||||||
_CET_OFFSET = timedelta(hours=1)
|
_CET_OFFSET = timedelta(hours=1)
|
||||||
|
|
@ -256,34 +260,77 @@ def reserve_publish_slot(article_id: int) -> str:
|
||||||
|
|
||||||
If the article already has a scheduled_publish_at, keep it unchanged.
|
If the article already has a scheduled_publish_at, keep it unchanged.
|
||||||
Returns the formatted publish datetime string.
|
Returns the formatted publish datetime string.
|
||||||
|
|
||||||
|
Uses a module-level lock so that concurrent pipeline runs (two threads)
|
||||||
|
cannot read the same "free" slot and assign it twice.
|
||||||
"""
|
"""
|
||||||
# Check if already has a slot
|
# Fetch WP-occupied slots BEFORE acquiring the lock — the API call can be slow
|
||||||
with get_conn() as conn:
|
# and must not block other threads unnecessarily.
|
||||||
row = conn.execute(
|
|
||||||
"SELECT scheduled_publish_at FROM articles WHERE id = ?",
|
|
||||||
(article_id,),
|
|
||||||
).fetchone()
|
|
||||||
existing_slot = row["scheduled_publish_at"] if row else None
|
|
||||||
if existing_slot:
|
|
||||||
try:
|
|
||||||
dt = datetime.fromisoformat(existing_slot)
|
|
||||||
return _format_slot(dt.date(), dt.hour)
|
|
||||||
except Exception:
|
|
||||||
pass # invalid slot, re-assign below
|
|
||||||
|
|
||||||
wp_occupied = _fetch_wp_occupied_slots()
|
wp_occupied = _fetch_wp_occupied_slots()
|
||||||
result = _find_next_free_slot(wp_occupied, lookahead_days=30)
|
|
||||||
if result:
|
|
||||||
candidate, hour = result
|
|
||||||
else:
|
|
||||||
candidate = _today_cet() + timedelta(days=1)
|
|
||||||
hours = _preferred_hours()
|
|
||||||
hour = hours[0] if hours else 9
|
|
||||||
|
|
||||||
iso_ts = f"{candidate.isoformat()}T{hour:02d}:00:00"
|
with _slot_lock:
|
||||||
with get_conn() as conn:
|
# Single DB connection for the entire read-find-write cycle so the
|
||||||
conn.execute(
|
# slot we pick is still free when we write it.
|
||||||
"UPDATE articles SET scheduled_publish_at = ? WHERE id = ?",
|
with get_conn() as conn:
|
||||||
(iso_ts, article_id),
|
row = conn.execute(
|
||||||
)
|
"SELECT scheduled_publish_at FROM articles WHERE id = ?",
|
||||||
return _format_slot(candidate, hour)
|
(article_id,),
|
||||||
|
).fetchone()
|
||||||
|
existing_slot = row["scheduled_publish_at"] if row else None
|
||||||
|
if existing_slot:
|
||||||
|
try:
|
||||||
|
dt = datetime.fromisoformat(existing_slot)
|
||||||
|
return _format_slot(dt.date(), dt.hour)
|
||||||
|
except Exception:
|
||||||
|
pass # invalid — fall through and assign a fresh slot
|
||||||
|
|
||||||
|
# Find the next free (date, hour) slot using THIS connection so we
|
||||||
|
# see all slots written during this lock window.
|
||||||
|
hours = _preferred_hours()
|
||||||
|
today = _today_cet()
|
||||||
|
tomorrow = today + timedelta(days=1)
|
||||||
|
candidate: date | None = None
|
||||||
|
chosen_hour: int | None = None
|
||||||
|
|
||||||
|
for offset in range(0, 61):
|
||||||
|
d = tomorrow + timedelta(days=offset)
|
||||||
|
date_str = d.isoformat()
|
||||||
|
|
||||||
|
rows = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT scheduled_publish_at FROM articles
|
||||||
|
WHERE scheduled_publish_at >= ? AND scheduled_publish_at < ?
|
||||||
|
AND status NOT IN ('error', 'no_image')
|
||||||
|
""",
|
||||||
|
(date_str + "T00:00:00", date_str + "T23:59:59"),
|
||||||
|
).fetchall()
|
||||||
|
|
||||||
|
used_hours: set[int] = set()
|
||||||
|
for r in rows:
|
||||||
|
ts = r["scheduled_publish_at"] or ""
|
||||||
|
try:
|
||||||
|
used_hours.add(datetime.fromisoformat(ts).hour)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
for d_str, h in wp_occupied:
|
||||||
|
if d_str == date_str:
|
||||||
|
used_hours.add(h)
|
||||||
|
|
||||||
|
for h in hours:
|
||||||
|
if h not in used_hours:
|
||||||
|
candidate = d
|
||||||
|
chosen_hour = h
|
||||||
|
break
|
||||||
|
if candidate is not None:
|
||||||
|
break
|
||||||
|
|
||||||
|
if candidate is None:
|
||||||
|
candidate = tomorrow
|
||||||
|
chosen_hour = hours[0] if hours else 9
|
||||||
|
|
||||||
|
iso_ts = f"{candidate.isoformat()}T{chosen_hour:02d}:00:00"
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE articles SET scheduled_publish_at = ? WHERE id = ?",
|
||||||
|
(iso_ts, article_id),
|
||||||
|
)
|
||||||
|
return _format_slot(candidate, chosen_hour)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue