#!/usr/bin/env python3
"""HTTP API that relays pager calls to a list of configured target services.

Configuration is read once, at import time, from ``config.yaml``. The keys
used by this module are ``bearer_token``, ``targets``, ``pager_ranges``
(a list of mappings with ``start``/``end``) and ``parallelism``
(``targets`` / ``all_pagers`` thread-pool sizes).
"""
import hmac
import logging
import signal
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List

import requests
import yaml
from flask import Flask, request, Response

app = Flask(__name__)

CONFIG_FILE = "config.yaml"

# Path sent to every target by the /api/shutdown endpoint.
# NOTE(review): presumably pager 999 is reserved as the shutdown signal --
# confirm against the target service.
SHUTDOWN_PATH = "/call/P999"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("pager-api")


def load_config() -> Dict[str, Any]:
    """Load and return the YAML configuration from ``CONFIG_FILE``.

    Returns:
        The parsed mapping, or an empty dict when the file is empty
        (``yaml.safe_load`` returns ``None`` for an empty document).

    Raises:
        OSError: if the file cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def _pool_size(name: str, default: int = 10) -> int:
    """Return the ``parallelism.<name>`` setting as an int of at least 1.

    A missing or null section/key falls back to *default*;
    ``ThreadPoolExecutor`` rejects sizes below 1, so the value is clamped.
    """
    section = config.get("parallelism") or {}
    value = section.get(name)
    if value is None:
        value = default
    return max(1, int(value))


config = load_config()

PARALLEL_TARGETS = _pool_size("targets")
PARALLEL_PAGERS = _pool_size("all_pagers")


def _pager_ranges() -> List[Dict[str, Any]]:
    """Return the configured ``pager_ranges`` list (empty when unset/null)."""
    return config.get("pager_ranges") or []


def get_all_valid_ids() -> List[str]:
    """Return every pager id covered by ``pager_ranges`` as decimal strings.

    Each range is inclusive of both ``start`` and ``end``.
    """
    return [
        str(number)
        for group in _pager_ranges()
        for number in range(int(group["start"]), int(group["end"]) + 1)
    ]


def pager_exists(pager_id: str) -> bool:
    """Return True when *pager_id* is one of the configured pager ids.

    The id must be the canonical decimal string of a number inside one of
    the ranges (so ``"7"`` matches but ``"007"`` or ``" 7"`` do not), which
    is the same result as looking it up in ``get_all_valid_ids()`` without
    materialising every id on each request.
    """
    try:
        number = int(pager_id)
    except (TypeError, ValueError):
        return False
    # Reject alternative spellings int() accepts ("007", "+7", "1_0", ...).
    if str(number) != pager_id:
        return False
    return any(
        int(group["start"]) <= number <= int(group["end"])
        for group in _pager_ranges()
    )


def validate_token(req) -> bool:
    """Return True when *req* carries the configured bearer token.

    Args:
        req: a request object exposing ``headers.get``.

    The check fails closed: a missing, empty or non-string ``bearer_token``
    in the configuration never authenticates anyone (previously an empty
    configured token matched the header ``"Bearer "``).
    """
    expected = config.get("bearer_token")
    if not isinstance(expected, str) or not expected:
        return False

    auth_header = req.headers.get("Authorization", "")
    prefix = "Bearer "
    if not auth_header.startswith(prefix):
        return False

    token = auth_header[len(prefix):].strip()
    # Constant-time comparison; bytes are used because compare_digest
    # only accepts ASCII when given str.
    return hmac.compare_digest(
        token.encode("utf-8"), expected.encode("utf-8")
    )


def call_target(target: str, path: str) -> bool:
    """Send a GET request to *target* + *path* and report success.

    Args:
        target: base URL of the target; a trailing ``/`` is removed.
        path: path appended to the base URL.

    Returns:
        True when the response status code is below 400; False on a
        higher status code or on any error while performing the request.
    """
    url = target.rstrip("/") + path
    try:
        logger.info("CALL TARGET: %s", url)
        response = requests.get(url, timeout=10)
        logger.info(
            "TARGET RESPONSE: %s -> %s %s",
            url, response.status_code, response.text
        )
        return response.status_code < 400
    except Exception as e:
        # Deliberately broad: this runs in a worker thread and a failing
        # target must be reported as False, not abort the whole fan-out.
        logger.exception("TARGET ERROR: %s -> %s", url, e)
        return False


def notify_targets(path: str) -> bool:
    """Call *path* on every configured target in parallel.

    Returns:
        True when every target call succeeded. With no targets configured
        nothing can fail, so the result is True as well.
    """
    targets = config.get("targets") or []
    with ThreadPoolExecutor(max_workers=PARALLEL_TARGETS) as executor:
        futures = [
            executor.submit(call_target, target, path)
            for target in targets
        ]
        # Collect every result (no short-circuit) so all calls are awaited.
        results = [f.result() for f in as_completed(futures)]
    return all(results)


def notify_all_pagers_parallel() -> bool:
    """Notify all targets for every configured pager id, in parallel.

    Each pager is handled by ``notify_targets``, which opens its own
    thread pool, so up to ``PARALLEL_PAGERS * PARALLEL_TARGETS`` requests
    may be in flight at once.

    Returns:
        True when every call for every pager succeeded.
    """
    pager_ids = get_all_valid_ids()
    with ThreadPoolExecutor(max_workers=PARALLEL_PAGERS) as executor:
        futures = [
            executor.submit(notify_targets, f"/call/P{pid}")
            for pid in pager_ids
        ]
        results = [f.result() for f in as_completed(futures)]
    return all(results)


def _text_response(text: str, status: int) -> Response:
    """Build a ``text/plain`` response with the given body and status."""
    return Response(text, status=status, mimetype="text/plain")


def _notify_response(success: bool) -> Response:
    """Map a notification outcome to the API's 200/500 plain-text reply."""
    if success:
        return _text_response("PAGERNOTIFIED", 200)
    return _text_response("ERROR", 500)


