feat: add OSM map selection and geocode search
This commit is contained in:
parent
a0b1c76d14
commit
15bbb677d8
7 changed files with 954 additions and 3 deletions
|
|
@ -6,7 +6,8 @@ import os
|
|||
import uuid
|
||||
from http import HTTPStatus
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
from urllib.parse import parse_qs, urlencode, urlparse
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
from db import get_conn, init_db
|
||||
from score_engine import (
|
||||
|
|
@ -29,6 +30,12 @@ PORT = 8787
|
|||
DEFAULT_SALT = "change-me-in-production"
|
||||
SERVER_SALT = os.environ.get("STAYSENSE_SERVER_SALT", DEFAULT_SALT)
|
||||
SIGNAL_COOLDOWN_HOURS = int(os.environ.get("STAYSENSE_SIGNAL_COOLDOWN_HOURS", "24"))
|
||||
NOMINATIM_BASE_URL = os.environ.get("STAYSENSE_NOMINATIM_BASE_URL", "https://nominatim.openstreetmap.org/search")
|
||||
NOMINATIM_USER_AGENT = os.environ.get(
|
||||
"STAYSENSE_NOMINATIM_USER_AGENT",
|
||||
"StaySense/0.1 (staysense.vanityontour.de)",
|
||||
)
|
||||
OSM_TILE_BASE_URL = os.environ.get("STAYSENSE_OSM_TILE_BASE_URL", "https://tile.openstreetmap.org")
|
||||
|
||||
FALLBACK_POLICE_POINTS = [(51.2507, 6.9751), (51.2965, 6.8494), (51.3398, 7.0438)]
|
||||
FALLBACK_FIRE_POINTS = [(51.2518, 6.9800), (51.2937, 6.8568), (51.3314, 7.0540)]
|
||||
|
|
@ -62,6 +69,17 @@ def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict) -
|
|||
handler.wfile.write(raw)
|
||||
|
||||
|
||||
def binary_response(
|
||||
handler: BaseHTTPRequestHandler, status: int, payload: bytes, content_type: str, cache_control: str = "public, max-age=3600"
|
||||
) -> None:
|
||||
handler.send_response(status)
|
||||
handler.send_header("Content-Type", content_type)
|
||||
handler.send_header("Content-Length", str(len(payload)))
|
||||
handler.send_header("Cache-Control", cache_control)
|
||||
handler.end_headers()
|
||||
handler.wfile.write(payload)
|
||||
|
||||
|
||||
def read_json(handler: BaseHTTPRequestHandler) -> dict:
|
||||
length = int(handler.headers.get("Content-Length", "0"))
|
||||
if length <= 0:
|
||||
|
|
@ -477,6 +495,110 @@ def handle_signal(handler: BaseHTTPRequestHandler) -> None:
|
|||
)
|
||||
|
||||
|
||||
def handle_geocode_search(handler: BaseHTTPRequestHandler, query: dict[str, list[str]]) -> None:
|
||||
raw_q = (query.get("q") or [""])[0].strip()
|
||||
if len(raw_q) < 2:
|
||||
json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "query_too_short"})
|
||||
return
|
||||
if len(raw_q) > 160:
|
||||
json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "query_too_long"})
|
||||
return
|
||||
|
||||
params = urlencode(
|
||||
{
|
||||
"q": raw_q,
|
||||
"format": "jsonv2",
|
||||
"limit": "5",
|
||||
"addressdetails": "0",
|
||||
"countrycodes": "de",
|
||||
}
|
||||
)
|
||||
request = Request(
|
||||
f"{NOMINATIM_BASE_URL}?{params}",
|
||||
headers={
|
||||
"User-Agent": NOMINATIM_USER_AGENT,
|
||||
"Accept": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
with urlopen(request, timeout=5) as response:
|
||||
if response.status != 200:
|
||||
raise RuntimeError("nominatim_status")
|
||||
raw = response.read().decode("utf-8")
|
||||
data = json.loads(raw)
|
||||
except Exception:
|
||||
json_response(handler, HTTPStatus.BAD_GATEWAY, {"error": "geocoder_unavailable"})
|
||||
return
|
||||
|
||||
results = []
|
||||
for item in data[:5]:
|
||||
try:
|
||||
lat = float(item.get("lat"))
|
||||
lon = float(item.get("lon"))
|
||||
except Exception:
|
||||
continue
|
||||
if not (47.0 <= lat <= 55.5 and 5.0 <= lon <= 16.0):
|
||||
continue
|
||||
results.append(
|
||||
{
|
||||
"display_name": str(item.get("display_name", "Unbekannter Treffer")),
|
||||
"lat": round(lat, 6),
|
||||
"lon": round(lon, 6),
|
||||
}
|
||||
)
|
||||
|
||||
json_response(handler, HTTPStatus.OK, {"results": results})
|
||||
|
||||
|
||||
def handle_tile_proxy(handler: BaseHTTPRequestHandler, path: str) -> None:
|
||||
parts = path.strip("/").split("/")
|
||||
if len(parts) != 5 or parts[0] != "map" or parts[1] != "tile":
|
||||
json_response(handler, HTTPStatus.NOT_FOUND, {"error": "not_found"})
|
||||
return
|
||||
|
||||
z_raw, x_raw, y_raw = parts[2], parts[3], parts[4]
|
||||
if not y_raw.endswith(".png"):
|
||||
json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_tile_path"})
|
||||
return
|
||||
|
||||
try:
|
||||
z = int(z_raw)
|
||||
x = int(x_raw)
|
||||
y = int(y_raw[:-4])
|
||||
except Exception:
|
||||
json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_tile_path"})
|
||||
return
|
||||
|
||||
if z < 0 or z > 19:
|
||||
json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_zoom"})
|
||||
return
|
||||
|
||||
max_tile = 2**z - 1
|
||||
if x < 0 or x > max_tile or y < 0 or y > max_tile:
|
||||
json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "tile_out_of_bounds"})
|
||||
return
|
||||
|
||||
request = Request(
|
||||
f"{OSM_TILE_BASE_URL}/{z}/{x}/{y}.png",
|
||||
headers={
|
||||
"User-Agent": NOMINATIM_USER_AGENT,
|
||||
"Accept": "image/png",
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
with urlopen(request, timeout=5) as response:
|
||||
if response.status != 200:
|
||||
raise RuntimeError("tile_status")
|
||||
payload = response.read()
|
||||
except Exception:
|
||||
json_response(handler, HTTPStatus.BAD_GATEWAY, {"error": "tile_unavailable"})
|
||||
return
|
||||
|
||||
binary_response(handler, HTTPStatus.OK, payload, "image/png", cache_control="public, max-age=43200")
|
||||
|
||||
|
||||
class StaySenseHandler(BaseHTTPRequestHandler):
|
||||
def log_message(self, format: str, *args) -> None:
|
||||
return
|
||||
|
|
@ -491,6 +613,13 @@ class StaySenseHandler(BaseHTTPRequestHandler):
|
|||
query = parse_qs(parsed.query)
|
||||
handle_score(self, query)
|
||||
return
|
||||
if parsed.path == "/geocode/search":
|
||||
query = parse_qs(parsed.query)
|
||||
handle_geocode_search(self, query)
|
||||
return
|
||||
if parsed.path.startswith("/map/tile/"):
|
||||
handle_tile_proxy(self, parsed.path)
|
||||
return
|
||||
json_response(self, HTTPStatus.NOT_FOUND, {"error": "not_found"})
|
||||
|
||||
def do_POST(self) -> None:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue