Files
apns2-push/app.py
T
2026-06-12 17:08:19 +02:00

305 lines
6.5 KiB
Python

import time
import jwt
import aiohttp
import asyncio
import configparser
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
config = configparser.ConfigParser()
config.read("config.ini")
REGISTER_AUTH = config.get("AUTH", "REGISTER_AUTH", fallback=None)
MANAGE_AUTH = config.get("AUTH", "MANAGE_AUTH", fallback=None)
SQLALCHEMY_DATABASE_URI = config.get(
"DATABASE",
"SQLALCHEMY_DATABASE_URI",
fallback="sqlite:///device_tokens.db"
)
auth_key_path = config.get(
"APNS",
"auth_key_path",
fallback="./APNSAuthKey.p8"
)
auth_key_id = config.get("APNS", "auth_key_id", fallback=None)
team_id = config.get("APNS", "team_id", fallback=None)
topic = config.get("APNS", "topic", fallback=None)
APNS_URL = "https://api.push.apple.com"
if not all([
REGISTER_AUTH,
MANAGE_AUTH,
auth_key_id,
team_id,
topic
]):
raise ValueError("Fehlende Pflichtfelder in der Konfiguration!")
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = SQLALCHEMY_DATABASE_URI
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
class DeviceToken(db.Model):
id = db.Column(db.Integer, primary_key=True)
device_token = db.Column(
db.String(256),
nullable=False,
unique=True
)
def authenticate(req, required_token):
auth_header = req.headers.get("Authorization")
return auth_header == f"Bearer {required_token}"
with open(auth_key_path, "r") as f:
APNS_PRIVATE_KEY = f.read()
def generate_apns_token():
return jwt.encode(
{
"iss": team_id,
"iat": int(time.time())
},
APNS_PRIVATE_KEY,
algorithm="ES256",
headers={
"alg": "ES256",
"kid": auth_key_id
}
)
async def send_notification(
session,
device_token,
title,
body
):
try:
jwt_token = generate_apns_token()
payload = {
"aps": {
"alert": {
"title": title,
"body": body
},
"sound": "default",
"badge": 1
}
}
headers = {
"authorization": f"bearer {jwt_token}",
"apns-topic": topic,
"apns-push-type": "alert",
"content-type": "application/json"
}
url = f"{APNS_URL}/3/device/{device_token}"
async with session.post(
url,
json=payload,
headers=headers
) as response:
if response.status != 200:
error_text = await response.text()
print(
f"APNS Fehler {response.status} "
f"für {device_token}: {error_text}"
)
except Exception as e:
print(f"Fehler beim Senden: {e}")
@app.route("/api/registerDeviceToken", methods=["POST"])
def register_device_token():
if not authenticate(request, REGISTER_AUTH):
return jsonify({
"error": "Unauthorized"
}), 401
try:
data = request.get_json()
if not data or "device_token" not in data:
return jsonify({
"error": "device_token fehlt"
}), 400
device_token = data["device_token"]
existing = DeviceToken.query.filter_by(
device_token=device_token
).first()
if existing:
return jsonify({
"message": "Bereits registriert"
}), 200
db.session.add(
DeviceToken(device_token=device_token)
)
db.session.commit()
return jsonify({
"message": "Device Token registriert"
}), 200
except Exception as e:
db.session.rollback()
return jsonify({
"error": "Interner Serverfehler",
"details": str(e)
}), 500
@app.route("/api/notify", methods=["POST"])
def notify():
if not authenticate(request, MANAGE_AUTH):
return jsonify({
"error": "Unauthorized"
}), 401
try:
data = request.get_json()
severity = data.get("severity")
notification_text = data.get("notification")
if not severity or not notification_text:
return jsonify({
"error": (
"severity und notification "
"sind erforderlich"
)
}), 400
severity_icons = {
"info": "",
"warning": "⚠️ ",
"urgent": "❗️ ",
"danger": ""
}
if severity not in severity_icons:
return jsonify({
"error": "Ungültige severity"
}), 400
title = (
f"{severity_icons[severity]}"
f"{notification_text}"
)
async def send_all():
async with aiohttp.ClientSession() as session:
tasks = []
for token in DeviceToken.query.all():
tasks.append(
send_notification(
session=session,
device_token=token.device_token,
title=title,
body=notification_text
)
)
await asyncio.gather(*tasks)
asyncio.run(send_all())
return jsonify({
"message": "Benachrichtigungen gesendet"
}), 200
except Exception as e:
return jsonify({
"error": "Interner Serverfehler",
"details": str(e)
}), 500
@app.route("/showdevicetokens", methods=["GET"])
def show_device_tokens():
tokens = DeviceToken.query.all()
return jsonify({
"device_tokens": [
token.device_token
for token in tokens
]
})
@app.route(
"/api/deleteAllDeviceTokens",
methods=["POST"]
)
def delete_all_device_tokens():
if not authenticate(request, MANAGE_AUTH):
return jsonify({
"error": "Unauthorized"
}), 401
try:
db.session.query(DeviceToken).delete()
db.session.commit()
return jsonify({
"message": "Alle Device Tokens gelöscht"
}), 200
except Exception as e:
db.session.rollback()
return jsonify({
"error": "Interner Serverfehler",
"details": str(e)
}), 500
if __name__ == "__main__":
with app.app_context():
db.create_all()
app.run(
host="0.0.0.0",
port=3000
)