@app.route("/api/page", methods=["POST"])
def api_page():
    """Handle ``POST /api/page``: page one pager id or ``ALL`` pagers.

    The pager id is taken from the ``id`` form field when present,
    otherwise from the raw request body.

    Responses (all ``text/plain``): 401 ``ERROR`` for a bad token,
    400 ``ERROR`` for empty input, 404 ``PAGERNOTFOUND`` for an unknown id,
    200 ``PAGERNOTIFIED`` on success and 500 ``ERROR`` otherwise.
    """
    logger.info("REQUEST FROM %s", request.remote_addr)

    if not validate_token(request):
        return _text_response("ERROR", 401)

    # Read (and cache) the raw body first so plain-text bodies keep working
    # even when the client labels them as form data; then prefer an explicit
    # "id" form field, which a raw-body check alone can never reach because
    # a form-encoded body is never empty.
    body = request.get_data(as_text=True).strip()
    form_id = request.form.get("id", "").strip()
    if form_id:
        body = form_id

    # %r keeps client-supplied newlines from forging extra log lines.
    logger.info("INPUT: %r", body)

    if not body:
        return _text_response("ERROR", 400)

    try:
        if body.upper() == "ALL":
            return _notify_response(notify_all_pagers_parallel())

        if not pager_exists(body):
            logger.warning("PAGER NOT FOUND: %r", body)
            return _text_response("PAGERNOTFOUND", 404)

        return _notify_response(notify_targets(f"/call/P{body}"))
    except Exception as e:
        logger.exception("GENERAL ERROR: %s", e)
        return _text_response("ERROR", 500)


@app.route("/api/shutdown", methods=["POST"])
def api_shutdown():
    """Handle ``POST /api/shutdown``: send ``SHUTDOWN_PATH`` to all targets.

    Responses (all ``text/plain``): 401 ``ERROR`` for a bad token,
    200 ``PAGERNOTIFIED`` on success and 500 ``ERROR`` otherwise.
    """
    logger.info("SHUTDOWN REQUEST FROM %s", request.remote_addr)

    if not validate_token(request):
        return _text_response("ERROR", 401)

    try:
        return _notify_response(notify_targets(SHUTDOWN_PATH))
    except Exception as e:
        logger.exception("SHUTDOWN ERROR: %s", e)
        return _text_response("ERROR", 500)


def signal_handler(sig, frame):
    """Log the received termination signal and exit with status 0."""
    logger.info("SIGTERM/SIGINT received")
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


if __name__ == "__main__":
    logger.info("Starting pager API")
    app.run(
        host="0.0.0.0",
        port=5500,
        threaded=True
    )