"""Flask service that stores APNs device tokens and fans out push notifications.

Configuration is read from ``config.ini`` at import time. Two bearer tokens
guard the API: ``REGISTER_AUTH`` for device registration and ``MANAGE_AUTH``
for sending notifications and deleting tokens.
"""

import asyncio
import configparser
import hmac
import time
from urllib.parse import quote

import aiohttp
import jwt
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy

config = configparser.ConfigParser()
config.read("config.ini")

REGISTER_AUTH = config.get("AUTH", "REGISTER_AUTH", fallback=None)
MANAGE_AUTH = config.get("AUTH", "MANAGE_AUTH", fallback=None)

SQLALCHEMY_DATABASE_URI = config.get(
    "DATABASE",
    "SQLALCHEMY_DATABASE_URI",
    fallback="sqlite:///device_tokens.db",
)

auth_key_path = config.get(
    "APNS", "auth_key_path", fallback="./APNSAuthKey.p8"
)
auth_key_id = config.get("APNS", "auth_key_id", fallback=None)
team_id = config.get("APNS", "team_id", fallback=None)
topic = config.get("APNS", "topic", fallback=None)

APNS_URL = "https://api.push.apple.com"

# Reuse a provider token for this long before minting a new one. Apple's
# documentation asks for a refresh no more often than every 20 minutes and
# rejects tokens older than 60 minutes, so 50 minutes stays inside both bounds.
APNS_TOKEN_TTL_SECONDS = 50 * 60

# Width of the ``device_token`` column; also enforced on registration.
DEVICE_TOKEN_MAX_LENGTH = 256

# Prefix put in front of the notification title for each accepted severity.
SEVERITY_ICONS = {
    "info": "",
    "warning": "⚠️ ",
    "urgent": "❗️ ",
    "danger": "❌ ",
}

# Fail fast at import time when a required setting is missing.
if not all([REGISTER_AUTH, MANAGE_AUTH, auth_key_id, team_id, topic]):
    raise ValueError("Fehlende Pflichtfelder in der Konfiguration!")

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = SQLALCHEMY_DATABASE_URI
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

db = SQLAlchemy(app)


class DeviceToken(db.Model):
    """A registered APNs device token; each token is stored at most once."""

    id = db.Column(db.Integer, primary_key=True)
    device_token = db.Column(
        db.String(DEVICE_TOKEN_MAX_LENGTH), nullable=False, unique=True
    )


def authenticate(req, required_token):
    """Return True when ``req`` carries ``Authorization: Bearer <required_token>``.

    Args:
        req: Object exposing a ``headers`` mapping (the Flask request).
        required_token: The expected bearer token.

    Returns:
        bool: False when the header is absent, no token is configured, or the
        header does not match exactly.
    """
    auth_header = req.headers.get("Authorization")
    if auth_header is None or required_token is None:
        return False
    expected = f"Bearer {required_token}"
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Encoded to bytes because compare_digest rejects
    # non-ASCII str arguments.
    return hmac.compare_digest(
        auth_header.encode("utf-8"), expected.encode("utf-8")
    )


with open(auth_key_path, "r", encoding="utf-8") as f:
    APNS_PRIVATE_KEY = f.read()

# Most recently minted provider token and the time it was issued.
_apns_token_cache = {"token": None, "issued_at": 0.0}


def generate_apns_token():
    """Return an ES256-signed provider token for APNs, reusing a recent one.

    A new token is signed only when none is cached or the cached one is older
    than ``APNS_TOKEN_TTL_SECONDS``; minting one per notification risks being
    throttled by APNs.

    Returns:
        str: The encoded JWT.
    """
    now = time.time()
    cached = _apns_token_cache["token"]
    if (
        cached is not None
        and now - _apns_token_cache["issued_at"] < APNS_TOKEN_TTL_SECONDS
    ):
        return cached

    token = jwt.encode(
        {"iss": team_id, "iat": int(now)},
        APNS_PRIVATE_KEY,
        algorithm="ES256",
        headers={"alg": "ES256", "kid": auth_key_id},
    )
    # PyJWT 1.x returns bytes, which would render as "b'...'" in the header.
    if isinstance(token, bytes):
        token = token.decode("ascii")

    _apns_token_cache["token"] = token
    _apns_token_cache["issued_at"] = now
    return token


async def send_notification(session, device_token, title, body):
    """POST one alert notification to APNs for a single device.

    Failures are printed and swallowed so that one bad device cannot abort a
    fan-out to the remaining devices.

    Args:
        session: An open ``aiohttp.ClientSession``.
        device_token: Token identifying the target device.
        title: Alert title.
        body: Alert body text.
    """
    # NOTE(review): Apple documents the APNs provider API as HTTP/2-only,
    # while aiohttp's client speaks HTTP/1.1 - confirm these requests actually
    # succeed against production.
    try:
        jwt_token = generate_apns_token()
        payload = {
            "aps": {
                "alert": {"title": title, "body": body},
                "sound": "default",
                "badge": 1,
            }
        }
        headers = {
            "authorization": f"bearer {jwt_token}",
            "apns-topic": topic,
            "apns-push-type": "alert",
            "content-type": "application/json",
        }
        # Percent-encode the stored token so it cannot alter the request path.
        url = f"{APNS_URL}/3/device/{quote(device_token, safe='')}"

        async with session.post(
            url, json=payload, headers=headers
        ) as response:
            if response.status != 200:
                error_text = await response.text()
                print(
                    f"APNS Fehler {response.status} "
                    f"für {device_token}: {error_text}"
                )
    except Exception as e:  # deliberate best-effort boundary, see docstring
        print(f"Fehler beim Senden: {e}")


