"""Push relay: stores device tokens and forwards alert notifications to APNs."""

import configparser
import hmac
import json
import threading
import time
from typing import Optional

import httpx
import jwt
from flask import Flask, request, jsonify, abort
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker

# Settings are read from "config.ini", resolved against the working directory.
config = configparser.ConfigParser()
config.read("config.ini")

REGISTER_AUTH = config["AUTH"]["REGISTER_AUTH"]
MANAGE_AUTH = config["AUTH"]["MANAGE_AUTH"]

DATABASE_URI = config["DATABASE"]["SQLALCHEMY_DATABASE_URI"]

APNS_KEY_PATH = config["APNS"]["auth_key_path"]
APNS_KEY_ID = config["APNS"]["auth_key_id"]
APNS_TEAM_ID = config["APNS"]["team_id"]
APNS_TOPIC = config["APNS"]["topic"]

APNS_URL = "https://api.push.apple.com/3/device/"

# Apple asks providers to reuse a provider token: refresh it no more often
# than every 20 minutes and never present one older than 60 minutes.
# 50 minutes sits inside that window with some slack for clock skew.
APNS_JWT_MAX_AGE_SECONDS = 50 * 60

Base = declarative_base()
engine = create_engine(DATABASE_URI, echo=False)
SessionLocal = sessionmaker(bind=engine)


class DeviceToken(Base):
    """One registered device token; `token` is unique and non-null."""

    __tablename__ = "device_tokens"

    id = Column(Integer, primary_key=True)
    token = Column(String, unique=True, nullable=False)


# Creates the table on import if it does not exist yet.
Base.metadata.create_all(engine)

app = Flask(__name__)

# Cached provider token and the time it was issued, guarded by a lock because
# request handlers may run on several threads.
_apns_jwt_lock = threading.Lock()
_apns_jwt_cache = {"token": None, "issued_at": 0}


def get_auth_header():
    """Return the credential from an ``Authorization: Bearer ...`` header.

    Returns:
        The text following ``"Bearer "``, or ``None`` when the header is
        missing or does not use the Bearer scheme.
    """
    auth = request.headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        return None
    return auth[len("Bearer "):]


def _secret_matches(supplied, expected):
    """Compare a client-supplied secret to the expected one in constant time."""
    if supplied is None:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII ``str`` arguments.
    return hmac.compare_digest(
        supplied.encode("utf-8"), expected.encode("utf-8")
    )


def require_register_auth():
    """Abort the request with 401 unless it carries the REGISTER_AUTH secret."""
    if not _secret_matches(get_auth_header(), REGISTER_AUTH):
        abort(401)


def require_manage_auth():
    """Abort the request with 401 unless it carries the MANAGE_AUTH secret."""
    if not _secret_matches(get_auth_header(), MANAGE_AUTH):
        abort(401)


def load_apns_key():
    """Return the contents of the APNs signing key file at APNS_KEY_PATH."""
    with open(APNS_KEY_PATH, "r", encoding="utf-8") as f:
        return f.read()


def create_apns_jwt():
    """Return an ES256 provider token for APNs, reusing a recent one.

    A new token is signed only when none is cached or the cached one is older
    than ``APNS_JWT_MAX_AGE_SECONDS``; otherwise the cached token is returned
    so the key file is not re-read and re-signed for every push.

    Returns:
        The encoded JWT as ``str``.
    """
    now = int(time.time())
    with _apns_jwt_lock:
        cached = _apns_jwt_cache["token"]
        age = now - _apns_jwt_cache["issued_at"]
        # ``0 <= age`` also forces a refresh if the clock moved backwards.
        if cached is not None and 0 <= age < APNS_JWT_MAX_AGE_SECONDS:
            return cached

        headers = {
            "alg": "ES256",
            "kid": APNS_KEY_ID,
        }
        payload = {
            "iss": APNS_TEAM_ID,
            "iat": now,
        }
        token = jwt.encode(
            payload,
            load_apns_key(),
            algorithm="ES256",
            headers=headers,
        )
        # PyJWT 1.x returns bytes; formatting bytes into the header below
        # would produce "bearer b'...'".
        if isinstance(token, bytes):
            token = token.decode("ascii")

        _apns_jwt_cache["token"] = token
        _apns_jwt_cache["issued_at"] = now
        return token


