"""HTTP relay that forwards a notification to two configured backends.

Exposes a single ``POST /notify`` endpoint, protected by a bearer token, that
validates the JSON payload and posts it to the URLs configured under the
``apns2`` and ``firebase`` keys of ``config.yaml``.
"""

import hmac

import requests
import yaml
from flask import Flask, jsonify, request

app = Flask(__name__)

# Configuration is read once at import time; a missing file or key raises
# immediately instead of failing later on the first request.
with open("config.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

SERVER_TOKEN = config["server"]["bearer_token"]
APNS2_CONFIG = config["apns2"]
FIREBASE_CONFIG = config["firebase"]

# Title prefix per notification type; types without an entry get no prefix.
PREFIX_MAP = {
    "warning": "⚠️ ",
    "urgent": "❗️ ",
    "danger": "‼️ ",
}

BEARER_PREFIX = "Bearer "
REQUIRED_FIELDS = ("type", "title", "message")
# Kept as a tuple (not a set) so an unhashable "type" value is simply rejected
# instead of raising TypeError during the membership test.
ALLOWED_TYPES = ("info", "warning", "urgent", "danger")
REQUEST_TIMEOUT = 10  # seconds, applied to each outgoing POST


def check_auth(req):
    """Return True when *req* carries the configured bearer token.

    Args:
        req: Request-like object exposing a ``headers`` mapping.

    Returns:
        True if the ``Authorization`` header is ``Bearer <token>`` and
        ``<token>`` equals ``SERVER_TOKEN``; False otherwise.
    """
    auth = req.headers.get("Authorization", "")
    if not auth.startswith(BEARER_PREFIX):
        return False
    # A non-string configured token (e.g. YAML null or a number) can never
    # equal the header string, so refuse instead of coercing it.
    if not isinstance(SERVER_TOKEN, str):
        return False
    token = auth[len(BEARER_PREFIX):]
    # Constant-time comparison avoids leaking the token through response
    # timing; bytes are used because compare_digest rejects non-ASCII str.
    return hmac.compare_digest(
        token.encode("utf-8"), SERVER_TOKEN.encode("utf-8")
    )


def build_firebase_title(notification_type, title):
    """Return *title* prefixed with the marker for *notification_type*.

    Types that have no entry in ``PREFIX_MAP`` leave the title unchanged.
    """
    prefix = PREFIX_MAP.get(notification_type, "")
    return f"{prefix}{title}"


def send_to_apns2(notification_type, title, message):
    """POST the notification to the endpoint configured under ``apns2``.

    The title and message are joined with a newline into a single
    ``notification`` field and the type is sent as ``severity``.

    Returns:
        Dict with the upstream ``status_code`` and response ``body`` text.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    payload = {
        "severity": notification_type,
        "notification": f"{title}\n{message}",
    }
    headers = {
        "Authorization": f"Bearer {APNS2_CONFIG['bearer_token']}",
        "Content-Type": "application/json",
    }
    response = requests.post(
        APNS2_CONFIG["url"],
        json=payload,
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )
    return {
        "status_code": response.status_code,
        "body": response.text,
    }


def send_to_firebase(notification_type, title, message):
    """POST the notification to the endpoint configured under ``firebase``.

    The title is decorated via ``build_firebase_title``.

    Returns:
        Dict with the upstream ``status_code`` and response ``body`` text.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    payload = {
        # NOTE(review): the type is hard-coded to "info" regardless of
        # notification_type; the severity appears to be conveyed only by the
        # title prefix. Confirm this is intended.
        "type": "info",
        "title": build_firebase_title(notification_type, title),
        "message": message,
    }
    headers = {
        "Authorization": f"Bearer {FIREBASE_CONFIG['bearer_token']}",
        "Content-Type": "application/json",
    }
    response = requests.post(
        FIREBASE_CONFIG["url"],
        json=payload,
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )
    return {
        "status_code": response.status_code,
        "body": response.text,
    }


@app.route("/notify", methods=["POST"])
def notify():
    """Validate the request and forward the notification to both backends.

    Responds with 401 when the bearer token is missing or wrong, 400 when the
    body is not a non-empty JSON object, lacks a required field, or has an
    unknown ``type``. Otherwise both backends are attempted independently and
    the per-backend outcome is returned under ``results``.
    """
    if not check_auth(request):
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json(silent=True)
    # Only a JSON object is acceptable: a string or array would pass the
    # ``in`` checks below and then fail on ``data["type"]`` with a 500.
    if not isinstance(data, dict) or not data:
        return jsonify({"error": "Invalid JSON"}), 400

    for field in REQUIRED_FIELDS:
        if field not in data:
            return jsonify({"error": f"Missing field: {field}"}), 400

    notification_type = data["type"]
    if notification_type not in ALLOWED_TYPES:
        return jsonify({"error": "Invalid type"}), 400

    title = data["title"]
    message = data["message"]

    # Resolved at call time so each backend is tried in a fixed order and a
    # failure of one never prevents the other from being attempted.
    senders = (
        ("apns2", send_to_apns2),
        ("firebase", send_to_firebase),
    )
    results = {}
    for backend, sender in senders:
        try:
            results[backend] = sender(notification_type, title, message)
        except Exception as exc:  # best-effort boundary: report, don't crash
            app.logger.exception("Delivery via %s failed", backend)
            results[backend] = {"error": str(exc)}

    # NOTE(review): "success" is always True once validation passes, even if
    # every backend failed; callers must inspect "results" for the outcome.
    return jsonify({"success": True, "results": results})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5501, debug=False)