Files
Thies Mueller 7be4036bed initial commit
2026-06-11 17:47:58 +02:00

196 lines
4.7 KiB
Python

#!/usr/bin/env python3
import logging
import signal
import sys
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import yaml
from flask import Flask, request, Response
app = Flask(__name__)
CONFIG_FILE = "config.yaml"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("pager-api")
def load_config() -> Dict[str, Any]:
with open(CONFIG_FILE, "r") as f:
return yaml.safe_load(f)
config = load_config()
PARALLEL_TARGETS = config.get("parallelism", {}).get("targets", 10)
PARALLEL_PAGERS = config.get("parallelism", {}).get("all_pagers", 10)
def get_all_valid_ids() -> List[str]:
ids = []
for group in config.get("pager_ranges", []):
for i in range(int(group["start"]), int(group["end"]) + 1):
ids.append(str(i))
return ids
def pager_exists(pager_id: str) -> bool:
return pager_id in get_all_valid_ids()
def validate_token(req) -> bool:
auth_header = req.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return False
token = auth_header.replace("Bearer ", "", 1).strip()
return token == config.get("bearer_token")
def call_target(target: str, path: str) -> bool:
url = target.rstrip("/") + path
try:
logger.info(f"CALL TARGET: {url}")
response = requests.get(url, timeout=10)
logger.info(
f"TARGET RESPONSE: {url} -> "
f"{response.status_code} {response.text}"
)
return response.status_code < 400
except Exception as e:
logger.exception(f"TARGET ERROR: {url} -> {e}")
return False
def notify_targets(path: str) -> bool:
targets = config.get("targets", [])
success = True
with ThreadPoolExecutor(max_workers=PARALLEL_TARGETS) as executor:
futures = [
executor.submit(call_target, target, path)
for target in targets
]
for f in as_completed(futures):
if not f.result():
success = False
return success
def notify_all_pagers_parallel() -> bool:
pager_ids = get_all_valid_ids()
success = True
with ThreadPoolExecutor(max_workers=PARALLEL_PAGERS) as executor:
futures = [
executor.submit(notify_targets, f"/call/P{pid}")
for pid in pager_ids
]
for f in as_completed(futures):
if not f.result():
success = False
return success
@app.route("/api/page", methods=["POST"])
def api_page():
logger.info(f"REQUEST FROM {request.remote_addr}")
if not validate_token(request):
return Response("ERROR", status=401, mimetype="text/plain")
body = request.get_data(as_text=True).strip()
if not body:
body = request.form.get("id", "").strip()
logger.info(f"INPUT: {body}")
if not body:
return Response("ERROR", status=400, mimetype="text/plain")
try:
if body.upper() == "ALL":
success = notify_all_pagers_parallel()
return Response(
"PAGERNOTIFIED" if success else "ERROR",
status=200 if success else 500,
mimetype="text/plain"
)
if not pager_exists(body):
logger.warning(f"PAGER NOT FOUND: {body}")
return Response("PAGERNOTFOUND", status=404, mimetype="text/plain")
success = notify_targets(f"/call/P{body}")
return Response(
"PAGERNOTIFIED" if success else "ERROR",
status=200 if success else 500,
mimetype="text/plain"
)
except Exception as e:
logger.exception(f"GENERAL ERROR: {e}")
return Response("ERROR", status=500, mimetype="text/plain")
@app.route("/api/shutdown", methods=["POST"])
def api_shutdown():
logger.info(f"SHUTDOWN REQUEST FROM {request.remote_addr}")
if not validate_token(request):
return Response("ERROR", status=401, mimetype="text/plain")
try:
success = notify_targets("/call/P999")
return Response(
"PAGERNOTIFIED" if success else "ERROR",
status=200 if success else 500,
mimetype="text/plain"
)
except Exception as e:
logger.exception(f"SHUTDOWN ERROR: {e}")
return Response("ERROR", status=500, mimetype="text/plain")
def signal_handler(sig, frame):
logger.info("SIGTERM/SIGINT received")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if __name__ == "__main__":
logger.info("Starting pager API")
app.run(
host="0.0.0.0",
port=5500,
threaded=True
)