async def _send_to_all(device_tokens, title, body):
    """Send the same notification to every token concurrently over one session."""
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            *(
                send_notification(
                    session=session,
                    device_token=device_token,
                    title=title,
                    body=body,
                )
                for device_token in device_tokens
            )
        )


def _json_object():
    """Return the request body as a dict, or None when it is not a JSON object."""
    # silent=True: a malformed or non-JSON body yields None instead of raising,
    # so callers can answer 400 rather than falling into their 500 handler.
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else None


@app.route("/api/registerDeviceToken", methods=["POST"])
def register_device_token():
    """Register a device token (requires the ``REGISTER_AUTH`` bearer token).

    Expects a JSON object with a ``device_token`` string. Responds 200 for a
    new or already-known token, 400 for a missing or invalid token, 401
    without valid credentials, and 500 on a database error.
    """
    if not authenticate(request, REGISTER_AUTH):
        return jsonify({"error": "Unauthorized"}), 401

    data = _json_object()
    if data is None or "device_token" not in data:
        return jsonify({"error": "device_token fehlt"}), 400

    device_token = data["device_token"]
    if (
        not isinstance(device_token, str)
        or not device_token
        or len(device_token) > DEVICE_TOKEN_MAX_LENGTH
    ):
        return jsonify({"error": "device_token ungültig"}), 400

    try:
        existing = DeviceToken.query.filter_by(
            device_token=device_token
        ).first()
        if existing:
            return jsonify({"message": "Bereits registriert"}), 200

        db.session.add(DeviceToken(device_token=device_token))
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({
            "error": "Interner Serverfehler",
            "details": str(e),
        }), 500

    return jsonify({"message": "Device Token registriert"}), 200


@app.route("/api/notify", methods=["POST"])
def notify():
    """Push a notification to every registered device (requires ``MANAGE_AUTH``).

    Expects a JSON object with ``severity`` (a key of ``SEVERITY_ICONS``) and
    ``notification`` (the text). The text is used as the alert body and,
    prefixed with the severity icon, as the alert title. Responds 400 for a
    missing or invalid field, 401 without valid credentials, and 500 when the
    fan-out itself fails.
    """
    if not authenticate(request, MANAGE_AUTH):
        return jsonify({"error": "Unauthorized"}), 401

    data = _json_object() or {}
    severity = data.get("severity")
    notification_text = data.get("notification")

    if not severity or not notification_text:
        return jsonify({
            "error": "severity und notification sind erforderlich"
        }), 400

    # The isinstance check keeps an unhashable JSON value (list/object) from
    # raising TypeError in the dict membership test.
    if not isinstance(severity, str) or severity not in SEVERITY_ICONS:
        return jsonify({"error": "Ungültige severity"}), 400

    title = f"{SEVERITY_ICONS[severity]}{notification_text}"

    try:
        # Load the tokens before entering the event loop so the coroutine
        # does no blocking database work.
        device_tokens = [
            row.device_token for row in DeviceToken.query.all()
        ]
        asyncio.run(_send_to_all(device_tokens, title, notification_text))
    except Exception as e:
        return jsonify({
            "error": "Interner Serverfehler",
            "details": str(e),
        }), 500

    return jsonify({"message": "Benachrichtigungen gesendet"}), 200


@app.route("/showdevicetokens", methods=["GET"])
def show_device_tokens():
    """Return every stored device token as JSON."""
    # NOTE(review): unlike the other management endpoint this route performs
    # no authenticate() check, so anyone who can reach the service can list
    # all device tokens. Left unchanged to avoid breaking existing callers -
    # confirm whether it should require MANAGE_AUTH.
    tokens = DeviceToken.query.all()
    return jsonify({
        "device_tokens": [token.device_token for token in tokens]
    })


@app.route("/api/deleteAllDeviceTokens", methods=["POST"])
def delete_all_device_tokens():
    """Delete every stored device token (requires ``MANAGE_AUTH``)."""
    if not authenticate(request, MANAGE_AUTH):
        return jsonify({"error": "Unauthorized"}), 401

    try:
        db.session.query(DeviceToken).delete()
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({
            "error": "Interner Serverfehler",
            "details": str(e),
        }), 500

    return jsonify({"message": "Alle Device Tokens gelöscht"}), 200


if __name__ == "__main__":
    # Tables are created only when the module is run directly as a script.
    with app.app_context():
        db.create_all()

    app.run(host="0.0.0.0", port=3000)