def send_apns_notification(
    device_token: str,
    payload: dict,
    *,
    client: Optional[httpx.Client] = None,
):
    """POST one notification payload to APNs for a single device.

    Args:
        device_token: Device token appended to ``APNS_URL``.
        payload: JSON-serialisable notification body.
        client: Optional already-open ``httpx.Client`` to send with. Pass a
            shared HTTP/2 client when notifying many devices so they reuse
            one connection. When omitted, a client is opened and closed for
            this call only.

    Returns:
        A ``(status_code, response_text)`` tuple from the APNs response.
    """
    headers = {
        "authorization": f"bearer {create_apns_jwt()}",
        "apns-topic": APNS_TOPIC,
        "apns-push-type": "alert",
    }
    url = APNS_URL + device_token

    if client is not None:
        response = client.post(url, headers=headers, json=payload)
    else:
        with httpx.Client(http2=True) as own_client:
            response = own_client.post(url, headers=headers, json=payload)

    return response.status_code, response.text


# Severity labels accepted by /api/notify. Kept as a tuple rather than a set
# so an unhashable JSON value (e.g. a list) is rejected instead of raising.
VALID_SEVERITIES = ("info", "warning", "urgent", "danger")


@app.route("/api/registerDeviceToken", methods=["POST"])
def register_device():
    """Store a device token so it receives future notifications.

    Requires the register secret. Expects a JSON object with a non-empty
    string ``device_token``. Registering a token that is already stored is a
    no-op and still reports success.

    Returns:
        ``{"status": "registered"}`` on success, or a 400 JSON error when
        ``device_token`` is missing, empty, or not a string.
    """
    from sqlalchemy.exc import IntegrityError

    require_register_auth()

    data = request.get_json()
    # A JSON array or string is valid JSON but not the object expected here.
    token = data.get("device_token") if isinstance(data, dict) else None
    if not isinstance(token, str) or not token:
        return jsonify({"error": "device_token required"}), 400

    # The context manager closes the session even if the query or commit fails.
    with SessionLocal() as session:
        existing = session.query(DeviceToken).filter_by(token=token).first()
        if existing is None:
            session.add(DeviceToken(token=token))
            try:
                session.commit()
            except IntegrityError:
                # A concurrent request stored the same token between the
                # lookup and the commit; the token is registered either way.
                session.rollback()

    return jsonify({"status": "registered"})


@app.route("/api/notify", methods=["POST"])
def notify():
    """Send an alert notification to every registered device.

    Requires the manage secret. Expects a JSON object with ``severity`` (one
    of ``VALID_SEVERITIES``) and a non-empty ``notification``.

    Returns:
        JSON with ``sent`` (number of devices attempted) and ``results``, one
        entry per device holding its token, the APNs status code, and the
        response text. If the HTTP request itself fails for a device, its
        ``status`` is ``null`` and ``response`` holds the error message; the
        remaining devices are still attempted.
    """
    require_manage_auth()

    data = request.get_json()
    if not data or not isinstance(data, dict):
        return jsonify({"error": "invalid json"}), 400

    severity = data.get("severity")
    message = data.get("notification")

    if severity not in VALID_SEVERITIES:
        return jsonify({"error": "invalid severity"}), 400

    if not message:
        return jsonify({"error": "notification required"}), 400

    payload = {
        "aps": {
            "alert": message,
            "sound": "default",
        },
        "severity": severity,
    }

    # Copy the plain token strings out while the session is still open.
    with SessionLocal() as session:
        device_tokens = [row.token for row in session.query(DeviceToken).all()]

    results = []
    for device_token in device_tokens:
        try:
            status, resp = send_apns_notification(device_token, payload)
        except httpx.HTTPError as exc:
            # One unreachable device must not abort the whole fan-out or
            # discard the results already collected.
            status, resp = None, str(exc)
        results.append({
            "token": device_token,
            "status": status,
            "response": resp,
        })

    return jsonify({
        "sent": len(results),
        "results": results,
    })


if __name__ == "__main__":
    import os

    # The interactive debugger allows arbitrary code execution, and this
    # server listens on all interfaces, so debug mode is opt-in only.
    debug_enabled = os.environ.get("FLASK_DEBUG") == "1"
    app.run(host="0.0.0.0", port=3000, debug=debug_enabled)