334 lines
12 KiB
Python
334 lines
12 KiB
Python
import requests
|
|
import json
|
|
from datetime import datetime
|
|
import re
|
|
import os
|
|
from dotenv import load_dotenv
|
|
import apprise
|
|
import time
|
|
|
|
# Lade Umgebungsvariablen aus .env Datei
|
|
load_dotenv()
|
|
|
|
def is_debug_mode():
|
|
"""Prüft ob Debug-Modus aktiviert ist"""
|
|
return os.getenv('DBG_MODE', 'false').lower() == 'true'
|
|
|
|
def get_ti_api_url():
|
|
"""Gibt die API-URL zurück, im Debug-Modus ggf. die lokale Test-API"""
|
|
# if is_debug_mode():
|
|
# return os.getenv("TI_API_URL_DEBUG", "http://localhost:8000/lageapi/v2/tilage")
|
|
return os.getenv("TI_API_URL", "https://ti-lage.prod.ccs.gematik.solutions/lageapi/v2/tilage")
|
|
|
|
TI_API_URL = get_ti_api_url()
|
|
STATE_FILE = "ti_status_state.json"
|
|
|
|
# Apprise Konfiguration aus Umgebungsvariablen
|
|
def get_apprise_urls():
|
|
"""Holt alle konfigurierten Apprise URLs aus der .env"""
|
|
urls = []
|
|
|
|
# Prüfe alle möglichen URL-Konfigurationen
|
|
url_keys = [
|
|
'APPRISE_URL_MATTERMOST',
|
|
'APPRISE_URL_SLACK',
|
|
'APPRISE_URL_TELEGRAM',
|
|
'APPRISE_URL_DISCORD',
|
|
'APPRISE_URL_EMAIL',
|
|
'APPRISE_URL_PUSHOVER',
|
|
'APPRISE_URL_TEAMS'
|
|
]
|
|
|
|
for key in url_keys:
|
|
url = os.getenv(key)
|
|
if url:
|
|
urls.append(url)
|
|
if is_debug_mode():
|
|
print(f"✅ {key}: {url}")
|
|
|
|
# Fallback für alte Konfiguration
|
|
if not urls:
|
|
legacy_url = os.getenv('APPRISE_URL')
|
|
if legacy_url:
|
|
urls.append(legacy_url)
|
|
if is_debug_mode():
|
|
print(f"✅ Legacy APPRISE_URL: {legacy_url}")
|
|
|
|
return urls
|
|
|
|
def get_notification_level():
|
|
"""Holt das konfigurierte Benachrichtigungslevel"""
|
|
return os.getenv('NOTIFICATION_LEVEL', 'all').lower()
|
|
|
|
def get_notification_filters():
|
|
"""Holt die konfigurierten Benachrichtigungsfilter"""
|
|
filters_str = os.getenv('NOTIFICATION_FILTERS', '')
|
|
if filters_str:
|
|
return [f.strip().lower() for f in filters_str.split(',')]
|
|
return []
|
|
|
|
def get_notification_hours():
|
|
"""Holt die konfigurierten Benachrichtigungszeiten"""
|
|
hours_str = os.getenv('NOTIFICATION_HOURS', '')
|
|
if hours_str:
|
|
return [h.strip() for h in hours_str.split(',')]
|
|
return []
|
|
|
|
def get_notification_delay():
|
|
"""Holt die konfigurierte Verzögerung zwischen Benachrichtigungen"""
|
|
return int(os.getenv('NOTIFICATION_DELAY', '0'))
|
|
|
|
def is_within_notification_hours():
|
|
"""Prüft ob die aktuelle Zeit innerhalb der konfigurierten Benachrichtigungszeiten liegt"""
|
|
hours = get_notification_hours()
|
|
if not hours:
|
|
return True # Keine Einschränkung
|
|
|
|
current_time = datetime.now().strftime('%H:%M')
|
|
|
|
for time_range in hours:
|
|
if '-' in time_range:
|
|
start, end = time_range.split('-')
|
|
if start <= current_time <= end:
|
|
return True
|
|
else:
|
|
# Einzelne Zeit
|
|
if current_time == time_range:
|
|
return True
|
|
|
|
return False
|
|
|
|
def should_send_notification(message):
|
|
"""Prüft ob eine Benachrichtigung gesendet werden soll basierend auf den Regeln"""
|
|
|
|
# Prüfe Benachrichtigungszeiten
|
|
if not is_within_notification_hours():
|
|
if is_debug_mode():
|
|
print(f"⏰ Benachrichtigung außerhalb der konfigurierten Zeiten: {datetime.now().strftime('%H:%M')}")
|
|
return False
|
|
|
|
# Prüfe Filter
|
|
filters = get_notification_filters()
|
|
if filters:
|
|
message_lower = message.lower()
|
|
for filter_term in filters:
|
|
if filter_term in message_lower:
|
|
if is_debug_mode():
|
|
print(f"✅ Nachricht entspricht Filter '{filter_term}'")
|
|
return True
|
|
|
|
if is_debug_mode():
|
|
print(f"❌ Nachricht entspricht keinem Filter: {filters}")
|
|
return False
|
|
|
|
return True
|
|
|
|
def fetch_status_messages():
|
|
if is_debug_mode():
|
|
print(f"Rufe TI-Status-API ab: {TI_API_URL}")
|
|
resp = requests.get(TI_API_URL)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
|
|
if is_debug_mode():
|
|
print(f"API-Antwort erhalten. Anzahl Meldungen: {len(data.get('meldungen', []))}")
|
|
|
|
messages = []
|
|
# 1. Bisherige Meldungen
|
|
for meldung in data.get("meldungen", []):
|
|
zeit = meldung.get("zeitpunkt", "")
|
|
titel = meldung.get("titel", "")
|
|
beschreibung = meldung.get("beschreibung", "")
|
|
link = meldung.get("link", "")
|
|
msg = f"{zeit}\n- {titel}: {beschreibung}\n{link}".strip()
|
|
messages.append(msg)
|
|
if is_debug_mode():
|
|
print(f"Verarbeite Meldung: {titel[:50]}...")
|
|
# 2. Neue Auswertung von appStatus
|
|
app_status = data.get("appStatus", {})
|
|
for dienst, status in app_status.items():
|
|
outage = status.get("outage", "none")
|
|
if outage in ("partial", "full"):
|
|
has_maintenance = status.get("hasMaintenance", False)
|
|
sub_maintenance = status.get("hasSubComponentMaintenance", False)
|
|
affected = status.get("affectedFunctions", [])
|
|
# Baue eine verständliche Meldung mit Zeitstempel
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
msg = f"[{timestamp}] Störung bei {dienst.upper()} ({'Wartung' if has_maintenance else 'Störung'}): Status: {outage}"
|
|
if affected:
|
|
for func in affected:
|
|
func_name = func.get("function", "Unbekannte Funktion")
|
|
impact = func.get("impactDesc", "")
|
|
func_outage = func.get("outage", outage)
|
|
msg += f"\n- {func_name}: {impact} (Status: {func_outage})"
|
|
else:
|
|
msg += "\n- Keine weiteren Details."
|
|
messages.append(msg)
|
|
if is_debug_mode():
|
|
print(f"Erkannte Störung: {msg[:80]}...")
|
|
if is_debug_mode():
|
|
print(f"Insgesamt {len(messages)} Meldungen verarbeitet")
|
|
return messages
|
|
|
|
def load_state():
|
|
try:
|
|
with open(STATE_FILE, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
return {"messages": [], "last_status": {}}
|
|
|
|
def save_state(state):
|
|
with open(STATE_FILE, "w", encoding="utf-8") as f:
|
|
json.dump(state, f, ensure_ascii=False, indent=2)
|
|
|
|
def extract_outages(app_status):
|
|
"""Extrahiert alle aktuellen Störungen als Set von Strings"""
|
|
outages = set()
|
|
for dienst, status in app_status.items():
|
|
outage = status.get("outage", "none")
|
|
if outage in ("partial", "full"):
|
|
outages.add(dienst)
|
|
return outages
|
|
|
|
def markdownify_message(message):
|
|
# Datumsangaben fett hervorheben (z.B. 2025-06-23 oder 23.06.2025)
|
|
message = re.sub(r"(\d{4}-\d{2}-\d{2})", r"**\1**", message)
|
|
message = re.sub(r"(\d{2}\.\d{2}\.\d{4})", r"**\1**", message)
|
|
# URLs als Links darstellen
|
|
message = re.sub(r"(https?://\S+)", r"[\1](\1)", message)
|
|
# Listenpunkte erkennen (z.B. mit - oder *)
|
|
lines = message.splitlines()
|
|
md_lines = []
|
|
for line in lines:
|
|
line = line.strip()
|
|
if line.startswith("-") or line.startswith("*"):
|
|
md_lines.append(f"- {line[1:].strip()}")
|
|
else:
|
|
md_lines.append(line)
|
|
# Absätze durch doppelte Zeilenumbrüche trennen
|
|
md_message = "\n\n".join([l for l in md_lines if l])
|
|
return md_message
|
|
|
|
def send_notification(message):
|
|
"""Sendet Benachrichtigungen an alle konfigurierten Endpunkte"""
|
|
# Debug-Ausgabe: Apprise-Version und URL aus .env
|
|
if is_debug_mode():
|
|
print(f"[DEBUG] Apprise-Version: {apprise.__version__}")
|
|
print(f"[DEBUG] APPRISE_URL_MATTERMOST aus .env: {os.getenv('APPRISE_URL_MATTERMOST')}")
|
|
# Prüfe ob Benachrichtigung gesendet werden soll
|
|
if not should_send_notification(message):
|
|
if is_debug_mode():
|
|
print("🚫 Benachrichtigung wird nicht gesendet (Regeln)")
|
|
return
|
|
|
|
md_message = markdownify_message(message)
|
|
|
|
# Hole alle konfigurierten URLs
|
|
urls = get_apprise_urls()
|
|
if not urls:
|
|
print("❌ Keine Apprise URLs konfiguriert!")
|
|
return
|
|
|
|
# Erstelle Apprise Objekt
|
|
apobj = apprise.Apprise()
|
|
|
|
# Füge alle URLs hinzu
|
|
for url in urls:
|
|
apobj.add(url)
|
|
|
|
# Erstelle die Nachricht
|
|
title = "Neue TI-Status-Meldung"
|
|
body = f"{md_message}\n\n[Zur Statusseite](https://fachportal.gematik.de/ti-status)\n_Gemeldet am {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}_"
|
|
|
|
# Setze den Absender-Namen
|
|
sender_name = "TI-Status Bot by medisoftware"
|
|
|
|
if is_debug_mode():
|
|
print(f"📤 Sende Benachrichtigung an {len(urls)} Endpunkt(e)")
|
|
print(f"Verwendete Apprise-URLs: {urls}")
|
|
print(f"Nachrichtentitel: {title}")
|
|
print(f"Nachrichtentext: {body[:200]}...")
|
|
|
|
# Sende die Nachricht mit Fehlerausgabe
|
|
try:
|
|
result = apobj.notify(
|
|
title=title,
|
|
body=body,
|
|
body_format=apprise.NotifyFormat.MARKDOWN,
|
|
sender=sender_name
|
|
)
|
|
if result:
|
|
if is_debug_mode():
|
|
print("✅ Benachrichtigung erfolgreich gesendet")
|
|
else:
|
|
print("❌ Fehler beim Senden der Benachrichtigung")
|
|
if is_debug_mode():
|
|
print("[DEBUG] Apprise notify() Rückgabewert: False")
|
|
print("[DEBUG] Prüfe, ob die Webhook-URL korrekt ist, der Zielserver erreichbar ist und keine Authentifizierungsprobleme bestehen.")
|
|
except Exception as e:
|
|
print("❌ Fehler beim Senden der Benachrichtigung (Exception)")
|
|
if is_debug_mode():
|
|
print(f"[DEBUG] Exception: {e}")
|
|
|
|
# Verzögerung zwischen Benachrichtigungen
|
|
delay = get_notification_delay()
|
|
if delay > 0:
|
|
if is_debug_mode():
|
|
print(f"⏳ Warte {delay} Sekunden...")
|
|
time.sleep(delay)
|
|
|
|
def main():
|
|
# Prüfe Konfiguration
|
|
urls = get_apprise_urls()
|
|
if not urls:
|
|
print("❌ Keine Apprise URLs konfiguriert!")
|
|
print("Bitte erstelle eine .env Datei basierend auf env.example")
|
|
return
|
|
|
|
if is_debug_mode():
|
|
print("🔧 Debug-Modus aktiviert")
|
|
print(f"📋 Benachrichtigungslevel: {get_notification_level()}")
|
|
print(f"🔍 Filter: {get_notification_filters()}")
|
|
print(f"⏰ Benachrichtigungszeiten: {get_notification_hours()}")
|
|
print(f"⏳ Verzögerung: {get_notification_delay()}s")
|
|
|
|
state = load_state()
|
|
known_messages = set(state.get("messages", []))
|
|
last_status = state.get("last_status", {})
|
|
print("Prüfe TI-Status-API auf neue Meldungen...")
|
|
|
|
try:
|
|
messages = fetch_status_messages()
|
|
# Extrahiere aktuelle und vorherige Störungen
|
|
resp = requests.get(TI_API_URL)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
app_status = data.get("appStatus", {})
|
|
current_outages = extract_outages(app_status)
|
|
previous_outages = extract_outages(last_status.get("appStatus", {})) if last_status else set()
|
|
|
|
# Entwarnungen erkennen
|
|
resolved = previous_outages - current_outages
|
|
for dienst in resolved:
|
|
msg = f"✅ Entwarnung: Die Störung bei {dienst.upper()} wurde behoben."
|
|
print(msg)
|
|
send_notification(msg)
|
|
known_messages.add(msg)
|
|
|
|
# Normale neue Meldungen
|
|
new_messages = [m for m in messages if m not in known_messages]
|
|
for msg in new_messages:
|
|
print(f"Neue Meldung gefunden: {msg[:100]}...")
|
|
send_notification(msg)
|
|
known_messages.add(msg)
|
|
|
|
if new_messages or resolved:
|
|
save_state({"messages": list(known_messages), "last_status": data})
|
|
print(f"✅ {len(new_messages)} neue Meldung(en) und {len(resolved)} Entwarnung(en) verarbeitet")
|
|
else:
|
|
print(f"Keine neuen Meldungen ({datetime.now().strftime('%H:%M:%S')})")
|
|
except Exception as e:
|
|
print(f"Fehler: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
main() |